Spaces:
Sleeping
Sleeping
File size: 9,167 Bytes
aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 aaa9a08 bbd3a01 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 | import asyncio
import logging
import os # <-- ADDED
import json # <-- ADDED
from typing import Union, List, Dict, Any
import api.core.firebase_utils as firebase_utils
from google.cloud.firestore_v1.base_query import FieldFilter
logger = logging.getLogger(__name__)
# --- Path for Kannada JSON File ---
# This assumes your 'data' folder is structured like: .../project_root/api/data/
# Based on your path: BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# If your 'data' folder is at the project root (.../project_root/data/),
# you might need one more os.path.dirname():
# BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
KAN_JSON_FILE = os.path.join(BASE_DIR, "data", "translated_central_schemes_kn.json")
# --- End Path ---
# In-memory cache, structured by language
_central_schemes_cache: dict = {"en": {}, "hi": {}, "kn": {}} # <-- MODIFIED
_is_loading: bool = False
# --- Cache Loading Function ---
async def load_central_schemes_into_cache():
"""
Asynchronously loads all central government schemes into
an in-memory cache.
- 'en', 'hi' are loaded from Firestore.
- 'kn' is loaded from a local JSON file.
"""
global _central_schemes_cache, _is_loading
if _is_loading:
logger.warning("Central cache load already in progress.")
return
_is_loading = True
logger.info("===== STARTING CENTRAL SCHEMES CACHE LOAD =====")
# This will hold the new cache data
temp_cache = {"en": {}, "hi": {}, "kn": {}} # <-- MODIFIED
try:
# --- 1. Load from Firestore (for 'en', 'hi', etc.) ---
db = firebase_utils.db
if not db:
logger.error("CRITICAL: Firebase DB not initialized. Aborting Firestore cache load.")
else:
logger.info("Attempting to load 'en'/'hi' data from Firestore...")
ministries_ref = db.collection('schemes').document('Central').collection('ministries')
ministries_docs = ministries_ref.get()
if not ministries_docs:
logger.warning("No ministry documents found at 'schemes/Central/ministries'. 'en' cache will be empty.")
else:
logger.info(f"Found {len(ministries_docs)} ministry documents. Processing for 'en'...")
for ministry_doc in ministries_docs:
ministry_name = ministry_doc.id
temp_cache["en"][ministry_name] = []
# TODO: Add logic for 'hi' if it's in Firestore
schemes_ref = ministry_doc.reference.collection('schemes')
schemes_docs = schemes_ref.get()
for scheme_doc in schemes_docs:
scheme_data = scheme_doc.to_dict()
if 'Title' in scheme_data and 'Description' in scheme_data:
# Storing the full data
temp_cache["en"][ministry_name].append(scheme_data)
# TODO: Add logic to populate 'hi' cache if data exists
else:
logger.warning(f"Scheme {scheme_doc.id} in {ministry_name} is missing Title or Description.")
# --- 2. Load Kannada ('kn') from JSON file ---
logger.info(f"Attempting to load 'kn' data from JSON: {KAN_JSON_FILE}")
try:
if not os.path.exists(KAN_JSON_FILE):
logger.warning(f"Kannada JSON file not found at: {KAN_JSON_FILE}. 'kn' cache will be empty.")
else:
with open(KAN_JSON_FILE, 'r', encoding='utf-8') as f:
kannada_data = json.load(f)
# Assuming JSON structure is: {"Ministry Name": [schemes...]}
if isinstance(kannada_data, dict):
temp_cache["kn"] = kannada_data
logger.info(f"Successfully loaded 'kn' data from JSON. Found {len(kannada_data)} ministries.")
else:
logger.error(f"Kannada JSON file format is incorrect. Expected a dictionary, got {type(kannada_data)}. 'kn' cache will be empty.")
except json.JSONDecodeError as json_err:
logger.error(f"Failed to decode Kannada JSON file: {json_err}. 'kn' cache will be empty.")
except Exception as file_err:
logger.error(f"Failed to read Kannada JSON file: {file_err}", exc_info=True)
# --- 3. Finalize Cache ---
_central_schemes_cache = temp_cache
logger.info(f"===== CENTRAL SCHEMES CACHE LOADED. =====")
logger.info(f"Total Ministries (en): {len(_central_schemes_cache.get('en', {}))}")
logger.info(f"Total Ministries (kn): {len(_central_schemes_cache.get('kn', {}))}")
except Exception as e:
logger.error(f"CRITICAL: Unhandled exception during cache load: {e}", exc_info=True)
# Keep the old cache if a critical loading error fails
finally:
_is_loading = False
# --- Public Getter Functions ---
def get_central_schemes_cache() -> dict:
"""Returns the entire central schemes cache."""
return _central_schemes_cache
def is_central_cache_loading() -> bool:
"""Returns True if the cache is currently being loaded."""
return _is_loading
def get_central_cache_loading_status() -> bool:
"""Returns the loading status. (Alias for is_central_cache_loading)"""
return _is_loading
def get_all_ministries_and_schemes(lang: str = 'en') -> dict:
"""
Returns the entire dictionary of ministries and their schemes for a given lang.
e.g., {"Ministry of Finance": [...], "Ministry of Health": [...]}
"""
return _central_schemes_cache.get(lang, {})
def get_all_central_ministries(lang: str = 'en') -> list:
"""
Returns a list of all ministry names for a given language.
"""
return list(_central_schemes_cache.get(lang, {}).keys())
def get_schemes_for_ministry(ministry_name: str, lang: str = 'en') -> list:
"""
Retrieves all schemes for a specific ministry, case-insensitively.
"""
if not _central_schemes_cache.get(lang):
logger.warning(f"get_schemes_for_ministry called for lang='{lang}', but cache is empty or lang not loaded.")
return []
ministry_key = _find_case_insensitive_key(_central_schemes_cache.get(lang, {}), ministry_name)
if ministry_key:
return _central_schemes_cache[lang][ministry_key]
return []
def get_scheme_by_ministry_and_title(ministry_name: str, title: str, lang: str = 'en') -> Union[dict, None]:
"""
Finds a single scheme by ministry and title, case-insensitively.
"""
if not _central_schemes_cache.get(lang):
logger.warning(f"get_scheme_by_ministry_and_title called for lang='{lang}', but cache is empty.")
return None
schemes_list = get_schemes_for_ministry(ministry_name, lang)
for scheme in schemes_list:
if scheme.get('Title', '').lower() == title.lower():
return scheme
return None
# --- NEW SEARCH FUNCTION ---
def search_central_schemes(query: str, lang: str = 'en') -> List[Dict[str, Any]]:
"""
Searches all central schemes for a keyword query.
Returns a list of matched schemes, with their ministry added.
"""
lang_cache = _central_schemes_cache.get(lang, {})
if not lang_cache:
logger.warning(f"Search called for lang='{lang}', but no data is loaded for this language.")
return []
search_query = query.lower()
matches = []
for ministry_name, schemes in lang_cache.items():
for scheme in schemes:
# Create a searchable string from relevant fields
haystack = []
# Define fields to search in
search_fields = ['Title', 'Description', 'Eligibility', 'Benefits', 'Application Process']
for field in search_fields:
content = scheme.get(field)
if isinstance(content, str):
haystack.append(content)
elif isinstance(content, list):
# If field is a list (e.g., benefits), join its items
haystack.extend(str(item) for item in content)
haystack_str = " ".join(haystack).lower()
# Check for match
if search_query in haystack_str:
# Add the ministry name to the result
result_item = scheme.copy()
result_item['ministry'] = ministry_name
matches.append(result_item)
return matches
# --- END NEW FUNCTION ---
# --- Private Helper Utility ---
def _find_case_insensitive_key(data: dict, key: str) -> Union[str, None]:
"""
Utility function to find a key in a dictionary case-insensitively.
Returns the *actual* key (with original casing) if found.
"""
if not data or not key:
return None
for k in data.keys():
if k.lower() == key.lower():
return k
return None |