# chathur_api/api/services/central_services.py
# VJnCode - "Feat : added central lang trans" (bbd3a01)
import asyncio
import logging
import os # <-- ADDED
import json # <-- ADDED
from typing import Union, List, Dict, Any
import api.core.firebase_utils as firebase_utils
from google.cloud.firestore_v1.base_query import FieldFilter
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# --- Path for Kannada JSON File ---
# BASE_DIR resolves to the directory two levels above this file, i.e. the
# parent of the directory that contains this module. The Kannada data is
# expected at <BASE_DIR>/data/translated_central_schemes_kn.json.
# If the 'data' folder actually lives one level higher (for example at the
# project root), one more os.path.dirname() is needed:
# BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
KAN_JSON_FILE = os.path.join(BASE_DIR, "data", "translated_central_schemes_kn.json")
# --- End Path ---
# In-memory cache keyed by language code. Each language maps a ministry name
# to its list of scheme dicts. The loader in this module fills 'en' from
# Firestore and 'kn' from KAN_JSON_FILE; nothing here populates 'hi' yet.
_central_schemes_cache: dict = {"en": {}, "hi": {}, "kn": {}}
# True while load_central_schemes_into_cache() is running; used to reject
# overlapping loads and exposed through the loading-status getters.
_is_loading: bool = False
# --- Cache Loading Function ---
def _load_english_schemes_from_firestore() -> dict:
    """
    Blocking helper: read every central ministry and its schemes from Firestore.

    Returns a dict of {ministry document id: [scheme dicts]}. Only schemes that
    carry both 'Title' and 'Description' are kept. Returns an empty dict when
    the Firebase client is not initialized or no ministry documents exist.
    Any Firestore exception propagates to the caller.
    """
    db = firebase_utils.db
    if not db:
        logger.error("CRITICAL: Firebase DB not initialized. Aborting Firestore cache load.")
        return {}

    logger.info("Attempting to load 'en'/'hi' data from Firestore...")
    ministries_ref = db.collection('schemes').document('Central').collection('ministries')
    ministries_docs = ministries_ref.get()
    if not ministries_docs:
        logger.warning("No ministry documents found at 'schemes/Central/ministries'. 'en' cache will be empty.")
        return {}

    logger.info(f"Found {len(ministries_docs)} ministry documents. Processing for 'en'...")
    english: dict = {}
    for ministry_doc in ministries_docs:
        ministry_name = ministry_doc.id
        ministry_schemes = english[ministry_name] = []
        # TODO: Add logic for 'hi' if it's in Firestore
        for scheme_doc in ministry_doc.reference.collection('schemes').get():
            # to_dict() may yield None for a snapshot without data; treat
            # that as an empty document so it is reported as incomplete
            # instead of raising TypeError on the membership test.
            scheme_data = scheme_doc.to_dict() or {}
            if 'Title' in scheme_data and 'Description' in scheme_data:
                # Storing the full data
                ministry_schemes.append(scheme_data)
                # TODO: Add logic to populate 'hi' cache if data exists
            else:
                logger.warning(f"Scheme {scheme_doc.id} in {ministry_name} is missing Title or Description.")
    return english


def _load_kannada_schemes_from_json() -> dict:
    """
    Blocking helper: read the Kannada schemes from KAN_JSON_FILE.

    The file is expected to hold a JSON object shaped like
    {"Ministry Name": [schemes...]}. Every failure (missing file, invalid
    JSON, wrong top-level type, read error) is logged and results in an
    empty dict; this helper never raises for file problems.
    """
    logger.info(f"Attempting to load 'kn' data from JSON: {KAN_JSON_FILE}")
    try:
        with open(KAN_JSON_FILE, 'r', encoding='utf-8') as f:
            kannada_data = json.load(f)
    except FileNotFoundError:
        logger.warning(f"Kannada JSON file not found at: {KAN_JSON_FILE}. 'kn' cache will be empty.")
        return {}
    except json.JSONDecodeError as json_err:
        logger.error(f"Failed to decode Kannada JSON file: {json_err}. 'kn' cache will be empty.")
        return {}
    except Exception as file_err:
        logger.error(f"Failed to read Kannada JSON file: {file_err}", exc_info=True)
        return {}

    if not isinstance(kannada_data, dict):
        logger.error(f"Kannada JSON file format is incorrect. Expected a dictionary, got {type(kannada_data)}. 'kn' cache will be empty.")
        return {}

    logger.info(f"Successfully loaded 'kn' data from JSON. Found {len(kannada_data)} ministries.")
    return kannada_data


async def load_central_schemes_into_cache():
    """
    Asynchronously loads all central government schemes into
    an in-memory cache.
    - 'en' is loaded from Firestore ('hi' is reserved but not populated yet).
    - 'kn' is loaded from a local JSON file.

    The Firestore client calls and the file read are blocking, so they run
    in the event loop's default executor instead of on the loop itself.
    If a load is already in progress the call returns immediately. On an
    unhandled error the previous cache is left untouched.
    """
    global _central_schemes_cache, _is_loading
    if _is_loading:
        logger.warning("Central cache load already in progress.")
        return
    # Set before the first await so a concurrent caller sees the flag.
    _is_loading = True
    logger.info("===== STARTING CENTRAL SCHEMES CACHE LOAD =====")
    try:
        loop = asyncio.get_running_loop()
        # Build the new cache off to the side; it only replaces the live
        # cache once both sources have been processed.
        temp_cache = {"en": {}, "hi": {}, "kn": {}}
        # --- 1. Load from Firestore (for 'en', 'hi', etc.) ---
        temp_cache["en"] = await loop.run_in_executor(None, _load_english_schemes_from_firestore)
        # --- 2. Load Kannada ('kn') from JSON file ---
        temp_cache["kn"] = await loop.run_in_executor(None, _load_kannada_schemes_from_json)
        # --- 3. Finalize Cache ---
        _central_schemes_cache = temp_cache
        logger.info("===== CENTRAL SCHEMES CACHE LOADED. =====")
        logger.info(f"Total Ministries (en): {len(_central_schemes_cache.get('en', {}))}")
        logger.info(f"Total Ministries (kn): {len(_central_schemes_cache.get('kn', {}))}")
    except Exception as e:
        # Keep the old cache if a critical loading error occurs.
        logger.error(f"CRITICAL: Unhandled exception during cache load: {e}", exc_info=True)
    finally:
        _is_loading = False
# --- Public Getter Functions ---
def get_central_schemes_cache() -> dict:
    """Hand back the module-level schemes cache (the live object, not a copy)."""
    whole_cache = _central_schemes_cache
    return whole_cache
def is_central_cache_loading() -> bool:
    """Tell whether a cache load is running at the moment of the call."""
    load_in_progress = _is_loading
    return load_in_progress
def get_central_cache_loading_status() -> bool:
    """Alias of is_central_cache_loading(): True while the cache is being loaded."""
    return is_central_cache_loading()
def get_all_ministries_and_schemes(lang: str = 'en') -> dict:
    """
    Look up the full ministry -> schemes mapping cached for `lang`.
    e.g., {"Ministry of Finance": [...], "Ministry of Health": [...]}
    An unknown language code yields an empty dict.
    """
    if lang in _central_schemes_cache:
        return _central_schemes_cache[lang]
    return {}
def get_all_central_ministries(lang: str = 'en') -> list:
    """
    List the ministry names cached for `lang` (empty for an unknown language).
    """
    ministries_for_lang = _central_schemes_cache.get(lang, {})
    return [*ministries_for_lang.keys()]
def get_schemes_for_ministry(ministry_name: str, lang: str = 'en') -> list:
    """
    Return the cached scheme list of one ministry, matching the ministry
    name without regard to letter case. Yields an empty list when the
    language has no data or the ministry is unknown.
    """
    ministries = _central_schemes_cache.get(lang)
    if not ministries:
        logger.warning(f"get_schemes_for_ministry called for lang='{lang}', but cache is empty or lang not loaded.")
        return []
    # Resolve the caller's spelling to the key as it is actually stored.
    stored_name = _find_case_insensitive_key(ministries, ministry_name)
    return ministries[stored_name] if stored_name else []
def get_scheme_by_ministry_and_title(ministry_name: str, title: str, lang: str = 'en') -> Union[dict, None]:
    """
    Finds a single scheme by ministry and title, case-insensitively.

    Returns the first scheme dict of the ministry whose 'Title' equals
    `title` ignoring case, or None when the language has no data, the
    ministry is unknown, `title` is not a string, or nothing matches.
    Entries that are not dicts, or whose 'Title' is not a string, are
    skipped instead of raising.
    """
    if not _central_schemes_cache.get(lang):
        logger.warning(f"get_scheme_by_ministry_and_title called for lang='{lang}', but cache is empty.")
        return None
    if not isinstance(title, str):
        # A missing/None title cannot match anything; avoid AttributeError.
        return None
    wanted_title = title.lower()  # loop-invariant, computed once
    for scheme in get_schemes_for_ministry(ministry_name, lang):
        if not isinstance(scheme, dict):
            continue
        # 'Title' can be present but null/non-string in file-loaded data.
        candidate_title = scheme.get('Title', '')
        if isinstance(candidate_title, str) and candidate_title.lower() == wanted_title:
            return scheme
    return None
# --- NEW SEARCH FUNCTION ---
def search_central_schemes(query: str, lang: str = 'en') -> List[Dict[str, Any]]:
    """
    Searches all central schemes for a keyword query.

    The query is trimmed and matched case-insensitively as a substring of
    the scheme's 'Title', 'Description', 'Eligibility', 'Benefits' and
    'Application Process' fields (string values, or lists whose items are
    stringified). Returns shallow copies of the matching schemes, each with
    a 'ministry' key holding the ministry name.

    A blank or non-string query returns an empty list: an empty substring
    is contained in every string and would otherwise match every scheme.
    """
    lang_cache = _central_schemes_cache.get(lang, {})
    if not lang_cache:
        logger.warning(f"Search called for lang='{lang}', but no data is loaded for this language.")
        return []
    search_query = query.strip().lower() if isinstance(query, str) else ""
    if not search_query:
        return []
    # Fields to search in; invariant, so defined once outside the loops.
    search_fields = ('Title', 'Description', 'Eligibility', 'Benefits', 'Application Process')
    matches = []
    for ministry_name, schemes in lang_cache.items():
        if not isinstance(schemes, list):
            # File-loaded data is only validated at the top level.
            continue
        for scheme in schemes:
            if not isinstance(scheme, dict):
                continue
            # Collect the searchable text of this scheme.
            haystack = []
            for field in search_fields:
                content = scheme.get(field)
                if isinstance(content, str):
                    haystack.append(content)
                elif isinstance(content, list):
                    # If field is a list (e.g., benefits), join its items
                    haystack.extend(str(item) for item in content)
            if search_query in " ".join(haystack).lower():
                # Copy so that adding 'ministry' does not mutate the cache.
                result_item = scheme.copy()
                result_item['ministry'] = ministry_name
                matches.append(result_item)
    return matches
# --- END NEW FUNCTION ---
# --- Private Helper Utility ---
def _find_case_insensitive_key(data: dict, key: str) -> Union[str, None]:
"""
Utility function to find a key in a dictionary case-insensitively.
Returns the *actual* key (with original casing) if found.
"""
if not data or not key:
return None
for k in data.keys():
if k.lower() == key.lower():
return k
return None