import pandas as pd
import joblib
import os
import time
from sqlalchemy import create_engine
from sklearn.metrics.pairwise import cosine_similarity
from urllib.parse import quote_plus
from text_utils import TextProcessor
from functools import lru_cache
# --- CONFIGURATION ---
# Cloud deployments (HF/Production) supply a full DATABASE_URL; otherwise the
# URI is assembled from individual local environment variables.
DATABASE_URL = os.getenv("DATABASE_URL")
if DATABASE_URL:
    # Normalise the scheme so SQLAlchemy resolves the psycopg2 driver:
    # Heroku-style URLs use "postgres://", plain ones "postgresql://".
    if DATABASE_URL.startswith("postgres://"):
        DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql+psycopg2://", 1)
    elif "postgresql://" in DATABASE_URL and "+psycopg2" not in DATABASE_URL:
        DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg2://", 1)
    DB_URI = DATABASE_URL
else:
    # Local-development fallback.
    # NOTE(review): the DB_PASSWORD default is a hard-coded dev credential —
    # acceptable locally, but it must never reach production.
    DB_USER = os.getenv("DB_USER", "postgres")
    DB_PASSWORD = quote_plus(os.getenv("DB_PASSWORD", "subisu"))
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = os.getenv("DB_PORT", "5432")
    DB_NAME = os.getenv("DB_NAME", "ppd_project_db")
    DB_URI = f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
class RecommenderCore:
    """TF-IDF article recommender with risk-level filtering and hybrid ranking.

    Loads a pre-fitted vectorizer and TF-IDF matrix from disk and the approved
    article catalogue from Postgres. ``recommend_articles`` is the main entry
    point; its results are memoised per (symptoms, crisis_level, top_n) query.
    """

    # Maximum number of memoised queries kept per instance (bounded so a
    # long-lived process cannot grow the cache without limit).
    _CACHE_SIZE = 128

    def __init__(self):
        self.engine = create_engine(DB_URI)
        self.vectorizer = None
        self.tfidf_matrix = None
        self.df = None
        # FIX: the previous version applied functools.lru_cache directly to the
        # bound method, which keys on `self`, keeps the instance alive for the
        # cache's lifetime (ruff B019), and returns the *same* list object to
        # every caller. A plain per-instance dict avoids all three problems.
        self._rec_cache = {}
        self.load_model()

    def load_model(self):
        """Load pickled model artefacts and the approved-article index.

        Best-effort: any failure is logged and leaves the corresponding
        attribute as None, which ``recommend_articles`` treats as "no model".
        """
        try:
            if os.path.exists('vectorizer.pkl') and os.path.exists('tfidf_matrix.pkl'):
                self.vectorizer = joblib.load('vectorizer.pkl')
                self.tfidf_matrix = joblib.load('tfidf_matrix.pkl')
                print("💾 Model Loaded into Memory.")
            # ORDER BY article_id keeps row order deterministic so positional
            # indices line up with the rows the TF-IDF matrix was built from.
            self.df = pd.read_sql("SELECT * FROM articles WHERE status = 'Approved' ORDER BY article_id", self.engine)
            self.df = self.df.reset_index(drop=True)
            print(f"📚 Indexed {len(self.df)} articles.")
        except Exception as e:
            print(f"Load Error: {e}")

    def recommend_articles(self, symptoms_text, crisis_level, top_n=5):
        """Main entry point: return up to ``top_n`` recommended articles.

        Results are memoised per (symptoms_text, crisis_level, top_n). A
        shallow copy of the cached list is returned so callers mutating the
        list cannot poison the cache.
        """
        if self.df is None or self.vectorizer is None:
            return []
        key = (symptoms_text, crisis_level, top_n)
        if key in self._rec_cache:
            return list(self._rec_cache[key])
        final_list = self._compute_recommendations(symptoms_text, crisis_level, top_n)
        # Bounded FIFO eviction: drop the oldest entry once the cache is full.
        if len(self._rec_cache) >= self._CACHE_SIZE:
            self._rec_cache.pop(next(iter(self._rec_cache)))
        self._rec_cache[key] = final_list
        return list(final_list)

    def _compute_recommendations(self, symptoms_text, crisis_level, top_n):
        """Uncached pipeline: filter -> score -> rank -> live fallback -> format."""
        # 1. Preprocess user query
        query_raw = symptoms_text
        query_norm = TextProcessor.normalize(symptoms_text)
        query_phased = TextProcessor.detect_phrases(query_norm)
        # 2. Filter by crisis level (safety first). Unknown levels fall back
        # to articles tagged "All" only.
        risk_map = {
            "High": ["High", "Critical", "Moderate", "All"],
            "Moderate": ["Moderate", "Low", "All"],
            "Low": ["Low", "All"]
        }
        allowed = risk_map.get(crisis_level, ["All"])
        # risk_level is treated as a comma-separated tag list (e.g. "Low, All").
        mask = self.df['risk_level'].apply(
            lambda x: any(level.strip() in allowed for level in str(x).split(','))
        )
        filtered_df = self.df[mask].copy()
        if filtered_df.empty:
            return []
        # 3. Primary ML scoring: cosine similarity against the whole corpus.
        user_vec = self.vectorizer.transform([query_phased])
        all_cos_scores = cosine_similarity(user_vec, self.tfidf_matrix).flatten()
        # 4. Align scores via the original dataframe's positional index.
        # SAFETY: indices beyond the score array (matrix/table drift) score 0.
        max_idx = len(all_cos_scores)
        filtered_df['cosine_score'] = [
            all_cos_scores[i] if i < max_idx else 0.0 for i in filtered_df.index
        ]
        # Apply the hybrid ranking engine and keep the top results.
        ranked_results = self.apply_ranking(filtered_df, query_raw)
        final_list = ranked_results.head(top_n).to_dict('records')
        # 5. Live fallback: if the local catalogue yields too few results,
        # fetch fresh content from PubMed.
        K = 3
        if len(final_list) < K:
            try:
                # Imported lazily to avoid a circular import at module load.
                from ingestion_service import IngestionService
                service = IngestionService()
                live_arts = service.fetch_from_pubmed(query_raw, limit=K)
                for art in live_arts:
                    if len(final_list) >= top_n:
                        break
                    final_list.append({
                        "article_id": -1,  # sentinel: not yet stored in the DB
                        "title": art['title'],
                        "category": "Live Fallback",
                        "format_type": "pubmed",
                        "external_url": art['url'],
                        "content": art['content'],
                        "risk_level": "All"
                    })
                # Persist the fresh articles so future queries hit the index.
                if live_arts:
                    service.store_articles(live_arts)
            except Exception as e:
                print(f"Fallback error: {e}")
        # Output formatting: access type + JSON-safe timestamps.
        for item in final_list:
            item['access_type'] = 'External Link' if item.get('format_type') == 'pubmed' else 'Direct Text'
            if 'created_at' in item and item['created_at']:
                item['created_at'] = str(item['created_at'])
        return final_list

    def apply_ranking(self, df, raw_query):
        """Hybrid ranking engine: cosine score + source / exact-match / recency boosts.

        Adds a 'final_score' column to ``df`` (mutating it) and returns it
        sorted by that score, descending.
        """
        SOURCE_WEIGHT = 1.15      # 15% boost for contributor ('text') articles
        EXACT_MATCH_BOOST = 0.2   # per query token found verbatim in the title
        tokens = TextProcessor.normalize(raw_query).split()
        now = pd.Timestamp.now()

        def calculate_hybrid_score(row):
            score = row['cosine_score']
            # A. Source weighting (trusted contributors)
            if row['format_type'] == 'text':
                score *= SOURCE_WEIGHT
            # B. Exact symptom overlap boost against the normalized title
            norm_title = TextProcessor.normalize(row['title'])
            matches = sum(1 for t in tokens if t in norm_title)
            score += (matches * EXACT_MATCH_BOOST)
            # C. Recency boost (PubMed only): up to +0.1 for brand-new items,
            # decaying linearly to 0 over 365 days.
            # NOTE(review): assumes created_at is tz-naive like Timestamp.now();
            # a tz-aware column would make the subtraction raise — confirm.
            if row['format_type'] == 'pubmed' and row['created_at']:
                age_days = (now - pd.to_datetime(row['created_at'])).days
                recency_boost = max(0, 0.1 * (1 - (min(age_days, 365) / 365)))
                score += recency_boost
            return score

        df['final_score'] = df.apply(calculate_hybrid_score, axis=1)
        return df.sort_values(by='final_score', ascending=False)

    def get_article_by_id(self, article_id):
        """Return the indexed article with this id as a dict, or None."""
        if self.df is None:
            return None
        article = self.df[self.df['article_id'] == article_id]
        return article.iloc[0].to_dict() if not article.empty else None
# Singleton instance to be used by main.py
# NOTE: instantiated at import time — __init__ runs load_model(), which reads
# the pickled model files and issues a SELECT against the articles table.
recommender = RecommenderCore()