# chathur_api/api/services/recommend_service.py
# Author: VJnCode — commit bbd3a01: "Feat : added central lang trans"
import logging
from api.services import scheme_service
from api.services import central_services
logger = logging.getLogger(__name__)
def _generate_tags_from_scheme(scheme: dict, user_tags_set: set) -> list[str]:
search_text = (
scheme.get("Title", "") + " " +
scheme.get("Description", "")
).lower()
if not search_text:
return []
found_tags = []
for tag in user_tags_set:
if tag in search_text:
found_tags.append(tag)
return found_tags
# --- END NEW HELPER ---
# --- Hybrid Recommendation Logic ---
def _calculate_hybrid_score(scheme: dict, user_tags_set: set) -> float:
WEIGHT_TAG_MATCH = 0.7 # 70% importance
WEIGHT_POPULARITY = 0.3 # 30% importance
# 1. Content-Based Score (Jaccard Similarity)
# Jaccard Similarity = (Intersection of tags) / (Union of tags)
# --- Assumption Handling ---
# Safely get tags, default to empty list if not present or wrong type
scheme_tags = scheme.get("tags", [])
if not isinstance(scheme_tags, list):
# FIX: Use 'Title' for logging, as 'id' may not exist
logger.warning(f"Scheme {scheme.get('Title', 'Unknown')} has invalid 'tags' format. Skipping.")
scheme_tags = []
scheme_tags_set = set(tag.lower() for tag in scheme_tags)
# --- End Assumption Handling ---
intersection = user_tags_set.intersection(scheme_tags_set)
union = user_tags_set.union(scheme_tags_set)
if not union:
tag_score = 0.0
else:
tag_score = len(intersection) / len(union)
# 2. Popularity-Based Score
# --- Assumption Handling ---
# Safely get popularity, default to 0.5 if not present or wrong type
popularity_score = scheme.get("popularity", 0.5)
if not isinstance(popularity_score, (int, float)):
# FIX: Use 'Title' for logging
logger.warning(f"Scheme {scheme.get('Title', 'Unknown')} has invalid 'popularity' format. Defaulting to 0.5.")
popularity_score = 0.5
# --- End Assumption Handling ---
# 3. Final Hybrid Score
final_score = (WEIGHT_TAG_MATCH * tag_score) + (WEIGHT_POPULARITY * popularity_score)
return final_score
def _collect_state_schemes(state_cache: dict, user_tags_set: set, out: list) -> None:
    """Append annotated copies of every state scheme in *state_cache* to *out*.

    state_cache maps state name -> list of scheme dicts. Each copy gets
    dynamically generated tags (when 'tags' is missing/empty) plus origin
    metadata: source='state', source_name=<state>, lang_found.
    Appends in place so partial progress survives a mid-loop exception.
    """
    for state_name, state_schemes in state_cache.items():
        logger.info("DIAGNOSTIC: Processing state: %s, found %s schemes.", state_name, len(state_schemes))
        # We don't have a definitive lang key here. Based on logs
        # ('Kannada schemes loaded'), we make an assumption.
        lang_key = "unknown"
        if state_name.lower() == "karnataka":
            lang_key = "ka"  # HACK: based on user log
        if not isinstance(state_schemes, list):
            logger.warning("DIAGNOSTIC: Expected list of schemes for state '%s', but got %s. Skipping.", state_name, type(state_schemes))
            continue
        for scheme in state_schemes:
            scheme_copy = scheme.copy()
            # If 'tags' is missing or empty, derive tags from Title/Description
            # so the downstream tag-match filter has something to match on.
            if not scheme_copy.get("tags"):
                scheme_copy["tags"] = _generate_tags_from_scheme(scheme_copy, user_tags_set)
            scheme_copy["source"] = "state"
            scheme_copy["source_name"] = state_name
            scheme_copy["lang_found"] = lang_key  # "unknown" or assumed lang
            out.append(scheme_copy)


def _collect_central_schemes(central_cache: dict, user_tags_set: set, out: list) -> None:
    """Append annotated copies of every central scheme in *central_cache* to *out*.

    central_cache maps lang -> {ministry name -> list of scheme dicts}.
    The 'hi' language is skipped (user request). Each copy gets dynamically
    generated tags (when 'tags' is missing/empty) plus origin metadata:
    source='central', source_name=<ministry>, lang_found=<lang>.
    Appends in place so partial progress survives a mid-loop exception.
    """
    for lang_key, central_lang_cache in central_cache.items():
        # USER REQUEST: skip the 'hi' language entirely.
        if lang_key == "hi":
            continue
        logger.info("DIAGNOSTIC: Processing central lang: %s, found ministries: %s", lang_key, len(central_lang_cache))
        if not isinstance(central_lang_cache, dict):
            logger.warning("DIAGNOSTIC: Expected dict of ministries for lang '%s', but got %s. Skipping.", lang_key, type(central_lang_cache))
            continue
        for ministry_name, ministry_schemes in central_lang_cache.items():
            for scheme in ministry_schemes:
                scheme_copy = scheme.copy()
                # Same dynamic tag generation as for state schemes.
                if not scheme_copy.get("tags"):
                    scheme_copy["tags"] = _generate_tags_from_scheme(scheme_copy, user_tags_set)
                scheme_copy["source"] = "central"
                scheme_copy["source_name"] = ministry_name
                scheme_copy["lang_found"] = lang_key  # which lang it came from
                out.append(scheme_copy)


def get_recommendations(user_tags: list[str], lang: str) -> list[dict]:
    """
    Generates a ranked list of scheme recommendations from both state and
    central caches based on user tags.

    NOTE: This function currently ignores the 'lang' parameter and searches
    across ALL languages in the cache ('hi' excepted, per user request).

    Args:
        user_tags: Free-form user tags; matched case-insensitively.
        lang: Currently ignored (see NOTE above).

    Returns:
        Recommendation dicts sorted by 'final_score' descending; only schemes
        with at least one matched tag are included.
    """
    logger.info("Generating recommendations with tags=%s. (NOTE: Ignoring lang='%s' and searching all languages)", user_tags, lang)

    # Read the caches at RUN-TIME *through* their modules so we see the
    # current, populated data instead of a stale import-time snapshot.
    cached_all_schemes = scheme_service.cached_all_schemes
    _central_schemes_cache = central_services._central_schemes_cache

    all_schemes: list[dict] = []
    user_tags_set = set(tag.lower() for tag in user_tags)

    # Diagnostic logging: record what this function *sees* in the caches.
    logger.info("DIAGNOSTIC: State cache size: %s", len(cached_all_schemes))
    logger.info("DIAGNOSTIC: State cache keys: %s", list(cached_all_schemes.keys()))
    logger.info("DIAGNOSTIC: Central cache size: %s", len(_central_schemes_cache))
    logger.info("DIAGNOSTIC: Central cache keys: %s", list(_central_schemes_cache.keys()))

    # 1. Aggregate state schemes (ignoring 'lang' parameter).
    try:
        _collect_state_schemes(cached_all_schemes, user_tags_set, all_schemes)
    except Exception as e:
        logger.error("Error processing state schemes cache: %s", e)

    # 2. Aggregate central schemes (ignoring 'lang' parameter).
    try:
        _collect_central_schemes(_central_schemes_cache, user_tags_set, all_schemes)
    except Exception as e:
        logger.error("Error processing central schemes cache: %s", e)

    if not all_schemes:
        # FIX: removed the pointless f-prefix (message has no placeholders).
        logger.warning("No schemes found in cache across ANY language. Caches might be empty.")
        return []

    # 3. Score every aggregated scheme; keep only those with a tag match.
    #    The match check works because tags were generated dynamically above.
    recommendations = []
    for scheme in all_schemes:
        score = _calculate_hybrid_score(scheme, user_tags_set)
        scheme_tags_set = set(tag.lower() for tag in scheme.get("tags", []))
        matched_tags = user_tags_set.intersection(scheme_tags_set)
        if matched_tags:
            recommendations.append({
                # 'Title'/'Description' match the upstream scheme data shape.
                "name": scheme.get("Title", "Unnamed Scheme"),
                "description": scheme.get("Description", ""),
                "tags": scheme.get("tags", []),  # includes generated tags
                "source": scheme["source"],            # 'state' or 'central'
                "source_name": scheme["source_name"],  # state or ministry name
                "lang_found": scheme.get("lang_found", "unknown"),
                "matched_tags": list(matched_tags),
                "final_score": round(score, 4),
            })

    # 4. Sort by the final score in descending order.
    sorted_recommendations = sorted(recommendations, key=lambda x: x["final_score"], reverse=True)
    logger.info("Found %s matching recommendations.", len(sorted_recommendations))
    return sorted_recommendations