"""In-memory cache of government schemes and lookup/search helpers over it.

The cache maps a lowercase state name to a list of scheme dicts. It is filled
from Firestore (``schemes/<state>/schemes/<scheme>``) and then extended with
Kannada translations read from a local JSON file.
"""

import asyncio
import json
import logging
import os
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional

from api.core.firebase_utils import get_firestore_db

logger = logging.getLogger(__name__)

_Scheme = Dict[str, Any]

# --- Global Cache for Schemes ---
# Lowercase state name -> list of scheme dicts.
cached_all_schemes = {}
is_cache_loading = False

# File path for Kannada JSON (project_root/data/translated_schemes_kn.json)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
KAN_JSON_FILE = os.path.join(BASE_DIR, "data", "translated_schemes_kn.json")

# Similarity ratio a search term must exceed to count as a fuzzy match.
_FUZZY_THRESHOLD = 0.7

# Scheme fields that feed the full-text search, split by their default shape.
_TEXT_FIELDS = ("Title", "Description", "Tags")
_LIST_FIELDS = ("Eligibility", "Benefits", "Details", "Documents Required")


def _scheme_language(scheme: _Scheme, default: str = "en") -> str:
    """Return the scheme's lowercase language tag, or *default* if untagged.

    A missing, empty, or non-string ``language`` value counts as untagged, so
    malformed records cannot raise ``AttributeError`` during filtering.
    """
    value = scheme.get("language")
    if isinstance(value, str) and value:
        return value.lower()
    return default


def _normalize_key(value: Any) -> str:
    """Return *value* stripped and lowercased, or ``""`` if it is not a string."""
    return value.strip().lower() if isinstance(value, str) else ""


def _as_text_parts(value: Any) -> List[str]:
    """Convert a scheme field into a list of strings that is safe to join."""
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value if item is not None]
    # Any other shape (None, numbers, dicts, ...) contributes nothing.
    return []


def _build_searchable_text(scheme: _Scheme) -> str:
    """Join every searchable field of *scheme* into one lowercase string."""
    parts = []
    for field in _TEXT_FIELDS:
        parts.extend(_as_text_parts(scheme.get(field, "")))
    for field in _LIST_FIELDS:
        parts.extend(_as_text_parts(scheme.get(field, [])))
    return " ".join(parts).lower()


def _build_search_terms(search_query: str) -> List[str]:
    """Return the query plus a naive singular form ('ies' -> 'y', drop 's')."""
    terms = [search_query]
    if search_query.endswith("ies"):
        stem = search_query[:-3] + "y"
    elif search_query.endswith("s"):
        stem = search_query[:-1]
    else:
        stem = ""
    # An empty stem (query == "s") is a substring of every text and would
    # turn the search into "match everything", so it is dropped.
    if stem:
        terms.append(stem)
    return terms


def _is_fuzzy_match(term: str, text: str) -> bool:
    """Return True if ``SequenceMatcher(None, term, text).ratio()`` > threshold.

    The full ratio is expensive, so two cheap upper bounds are checked first:
    the length-based bound (the formula behind ``real_quick_ratio()``) and
    ``quick_ratio()``. Both are upper bounds on ``ratio()``, so the outcome is
    the same as calling ``ratio()`` directly.
    """
    combined = len(term) + len(text)
    if not combined:
        return True  # two empty sequences have ratio 1.0
    if 2.0 * min(len(term), len(text)) / combined <= _FUZZY_THRESHOLD:
        return False
    matcher = SequenceMatcher(None, term, text)
    return (
        matcher.quick_ratio() > _FUZZY_THRESHOLD
        and matcher.ratio() > _FUZZY_THRESHOLD
    )


def _load_firestore_schemes(db: Any, target: Dict[str, List[_Scheme]]) -> None:
    """Fill *target* with every scheme found under the ``schemes`` collection.

    Errors are logged, not raised; whatever was read before the failure
    stays in *target*.
    """
    try:
        # Each document in the top-level collection represents one state.
        for state_doc in db.collection("schemes").stream():
            state_name = state_doc.id.strip().lower()  # lowercase for consistency
            scheme_ref = (
                db.collection("schemes")
                .document(state_doc.id)
                .collection("schemes")
            )
            schemes_in_state = []
            for scheme_doc in scheme_ref.stream():
                data = scheme_doc.to_dict()
                data["id"] = scheme_doc.id
                schemes_in_state.append(data)
            target[state_name] = schemes_in_state
    except Exception as e:
        logger.exception(f"Error loading schemes into cache: {e}")


def _merge_kannada_schemes(target: Dict[str, List[_Scheme]]) -> None:
    """Merge Kannada schemes from ``KAN_JSON_FILE`` into *target*.

    The file is read as a mapping of state name to a list of scheme dicts;
    every scheme is tagged ``language="kn"``. A missing file logs a warning
    and any other failure is logged without being raised.
    """
    try:
        if not os.path.exists(KAN_JSON_FILE):
            logger.warning(f"Kannada JSON file not found at {KAN_JSON_FILE}")
            return
        with open(KAN_JSON_FILE, "r", encoding="utf-8") as f:
            kn_data = json.load(f)
        for state, schemes in kn_data.items():
            state_key = state.strip().lower()
            for s in schemes:
                s["language"] = "kn"  # ensure Kannada tag
            if state_key in target:
                target[state_key].extend(schemes)
            else:
                target[state_key] = schemes
        logger.info("Kannada schemes loaded successfully from JSON.")
    except Exception as e:
        logger.exception(f"Error loading Kannada JSON schemes: {e}")


async def load_all_schemes_into_cache():
    """Rebuild the in-memory scheme cache from Firestore and the Kannada JSON.

    Intended to be called at application startup and/or periodically. If a
    load is already in progress the call returns immediately. If the
    Firestore client is unavailable the existing cache is left untouched.
    A Firestore or JSON read error is logged and the cache is still replaced
    with whatever was loaded successfully.

    NOTE: this coroutine contains no ``await``; all reads run synchronously
    inside it.
    """
    global cached_all_schemes, is_cache_loading

    if is_cache_loading:
        logger.info("Cache is already loading, skipping concurrent load request.")
        return

    is_cache_loading = True
    try:
        # Everything after the flag is set lives inside this try so the flag
        # is always cleared, even if get_firestore_db() itself raises.
        logger.info("Starting to load all schemes into cache from Firestore...")
        db = get_firestore_db()
        if not db:
            logger.error(
                "Firestore DB client is not available. Cannot load schemes into cache."
            )
            return

        temp_schemes_cache = {}
        _load_firestore_schemes(db, temp_schemes_cache)
        _merge_kannada_schemes(temp_schemes_cache)

        # Swap in the finished cache in one assignment so readers never see
        # a half-built dict.
        cached_all_schemes = temp_schemes_cache
        logger.info(f"Cache ready. Total states: {len(cached_all_schemes)}")
    finally:
        is_cache_loading = False


def get_all_schemes(lang=None):
    """Return all cached schemes, optionally restricted to one language.

    Args:
        lang: Language code to keep (case-insensitive). If falsy, the whole
            cache dict is returned as-is (the live object, not a copy).

    Returns:
        A dict of state name -> list of schemes. With *lang*, only states
        that have at least one matching scheme are included. Schemes without
        a language tag are considered 'en'.
    """
    if not lang:
        return cached_all_schemes

    wanted = lang.lower()
    filtered_cache = {}
    for state, schemes in cached_all_schemes.items():
        filtered = [s for s in schemes if _scheme_language(s) == wanted]
        if filtered:
            filtered_cache[state] = filtered

    logger.info(f"Filtering schemes for lang={lang}")
    return filtered_cache


def search_schemes_in_cache(query: str, lang: Optional[str] = None):
    """Search every cached scheme for *query*, with basic stemming.

    A scheme matches when the query (or its naive singular form) is a
    substring of the scheme's combined text fields, or when the query is
    more than 70% similar to that combined text as a whole. An empty query
    matches every scheme, because the empty string is a substring of any
    text.

    Args:
        query: Search text; surrounding whitespace and case are ignored.
        lang: Optional language code. Schemes tagged with a different
            language are skipped; untagged schemes are always included.

    Returns:
        A list of shallow copies of the matching schemes, each with an added
        ``"state"`` key holding the state name it was found under.
    """
    search_query = query.strip().lower()
    search_terms = _build_search_terms(search_query)
    wanted_lang = lang.lower() if lang else None
    matched = []

    logger.info(f"Starting smart search for terms: {search_terms}...")

    for state_name, schemes in cached_all_schemes.items():
        for scheme in schemes:
            # Language filter: skip only schemes explicitly tagged otherwise.
            scheme_lang = _scheme_language(scheme, default="")
            if wanted_lang and scheme_lang and scheme_lang != wanted_lang:
                continue

            searchable_text = _build_searchable_text(scheme)
            if any(
                term in searchable_text or _is_fuzzy_match(term, searchable_text)
                for term in search_terms
            ):
                result = scheme.copy()
                result["state"] = state_name
                matched.append(result)

    logger.info(f"Search for '{query}' completed. Found {len(matched)} matches.")
    return matched


def get_schemes_by_state(state: str, lang: Optional[str] = None):
    """Return the cached schemes for one state.

    Args:
        state: State name; surrounding whitespace and case are ignored.
        lang: Optional language code. Schemes without a language tag are
            considered 'en'.

    Returns:
        ``None`` if the state is unknown or has no schemes; otherwise the
        state's scheme list, filtered by *lang* when given (the filtered
        list may be empty).
    """
    state_key = state.strip().lower()
    schemes = cached_all_schemes.get(state_key)
    if not schemes:
        return None

    if lang:
        wanted = lang.lower()
        return [s for s in schemes if _scheme_language(s) == wanted]
    return schemes


def get_scheme_details_by_title(state: str, title: str, lang: Optional[str] = None):
    """Return a single scheme of *state* whose id or Title equals *title*.

    Comparison ignores case and surrounding whitespace. When *lang* is given
    the scheme's language must also match; untagged schemes count as 'en'.

    Returns:
        The first matching scheme dict, or ``None`` if there is none.
    """
    state_key = state.strip().lower()
    schemes_for_state = cached_all_schemes.get(state_key)
    if not schemes_for_state:
        return None

    url_title_clean = title.strip().lower()
    wanted_lang = lang.lower() if lang else None

    for scheme in schemes_for_state:
        db_id_clean = _normalize_key(scheme.get("id", ""))
        db_title_clean = _normalize_key(scheme.get("Title", ""))
        if url_title_clean not in (db_id_clean, db_title_clean):
            continue
        if wanted_lang is None or _scheme_language(scheme) == wanted_lang:
            return scheme
    return None


def get_cache_loading_status():
    """Returns the current loading status of the cache."""
    return is_cache_loading