import pandas as pd
import joblib
import os
import time
import copy
from sqlalchemy import create_engine
from sklearn.metrics.pairwise import cosine_similarity
from urllib.parse import quote_plus
from text_utils import TextProcessor
from functools import lru_cache

# --- CONFIGURATION ---
# For cloud deployment (HF/Production), use DATABASE_URL.
# Fallback to local construction if not present.
DATABASE_URL = os.getenv("DATABASE_URL")

if not DATABASE_URL:
    DB_USER = os.getenv("DB_USER", "postgres")
    DB_PASSWORD = quote_plus(os.getenv("DB_PASSWORD", "subisu"))
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = os.getenv("DB_PORT", "5432")
    DB_NAME = os.getenv("DB_NAME", "ppd_project_db")
    DB_URI = f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
else:
    # Ensure URL is compatible with SQLAlchemy if it starts with postgres://
    # (Heroku-style URLs use the legacy "postgres://" scheme).
    if DATABASE_URL.startswith("postgres://"):
        DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql+psycopg2://", 1)
    elif "postgresql://" in DATABASE_URL and "+psycopg2" not in DATABASE_URL:
        DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg2://", 1)
    DB_URI = DATABASE_URL


class RecommenderCore:
    """TF-IDF article recommender backed by a Postgres `articles` table.

    Loads a pickled vectorizer + matrix from disk, keeps the approved
    articles in memory as a DataFrame, and ranks them against a user's
    symptom text with a hybrid (cosine + heuristic boosts) score.
    """

    # Mirrors the old lru_cache(maxsize=128) bound.
    _CACHE_MAX = 128

    def __init__(self):
        self.engine = create_engine(DB_URI)
        self.vectorizer = None
        self.tfidf_matrix = None
        self.df = None
        # Per-instance memo for recommend_articles(). Replaces @lru_cache on
        # the bound method, which (a) keyed on `self` and kept the instance
        # alive for the cache's lifetime (ruff B019), and (b) returned the
        # SAME mutable list to every caller, so a caller mutating a result
        # silently corrupted the cache. Entries are stored/returned as deep
        # copies to keep the cache immutable from the caller's perspective.
        self._rec_cache = {}
        self.load_model()

    def load_model(self):
        """Load the pickled vectorizer/matrix (if present) and index all
        approved articles from the database into ``self.df``.

        Best-effort: any failure is logged and the recommender stays in a
        degraded state (``recommend_articles`` then returns ``[]``).
        """
        try:
            if os.path.exists('vectorizer.pkl') and os.path.exists('tfidf_matrix.pkl'):
                self.vectorizer = joblib.load('vectorizer.pkl')
                self.tfidf_matrix = joblib.load('tfidf_matrix.pkl')
                print("💾 Model Loaded into Memory.")
            # ORDER BY article_id keeps DataFrame row order stable so row
            # positions align with the rows used to build tfidf_matrix.
            self.df = pd.read_sql(
                "SELECT * FROM articles WHERE status = 'Approved' ORDER BY article_id",
                self.engine,
            )
            self.df = self.df.reset_index(drop=True)
            print(f"📚 Indexed {len(self.df)} articles.")
        except Exception as e:
            print(f"Load Error: {e}")

    def recommend_articles(self, symptoms_text, crisis_level, top_n=5):
        """Modular requirement: Main entry point with caching.

        Returns up to ``top_n`` article dicts ranked for ``symptoms_text``,
        filtered by ``crisis_level`` ("High"/"Moderate"/"Low"), topped up
        with live PubMed results when local matches are scarce.
        """
        if self.df is None or self.vectorizer is None:
            return []

        key = (symptoms_text, crisis_level, top_n)
        if key in self._rec_cache:
            # Deep copy so callers cannot mutate the cached entry.
            return copy.deepcopy(self._rec_cache[key])

        final_list = self._recommend_uncached(symptoms_text, crisis_level, top_n)

        if len(self._rec_cache) >= self._CACHE_MAX:
            # Simple FIFO eviction (dicts preserve insertion order).
            self._rec_cache.pop(next(iter(self._rec_cache)))
        self._rec_cache[key] = copy.deepcopy(final_list)
        return final_list

    def _recommend_uncached(self, symptoms_text, crisis_level, top_n):
        """Full recommendation pipeline: filter -> score -> rank -> fallback."""
        # 1. Preprocess user query
        query_raw = symptoms_text
        query_norm = TextProcessor.normalize(symptoms_text)
        query_phased = TextProcessor.detect_phrases(query_norm)

        # 2. Filter by Crisis Level (Safety First)
        filtered_df = self._filter_by_crisis(crisis_level)
        if filtered_df.empty:
            return []

        # 3. Primary ML Scoring (Cosine Similarity)
        user_vec = self.vectorizer.transform([query_phased])
        all_cos_scores = cosine_similarity(user_vec, self.tfidf_matrix).flatten()

        # 4. Final Ranking
        # Correctly align scores using the original dataframe's index.
        # SAFETY: never exceed the bounds of the scores array — the DB may
        # have grown since the TF-IDF matrix was pickled (mismatch protection).
        max_idx = len(all_cos_scores)
        filtered_df['cosine_score'] = [
            all_cos_scores[i] if i < max_idx else 0.0 for i in filtered_df.index
        ]

        # Apply the hybrid ranking engine
        ranked_results = self.apply_ranking(filtered_df, query_raw)

        # Format for output
        final_list = ranked_results.head(top_n).to_dict('records')

        # 5. Live Fallback if needed
        final_list = self._append_live_fallback(final_list, query_raw, top_n)

        # Presentation fields: external links for PubMed entries, and
        # stringified timestamps so the payload is JSON-serializable.
        for item in final_list:
            item['access_type'] = (
                'External Link' if item.get('format_type') == 'pubmed' else 'Direct Text'
            )
            if 'created_at' in item and item['created_at']:
                item['created_at'] = str(item['created_at'])
        return final_list

    def _filter_by_crisis(self, crisis_level):
        """Return the subset of articles whose risk_level is allowed for
        the given crisis level (higher crisis => broader allowed set)."""
        risk_map = {
            "High": ["High", "Critical", "Moderate", "All"],
            "Moderate": ["Moderate", "Low", "All"],
            "Low": ["Low", "All"],
        }
        allowed = risk_map.get(crisis_level, ["All"])
        # risk_level may hold a comma-separated list, e.g. "High, Moderate".
        mask = self.df['risk_level'].apply(
            lambda x: any(level.strip() in allowed for level in str(x).split(','))
        )
        return self.df[mask].copy()

    def _append_live_fallback(self, final_list, query_raw, top_n):
        """Requirement: if local results are too few (< 3), fetch fresh
        PubMed content and append it; best-effort, never raises."""
        K = 3
        if len(final_list) >= K:
            return final_list
        try:
            # Local import: avoids a module-level circular/heavy dependency.
            from ingestion_service import IngestionService
            service = IngestionService()
            live_arts = service.fetch_from_pubmed(query_raw, limit=K)
            for art in live_arts:
                if len(final_list) >= top_n:
                    break
                final_list.append({
                    "article_id": -1,  # sentinel: not yet persisted locally
                    "title": art['title'],
                    "category": "Live Fallback",
                    "format_type": "pubmed",
                    "external_url": art['url'],
                    "content": art['content'],
                    "risk_level": "All"
                })
            # Background ingestion (optional here, but requested in strategy)
            if live_arts:
                service.store_articles(live_arts)
        except Exception as e:
            print(f"Fallback error: {e}")
        return final_list

    def apply_ranking(self, df, raw_query):
        """Modular requirement: Hybrid ranking engine.

        Combines cosine similarity with source weighting, exact-token title
        overlap, and a recency boost; returns ``df`` sorted by final_score.
        """
        # Constants for weighting
        SOURCE_WEIGHT = 1.15  # 15% boost for contributor articles
        EXACT_MATCH_BOOST = 0.2

        tokens = TextProcessor.normalize(raw_query).split()
        now = pd.Timestamp.now()

        def calculate_hybrid_score(row):
            score = row['cosine_score']

            # A. Source Weighting (Trusted Contributors)
            if row['format_type'] == 'text':
                score *= SOURCE_WEIGHT

            # B. Exact Symptom Overlap Boost
            # Check how many user tokens appear exactly in the normalized title
            norm_title = TextProcessor.normalize(row['title'])
            matches = sum(1 for t in tokens if t in norm_title)
            score += (matches * EXACT_MATCH_BOOST)

            # C. Recency Boost (PubMed only, newer is better)
            if row['format_type'] == 'pubmed' and pd.notna(row['created_at']):
                # Coerce bad values to NaT instead of raising; a tz-aware
                # timestamp minus the naive `now` would raise TypeError, so
                # normalize to naive first.
                created = pd.to_datetime(row['created_at'], errors='coerce')
                if pd.notna(created):
                    if created.tzinfo is not None:
                        created = created.tz_localize(None)
                    age_days = (now - created).days
                    # Decaying boost: max 0.1 for brand new, goes to 0 over 365 days
                    recency_boost = max(0, 0.1 * (1 - (min(age_days, 365) / 365)))
                    score += recency_boost
            return score

        df['final_score'] = df.apply(calculate_hybrid_score, axis=1)
        return df.sort_values(by='final_score', ascending=False)

    def get_article_by_id(self, article_id):
        """Fetches a single article by its ID.

        Returns the article row as a dict, or None if the recommender is
        not initialized or no approved article matches.
        """
        if self.df is None:
            return None
        article = self.df[self.df['article_id'] == article_id]
        return article.iloc[0].to_dict() if not article.empty else None


# Singleton instance to be used by main.py
recommender = RecommenderCore()