# api-web-crawler/app/util/db_utils.py
# Revision fa66fbc - feat: add endpoint get all visa photo metadata
import os
from sqlalchemy import create_engine, text
import logging
from dotenv import load_dotenv
from sqlalchemy.engine import URL
from typing import List, Dict, Any
import pandas as pd
import json
import logging
class DBManager:
def __init__(self):
load_dotenv()
USERNAME = os.getenv("DB_USERNAME")
PASSWORD = os.getenv("DB_PASSWORD")
HOST = os.getenv("DB_HOST")
NAME = os.getenv("DB_NAME")
PORT = os.getenv("DB_PORT")
if not all([USERNAME, PASSWORD, HOST, NAME]):
raise ValueError("One or more database environment variables are not set.")
connection_url = URL.create(
drivername="postgresql+psycopg2",
username=USERNAME,
password=PASSWORD,
host=HOST,
port=PORT,
database=NAME,
)
self.engine = create_engine(connection_url, pool_pre_ping=True)
try:
safe_url = connection_url.render_as_string(hide_password=True)
except Exception:
safe_url = str(connection_url).replace(PASSWORD, "***")
logging.info(f"Database engine created with URL: {safe_url}")
def _execute_query(self, query: str, params: dict = None) -> pd.DataFrame:
"""Executes a SQL query and returns the result as a pandas DataFrame."""
try:
with self.engine.connect() as connection:
df = pd.read_sql_query(sql=text(query), con=connection, params=params)
return df
except Exception as e:
logging.error(f"Failed to execute query: {e}", exc_info=True)
return pd.DataFrame()
def get_profile_name_by_app_id(self, application_id: int) -> str | None:
"""
Gets the applicant's full name from their profile.
This is the most reliable source for the name.
"""
query = """
SELECT
p.first_name || ' ' || p.last_name as name
FROM
application a
JOIN profile p ON p.id = a.profile_id
WHERE a.id = :application_id
LIMIT 1
"""
params = {"application_id": application_id}
df = self._execute_query(query, params)
if not df.empty:
return df.iloc[0]['name']
return None
# --- NEW METHOD ADDED ---
def get_visa_photo_metadata_per_service(self) -> Dict[int, Dict[str, Any]]:
"""
Retrieves visa photo specifications for each service.
Returns a dictionary mapping service_id to its photo specifications.
"""
query = """
select s."name", metadata from service_document sd
join service s on s.id = sd.service_id
join document d on d.id = sd.document_id
where d.metadata is not null and s.is_active = true and s.has_limited_discoverability = False;
"""
df = self._execute_query(query)
result = {}
for _, row in df.iterrows():
service_name = row['name']
metadata = row['metadata']
try:
metadata_dict = json.loads(metadata)
result[service_name] = metadata_dict
except json.JSONDecodeError:
logging.warning(f"Invalid JSON metadata for service {service_name}")
return result
def get_destination_country(self, application_id: int) -> str | None:
"""
Gets the official destination country name for the application.
"""
query = """
SELECT DISTINCT
c."name"
FROM country c
JOIN visa_service_availability vsa ON vsa.destination_country_id = c.id
JOIN service s ON vsa.service_id = s.id
JOIN service_unit su ON s.id = su.service_id
JOIN application a ON a.service_unit_id = su.id
WHERE a.id = :application_id
LIMIT 1;
"""
params = {"application_id": application_id}
df = self._execute_query(query, params)
if not df.empty:
return df.iloc[0]['name']
return None
# --- END OF NEW METHOD ---
def get_form_data(self, application_id: int) -> pd.DataFrame:
"""
HANYA mengambil data dari form standar (non-SMART_UPLOAD).
"""
sql_query = """
SELECT
d.name AS name,
ad.value AS value
FROM
application_document AS ad
JOIN
document AS d ON ad.document_id = d.id
LEFT JOIN
document_tag AS dt ON ad.document_id = dt.document_id
WHERE
ad.application_id = :application_id
AND (dt.use_case IS NULL OR dt.use_case != 'SMART_UPLOAD')
AND ad.value IS NOT NULL
AND ad.value <> ''
"""
params = {"application_id": application_id}
df = self._execute_query(sql_query, params)
if not df.empty:
# Use document name as the key
df = df.drop_duplicates(subset=['name'], keep='first')
return df
def get_smart_upload_results(self, application_id: int) -> pd.DataFrame:
"""
Mengambil SEMUA hasil ekstraksi JSON yang unik dari smart upload
sebagai JSON utuh, BUKAN key-value yang dibongkar.
Menambahkan 'created_at' untuk sorting data terbaru.
"""
sql_query = """
SELECT DISTINCT
suj.extraction_result,
COALESCE(suj.extraction_result ->> 'doc_type', 'unknown') AS doc_type,
suj.created_at
FROM
application_document AS ad
JOIN
smart_upload_job AS suj
ON ad.document_id = suj.document_id
AND ad.application_id = ANY(suj.application_ids)
WHERE
ad.application_id = :application_id
AND suj.status IN ('COMPLETED', 'PROCESSED')
AND suj.extraction_result IS NOT NULL
AND suj.extraction_result::text <> '{}'
ORDER BY suj.created_at DESC;
"""
params = {"application_id": application_id}
df = self._execute_query(sql_query, params)
return df