# rgdevop's picture
# Update data/Data.py
# 18ea99e
from datetime import datetime
from contextlib import closing
from excel import Driver, Vehicle, Route, Branch, RoutePlan
import psycopg2
import streamlit as st
class Data:
    """Read-only PostgreSQL access scoped to a single recycler.

    Every query filters on ``id_cliente_reciclador = recycler_id``. Each public
    method opens its own connection through :meth:`connect` and closes it
    before returning.
    """

    # Fallback estimate used when a branch has no usable container data.
    # Treat as read-only: hand out copies via _default_estimate_containers().
    _DEFAULT_ESTIMATE_CONTAINERS = [{
        "amount": 1,
        "weightKg": None,
        "materialTypeId": 17,  # Trash
        "containerTypeId": 8,  # 200 L drum
        "charge_type": 2,  # Per container (?)
        "unitary_price": 0,
        "visit_price": 0,
    }]

    # Placeholder name given to routes that have no collections on a date.
    _UNASSIGNED_NAME = "Sin asignar"

    @staticmethod
    def connect():
        """Open and return a new psycopg2 connection.

        The caller owns the connection and is responsible for closing it; the
        methods of this class wrap it in ``contextlib.closing``.
        """
        # SECURITY(review): production credentials are hard-coded in source
        # control. They are kept unchanged here so existing deployments keep
        # working, but the password should be rotated and the parameters moved
        # to configuration, e.g. psycopg2.connect(**st.secrets["postgres"]).
        return psycopg2.connect(
            host="blau-production.cvcvpaamxkwo.us-east-2.rds.amazonaws.com",
            port=5432,
            dbname="rme_prod",
            user="3P_dashboards",
            password="WoUYGWzI6J8IgqHCNDjLiifHwQvSdRj",
        )

    @staticmethod
    def _fetch_all(query: str, params) -> list:
        """Run one query on a fresh connection and return all of its rows."""
        with closing(Data.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchall()

    @staticmethod
    def _default_estimate_containers() -> list[dict]:
        """Return a fresh copy of the default estimate.

        ``list.copy()`` alone would share the inner dict with the class
        constant, so a caller editing its result would corrupt the default
        for everyone else.
        """
        return [dict(entry) for entry in Data._DEFAULT_ESTIMATE_CONTAINERS]

    @staticmethod
    def _normalize_estimate_containers(containers) -> list[dict]:
        """Convert a stored ``estimate_containers`` value to the output shape.

        Entries lacking a truthy ``materialTypeId``, ``containerTypeId`` or
        ``amount`` are dropped. Missing price/charge fields are filled from
        the default entry. Returns the default estimate when ``containers``
        is ``None``/empty or when no entry survives the filter.
        """
        if not containers:
            return Data._default_estimate_containers()

        fallback = Data._DEFAULT_ESTIMATE_CONTAINERS[0]
        normalized = [
            {
                "amount": c["amount"],
                "weightKg": c.get("weightKg", None),
                "materialTypeId": c["materialTypeId"],
                "containerTypeId": c["containerTypeId"],
                "unitary_price": c.get("unitary_price", fallback["unitary_price"]),
                "charge_type": c.get("charge_type", fallback["charge_type"]),
                "visit_price": c.get("visit_price", fallback["visit_price"]),
            }
            for c in containers
            if c.get("materialTypeId") and c.get("containerTypeId") and c.get("amount")
        ]
        return normalized or Data._default_estimate_containers()

    def __init__(self, recycler_id: int):
        """Store the recycler id used to filter every query."""
        self.recycler_id = recycler_id

    def get_drivers(self) -> "list[Driver]":
        """Return this recycler's drivers, ordered by name."""
        rows = Data._fetch_all(
            """SELECT id, name from drivers WHERE id_cliente_reciclador = %s ORDER BY name ASC""",
            (self.recycler_id,),
        )
        return [Driver(id=driver_id, name=name) for driver_id, name in rows]

    def get_vehicles(self) -> "list[Vehicle]":
        """Return this recycler's vehicles (``trucks`` table), ordered by name."""
        rows = Data._fetch_all(
            """SELECT id, name from trucks WHERE id_cliente_reciclador = %s ORDER BY name ASC""",
            (self.recycler_id,),
        )
        return [Vehicle(id=vehicle_id, name=name) for vehicle_id, name in rows]

    def get_routes(self, date: datetime) -> "list[RoutePlan]":
        """Return one ``RoutePlan`` per active route for the given date.

        Routes come from ``cat_route`` (``status = TRUE``), ordered by name.
        A route with enabled recollections on ``date`` takes its driver and
        vehicle from its first recollection and lists the branches in visit
        order. A route with none gets placeholder driver/vehicle (id ``-1``)
        and an empty ``collection_points`` list.
        """
        # Both queries share one connection, as before.
        with closing(Data.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """SELECT id, name FROM cat_route WHERE id_cliente_reciclador = %s AND status = TRUE ORDER BY name ASC""",
                    (self.recycler_id,),
                )
                # Rows: (id, name)
                routes = cur.fetchall()
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT
                        r.id,
                        r.id_cat_route route_id,
                        r.id_driver driver_id,
                        d.name driver_name,
                        r.id_truck vehicle_id,
                        t.name vehicle_name,
                        r."order" "index",
                        s.id branch_id,
                        s.nombre_sucursal branch_name,
                        eu.id customer_id,
                        CASE
                            WHEN eu.business_name IS NOT NULL AND TRIM(eu.business_name) != '' THEN eu.business_name
                            WHEN eu.razon_social IS NOT NULL AND TRIM(eu.razon_social ) != '' THEN eu.razon_social
                            ELSE eu.id::text
                        END customer_name
                    FROM
                        recollections r LEFT JOIN
                        drivers d ON r.id_driver = d.id LEFT JOIN
                        trucks t ON r.id_truck = t.id LEFT JOIN
                        sucursales s ON r.id_sucursal = s.id LEFT JOIN
                        end_users eu ON s.id_end_user = eu.id
                    WHERE
                        r.id_cliente_reciclador = %s AND
                        r.enabled = TRUE AND
                        r.date = %s AND
                        r.id_sucursal IS NOT NULL AND
                        s.id_end_user IS NOT NULL
                    ORDER BY
                        r."order" ASC
                    """,
                    (self.recycler_id, f"{date.year}-{date.month}-{date.day}"),
                )
                # Rows: (id, route_id, driver_id, driver_name, vehicle_id,
                #        vehicle_name, index, branch_id, branch_name,
                #        customer_id, customer_name)
                raw_collections = cur.fetchall()

        # Group the stops by route as (index, driver, vehicle, branch) tuples.
        stops_by_route_id: dict = {}
        for (
            _collection_id,
            route_id,
            driver_id,
            driver_name,
            vehicle_id,
            vehicle_name,
            index,
            branch_id,
            branch_name,
            customer_id,
            customer_name,
        ) in raw_collections:
            stops_by_route_id.setdefault(route_id, []).append((
                index,
                Driver(id=driver_id, name=driver_name),
                Vehicle(id=vehicle_id, name=vehicle_name),
                Branch(
                    branch_id=branch_id,
                    branch_name=branch_name,
                    customer_id=customer_id,
                    customer_name=customer_name,
                ),
            ))

        output = []
        for route_id, route_name in routes:
            # The key places a NULL "order" last instead of raising TypeError
            # when None is compared with an int; the sort is stable, so ties
            # keep the order the query returned.
            stops = sorted(
                stops_by_route_id.get(route_id, []),
                key=lambda stop: (stop[0] is None, stop[0]),
            )
            if not stops:
                output.append(RoutePlan(
                    route=Route(id=route_id, name=route_name),
                    driver=Driver(id=-1, name=Data._UNASSIGNED_NAME),
                    vehicle=Vehicle(id=-1, name=Data._UNASSIGNED_NAME),
                    collection_points=[],
                ))
                continue

            # The first stop's driver and vehicle represent the whole route.
            _, driver, vehicle, _ = stops[0]
            output.append(RoutePlan(
                route=Route(id=route_id, name=route_name),
                driver=driver,
                vehicle=vehicle,
                collection_points=[branch for _, _, _, branch in stops],
            ))
        return output

    def get_branches(self) -> "list[Branch]":
        """Return this recycler's branches that have an end user, ordered by customer name.

        The customer name is ``business_name`` when non-blank, else
        ``razon_social`` when non-blank, else the end-user id as text.
        """
        rows = Data._fetch_all(
            """SELECT
                s.id branch_id,
                s.nombre_sucursal branch_name,
                eu.id customer_id,
                CASE
                    WHEN eu.business_name IS NOT NULL AND TRIM(eu.business_name) != '' THEN eu.business_name
                    WHEN eu.razon_social IS NOT NULL AND TRIM(eu.razon_social) != '' THEN eu.razon_social
                    ELSE eu.id::text
                END customer_name
            FROM sucursales s LEFT JOIN end_users eu
                ON s.id_end_user = eu.id
            WHERE
                s.id_cliente_reciclador = %s AND
                s.id_end_user IS NOT NULL
            ORDER BY
                customer_name ASC""",
            (self.recycler_id,),
        )
        return [
            Branch(
                branch_id=branch_id,
                branch_name=branch_name,
                customer_id=customer_id,
                customer_name=customer_name,
            )
            for branch_id, branch_name, customer_id, customer_name in rows
        ]

    def get_estimated_containers(self, branch_ids: list[int]) -> list[list[dict]]:
        """Return the estimated containers for each requested branch.

        The estimate of a branch is the ``estimate_containers`` value of its
        most recently created recollection. The result has exactly one entry
        per element of ``branch_ids``, in the same order. A branch with no
        recollection, or with no usable container entries, gets the default
        estimate.

        Each entry is a list of dictionaries with shape:
        {
            amount: int,
            weightKg: float | None,
            materialTypeId: int,
            containerTypeId: int,
            unitary_price: float,
            charge_type: int,
            visit_price: float
        }
        """
        if len(branch_ids) == 0:
            return []

        # One LIMIT 1 sub-select per branch. A branch without recollections
        # contributes no row, so each sub-select carries its position in
        # branch_ids to let the rows be matched back to the input.
        base_query = (
            """(SELECT %s::int AS input_index, estimate_containers FROM recollections"""
            """ WHERE id_cliente_reciclador = %s AND id_sucursal = %s"""
            """ ORDER BY created_at DESC LIMIT 1)"""
        )
        args = []
        for input_index, branch_id in enumerate(branch_ids):
            args.extend((input_index, self.recycler_id, branch_id))
        full_query = " UNION ALL ".join([base_query] * len(branch_ids))

        # Rows: (input_index, estimate_containers)
        containers_by_index = dict(Data._fetch_all(full_query, args))

        return [
            Data._normalize_estimate_containers(containers_by_index.get(input_index))
            for input_index in range(len(branch_ids))
        ]