Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| import os # <-- ADDED | |
| import json # <-- ADDED | |
| from typing import Union, List, Dict, Any | |
| import api.core.firebase_utils as firebase_utils | |
| from google.cloud.firestore_v1.base_query import FieldFilter | |
| logger = logging.getLogger(__name__) | |
| # --- Path for Kannada JSON File --- | |
| # This assumes your 'data' folder is structured like: .../project_root/api/data/ | |
| # Based on your path: BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| # If your 'data' folder is at the project root (.../project_root/data/), | |
| # you might need one more os.path.dirname(): | |
| # BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| KAN_JSON_FILE = os.path.join(BASE_DIR, "data", "translated_central_schemes_kn.json") | |
| # --- End Path --- | |
| # In-memory cache, structured by language | |
| _central_schemes_cache: dict = {"en": {}, "hi": {}, "kn": {}} # <-- MODIFIED | |
| _is_loading: bool = False | |
| # --- Cache Loading Function --- | |
| async def load_central_schemes_into_cache(): | |
| """ | |
| Asynchronously loads all central government schemes into | |
| an in-memory cache. | |
| - 'en', 'hi' are loaded from Firestore. | |
| - 'kn' is loaded from a local JSON file. | |
| """ | |
| global _central_schemes_cache, _is_loading | |
| if _is_loading: | |
| logger.warning("Central cache load already in progress.") | |
| return | |
| _is_loading = True | |
| logger.info("===== STARTING CENTRAL SCHEMES CACHE LOAD =====") | |
| # This will hold the new cache data | |
| temp_cache = {"en": {}, "hi": {}, "kn": {}} # <-- MODIFIED | |
| try: | |
| # --- 1. Load from Firestore (for 'en', 'hi', etc.) --- | |
| db = firebase_utils.db | |
| if not db: | |
| logger.error("CRITICAL: Firebase DB not initialized. Aborting Firestore cache load.") | |
| else: | |
| logger.info("Attempting to load 'en'/'hi' data from Firestore...") | |
| ministries_ref = db.collection('schemes').document('Central').collection('ministries') | |
| ministries_docs = ministries_ref.get() | |
| if not ministries_docs: | |
| logger.warning("No ministry documents found at 'schemes/Central/ministries'. 'en' cache will be empty.") | |
| else: | |
| logger.info(f"Found {len(ministries_docs)} ministry documents. Processing for 'en'...") | |
| for ministry_doc in ministries_docs: | |
| ministry_name = ministry_doc.id | |
| temp_cache["en"][ministry_name] = [] | |
| # TODO: Add logic for 'hi' if it's in Firestore | |
| schemes_ref = ministry_doc.reference.collection('schemes') | |
| schemes_docs = schemes_ref.get() | |
| for scheme_doc in schemes_docs: | |
| scheme_data = scheme_doc.to_dict() | |
| if 'Title' in scheme_data and 'Description' in scheme_data: | |
| # Storing the full data | |
| temp_cache["en"][ministry_name].append(scheme_data) | |
| # TODO: Add logic to populate 'hi' cache if data exists | |
| else: | |
| logger.warning(f"Scheme {scheme_doc.id} in {ministry_name} is missing Title or Description.") | |
| # --- 2. Load Kannada ('kn') from JSON file --- | |
| logger.info(f"Attempting to load 'kn' data from JSON: {KAN_JSON_FILE}") | |
| try: | |
| if not os.path.exists(KAN_JSON_FILE): | |
| logger.warning(f"Kannada JSON file not found at: {KAN_JSON_FILE}. 'kn' cache will be empty.") | |
| else: | |
| with open(KAN_JSON_FILE, 'r', encoding='utf-8') as f: | |
| kannada_data = json.load(f) | |
| # Assuming JSON structure is: {"Ministry Name": [schemes...]} | |
| if isinstance(kannada_data, dict): | |
| temp_cache["kn"] = kannada_data | |
| logger.info(f"Successfully loaded 'kn' data from JSON. Found {len(kannada_data)} ministries.") | |
| else: | |
| logger.error(f"Kannada JSON file format is incorrect. Expected a dictionary, got {type(kannada_data)}. 'kn' cache will be empty.") | |
| except json.JSONDecodeError as json_err: | |
| logger.error(f"Failed to decode Kannada JSON file: {json_err}. 'kn' cache will be empty.") | |
| except Exception as file_err: | |
| logger.error(f"Failed to read Kannada JSON file: {file_err}", exc_info=True) | |
| # --- 3. Finalize Cache --- | |
| _central_schemes_cache = temp_cache | |
| logger.info(f"===== CENTRAL SCHEMES CACHE LOADED. =====") | |
| logger.info(f"Total Ministries (en): {len(_central_schemes_cache.get('en', {}))}") | |
| logger.info(f"Total Ministries (kn): {len(_central_schemes_cache.get('kn', {}))}") | |
| except Exception as e: | |
| logger.error(f"CRITICAL: Unhandled exception during cache load: {e}", exc_info=True) | |
| # Keep the old cache if a critical loading error fails | |
| finally: | |
| _is_loading = False | |
| # --- Public Getter Functions --- | |
| def get_central_schemes_cache() -> dict: | |
| """Returns the entire central schemes cache.""" | |
| return _central_schemes_cache | |
| def is_central_cache_loading() -> bool: | |
| """Returns True if the cache is currently being loaded.""" | |
| return _is_loading | |
| def get_central_cache_loading_status() -> bool: | |
| """Returns the loading status. (Alias for is_central_cache_loading)""" | |
| return _is_loading | |
| def get_all_ministries_and_schemes(lang: str = 'en') -> dict: | |
| """ | |
| Returns the entire dictionary of ministries and their schemes for a given lang. | |
| e.g., {"Ministry of Finance": [...], "Ministry of Health": [...]} | |
| """ | |
| return _central_schemes_cache.get(lang, {}) | |
| def get_all_central_ministries(lang: str = 'en') -> list: | |
| """ | |
| Returns a list of all ministry names for a given language. | |
| """ | |
| return list(_central_schemes_cache.get(lang, {}).keys()) | |
| def get_schemes_for_ministry(ministry_name: str, lang: str = 'en') -> list: | |
| """ | |
| Retrieves all schemes for a specific ministry, case-insensitively. | |
| """ | |
| if not _central_schemes_cache.get(lang): | |
| logger.warning(f"get_schemes_for_ministry called for lang='{lang}', but cache is empty or lang not loaded.") | |
| return [] | |
| ministry_key = _find_case_insensitive_key(_central_schemes_cache.get(lang, {}), ministry_name) | |
| if ministry_key: | |
| return _central_schemes_cache[lang][ministry_key] | |
| return [] | |
| def get_scheme_by_ministry_and_title(ministry_name: str, title: str, lang: str = 'en') -> Union[dict, None]: | |
| """ | |
| Finds a single scheme by ministry and title, case-insensitively. | |
| """ | |
| if not _central_schemes_cache.get(lang): | |
| logger.warning(f"get_scheme_by_ministry_and_title called for lang='{lang}', but cache is empty.") | |
| return None | |
| schemes_list = get_schemes_for_ministry(ministry_name, lang) | |
| for scheme in schemes_list: | |
| if scheme.get('Title', '').lower() == title.lower(): | |
| return scheme | |
| return None | |
| # --- NEW SEARCH FUNCTION --- | |
| def search_central_schemes(query: str, lang: str = 'en') -> List[Dict[str, Any]]: | |
| """ | |
| Searches all central schemes for a keyword query. | |
| Returns a list of matched schemes, with their ministry added. | |
| """ | |
| lang_cache = _central_schemes_cache.get(lang, {}) | |
| if not lang_cache: | |
| logger.warning(f"Search called for lang='{lang}', but no data is loaded for this language.") | |
| return [] | |
| search_query = query.lower() | |
| matches = [] | |
| for ministry_name, schemes in lang_cache.items(): | |
| for scheme in schemes: | |
| # Create a searchable string from relevant fields | |
| haystack = [] | |
| # Define fields to search in | |
| search_fields = ['Title', 'Description', 'Eligibility', 'Benefits', 'Application Process'] | |
| for field in search_fields: | |
| content = scheme.get(field) | |
| if isinstance(content, str): | |
| haystack.append(content) | |
| elif isinstance(content, list): | |
| # If field is a list (e.g., benefits), join its items | |
| haystack.extend(str(item) for item in content) | |
| haystack_str = " ".join(haystack).lower() | |
| # Check for match | |
| if search_query in haystack_str: | |
| # Add the ministry name to the result | |
| result_item = scheme.copy() | |
| result_item['ministry'] = ministry_name | |
| matches.append(result_item) | |
| return matches | |
| # --- END NEW FUNCTION --- | |
| # --- Private Helper Utility --- | |
| def _find_case_insensitive_key(data: dict, key: str) -> Union[str, None]: | |
| """ | |
| Utility function to find a key in a dictionary case-insensitively. | |
| Returns the *actual* key (with original casing) if found. | |
| """ | |
| if not data or not key: | |
| return None | |
| for k in data.keys(): | |
| if k.lower() == key.lower(): | |
| return k | |
| return None |