# NOTE(review): "Spaces: Sleeping" is a page-status banner captured during
# extraction (e.g. a Hugging Face Spaces header) — it is not part of the source.
import json
import logging
import os
from typing import Any, Dict, List

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
| class DBManager: | |
| def __init__(self): | |
| load_dotenv() | |
| USERNAME = os.getenv("DB_USERNAME") | |
| PASSWORD = os.getenv("DB_PASSWORD") | |
| HOST = os.getenv("DB_HOST") | |
| NAME = os.getenv("DB_NAME") | |
| PORT = os.getenv("DB_PORT") | |
| if not all([USERNAME, PASSWORD, HOST, NAME]): | |
| raise ValueError("One or more database environment variables are not set.") | |
| connection_url = URL.create( | |
| drivername="postgresql+psycopg2", | |
| username=USERNAME, | |
| password=PASSWORD, | |
| host=HOST, | |
| port=PORT, | |
| database=NAME, | |
| ) | |
| self.engine = create_engine(connection_url, pool_pre_ping=True) | |
| try: | |
| safe_url = connection_url.render_as_string(hide_password=True) | |
| except Exception: | |
| safe_url = str(connection_url).replace(PASSWORD, "***") | |
| logging.info(f"Database engine created with URL: {safe_url}") | |
| def _execute_query(self, query: str, params: dict = None) -> pd.DataFrame: | |
| """Executes a SQL query and returns the result as a pandas DataFrame.""" | |
| try: | |
| with self.engine.connect() as connection: | |
| df = pd.read_sql_query(sql=text(query), con=connection, params=params) | |
| return df | |
| except Exception as e: | |
| logging.error(f"Failed to execute query: {e}", exc_info=True) | |
| return pd.DataFrame() | |
| def get_profile_name_by_app_id(self, application_id: int) -> str | None: | |
| """ | |
| Gets the applicant's full name from their profile. | |
| This is the most reliable source for the name. | |
| """ | |
| query = """ | |
| SELECT | |
| p.first_name || ' ' || p.last_name as name | |
| FROM | |
| application a | |
| JOIN profile p ON p.id = a.profile_id | |
| WHERE a.id = :application_id | |
| LIMIT 1 | |
| """ | |
| params = {"application_id": application_id} | |
| df = self._execute_query(query, params) | |
| if not df.empty: | |
| return df.iloc[0]['name'] | |
| return None | |
| # --- NEW METHOD ADDED --- | |
| def get_visa_photo_metadata_per_service(self) -> Dict[int, Dict[str, Any]]: | |
| """ | |
| Retrieves visa photo specifications for each service. | |
| Returns a dictionary mapping service_id to its photo specifications. | |
| """ | |
| query = """ | |
| select s."name", metadata from service_document sd | |
| join service s on s.id = sd.service_id | |
| join document d on d.id = sd.document_id | |
| where d.metadata is not null and s.is_active = true and s.has_limited_discoverability = False; | |
| """ | |
| df = self._execute_query(query) | |
| result = {} | |
| for _, row in df.iterrows(): | |
| service_name = row['name'] | |
| metadata = row['metadata'] | |
| try: | |
| metadata_dict = json.loads(metadata) | |
| result[service_name] = metadata_dict | |
| except json.JSONDecodeError: | |
| logging.warning(f"Invalid JSON metadata for service {service_name}") | |
| return result | |
| def get_destination_country(self, application_id: int) -> str | None: | |
| """ | |
| Gets the official destination country name for the application. | |
| """ | |
| query = """ | |
| SELECT DISTINCT | |
| c."name" | |
| FROM country c | |
| JOIN visa_service_availability vsa ON vsa.destination_country_id = c.id | |
| JOIN service s ON vsa.service_id = s.id | |
| JOIN service_unit su ON s.id = su.service_id | |
| JOIN application a ON a.service_unit_id = su.id | |
| WHERE a.id = :application_id | |
| LIMIT 1; | |
| """ | |
| params = {"application_id": application_id} | |
| df = self._execute_query(query, params) | |
| if not df.empty: | |
| return df.iloc[0]['name'] | |
| return None | |
| # --- END OF NEW METHOD --- | |
| def get_form_data(self, application_id: int) -> pd.DataFrame: | |
| """ | |
| HANYA mengambil data dari form standar (non-SMART_UPLOAD). | |
| """ | |
| sql_query = """ | |
| SELECT | |
| d.name AS name, | |
| ad.value AS value | |
| FROM | |
| application_document AS ad | |
| JOIN | |
| document AS d ON ad.document_id = d.id | |
| LEFT JOIN | |
| document_tag AS dt ON ad.document_id = dt.document_id | |
| WHERE | |
| ad.application_id = :application_id | |
| AND (dt.use_case IS NULL OR dt.use_case != 'SMART_UPLOAD') | |
| AND ad.value IS NOT NULL | |
| AND ad.value <> '' | |
| """ | |
| params = {"application_id": application_id} | |
| df = self._execute_query(sql_query, params) | |
| if not df.empty: | |
| # Use document name as the key | |
| df = df.drop_duplicates(subset=['name'], keep='first') | |
| return df | |
| def get_smart_upload_results(self, application_id: int) -> pd.DataFrame: | |
| """ | |
| Mengambil SEMUA hasil ekstraksi JSON yang unik dari smart upload | |
| sebagai JSON utuh, BUKAN key-value yang dibongkar. | |
| Menambahkan 'created_at' untuk sorting data terbaru. | |
| """ | |
| sql_query = """ | |
| SELECT DISTINCT | |
| suj.extraction_result, | |
| COALESCE(suj.extraction_result ->> 'doc_type', 'unknown') AS doc_type, | |
| suj.created_at | |
| FROM | |
| application_document AS ad | |
| JOIN | |
| smart_upload_job AS suj | |
| ON ad.document_id = suj.document_id | |
| AND ad.application_id = ANY(suj.application_ids) | |
| WHERE | |
| ad.application_id = :application_id | |
| AND suj.status IN ('COMPLETED', 'PROCESSED') | |
| AND suj.extraction_result IS NOT NULL | |
| AND suj.extraction_result::text <> '{}' | |
| ORDER BY suj.created_at DESC; | |
| """ | |
| params = {"application_id": application_id} | |
| df = self._execute_query(sql_query, params) | |
| return df |