# NOTE: the original paste carried Hugging Face Spaces page residue ("Spaces: Sleeping") here;
# replaced with this comment so the module parses. This file is the recommender core service.
| import pandas as pd | |
| import joblib | |
| import os | |
| import time | |
| from sqlalchemy import create_engine | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from urllib.parse import quote_plus | |
| from text_utils import TextProcessor | |
| from functools import lru_cache | |
# --- CONFIGURATION ---
# Cloud deployments (HF/Production) supply DATABASE_URL; when it is absent we
# assemble a local connection string from individual DB_* variables.
DATABASE_URL = os.getenv("DATABASE_URL")

if DATABASE_URL:
    # Normalize the scheme so SQLAlchemy picks the psycopg2 driver.
    # Heroku-style URLs use the legacy "postgres://" prefix.
    if DATABASE_URL.startswith("postgres://"):
        DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql+psycopg2://", 1)
    elif "postgresql://" in DATABASE_URL and "+psycopg2" not in DATABASE_URL:
        DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg2://", 1)
    DB_URI = DATABASE_URL
else:
    # Local-development fallback; each component can be overridden via env.
    DB_USER = os.getenv("DB_USER", "postgres")
    # quote_plus guards against special characters breaking the URI.
    DB_PASSWORD = quote_plus(os.getenv("DB_PASSWORD", "subisu"))
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = os.getenv("DB_PORT", "5432")
    DB_NAME = os.getenv("DB_NAME", "ppd_project_db")
    DB_URI = f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
class RecommenderCore:
    """TF-IDF article recommender backed by a Postgres `articles` table.

    Loads a pickled vectorizer + document matrix from disk and the approved
    articles from the database into memory, then ranks articles for a user
    query with a hybrid score (cosine similarity plus heuristic boosts).
    """

    def __init__(self):
        # SQLAlchemy engines are lazy; no DB connection is opened here.
        self.engine = create_engine(DB_URI)
        self.vectorizer = None    # fitted TF-IDF vectorizer (from vectorizer.pkl)
        self.tfidf_matrix = None  # document-term matrix (from tfidf_matrix.pkl)
        self.df = None            # approved articles, ordered by article_id
        self.load_model()

    def load_model(self):
        """Load model artifacts from disk and the approved-article index from the DB.

        Best-effort: any failure is printed and the affected attributes stay
        None; recommend_articles treats None attributes as "no model loaded".
        """
        try:
            # Both pickles must be present; otherwise only the article index loads.
            if os.path.exists('vectorizer.pkl') and os.path.exists('tfidf_matrix.pkl'):
                self.vectorizer = joblib.load('vectorizer.pkl')
                self.tfidf_matrix = joblib.load('tfidf_matrix.pkl')
                print("💾 Model Loaded into Memory.")
            # ORDER BY article_id + reset_index gives positional indices; row i of
            # self.df is assumed to match row i of tfidf_matrix — TODO confirm the
            # matrix was built from the same query/order.
            self.df = pd.read_sql("SELECT * FROM articles WHERE status = 'Approved' ORDER BY article_id", self.engine)
            self.df = self.df.reset_index(drop=True)
            print(f"📚 Indexed {len(self.df)} articles.")
        except Exception as e:
            print(f"Load Error: {e}")

    def recommend_articles(self, symptoms_text, crisis_level, top_n=5):
        """Modular requirement: Main entry point with caching.

        Ranks approved articles for a free-text symptom query, filtered by
        crisis level, topping up from a live PubMed fallback when results are
        sparse. Returns a list of article dicts (possibly empty).

        NOTE(review): despite the summary line, no caching is applied here
        (`lru_cache` is imported at module level but unused) — confirm intent.
        """
        # No article index or no fitted vectorizer -> nothing to recommend.
        if self.df is None or self.vectorizer is None:
            return []
        # 1. Preprocess user query
        query_raw = symptoms_text
        query_norm = TextProcessor.normalize(symptoms_text)
        # presumably detect_phrases rewrites multi-word symptoms into single
        # tokens matching the vectorizer vocabulary — verify in text_utils
        query_phased = TextProcessor.detect_phrases(query_norm)
        # 2. Filter by Crisis Level (Safety First)
        # Each key maps a user's crisis level to the article risk_levels they
        # may be shown; unknown levels fall back to "All"-tagged articles only.
        risk_map = {
            "High": ["High", "Critical", "Moderate", "All"],
            "Moderate": ["Moderate", "Low", "All"],
            "Low": ["Low", "All"]
        }
        allowed = risk_map.get(crisis_level, ["All"])
        # Determine the filtered subset; risk_level may be a comma-separated list.
        mask = self.df['risk_level'].apply(
            lambda x: any(level.strip() in allowed for level in str(x).split(','))
        )
        # .copy() so the cosine_score assignment below can't hit a view warning.
        filtered_df = self.df[mask].copy()
        if filtered_df.empty: return []
        # 3. Primary ML Scoring (Cosine Similarity)
        user_vec = self.vectorizer.transform([query_phased])
        all_cos_scores = cosine_similarity(user_vec, self.tfidf_matrix).flatten()
        # 4. Final Ranking
        # Correctly align scores using the original dataframe's index
        # (positional after reset_index in load_model).
        # SAFETY: Ensure we don't exceed the bounds of the scores array
        # (mismatch protection if the pickled matrix is stale vs. the DB).
        max_idx = len(all_cos_scores)
        cos_scores_for_filtered = []
        for i in filtered_df.index:
            if i < max_idx:
                cos_scores_for_filtered.append(all_cos_scores[i])
            else:
                # Article newer than the matrix: score 0 rather than crash.
                cos_scores_for_filtered.append(0.0)
        filtered_df['cosine_score'] = cos_scores_for_filtered
        # Apply the hybrid ranking engine
        ranked_results = self.apply_ranking(filtered_df, query_raw)
        # Format for output
        final_list = ranked_results.head(top_n).to_dict('records')
        # 5. Live Fallback if needed
        # Requirement: If results are too few, fetch fresh content
        K = 3  # minimum acceptable result count before triggering the fallback
        if len(final_list) < K:
            try:
                # Imported lazily to avoid a hard dependency at module load time.
                from ingestion_service import IngestionService
                service = IngestionService()
                live_arts = service.fetch_from_pubmed(query_raw, limit=K)
                for art in live_arts:
                    if len(final_list) >= top_n: break
                    # article_id -1 marks a live result not yet persisted.
                    final_list.append({
                        "article_id": -1,
                        "title": art['title'],
                        "category": "Live Fallback",
                        "format_type": "pubmed",
                        "external_url": art['url'],
                        "content": art['content'],
                        "risk_level": "All"
                    })
                # Background ingestion (optional here, but requested in strategy)
                if live_arts: service.store_articles(live_arts)
            except Exception as e:
                # Fallback is best-effort; DB results alone are still returned.
                print(f"Fallback error: {e}")
        # Post-process every result: access hint + JSON-safe timestamp.
        for item in final_list:
            item['access_type'] = 'External Link' if item.get('format_type') == 'pubmed' else 'Direct Text'
            if 'created_at' in item and item['created_at']:
                item['created_at'] = str(item['created_at'])
        return final_list

    def apply_ranking(self, df, raw_query):
        """Modular requirement: Hybrid ranking engine.

        Adds a 'final_score' column to `df` (mutates the passed frame) and
        returns it sorted best-first. Score = cosine similarity, boosted by
        source trust, exact title-token overlap, and PubMed recency.
        """
        # Constants for weighting
        SOURCE_WEIGHT = 1.15  # 15% boost for contributor articles
        EXACT_MATCH_BOOST = 0.2  # additive boost per query token found in the title
        tokens = TextProcessor.normalize(raw_query).split()
        now = pd.Timestamp.now()

        def calculate_hybrid_score(row):
            # Per-row scorer used with DataFrame.apply(axis=1).
            score = row['cosine_score']
            # A. Source Weighting (Trusted Contributors)
            if row['format_type'] == 'text':
                score *= SOURCE_WEIGHT
            # B. Exact Symptom Overlap Boost
            # Check how many user tokens appear exactly in the normalized title
            # (substring containment, not whole-word match).
            norm_title = TextProcessor.normalize(row['title'])
            matches = sum(1 for t in tokens if t in norm_title)
            score += (matches * EXACT_MATCH_BOOST)
            # C. Recency Boost (PubMed only, newer is better)
            # NOTE(review): assumes created_at is naive/compatible with
            # Timestamp.now(); a tz-aware column would raise here — confirm.
            if row['format_type'] == 'pubmed' and row['created_at']:
                age_days = (now - pd.to_datetime(row['created_at'])).days
                # Decaying boost: max 0.1 for brand new, goes to 0 over 365 days
                recency_boost = max(0, 0.1 * (1 - (min(age_days, 365) / 365)))
                score += recency_boost
            return score

        df['final_score'] = df.apply(calculate_hybrid_score, axis=1)
        return df.sort_values(by='final_score', ascending=False)

    def get_article_by_id(self, article_id):
        """Fetches a single article by its ID.

        Returns the article row as a dict, or None when the index is not
        loaded or no approved article has that id.
        """
        if self.df is None: return None
        article = self.df[self.df['article_id'] == article_id]
        return article.iloc[0].to_dict() if not article.empty else None
# Singleton instance to be used by main.py
# NOTE: constructed at import time, so importing this module creates the DB
# engine and attempts to load model artifacts (see RecommenderCore.__init__).
recommender = RecommenderCore()