# ppd-recommendation-api / recommender_core.py
# (source captured from Hugging Face Space page; last commit d37e0aa "Update recommender_core.py" by appledog00, verified)
import pandas as pd
import joblib
import os
import time
from sqlalchemy import create_engine
from sklearn.metrics.pairwise import cosine_similarity
from urllib.parse import quote_plus
from text_utils import TextProcessor
from functools import lru_cache
# --- CONFIGURATION ---
# Cloud deployments (HF/Production) provide a full DATABASE_URL; local
# development falls back to assembling the URI from individual DB_* vars.
DATABASE_URL = os.getenv("DATABASE_URL")
if DATABASE_URL:
    # Normalize the scheme so SQLAlchemy selects the psycopg2 driver,
    # whether the platform hands us postgres:// or plain postgresql://.
    if DATABASE_URL.startswith("postgres://"):
        DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql+psycopg2://", 1)
    elif "postgresql://" in DATABASE_URL and "+psycopg2" not in DATABASE_URL:
        DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg2://", 1)
    DB_URI = DATABASE_URL
else:
    DB_USER = os.getenv("DB_USER", "postgres")
    # quote_plus guards against special characters in the password.
    DB_PASSWORD = quote_plus(os.getenv("DB_PASSWORD", "subisu"))
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = os.getenv("DB_PORT", "5432")
    DB_NAME = os.getenv("DB_NAME", "ppd_project_db")
    DB_URI = f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
class RecommenderCore:
    """Content-based article recommender backed by a TF-IDF model and Postgres.

    Loads pre-fitted TF-IDF artifacts from disk and the approved-article
    catalogue from the database, then serves crisis-aware, hybrid-ranked
    recommendations with a small per-instance result cache.
    """

    # Maximum number of cached (symptoms_text, crisis_level, top_n) results.
    # Mirrors the maxsize=128 of the lru_cache this cache replaces.
    _CACHE_SIZE = 128

    def __init__(self):
        self.engine = create_engine(DB_URI)
        self.vectorizer = None    # fitted vectorizer loaded from vectorizer.pkl
        self.tfidf_matrix = None  # TF-IDF matrix, rows aligned with self.df
        self.df = None            # approved articles, positionally indexed
        # Per-instance cache instead of @lru_cache on the method: lru_cache on
        # an instance method keys on `self`, keeps the instance alive for the
        # cache's lifetime (ruff B019), and hands every caller the SAME mutable
        # list object, so one caller's mutation corrupts later cache hits.
        self._cache = {}
        self.load_model()

    def load_model(self):
        """Load the persisted TF-IDF artifacts and the approved-article index.

        Best-effort: any failure is logged and the recommender degrades to
        returning empty results rather than crashing the API at startup.
        """
        try:
            if os.path.exists('vectorizer.pkl') and os.path.exists('tfidf_matrix.pkl'):
                self.vectorizer = joblib.load('vectorizer.pkl')
                self.tfidf_matrix = joblib.load('tfidf_matrix.pkl')
                print("💾 Model Loaded into Memory.")
            # ORDER BY article_id + reset_index keeps row POSITION stable, so
            # row index i corresponds to row i of the TF-IDF matrix.
            self.df = pd.read_sql(
                "SELECT * FROM articles WHERE status = 'Approved' ORDER BY article_id",
                self.engine,
            )
            self.df = self.df.reset_index(drop=True)
            print(f"📚 Indexed {len(self.df)} articles.")
        except Exception as e:
            print(f"Load Error: {e}")

    def recommend_articles(self, symptoms_text, crisis_level, top_n=5):
        """Main entry point: return up to `top_n` recommended article dicts.

        Results are cached per (symptoms_text, crisis_level, top_n); callers
        always receive fresh dict copies so mutating a returned item cannot
        corrupt the cache.
        """
        if self.df is None or self.vectorizer is None:
            return []

        cache_key = (symptoms_text, crisis_level, top_n)
        cached = self._cache.get(cache_key)
        if cached is not None:
            return [dict(item) for item in cached]

        final_list = self._compute_recommendations(symptoms_text, crisis_level, top_n)

        # Bounded FIFO eviction keeps memory capped (replacement for the
        # evicted-LRU behavior of lru_cache(maxsize=128)).
        if len(self._cache) >= self._CACHE_SIZE:
            self._cache.pop(next(iter(self._cache)))
        self._cache[cache_key] = final_list
        return [dict(item) for item in final_list]

    def _compute_recommendations(self, symptoms_text, crisis_level, top_n):
        """Uncached core pipeline: filter, score, rank, backfill, serialize."""
        # 1. Preprocess user query.
        query_raw = symptoms_text
        query_norm = TextProcessor.normalize(symptoms_text)
        query_phased = TextProcessor.detect_phrases(query_norm)

        # 2. Filter by crisis level (safety first).
        filtered_df = self._filter_by_risk(crisis_level)
        if filtered_df.empty:
            return []

        # 3. Primary ML scoring: cosine similarity of query vs whole corpus.
        user_vec = self.vectorizer.transform([query_phased])
        all_cos_scores = cosine_similarity(user_vec, self.tfidf_matrix).flatten()

        # 4. Align scores with filtered rows. self.df is positionally indexed
        # (reset_index in load_model), so a row's index IS its matrix row.
        # SAFETY: a stale matrix shorter than the table scores those rows 0.
        max_idx = len(all_cos_scores)
        filtered_df['cosine_score'] = [
            all_cos_scores[i] if i < max_idx else 0.0 for i in filtered_df.index
        ]

        ranked_results = self.apply_ranking(filtered_df, query_raw)
        final_list = ranked_results.head(top_n).to_dict('records')

        # 5. Live fallback: if the local index yields too few hits, top up
        # with fresh PubMed content (best-effort, never fatal).
        MIN_RESULTS = 3
        if len(final_list) < MIN_RESULTS:
            self._append_live_fallback(final_list, query_raw, top_n, MIN_RESULTS)

        for item in final_list:
            item['access_type'] = (
                'External Link' if item.get('format_type') == 'pubmed' else 'Direct Text'
            )
            # Timestamps become strings so the payload is JSON-serializable.
            if 'created_at' in item and item['created_at']:
                item['created_at'] = str(item['created_at'])
        return final_list

    def _filter_by_risk(self, crisis_level):
        """Return a copy of the articles whose risk level suits `crisis_level`.

        Higher-crisis users may see higher-risk material; low-crisis users
        are never shown High/Critical content. Unknown levels see 'All' only.
        """
        risk_map = {
            "High": ["High", "Critical", "Moderate", "All"],
            "Moderate": ["Moderate", "Low", "All"],
            "Low": ["Low", "All"],
        }
        allowed = set(risk_map.get(crisis_level, ["All"]))
        # risk_level is stored as a comma-separated list, e.g. "Low, Moderate".
        mask = self.df['risk_level'].apply(
            lambda x: any(level.strip() in allowed for level in str(x).split(','))
        )
        return self.df[mask].copy()

    def _append_live_fallback(self, final_list, query_raw, top_n, limit):
        """Best-effort: append fresh PubMed articles when local hits are scarce.

        Mutates `final_list` in place; any failure is logged and swallowed so
        a flaky upstream never breaks a recommendation response.
        """
        try:
            # Imported lazily to avoid a hard dependency at module import time.
            from ingestion_service import IngestionService
            service = IngestionService()
            live_arts = service.fetch_from_pubmed(query_raw, limit=limit)
            for art in live_arts:
                if len(final_list) >= top_n:
                    break
                final_list.append({
                    "article_id": -1,  # sentinel: not yet in the local DB
                    "title": art['title'],
                    "category": "Live Fallback",
                    "format_type": "pubmed",
                    "external_url": art['url'],
                    "content": art['content'],
                    "risk_level": "All",
                })
            # Persist fetched articles so future queries can serve them locally.
            if live_arts:
                service.store_articles(live_arts)
        except Exception as e:
            print(f"Fallback error: {e}")

    def apply_ranking(self, df, raw_query):
        """Hybrid ranking engine: cosine score + source/exact-match/recency boosts.

        Adds 'final_score' to `df` and returns it sorted best-first.
        """
        SOURCE_WEIGHT = 1.15      # 15% multiplicative boost for contributor articles
        EXACT_MATCH_BOOST = 0.2   # additive boost per query token found in the title

        tokens = TextProcessor.normalize(raw_query).split()
        now = pd.Timestamp.now()

        def calculate_hybrid_score(row):
            score = row['cosine_score']
            # A. Source weighting: trusted contributor ('text') articles rank higher.
            if row['format_type'] == 'text':
                score *= SOURCE_WEIGHT
            # B. Exact symptom overlap against the normalized title.
            norm_title = TextProcessor.normalize(row['title'])
            matches = sum(1 for t in tokens if t in norm_title)
            score += matches * EXACT_MATCH_BOOST
            # C. Recency boost (PubMed only): linear decay from +0.1 when brand
            # new down to 0 at 365 days or older.
            if row['format_type'] == 'pubmed' and row['created_at']:
                age_days = (now - pd.to_datetime(row['created_at'])).days
                score += max(0, 0.1 * (1 - (min(age_days, 365) / 365)))
            return score

        df['final_score'] = df.apply(calculate_hybrid_score, axis=1)
        return df.sort_values(by='final_score', ascending=False)

    def get_article_by_id(self, article_id):
        """Fetch one indexed article as a dict, or None if absent or unloaded."""
        if self.df is None:
            return None
        article = self.df[self.df['article_id'] == article_id]
        return article.iloc[0].to_dict() if not article.empty else None
# Module-level singleton consumed by main.py. Instantiation connects to the
# database and loads the TF-IDF artifacts as an import-time side effect.
recommender = RecommenderCore()