import logging

from api.services import scheme_service
from api.services import central_services

logger = logging.getLogger(__name__)


def _generate_tags_from_scheme(scheme: dict, user_tags_set: set) -> list[str]:
    """Derive tags for a scheme by substring-matching user tags against its text.

    Args:
        scheme: Raw scheme record; only 'Title' and 'Description' are read.
        user_tags_set: Lowercased user tags to search for.

    Returns:
        The subset of user tags that appear as substrings of the scheme's
        Title/Description text (possibly empty).
    """
    search_text = (
        scheme.get("Title", "") + " " + scheme.get("Description", "")
    ).lower()
    # FIX: the concatenation always contains the joining space, so the old
    # truthiness check could never fire. Strip before testing for emptiness.
    if not search_text.strip():
        return []
    # Substring match: a user tag counts if it occurs anywhere in the text.
    return [tag for tag in user_tags_set if tag in search_text]


# --- Hybrid Recommendation Logic ---
def _calculate_hybrid_score(scheme: dict, user_tags_set: set) -> float:
    """Score a scheme as a weighted blend of tag overlap and popularity.

    Args:
        scheme: Scheme record; reads 'tags' (list of str) and 'popularity'
            (number, presumably on a 0..1 scale -- TODO confirm with data source).
        user_tags_set: Lowercased user tags.

    Returns:
        Hybrid score in [0, 1] assuming popularity is in [0, 1].
    """
    WEIGHT_TAG_MATCH = 0.7   # 70% importance
    WEIGHT_POPULARITY = 0.3  # 30% importance

    # 1. Content-Based Score (Jaccard Similarity)
    # Jaccard Similarity = (Intersection of tags) / (Union of tags)
    # Safely get tags, default to empty list if not present or wrong type.
    scheme_tags = scheme.get("tags", [])
    if not isinstance(scheme_tags, list):
        # Use 'Title' for logging, as 'id' may not exist.
        logger.warning(f"Scheme {scheme.get('Title', 'Unknown')} has invalid 'tags' format. Skipping.")
        scheme_tags = []
    # FIX: ignore non-string entries so a stray int/None in 'tags' cannot
    # crash .lower() (this function is called outside any try/except).
    scheme_tags_set = set(tag.lower() for tag in scheme_tags if isinstance(tag, str))

    intersection = user_tags_set.intersection(scheme_tags_set)
    union = user_tags_set.union(scheme_tags_set)
    # Empty union means both tag sets were empty -> no content signal.
    tag_score = len(intersection) / len(union) if union else 0.0

    # 2. Popularity-Based Score
    # Safely get popularity, default to 0.5 if not present or wrong type.
    popularity_score = scheme.get("popularity", 0.5)
    if not isinstance(popularity_score, (int, float)):
        logger.warning(f"Scheme {scheme.get('Title', 'Unknown')} has invalid 'popularity' format. Defaulting to 0.5.")
        popularity_score = 0.5

    # 3. Final Hybrid Score
    return (WEIGHT_TAG_MATCH * tag_score) + (WEIGHT_POPULARITY * popularity_score)


def _annotate_scheme(scheme: dict, user_tags_set: set, source: str,
                     source_name: str, lang_found: str) -> dict:
    """Copy a raw scheme record and attach tags and provenance metadata.

    Shared by the state and central aggregation loops (previously duplicated).
    If the record has no usable 'tags', tags are generated dynamically from
    its Title/Description so downstream tag matching still works.
    """
    scheme_copy = scheme.copy()
    if not scheme_copy.get("tags"):
        scheme_copy["tags"] = _generate_tags_from_scheme(scheme_copy, user_tags_set)
    scheme_copy["source"] = source            # 'state' or 'central'
    scheme_copy["source_name"] = source_name  # State or Ministry name
    scheme_copy["lang_found"] = lang_found    # Language the record was filed under
    return scheme_copy


def get_recommendations(user_tags: list[str], lang: str) -> list[dict]:
    """
    Generates a ranked list of scheme recommendations from both state and
    central caches based on user tags.

    NOTE: This function currently ignores the 'lang' parameter and searches
    across ALL languages in the cache.
    """
    logger.info(f"Generating recommendations with tags={user_tags}. (NOTE: Ignoring lang='{lang}' and searching all languages)")

    # Access the caches *through* their modules at run-time so we see the
    # current, populated data rather than a stale import-time snapshot.
    cached_all_schemes = scheme_service.cached_all_schemes
    _central_schemes_cache = central_services._central_schemes_cache

    all_schemes = []
    user_tags_set = set(tag.lower() for tag in user_tags)

    # Diagnostic logging: what this function *sees* in the imported caches.
    logger.info(f"DIAGNOSTIC: State cache size: {len(cached_all_schemes)}")
    logger.info(f"DIAGNOSTIC: State cache keys: {list(cached_all_schemes.keys())}")
    logger.info(f"DIAGNOSTIC: Central cache size: {len(_central_schemes_cache)}")
    logger.info(f"DIAGNOSTIC: Central cache keys: {list(_central_schemes_cache.keys())}")

    # 1. Aggregate State Schemes (Ignoring 'lang' parameter)
    try:
        # Cache shape is Dict[StateName, List[Schemes]].
        for state_name, state_schemes in cached_all_schemes.items():
            logger.info(f"DIAGNOSTIC: Processing state: {state_name}, found {len(state_schemes)} schemes.")
            # We don't have a definitive lang_key here. Based on logs
            # ('Kannada schemes loaded'), we make an assumption.
            lang_key = "unknown"
            if state_name.lower() == "karnataka":
                lang_key = "ka"  # HACK: based on user log
            if not isinstance(state_schemes, list):
                logger.warning(f"DIAGNOSTIC: Expected list of schemes for state '{state_name}', but got {type(state_schemes)}. Skipping.")
                continue
            for scheme in state_schemes:
                all_schemes.append(
                    _annotate_scheme(scheme, user_tags_set, "state", state_name, lang_key)
                )
    except Exception as e:
        logger.error(f"Error processing state schemes cache: {e}")

    # 2. Aggregate Central Schemes (Ignoring 'lang' parameter)
    try:
        # Iterate over all languages in the central cache, not just the
        # specified one.
        for lang_key, central_lang_cache in _central_schemes_cache.items():
            # USER REQUEST: Skip 'hi' language.
            if lang_key == "hi":
                continue
            logger.info(f"DIAGNOSTIC: Processing central lang: {lang_key}, found ministries: {len(central_lang_cache)}")
            if not isinstance(central_lang_cache, dict):
                logger.warning(f"DIAGNOSTIC: Expected dict of ministries for lang '{lang_key}', but got {type(central_lang_cache)}. Skipping.")
                continue
            for ministry_name, ministry_schemes in central_lang_cache.items():
                # FIX: guard malformed ministry entries (list expected) so one
                # bad record cannot abort the whole central aggregation --
                # mirrors the state-loop guard above.
                if not isinstance(ministry_schemes, list):
                    logger.warning(f"DIAGNOSTIC: Expected list of schemes for ministry '{ministry_name}', but got {type(ministry_schemes)}. Skipping.")
                    continue
                for scheme in ministry_schemes:
                    all_schemes.append(
                        _annotate_scheme(scheme, user_tags_set, "central", ministry_name, lang_key)
                    )
    except Exception as e:
        logger.error(f"Error processing central schemes cache: {e}")

    if not all_schemes:
        logger.warning("No schemes found in cache across ANY language. Caches might be empty.")
        return []

    # 3. Calculate scores for all aggregated schemes
    recommendations = []
    for scheme in all_schemes:
        score = _calculate_hybrid_score(scheme, user_tags_set)
        # Only include schemes that had at least one tag match. This works
        # because tags were dynamically generated above when missing.
        scheme_tags_set = set(tag.lower() for tag in scheme.get("tags", []))
        matched = user_tags_set.intersection(scheme_tags_set)
        if matched:
            recommendations.append({
                # 'Title' and 'Description' match the upstream scheme data.
                "name": scheme.get("Title", "Unnamed Scheme"),
                "description": scheme.get("Description", ""),
                "tags": scheme.get("tags", []),  # Will now show generated tags
                "source": scheme["source"],            # 'state' or 'central'
                "source_name": scheme["source_name"],  # State or Ministry name
                "lang_found": scheme.get("lang_found", "unknown"),
                "matched_tags": list(matched),
                "final_score": round(score, 4),
            })

    # 4. Sort by the final score in descending order
    sorted_recommendations = sorted(recommendations, key=lambda x: x["final_score"], reverse=True)
    logger.info(f"Found {len(sorted_recommendations)} matching recommendations.")
    return sorted_recommendations