# brestok's picture
# init
# 4cd2145
import json
from datetime import datetime
from cbh.api.message.dto import AgentRequestType, EntityType, LocationType
from cbh.api.message.model import MessageModel
def build_worker_search_query(filters: dict) -> str:
    """Build the SQL that looks up a single worker in DIM_WORKERS.

    Every truthy, recognised entry of ``filters`` becomes one case-insensitive
    ``ILIKE`` condition. Conditions are AND-ed and the query ends with
    ``LIMIT 1``. Name, city, state, country and address match as substrings
    (``%value%``); qualification and postalCode match the whole value.

    Args:
        filters: Search criteria keyed by ``name``, ``city``, ``state``,
            ``country``, ``address``, ``qualification`` and ``postalCode``.
            Missing or empty entries are ignored.

    Returns:
        The SQL text of the query.
    """

    def sql_text(value) -> str:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant, so a value like "O'Brien" neither breaks
        # nor hijacks the statement. Backslash handling assumes
        # Snowflake-style string constants.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    # (filter key, column, substring match?) in the order conditions are emitted.
    searchable = (
        ('name', 'FULL_NAME', True),
        ('city', 'CITY', True),
        ('state', 'STATE', True),
        ('country', 'COUNTRY', True),
        ('address', 'FORMATTED_ADDRESS', True),
        ('qualification', 'QUALIFICATION', False),
        ('postalCode', 'POSTAL_CODE', False),
    )

    conditions = []
    for key, column, partial in searchable:
        value = filters.get(key)
        if not value:
            continue
        pattern = sql_text(value)
        if partial:
            pattern = f"%{pattern}%"
        conditions.append(f"{column} ILIKE '{pattern}'")

    query = "SELECT * FROM DIM_WORKERS WHERE 1=1"
    if conditions:
        query += " AND " + " AND ".join(conditions)
    return query + " LIMIT 1"
def build_worker_shift_count_query(worker_id: str, filters: dict) -> str:
    """Build the SQL that tallies one worker's shifts by their latest action.

    Log rows for ``worker_id`` whose SHIFT_START_AT lies between the two dates
    are ranked per SHIFT_ID by ACTION_AT (newest first). Only the newest row of
    each shift is counted, once per action type.

    Args:
        worker_id: Value compared against WORKER_ID.
        filters: Must contain ``startDate`` and ``endDate`` as ``YYYY-MM-DD``.

    Returns:
        The SQL text; the result row has twelve counters in a fixed order.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    window_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    # NOTE(review): the upper bound is midnight at the start of endDate.
    window_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    action_aliases = (
        ('SHIFT_VERIFY', 'shiftVerified'),
        ('FACILITY_CANCEL', 'facilityCancelled'),
        ('SHIFT_OPEN', 'shiftOpen'),
        ('SHIFT_REASSIGN', 'shiftReassign'),
        ('SHIFT_TIME_CHANGE', 'shiftTimeChange'),
        ('WORKER_CANCEL', 'workerCancel'),
        ('SHIFT_UNASSIGN', 'shiftUnassign'),
        ('SHIFT_CLAIM', 'shiftClaim'),
        ('SHIFT_DELETE', 'shiftDelete'),
        ('NO_CALL_NO_SHOW', 'noCallNoShow'),
        ('SHIFT_PAY_RATE_CHANGE', 'shiftPayRateChange'),
        ('SHIFT_ASSIGN', 'shiftAssign'),
    )
    counters = "\n".join(
        f"COUNT(CASE WHEN ACTION = '{action}' THEN 1 END) AS {alias},"
        for action, alias in action_aliases
    )

    return f"""
WITH filtered_shifts AS (
SELECT *
FROM fct_shift_logs
WHERE WORKER_ID = '{worker_id}'
AND SHIFT_START_AT >= TO_TIMESTAMP('{window_start}')
AND SHIFT_START_AT <= TO_TIMESTAMP('{window_end}')
),
ranked_shifts AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
FROM filtered_shifts
)
SELECT
{counters}
FROM ranked_shifts
WHERE rn = 1;
"""
def build_facility_search_query(filters: dict) -> str:
    """Build the SQL that looks up a single facility in dim_workplaces.

    Every truthy, recognised entry of ``filters`` becomes one case-insensitive
    ``ILIKE`` condition. Conditions are AND-ed and the query ends with
    ``LIMIT 1``. Email and postalCode match the whole value; the remaining
    filters match as substrings (``%value%``).

    Args:
        filters: Search criteria keyed by ``name``, ``email``, ``country``,
            ``city``, ``state``, ``address``, ``postalCode`` and ``type``.
            Missing or empty entries are ignored.

    Returns:
        The SQL text of the query.
    """

    def sql_text(value) -> str:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant (e.g. "St. Mary's"). Backslash handling
        # assumes Snowflake-style string constants.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    # (filter key, column, substring match?) in the order conditions are emitted.
    searchable = (
        ('name', 'NAME', True),
        ('email', 'EMAIL', False),
        ('country', 'COUNTRY', True),
        ('city', 'CITY', True),
        ('state', 'STATE', True),
        ('address', 'ADDRESS', True),
        ('postalCode', 'POSTAL_CODE', False),
        ('type', 'TYPE', True),
    )

    conditions = []
    for key, column, partial in searchable:
        value = filters.get(key)
        if not value:
            continue
        pattern = sql_text(value)
        if partial:
            pattern = f"%{pattern}%"
        conditions.append(f"{column} ILIKE '{pattern}'")

    query = "SELECT * FROM dim_workplaces WHERE 1=1"
    if conditions:
        query += " AND " + " AND ".join(conditions)
    return query + " LIMIT 1"
def build_facility_shift_count_query(facility_id: str, filters: dict) -> str:
    """Build the SQL that tallies one facility's shifts by their latest action.

    Log rows for ``facility_id`` (matched on WORKPLACE_ID) whose SHIFT_START_AT
    lies between the two dates are ranked per SHIFT_ID by ACTION_AT, newest
    first. Only the newest row of each shift is counted, once per action type.

    Args:
        facility_id: Value compared against WORKPLACE_ID.
        filters: Must contain ``startDate`` and ``endDate`` as ``YYYY-MM-DD``.

    Returns:
        The SQL text; the result row has twelve counters in a fixed order.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    period_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    period_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    # One counter per action, in the column order the result is read in.
    tallies = "\n".join(
        f"COUNT(CASE WHEN ACTION = '{action}' THEN 1 END) AS {label},"
        for action, label in (
            ('SHIFT_VERIFY', 'shiftVerified'),
            ('FACILITY_CANCEL', 'facilityCancelled'),
            ('SHIFT_OPEN', 'shiftOpen'),
            ('SHIFT_REASSIGN', 'shiftReassign'),
            ('SHIFT_TIME_CHANGE', 'shiftTimeChange'),
            ('WORKER_CANCEL', 'workerCancel'),
            ('SHIFT_UNASSIGN', 'shiftUnassign'),
            ('SHIFT_CLAIM', 'shiftClaim'),
            ('SHIFT_DELETE', 'shiftDelete'),
            ('NO_CALL_NO_SHOW', 'noCallNoShow'),
            ('SHIFT_PAY_RATE_CHANGE', 'shiftPayRateChange'),
            ('SHIFT_ASSIGN', 'shiftAssign'),
        )
    )

    return f"""
WITH filtered_shifts AS (
SELECT SHIFT_ID, ACTION, ACTION_AT
FROM fct_shift_logs
WHERE WORKPLACE_ID = '{facility_id}'
AND SHIFT_START_AT >= TO_TIMESTAMP('{period_start}')
AND SHIFT_START_AT <= TO_TIMESTAMP('{period_end}')
),
ranked_shifts AS (
SELECT SHIFT_ID, ACTION, ACTION_AT,
ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
FROM filtered_shifts
)
SELECT
{tallies}
FROM ranked_shifts
WHERE rn = 1;
"""
def form_worker_module_response(worker: dict, shift_count: dict, request_body: dict, amount_earned: int) -> dict:
    """Assemble the worker payload returned by the worker module.

    Keeps only the whitelisted worker columns, coerces AVG_WORKER_RATING to a
    float (0.0 when missing or falsy), then appends the shift statistics and
    the amount earned under keys that spell out the requested date range.

    Args:
        worker: Worker row as a column-name -> value mapping.
        shift_count: Shift statistics to attach unchanged.
        request_body: Must contain ``startDate`` and ``endDate``.
        amount_earned: Value to attach under the ``amountEarned`` key.

    Returns:
        A new dict; ``worker`` itself is not modified.
    """
    exposed_columns = (
        'ACCOUNT_STAGE', 'ATTENDANCE_SCORE', 'AVG_WORKER_RATING', 'CITY',
        'CLIPBOARD_SCORE', 'COUNTRY', 'FULL_NAME', 'FORMATTED_ADDRESS', 'MSA',
        'POSTAL_CODE', 'QUALIFICATION', 'STATE', 'INSTANT_PAY_RATE',
    )

    response = {}
    for column, value in worker.items():
        if column in exposed_columns:
            response[column] = value

    rating = response.get('AVG_WORKER_RATING') or 0
    response['AVG_WORKER_RATING'] = float(rating)

    date_from = request_body["startDate"]
    date_to = request_body["endDate"]
    response[f'shiftStatistics (from {date_from} to {date_to})'] = shift_count
    response[f'amountEarned (from {date_from} to {date_to})'] = amount_earned
    return response
def form_facility_module_response(facility: dict, shift_count: dict, request_body: dict, paid_info: dict) -> dict:
    """Assemble the facility payload returned by the facility module.

    Keeps only the whitelisted facility columns, then appends the shift
    statistics and the payment information under keys that spell out the
    requested date range.

    Args:
        facility: Facility row as a column-name -> value mapping.
        shift_count: Shift statistics to attach unchanged.
        request_body: Must contain ``startDate`` and ``endDate``.
        paid_info: Payment information to attach unchanged.

    Returns:
        A new dict; ``facility`` itself is not modified.
    """
    keys_needed = ('TYPE', 'NAME', 'EMAIL', 'COUNTRY', 'STATE', 'CITY', 'ADDRESS',
                   'POSTAL_CODE', 'HAS_PAY_ON_HOLIDAY', 'WEBSITE', 'PHONE', 'DESCRIPTION')
    response = {k: v for k, v in facility.items() if k in keys_needed}

    period = f'(from {request_body["startDate"]} to {request_body["endDate"]})'
    response[f'shiftStatistics {period}'] = shift_count
    # The paymentInfo key previously lacked the f-prefix, so the placeholder
    # text itself ended up in the key instead of the actual dates.
    response[f'paymentInfo {period}'] = paid_info
    return response
def build_shift_statistics_query(filters: dict) -> str:
    """Build the SQL ranking facilities or workers by shifts of one action type.

    Only log rows whose ACTION equals ``shiftType`` and whose SHIFT_START_AT
    lies between the two dates are considered. Per SHIFT_ID the newest such
    row (by ACTION_AT) is kept. Rows are grouped by WORKPLACE_NAME when
    ``entity`` is the facility entity, otherwise by WORKER_FULL_NAME, ordered
    by distinct shift count descending and limited to ``length`` rows.

    Result columns: the name, the distinct shift count, and
    ``COUNT(*) / COUNT(DISTINCT shift_date)``.

    Args:
        filters: Uses ``entity``, ``shiftType``, ``startDate``, ``endDate``
            (``YYYY-MM-DD``), ``length`` and, for workers, the optional
            ``qualification`` substring filter.

    Returns:
        The SQL text of the query.
    """

    def sql_text(value) -> str:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant. Backslash handling assumes
        # Snowflake-style string constants.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    if filters['entity'] == EntityType.Facility.value:
        key_field = 'WORKPLACE_NAME'
        first_condition = "1=1"
    else:
        key_field = 'WORKER_FULL_NAME'
        qualification = filters.get('qualification')
        if qualification:
            first_condition = f"WORKER_QUALIFICATION ILIKE '%{sql_text(qualification)}%'"
        else:
            first_condition = "1=1"

    shift_type = sql_text(filters.get("shiftType"))
    range_start = datetime.strptime(filters.get("startDate"), "%Y-%m-%d")
    range_end = datetime.strptime(filters.get("endDate"), "%Y-%m-%d")
    # NOTE(review): length is interpolated as-is; a missing value yields
    # "LIMIT None". Confirm the caller always supplies an integer.
    limit = filters.get("length")

    # key_field has to be projected through both CTEs: the final SELECT and
    # GROUP BY reference it, and it is not available from shift_dates.
    return f"""
WITH filtered_shifts AS (
SELECT {key_field}, SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT
FROM fct_shift_logs
WHERE {first_condition}
AND ACTION = '{shift_type}'
AND SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
),
ranked_shifts AS (
SELECT {key_field}, SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT,
ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
FROM filtered_shifts
),
shift_dates AS (
SELECT DISTINCT DATE(SHIFT_START_AT) AS shift_date
FROM ranked_shifts
WHERE rn = 1
)
SELECT
{key_field},
COUNT(DISTINCT SHIFT_ID) AS count,
COUNT(*) / COUNT(DISTINCT shift_date)
FROM ranked_shifts
JOIN shift_dates
ON DATE(SHIFT_START_AT) = shift_dates.shift_date
WHERE rn = 1
GROUP BY {key_field}
ORDER BY count DESC
LIMIT {limit};
"""
def build_avg_rating_query(filters: dict) -> str:
    """Build the SQL averaging worker rating, clipboard score and attendance score.

    Workers with a NULL AVG_WORKER_RATING are excluded. Each truthy
    ``qualification``, ``city``, ``country`` or ``state`` filter adds a
    case-insensitive whole-value ``ILIKE`` condition.

    Args:
        filters: Optional ``qualification``, ``city``, ``country``, ``state``.

    Returns:
        The SQL text; the result row holds the three averages in that order.
    """

    def sql_text(value) -> str:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant (e.g. "Coeur d'Alene"). Backslash
        # handling assumes Snowflake-style string constants.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    exact_matches = (
        ('qualification', 'QUALIFICATION'),
        ('city', 'CITY'),
        ('country', 'COUNTRY'),
        ('state', 'STATE'),
    )
    cond = "".join(
        f"AND {column} ILIKE '{sql_text(filters[key])}'\n"
        for key, column in exact_matches
        if filters.get(key)
    )

    return f"""
SELECT AVG(AVG_WORKER_RATING), AVG(CLIPBOARD_SCORE), AVG(ATTENDANCE_SCORE)
FROM DIM_WORKERS
WHERE 1=1
{cond}
AND AVG_WORKER_RATING IS NOT NULL;
"""
def build_avg_pay_rate_query(filters: dict) -> str:
    """Build the SQL averaging NET_PAY and PAY over verified shifts in a date range.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` as ``YYYY-MM-DD``.
            A truthy ``qualification`` adds a case-insensitive whole-value
            match on WORKER_QUALIFICATION.

    Returns:
        The SQL text; the result row is ``(avg_net_pay, avg_pay)``.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    second_cond = ''
    qualification = filters.get('qualification')
    if qualification:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant. Backslash handling assumes
        # Snowflake-style string constants.
        escaped = f"{qualification}".replace("\\", "\\\\").replace("'", "''")
        second_cond = f"AND WORKER_QUALIFICATION ILIKE '{escaped}'"

    range_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    range_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    return f"""
SELECT AVG(NET_PAY) AS avg_net_pay, AVG(PAY) AS avg_pay
FROM fct_shifts
WHERE IS_VERIFIED = true
{second_cond}
AND SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
"""
def build_worker_count_query(filters: dict) -> str:
    """Build the SQL counting workers per qualification, with a ROLLUP total.

    Truthy ``city``, ``country`` and ``state`` filters add case-insensitive
    substring conditions. A ``CREATED_AT <= startDate`` condition is added only
    when ``startDate`` differs from January 1st of the current year AND
    ``endDate`` differs from today's date.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` as ``YYYY-MM-DD``
            strings; ``city``, ``country`` and ``state`` are optional.

    Returns:
        The SQL text; rows are ``(QUALIFICATION, cnt)`` including the rollup row.
    """

    def sql_text(value) -> str:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant. Backslash handling assumes
        # Snowflake-style string constants.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    additional_cond = "".join(
        f"AND {column} ILIKE '%{sql_text(filters[key])}%'\n"
        for key, column in (('city', 'CITY'), ('country', 'COUNTRY'), ('state', 'STATE'))
        if filters.get(key)
    )

    now = datetime.now()
    start_of_year = datetime(now.year, 1, 1).strftime('%Y-%m-%d')
    today = now.strftime('%Y-%m-%d')

    created_add_cond = ''
    # NOTE(review): the cutoff uses startDate, and is skipped if either bound
    # equals its apparent default. Confirm this is the intended "as of" rule.
    if filters['startDate'] != start_of_year and filters['endDate'] != today:
        cutoff = datetime.strptime(filters['startDate'], '%Y-%m-%d')
        created_add_cond = f"AND CREATED_AT <= TO_TIMESTAMP('{cutoff}')"

    return f"""
SELECT QUALIFICATION, COUNT(*) AS cnt
FROM DIM_WORKERS
WHERE 1=1
{additional_cond}
{created_add_cond}
GROUP BY ROLLUP(QUALIFICATION)
"""
def build_facility_count_query(filters: dict) -> str:
    """Build the SQL counting facilities per TYPE.

    Truthy ``city``, ``country`` and ``state`` filters add case-insensitive
    substring conditions. A ``CREATED_AT <= startDate`` condition is added only
    when ``startDate`` differs from January 1st of the current year AND
    ``endDate`` differs from today's date.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` as ``YYYY-MM-DD``
            strings; ``city``, ``country`` and ``state`` are optional.

    Returns:
        The SQL text; rows are ``(TYPE, cnt)``.
    """

    def sql_text(value) -> str:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant. Backslash handling assumes
        # Snowflake-style string constants.
        return f"{value}".replace("\\", "\\\\").replace("'", "''")

    additional_cond = "".join(
        f"AND {column} ILIKE '%{sql_text(filters[key])}%'\n"
        for key, column in (('city', 'CITY'), ('country', 'COUNTRY'), ('state', 'STATE'))
        if filters.get(key)
    )

    now = datetime.now()
    start_of_year = datetime(now.year, 1, 1).strftime('%Y-%m-%d')
    today = now.strftime('%Y-%m-%d')

    created_add_cond = ''
    # NOTE(review): the cutoff uses startDate, and is skipped if either bound
    # equals its apparent default. Confirm this is the intended "as of" rule.
    if filters['startDate'] != start_of_year and filters['endDate'] != today:
        cutoff = datetime.strptime(filters['startDate'], '%Y-%m-%d')
        created_add_cond = f"AND CREATED_AT <= TO_TIMESTAMP('{cutoff}')"

    return f"""
SELECT TYPE, COUNT(*) AS cnt
FROM dim_workplaces
WHERE 1=1
{additional_cond}
{created_add_cond}
GROUP BY TYPE
"""
def build_shift_count_query(filters: dict) -> str:
    """Build the SQL that tallies all shifts in a date range by their latest action.

    Log rows whose SHIFT_START_AT lies between the two dates are ranked per
    SHIFT_ID by ACTION_AT, newest first. Only the newest row of each shift is
    counted, once per action type.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` as ``YYYY-MM-DD``.

    Returns:
        The SQL text; the result row has twelve counters in a fixed order.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    first_day = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    last_day = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    alias_by_action = {
        'SHIFT_VERIFY': 'shiftVerified',
        'FACILITY_CANCEL': 'facilityCancelled',
        'SHIFT_OPEN': 'shiftOpen',
        'SHIFT_REASSIGN': 'shiftReassign',
        'SHIFT_TIME_CHANGE': 'shiftTimeChange',
        'WORKER_CANCEL': 'workerCancel',
        'SHIFT_UNASSIGN': 'shiftUnassign',
        'SHIFT_CLAIM': 'shiftClaim',
        'SHIFT_DELETE': 'shiftDelete',
        'NO_CALL_NO_SHOW': 'noCallNoShow',
        'SHIFT_PAY_RATE_CHANGE': 'shiftPayRateChange',
        'SHIFT_ASSIGN': 'shiftAssign',
    }
    select_list = "\n".join(
        f"COUNT(CASE WHEN ACTION = '{action}' THEN 1 END) AS {alias},"
        for action, alias in alias_by_action.items()
    )

    return f"""
WITH filtered_shifts AS (
SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT
FROM fct_shift_logs
WHERE SHIFT_START_AT >= TO_TIMESTAMP('{first_day}')
AND SHIFT_START_AT <= TO_TIMESTAMP('{last_day}')
),
ranked_shifts AS (
SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT,
ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
FROM filtered_shifts
)
SELECT
{select_list}
FROM ranked_shifts
WHERE rn = 1;
"""
def build_facility_payment_info_query(facility_id: str, filters: dict) -> str:
    """Build the SQL summing what a facility was charged, per worker qualification.

    Only verified shifts of ``facility_id`` (matched on WORKPLACE_ID) whose
    SHIFT_START_AT lies between the two dates are included.

    Args:
        facility_id: Value compared against WORKPLACE_ID.
        filters: Must contain ``startDate`` and ``endDate`` as ``YYYY-MM-DD``.

    Returns:
        The SQL text; rows are
        ``(WORKER_QUALIFICATION, total_paid, number_of_shifts)``.
    """
    period_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    period_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    # Empty first/last entries give the leading and trailing newline.
    statement = [
        "",
        "SELECT WORKER_QUALIFICATION,",
        "SUM(TOTAL_SHIFT_CHARGE) AS total_paid,",
        "COUNT(SHIFT_ID) AS number_of_shifts",
        "FROM fct_shifts",
        f"WHERE WORKPLACE_ID = '{facility_id}'",
        "AND IS_VERIFIED = true",
        f"AND SHIFT_START_AT >= TO_TIMESTAMP('{period_start}')",
        f"AND SHIFT_START_AT <= TO_TIMESTAMP('{period_end}')",
        "GROUP BY WORKER_QUALIFICATION",
        "",
    ]
    return "\n".join(statement)
def build_worker_payment_query(worker_id: str, filters: dict) -> str:
    """Build the SQL summing a worker's NET_PAY over verified shifts in a date range.

    Rows with a NULL NET_PAY are excluded.

    Args:
        worker_id: Value compared against WORKER_ID.
        filters: Must contain ``startDate`` and ``endDate`` as ``YYYY-MM-DD``.

    Returns:
        The SQL text; the result is a single ``SUM(NET_PAY)`` value.
    """
    earliest = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    latest = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    # Empty first/last entries give the leading and trailing newline.
    statement = (
        "",
        "SELECT SUM(NET_PAY)",
        "FROM fct_shifts",
        f"WHERE WORKER_ID = '{worker_id}'",
        "AND IS_VERIFIED = true",
        f"AND SHIFT_START_AT >= TO_TIMESTAMP('{earliest}')",
        f"AND SHIFT_START_AT <= TO_TIMESTAMP('{latest}')",
        "AND NET_PAY IS NOT NULL;",
        "",
    )
    return "\n".join(statement)
def build_avg_shift_count_query(filters: dict) -> str:
    """Build the SQL computing, per action type, shifts per distinct shift day.

    Log rows whose SHIFT_START_AT lies between the two dates (optionally
    restricted to one worker qualification) are ranked per SHIFT_ID by
    ACTION_AT, newest first. The newest row of each shift is counted per
    action, and each count is divided by the number of distinct shift dates.

    Args:
        filters: Must contain ``startDate`` and ``endDate`` as ``YYYY-MM-DD``.
            A truthy ``qualification`` adds a case-insensitive whole-value
            match on WORKER_QUALIFICATION.

    Returns:
        The SQL text; the result row has twelve unnamed ratios in a fixed order.

    Raises:
        KeyError: If a date key is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    qualification = filters.get('qualification')
    if qualification:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant. Backslash handling assumes
        # Snowflake-style string constants.
        escaped = f"{qualification}".replace("\\", "\\\\").replace("'", "''")
        where_clause = f"WHERE WORKER_QUALIFICATION ILIKE '{escaped}'"
    else:
        # The neutral predicate needs its own WHERE: the date conditions below
        # are appended with AND, and a bare "1=1" after FROM is invalid SQL.
        where_clause = "WHERE 1=1"

    range_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    range_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    actions = (
        'SHIFT_VERIFY', 'FACILITY_CANCEL', 'SHIFT_OPEN', 'SHIFT_REASSIGN',
        'SHIFT_TIME_CHANGE', 'WORKER_CANCEL', 'SHIFT_UNASSIGN', 'SHIFT_CLAIM',
        'SHIFT_DELETE', 'NO_CALL_NO_SHOW', 'SHIFT_PAY_RATE_CHANGE', 'SHIFT_ASSIGN',
    )
    averages = "\n".join(
        f"COUNT(CASE WHEN ACTION = '{action}' THEN 1 END) / COUNT(DISTINCT shift_date),"
        for action in actions
    )

    return f"""
WITH filtered_shifts AS (
SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, WORKER_QUALIFICATION
FROM fct_shift_logs
{where_clause}
AND SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
),
ranked_shifts AS (
SELECT SHIFT_ID, ACTION, ACTION_AT, SHIFT_START_AT, WORKER_QUALIFICATION,
ROW_NUMBER() OVER (PARTITION BY SHIFT_ID ORDER BY ACTION_AT DESC) AS rn
FROM filtered_shifts
),
shift_dates AS (
SELECT DISTINCT DATE(SHIFT_START_AT) AS shift_date
FROM ranked_shifts
WHERE rn = 1
)
SELECT
{averages}
FROM ranked_shifts
JOIN shift_dates
ON DATE(SHIFT_START_AT) = shift_dates.shift_date
WHERE rn = 1;
"""
def build_location_workers_query(filters: dict) -> str:
    """Build the SQL listing the top cities or states by worker count.

    A first CTE picks the locations with the most workers (optionally only
    workers whose qualification contains ``qualification``). The outer query
    returns, per picked location, a JSON object with the total worker count
    and one counter per known qualification, computed over ALL workers of
    that location.

    Args:
        filters: Uses ``qualification`` (required key, may be falsy),
            ``field1`` (city selects CITY, anything else STATE) and ``length``.

    Returns:
        The SQL text; rows are ``(name, workerStatistics)``.

    Raises:
        KeyError: If ``qualification`` or ``length`` is missing.
    """
    worker_qualification = filters['qualification']
    by_city = filters.get('field1') == LocationType.city.value
    key_field = "CITY" if by_city else "STATE"
    # City queries ask for one extra row. Presumably this compensates for a
    # blank/NULL city bucket that is trimmed later; TODO confirm with caller.
    length = filters['length'] + 1 if by_city else filters['length']

    qualification_cond = ""
    if worker_qualification:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant. Backslash handling assumes
        # Snowflake-style string constants.
        escaped = f"{worker_qualification}".replace("\\", "\\\\").replace("'", "''")
        qualification_cond = f"AND QUALIFICATION ILIKE '%{escaped}%'"

    return f"""WITH QualifiedCities AS (
SELECT {key_field}, COUNT(*) AS worker_count
FROM DIM_WORKERS
WHERE 1=1
{qualification_cond}
GROUP BY {key_field}
ORDER BY worker_count DESC
LIMIT {length}
)
SELECT
t.{key_field} AS name,
OBJECT_CONSTRUCT(
'totalWorkersCount', COUNT(*),
'CNA', SUM(CASE WHEN d.QUALIFICATION = 'CNA' THEN 1 ELSE 0 END),
'RN', SUM(CASE WHEN d.QUALIFICATION = 'RN' THEN 1 ELSE 0 END),
'LVN', SUM(CASE WHEN d.QUALIFICATION = 'LVN' THEN 1 ELSE 0 END),
'CAREGIVER', SUM(CASE WHEN d.QUALIFICATION = 'CAREGIVER' THEN 1 ELSE 0 END),
'Non Clinical', SUM(CASE WHEN d.QUALIFICATION = 'Non Clinical' THEN 1 ELSE 0 END),
'Medical Assistant', SUM(CASE WHEN d.QUALIFICATION = 'Medical Assistant' THEN 1 ELSE 0 END),
'HOUSEKEEPER', SUM(CASE WHEN d.QUALIFICATION = 'HOUSEKEEPER' THEN 1 ELSE 0 END),
'Dietary Aide', SUM(CASE WHEN d.QUALIFICATION = 'Dietary Aide' THEN 1 ELSE 0 END),
'HHA', SUM(CASE WHEN d.QUALIFICATION = 'HHA' THEN 1 ELSE 0 END)
) AS workerStatistics
FROM DIM_WORKERS d
JOIN QualifiedCities t ON d.{key_field} = t.{key_field}
GROUP BY t.{key_field};"""
def build_location_facilities_query(filters: dict) -> str:
    """Build the SQL listing the top cities or states by facility count.

    A first CTE picks the ``length`` locations with the most facilities
    (optionally only facilities whose TYPE contains ``facilityType``). The
    outer query returns, per picked location, a JSON object with the total
    facility count and one counter per known facility type, computed over ALL
    facilities of that location.

    Args:
        filters: Uses ``field1`` (city selects CITY, anything else STATE),
            ``length`` and the optional ``facilityType``.

    Returns:
        The SQL text; rows are ``(name, facilityStatistics)``.

    Raises:
        KeyError: If ``length`` is missing.
    """
    key_field = "CITY" if filters.get('field1') == LocationType.city.value else "STATE"

    type_cond = ""
    facility_type = filters.get('facilityType')
    if facility_type:
        # Neutralise the characters that can terminate or escape a
        # single-quoted SQL constant. Backslash handling assumes
        # Snowflake-style string constants.
        escaped = f"{facility_type}".replace("\\", "\\\\").replace("'", "''")
        type_cond = f"AND TYPE ILIKE '%{escaped}%'"

    limit = filters['length']
    return f"""WITH TopCities AS (
SELECT {key_field}, COUNT(*) AS facility_count
FROM DIM_WORKPLACES
WHERE 1=1
{type_cond}
GROUP BY {key_field}
ORDER BY facility_count DESC
LIMIT {limit}
)
SELECT
t.{key_field} AS name,
OBJECT_CONSTRUCT(
'totalFacilitiesCount', COUNT(*),
'Long Term Care', SUM(CASE WHEN d.TYPE = 'Long Term Care' THEN 1 ELSE 0 END),
'Dental Clinic', SUM(CASE WHEN d.TYPE = 'Dental Clinic' THEN 1 ELSE 0 END),
'Home Healthcare', SUM(CASE WHEN d.TYPE = 'Home Healthcare' THEN 1 ELSE 0 END),
'Medical Lab', SUM(CASE WHEN d.TYPE = 'Medical Lab' THEN 1 ELSE 0 END),
'Hospice', SUM(CASE WHEN d.TYPE = 'Hospice' THEN 1 ELSE 0 END),
'Pharmacy', SUM(CASE WHEN d.TYPE = 'Pharmacy' THEN 1 ELSE 0 END),
'Hospital', SUM(CASE WHEN d.TYPE = 'Hospital' THEN 1 ELSE 0 END)
) AS facilityStatistics
FROM DIM_WORKPLACES d
JOIN TopCities t ON d.{key_field} = t.{key_field}
GROUP BY t.{key_field}"""
def build_location_shift_query(filters: dict) -> str:
    """Build the SQL listing shift-action counts per workplace with its city or state.

    Log rows whose SHIFT_START_AT lies between the two dates are joined to
    dim_workplaces and counted per action type. Rows are grouped by
    WORKPLACE_ID plus the location column, so each result row describes one
    workplace. Results are ordered by the counter named in ``shiftType``
    (SHIFT_VERIFY when absent) and limited to ``length`` rows.

    Args:
        filters: Uses ``field1`` (city selects CITY, anything else STATE),
            ``startDate`` / ``endDate`` (``YYYY-MM-DD``), ``length`` and the
            optional ``shiftType``.

    Returns:
        The SQL text; rows are the location followed by twelve counters.

    Raises:
        KeyError: If ``startDate``, ``endDate`` or ``length`` is missing.
        ValueError: If a date does not match ``YYYY-MM-DD`` or ``shiftType``
            is not one of the counter columns.
    """
    key_field = "CITY" if filters.get('field1') == LocationType.city.value else "STATE"
    range_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    range_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    # Counter columns produced by city_shifts, in output order.
    counters = (
        'SHIFT_VERIFY', 'FACILITY_CANCEL', 'SHIFT_OPEN', 'SHIFT_REASSIGN',
        'SHIFT_TIME_CHANGE', 'WORKER_CANCEL', 'SHIFT_UNASSIGN', 'SHIFT_CLAIM',
        'SHIFT_DELETE', 'NO_CALL_NO_SHOW', 'SHIFT_PAY_RATE_CHANGE', 'SHIFT_ASSIGN',
    )

    # shiftType becomes an identifier in ORDER BY, where quoting cannot protect
    # it, so only the columns this query actually produces are accepted.
    order_column = f"{filters.get('shiftType') or 'SHIFT_VERIFY'}".strip().upper()
    if order_column not in counters:
        raise ValueError(f"Unsupported shiftType for ordering: {order_column!r}")

    counter_exprs = "\n".join(
        f"COUNT(CASE WHEN fsl.ACTION = '{name}' THEN 1 END) AS {name},"
        for name in counters
    )
    counter_columns = ",\n".join(counters)
    limit = filters['length']

    # NOTE(review): ranked_shifts computes rn per WORKER_ID but nothing filters
    # on it, so every log row in range is counted. Confirm that is intended.
    return f"""WITH filtered_shifts AS (
SELECT WORKER_ID, WORKPLACE_ID, ACTION, SHIFT_START_AT
FROM fct_shift_logs
WHERE SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
),
ranked_shifts AS (
SELECT WORKER_ID, WORKPLACE_ID, ACTION, SHIFT_START_AT,
ROW_NUMBER() OVER (PARTITION BY WORKER_ID ORDER BY SHIFT_START_AT DESC) AS rn
FROM filtered_shifts
),
city_shifts AS (
SELECT fsl.WORKPLACE_ID, fw.{key_field},
{counter_exprs}
FROM ranked_shifts fsl
JOIN dim_workplaces fw ON fsl.WORKPLACE_ID = fw.WORKPLACE_ID
GROUP BY fsl.WORKPLACE_ID, fw.{key_field}
)
SELECT {key_field},
{counter_columns}
FROM city_shifts
ORDER BY {order_column} DESC
FETCH FIRST {limit} ROWS ONLY;"""
def build_specific_location_worker_query(filters: dict) -> str:
    """Build the SQL returning worker counts for one named city or state.

    The location is matched case-insensitively against the whole CITY or
    STATE value. The result is a JSON object with the total worker count and
    one counter per known qualification.

    Args:
        filters: Uses ``name`` (the location) and ``locationType`` (city
            selects CITY, anything else STATE).

    Returns:
        The SQL text; rows are ``(name, workerStatistics)``.
    """
    # Neutralise the characters that can terminate or escape a single-quoted
    # SQL constant; real place names such as "Coeur d'Alene" contain an
    # apostrophe. Backslash handling assumes Snowflake-style string constants.
    location = f"{filters.get('name')}".replace("\\", "\\\\").replace("'", "''")
    key_field = "CITY" if filters.get('locationType') == LocationType.city.value else "STATE"
    return f"""SELECT
{key_field} AS name,
OBJECT_CONSTRUCT(
'totalWorkersCount', COUNT(*),
'CNA', SUM(CASE WHEN QUALIFICATION = 'CNA' THEN 1 ELSE 0 END),
'RN', SUM(CASE WHEN QUALIFICATION = 'RN' THEN 1 ELSE 0 END),
'LVN', SUM(CASE WHEN QUALIFICATION = 'LVN' THEN 1 ELSE 0 END),
'CAREGIVER', SUM(CASE WHEN QUALIFICATION = 'CAREGIVER' THEN 1 ELSE 0 END),
'Non Clinical', SUM(CASE WHEN QUALIFICATION = 'Non Clinical' THEN 1 ELSE 0 END),
'Medical Assistant', SUM(CASE WHEN QUALIFICATION = 'Medical Assistant' THEN 1 ELSE 0 END),
'HOUSEKEEPER', SUM(CASE WHEN QUALIFICATION = 'HOUSEKEEPER' THEN 1 ELSE 0 END),
'Dietary Aide', SUM(CASE WHEN QUALIFICATION = 'Dietary Aide' THEN 1 ELSE 0 END),
'HHA', SUM(CASE WHEN QUALIFICATION = 'HHA' THEN 1 ELSE 0 END)
) as workerStatistics
FROM DIM_WORKERS
WHERE {key_field} ILIKE '{location}'
GROUP BY {key_field};"""
def build_specific_location_facility_query(filters: dict) -> str:
    """Build the SQL returning facility counts for one named city or state.

    The location is matched case-insensitively against the whole CITY or
    STATE value. The result is a JSON object with the total facility count
    and one counter per known facility type.

    Args:
        filters: Uses ``name`` (the location) and ``locationType`` (city
            selects CITY, anything else STATE).

    Returns:
        The SQL text; rows are ``(name, facilityStatistics)``.
    """
    # Neutralise the characters that can terminate or escape a single-quoted
    # SQL constant; real place names such as "Coeur d'Alene" contain an
    # apostrophe. Backslash handling assumes Snowflake-style string constants.
    location = f"{filters.get('name')}".replace("\\", "\\\\").replace("'", "''")
    key_field = "CITY" if filters.get('locationType') == LocationType.city.value else "STATE"
    return f"""SELECT
{key_field} AS name,
OBJECT_CONSTRUCT(
'totalFacilitiesCount', COUNT(*),
'Long Term Care', SUM(CASE WHEN d.TYPE = 'Long Term Care' THEN 1 ELSE 0 END),
'Dental Clinic', SUM(CASE WHEN d.TYPE = 'Dental Clinic' THEN 1 ELSE 0 END),
'Home Healthcare', SUM(CASE WHEN d.TYPE = 'Home Healthcare' THEN 1 ELSE 0 END),
'Medical Lab', SUM(CASE WHEN d.TYPE = 'Medical Lab' THEN 1 ELSE 0 END),
'Hospice', SUM(CASE WHEN d.TYPE = 'Hospice' THEN 1 ELSE 0 END),
'Pharmacy', SUM(CASE WHEN d.TYPE = 'Pharmacy' THEN 1 ELSE 0 END),
'Hospital', SUM(CASE WHEN d.TYPE = 'Hospital' THEN 1 ELSE 0 END)
) AS facilityStatistics
FROM DIM_WORKPLACES d
WHERE {key_field} ILIKE '{location}'
GROUP BY {key_field}"""
def build_specific_location_shift_query(filters: dict) -> str:
    """Build the SQL returning shift-action counts for one named city or state.

    Log rows whose SHIFT_START_AT lies between the two dates are joined to
    dim_workplaces, restricted to workplaces whose CITY or STATE matches
    ``name`` case-insensitively, and counted per action type. Rows are grouped
    by WORKPLACE_ID plus the location column, and only the first row is
    returned.

    Args:
        filters: Uses ``name`` (required), ``locationType`` (city selects
            CITY, anything else STATE) and ``startDate`` / ``endDate``
            (``YYYY-MM-DD``).

    Returns:
        The SQL text; the row is the location followed by twelve counters.

    Raises:
        KeyError: If ``name``, ``startDate`` or ``endDate`` is missing.
        ValueError: If a date does not match ``YYYY-MM-DD``.
    """
    key_field = "CITY" if filters.get('locationType') == LocationType.city.value else "STATE"
    # Neutralise the characters that can terminate or escape a single-quoted
    # SQL constant; real place names such as "Coeur d'Alene" contain an
    # apostrophe. Backslash handling assumes Snowflake-style string constants.
    location_value = f"{filters['name']}".replace("\\", "\\\\").replace("'", "''")
    range_start = datetime.strptime(filters["startDate"], "%Y-%m-%d")
    range_end = datetime.strptime(filters["endDate"], "%Y-%m-%d")

    counters = (
        'SHIFT_VERIFY', 'FACILITY_CANCEL', 'SHIFT_OPEN', 'SHIFT_REASSIGN',
        'SHIFT_TIME_CHANGE', 'WORKER_CANCEL', 'SHIFT_UNASSIGN', 'SHIFT_CLAIM',
        'SHIFT_DELETE', 'NO_CALL_NO_SHOW', 'SHIFT_PAY_RATE_CHANGE', 'SHIFT_ASSIGN',
    )
    counter_exprs = ",\n".join(
        f"COUNT(CASE WHEN fsl.ACTION = '{name}' THEN 1 END) AS {name}"
        for name in counters
    )
    counter_columns = ",\n".join(counters)

    # NOTE(review): grouping is per workplace and FETCH FIRST 1 has no
    # ORDER BY, so one arbitrary workplace of the location is returned.
    return f"""WITH filtered_shifts AS (
SELECT WORKER_ID, WORKPLACE_ID, ACTION, SHIFT_START_AT
FROM fct_shift_logs
WHERE SHIFT_START_AT >= TO_TIMESTAMP('{range_start}')
AND SHIFT_START_AT <= TO_TIMESTAMP('{range_end}')
),
city_shifts AS (
SELECT fsl.WORKPLACE_ID, fw.{key_field},
{counter_exprs}
FROM filtered_shifts fsl
JOIN dim_workplaces fw ON fsl.WORKPLACE_ID = fw.WORKPLACE_ID
WHERE fw.{key_field} ILIKE '{location_value}'
GROUP BY fsl.WORKPLACE_ID, fw.{key_field}
)
SELECT {key_field},
{counter_columns}
FROM city_shifts
FETCH FIRST 1 ROWS ONLY;"""
def form_shift_count_response(row: list[int]) -> dict[str, int]:
    """Label the twelve positional shift counters of a result row.

    Args:
        row: Counter values; positions 0-11 are read in order.

    Returns:
        A dict mapping each response key to the value at its position.

    Raises:
        IndexError: If ``row`` has fewer than twelve entries.
    """
    labels = (
        "shiftVerified",
        "facilityCancellations",
        "shiftOpened",
        "shiftReassigned",
        "shiftTimeChanged",
        "workerCancellations",
        "shiftUnassigned",
        "shiftClaimed",
        "shiftDeleted",
        "noCallNoShow",
        "shiftPayRateChanged",
        "shiftAssigned",
    )
    # Index explicitly (rather than zip) so a short row still raises.
    return {label: row[position] for position, label in enumerate(labels)}
def form_worker_count_response(rows, filters: dict):
    """Turn ``(qualification, count)`` rows into the worker-count response.

    Rows with a truthy qualification populate the per-qualification map. Rows
    whose qualification is ``None`` are candidates for the overall total, and
    the largest such count wins.

    Args:
        rows: Iterable of ``(qualification, count)`` pairs.
        filters: Must contain ``endDate``; echoed as ``selectedDate``.

    Returns:
        Dict with ``totalWorkersCount``, ``qualifications``, ``selectedDate``.
    """
    per_qualification = {}
    grand_total = 0
    for name, amount in rows:
        if name:
            per_qualification[name] = int(amount)
        elif name is None and amount > grand_total:
            grand_total = int(amount)

    return {
        'totalWorkersCount': grand_total,
        'qualifications': per_qualification,
        'selectedDate': f"{filters['endDate']}",
    }
def form_facility_count_response(rows, filters: dict):
    """Turn ``(type, count)`` rows into the facility-count response.

    Rows whose type is falsy or the placeholder ``'-'`` are left out of the
    per-type map, but every row contributes to the total.

    Args:
        rows: Iterable of ``(type, count)`` pairs.
        filters: Must contain ``endDate``; echoed as ``selectedDate``.

    Returns:
        Dict with ``types``, ``totalFacilitiesCount``, ``selectedDate``.
    """
    per_type = {}
    grand_total = 0
    for facility_type, amount in rows:
        amount_as_int = int(amount)
        if facility_type and facility_type != '-':
            per_type[facility_type] = amount_as_int
        grand_total += amount_as_int

    return {
        "types": per_type,
        'totalFacilitiesCount': grand_total,
        'selectedDate': f"{filters['endDate']}",
    }
def form_shift_statistics_response(rows, request_body: dict):
    """Turn shift-statistics rows into the response payload.

    Each row yields one entry holding the name (keyed ``workerFullName`` for
    the worker entity, ``facilityName`` otherwise), the total shift count and
    the per-day average rounded to two decimals.

    Args:
        rows: Iterable of rows; positions 0, 1 and 2 are read.
        request_body: Uses ``entity`` (optional) plus ``startDate`` and
            ``endDate`` (required), which are echoed back.

    Returns:
        Dict with ``results``, ``startDate`` and ``endDate``.
    """
    requested_entity = request_body.get('entity')
    entries = [
        {
            ("workerFullName" if requested_entity == EntityType.Worker.value else "facilityName"): record[0],
            "totalShiftCount": record[1],
            "avgShiftCountPerDay": round(float(record[2]), 2),
        }
        for record in rows
    ]
    return {
        "results": entries,
        "startDate": request_body['startDate'],
        "endDate": request_body['endDate'],
    }
def check_missed_fields(request: dict, request_type: AgentRequestType):
    """Validate that ``request`` carries enough fields for ``request_type``.

    Returns ``None`` when the request is acceptable; otherwise raises
    ``ValueError`` whose message names what is missing.

    Rules per request type:
      * Worker/Facility search: at least one of name, email or address, or at
        least two of the type's searchable fields.
      * Shift statistics: shiftType, entity and type.
      * Average statistics: qualification.
      * Count statistics: field.
      * Location statistics: field1 and field2.
      * Location search: name and locationType.

    Raises:
        KeyError: If ``request_type`` is not a known request type.
        ValueError: If required information is missing.
    """
    required_fields_map = {
        AgentRequestType.WorkerSearch: ['name', 'city', 'state', 'country', 'address', 'qualification', 'postalCode'],
        AgentRequestType.FacilitySearch: ['name', 'email', 'country', 'state', 'city', 'address', 'postalCode', 'type'],
        AgentRequestType.ShiftStatistics: ["entity", "type", 'length', 'shiftType'],
        AgentRequestType.AverageStatistics: ["qualification"],
        AgentRequestType.LocationStatistics: ["field1", "field2"],
        AgentRequestType.CountStatistics: ["field"],
        AgentRequestType.LocationSearch: ["name", "locationType"]
    }
    fields = required_fields_map[request_type]

    def provided(*names) -> bool:
        # True when every named field has a truthy value in the request.
        return all(request.get(name) for name in names)

    if request_type in (AgentRequestType.WorkerSearch, AgentRequestType.FacilitySearch):
        has_identifier = provided('name') or provided('email') or provided('address')
        filled_count = sum(1 for field in fields if request.get(field))
        if not has_identifier and filled_count < 2:
            raise ValueError(form_search_missed_message(request, fields))
        return

    if request_type == AgentRequestType.ShiftStatistics:
        if not provided('shiftType', 'entity', 'type'):
            raise ValueError(form_shift_statistics_missed_message(request))
        return

    if request_type == AgentRequestType.AverageStatistics:
        if not provided('qualification'):
            raise ValueError("qualification")
        return

    if request_type == AgentRequestType.CountStatistics:
        if not provided('field'):
            raise ValueError("field (worker, facility or shift type)")
        return

    if request_type == AgentRequestType.LocationStatistics:
        if not provided('field1', 'field2'):
            raise ValueError(
                "Filter field (city or state) and result field (workers or facilities) needs to be specified")
        return

    if request_type == AgentRequestType.LocationSearch:
        if not provided('name', 'locationType'):
            raise ValueError(
                "Location name and location type (city or state) needs to be specified"
            )
def form_search_missed_message(request: dict, fields: list) -> str:
    """List, comma-separated, the ``fields`` that have no truthy value in ``request``."""
    missing = []
    for name in fields:
        if request.get(name):
            continue
        missing.append(name)
    return ', '.join(missing)
def form_shift_statistics_missed_message(request: dict) -> str:
    """Describe which shift-statistics inputs are missing from ``request``.

    Checks, in order: shiftType, entity, type, qualification (only when the
    entity is the worker entity) and length. Each missing item adds a short
    hint to the returned string.
    """
    hints = []
    if not request.get('shiftType'):
        hints.append("shiftType (Verified shifts, cancelled shifts, etc.); ")
    if not request.get('entity'):
        hints.append("entity (worker or facility); ")
    if not request.get('type'):
        hints.append("type (most or least); ")
    # Qualification only matters for workers; test its absence first so the
    # entity comparison is skipped when a qualification is present.
    if not request.get('qualification'):
        if request.get('entity') == EntityType.Worker.value:
            hints.append("worker qualification; ")
    if not request.get('length'):
        hints.append("length of list;")
    return ''.join(hints)
def prepare_message_history_str(message_history: list[MessageModel], search_request: str) -> str:
    """Render the last eight messages plus the new user request as a transcript.

    Each history message becomes a ``[author name] text`` line. The current
    ``search_request`` is appended as a final ``[User]`` line with no trailing
    newline.
    """
    recent = message_history[-8:]
    transcript = [f'[{entry.author.name}] {entry.text}\n' for entry in recent]
    transcript.append(f"[User] {search_request}")
    return ''.join(transcript)
def form_location_statistics_response(rows, filters: dict):
    """Turn location-statistics rows into the response payload.

    For the shift entity each row holds the location name followed by twelve
    positional counters. For any other entity, position 1 holds a JSON object
    whose total is split out from the per-category breakdown.

    Args:
        rows: Iterable of result rows; position 0 is the location name.
        filters: Uses ``field2`` (the entity) and ``length``; for the shift
            entity also ``startDate`` and ``endDate``.

    Returns:
        ``{"results": [...]}`` truncated to ``filters['length']`` entries.
    """
    entity = filters['field2']
    shift_labels = (
        "shiftVerified", "facilityCancellations", "shiftOpened", "shiftReassigned",
        "shiftTimeChanged", "workerCancellations", "shiftUnassigned", "shiftClaimed",
        "shiftDeleted", "noCallNoShow", "shiftPayRateChanged", "shiftAssigned",
    )

    entries = []
    for record in rows:
        location_name = record[0]

        if entity == EntityType.Shift.value:
            # Counters start at position 1, right after the location name.
            entries.append({
                "name": location_name,
                "shiftsByTypeCount": {
                    label: record[offset] for offset, label in enumerate(shift_labels, start=1)
                },
                "selectedDate": f"from {filters['startDate']} to {filters['endDate']}"
            })
            continue

        breakdown = json.loads(record[1])
        if entity == EntityType.Facility.value:
            total_field = "totalFacilitiesCount"
        else:
            total_field = "totalWorkersCount"
        # pop() removes the total, leaving only the per-category breakdown.
        total_value = breakdown.pop(total_field)
        if entity == EntityType.Worker.value:
            statistics_field = "workersByQualificationCount"
        else:
            statistics_field = "facilitiesByTypeCount"
        entries.append({
            "name": location_name,
            total_field: total_value,
            statistics_field: breakdown
        })

    limit = filters['length']
    return {"results": entries[:limit]}
def form_prev_request(messages: list[MessageModel]) -> tuple:
    """Return the previous query text and module response from the history.

    With fewer than two messages the result is ``('', {})``. Otherwise it is
    the ``text`` of the second-to-last message and the ``moduleResponse`` of
    the last one.
    """
    if len(messages) < 2:
        return '', {}
    earlier, latest = messages[-2], messages[-1]
    return earlier.text, latest.moduleResponse
def form_avg_statistics_response(rows: list) -> dict:
    """Label the twelve positional per-day averages, rounded to two decimals.

    Args:
        rows: Sequence whose positions 0-11 hold values convertible to float.

    Returns:
        A dict mapping each response key to its rounded float.

    Raises:
        IndexError: If fewer than twelve values are supplied.
    """
    labels = (
        "shiftVerified",
        "facilityCancellations",
        "shiftOpened",
        "shiftReassigned",
        "shiftTimeChanged",
        "workerCancellations",
        "shiftUnassigned",
        "shiftClaimed",
        "shiftDeleted",
        "noCallNoShow",
        "shiftPayRateChanged",
        "shiftAssigned",
    )
    averages = {}
    for position, label in enumerate(labels):
        averages[label] = round(float(rows[position]), 2)
    return averages