"""In-memory cache of central government schemes, keyed by language code.

The cache maps a language code to ``{ministry_name: [scheme_dict, ...]}``:

- ``'en'`` is loaded from Firestore (``schemes/Central/ministries/*/schemes``).
- ``'kn'`` is loaded from a local JSON file (see ``KAN_JSON_FILE``).
- ``'hi'`` is reserved but is not populated by this module yet.
"""
import asyncio
import logging
import os
import json
from typing import Union, List, Dict, Any

import api.core.firebase_utils as firebase_utils
from google.cloud.firestore_v1.base_query import FieldFilter

logger = logging.getLogger(__name__)

# --- Path for Kannada JSON File ---
# BASE_DIR is two directory levels above this file, so the JSON file is
# expected at <BASE_DIR>/data/translated_central_schemes_kn.json.
# If the 'data' folder lives one level higher (at the project root), wrap the
# expression below in one more os.path.dirname() call.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
KAN_JSON_FILE = os.path.join(BASE_DIR, "data", "translated_central_schemes_kn.json")
# --- End Path ---

# In-memory cache, structured by language.
_central_schemes_cache: dict = {"en": {}, "hi": {}, "kn": {}}
_is_loading: bool = False

# Scheme fields that search_central_schemes() scans for the query text.
_SEARCH_FIELDS = ('Title', 'Description', 'Eligibility', 'Benefits', 'Application Process')


# --- Cache Loading ---

def _load_english_from_firestore(db) -> Dict[str, List[Dict[str, Any]]]:
    """Read every ministry and its schemes from Firestore (blocking).

    Args:
        db: The Firestore client exposed as ``firebase_utils.db``.

    Returns:
        ``{ministry_id: [scheme_dict, ...]}``. Schemes lacking a 'Title' or a
        'Description' field are skipped with a warning.
    """
    logger.info("Attempting to load 'en'/'hi' data from Firestore...")
    ministries_ref = db.collection('schemes').document('Central').collection('ministries')
    ministries_docs = ministries_ref.get()

    english: Dict[str, List[Dict[str, Any]]] = {}
    if not ministries_docs:
        logger.warning("No ministry documents found at 'schemes/Central/ministries'. 'en' cache will be empty.")
        return english

    logger.info(f"Found {len(ministries_docs)} ministry documents. Processing for 'en'...")
    for ministry_doc in ministries_docs:
        ministry_name = ministry_doc.id
        schemes: List[Dict[str, Any]] = []
        # TODO: Add logic to populate the 'hi' cache if that data exists in Firestore.
        for scheme_doc in ministry_doc.reference.collection('schemes').get():
            # to_dict() may return None; treat that like a scheme with no fields.
            scheme_data = scheme_doc.to_dict() or {}
            if 'Title' in scheme_data and 'Description' in scheme_data:
                schemes.append(scheme_data)  # store the full scheme document
            else:
                logger.warning(f"Scheme {scheme_doc.id} in {ministry_name} is missing Title or Description.")
        english[ministry_name] = schemes
    return english


def _load_kannada_from_json() -> dict:
    """Read the Kannada schemes from ``KAN_JSON_FILE`` (blocking).

    Returns:
        The parsed top-level JSON object (assumed to be
        ``{"Ministry Name": [schemes...]}``), or an empty dict when the file is
        missing, unreadable, not valid JSON, or not a JSON object. Failures
        are logged, never raised.
    """
    logger.info(f"Attempting to load 'kn' data from JSON: {KAN_JSON_FILE}")
    try:
        if not os.path.exists(KAN_JSON_FILE):
            logger.warning(f"Kannada JSON file not found at: {KAN_JSON_FILE}. 'kn' cache will be empty.")
            return {}
        with open(KAN_JSON_FILE, 'r', encoding='utf-8') as f:
            kannada_data = json.load(f)
    except json.JSONDecodeError as json_err:
        logger.error(f"Failed to decode Kannada JSON file: {json_err}. 'kn' cache will be empty.")
        return {}
    except Exception as file_err:
        logger.error(f"Failed to read Kannada JSON file: {file_err}", exc_info=True)
        return {}

    if not isinstance(kannada_data, dict):
        logger.error(
            f"Kannada JSON file format is incorrect. Expected a dictionary, got {type(kannada_data)}. "
            "'kn' cache will be empty."
        )
        return {}

    logger.info(f"Successfully loaded 'kn' data from JSON. Found {len(kannada_data)} ministries.")
    return kannada_data


async def load_central_schemes_into_cache():
    """Asynchronously (re)load all central government schemes into the cache.

    - 'en' is loaded from Firestore; if ``firebase_utils.db`` is not set, the
      Firestore step is skipped and 'en' ends up empty.
    - 'kn' is loaded from the local JSON file.

    The blocking Firestore and file reads run in the event loop's default
    executor so the loop stays responsive while loading. If a load is already
    in progress the call logs a warning and returns immediately. On an
    unhandled error the previous cache contents are kept.
    """
    global _central_schemes_cache, _is_loading

    if _is_loading:
        logger.warning("Central cache load already in progress.")
        return

    # Set before the first await so a concurrent caller sees the load in progress.
    _is_loading = True
    logger.info("===== STARTING CENTRAL SCHEMES CACHE LOAD =====")

    # Build the new data on the side; the live cache is swapped in one assignment.
    temp_cache = {"en": {}, "hi": {}, "kn": {}}

    try:
        loop = asyncio.get_running_loop()

        # --- 1. Load from Firestore (for 'en') ---
        db = firebase_utils.db
        if not db:
            logger.error("CRITICAL: Firebase DB not initialized. Aborting Firestore cache load.")
        else:
            temp_cache["en"] = await loop.run_in_executor(None, _load_english_from_firestore, db)

        # --- 2. Load Kannada ('kn') from JSON file ---
        temp_cache["kn"] = await loop.run_in_executor(None, _load_kannada_from_json)

        # --- 3. Finalize Cache ---
        _central_schemes_cache = temp_cache
        logger.info("===== CENTRAL SCHEMES CACHE LOADED. =====")
        logger.info(f"Total Ministries (en): {len(_central_schemes_cache.get('en', {}))}")
        logger.info(f"Total Ministries (kn): {len(_central_schemes_cache.get('kn', {}))}")
    except Exception as e:
        # Keep the old cache if loading fails part-way through.
        logger.error(f"CRITICAL: Unhandled exception during cache load: {e}", exc_info=True)
    finally:
        _is_loading = False


# --- Public Getter Functions ---

def get_central_schemes_cache() -> dict:
    """Return the entire central schemes cache (all languages)."""
    return _central_schemes_cache


def is_central_cache_loading() -> bool:
    """Return True while the cache is being loaded."""
    return _is_loading


def get_central_cache_loading_status() -> bool:
    """Return the loading status (alias for ``is_central_cache_loading``)."""
    return is_central_cache_loading()


def get_all_ministries_and_schemes(lang: str = 'en') -> dict:
    """Return the ministry -> schemes mapping for ``lang``.

    e.g., {"Ministry of Finance": [...], "Ministry of Health": [...]}.
    An unknown language yields an empty dict.
    """
    return _central_schemes_cache.get(lang, {})


def get_all_central_ministries(lang: str = 'en') -> list:
    """Return a list of all ministry names cached for ``lang``."""
    return list(_central_schemes_cache.get(lang, {}))


def get_schemes_for_ministry(ministry_name: str, lang: str = 'en') -> list:
    """Return all schemes of one ministry, matching its name case-insensitively.

    Returns an empty list when the language has no data, the ministry is not
    found, or the cached value for the ministry is not a list.
    """
    lang_cache = _central_schemes_cache.get(lang)
    if not lang_cache:
        logger.warning(f"get_schemes_for_ministry called for lang='{lang}', but cache is empty or lang not loaded.")
        return []

    ministry_key = _find_case_insensitive_key(lang_cache, ministry_name)
    if not ministry_key:
        return []

    schemes = lang_cache[ministry_key]
    # The 'kn' data comes from an unvalidated JSON file; honour the list contract.
    return schemes if isinstance(schemes, list) else []


def get_scheme_by_ministry_and_title(ministry_name: str, title: str, lang: str = 'en') -> Union[dict, None]:
    """Find a single scheme by ministry and title, both case-insensitively.

    Returns the first scheme whose 'Title' equals ``title`` ignoring case, or
    None when nothing matches, ``title`` is not a string, or the language has
    no data.
    """
    if not _central_schemes_cache.get(lang):
        logger.warning(f"get_scheme_by_ministry_and_title called for lang='{lang}', but cache is empty.")
        return None
    if not isinstance(title, str):
        return None

    wanted = title.lower()
    for scheme in get_schemes_for_ministry(ministry_name, lang):
        if not isinstance(scheme, dict):
            continue
        candidate = scheme.get('Title', '')
        if isinstance(candidate, str) and candidate.lower() == wanted:
            return scheme
    return None


# --- Search ---

def _build_search_haystack(scheme: Dict[str, Any]) -> str:
    """Join the searchable fields of ``scheme`` into one lower-cased string."""
    parts: List[str] = []
    for field in _SEARCH_FIELDS:
        content = scheme.get(field)
        if isinstance(content, str):
            parts.append(content)
        elif isinstance(content, list):
            # A list-valued field (e.g. benefits) contributes each of its items.
            parts.extend(str(item) for item in content)
    return " ".join(parts).lower()


def search_central_schemes(query: str, lang: str = 'en') -> List[Dict[str, Any]]:
    """Search all central schemes of ``lang`` for a keyword query.

    The query is matched as a case-insensitive substring against the fields
    listed in ``_SEARCH_FIELDS``. An empty query matches every scheme.

    Returns:
        Shallow copies of the matching schemes, each with an added
        'ministry' key holding the ministry name. Returns an empty list when
        the language has no data or ``query`` is not a string.
    """
    lang_cache = _central_schemes_cache.get(lang, {})
    if not lang_cache:
        logger.warning(f"Search called for lang='{lang}', but no data is loaded for this language.")
        return []
    if not isinstance(query, str):
        return []

    search_query = query.lower()
    matches: List[Dict[str, Any]] = []

    for ministry_name, schemes in lang_cache.items():
        if not isinstance(schemes, list):
            continue  # malformed ministry entry; nothing to search
        for scheme in schemes:
            if not isinstance(scheme, dict):
                continue  # malformed scheme entry
            if search_query in _build_search_haystack(scheme):
                # Copy so the cached scheme is not modified by the added key.
                result_item = scheme.copy()
                result_item['ministry'] = ministry_name
                matches.append(result_item)

    return matches


# --- Private Helper Utility ---

def _find_case_insensitive_key(data: dict, key: str) -> Union[str, None]:
    """Find ``key`` in ``data`` ignoring case.

    Returns the *actual* key (with its original casing) if found, otherwise
    None. Also returns None for empty ``data`` or an empty / non-string ``key``.
    """
    if not data or not key or not isinstance(key, str):
        return None

    wanted = key.lower()
    for candidate in data:
        if isinstance(candidate, str) and candidate.lower() == wanted:
            return candidate
    return None