import json from datetime import datetime from cbh.api.message.dto import AgentRequestType, EntityType, LocationType from cbh.api.message.model import MessageModel def build_worker_search_query(filters: dict) -> str: query = "SELECT * FROM DIM_WORKERS WHERE 1=1" conditions = [] if filters.get('name'): conditions.append(f"FULL_NAME ILIKE '%{filters['name']}%'") if filters.get('city'): conditions.append(f"CITY ILIKE '%{filters['city']}%'") if filters.get('state'): conditions.append(f"STATE ILIKE '%{filters['state']}%'") if filters.get('country'): conditions.append(f"COUNTRY ILIKE '%{filters['country']}%'") if filters.get('address'): conditions.append(f"FORMATTED_ADDRESS ILIKE '%{filters['address']}%'") if filters.get('qualification'): conditions.append(f"QUALIFICATION ILIKE '{filters['qualification']}'") if filters.get('postalCode'): conditions.append(f"POSTAL_CODE ILIKE '{filters['postalCode']}'") if conditions: query += " AND " + " AND ".join(conditions) query += " LIMIT 1" return query def build_worker_shift_count_query(worker_id: str, filters: dict) -> str: return f""" WITH filtered_shifts AS ( SELECT * FROM fct_shift_logs WHERE WORKER_ID = '{worker_id}' AND SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters["endDate"], "%Y-%m-%d")}') ), ranked_shifts AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn FROM filtered_shifts ) SELECT COUNT(CASE WHEN ACTION = 'SHIFT_VERIFY' THEN 1 END) AS shiftVerified, COUNT(CASE WHEN ACTION = 'FACILITY_CANCEL' THEN 1 END) AS facilityCancelled, COUNT(CASE WHEN ACTION = 'SHIFT_OPEN' THEN 1 END) AS shiftOpen, COUNT(CASE WHEN ACTION = 'SHIFT_REASSIGN' THEN 1 END) AS shiftReassign, COUNT(CASE WHEN ACTION = 'SHIFT_TIME_CHANGE' THEN 1 END) AS shiftTimeChange, COUNT(CASE WHEN ACTION = 'WORKER_CANCEL' THEN 1 END) AS workerCancel, COUNT(CASE WHEN ACTION = 'SHIFT_UNASSIGN' THEN 1 END) AS shiftUnassign, COUNT(CASE WHEN ACTION = 'SHIFT_CLAIM' THEN 1 END) AS shiftClaim, COUNT(CASE WHEN ACTION = 'SHIFT_DELETE' THEN 1 END) AS shiftDelete, COUNT(CASE WHEN ACTION = 'NO_CALL_NO_SHOW' THEN 1 END) AS noCallNoShow, COUNT(CASE WHEN ACTION = 'SHIFT_PAY_RATE_CHANGE' THEN 1 END) AS shiftPayRateChange, COUNT(CASE WHEN ACTION = 'SHIFT_ASSIGN' THEN 1 END) AS shiftAssign, FROM ranked_shifts WHERE rn = 1; """ def build_facility_search_query(filters: dict) -> str: query = "SELECT * FROM dim_workplaces WHERE 1=1" conditions = [] if filters.get('name'): conditions.append(f"NAME ILIKE '%{filters['name']}%'") if filters.get('email'): conditions.append(f"EMAIL ILIKE '{filters['email']}'") if filters.get('country'): conditions.append(f"COUNTRY ILIKE '%{filters['country']}%'") if filters.get('city'): conditions.append(f"CITY ILIKE '%{filters['city']}%'") if filters.get('state'): conditions.append(f"STATE ILIKE '%{filters['state']}%'") if filters.get('address'): conditions.append(f"ADDRESS ILIKE '%{filters['address']}%'") if filters.get('postalCode'): conditions.append(f"POSTAL_CODE ILIKE '{filters['postalCode']}'") if filters.get('type'): conditions.append(f"TYPE ILIKE '%{filters['type']}%'") if conditions: query += " AND " + " AND ".join(conditions) query += " LIMIT 1" return query def build_facility_shift_count_query(facility_id: str, filters: dict) -> str: return f""" WITH filtered_shifts AS ( SELECT SHIFT_ID, ACTION, ACTION_AT FROM fct_shift_logs WHERE WORKPLACE_ID = '{facility_id}' AND SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters["endDate"], "%Y-%m-%d")}') ), ranked_shifts AS ( SELECT SHIFT_ID, ACTION, ACTION_AT, ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn FROM filtered_shifts ) SELECT COUNT(CASE WHEN ACTION = 'SHIFT_VERIFY' THEN 1 END) AS shiftVerified, COUNT(CASE WHEN ACTION = 'FACILITY_CANCEL' THEN 1 END) AS facilityCancelled, COUNT(CASE WHEN ACTION = 'SHIFT_OPEN' THEN 1 END) AS shiftOpen, COUNT(CASE WHEN ACTION = 'SHIFT_REASSIGN' THEN 1 END) AS shiftReassign, COUNT(CASE WHEN ACTION = 'SHIFT_TIME_CHANGE' THEN 1 END) AS shiftTimeChange, COUNT(CASE WHEN ACTION = 'WORKER_CANCEL' THEN 1 END) AS workerCancel, COUNT(CASE WHEN ACTION = 'SHIFT_UNASSIGN' THEN 1 END) AS shiftUnassign, COUNT(CASE WHEN ACTION = 'SHIFT_CLAIM' THEN 1 END) AS shiftClaim, COUNT(CASE WHEN ACTION = 'SHIFT_DELETE' THEN 1 END) AS shiftDelete, COUNT(CASE WHEN ACTION = 'NO_CALL_NO_SHOW' THEN 1 END) AS noCallNoShow, COUNT(CASE WHEN ACTION = 'SHIFT_PAY_RATE_CHANGE' THEN 1 END) AS shiftPayRateChange, COUNT(CASE WHEN ACTION = 'SHIFT_ASSIGN' THEN 1 END) AS shiftAssign, FROM ranked_shifts WHERE rn = 1; """ def form_worker_module_response(worker: dict, shift_count: dict, request_body: dict, amount_earned: int) -> dict: keys_needed = ['ACCOUNT_STAGE', 'ATTENDANCE_SCORE', 'AVG_WORKER_RATING', 'CITY', 'CLIPBOARD_SCORE', 'COUNTRY', 'FULL_NAME', 'FORMATTED_ADDRESS', 'MSA', 'POSTAL_CODE', 'QUALIFICATION', 'STATE', 'INSTANT_PAY_RATE'] worker = {k: v for k, v in worker.items() if k in keys_needed} worker['AVG_WORKER_RATING'] = float(worker.get('AVG_WORKER_RATING') or 0) worker[f'shiftStatistics (from {request_body["startDate"]} to {request_body["endDate"]})'] = shift_count worker[f'amountEarned (from {request_body["startDate"]} to {request_body["endDate"]})'] = amount_earned return worker def form_facility_module_response(facility: dict, shift_count: dict, request_body: dict, paid_info: dict) -> dict: keys_needed = ["TYPE", 'NAME', 'EMAIL', 'COUNTRY', 'STATE', 'CITY', 'ADDRESS', 'POSTAL_CODE', 'HAS_PAY_ON_HOLIDAY', 'WEBSITE', 'PHONE', 'DESCRIPTION'] worker = {k: v for k, v in facility.items() if k in keys_needed} worker[f'shiftStatistics (from {request_body["startDate"]} to {request_body["endDate"]})'] = shift_count worker['paymentInfo (from {request_body["startDate"]} to {request_body["endDate"]})'] = paid_info return worker def build_shift_statistics_query(filters: dict) -> str: if filters['entity'] == EntityType.Facility.value: key_field = 'WORKPLACE_NAME' first_condition = "1=1" else: key_field = 'WORKER_FULL_NAME' if filters.get('qualification'): first_condition = f"WORKER_QUALIFICATION ILIKE '%{filters.get("qualification")}%'" else: first_condition = "1=1" query = f""" WITH filtered_shifts AS ( SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT FROM fct_shift_logs WHERE {first_condition} AND ACTION = '{filters.get("shiftType")}' AND SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters.get("startDate"), "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters.get("endDate"), "%Y-%m-%d")}') ), ranked_shifts AS ( SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn FROM filtered_shifts ), shift_dates AS ( SELECT DISTINCT DATE(SHIFT_START_AT) AS shift_date FROM ranked_shifts WHERE rn = 1 ) SELECT {key_field}, COUNT(DISTINCT SHIFT_ID) AS count, COUNT(*) / COUNT(DISTINCT shift_date) FROM ranked_shifts JOIN shift_dates ON DATE(SHIFT_START_AT) = shift_dates.shift_date WHERE rn = 1 GROUP BY {key_field} ORDER BY count DESC LIMIT {filters.get("length")}; """ return query def build_avg_rating_query(filters: dict) -> str: cond = '' if filters.get('qualification'): cond += f"AND QUALIFICATION ILIKE '{filters['qualification']}'\n" if filters.get('city'): cond += f"AND CITY ILIKE '{filters['city']}'\n" if filters.get('country'): cond += f"AND COUNTRY ILIKE '{filters['country']}'\n" if filters.get('state'): cond += f"AND STATE ILIKE '{filters['state']}'\n" query = f""" SELECT AVG(AVG_WORKER_RATING), AVG(CLIPBOARD_SCORE), AVG(ATTENDANCE_SCORE) FROM DIM_WORKERS WHERE 1=1 {cond} AND AVG_WORKER_RATING IS NOT NULL; """ return query def build_avg_pay_rate_query(filters: dict) -> str: second_cond = '' if filters.get('qualification'): second_cond = f"AND WORKER_QUALIFICATION ILIKE '{filters['qualification']}'" query = f""" SELECT AVG(NET_PAY) AS avg_net_pay, AVG(PAY) AS avg_pay FROM fct_shifts WHERE IS_VERIFIED = true {second_cond} AND SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters["endDate"], "%Y-%m-%d")}') """ return query def build_worker_count_query(filters: dict) -> str: additional_cond = '' if filters.get('city'): additional_cond += f"AND CITY ILIKE '%{filters['city']}%'\n" if filters.get('country'): additional_cond += f"AND COUNTRY ILIKE '%{filters['country']}%'\n" if filters.get('state'): additional_cond += f"AND STATE ILIKE '%{filters['state']}%'\n" created_add_cond = '' start_of_year = datetime(datetime.now().year, 1, 1).strftime('%Y-%m-%d') today = datetime.now().strftime('%Y-%m-%d') if filters['startDate'] != start_of_year and filters['endDate'] != today: created_add_cond = f"AND CREATED_AT <= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}')" query = f""" SELECT QUALIFICATION, COUNT(*) AS cnt FROM DIM_WORKERS WHERE 1=1 {additional_cond} {created_add_cond} GROUP BY ROLLUP(QUALIFICATION) """ return query def build_facility_count_query(filters: dict) -> str: additional_cond = '' if filters.get('city'): additional_cond += f"AND CITY ILIKE '%{filters['city']}%'\n" if filters.get('country'): additional_cond += f"AND COUNTRY ILIKE '%{filters['country']}%'\n" if filters.get('state'): additional_cond += f"AND STATE ILIKE '%{filters['state']}%'\n" created_add_cond = '' start_of_year = datetime(datetime.now().year, 1, 1).strftime('%Y-%m-%d') today = datetime.now().strftime('%Y-%m-%d') if filters['startDate'] != start_of_year and filters['endDate'] != today: created_add_cond = f"AND CREATED_AT <= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}')" query = f""" SELECT TYPE, COUNT(*) AS cnt FROM dim_workplaces WHERE 1=1 {additional_cond} {created_add_cond} GROUP BY TYPE """ return query def build_shift_count_query(filters: dict) -> str: return f""" WITH filtered_shifts AS ( SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT FROM fct_shift_logs WHERE SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters["endDate"], "%Y-%m-%d")}') ), ranked_shifts AS ( SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn FROM filtered_shifts ) SELECT COUNT(CASE WHEN ACTION = 'SHIFT_VERIFY' THEN 1 END) AS shiftVerified, COUNT(CASE WHEN ACTION = 'FACILITY_CANCEL' THEN 1 END) AS facilityCancelled, COUNT(CASE WHEN ACTION = 'SHIFT_OPEN' THEN 1 END) AS shiftOpen, COUNT(CASE WHEN ACTION = 'SHIFT_REASSIGN' THEN 1 END) AS shiftReassign, COUNT(CASE WHEN ACTION = 'SHIFT_TIME_CHANGE' THEN 1 END) AS shiftTimeChange, COUNT(CASE WHEN ACTION = 'WORKER_CANCEL' THEN 1 END) AS workerCancel, COUNT(CASE WHEN ACTION = 'SHIFT_UNASSIGN' THEN 1 END) AS shiftUnassign, COUNT(CASE WHEN ACTION = 'SHIFT_CLAIM' THEN 1 END) AS shiftClaim, COUNT(CASE WHEN ACTION = 'SHIFT_DELETE' THEN 1 END) AS shiftDelete, COUNT(CASE WHEN ACTION = 'NO_CALL_NO_SHOW' THEN 1 END) AS noCallNoShow, COUNT(CASE WHEN ACTION = 'SHIFT_PAY_RATE_CHANGE' THEN 1 END) AS shiftPayRateChange, COUNT(CASE WHEN ACTION = 'SHIFT_ASSIGN' THEN 1 END) AS shiftAssign, FROM ranked_shifts WHERE rn = 1; """ def build_facility_payment_info_query(facility_id: str, filters: dict) -> str: return f""" SELECT WORKER_QUALIFICATION, SUM(TOTAL_SHIFT_CHARGE) AS total_paid, COUNT(SHIFT_ID) AS number_of_shifts FROM fct_shifts WHERE WORKPLACE_ID = '{facility_id}' AND IS_VERIFIED = true AND SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters["endDate"], "%Y-%m-%d")}') GROUP BY WORKER_QUALIFICATION """ def build_worker_payment_query(worker_id: str, filters: dict) -> str: return f""" SELECT SUM(NET_PAY) FROM fct_shifts WHERE WORKER_ID = '{worker_id}' AND IS_VERIFIED = true AND SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters["endDate"], "%Y-%m-%d")}') AND NET_PAY IS NOT NULL; """ def build_avg_shift_count_query(filters: dict) -> str: cond = '1=1' if filters.get('qualification'): cond = f"WHERE WORKER_QUALIFICATION ILIKE '{filters['qualification']}'" return f""" WITH filtered_shifts AS ( SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, WORKER_QUALIFICATION FROM fct_shift_logs {cond} AND SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters["endDate"], "%Y-%m-%d")}') ), ranked_shifts AS ( SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, WORKER_QUALIFICATION, ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn FROM filtered_shifts ), shift_dates AS ( SELECT DISTINCT DATE(SHIFT_START_AT) AS shift_date FROM ranked_shifts WHERE rn = 1 ) SELECT COUNT(CASE WHEN ACTION = 'SHIFT_VERIFY' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'FACILITY_CANCEL' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'SHIFT_OPEN' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'SHIFT_REASSIGN' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'SHIFT_TIME_CHANGE' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'WORKER_CANCEL' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'SHIFT_UNASSIGN' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'SHIFT_CLAIM' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'SHIFT_DELETE' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'NO_CALL_NO_SHOW' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'SHIFT_PAY_RATE_CHANGE' THEN 1 END) / COUNT(DISTINCT shift_date), COUNT(CASE WHEN ACTION = 'SHIFT_ASSIGN' THEN 1 END) / COUNT(DISTINCT shift_date), FROM ranked_shifts JOIN shift_dates ON DATE(SHIFT_START_AT) = shift_dates.shift_date WHERE rn = 1; """ def build_location_workers_query(filters: dict) -> str: worker_qualification = filters['qualification'] key_field = "CITY" if filters.get('field1') == LocationType.city.value else "STATE" length = filters['length'] + 1 if filters.get('field1') == LocationType.city.value else filters['length'] qualification_cond = "" if worker_qualification: qualification_cond = f"AND QUALIFICATION ILIKE '%{worker_qualification}%'" return f"""WITH QualifiedCities AS ( SELECT {key_field}, COUNT(*) AS worker_count FROM DIM_WORKERS WHERE 1=1 {qualification_cond} GROUP BY {key_field} ORDER BY worker_count DESC LIMIT {length} ) SELECT t.{key_field} AS name, OBJECT_CONSTRUCT( 'totalWorkersCount', COUNT(*), 'CNA', SUM(CASE WHEN d.QUALIFICATION = 'CNA' THEN 1 ELSE 0 END), 'RN', SUM(CASE WHEN d.QUALIFICATION = 'RN' THEN 1 ELSE 0 END), 'LVN', SUM(CASE WHEN d.QUALIFICATION = 'LVN' THEN 1 ELSE 0 END), 'CAREGIVER', SUM(CASE WHEN d.QUALIFICATION = 'CAREGIVER' THEN 1 ELSE 0 END), 'Non Clinical', SUM(CASE WHEN d.QUALIFICATION = 'Non Clinical' THEN 1 ELSE 0 END), 'Medical Assistant', SUM(CASE WHEN d.QUALIFICATION = 'Medical Assistant' THEN 1 ELSE 0 END), 'HOUSEKEEPER', SUM(CASE WHEN d.QUALIFICATION = 'HOUSEKEEPER' THEN 1 ELSE 0 END), 'Dietary Aide', SUM(CASE WHEN d.QUALIFICATION = 'Dietary Aide' THEN 1 ELSE 0 END), 'HHA', SUM(CASE WHEN d.QUALIFICATION = 'HHA' THEN 1 ELSE 0 END) ) AS workerStatistics FROM DIM_WORKERS d JOIN QualifiedCities t ON d.{key_field} = t.{key_field} GROUP BY t.{key_field};""" def build_location_facilities_query(filters: dict) -> str: key_field = "CITY" if filters.get('field1') == LocationType.city.value else "STATE" type_cond = "" if filters.get('facilityType'): type_cond = f"AND TYPE ILIKE '%{filters['facilityType']}%'" return f"""WITH TopCities AS ( SELECT {key_field}, COUNT(*) AS facility_count FROM DIM_WORKPLACES WHERE 1=1 {type_cond} GROUP BY {key_field} ORDER BY facility_count DESC LIMIT {filters['length']} ) SELECT t.{key_field} AS name, OBJECT_CONSTRUCT( 'totalFacilitiesCount', COUNT(*), 'Long Term Care', SUM(CASE WHEN d.TYPE = 'Long Term Care' THEN 1 ELSE 0 END), 'Dental Clinic', SUM(CASE WHEN d.TYPE = 'Dental Clinic' THEN 1 ELSE 0 END), 'Home Healthcare', SUM(CASE WHEN d.TYPE = 'Home Healthcare' THEN 1 ELSE 0 END), 'Medical Lab', SUM(CASE WHEN d.TYPE = 'Medical Lab' THEN 1 ELSE 0 END), 'Hospice', SUM(CASE WHEN d.TYPE = 'Hospice' THEN 1 ELSE 0 END), 'Pharmacy', SUM(CASE WHEN d.TYPE = 'Pharmacy' THEN 1 ELSE 0 END), 'Hospital', SUM(CASE WHEN d.TYPE = 'Hospital' THEN 1 ELSE 0 END) ) AS facilityStatistics FROM DIM_WORKPLACES d JOIN TopCities t ON d.{key_field} = t.{key_field} GROUP BY t.{key_field}""" def build_location_shift_query(filters: dict) -> str: key_field = "CITY" if filters.get('field1') == LocationType.city.value else "STATE" return f"""WITH filtered_shifts AS ( SELECT WORKER_ID, WORKPLACE_ID, ACTION, SHIFT_START_AT FROM fct_shift_logs WHERE SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters["endDate"], "%Y-%m-%d")}') ), ranked_shifts AS ( SELECT WORKER_ID, WORKPLACE_ID, ACTION, SHIFT_START_AT, ROW_NUMBER() OVER (PARTITION BY WORKER_ID ORDER BY SHIFT_START_AT DESC) AS rn FROM filtered_shifts ), city_shifts AS ( SELECT fsl.WORKPLACE_ID, fw.{key_field}, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_VERIFY' THEN 1 END) AS SHIFT_VERIFY, COUNT(CASE WHEN fsl.ACTION = 'FACILITY_CANCEL' THEN 1 END) AS FACILITY_CANCEL, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_OPEN' THEN 1 END) AS SHIFT_OPEN, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_REASSIGN' THEN 1 END) AS SHIFT_REASSIGN, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_TIME_CHANGE' THEN 1 END) AS SHIFT_TIME_CHANGE, COUNT(CASE WHEN fsl.ACTION = 'WORKER_CANCEL' THEN 1 END) AS WORKER_CANCEL, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_UNASSIGN' THEN 1 END) AS SHIFT_UNASSIGN, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_CLAIM' THEN 1 END) AS SHIFT_CLAIM, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_DELETE' THEN 1 END) AS SHIFT_DELETE, COUNT(CASE WHEN fsl.ACTION = 'NO_CALL_NO_SHOW' THEN 1 END) AS NO_CALL_NO_SHOW, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_PAY_RATE_CHANGE' THEN 1 END) AS SHIFT_PAY_RATE_CHANGE, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_ASSIGN' THEN 1 END) AS SHIFT_ASSIGN, FROM ranked_shifts fsl JOIN dim_workplaces fw ON fsl.WORKPLACE_ID = fw.WORKPLACE_ID GROUP BY fsl.WORKPLACE_ID, fw.{key_field} ) SELECT {key_field}, SHIFT_VERIFY, FACILITY_CANCEL, SHIFT_OPEN, SHIFT_REASSIGN, SHIFT_TIME_CHANGE, WORKER_CANCEL, SHIFT_UNASSIGN, SHIFT_CLAIM, SHIFT_DELETE, NO_CALL_NO_SHOW, SHIFT_PAY_RATE_CHANGE, SHIFT_ASSIGN FROM city_shifts ORDER BY {filters.get('shiftType') or "SHIFT_VERIFY"} DESC FETCH FIRST {filters['length']} ROWS ONLY;""" def build_specific_location_worker_query(filters: dict) -> str: location = filters.get('name') key_field = "CITY" if filters.get('locationType') == LocationType.city.value else "STATE" return f"""SELECT {key_field} AS name, OBJECT_CONSTRUCT( 'totalWorkersCount', COUNT(*), 'CNA', SUM(CASE WHEN QUALIFICATION = 'CNA' THEN 1 ELSE 0 END), 'RN', SUM(CASE WHEN QUALIFICATION = 'RN' THEN 1 ELSE 0 END), 'LVN', SUM(CASE WHEN QUALIFICATION = 'LVN' THEN 1 ELSE 0 END), 'CAREGIVER', SUM(CASE WHEN QUALIFICATION = 'CAREGIVER' THEN 1 ELSE 0 END), 'Non Clinical', SUM(CASE WHEN QUALIFICATION = 'Non Clinical' THEN 1 ELSE 0 END), 'Medical Assistant', SUM(CASE WHEN QUALIFICATION = 'Medical Assistant' THEN 1 ELSE 0 END), 'HOUSEKEEPER', SUM(CASE WHEN QUALIFICATION = 'HOUSEKEEPER' THEN 1 ELSE 0 END), 'Dietary Aide', SUM(CASE WHEN QUALIFICATION = 'Dietary Aide' THEN 1 ELSE 0 END), 'HHA', SUM(CASE WHEN QUALIFICATION = 'HHA' THEN 1 ELSE 0 END) ) as workerStatistics FROM DIM_WORKERS WHERE {key_field} ILIKE '{location}' GROUP BY {key_field};""" def build_specific_location_facility_query(filters: dict) -> str: location = filters.get('name') key_field = "CITY" if filters.get('locationType') == LocationType.city.value else "STATE" return f"""SELECT {key_field} AS name, OBJECT_CONSTRUCT( 'totalFacilitiesCount', COUNT(*), 'Long Term Care', SUM(CASE WHEN d.TYPE = 'Long Term Care' THEN 1 ELSE 0 END), 'Dental Clinic', SUM(CASE WHEN d.TYPE = 'Dental Clinic' THEN 1 ELSE 0 END), 'Home Healthcare', SUM(CASE WHEN d.TYPE = 'Home Healthcare' THEN 1 ELSE 0 END), 'Medical Lab', SUM(CASE WHEN d.TYPE = 'Medical Lab' THEN 1 ELSE 0 END), 'Hospice', SUM(CASE WHEN d.TYPE = 'Hospice' THEN 1 ELSE 0 END), 'Pharmacy', SUM(CASE WHEN d.TYPE = 'Pharmacy' THEN 1 ELSE 0 END), 'Hospital', SUM(CASE WHEN d.TYPE = 'Hospital' THEN 1 ELSE 0 END) ) AS facilityStatistics FROM DIM_WORKPLACES d WHERE {key_field} ILIKE '{location}' GROUP BY {key_field}""" def build_specific_location_shift_query(filters: dict) -> str: key_field = "CITY" if filters.get('locationType') == LocationType.city.value else "STATE" location_value = filters['name'] return f"""WITH filtered_shifts AS ( SELECT WORKER_ID, WORKPLACE_ID, ACTION, SHIFT_START_AT FROM fct_shift_logs WHERE SHIFT_START_AT >= TO_TIMESTAMP('{datetime.strptime(filters["startDate"], "%Y-%m-%d")}') AND SHIFT_START_AT <= TO_TIMESTAMP('{datetime.strptime(filters["endDate"], "%Y-%m-%d")}') ), city_shifts AS ( SELECT fsl.WORKPLACE_ID, fw.{key_field}, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_VERIFY' THEN 1 END) AS SHIFT_VERIFY, COUNT(CASE WHEN fsl.ACTION = 'FACILITY_CANCEL' THEN 1 END) AS FACILITY_CANCEL, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_OPEN' THEN 1 END) AS SHIFT_OPEN, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_REASSIGN' THEN 1 END) AS SHIFT_REASSIGN, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_TIME_CHANGE' THEN 1 END) AS SHIFT_TIME_CHANGE, COUNT(CASE WHEN fsl.ACTION = 'WORKER_CANCEL' THEN 1 END) AS WORKER_CANCEL, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_UNASSIGN' THEN 1 END) AS SHIFT_UNASSIGN, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_CLAIM' THEN 1 END) AS SHIFT_CLAIM, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_DELETE' THEN 1 END) AS SHIFT_DELETE, COUNT(CASE WHEN fsl.ACTION = 'NO_CALL_NO_SHOW' THEN 1 END) AS NO_CALL_NO_SHOW, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_PAY_RATE_CHANGE' THEN 1 END) AS SHIFT_PAY_RATE_CHANGE, COUNT(CASE WHEN fsl.ACTION = 'SHIFT_ASSIGN' THEN 1 END) AS SHIFT_ASSIGN FROM filtered_shifts fsl JOIN dim_workplaces fw ON fsl.WORKPLACE_ID = fw.WORKPLACE_ID WHERE fw.{key_field} ILIKE '{location_value}' GROUP BY fsl.WORKPLACE_ID, fw.{key_field} ) SELECT {key_field}, SHIFT_VERIFY, FACILITY_CANCEL, SHIFT_OPEN, SHIFT_REASSIGN, SHIFT_TIME_CHANGE, WORKER_CANCEL, SHIFT_UNASSIGN, SHIFT_CLAIM, SHIFT_DELETE, NO_CALL_NO_SHOW, SHIFT_PAY_RATE_CHANGE, SHIFT_ASSIGN FROM city_shifts FETCH FIRST 1 ROWS ONLY;""" def form_shift_count_response(row: list[int]) -> dict[str, int]: return { "shiftVerified": row[0], "facilityCancellations": row[1], "shiftOpened": row[2], "shiftReassigned": row[3], "shiftTimeChanged": row[4], "workerCancellations": row[5], "shiftUnassigned": row[6], "shiftClaimed": row[7], "shiftDeleted": row[8], "noCallNoShow": row[9], "shiftPayRateChanged": row[10], "shiftAssigned": row[11], } def form_worker_count_response(rows, filters: dict): qualifications, results = dict(), dict() total_count = 0 for qualification, count in rows: if qualification: qualifications[qualification] = int(count) if qualification is None and count > total_count: total_count = int(count) results['totalWorkersCount'] = total_count results['qualifications'] = qualifications results['selectedDate'] = f"{filters['endDate']}" return results def form_facility_count_response(rows, filters: dict): results = {"types": {}} total_count = 0 for type_, count in rows: if type_ and type_ != '-': results["types"][type_] = int(count) total_count += int(count) results['totalFacilitiesCount'] = total_count results['selectedDate'] = f"{filters['endDate']}" return results def form_shift_statistics_response(rows, request_body: dict): entity = request_body.get('entity') results = [] for row in rows: key = "workerFullName" if entity == EntityType.Worker.value else "facilityName" worker_data = { key: row[0], "totalShiftCount": row[1], "avgShiftCountPerDay": round(float(row[2]), 2), } results.append(worker_data) return {"results": results, "startDate": request_body['startDate'], "endDate": request_body['endDate']} def check_missed_fields(request: dict, request_type: AgentRequestType): required_fields_map = { AgentRequestType.WorkerSearch: ['name', 'city', 'state', 'country', 'address', 'qualification', 'postalCode'], AgentRequestType.FacilitySearch: ['name', 'email', 'country', 'state', 'city', 'address', 'postalCode', 'type'], AgentRequestType.ShiftStatistics: ["entity", "type", 'length', 'shiftType'], AgentRequestType.AverageStatistics: ["qualification"], AgentRequestType.LocationStatistics: ["field1", "field2"], AgentRequestType.CountStatistics: ["field"], AgentRequestType.LocationSearch: ["name", "locationType"] } fields = required_fields_map[request_type] if request_type in (AgentRequestType.WorkerSearch, AgentRequestType.FacilitySearch): if not any( [request.get('name'), request.get('email'), request.get('address')] ) and len(list(filter(lambda field: request.get(field), fields))) < 2: message = form_search_missed_message(request, fields) raise ValueError(message) elif request_type == AgentRequestType.ShiftStatistics: general_conditions = not all([request.get('shiftType'), request.get('entity'), request.get('type')]) if general_conditions: message = form_shift_statistics_missed_message(request) raise ValueError(message) elif request_type == AgentRequestType.AverageStatistics: if not request.get('qualification'): raise ValueError("qualification") elif request_type == AgentRequestType.CountStatistics: if not request.get('field'): raise ValueError("field (worker, facility or shift type)") elif request_type == AgentRequestType.LocationStatistics: if not all([request.get('field1'), request.get('field2')]): raise ValueError( "Filter field (city or state) and result field (workers or facilities) needs to be specified") elif request_type == AgentRequestType.LocationSearch: if not all([request.get('name'), request.get('locationType')]): raise ValueError( "Location name and location type (city or state) needs to be specified" ) def form_search_missed_message(request: dict, fields: list) -> str: return ', '.join([field for field in fields if not request.get(field)]) def form_shift_statistics_missed_message(request: dict) -> str: result = '' if not request.get('shiftType'): result += "shiftType (Verified shifts, cancelled shifts, etc.); " if not request.get('entity'): result += "entity (worker or facility); " if not request.get('type'): result += "type (most or least); " if not request.get('qualification') and request.get('entity') == EntityType.Worker.value: result += "worker qualification; " if not request.get('length'): result += "length of list;" return result def prepare_message_history_str(message_history: list[MessageModel], search_request: str) -> str: results = '' shorted_message_history = message_history[-8:] for message in shorted_message_history: results += f'[{message.author.name}] {message.text}\n' results += f"[User] {search_request}" return results def form_location_statistics_response(rows, filters: dict): result = [] entity = filters['field2'] for row in rows: city_name = row[0] if entity != EntityType.Shift.value: entity_statistics = json.loads(row[1]) total_field = "totalFacilitiesCount" if entity == EntityType.Facility.value else "totalWorkersCount" total_value = entity_statistics.pop(total_field) statistics_field = "workersByQualificationCount" if entity == EntityType.Worker.value else "facilitiesByTypeCount" result.append({ "name": city_name, total_field: total_value, statistics_field: entity_statistics }) else: result.append({ "name": city_name, "shiftsByTypeCount": { "shiftVerified": row[1], "facilityCancellations": row[2], "shiftOpened": row[3], "shiftReassigned": row[4], "shiftTimeChanged": row[5], "workerCancellations": row[6], "shiftUnassigned": row[7], "shiftClaimed": row[8], "shiftDeleted": row[9], "noCallNoShow": row[10], "shiftPayRateChanged": row[11], "shiftAssigned": row[12], }, "selectedDate": f"from {filters['startDate']} to {filters['endDate']}" }) return {"results": result[:filters['length']]} def form_prev_request(messages: list[MessageModel]) -> tuple: prev_query = '' prev_module_response = {} if len(messages) >= 2: prev_query = messages[-2].text prev_module_response = messages[-1].moduleResponse return prev_query, prev_module_response def form_avg_statistics_response(rows: list) -> dict: return { "shiftVerified": round(float(rows[0]), 2), "facilityCancellations": round(float(rows[1]), 2), "shiftOpened": round(float(rows[2]), 2), "shiftReassigned": round(float(rows[3]), 2), "shiftTimeChanged": round(float(rows[4]), 2), "workerCancellations": round(float(rows[5]), 2), "shiftUnassigned": round(float(rows[6]), 2), "shiftClaimed": round(float(rows[7]), 2), "shiftDeleted": round(float(rows[8]), 2), "noCallNoShow": round(float(rows[9]), 2), "shiftPayRateChanged": round(float(rows[10]), 2), "shiftAssigned": round(float(rows[11]), 2), }