Spaces:
Sleeping
Sleeping
| import logging | |
| from api.services import scheme_service | |
| from api.services import central_services | |
| logger = logging.getLogger(__name__) | |
| def _generate_tags_from_scheme(scheme: dict, user_tags_set: set) -> list[str]: | |
| search_text = ( | |
| scheme.get("Title", "") + " " + | |
| scheme.get("Description", "") | |
| ).lower() | |
| if not search_text: | |
| return [] | |
| found_tags = [] | |
| for tag in user_tags_set: | |
| if tag in search_text: | |
| found_tags.append(tag) | |
| return found_tags | |
| # --- END NEW HELPER --- | |
| # --- Hybrid Recommendation Logic --- | |
| def _calculate_hybrid_score(scheme: dict, user_tags_set: set) -> float: | |
| WEIGHT_TAG_MATCH = 0.7 # 70% importance | |
| WEIGHT_POPULARITY = 0.3 # 30% importance | |
| # 1. Content-Based Score (Jaccard Similarity) | |
| # Jaccard Similarity = (Intersection of tags) / (Union of tags) | |
| # --- Assumption Handling --- | |
| # Safely get tags, default to empty list if not present or wrong type | |
| scheme_tags = scheme.get("tags", []) | |
| if not isinstance(scheme_tags, list): | |
| # FIX: Use 'Title' for logging, as 'id' may not exist | |
| logger.warning(f"Scheme {scheme.get('Title', 'Unknown')} has invalid 'tags' format. Skipping.") | |
| scheme_tags = [] | |
| scheme_tags_set = set(tag.lower() for tag in scheme_tags) | |
| # --- End Assumption Handling --- | |
| intersection = user_tags_set.intersection(scheme_tags_set) | |
| union = user_tags_set.union(scheme_tags_set) | |
| if not union: | |
| tag_score = 0.0 | |
| else: | |
| tag_score = len(intersection) / len(union) | |
| # 2. Popularity-Based Score | |
| # --- Assumption Handling --- | |
| # Safely get popularity, default to 0.5 if not present or wrong type | |
| popularity_score = scheme.get("popularity", 0.5) | |
| if not isinstance(popularity_score, (int, float)): | |
| # FIX: Use 'Title' for logging | |
| logger.warning(f"Scheme {scheme.get('Title', 'Unknown')} has invalid 'popularity' format. Defaulting to 0.5.") | |
| popularity_score = 0.5 | |
| # --- End Assumption Handling --- | |
| # 3. Final Hybrid Score | |
| final_score = (WEIGHT_TAG_MATCH * tag_score) + (WEIGHT_POPULARITY * popularity_score) | |
| return final_score | |
def _prepare_scheme_copy(scheme: dict, user_tags_set: set, source: str,
                         source_name: str, lang_found: str) -> dict:
    """Copy a cached scheme and annotate it with origin metadata.

    If the scheme has no (or empty) 'tags', derive them from its
    Title/Description so downstream tag matching has data to work with.
    Shared by the state and central aggregation loops.
    """
    scheme_copy = scheme.copy()
    if not scheme_copy.get("tags"):
        scheme_copy["tags"] = _generate_tags_from_scheme(scheme_copy, user_tags_set)
    scheme_copy["source"] = source
    scheme_copy["source_name"] = source_name
    scheme_copy["lang_found"] = lang_found
    return scheme_copy


def get_recommendations(user_tags: list[str], lang: str) -> list[dict]:
    """
    Generates a ranked list of scheme recommendations from both state and
    central caches based on user tags.

    NOTE: This function currently ignores the 'lang' parameter and searches
    across ALL languages in the cache.
    """
    logger.info(f"Generating recommendations with tags={user_tags}. (NOTE: Ignoring lang='{lang}' and searching all languages)")

    # Read the cache variables at RUN-TIME *through* their modules so we see
    # the current, populated data (a 'from ... import name' would capture the
    # empty value bound at import time).
    cached_all_schemes = scheme_service.cached_all_schemes
    _central_schemes_cache = central_services._central_schemes_cache

    all_schemes = []
    user_tags_set = set(tag.lower() for tag in user_tags)

    # Diagnostic logging: what this function *sees* in the imported caches.
    logger.info(f"DIAGNOSTIC: State cache size: {len(cached_all_schemes)}")
    logger.info(f"DIAGNOSTIC: State cache keys: {list(cached_all_schemes.keys())}")
    logger.info(f"DIAGNOSTIC: Central cache size: {len(_central_schemes_cache)}")
    logger.info(f"DIAGNOSTIC: Central cache keys: {list(_central_schemes_cache.keys())}")

    # 1. Aggregate state schemes (ignoring the 'lang' parameter).
    # Cache shape: Dict[StateName, List[Scheme]].
    try:
        for state_name, state_schemes in cached_all_schemes.items():
            logger.info(f"DIAGNOSTIC: Processing state: {state_name}, found {len(state_schemes)} schemes.")
            # We have no definitive language key for state data. Based on
            # observed logs ('Kannada schemes loaded'), assume Karnataka is
            # 'ka' (HACK) — everything else stays 'unknown'. TODO confirm.
            lang_key = "ka" if state_name.lower() == "karnataka" else "unknown"
            if not isinstance(state_schemes, list):
                logger.warning(f"DIAGNOSTIC: Expected list of schemes for state '{state_name}', but got {type(state_schemes)}. Skipping.")
                continue
            for scheme in state_schemes:
                all_schemes.append(
                    _prepare_scheme_copy(scheme, user_tags_set, "state", state_name, lang_key)
                )
    except Exception as e:
        logger.error(f"Error processing state schemes cache: {e}")

    # 2. Aggregate central schemes (ignoring the 'lang' parameter).
    # Cache shape: Dict[LangKey, Dict[MinistryName, List[Scheme]]].
    try:
        # Iterate over every language in the central cache, not just 'lang'.
        for lang_key, central_lang_cache in _central_schemes_cache.items():
            # USER REQUEST: skip the 'hi' language entirely.
            if lang_key == "hi":
                continue
            logger.info(f"DIAGNOSTIC: Processing central lang: {lang_key}, found ministries: {len(central_lang_cache)}")
            if not isinstance(central_lang_cache, dict):
                logger.warning(f"DIAGNOSTIC: Expected dict of ministries for lang '{lang_key}', but got {type(central_lang_cache)}. Skipping.")
                continue
            for ministry_name, ministry_schemes in central_lang_cache.items():
                for scheme in ministry_schemes:
                    all_schemes.append(
                        _prepare_scheme_copy(scheme, user_tags_set, "central", ministry_name, lang_key)
                    )
    except Exception as e:
        logger.error(f"Error processing central schemes cache: {e}")

    if not all_schemes:
        logger.warning("No schemes found in cache across ANY language. Caches might be empty.")
        return []

    # 3. Score every aggregated scheme; keep only those with at least one
    # tag match (works even for schemes whose tags were generated above).
    recommendations = []
    for scheme in all_schemes:
        score = _calculate_hybrid_score(scheme, user_tags_set)
        scheme_tags_set = set(tag.lower() for tag in scheme.get("tags", []))
        matched_tags = user_tags_set.intersection(scheme_tags_set)
        if matched_tags:
            recommendations.append({
                # 'Title'/'Description' field names match the cached scheme
                # data — presumably stable; verify against the loaders.
                "name": scheme.get("Title", "Unnamed Scheme"),
                "description": scheme.get("Description", ""),
                "tags": scheme.get("tags", []),  # may include generated tags
                "source": scheme["source"],            # 'state' or 'central'
                "source_name": scheme["source_name"],  # state or ministry name
                "lang_found": scheme.get("lang_found", "unknown"),
                "matched_tags": list(matched_tags),
                "final_score": round(score, 4),
            })

    # 4. Sort by the final score in descending order (best first).
    sorted_recommendations = sorted(recommendations, key=lambda x: x["final_score"], reverse=True)
    logger.info(f"Found {len(sorted_recommendations)} matching recommendations.")
    return sorted_recommendations