"""Database access helpers built on SQLAlchemy and pandas."""

import json
import logging
import os
from typing import List, Dict, Any

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL


class DBManager:
    """Thin query layer over a PostgreSQL database.

    Connection settings are read from the environment variables
    ``DB_USERNAME``, ``DB_PASSWORD``, ``DB_HOST``, ``DB_NAME`` and
    ``DB_PORT`` (after ``load_dotenv()`` has been called).
    """

    def __init__(self):
        """Create the SQLAlchemy engine from environment configuration.

        Raises:
            ValueError: If the username, password, host or database name is
                missing, or if ``DB_PORT`` is set to a non-integer value.
        """
        load_dotenv()

        username = os.getenv("DB_USERNAME")
        password = os.getenv("DB_PASSWORD")
        host = os.getenv("DB_HOST")
        name = os.getenv("DB_NAME")
        raw_port = os.getenv("DB_PORT")

        if not all([username, password, host, name]):
            raise ValueError("One or more database environment variables are not set.")

        # An unset or empty DB_PORT means "use the driver default". Passing
        # the empty string through would fail inside URL.create() with an
        # unhelpful "invalid literal for int()" error.
        if raw_port:
            try:
                port = int(raw_port)
            except ValueError:
                raise ValueError(
                    f"DB_PORT must be an integer, got {raw_port!r}."
                ) from None
        else:
            port = None

        connection_url = URL.create(
            drivername="postgresql+psycopg2",
            username=username,
            password=password,
            host=host,
            port=port,
            database=name,
        )

        self.engine = create_engine(connection_url, pool_pre_ping=True)

        # Never log the password: prefer SQLAlchemy's own masking and fall
        # back to a manual replacement if that call fails for any reason.
        try:
            safe_url = connection_url.render_as_string(hide_password=True)
        except Exception:
            safe_url = str(connection_url).replace(password, "***")
        logging.info("Database engine created with URL: %s", safe_url)

    @staticmethod
    def _first_value(df: pd.DataFrame, column: str) -> Any:
        """Return ``column`` of the first row of ``df``, or ``None`` if empty."""
        if df.empty:
            return None
        return df.iloc[0][column]

    def _execute_query(self, query: str, params: dict | None = None) -> pd.DataFrame:
        """Execute a SQL query and return the result as a pandas DataFrame.

        Args:
            query: SQL text; may contain ``:name`` bind parameters.
            params: Values for the bind parameters, if any.

        Returns:
            The query result. On any failure the error is logged with its
            traceback and an empty, column-less DataFrame is returned, so
            callers must check ``df.empty`` rather than expect an exception.
        """
        try:
            with self.engine.connect() as connection:
                return pd.read_sql_query(sql=text(query), con=connection, params=params)
        except Exception as e:
            logging.error("Failed to execute query: %s", e, exc_info=True)
            return pd.DataFrame()

    def get_profile_name_by_app_id(self, application_id: int) -> str | None:
        """Get the applicant's full name from the profile linked to an application.

        Args:
            application_id: Primary key of the ``application`` row.

        Returns:
            ``first_name || ' ' || last_name`` of the linked profile, or
            ``None`` when no row is found (or the query failed).
        """
        query = """
            SELECT p.first_name || ' ' || p.last_name as name
            FROM application a
            JOIN profile p ON p.id = a.profile_id
            WHERE a.id = :application_id
            LIMIT 1
        """
        df = self._execute_query(query, {"application_id": application_id})
        return self._first_value(df, "name")

    def get_visa_photo_metadata_per_service(self) -> Dict[str, Any]:
        """Retrieve document metadata for each active, discoverable service.

        Returns:
            A dictionary mapping the service *name* to the decoded metadata
            of one of its documents. If a service has several documents with
            metadata, the last row returned by the query wins. Rows whose
            metadata cannot be decoded are skipped with a warning.
        """
        query = """
            select s."name", metadata
            from service_document sd
            join service s on s.id = sd.service_id
            join document d on d.id = sd.document_id
            where d.metadata is not null
            and s.is_active = true
            and s.has_limited_discoverability = False;
        """
        df = self._execute_query(query)

        result: Dict[str, Any] = {}
        if df.empty:
            # Also covers the column-less frame returned after a query failure.
            return result

        for service_name, raw in zip(df["name"], df["metadata"]):
            if isinstance(raw, (str, bytes, bytearray)):
                try:
                    parsed = json.loads(raw)
                except ValueError:
                    # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
                    logging.warning("Invalid JSON metadata for service %s", service_name)
                    continue
            elif isinstance(raw, (dict, list)):
                # The DB driver may already have decoded a json/jsonb column;
                # json.loads() on such a value would raise TypeError.
                parsed = raw
            else:
                logging.warning(
                    "Unsupported metadata type %s for service %s",
                    type(raw).__name__,
                    service_name,
                )
                continue
            result[service_name] = parsed
        return result

    def get_destination_country(self, application_id: int) -> str | None:
        """Get the destination country name for an application.

        Args:
            application_id: Primary key of the ``application`` row.

        Returns:
            The country name reached via the application's service unit,
            service and visa service availability, or ``None`` when no row
            is found (or the query failed).
        """
        query = """
            SELECT DISTINCT c."name"
            FROM country c
            JOIN visa_service_availability vsa ON vsa.destination_country_id = c.id
            JOIN service s ON vsa.service_id = s.id
            JOIN service_unit su ON s.id = su.service_id
            JOIN application a ON a.service_unit_id = su.id
            WHERE a.id = :application_id
            LIMIT 1;
        """
        df = self._execute_query(query, {"application_id": application_id})
        return self._first_value(df, "name")

    def get_form_data(self, application_id: int) -> pd.DataFrame:
        """Fetch ONLY standard form data (documents not tagged SMART_UPLOAD).

        Args:
            application_id: Primary key of the ``application`` row.

        Returns:
            A DataFrame with columns ``name`` (document name) and ``value``,
            restricted to non-null, non-empty values and de-duplicated on
            ``name`` (first occurrence kept). Empty when nothing matches or
            the query failed.
        """
        sql_query = """
            SELECT
                d.name AS name,
                ad.value AS value
            FROM application_document AS ad
            JOIN document AS d ON ad.document_id = d.id
            LEFT JOIN document_tag AS dt ON ad.document_id = dt.document_id
            WHERE ad.application_id = :application_id
              AND (dt.use_case IS NULL OR dt.use_case != 'SMART_UPLOAD')
              AND ad.value IS NOT NULL AND ad.value <> ''
        """
        df = self._execute_query(sql_query, {"application_id": application_id})
        if not df.empty:
            # The document name acts as the key; the LEFT JOIN on tags can
            # repeat a document, so keep one row per name.
            df = df.drop_duplicates(subset=['name'], keep='first')
        return df

    def get_smart_upload_results(self, application_id: int) -> pd.DataFrame:
        """Fetch ALL unique smart-upload extraction results as whole JSON.

        The extraction result is returned intact, NOT unpacked into
        key/value pairs. ``created_at`` is included so the newest data can
        be identified; rows are ordered newest first.

        Args:
            application_id: Primary key of the ``application`` row.

        Returns:
            A DataFrame with columns ``extraction_result``, ``doc_type``
            (``'unknown'`` when the result has no ``doc_type`` key) and
            ``created_at``. Empty when nothing matches or the query failed.
        """
        sql_query = """
            SELECT DISTINCT
                suj.extraction_result,
                COALESCE(suj.extraction_result ->> 'doc_type', 'unknown') AS doc_type,
                suj.created_at
            FROM application_document AS ad
            JOIN smart_upload_job AS suj
              ON ad.document_id = suj.document_id
             AND ad.application_id = ANY(suj.application_ids)
            WHERE ad.application_id = :application_id
              AND suj.status IN ('COMPLETED', 'PROCESSED')
              AND suj.extraction_result IS NOT NULL
              AND suj.extraction_result::text <> '{}'
            ORDER BY suj.created_at DESC;
        """
        return self._execute_query(sql_query, {"application_id": application_id})