# NOTE: removed non-code page residue ("Spaces: / Sleeping / File size" and the
# line-number gutter) left over from a web export of this file.
import logging
from api.services import scheme_service
from api.services import central_services
logger = logging.getLogger(__name__)
def _generate_tags_from_scheme(scheme: dict, user_tags_set: set) -> list[str]:
search_text = (
scheme.get("Title", "") + " " +
scheme.get("Description", "")
).lower()
if not search_text:
return []
found_tags = []
for tag in user_tags_set:
if tag in search_text:
found_tags.append(tag)
return found_tags
# --- END NEW HELPER ---
# --- Hybrid Recommendation Logic ---
def _calculate_hybrid_score(scheme: dict, user_tags_set: set) -> float:
WEIGHT_TAG_MATCH = 0.7 # 70% importance
WEIGHT_POPULARITY = 0.3 # 30% importance
# 1. Content-Based Score (Jaccard Similarity)
# Jaccard Similarity = (Intersection of tags) / (Union of tags)
# --- Assumption Handling ---
# Safely get tags, default to empty list if not present or wrong type
scheme_tags = scheme.get("tags", [])
if not isinstance(scheme_tags, list):
# FIX: Use 'Title' for logging, as 'id' may not exist
logger.warning(f"Scheme {scheme.get('Title', 'Unknown')} has invalid 'tags' format. Skipping.")
scheme_tags = []
scheme_tags_set = set(tag.lower() for tag in scheme_tags)
# --- End Assumption Handling ---
intersection = user_tags_set.intersection(scheme_tags_set)
union = user_tags_set.union(scheme_tags_set)
if not union:
tag_score = 0.0
else:
tag_score = len(intersection) / len(union)
# 2. Popularity-Based Score
# --- Assumption Handling ---
# Safely get popularity, default to 0.5 if not present or wrong type
popularity_score = scheme.get("popularity", 0.5)
if not isinstance(popularity_score, (int, float)):
# FIX: Use 'Title' for logging
logger.warning(f"Scheme {scheme.get('Title', 'Unknown')} has invalid 'popularity' format. Defaulting to 0.5.")
popularity_score = 0.5
# --- End Assumption Handling ---
# 3. Final Hybrid Score
final_score = (WEIGHT_TAG_MATCH * tag_score) + (WEIGHT_POPULARITY * popularity_score)
return final_score
def _annotate_scheme(scheme: dict, user_tags_set: set, source: str,
                     source_name: str, lang_found: str) -> dict:
    """Return a copy of *scheme* stamped with provenance metadata.

    If the record has no usable 'tags' field, tags are derived from its
    Title/Description so downstream tag matching still has something to use.
    """
    scheme_copy = scheme.copy()
    if not scheme_copy.get("tags"):
        scheme_copy["tags"] = _generate_tags_from_scheme(scheme_copy, user_tags_set)
    scheme_copy["source"] = source
    scheme_copy["source_name"] = source_name
    scheme_copy["lang_found"] = lang_found
    return scheme_copy


def _collect_state_schemes(cached_all_schemes: dict, user_tags_set: set,
                           out: list) -> None:
    """Append annotated copies of every cached state scheme to *out*.

    Iterates ALL states regardless of language; appends incrementally so a
    failure partway through does not discard schemes already collected.
    """
    for state_name, state_schemes in cached_all_schemes.items():
        # FIX: validate the shape BEFORE calling len() on it, so one malformed
        # state entry cannot raise and abort the whole aggregation loop.
        if not isinstance(state_schemes, list):
            logger.warning(
                "DIAGNOSTIC: Expected list of schemes for state '%s', but got %s. Skipping.",
                state_name, type(state_schemes))
            continue
        logger.info("DIAGNOSTIC: Processing state: %s, found %d schemes.",
                    state_name, len(state_schemes))
        # We don't have a definitive language key for state schemes.
        # HACK: based on user logs ('Kannada schemes loaded') we assume
        # Karnataka content is Kannada ('ka'); everything else is unknown.
        lang_key = "ka" if state_name.lower() == "karnataka" else "unknown"
        for scheme in state_schemes:
            out.append(_annotate_scheme(scheme, user_tags_set,
                                        "state", state_name, lang_key))


def _collect_central_schemes(central_cache: dict, user_tags_set: set,
                             out: list) -> None:
    """Append annotated copies of every cached central scheme to *out*.

    Iterates all languages except 'hi' (explicitly skipped per user request),
    then every ministry bucket within each language.
    """
    for lang_key, central_lang_cache in central_cache.items():
        if lang_key == "hi":
            continue  # USER REQUEST: skip Hindi
        # FIX: shape check before len(), mirroring the state-scheme loop.
        if not isinstance(central_lang_cache, dict):
            logger.warning(
                "DIAGNOSTIC: Expected dict of ministries for lang '%s', but got %s. Skipping.",
                lang_key, type(central_lang_cache))
            continue
        logger.info("DIAGNOSTIC: Processing central lang: %s, found ministries: %d",
                    lang_key, len(central_lang_cache))
        for ministry_name, ministry_schemes in central_lang_cache.items():
            for scheme in ministry_schemes:
                out.append(_annotate_scheme(scheme, user_tags_set,
                                            "central", ministry_name, lang_key))


def get_recommendations(user_tags: list[str], lang: str) -> list[dict]:
    """
    Generates a ranked list of scheme recommendations from both state and
    central caches based on user tags.

    NOTE: This function currently ignores the 'lang' parameter and searches
    across ALL languages in the cache (except 'hi', which is skipped).

    Args:
        user_tags: Free-form user tags; matching is case-insensitive.
        lang: Requested language (currently ignored -- see NOTE above).

    Returns:
        Recommendation dicts sorted by 'final_score' descending; empty list
        when the caches are empty or nothing matches.
    """
    logger.info(
        "Generating recommendations with tags=%s. (NOTE: Ignoring lang='%s' and searching all languages)",
        user_tags, lang)

    # Read the cache variables at RUN-TIME *through* their modules so we see
    # the current, populated data rather than a stale import-time snapshot.
    cached_all_schemes = scheme_service.cached_all_schemes
    _central_schemes_cache = central_services._central_schemes_cache

    user_tags_set = {tag.lower() for tag in user_tags}

    # Diagnostic logging: what this function *sees* in the imported caches.
    logger.info("DIAGNOSTIC: State cache size: %d", len(cached_all_schemes))
    logger.info("DIAGNOSTIC: State cache keys: %s", list(cached_all_schemes.keys()))
    logger.info("DIAGNOSTIC: Central cache size: %d", len(_central_schemes_cache))
    logger.info("DIAGNOSTIC: Central cache keys: %s", list(_central_schemes_cache.keys()))

    # 1./2. Aggregate state and central schemes (ignoring 'lang'). The helpers
    # append into the shared list so partial progress survives a failure.
    all_schemes: list[dict] = []
    try:
        _collect_state_schemes(cached_all_schemes, user_tags_set, all_schemes)
    except Exception:
        # FIX: logger.exception keeps the traceback instead of only str(e).
        logger.exception("Error processing state schemes cache")
    try:
        _collect_central_schemes(_central_schemes_cache, user_tags_set, all_schemes)
    except Exception:
        logger.exception("Error processing central schemes cache")

    if not all_schemes:
        logger.warning("No schemes found in cache across ANY language. Caches might be empty.")
        return []

    # 3. Keep only schemes with at least one tag match, then score them.
    #    (The match check works even for records that arrived without tags,
    #    because _annotate_scheme generated tags for them.)
    recommendations = []
    for scheme in all_schemes:
        scheme_tags_set = {tag.lower() for tag in scheme.get("tags", [])}
        # FIX: compute the intersection once (it was previously computed twice)
        # and skip scoring entirely for non-matching schemes.
        matched_tags = user_tags_set & scheme_tags_set
        if not matched_tags:
            continue
        score = _calculate_hybrid_score(scheme, user_tags_set)
        recommendations.append({
            # 'Title'/'Description' match the cached scheme data's field names.
            "name": scheme.get("Title", "Unnamed Scheme"),
            "description": scheme.get("Description", ""),
            "tags": scheme.get("tags", []),        # may include generated tags
            "source": scheme["source"],            # 'state' or 'central'
            "source_name": scheme["source_name"],  # state or ministry name
            "lang_found": scheme.get("lang_found", "unknown"),
            "matched_tags": list(matched_tags),
            "final_score": round(score, 4),
        })

    # 4. Highest score first.
    recommendations.sort(key=lambda rec: rec["final_score"], reverse=True)
    logger.info("Found %d matching recommendations.", len(recommendations))
    return recommendations