| import json | |
| from datetime import datetime | |
| from cbh.api.message.dto import AgentRequestType, EntityType, LocationType | |
| from cbh.api.message.model import MessageModel | |
def build_worker_search_query(filters: dict) -> str:
    """Build the SQL that looks up a single worker in ``DIM_WORKERS``.

    Each truthy filter contributes one ``ILIKE`` condition; all conditions are
    ANDed together and the statement is capped with ``LIMIT 1``.

    Args:
        filters: Search criteria. ``name``, ``city``, ``state``, ``country``
            and ``address`` are matched as substrings; ``qualification`` and
            ``postalCode`` are matched against the whole value.

    Returns:
        The SQL statement as a string.
    """

    def _sql_text(value) -> str:
        # Filter values are embedded in single-quoted literals, so characters
        # that could close the literal must be neutralised. Backslashes are
        # doubled first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals where both \ and '
        # are special - confirm against the target warehouse.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    # (filter key, column, match as substring?)
    column_specs = (
        ("name", "FULL_NAME", True),
        ("city", "CITY", True),
        ("state", "STATE", True),
        ("country", "COUNTRY", True),
        ("address", "FORMATTED_ADDRESS", True),
        ("qualification", "QUALIFICATION", False),
        ("postalCode", "POSTAL_CODE", False),
    )

    conditions = []
    for key, column, is_substring in column_specs:
        value = filters.get(key)
        if not value:
            continue
        pattern = _sql_text(value)
        if is_substring:
            pattern = f"%{pattern}%"
        conditions.append(f"{column} ILIKE '{pattern}'")

    query = "SELECT * FROM DIM_WORKERS WHERE 1=1"
    if conditions:
        query += " AND " + " AND ".join(conditions)
    return query + " LIMIT 1"
def build_worker_shift_count_query(worker_id: str, filters: dict) -> str:
    """Build the SQL that tallies one worker's shifts by latest logged action.

    Rows of ``fct_shift_logs`` for the worker whose ``SHIFT_START_AT`` falls in
    the requested range are ranked per ``SHIFT_ID`` by ``ACTION_AT`` (newest
    first). Only the newest row of each shift is counted, one column per
    action type.

    Args:
        worker_id: Value compared against ``WORKER_ID``.
        filters: Must contain ``startDate`` and ``endDate`` in ``YYYY-MM-DD``
            format.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    window_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    window_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    # (ACTION value, output column alias), in result-column order.
    counted_actions = (
        ("SHIFT_VERIFY", "shiftVerified"),
        ("FACILITY_CANCEL", "facilityCancelled"),
        ("SHIFT_OPEN", "shiftOpen"),
        ("SHIFT_REASSIGN", "shiftReassign"),
        ("SHIFT_TIME_CHANGE", "shiftTimeChange"),
        ("WORKER_CANCEL", "workerCancel"),
        ("SHIFT_UNASSIGN", "shiftUnassign"),
        ("SHIFT_CLAIM", "shiftClaim"),
        ("SHIFT_DELETE", "shiftDelete"),
        ("NO_CALL_NO_SHOW", "noCallNoShow"),
        ("SHIFT_PAY_RATE_CHANGE", "shiftPayRateChange"),
        ("SHIFT_ASSIGN", "shiftAssign"),
    )
    count_columns = "\n".join(
        f"        COUNT(CASE WHEN ACTION = '{action}' THEN 1 END) AS {alias},"
        for action, alias in counted_actions
    )

    return f"""
    WITH filtered_shifts AS (
        SELECT *
        FROM fct_shift_logs
        WHERE WORKER_ID = '{worker_id}'
        AND SHIFT_START_AT >= TO_TIMESTAMP('{window_start}')
        AND SHIFT_START_AT <= TO_TIMESTAMP('{window_end}')
    ),
    ranked_shifts AS (
        SELECT *,
        ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
        FROM filtered_shifts
    )
    SELECT
{count_columns}
    FROM ranked_shifts
    WHERE rn = 1;
    """
def build_facility_search_query(filters: dict) -> str:
    """Build the SQL that looks up a single facility in ``dim_workplaces``.

    Each truthy filter contributes one ``ILIKE`` condition; all conditions are
    ANDed together and the statement is capped with ``LIMIT 1``.

    Args:
        filters: Search criteria. ``name``, ``country``, ``city``, ``state``,
            ``address`` and ``type`` are matched as substrings; ``email`` and
            ``postalCode`` are matched against the whole value.

    Returns:
        The SQL statement as a string.
    """

    def _sql_text(value) -> str:
        # Filter values are embedded in single-quoted literals, so characters
        # that could close the literal must be neutralised. Backslashes are
        # doubled first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals where both \ and '
        # are special - confirm against the target warehouse.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    # (filter key, column, match as substring?)
    column_specs = (
        ("name", "NAME", True),
        ("email", "EMAIL", False),
        ("country", "COUNTRY", True),
        ("city", "CITY", True),
        ("state", "STATE", True),
        ("address", "ADDRESS", True),
        ("postalCode", "POSTAL_CODE", False),
        ("type", "TYPE", True),
    )

    conditions = []
    for key, column, is_substring in column_specs:
        value = filters.get(key)
        if not value:
            continue
        pattern = _sql_text(value)
        if is_substring:
            pattern = f"%{pattern}%"
        conditions.append(f"{column} ILIKE '{pattern}'")

    query = "SELECT * FROM dim_workplaces WHERE 1=1"
    if conditions:
        query += " AND " + " AND ".join(conditions)
    return query + " LIMIT 1"
def build_facility_shift_count_query(facility_id: str, filters: dict) -> str:
    """Build the SQL that tallies one facility's shifts by latest logged action.

    Rows of ``fct_shift_logs`` for the workplace whose ``SHIFT_START_AT`` falls
    in the requested range are ranked per ``SHIFT_ID`` by ``ACTION_AT`` (newest
    first). Only the newest row of each shift is counted, one column per
    action type.

    Args:
        facility_id: Value compared against ``WORKPLACE_ID``.
        filters: Must contain ``startDate`` and ``endDate`` in ``YYYY-MM-DD``
            format.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    window_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    window_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    # (ACTION value, output column alias), in result-column order.
    counted_actions = (
        ("SHIFT_VERIFY", "shiftVerified"),
        ("FACILITY_CANCEL", "facilityCancelled"),
        ("SHIFT_OPEN", "shiftOpen"),
        ("SHIFT_REASSIGN", "shiftReassign"),
        ("SHIFT_TIME_CHANGE", "shiftTimeChange"),
        ("WORKER_CANCEL", "workerCancel"),
        ("SHIFT_UNASSIGN", "shiftUnassign"),
        ("SHIFT_CLAIM", "shiftClaim"),
        ("SHIFT_DELETE", "shiftDelete"),
        ("NO_CALL_NO_SHOW", "noCallNoShow"),
        ("SHIFT_PAY_RATE_CHANGE", "shiftPayRateChange"),
        ("SHIFT_ASSIGN", "shiftAssign"),
    )
    count_columns = "\n".join(
        f"        COUNT(CASE WHEN ACTION = '{action}' THEN 1 END) AS {alias},"
        for action, alias in counted_actions
    )

    return f"""
    WITH filtered_shifts AS (
        SELECT SHIFT_ID, ACTION, ACTION_AT
        FROM fct_shift_logs
        WHERE WORKPLACE_ID = '{facility_id}'
        AND SHIFT_START_AT >= TO_TIMESTAMP('{window_start}')
        AND SHIFT_START_AT <= TO_TIMESTAMP('{window_end}')
    ),
    ranked_shifts AS (
        SELECT SHIFT_ID, ACTION, ACTION_AT,
        ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
        FROM filtered_shifts
    )
    SELECT
{count_columns}
    FROM ranked_shifts
    WHERE rn = 1;
    """
def form_worker_module_response(worker: dict, shift_count: dict, request_body: dict, amount_earned: int) -> dict:
    """Assemble the worker payload returned by the worker module.

    Args:
        worker: Worker row as a mapping; only the allow-listed columns are
            copied to the response.
        shift_count: Shift statistics stored under a period-labelled key.
        request_body: Must contain ``startDate`` and ``endDate``; both are
            embedded in the period-labelled keys.
        amount_earned: Earnings stored under a period-labelled key.

    Returns:
        A new dict; the ``worker`` argument itself is not modified.
    """
    exposed_columns = (
        'ACCOUNT_STAGE', 'ATTENDANCE_SCORE', 'AVG_WORKER_RATING', 'CITY',
        'CLIPBOARD_SCORE', 'COUNTRY', 'FULL_NAME', 'FORMATTED_ADDRESS', 'MSA', 'POSTAL_CODE',
        'QUALIFICATION', 'STATE', 'INSTANT_PAY_RATE',
    )
    response = {column: value for column, value in worker.items() if column in exposed_columns}

    # A missing or falsy rating is reported as 0.0.
    rating = response.get('AVG_WORKER_RATING') or 0
    response['AVG_WORKER_RATING'] = float(rating)

    period = f'(from {request_body["startDate"]} to {request_body["endDate"]})'
    response[f'shiftStatistics {period}'] = shift_count
    response[f'amountEarned {period}'] = amount_earned
    return response
def form_facility_module_response(facility: dict, shift_count: dict, request_body: dict, paid_info: dict) -> dict:
    """Assemble the facility payload returned by the facility module.

    Args:
        facility: Facility row as a mapping; only the allow-listed columns are
            copied to the response.
        shift_count: Shift statistics stored under a period-labelled key.
        request_body: Must contain ``startDate`` and ``endDate``; both are
            embedded in the period-labelled keys.
        paid_info: Payment information stored under a period-labelled key.

    Returns:
        A new dict; the ``facility`` argument itself is not modified.
    """
    keys_needed = ["TYPE", 'NAME', 'EMAIL', 'COUNTRY', 'STATE', 'CITY', 'ADDRESS',
                   'POSTAL_CODE', 'HAS_PAY_ON_HOLIDAY', 'WEBSITE', 'PHONE', 'DESCRIPTION']
    response = {k: v for k, v in facility.items() if k in keys_needed}

    period = f'(from {request_body["startDate"]} to {request_body["endDate"]})'
    response[f'shiftStatistics {period}'] = shift_count
    # The original literal lacked the f-prefix, so the placeholders were
    # emitted verbatim instead of the actual dates.
    response[f'paymentInfo {period}'] = paid_info
    return response
def build_shift_statistics_query(filters: dict) -> str:
    """Build the SQL that ranks workers or facilities by shift count.

    Log rows with ``ACTION`` equal to ``shiftType`` and ``SHIFT_START_AT`` in
    the requested range are ranked per ``SHIFT_ID`` (newest ``ACTION_AT``
    first). The newest row of each shift is grouped by worker or workplace
    name, yielding the distinct shift count and the count divided by the
    number of distinct shift dates.

    Args:
        filters: Reads ``entity`` (required), ``shiftType``, ``startDate``,
            ``endDate``, ``length`` and, for worker requests, the optional
            ``qualification`` substring filter.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If ``entity`` is missing.
        TypeError, ValueError: If a date is missing or not ``YYYY-MM-DD``.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    first_condition = "1=1"
    if filters['entity'] == EntityType.Facility.value:
        key_field = 'WORKPLACE_NAME'
    else:
        key_field = 'WORKER_FULL_NAME'
        qualification = filters.get('qualification')
        if qualification:
            first_condition = f"WORKER_QUALIFICATION ILIKE '%{_sql_text(qualification)}%'"

    shift_type = _sql_text(filters.get("shiftType"))
    range_start = datetime.strptime(filters.get("startDate"), "%Y-%m-%d")
    range_end = datetime.strptime(filters.get("endDate"), "%Y-%m-%d")
    # NOTE(review): ``length`` is interpolated as-is and the request's ``type``
    # (most/least) is not consulted here - ordering is always DESC. Confirm
    # both are intended.
    limit = filters.get("length")

    # key_field is projected through both CTEs: the final SELECT / GROUP BY
    # reference it, and it was previously absent from their column lists.
    # NOTE(review): assumes fct_shift_logs exposes this column - confirm.
    query = f"""
    WITH filtered_shifts AS (
        SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, {key_field}
        FROM fct_shift_logs
        WHERE {first_condition}
        AND ACTION = '{shift_type}'
        AND SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
        AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
    ),
    ranked_shifts AS (
        SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, {key_field},
        ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
        FROM filtered_shifts
    ),
    shift_dates AS (
        SELECT DISTINCT DATE(SHIFT_START_AT) AS shift_date
        FROM ranked_shifts
        WHERE rn = 1
    )
    SELECT
        {key_field},
        COUNT(DISTINCT SHIFT_ID) AS count,
        COUNT(*) / COUNT(DISTINCT shift_date)
    FROM ranked_shifts
    JOIN shift_dates
    ON DATE(SHIFT_START_AT) = shift_dates.shift_date
    WHERE rn = 1
    GROUP BY {key_field}
    ORDER BY count DESC
    LIMIT {limit};
    """
    return query
def build_avg_rating_query(filters: dict) -> str:
    """Build the SQL that averages worker rating, clipboard and attendance scores.

    Rows of ``DIM_WORKERS`` with a non-null ``AVG_WORKER_RATING`` are averaged,
    optionally narrowed by whole-value ``ILIKE`` matches.

    Args:
        filters: Optional ``qualification``, ``city``, ``country`` and
            ``state`` values; falsy entries are ignored.

    Returns:
        The SQL statement as a string.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    exact_match_columns = (
        ("qualification", "QUALIFICATION"),
        ("city", "CITY"),
        ("country", "COUNTRY"),
        ("state", "STATE"),
    )
    cond = "".join(
        f"AND {column} ILIKE '{_sql_text(filters[key])}'\n"
        for key, column in exact_match_columns
        if filters.get(key)
    )
    return f"""
    SELECT AVG(AVG_WORKER_RATING), AVG(CLIPBOARD_SCORE), AVG(ATTENDANCE_SCORE)
    FROM DIM_WORKERS
    WHERE 1=1
    {cond}
    AND AVG_WORKER_RATING IS NOT NULL;
    """
def build_avg_pay_rate_query(filters: dict) -> str:
    """Build the SQL that averages ``NET_PAY`` and ``PAY`` over verified shifts.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` (``YYYY-MM-DD``);
            an optional ``qualification`` adds a whole-value ``ILIKE`` match
            on ``WORKER_QUALIFICATION``.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    qualification = filters.get('qualification')
    second_cond = ''
    if qualification:
        second_cond = f"AND WORKER_QUALIFICATION ILIKE '{_sql_text(qualification)}'"

    range_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    range_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")
    return f"""
    SELECT AVG(NET_PAY) AS avg_net_pay, AVG(PAY) AS avg_pay
    FROM fct_shifts
    WHERE IS_VERIFIED = true
    {second_cond}
    AND SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
    AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
    """
def build_worker_count_query(filters: dict) -> str:
    """Build the SQL that counts workers per qualification, with a rollup total.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` (``YYYY-MM-DD``).
            Optional ``city``, ``country`` and ``state`` add substring
            ``ILIKE`` matches.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If ``startDate`` must be parsed and is not ``YYYY-MM-DD``.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    location_columns = (("city", "CITY"), ("country", "COUNTRY"), ("state", "STATE"))
    additional_cond = "".join(
        f"AND {column} ILIKE '%{_sql_text(filters[key])}%'\n"
        for key, column in location_columns
        if filters.get(key)
    )

    # Read the clock once so both reference dates describe the same instant.
    now = datetime.now()
    start_of_year = datetime(now.year, 1, 1).strftime('%Y-%m-%d')
    today = now.strftime('%Y-%m-%d')

    created_add_cond = ''
    # The CREATED_AT bound is added only when BOTH dates differ from the
    # year-to-date range.
    # NOTE(review): `and` (rather than `or`) and bounding by startDate are
    # kept from the original - confirm they are intended.
    if filters['startDate'] != start_of_year and filters['endDate'] != today:
        created_before = datetime.strptime(filters['startDate'], '%Y-%m-%d')
        created_add_cond = f"AND CREATED_AT <= TO_TIMESTAMP('{created_before}')"

    return f"""
    SELECT QUALIFICATION, COUNT(*) AS cnt
    FROM DIM_WORKERS
    WHERE 1=1
    {additional_cond}
    {created_add_cond}
    GROUP BY ROLLUP(QUALIFICATION)
    """
def build_facility_count_query(filters: dict) -> str:
    """Build the SQL that counts facilities per ``TYPE``.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` (``YYYY-MM-DD``).
            Optional ``city``, ``country`` and ``state`` add substring
            ``ILIKE`` matches.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If ``startDate`` must be parsed and is not ``YYYY-MM-DD``.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    location_columns = (("city", "CITY"), ("country", "COUNTRY"), ("state", "STATE"))
    additional_cond = "".join(
        f"AND {column} ILIKE '%{_sql_text(filters[key])}%'\n"
        for key, column in location_columns
        if filters.get(key)
    )

    # Read the clock once so both reference dates describe the same instant.
    now = datetime.now()
    start_of_year = datetime(now.year, 1, 1).strftime('%Y-%m-%d')
    today = now.strftime('%Y-%m-%d')

    created_add_cond = ''
    # The CREATED_AT bound is added only when BOTH dates differ from the
    # year-to-date range.
    # NOTE(review): `and` (rather than `or`) and bounding by startDate are
    # kept from the original - confirm they are intended.
    if filters['startDate'] != start_of_year and filters['endDate'] != today:
        created_before = datetime.strptime(filters['startDate'], '%Y-%m-%d')
        created_add_cond = f"AND CREATED_AT <= TO_TIMESTAMP('{created_before}')"

    return f"""
    SELECT TYPE, COUNT(*) AS cnt
    FROM dim_workplaces
    WHERE 1=1
    {additional_cond}
    {created_add_cond}
    GROUP BY TYPE
    """
def build_shift_count_query(filters: dict) -> str:
    """Build the SQL that tallies all shifts in a date range by latest action.

    Log rows whose ``SHIFT_START_AT`` falls in the requested range are ranked
    per ``SHIFT_ID`` by ``ACTION_AT`` (newest first); only the newest row of
    each shift is counted, one column per action type.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` in ``YYYY-MM-DD``
            format.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    window_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    window_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    # (ACTION value, output column alias), in result-column order.
    counted_actions = (
        ("SHIFT_VERIFY", "shiftVerified"),
        ("FACILITY_CANCEL", "facilityCancelled"),
        ("SHIFT_OPEN", "shiftOpen"),
        ("SHIFT_REASSIGN", "shiftReassign"),
        ("SHIFT_TIME_CHANGE", "shiftTimeChange"),
        ("WORKER_CANCEL", "workerCancel"),
        ("SHIFT_UNASSIGN", "shiftUnassign"),
        ("SHIFT_CLAIM", "shiftClaim"),
        ("SHIFT_DELETE", "shiftDelete"),
        ("NO_CALL_NO_SHOW", "noCallNoShow"),
        ("SHIFT_PAY_RATE_CHANGE", "shiftPayRateChange"),
        ("SHIFT_ASSIGN", "shiftAssign"),
    )
    count_columns = "\n".join(
        f"        COUNT(CASE WHEN ACTION = '{action}' THEN 1 END) AS {alias},"
        for action, alias in counted_actions
    )

    return f"""
    WITH filtered_shifts AS (
        SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT
        FROM fct_shift_logs
        WHERE SHIFT_START_AT >= TO_TIMESTAMP('{window_start}')
        AND SHIFT_START_AT <= TO_TIMESTAMP('{window_end}')
    ),
    ranked_shifts AS (
        SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT,
        ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
        FROM filtered_shifts
    )
    SELECT
{count_columns}
    FROM ranked_shifts
    WHERE rn = 1;
    """
def build_facility_payment_info_query(facility_id: str, filters: dict) -> str:
    """Build the SQL that sums a facility's verified-shift charges per qualification.

    Args:
        facility_id: Value compared against ``WORKPLACE_ID``.
        filters: Must contain ``startDate`` and ``endDate`` in ``YYYY-MM-DD``
            format.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    period_from = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    period_to = datetime.strptime(filters["endDate"], "%Y-%m-%d")
    sql = f"""
    SELECT WORKER_QUALIFICATION,
        SUM(TOTAL_SHIFT_CHARGE) AS total_paid,
        COUNT(SHIFT_ID) AS number_of_shifts
    FROM fct_shifts
    WHERE WORKPLACE_ID = '{facility_id}'
    AND IS_VERIFIED = true
    AND SHIFT_START_AT >= TO_TIMESTAMP('{period_from}')
    AND SHIFT_START_AT <= TO_TIMESTAMP('{period_to}')
    GROUP BY WORKER_QUALIFICATION
    """
    return sql
def build_worker_payment_query(worker_id: str, filters: dict) -> str:
    """Build the SQL that sums a worker's ``NET_PAY`` over verified shifts.

    Args:
        worker_id: Value compared against ``WORKER_ID``.
        filters: Must contain ``startDate`` and ``endDate`` in ``YYYY-MM-DD``
            format.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    period_from = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    period_to = datetime.strptime(filters["endDate"], "%Y-%m-%d")
    sql = f"""
    SELECT SUM(NET_PAY)
    FROM fct_shifts
    WHERE WORKER_ID = '{worker_id}'
    AND IS_VERIFIED = true
    AND SHIFT_START_AT >= TO_TIMESTAMP('{period_from}')
    AND SHIFT_START_AT <= TO_TIMESTAMP('{period_to}')
    AND NET_PAY IS NOT NULL;
    """
    return sql
def build_avg_shift_count_query(filters: dict) -> str:
    """Build the SQL that computes per-day average shift counts by action.

    Log rows in the requested range are ranked per ``SHIFT_ID`` by
    ``ACTION_AT`` (newest first). For the newest row of each shift, every
    action count is divided by the number of distinct shift dates.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` (``YYYY-MM-DD``);
            an optional ``qualification`` adds a whole-value ``ILIKE`` match
            on ``WORKER_QUALIFICATION``.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    # The default must carry the WHERE keyword itself: the date bounds below
    # are appended with AND, and a bare "1=1" after the table name is not
    # valid SQL.
    cond = 'WHERE 1=1'
    qualification = filters.get('qualification')
    if qualification:
        cond = f"WHERE WORKER_QUALIFICATION ILIKE '{_sql_text(qualification)}'"

    range_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    range_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    # Result-column order; consumers read these positionally.
    averaged_actions = (
        "SHIFT_VERIFY", "FACILITY_CANCEL", "SHIFT_OPEN", "SHIFT_REASSIGN",
        "SHIFT_TIME_CHANGE", "WORKER_CANCEL", "SHIFT_UNASSIGN", "SHIFT_CLAIM",
        "SHIFT_DELETE", "NO_CALL_NO_SHOW", "SHIFT_PAY_RATE_CHANGE", "SHIFT_ASSIGN",
    )
    average_columns = "\n".join(
        f"        COUNT(CASE WHEN ACTION = '{action}' THEN 1 END) / COUNT(DISTINCT shift_date),"
        for action in averaged_actions
    )

    return f"""
    WITH filtered_shifts AS (
        SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, WORKER_QUALIFICATION
        FROM fct_shift_logs
        {cond}
        AND SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
        AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
    ),
    ranked_shifts AS (
        SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, WORKER_QUALIFICATION,
        ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
        FROM filtered_shifts
    ),
    shift_dates AS (
        SELECT DISTINCT DATE(SHIFT_START_AT) AS shift_date
        FROM ranked_shifts
        WHERE rn = 1
    )
    SELECT
{average_columns}
    FROM ranked_shifts
    JOIN shift_dates
    ON DATE(SHIFT_START_AT) = shift_dates.shift_date
    WHERE rn = 1;
    """
def build_location_workers_query(filters: dict) -> str:
    """Build the SQL that lists top cities/states by worker count with a breakdown.

    A CTE picks the locations with the most workers (optionally only workers
    whose qualification matches); the outer query then counts all workers in
    those locations, split by qualification, as an ``OBJECT_CONSTRUCT``.

    Args:
        filters: Reads ``field1`` (city vs. state grouping), ``length``
            (required) and the optional ``qualification`` substring filter.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If ``length`` is missing.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    by_city = filters.get('field1') == LocationType.city.value
    key_field = "CITY" if by_city else "STATE"
    # One extra row is requested for city groupings.
    # NOTE(review): reason for the +1 is not visible here - confirm.
    length = filters['length'] + 1 if by_city else filters['length']

    # The qualification filter is optional, so a missing key means "no filter".
    worker_qualification = filters.get('qualification')
    qualification_cond = ""
    if worker_qualification:
        qualification_cond = f"AND QUALIFICATION ILIKE '%{_sql_text(worker_qualification)}%'"

    qualifications = (
        'CNA', 'RN', 'LVN', 'CAREGIVER', 'Non Clinical',
        'Medical Assistant', 'HOUSEKEEPER', 'Dietary Aide', 'HHA',
    )
    breakdown = ",\n".join(
        f"        '{name}', SUM(CASE WHEN d.QUALIFICATION = '{name}' THEN 1 ELSE 0 END)"
        for name in qualifications
    )

    return f"""WITH QualifiedCities AS (
    SELECT {key_field}, COUNT(*) AS worker_count
    FROM DIM_WORKERS
    WHERE 1=1
    {qualification_cond}
    GROUP BY {key_field}
    ORDER BY worker_count DESC
    LIMIT {length}
)
SELECT
    t.{key_field} AS name,
    OBJECT_CONSTRUCT(
        'totalWorkersCount', COUNT(*),
{breakdown}
    ) AS workerStatistics
FROM DIM_WORKERS d
JOIN QualifiedCities t ON d.{key_field} = t.{key_field}
GROUP BY t.{key_field};"""
def build_location_facilities_query(filters: dict) -> str:
    """Build the SQL that lists top cities/states by facility count with a breakdown.

    A CTE picks the locations with the most facilities (optionally only those
    whose type matches); the outer query then counts all facilities in those
    locations, split by type, as an ``OBJECT_CONSTRUCT``.

    Args:
        filters: Reads ``field1`` (city vs. state grouping), ``length``
            (required) and the optional ``facilityType`` substring filter.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If ``length`` is missing.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    key_field = "CITY" if filters.get('field1') == LocationType.city.value else "STATE"

    facility_type = filters.get('facilityType')
    type_cond = ""
    if facility_type:
        type_cond = f"AND TYPE ILIKE '%{_sql_text(facility_type)}%'"

    facility_types = (
        'Long Term Care', 'Dental Clinic', 'Home Healthcare', 'Medical Lab',
        'Hospice', 'Pharmacy', 'Hospital',
    )
    breakdown = ",\n".join(
        f"        '{name}', SUM(CASE WHEN d.TYPE = '{name}' THEN 1 ELSE 0 END)"
        for name in facility_types
    )

    return f"""WITH TopCities AS (
    SELECT {key_field}, COUNT(*) AS facility_count
    FROM DIM_WORKPLACES
    WHERE 1=1
    {type_cond}
    GROUP BY {key_field}
    ORDER BY facility_count DESC
    LIMIT {filters['length']}
)
SELECT
    t.{key_field} AS name,
    OBJECT_CONSTRUCT(
        'totalFacilitiesCount', COUNT(*),
{breakdown}
    ) AS facilityStatistics
FROM DIM_WORKPLACES d
JOIN TopCities t ON d.{key_field} = t.{key_field}
GROUP BY t.{key_field}"""
def build_location_shift_query(filters: dict) -> str:
    """Build the SQL that lists per-workplace shift action counts by location.

    Log rows in the requested date range are joined to ``dim_workplaces`` and
    counted per workplace and city/state, one column per action type. The
    result is ordered by the column named in ``shiftType`` (default
    ``SHIFT_VERIFY``) and truncated to ``length`` rows.

    Args:
        filters: Reads ``field1`` (city vs. state), ``startDate``, ``endDate``
            (``YYYY-MM-DD``), ``length`` and the optional ``shiftType``.

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If a date key or ``length`` is missing.
        ValueError: If a date does not match ``YYYY-MM-DD`` or ``shiftType``
            is not a bare identifier.
    """
    key_field = "CITY" if filters.get('field1') == LocationType.city.value else "STATE"

    # shiftType is spliced in as a column name, where quoting cannot protect
    # it; accept only a bare identifier so it cannot carry extra SQL.
    order_column = f"{filters.get('shiftType') or 'SHIFT_VERIFY'}"
    if not order_column.isidentifier():
        raise ValueError(f"shiftType is not a valid column name: {order_column!r}")

    range_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    range_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    actions = (
        "SHIFT_VERIFY", "FACILITY_CANCEL", "SHIFT_OPEN", "SHIFT_REASSIGN",
        "SHIFT_TIME_CHANGE", "WORKER_CANCEL", "SHIFT_UNASSIGN", "SHIFT_CLAIM",
        "SHIFT_DELETE", "NO_CALL_NO_SHOW", "SHIFT_PAY_RATE_CHANGE", "SHIFT_ASSIGN",
    )
    count_columns = "\n".join(
        f"        COUNT(CASE WHEN fsl.ACTION = '{action}' THEN 1 END) AS {action},"
        for action in actions
    )
    output_columns = ",\n".join(f"    {action}" for action in actions)

    # NOTE(review): ``rn`` is computed in ranked_shifts but never filtered on,
    # so every log row in range is counted - confirm this is intended.
    return f"""WITH filtered_shifts AS (
    SELECT WORKER_ID, WORKPLACE_ID, ACTION, SHIFT_START_AT
    FROM fct_shift_logs
    WHERE SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
    AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
),
ranked_shifts AS (
    SELECT WORKER_ID, WORKPLACE_ID, ACTION, SHIFT_START_AT,
    ROW_NUMBER() OVER (PARTITION BY WORKER_ID ORDER BY SHIFT_START_AT DESC) AS rn
    FROM filtered_shifts
),
city_shifts AS (
    SELECT fsl.WORKPLACE_ID, fw.{key_field},
{count_columns}
    FROM ranked_shifts fsl
    JOIN dim_workplaces fw ON fsl.WORKPLACE_ID = fw.WORKPLACE_ID
    GROUP BY fsl.WORKPLACE_ID, fw.{key_field}
)
SELECT {key_field},
{output_columns}
FROM city_shifts
ORDER BY {order_column} DESC
FETCH FIRST {filters['length']} ROWS ONLY;"""
def build_specific_location_worker_query(filters: dict) -> str:
    """Build the SQL that counts workers in one named city or state.

    Args:
        filters: Reads ``name`` (location, whole-value ``ILIKE`` match) and
            ``locationType`` (city vs. state).

    Returns:
        The SQL statement as a string; the result carries the location name
        and an ``OBJECT_CONSTRUCT`` with the total and per-qualification
        counts.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    location = _sql_text(filters.get('name'))
    key_field = "CITY" if filters.get('locationType') == LocationType.city.value else "STATE"

    qualifications = (
        'CNA', 'RN', 'LVN', 'CAREGIVER', 'Non Clinical',
        'Medical Assistant', 'HOUSEKEEPER', 'Dietary Aide', 'HHA',
    )
    breakdown = ",\n".join(
        f"        '{name}', SUM(CASE WHEN QUALIFICATION = '{name}' THEN 1 ELSE 0 END)"
        for name in qualifications
    )

    return f"""SELECT
    {key_field} AS name,
    OBJECT_CONSTRUCT(
        'totalWorkersCount', COUNT(*),
{breakdown}
    ) as workerStatistics
FROM DIM_WORKERS
WHERE {key_field} ILIKE '{location}'
GROUP BY {key_field};"""
def build_specific_location_facility_query(filters: dict) -> str:
    """Build the SQL that counts facilities in one named city or state.

    Args:
        filters: Reads ``name`` (location, whole-value ``ILIKE`` match) and
            ``locationType`` (city vs. state).

    Returns:
        The SQL statement as a string; the result carries the location name
        and an ``OBJECT_CONSTRUCT`` with the total and per-type counts.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    location = _sql_text(filters.get('name'))
    key_field = "CITY" if filters.get('locationType') == LocationType.city.value else "STATE"

    facility_types = (
        'Long Term Care', 'Dental Clinic', 'Home Healthcare', 'Medical Lab',
        'Hospice', 'Pharmacy', 'Hospital',
    )
    breakdown = ",\n".join(
        f"        '{name}', SUM(CASE WHEN d.TYPE = '{name}' THEN 1 ELSE 0 END)"
        for name in facility_types
    )

    return f"""SELECT
    {key_field} AS name,
    OBJECT_CONSTRUCT(
        'totalFacilitiesCount', COUNT(*),
{breakdown}
    ) AS facilityStatistics
FROM DIM_WORKPLACES d
WHERE {key_field} ILIKE '{location}'
GROUP BY {key_field}"""
def build_specific_location_shift_query(filters: dict) -> str:
    """Build the SQL that counts shift actions for one named city or state.

    Log rows in the requested date range are joined to ``dim_workplaces``,
    restricted to the named location and counted per workplace, one column
    per action type. Only the first result row is fetched.

    Args:
        filters: Reads ``name`` (required, whole-value ``ILIKE`` match),
            ``locationType`` (city vs. state), ``startDate`` and ``endDate``
            (``YYYY-MM-DD``).

    Returns:
        The SQL statement as a string.

    Raises:
        KeyError: If ``name`` or a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """

    def _sql_text(value) -> str:
        # Neutralise characters that could close the single-quoted literal;
        # backslashes first so the quotes doubled afterwards stay intact.
        # NOTE(review): assumes Snowflake-style literals - confirm.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    key_field = "CITY" if filters.get('locationType') == LocationType.city.value else "STATE"
    location_value = _sql_text(filters['name'])
    range_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    range_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    actions = (
        "SHIFT_VERIFY", "FACILITY_CANCEL", "SHIFT_OPEN", "SHIFT_REASSIGN",
        "SHIFT_TIME_CHANGE", "WORKER_CANCEL", "SHIFT_UNASSIGN", "SHIFT_CLAIM",
        "SHIFT_DELETE", "NO_CALL_NO_SHOW", "SHIFT_PAY_RATE_CHANGE", "SHIFT_ASSIGN",
    )
    count_columns = ",\n".join(
        f"        COUNT(CASE WHEN fsl.ACTION = '{action}' THEN 1 END) AS {action}"
        for action in actions
    )
    output_columns = ",\n".join(f"    {action}" for action in actions)

    return f"""WITH filtered_shifts AS (
    SELECT WORKER_ID, WORKPLACE_ID, ACTION, SHIFT_START_AT
    FROM fct_shift_logs
    WHERE SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
    AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
),
city_shifts AS (
    SELECT fsl.WORKPLACE_ID, fw.{key_field},
{count_columns}
    FROM filtered_shifts fsl
    JOIN dim_workplaces fw ON fsl.WORKPLACE_ID = fw.WORKPLACE_ID
    WHERE fw.{key_field} ILIKE '{location_value}'
    GROUP BY fsl.WORKPLACE_ID, fw.{key_field}
)
SELECT {key_field},
{output_columns}
FROM city_shifts
FETCH FIRST 1 ROWS ONLY;"""
def form_shift_count_response(row: list[int]) -> dict[str, int]:
    """Label the twelve positional shift counters of a result row.

    Args:
        row: Sequence whose first twelve items are the per-action counts, in
            the order listed in ``labels`` below.

    Returns:
        A dict mapping each response label to the value at its position.

    Raises:
        IndexError: If ``row`` has fewer than twelve items.
    """
    labels = (
        "shiftVerified",
        "facilityCancellations",
        "shiftOpened",
        "shiftReassigned",
        "shiftTimeChanged",
        "workerCancellations",
        "shiftUnassigned",
        "shiftClaimed",
        "shiftDeleted",
        "noCallNoShow",
        "shiftPayRateChanged",
        "shiftAssigned",
    )
    # Index explicitly (rather than zip) so a short row still raises.
    return {label: row[position] for position, label in enumerate(labels)}
def form_worker_count_response(rows, filters: dict):
    """Turn ``(qualification, count)`` rows into the worker-count response.

    Rows with a truthy qualification become entries of ``qualifications``.
    Rows whose qualification is ``None`` are candidates for the grand total;
    the largest such count wins.

    Args:
        rows: Iterable of ``(qualification, count)`` pairs.
        filters: Must contain ``endDate``, echoed back as ``selectedDate``.

    Returns:
        Dict with ``totalWorkersCount``, ``qualifications`` and
        ``selectedDate``.
    """
    per_qualification = {}
    grand_total = 0
    for name, cnt in rows:
        if name:
            per_qualification[name] = int(cnt)
        elif name is None and cnt > grand_total:
            grand_total = int(cnt)

    return {
        'totalWorkersCount': grand_total,
        'qualifications': per_qualification,
        'selectedDate': f"{filters['endDate']}",
    }
def form_facility_count_response(rows, filters: dict):
    """Turn ``(type, count)`` rows into the facility-count response.

    Rows whose type is falsy or equal to ``'-'`` are skipped entirely; the
    total is the sum of the counts that were kept.

    Args:
        rows: Iterable of ``(type, count)`` pairs.
        filters: Must contain ``endDate``, echoed back as ``selectedDate``.

    Returns:
        Dict with ``types``, ``totalFacilitiesCount`` and ``selectedDate``.
    """
    per_type = {}
    running_total = 0
    for facility_type, cnt in rows:
        if not facility_type or facility_type == '-':
            continue
        per_type[facility_type] = int(cnt)
        running_total += int(cnt)

    return {
        "types": per_type,
        'totalFacilitiesCount': running_total,
        'selectedDate': f"{filters['endDate']}",
    }
def form_shift_statistics_response(rows, request_body: dict):
    """Shape shift-statistics rows into the response payload.

    Args:
        rows: Iterable of rows; item 0 is the worker/facility name, item 1 the
            total shift count and item 2 the per-day average.
        request_body: Reads ``entity`` to choose the name key, and echoes
            ``startDate`` / ``endDate`` back.

    Returns:
        Dict with ``results`` (one entry per row), ``startDate`` and
        ``endDate``.
    """
    is_worker = request_body.get('entity') == EntityType.Worker.value
    name_key = "workerFullName" if is_worker else "facilityName"

    entries = [
        {
            name_key: row[0],
            "totalShiftCount": row[1],
            "avgShiftCountPerDay": round(float(row[2]), 2),
        }
        for row in rows
    ]
    return {
        "results": entries,
        "startDate": request_body['startDate'],
        "endDate": request_body['endDate'],
    }
def check_missed_fields(request: dict, request_type: AgentRequestType):
    """Validate that ``request`` carries the fields its request type needs.

    Args:
        request: Parsed request fields.
        request_type: Which kind of agent request is being validated.

    Raises:
        KeyError: If ``request_type`` has no entry in the field map.
        ValueError: If required information is missing; the message names
            what is missing.
    """
    required_fields_map = {
        AgentRequestType.WorkerSearch: ['name', 'city', 'state', 'country', 'address', 'qualification', 'postalCode'],
        AgentRequestType.FacilitySearch: ['name', 'email', 'country', 'state', 'city', 'address', 'postalCode', 'type'],
        AgentRequestType.ShiftStatistics: ["entity", "type", 'length', 'shiftType'],
        AgentRequestType.AverageStatistics: ["qualification"],
        AgentRequestType.LocationStatistics: ["field1", "field2"],
        AgentRequestType.CountStatistics: ["field"],
        AgentRequestType.LocationSearch: ["name", "locationType"]
    }
    fields = required_fields_map[request_type]

    if request_type in (AgentRequestType.WorkerSearch, AgentRequestType.FacilitySearch):
        # A search passes with any one of name/email/address, or with at
        # least two of the type's fields filled in.
        has_identifier = bool(request.get('name') or request.get('email') or request.get('address'))
        if not has_identifier:
            provided_count = sum(1 for field in fields if request.get(field))
            if provided_count < 2:
                raise ValueError(form_search_missed_message(request, fields))
        return

    if request_type == AgentRequestType.ShiftStatistics:
        if not (request.get('shiftType') and request.get('entity') and request.get('type')):
            raise ValueError(form_shift_statistics_missed_message(request))
        return

    if request_type == AgentRequestType.AverageStatistics:
        if not request.get('qualification'):
            raise ValueError("qualification")
        return

    if request_type == AgentRequestType.CountStatistics:
        if not request.get('field'):
            raise ValueError("field (worker, facility or shift type)")
        return

    if request_type == AgentRequestType.LocationStatistics:
        if not (request.get('field1') and request.get('field2')):
            raise ValueError(
                "Filter field (city or state) and result field (workers or facilities) needs to be specified")
        return

    if request_type == AgentRequestType.LocationSearch:
        if not (request.get('name') and request.get('locationType')):
            raise ValueError(
                "Location name and location type (city or state) needs to be specified"
            )
def form_search_missed_message(request: dict, fields: list) -> str:
    """Return the names in ``fields`` that are absent or falsy in ``request``.

    The names are joined with ``', '`` in the order given by ``fields``.
    """
    missing = []
    for name in fields:
        if not request.get(name):
            missing.append(name)
    return ', '.join(missing)
def form_shift_statistics_missed_message(request: dict) -> str:
    """Describe which shift-statistics inputs are missing from ``request``.

    Returns:
        The concatenated hints for every missing input, or an empty string
        when nothing is missing.
    """
    hints = []
    if not request.get('shiftType'):
        hints.append("shiftType (Verified shifts, cancelled shifts, etc.); ")
    if not request.get('entity'):
        hints.append("entity (worker or facility); ")
    if not request.get('type'):
        hints.append("type (most or least); ")
    # Qualification is only asked for when the request is about workers.
    if not request.get('qualification') and request.get('entity') == EntityType.Worker.value:
        hints.append("worker qualification; ")
    if not request.get('length'):
        hints.append("length of list;")
    return ''.join(hints)
def prepare_message_history_str(message_history: list[MessageModel], search_request: str) -> str:
    """Render the recent conversation plus the new request as one transcript.

    Only the last eight messages are included, one ``[author] text`` line
    each, followed by a final ``[User]`` line holding ``search_request``.
    """
    recent_messages = message_history[-8:]
    transcript = ''.join(
        f'[{entry.author.name}] {entry.text}\n' for entry in recent_messages
    )
    return transcript + f"[User] {search_request}"
def form_location_statistics_response(rows, filters: dict):
    """Shape location-statistics rows into the response payload.

    For worker/facility requests, item 1 of each row is a JSON document whose
    total is lifted out into its own field. For shift requests, items 1-12
    are the positional per-action counts.

    Args:
        rows: Iterable of result rows; item 0 is the location name.
        filters: Reads ``field2`` (entity kind) and ``length``; shift
            requests also read ``startDate`` / ``endDate``.

    Returns:
        ``{"results": [...]}`` truncated to ``filters['length']`` entries.
    """
    entity = filters['field2']
    shift_labels = (
        "shiftVerified",
        "facilityCancellations",
        "shiftOpened",
        "shiftReassigned",
        "shiftTimeChanged",
        "workerCancellations",
        "shiftUnassigned",
        "shiftClaimed",
        "shiftDeleted",
        "noCallNoShow",
        "shiftPayRateChanged",
        "shiftAssigned",
    )

    entries = []
    for row in rows:
        location_name = row[0]

        if entity == EntityType.Shift.value:
            # Counts start at index 1 because index 0 holds the name.
            counts = {label: row[offset] for offset, label in enumerate(shift_labels, start=1)}
            entries.append({
                "name": location_name,
                "shiftsByTypeCount": counts,
                "selectedDate": f"from {filters['startDate']} to {filters['endDate']}"
            })
            continue

        breakdown = json.loads(row[1])
        if entity == EntityType.Facility.value:
            total_field = "totalFacilitiesCount"
        else:
            total_field = "totalWorkersCount"
        total_value = breakdown.pop(total_field)
        if entity == EntityType.Worker.value:
            statistics_field = "workersByQualificationCount"
        else:
            statistics_field = "facilitiesByTypeCount"
        entries.append({
            "name": location_name,
            total_field: total_value,
            statistics_field: breakdown
        })

    return {"results": entries[:filters['length']]}
def form_prev_request(messages: list[MessageModel]) -> tuple:
    """Return the previous query text and module response from the history.

    Returns:
        ``(text of the second-to-last message, moduleResponse of the last
        message)``, or ``('', {})`` when fewer than two messages exist.
    """
    if len(messages) < 2:
        return '', {}
    query_message, response_message = messages[-2], messages[-1]
    return query_message.text, response_message.moduleResponse
def form_avg_statistics_response(rows: list) -> dict:
    """Label the twelve positional averages, rounded to two decimals.

    Args:
        rows: Sequence whose first twelve items are float-convertible
            averages, in the order listed in ``labels`` below.

    Returns:
        A dict mapping each response label to its rounded float value.

    Raises:
        IndexError: If ``rows`` has fewer than twelve items.
    """
    labels = (
        "shiftVerified",
        "facilityCancellations",
        "shiftOpened",
        "shiftReassigned",
        "shiftTimeChanged",
        "workerCancellations",
        "shiftUnassigned",
        "shiftClaimed",
        "shiftDeleted",
        "noCallNoShow",
        "shiftPayRateChanged",
        "shiftAssigned",
    )
    response = {}
    for position, label in enumerate(labels):
        response[label] = round(float(rows[position]), 2)
    return response