Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| import os | |
| import json | |
| from difflib import SequenceMatcher | |
| from api.core.firebase_utils import get_firestore_db | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# --- Global Cache for Schemes ---
# is_cache_loading: flag toggled by load_all_schemes_into_cache() while it runs.
# cached_all_schemes: lowercase state name -> list of scheme dicts.
is_cache_loading = False
cached_all_schemes = dict()

# Location of the Kannada JSON (project_root/data/translated_schemes_kn.json).
# BASE_DIR is the parent of the directory that contains this file.
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
KAN_JSON_FILE = os.path.join(BASE_DIR, "data", "translated_schemes_kn.json")
def _collect_firestore_schemes(db, cache):
    """
    Fill *cache* (lowercase state name -> list of scheme dicts) from Firestore.

    Reads every document of the top-level "schemes" collection and, for each,
    the documents of its nested "schemes" sub-collection. The dict is filled in
    place so that states already read survive if a later read raises.
    """
    for state_doc in db.collection("schemes").stream():
        state_name = state_doc.id.strip().lower()  # store lowercase for consistency
        scheme_docs = (
            db.collection("schemes")
            .document(state_doc.id)
            .collection("schemes")
            .stream()
        )
        schemes_in_state = []
        for scheme_doc in scheme_docs:
            data = scheme_doc.to_dict() or {}
            data["id"] = scheme_doc.id
            schemes_in_state.append(data)
        cache[state_name] = schemes_in_state


def _merge_kannada_schemes(cache, json_path):
    """
    Merge the Kannada schemes stored in *json_path* into *cache* in place.

    The JSON is expected to map a state name to a list of scheme dicts; every
    scheme is tagged with language "kn". A missing file only logs a warning.
    Any other failure propagates to the caller.
    """
    try:
        with open(json_path, "r", encoding="utf-8") as f:
            kn_data = json.load(f)
    except FileNotFoundError:
        logger.warning(f"Kannada JSON file not found at {json_path}")
        return

    for state, schemes in kn_data.items():
        state_key = state.strip().lower()
        for scheme in schemes:
            scheme["language"] = "kn"  # ensure Kannada tag
        cache.setdefault(state_key, []).extend(schemes)
    logger.info("Kannada schemes loaded successfully from JSON.")


async def load_all_schemes_into_cache():
    """
    Fetches all schemes from Firestore and populates the in-memory cache.
    This function should be called at application startup and/or periodically.

    The Kannada schemes from KAN_JSON_FILE are merged on top of the Firestore
    data before the new cache replaces ``cached_all_schemes``.

    ``is_cache_loading`` stays True for the whole load and is always reset on
    exit, including when an exception escapes, so a failed load can never
    block later loads.

    If the Firestore read fails while a non-empty cache already exists, the
    existing cache is kept instead of being replaced by partial data. On a
    first load, whatever could be read is still published.
    """
    global cached_all_schemes, is_cache_loading

    if is_cache_loading:
        logger.info("Cache is already loading, skipping concurrent load request.")
        return

    is_cache_loading = True
    try:
        logger.info("Starting to load all schemes into cache from Firestore...")

        db = get_firestore_db()
        if not db:
            logger.error("Firestore DB client is not available. Cannot load schemes into cache.")
            return

        temp_schemes_cache = {}

        # NOTE(review): the Firestore streams are iterated synchronously, so
        # this coroutine presumably blocks the event loop while it reads -
        # confirm whether the read should be moved to a worker thread.
        firestore_ok = True
        try:
            _collect_firestore_schemes(db, temp_schemes_cache)
        except Exception as e:
            firestore_ok = False
            logger.exception("Error loading schemes into cache: %s", e)

        if not firestore_ok and cached_all_schemes:
            # A failed refresh must not wipe data that is already being served.
            logger.warning(
                "Keeping the previously cached schemes because the Firestore refresh failed."
            )
            return

        # --- Load Kannada Schemes from JSON ---
        try:
            _merge_kannada_schemes(temp_schemes_cache, KAN_JSON_FILE)
        except Exception as e:
            logger.exception("Error loading Kannada JSON schemes: %s", e)

        # --- Finalize cache ---
        cached_all_schemes = temp_schemes_cache
        logger.info(f"Cache ready. Total states: {len(cached_all_schemes)}")
    finally:
        is_cache_loading = False
| # In scheme_service.py | |
def get_all_schemes(lang=None):
    """
    Returns all schemes from the in-memory cache.

    If lang is falsy, the cache dict itself is returned (state -> schemes).
    Otherwise a new dict is returned that keeps, per state, only the schemes
    whose language equals lang (case-insensitive); states left with no
    matching scheme are omitted.

    Schemes without a language tag (key missing, None, or empty) are
    considered 'en' by default.
    """
    if not lang:
        # No filtering requested: hand back the whole cache.
        return cached_all_schemes

    wanted = lang.lower()  # normalise once instead of once per scheme
    logger.info(f"Filtering schemes for lang={lang}")

    filtered_cache = {}
    for state, schemes in cached_all_schemes.items():
        # str(... or "en") keeps a None / non-string tag from raising.
        filtered = [
            s for s in schemes
            if str(s.get("language") or "en").lower() == wanted
        ]
        if filtered:
            filtered_cache[state] = filtered
    return filtered_cache
def _iter_text_parts(value):
    """Yield the string pieces of a scheme field (str, nested list, or other)."""
    if value is None:
        return
    if isinstance(value, str):
        yield value
    elif isinstance(value, (list, tuple, set)):
        for item in value:
            yield from _iter_text_parts(item)
    else:
        yield str(value)


def _is_fuzzy_match(term, text, threshold=0.7):
    """Return True when SequenceMatcher(term, text).ratio() exceeds threshold."""
    matcher = SequenceMatcher(None, term, text)
    # real_quick_ratio() and quick_ratio() are cheap upper bounds on ratio(),
    # so checking them first rejects most candidates without changing results.
    return (
        matcher.real_quick_ratio() > threshold
        and matcher.quick_ratio() > threshold
        and matcher.ratio() > threshold
    )


def search_schemes_in_cache(query: str, lang: str = None):
    """
    Searches schemes across all states within the in-memory cache with basic stemming.

    A scheme matches when the query (or its simple singular form: trailing
    "ies" -> "y", trailing "s" removed) is a substring of the scheme's
    searchable text, or when the two are similar enough for the fuzzy check.
    The searchable text is built from Title, Description, Tags, Eligibility,
    Benefits, Details and Documents Required.

    If lang is provided, schemes tagged with a different language are skipped;
    schemes that don't have a language field are always included.

    Returns a list of shallow copies of the matching schemes, each with an
    added "state" key. A blank query returns an empty list.
    """
    search_query = query.strip().lower()
    if not search_query:
        # An empty term is a substring of every text and would match everything.
        return []

    # Create variations of the query for simple stemming
    search_terms = [search_query]
    if search_query.endswith("ies"):
        stem = search_query[:-3] + "y"
    elif search_query.endswith("s"):
        stem = search_query[:-1]
    else:
        stem = ""
    if stem:  # the query "s" would otherwise add a match-everything "" term
        search_terms.append(stem)

    wanted_lang = lang.lower() if lang else None
    scalar_fields = ("Title", "Description", "Tags")
    list_fields_to_search = ("Eligibility", "Benefits", "Details", "Documents Required")

    logger.info(f"Starting smart search for terms: {search_terms}...")

    matched = []
    for state_name, schemes in cached_all_schemes.items():
        for scheme in schemes:
            # Language filter: skip only schemes explicitly tagged otherwise.
            language = scheme.get("language")
            if wanted_lang and language and str(language).lower() != wanted_lang:
                continue

            # Combine all searchable fields; non-string values (e.g. a list of
            # tags) are flattened instead of breaking the join.
            searchable_parts = []
            for field in scalar_fields:
                searchable_parts.extend(_iter_text_parts(scheme.get(field, "")))
            for field in list_fields_to_search:
                searchable_parts.extend(_iter_text_parts(scheme.get(field)))
            searchable_text = " ".join(searchable_parts).lower()

            # Substring hit first (cheap), then fuzzy match for typos.
            if any(term in searchable_text for term in search_terms) or any(
                _is_fuzzy_match(term, searchable_text) for term in search_terms
            ):
                result = scheme.copy()
                result["state"] = state_name
                matched.append(result)

    logger.info(f"Search for '{query}' completed. Found {len(matched)} matches.")
    return matched
| # In scheme_service.py | |
def get_schemes_by_state(state: str, lang: str = None):
    """
    Returns schemes for a specific state from the in-memory cache.

    The state name is matched after trimming and lowercasing. Returns None
    when the state is not cached or has no schemes. If lang is provided, only
    schemes whose language equals lang (case-insensitive) are returned; this
    list may be empty.

    Schemes without a language tag (key missing, None, or empty) are
    considered 'en'.
    """
    state_key = state.strip().lower()
    schemes = cached_all_schemes.get(state_key)
    if not schemes:
        return None

    if not lang:
        return schemes

    wanted = lang.lower()
    # Untagged schemes default to "en"; str(...) keeps a None / non-string tag
    # from raising.
    return [
        s for s in schemes
        if str(s.get("language") or "en").lower() == wanted
    ]
def get_scheme_details_by_title(state: str, title: str, lang: str = None):
    """
    Returns details for a single scheme by title or id within a specific state.

    State, title and id are compared after trimming and lowercasing. If lang
    is provided, the scheme's language must also match (case-insensitive);
    schemes without a language tag are considered 'en'. The search continues
    past a title match in the wrong language, so another language variant
    with the same title or id can still be found.

    Returns the cached scheme dict, or None when the state is not cached, the
    title is blank, or nothing matches.
    """
    schemes_for_state = cached_all_schemes.get(state.strip().lower())
    if not schemes_for_state:
        return None

    url_title_clean = title.strip().lower()
    if not url_title_clean:
        # A blank title would otherwise "match" any scheme lacking id/Title.
        return None

    wanted_lang = lang.lower() if lang else None

    def _clean(value):
        # ids/titles may be missing or non-string (e.g. numeric ids).
        return "" if value is None else str(value).strip().lower()

    for scheme in schemes_for_state:
        if url_title_clean not in (_clean(scheme.get("id")), _clean(scheme.get("Title"))):
            continue
        if wanted_lang is None:
            return scheme
        # Default the scheme's language to "en" if not present.
        if str(scheme.get("language") or "en").lower() == wanted_lang:
            return scheme
    return None
def get_cache_loading_status():
    """
    Report the module-level ``is_cache_loading`` flag.

    The flag is set by load_all_schemes_into_cache() while it runs.
    """
    loading_now = is_cache_loading
    return loading_now