# chathur_api/api/services/scheme_service.py
# VJnCode's picture
# HOT FIX : correct model
# 0737512
import asyncio
import logging
import os
import json
from difflib import SequenceMatcher
from api.core.firebase_utils import get_firestore_db
logger = logging.getLogger(__name__)
# --- Global Cache for Schemes ---
cached_all_schemes = {}
is_cache_loading = False
# File path for Kannada JSON (project_root/data/translated_schemes_kn.json)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
KAN_JSON_FILE = os.path.join(BASE_DIR, "data", "translated_schemes_kn.json")
async def load_all_schemes_into_cache():
"""
Fetches all schemes from Firestore and populates the in-memory cache.
This function should be called at application startup and/or periodically.
"""
global cached_all_schemes, is_cache_loading
if is_cache_loading:
logger.info("Cache is already loading, skipping concurrent load request.")
return
is_cache_loading = True
logger.info("Starting to load all schemes into cache from Firestore...")
temp_schemes_cache = {}
db = get_firestore_db()
if not db:
logger.error("Firestore DB client is not available. Cannot load schemes into cache.")
is_cache_loading = False
return
try:
# Fetch all state docs
state_docs = db.collection("schemes").stream()
for state_doc in state_docs:
state_name = state_doc.id.strip().lower() # store lowercase for consistency
scheme_ref = db.collection("schemes").document(state_doc.id).collection("schemes")
scheme_docs = scheme_ref.stream()
schemes_in_state = []
for scheme_doc in scheme_docs:
data = scheme_doc.to_dict()
data["id"] = scheme_doc.id
schemes_in_state.append(data)
temp_schemes_cache[state_name] = schemes_in_state
# cached_all_schemes = temp_schemes_cache
# logger.info(f"Cache loaded successfully. Total states: {len(cached_all_schemes)}")
except Exception as e:
logger.error(f"Error loading schemes into cache: {e}")
finally:
is_cache_loading = False
# --- Load Kannada Schemes from JSON ---
try:
if os.path.exists(KAN_JSON_FILE):
with open(KAN_JSON_FILE, "r", encoding="utf-8") as f:
kn_data = json.load(f)
for state, schemes in kn_data.items():
state_key = state.strip().lower()
for s in schemes:
s["language"] = "kn" # ensure Kannada tag
if state_key in temp_schemes_cache:
temp_schemes_cache[state_key].extend(schemes)
else:
temp_schemes_cache[state_key] = schemes
logger.info("Kannada schemes loaded successfully from JSON.")
else:
logger.warning(f"Kannada JSON file not found at {KAN_JSON_FILE}")
except Exception as e:
logger.error(f"Error loading Kannada JSON schemes: {e}")
# --- Finalize cache ---
cached_all_schemes = temp_schemes_cache
is_cache_loading = False
logger.info(f"Cache ready. Total states: {len(cached_all_schemes)}")
# In scheme_service.py
def get_all_schemes(lang=None):
"""
Returns all schemes from the in-memory cache.
If lang is provided, return all schemes that match the specified language.
Schemes without a language tag are considered 'en' by default.
"""
if not lang:
# No change here, returns everything if no language is specified
return cached_all_schemes
filtered_cache = {}
for state, schemes in cached_all_schemes.items():
# Corrected Logic: Default the language to 'en' if the key is missing.
filtered = [
s for s in schemes
if s.get("language", "en").lower() == lang.lower()
]
if filtered:
filtered_cache[state] = filtered
logger.info(f"Filtering schemes for lang={lang}")
return filtered_cache
def search_schemes_in_cache(query: str, lang: str = None):
"""
Searches schemes across all states within the in-memory cache with basic stemming.
Automatically includes schemes that don't have a language field if lang is provided.
"""
from difflib import SequenceMatcher
search_query = query.strip().lower()
matched = []
# Create variations of the query for simple stemming
search_terms = [search_query]
if search_query.endswith('ies'):
search_terms.append(search_query[:-3] + 'y')
elif search_query.endswith('s'):
search_terms.append(search_query[:-1])
logger.info(f"Starting smart search for terms: {search_terms}...")
for state_name, schemes in cached_all_schemes.items():
for scheme in schemes:
# Language filter: include scheme if language matches OR no language specified
language = scheme.get("language", "")
if lang and language and language.lower() != lang.lower():
continue
# Combine all searchable fields
searchable_parts = [
scheme.get("Title", ""),
scheme.get("Description", ""),
scheme.get("Tags", ""),
]
list_fields_to_search = ["Eligibility", "Benefits", "Details", "Documents Required"]
for field in list_fields_to_search:
items = scheme.get(field, [])
if isinstance(items, list):
searchable_parts.extend(items)
elif isinstance(items, str):
searchable_parts.append(items)
searchable_text = " ".join(searchable_parts).lower()
# Check if any search term is contained or fuzzy match (for typos)
if any(term in searchable_text for term in search_terms) or \
any(SequenceMatcher(None, term, searchable_text).ratio() > 0.7 for term in search_terms):
result = scheme.copy()
result["state"] = state_name
matched.append(result)
# Don't break; allow multiple schemes per state if needed
logger.info(f"Search for '{query}' completed. Found {len(matched)} matches.")
return matched
# In scheme_service.py
def get_schemes_by_state(state: str, lang: str = None):
"""
Returns schemes for a specific state from the in-memory cache.
"""
state_key = state.strip().lower()
schemes = cached_all_schemes.get(state_key)
if not schemes:
return None
if lang:
# Corrected Logic: Default to an empty string to prevent false matches.
return [s for s in schemes if s.get("language", "en").lower() == lang.lower()]
return schemes
def get_scheme_details_by_title(state: str, title: str, lang: str = None):
"""
Returns details for a single scheme by title or id within a specific state.
"""
state_key = state.strip().lower()
schemes_for_state = cached_all_schemes.get(state_key)
if not schemes_for_state:
return None
url_title_clean = title.strip().lower()
for scheme in schemes_for_state:
db_id_clean = scheme.get("id", "").strip().lower()
db_title_clean = scheme.get("Title", "").strip().lower()
if db_id_clean == url_title_clean or db_title_clean == url_title_clean:
# Corrected logic: Default the scheme's language to "en" if not present
scheme_lang = scheme.get("language", "en").lower()
if not lang or scheme_lang == lang.lower():
return scheme
return None
def get_cache_loading_status():
"""Returns the current loading status of the cache."""
return is_cache_loading