# ppd-recommendation-api / ingestion_service.py
# appledog00's picture
# Update ingestion_service.py
# 16f0d20 verified
import time
import pandas as pd
import joblib
from Bio import Entrez
from sqlalchemy import create_engine, text
from urllib.parse import quote_plus
from text_utils import TextProcessor
from sklearn.feature_extraction.text import TfidfVectorizer
# --- CONFIGURATION ---
import os

# Resolve the SQLAlchemy connection string (DB_URI).
# A complete DATABASE_URL takes priority; otherwise the URI is assembled from
# the individual DB_* environment variables, each with a local default.
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    DB_USER = os.getenv("DB_USER", "postgres")
    # NOTE(review): the fallback password is a default committed to source;
    # set DB_PASSWORD explicitly in any real deployment.
    # The password is stored already percent-encoded so it can be embedded
    # in the URL as-is.
    DB_PASSWORD = quote_plus(os.getenv("DB_PASSWORD", "subisu"))
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = os.getenv("DB_PORT", "5432")
    DB_NAME = os.getenv("DB_NAME", "ppd_project_db")
    # The user name is percent-encoded as well: characters such as "@" or ":"
    # would otherwise be read as URL delimiters.
    DB_URI = (
        f'postgresql+psycopg2://{quote_plus(DB_USER)}:{DB_PASSWORD}'
        f'@{DB_HOST}:{DB_PORT}/{DB_NAME}'
    )
else:
    # Ensure the URL names the psycopg2 driver explicitly. Only the scheme
    # prefix is inspected, so the credentials or database name can never
    # influence (or suppress) the rewrite.
    if DATABASE_URL.startswith("postgres://"):
        DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql+psycopg2://", 1)
    elif DATABASE_URL.startswith("postgresql://"):
        DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg2://", 1)
    DB_URI = DATABASE_URL
# Identify this client to NCBI E-utilities (contact email and tool name).
# Both values can be overridden through the environment; an unset or empty
# variable falls back to the project defaults below.
Entrez.email = os.getenv("ENTREZ_EMAIL") or "surbi.211740@ncit.edu.np"
Entrez.tool = os.getenv("ENTREZ_TOOL") or "PPD_Recommender_App"
class IngestionService:
    """Fetch PubMed abstracts, store them, and rebuild the TF-IDF model.

    Every database operation goes through one SQLAlchemy engine created from
    the module-level ``DB_URI``.
    """

    def __init__(self):
        """Create the SQLAlchemy engine used by the other methods."""
        self.engine = create_engine(DB_URI)

    @staticmethod
    def _abstract_to_text(abstract_data):
        """Flatten an ``AbstractText`` value into a single string.

        A list of sections is joined with single spaces; any other value is
        converted with ``str``. An empty list yields ``""``.
        """
        if isinstance(abstract_data, list):
            return " ".join(str(part) for part in abstract_data)
        return str(abstract_data)

    @staticmethod
    def _as_text(value):
        """Return ``value`` as a string, mapping ``None`` and NaN to ``""``.

        Plain ``str(value)`` would turn a missing cell into the literal word
        "None" or "nan", which would then be indexed as a real term.
        """
        if value is None:
            return ""
        # NaN is the only float that is not equal to itself.
        if isinstance(value, float) and value != value:
            return ""
        return str(value)

    def fetch_from_pubmed(self, query, limit=100):
        """Search PubMed and return the matching articles that have an abstract.

        Args:
            query: Search term passed to ``Entrez.esearch``.
            limit: Maximum number of IDs requested (``retmax``).

        Returns:
            A list of dicts with the keys ``title``, ``content`` (the abstract)
            and ``url``. Records without an abstract are skipped. An empty list
            is returned when nothing matches or when the request fails; the
            failure is printed rather than raised.
        """
        print(f"🔍 Searching PubMed: '{query}'...")
        try:
            search_handle = Entrez.esearch(db="pubmed", term=query, retmax=limit, sort="relevance")
            try:
                ids = Entrez.read(search_handle)["IdList"]
            finally:
                # Release the HTTP response even if parsing fails.
                search_handle.close()
            if not ids:
                return []

            fetch_handle = Entrez.efetch(db="pubmed", id=ids, retmode="xml")
            try:
                papers = Entrez.read(fetch_handle)
            finally:
                fetch_handle.close()

            results = []
            for paper in papers['PubmedArticle']:
                try:
                    citation = paper['MedlineCitation']
                    article = citation['Article']
                    title = article.get('ArticleTitle', '')
                    abstract_data = article.get('Abstract', {}).get('AbstractText', [])
                    abstract = self._abstract_to_text(abstract_data)
                    if not abstract:
                        continue
                    results.append({
                        "title": title,
                        "content": abstract,
                        "url": f"https://pubmed.ncbi.nlm.nih.gov/{citation['PMID']}/"
                    })
                except Exception as exc:
                    # Best effort: one malformed record must not discard the
                    # rest of the batch, but the skip is reported.
                    print(f"Skipping malformed PubMed record: {exc}")
                    continue
            return results
        except Exception as e:
            print(f"Pubmed Error: {e}")
            return []

    def store_articles(self, articles, category="General", risk="All"):
        """Insert articles into the ``articles`` table, skipping duplicates.

        Duplicates are detected by the database through
        ``ON CONFLICT (external_url) DO NOTHING``. Each row is committed on its
        own, and a failed row is rolled back and reported so that the
        remaining rows can still be stored.

        Args:
            articles: Iterable of dicts with ``title``, ``content`` and ``url``.
            category: Value written to the ``category`` column of every row.
            risk: Value written to the ``risk_level`` column of every row.

        Returns:
            The number of rows actually inserted.
        """
        added = 0
        # The statement does not depend on the article, so build it once.
        query = text("""
            INSERT INTO articles
            (title, content_clean, content_raw, category, risk_level, status, format_type, external_url)
            VALUES (:t, :cc, :cr, :cat, :risk, 'Approved', 'pubmed', :url)
            ON CONFLICT (external_url) DO NOTHING
        """)
        with self.engine.connect() as conn:
            for art in articles:
                # Preprocessing
                clean_title = TextProcessor.clean_html(art['title'])
                clean_content = TextProcessor.clean_html(art['content'])
                try:
                    res = conn.execute(query, {
                        "t": clean_title,
                        "cc": clean_content,
                        "cr": f"<h3>Source: PubMed</h3><p>{art['content']}</p>",
                        "cat": category,
                        "risk": risk,
                        "url": art['url']
                    })
                    conn.commit()
                    if res.rowcount > 0:
                        added += 1
                except Exception as e:
                    # Clear the failed transaction; otherwise later inserts on
                    # this connection would be rejected as well.
                    conn.rollback()
                    print(f"DB Error: {e}")
        print(f"✅ Stored {added} new articles.")
        return added

    def build_tfidf_model(self, force=False):
        """Fit a TF-IDF model on all approved articles and save it to disk.

        Each document is the normalized title repeated three times, followed
        by the normalized content and the normalized tags/category text.
        The fitted vectorizer is written to ``vectorizer.pkl`` and the
        document-term matrix to ``tfidf_matrix.pkl`` in the working directory.

        Args:
            force: Currently unused; kept so existing callers keep working.

        Returns:
            None. Nothing is written when there are no approved articles.
        """
        print("🧠 Building Weighted TF-IDF Model...")
        # Use ORDER BY for deterministic indexing
        df = pd.read_sql("SELECT * FROM articles WHERE status = 'Approved' ORDER BY article_id", self.engine)
        df = df.reset_index(drop=True)
        if df.empty:
            print("⚠️ No articles to build model.")
            return

        as_text = self._as_text

        # Multi-Field Weighting
        # Title (3x) + Content (1x) + Tags/Categories (1x)
        def prepare_features(row):
            # row.get tolerates a missing column, and as_text keeps missing
            # values from becoming the tokens "None" / "nan".
            title = TextProcessor.normalize(as_text(row.get('title')))
            content = TextProcessor.normalize(as_text(row.get('content_clean')))
            tags = TextProcessor.normalize(
                as_text(row.get('tags')) + " " + as_text(row.get('category'))
            )
            # Phrase detection on title and content
            title = TextProcessor.detect_phrases(title)
            content = TextProcessor.detect_phrases(content)
            # Weighted concatenation
            return (title + " ") * 3 + content + " " + tags

        features = df.apply(prepare_features, axis=1)
        vectorizer = TfidfVectorizer(ngram_range=(1, 2))  # Support bigrams natively
        tfidf_matrix = vectorizer.fit_transform(features)
        joblib.dump(vectorizer, 'vectorizer.pkl')
        joblib.dump(tfidf_matrix, 'tfidf_matrix.pkl')
        print(f"💾 Model optimized and saved. Vocabulary size: {len(vectorizer.vocabulary_)}")
if __name__ == "__main__":
    # Script entry point: run one broad PubMed refresh, then persist the
    # results and rebuild the model only when the fetch returned something.
    ingestion = IngestionService()
    fetched = ingestion.fetch_from_pubmed(
        "postpartum depression OR maternal mental health",
        100,
    )
    if fetched:
        ingestion.store_articles(fetched)
        ingestion.build_tfidf_model()