import os
from sqlalchemy import create_engine, text
import logging
from dotenv import load_dotenv
from sqlalchemy.engine import URL
from typing import List, Dict, Any
import pandas as pd
import json
import logging
class DBManager:
def __init__(self):
load_dotenv()
USERNAME = os.getenv("DB_USERNAME")
PASSWORD = os.getenv("DB_PASSWORD")
HOST = os.getenv("DB_HOST")
NAME = os.getenv("DB_NAME")
PORT = os.getenv("DB_PORT")
if not all([USERNAME, PASSWORD, HOST, NAME]):
raise ValueError("One or more database environment variables are not set.")
connection_url = URL.create(
drivername="postgresql+psycopg2",
username=USERNAME,
password=PASSWORD,
host=HOST,
port=PORT,
database=NAME,
)
self.engine = create_engine(connection_url, pool_pre_ping=True)
try:
safe_url = connection_url.render_as_string(hide_password=True)
except Exception:
safe_url = str(connection_url).replace(PASSWORD, "***")
logging.info(f"Database engine created with URL: {safe_url}")
def _execute_query(self, query: str, params: dict = None) -> pd.DataFrame:
"""Executes a SQL query and returns the result as a pandas DataFrame."""
try:
with self.engine.connect() as connection:
df = pd.read_sql_query(sql=text(query), con=connection, params=params)
return df
except Exception as e:
logging.error(f"Failed to execute query: {e}", exc_info=True)
return pd.DataFrame()
def get_profile_name_by_app_id(self, application_id: int) -> str | None:
"""
Gets the applicant's full name from their profile.
This is the most reliable source for the name.
"""
query = """
SELECT
p.first_name || ' ' || p.last_name as name
FROM
application a
JOIN profile p ON p.id = a.profile_id
WHERE a.id = :application_id
LIMIT 1
"""
params = {"application_id": application_id}
df = self._execute_query(query, params)
if not df.empty:
return df.iloc[0]['name']
return None
# --- NEW METHOD ADDED ---
def get_visa_photo_metadata_per_service(self) -> Dict[int, Dict[str, Any]]:
"""
Retrieves visa photo specifications for each service.
Returns a dictionary mapping service_id to its photo specifications.
"""
query = """
select s."name", metadata from service_document sd
join service s on s.id = sd.service_id
join document d on d.id = sd.document_id
where d.metadata is not null and s.is_active = true and s.has_limited_discoverability = False;
"""
df = self._execute_query(query)
result = {}
for _, row in df.iterrows():
service_name = row['name']
metadata = row['metadata']
try:
metadata_dict = json.loads(metadata)
result[service_name] = metadata_dict
except json.JSONDecodeError:
logging.warning(f"Invalid JSON metadata for service {service_name}")
return result
def get_destination_country(self, application_id: int) -> str | None:
"""
Gets the official destination country name for the application.
"""
query = """
SELECT DISTINCT
c."name"
FROM country c
JOIN visa_service_availability vsa ON vsa.destination_country_id = c.id
JOIN service s ON vsa.service_id = s.id
JOIN service_unit su ON s.id = su.service_id
JOIN application a ON a.service_unit_id = su.id
WHERE a.id = :application_id
LIMIT 1;
"""
params = {"application_id": application_id}
df = self._execute_query(query, params)
if not df.empty:
return df.iloc[0]['name']
return None
# --- END OF NEW METHOD ---
def get_form_data(self, application_id: int) -> pd.DataFrame:
"""
HANYA mengambil data dari form standar (non-SMART_UPLOAD).
"""
sql_query = """
SELECT
d.name AS name,
ad.value AS value
FROM
application_document AS ad
JOIN
document AS d ON ad.document_id = d.id
LEFT JOIN
document_tag AS dt ON ad.document_id = dt.document_id
WHERE
ad.application_id = :application_id
AND (dt.use_case IS NULL OR dt.use_case != 'SMART_UPLOAD')
AND ad.value IS NOT NULL
AND ad.value <> ''
"""
params = {"application_id": application_id}
df = self._execute_query(sql_query, params)
if not df.empty:
# Use document name as the key
df = df.drop_duplicates(subset=['name'], keep='first')
return df
def get_smart_upload_results(self, application_id: int) -> pd.DataFrame:
"""
Mengambil SEMUA hasil ekstraksi JSON yang unik dari smart upload
sebagai JSON utuh, BUKAN key-value yang dibongkar.
Menambahkan 'created_at' untuk sorting data terbaru.
"""
sql_query = """
SELECT DISTINCT
suj.extraction_result,
COALESCE(suj.extraction_result ->> 'doc_type', 'unknown') AS doc_type,
suj.created_at
FROM
application_document AS ad
JOIN
smart_upload_job AS suj
ON ad.document_id = suj.document_id
AND ad.application_id = ANY(suj.application_ids)
WHERE
ad.application_id = :application_id
AND suj.status IN ('COMPLETED', 'PROCESSED')
AND suj.extraction_result IS NOT NULL
AND suj.extraction_result::text <> '{}'
ORDER BY suj.created_at DESC;
"""
params = {"application_id": application_id}
df = self._execute_query(sql_query, params)
return df |