Spaces:
Sleeping
Sleeping
| # ============================================================================= | |
| # 📰 Newspaper Article Extractor — FastAPI Backend | |
| # Priority queue: one page at a time, user clicks jump to front | |
| # ============================================================================= | |
| import os | |
| import hashlib | |
| import json | |
| import threading | |
| import time | |
| import logging | |
| from pathlib import Path | |
| from collections import deque | |
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import uvicorn | |
| from config import MAX_PDF_PAGES, MAX_PDF_SIZE_MB | |
| from extractor import ExtractionPipeline | |
| from summarizer import NewspaperSummarizer | |
| from rater import ArticleRater | |
# ---- Logging ----
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("newspaper_api")
# ---- Config ----
API_KEY = os.environ.get("SAMBANOVA_API_KEY", "")
if not API_KEY:
    # Explicit raise instead of ``assert``: asserts are stripped under
    # ``python -O``, which would let the app start without credentials and only
    # fail later on the first API call.
    raise RuntimeError("Set SAMBANOVA_API_KEY environment variable")
UPLOAD_DIR = Path("/tmp/newspaper_uploads")  # uploaded PDFs, stored as <hash>.pdf
UPLOAD_DIR.mkdir(exist_ok=True)
QUEUE_DELAY = 5  # seconds between queued jobs to avoid rate limits
# ---- Initialize once ----
# Module-level singletons shared by all request handlers and background threads.
pipeline = ExtractionPipeline(api_key=API_KEY)  # page extraction, page counts, layout images
summarizer = NewspaperSummarizer(api_key=API_KEY)  # whole-paper summary + page-section detection
rater = ArticleRater(api_key=API_KEY)  # rates the articles extracted from a page
# ---- Cache ----
# In-memory only. No code in this file removes entries, so these grow for the
# lifetime of the process.
extraction_cache = {}  # "hash_pageN" → result dict, or {"error": message}
extraction_status = {}  # "hash_pageN" → "queued"/"running"/"done"/"error"
summary_results = {}  # hash → result (or {"error": message})
summary_status = {}  # hash → "running"/"done"/"error"
file_sections = {}  # hash → {page_num: section_name}
# ---- Priority Queue ----
# The queue processes ONE page at a time. User-requested pages jump to front.
page_queue = deque()  # [(file_hash, page_num, priority), ...]; accessed under queue_lock
queue_lock = threading.Lock()  # guards page_queue
queue_event = threading.Event()  # Signals the worker when new jobs are added
worker_running = False  # set once a queue_worker thread has been started
def get_cache_key(file_hash, page_num):
    """Return the per-page cache key, formatted as ``"<file_hash>_page<page_num>"``."""
    return "{}_page{}".format(file_hash, page_num)
def is_page_processed(file_hash, page_num):
    """Report whether a page is finished or currently being extracted.

    Pages that are only queued, have failed, or were never requested count as
    not processed.
    """
    state = extraction_status.get(get_cache_key(file_hash, page_num))
    return state == "done" or state == "running"
def enqueue_page(file_hash, page_num, priority="low"):
    """Add a page to the extraction queue and make sure the worker is awake.

    HIGH priority puts the page at the front of the queue, moving it there if it
    was already waiting. LOW priority appends the page to the back, unless it is
    already queued. In that case it keeps its current position and priority, so
    background prefetching can never demote a page the user asked for.

    Pages whose status is "done" or "running" are ignored. Pages in "error" are
    queued again, i.e. retried.

    Args:
        file_hash: Upload identifier.
        page_num: 1-based page number.
        priority: "high" or "low"; anything other than "high" is treated as low.
    """
    key = get_cache_key(file_hash, page_num)
    with queue_lock:
        # Skip if already done or in progress
        if extraction_status.get(key) in ("done", "running"):
            return

        def is_this_page(job):
            return job[0] == file_hash and job[1] == page_num

        already_queued = any(is_this_page(job) for job in page_queue)
        if priority == "high":
            if already_queued:
                # Drop the existing entry so the page is not processed twice.
                remaining = [job for job in page_queue if not is_this_page(job)]
                page_queue.clear()
                page_queue.extend(remaining)
            page_queue.appendleft((file_hash, page_num, priority))
        elif not already_queued:
            page_queue.append((file_hash, page_num, priority))
        # else: low-priority request for a page already waiting. Leave it where it
        # is; moving it to the back would demote an earlier high-priority request.
        extraction_status[key] = "queued"
    # Wake up the worker (and start one if none is running)
    queue_event.set()
    ensure_worker_running()
def process_page(file_path, file_hash, page_num):
    """Extract and rate a single page, recording the outcome in the caches.

    Called by the queue worker. On return, ``extraction_status[key]`` is either:
    - "done", with the response dict in ``extraction_cache[key]``; or
    - "error", with ``{"error": message}`` in ``extraction_cache[key]``.

    In every branch the cache entry is written before the status changes. A
    poller that sees "done" or "error" therefore always finds the matching
    cache entry.

    Args:
        file_path: Path of the uploaded PDF on disk.
        file_hash: Upload identifier, used in the cache key and for section lookup.
        page_num: 1-based page number (passed to the pipeline as a 0-based index).
    """
    key = get_cache_key(file_hash, page_num)
    extraction_status[key] = "running"
    try:
        # Extract
        page_idx = page_num - 1
        result, _viz_image, regions, is_digital, total_pages = pipeline.extract(
            file_path, page_idx
        )
        if result is None:
            extraction_cache[key] = {"error": f"Invalid page. PDF has {total_pages} pages."}
            extraction_status[key] = "error"
            return
        articles = result.get("articles", [])

        # Rate (text-only call, fast); the section label gives the rater context.
        ratings = []
        if articles:
            section = file_sections.get(file_hash, {}).get(page_num, "UNKNOWN")
            logger.info(f"Rating {len(articles)} articles on page {page_num} ({section})")
            ratings = rater.rate(articles, section=section, page_num=page_num)

        extraction_cache[key] = {
            "page": page_num,
            "pdf_type": "digital" if is_digital else "scanned",
            "total_regions": len(regions) if regions else 0,
            "articles": articles,
            "ratings": ratings,
        }
        extraction_status[key] = "done"
        logger.info(f"✅ Page {page_num} done ({len(articles)} articles)")
    except Exception as e:
        logger.error(f"Page {page_num} failed: {e}", exc_info=True)
        extraction_cache[key] = {"error": str(e)}
        extraction_status[key] = "error"
def queue_worker():
    """Background worker: drain ``page_queue`` one page at a time.

    Blocks on ``queue_event`` until ``enqueue_page`` signals new work. It then
    pops jobs from the front of the queue until the queue is empty.

    - After each page it actually processes, it sleeps ``QUEUE_DELAY`` seconds
      to stay under API rate limits.
    - Skipped jobs (page already done, PDF missing) do not incur the delay.
    - A job whose PDF has disappeared is marked "error" so pollers get an
      answer instead of "queued" forever.

    ``worker_running`` is reset if the loop ever exits, which can only happen
    through an unexpected exception. The next ``ensure_worker_running`` call
    (made by ``enqueue_page``) can then start a replacement instead of the
    queue stalling silently.
    """
    global worker_running
    logger.info("Queue worker started")
    try:
        while True:
            # Clear before draining: a job added while we drain sets the event
            # again, so the next wait() returns immediately instead of missing it.
            queue_event.wait()
            queue_event.clear()
            while True:
                with queue_lock:
                    job = page_queue.popleft() if page_queue else None
                if job is None:
                    break

                file_hash, page_num, priority = job
                key = get_cache_key(file_hash, page_num)
                file_path = str(UPLOAD_DIR / f"{file_hash}.pdf")
                if not os.path.exists(file_path):
                    logger.warning(f"PDF not found for {file_hash}")
                    extraction_cache[key] = {"error": "PDF not found. Please upload again."}
                    extraction_status[key] = "error"
                    continue
                # Skip if already done
                if extraction_status.get(key) == "done":
                    continue

                logger.info(f"Processing page {page_num} (priority: {priority})")
                process_page(file_path, file_hash, page_num)
                # Delay between pages to avoid rate limits
                time.sleep(QUEUE_DELAY)
    except Exception:
        logger.exception("Queue worker crashed; a new one will start on the next enqueue")
    finally:
        worker_running = False
# Serializes the check-and-set of ``worker_running`` in ensure_worker_running().
_worker_start_lock = threading.Lock()


def ensure_worker_running():
    """Start the queue worker thread if one is not already running.

    Called from several threads: request handlers and the summary thread. The
    read-then-write of ``worker_running`` happens under a dedicated lock.
    Otherwise two callers could both see ``False`` and start two workers,
    which would break the one-page-at-a-time guarantee of the queue.
    """
    global worker_running
    with _worker_start_lock:
        if worker_running:
            return
        worker_running = True
        try:
            threading.Thread(target=queue_worker, daemon=True).start()
        except Exception:
            # Thread could not be started; allow a later call to try again.
            worker_running = False
            raise
| # ---- Summary (runs independently of queue) ---- | |
def run_summary_background(file_path, file_hash):
    """Summarize a whole newspaper in its own thread, independent of the page queue.

    Steps:
    1. Detect per-page sections and store them in ``file_sections``.
    2. Run the summarizer and record the outcome in ``summary_results`` /
       ``summary_status``. The result is stored before the status changes.
    3. On success, queue the pages of the first 8 distinct "important"
       articles at low priority.

    Step 3 is best effort and runs outside the summary's error handling. A
    failure there is logged and never turns a successful summary into "error".

    Args:
        file_path: Path of the uploaded PDF.
        file_hash: Upload identifier; key into the summary dicts.
    """
    summary_status[file_hash] = "running"
    try:
        # Detect sections first
        sections = summarizer._detect_page_sections(file_path)
        file_sections[file_hash] = sections
        logger.info(f"Sections detected: {sections}")
        # Run summary
        result = summarizer.summarize(file_path)
        important_articles = result.get("important_articles", [])
    except Exception as e:
        logger.error(f"Summary failed: {e}", exc_info=True)
        summary_results[file_hash] = {"error": str(e)}
        summary_status[file_hash] = "error"
        return

    summary_results[file_hash] = result
    summary_status[file_hash] = "done"
    logger.info(f"Summary complete: {len(important_articles)} articles")

    # Queue important pages from summary (low priority).
    # NOTE(review): "page" comes from the summarizer's output; this assumes it is
    # an int page number like the ones the API endpoints use — confirm.
    try:
        important_pages = []
        for article in important_articles:
            page = article.get("page")
            if page and page not in important_pages:
                important_pages.append(page)
        for page_num in important_pages[:8]:  # Max 8 important pages
            enqueue_page(file_hash, page_num, priority="low")
    except Exception as e:
        logger.error(f"Queueing important pages failed: {e}", exc_info=True)
| # ---- Helpers ---- | |
def save_upload(upload_file: UploadFile) -> tuple:
    """Store an uploaded PDF under a content-derived name.

    The file is saved as ``UPLOAD_DIR/<md5>.pdf``. Re-uploading identical
    content reuses the existing copy. MD5 is used only as a content identifier,
    not for security.

    Args:
        upload_file: Incoming upload; its ``file`` object is read to the end.

    Returns:
        ``(file_path, file_hash)``: the on-disk path as ``str`` and the hex digest.
    """
    import tempfile

    content = upload_file.file.read()
    # Hash the whole upload. Hashing only the first 10 MB let two different PDFs
    # with a common prefix share a name, and the second upload was then silently
    # served the first file's pages.
    file_hash = hashlib.md5(content).hexdigest()
    file_path = str(UPLOAD_DIR / f"{file_hash}.pdf")
    if not os.path.exists(file_path):
        # Write to a temp file in the same directory, then atomically rename it
        # into place. Readers that check os.path.exists (e.g. the queue worker)
        # never open a half-written PDF.
        fd, tmp_path = tempfile.mkstemp(dir=UPLOAD_DIR, suffix=".part")
        try:
            with os.fdopen(fd, "wb") as tmp:
                tmp.write(content)
            os.replace(tmp_path, file_path)
        except BaseException:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
    return file_path, file_hash
def detect_front_and_editorial(file_path, file_hash, sections=None):
    """Pick the front page and editorial page numbers from detected sections.

    Front page:
    - An explicit "FRONT PAGE" label on page 1 or 2 wins.
    - Otherwise the lowest "NATIONAL" page among pages 1-2 is used.
    - Otherwise page 1.

    Editorial page: the lowest-numbered page labelled "EDITORIAL", "OP-ED" or
    "OPINION". Labels are compared case-insensitively.

    Args:
        file_path: Unused; kept for interface compatibility.
        file_hash: Upload identifier used to look up ``file_sections`` when
            ``sections`` is not given.
        sections: Optional ``{page_num: section_name}`` mapping to use instead
            of the cached ``file_sections[file_hash]``.

    Returns:
        ``(front_page, editorial_page)``. ``editorial_page`` is ``None`` when no
        editorial/opinion section was found.
    """
    if sections is None:
        sections = file_sections.get(file_hash, {})

    explicit_front = None
    national_front = None
    editorial_page = None
    # Walk pages in ascending order so "first match" means "lowest page number"
    # regardless of the mapping's insertion order.
    for page_num, section in sorted(sections.items(), key=lambda item: item[0]):
        label = str(section or "").upper()
        if page_num <= 2:
            if label == "FRONT PAGE" and explicit_front is None:
                explicit_front = page_num
            elif label == "NATIONAL" and national_front is None:
                national_front = page_num
        if label in ("EDITORIAL", "OP-ED", "OPINION") and editorial_page is None:
            editorial_page = page_num

    if explicit_front is not None:
        front_page = explicit_front
    elif national_front is not None:
        front_page = national_front
    else:
        front_page = 1  # Default to page 1
    return front_page, editorial_page
| # ---- FastAPI ---- | |
app = FastAPI(
    version="6.0",
    title="Newspaper Article Extractor API",
)
# Wide-open CORS: any origin, method and header may call this API.
# NOTE(review): with allow_credentials=True and a "*" origin list, Starlette's
# CORSMiddleware answers credentialed requests by echoing the caller's Origin
# instead of "*" (verify for the pinned version). Any site could then send
# cookies/auth here. That is harmless while the API has no cookie or session
# auth; restrict allow_origins if that changes.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
    allow_credentials=True,
)
def health_check():
    """Liveness probe: report that the service is up."""
    payload = {"status": "ok"}
    return payload
async def upload_pdf(file: UploadFile = File(...)):
    """Upload PDF → validate → start summary → queue front page + editorial.

    The blocking work (reading, hashing and writing the upload, counting pages,
    detecting sections) runs in FastAPI's thread pool. A large upload therefore
    no longer blocks the event loop and every other request, such as status
    polling, while it is handled.

    Returns:
        dict with ``file_hash``, ``filename``, ``size_mb``, ``total_pages``,
        ``sections``, ``front_page`` and ``editorial_page``.

    Raises:
        HTTPException(400): the filename does not end in ``.pdf``, the file is
            larger than ``MAX_PDF_SIZE_MB``, the PDF cannot be read, or it has
            more than ``MAX_PDF_PAGES`` pages. Rejected files are deleted.
    """
    from fastapi.concurrency import run_in_threadpool

    # Starlette types UploadFile.filename as optional; treat a missing one as non-PDF.
    if not (file.filename or "").lower().endswith(".pdf"):
        raise HTTPException(400, "Only PDF files are accepted")

    file_path, file_hash = await run_in_threadpool(save_upload, file)

    def discard_upload():
        # A concurrent rejected upload of the same content may already have removed it.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass

    size_mb = os.path.getsize(file_path) / (1024 * 1024)
    if size_mb > MAX_PDF_SIZE_MB:
        discard_upload()
        raise HTTPException(400, f"PDF is {size_mb:.1f} MB. Max {MAX_PDF_SIZE_MB} MB.")
    try:
        total_pages = await run_in_threadpool(pipeline.get_page_count, file_path)
    except Exception as e:
        discard_upload()
        raise HTTPException(400, f"Could not read PDF: {e}") from e
    if total_pages > MAX_PDF_PAGES:
        discard_upload()
        raise HTTPException(400, f"PDF has {total_pages} pages. Max {MAX_PDF_PAGES}.")

    # Detect sections
    sections = await run_in_threadpool(summarizer._detect_page_sections, file_path)
    file_sections[file_hash] = sections

    # Start the summary in its own thread, not in the page queue.
    # - The status is claimed here, with no ``await`` between check and set, so
    #   two concurrent uploads of the same file cannot both start a summary.
    # - A previously failed summary is retried rather than stuck on "error".
    if summary_status.get(file_hash) in (None, "error"):
        summary_status[file_hash] = "running"
        threading.Thread(
            target=run_summary_background,
            args=(file_path, file_hash),
            daemon=True,
        ).start()

    # Queue front page and editorial
    front_page, editorial_page = detect_front_and_editorial(file_path, file_hash)
    enqueue_page(file_hash, front_page, priority="low")
    if editorial_page and editorial_page != front_page:
        enqueue_page(file_hash, editorial_page, priority="low")

    return {
        "file_hash": file_hash,
        "filename": file.filename,
        "size_mb": round(size_mb, 1),
        "total_pages": total_pages,
        "sections": sections,
        "front_page": front_page,
        "editorial_page": editorial_page,
    }
def start_extraction(file_hash: str, page_num: int):
    """Start extraction — user-requested page jumps to front of queue.

    The following page, if it exists, is also queued at low priority.

    Returns:
        - ``{"status": "done", "result": ...}`` if the page is already extracted;
        - ``{"status": "running", ...}`` if extraction is in progress;
        - otherwise ``{"status": "started", ...}`` once the page is queued.

    Raises:
        HTTPException(404): no uploaded PDF exists for ``file_hash``.
        HTTPException(400): ``page_num`` is outside ``1..total_pages``.
    """
    file_path = str(UPLOAD_DIR / f"{file_hash}.pdf")
    if not os.path.exists(file_path):
        raise HTTPException(404, "PDF not found. Please upload again.")

    key = get_cache_key(file_hash, page_num)
    # Read the status once so both checks see the same value.
    status = extraction_status.get(key)
    if status == "done":
        return {"status": "done", "result": extraction_cache[key]}
    if status == "running":
        return {"status": "running", "message": "Extraction in progress..."}

    # Validate before queueing.
    # - Page 0 or a negative page would reach the pipeline as a negative 0-based
    #   index (process_page uses page_num - 1). Negative indices may wrap to the
    #   last page rather than fail — unverified here.
    # - An out-of-range page would occupy a queue slot only to error out.
    total_pages = pipeline.get_page_count(file_path)
    if not 1 <= page_num <= total_pages:
        raise HTTPException(400, f"Invalid page. PDF has {total_pages} pages.")

    # Queue with HIGH priority (jumps to front)
    enqueue_page(file_hash, page_num, priority="high")
    # Also queue next page with low priority
    next_page = page_num + 1
    if next_page <= total_pages:
        enqueue_page(file_hash, next_page, priority="low")
    return {"status": "started", "message": "Extraction queued with high priority."}
def get_extraction(file_hash: str, page_num: int):
    """Poll for extraction results.

    The returned ``status`` is one of:
    - ``not_started``;
    - ``queued``, with the 1-based queue position in ``message`` when the page
      is still in the queue;
    - ``running``;
    - ``error``, with ``message``;
    - ``done``, with ``result``.
    """
    key = get_cache_key(file_hash, page_num)
    state = extraction_status.get(key)

    if state == "queued":
        with queue_lock:
            position = next(
                (
                    index
                    for index, (h, p, _) in enumerate(page_queue, start=1)
                    if h == file_hash and p == page_num
                ),
                0,
            )
        message = f"Queued (position {position})" if position else "Queued"
        return {"status": "queued", "message": message}

    if state == "running":
        return {"status": "running", "message": "Extracting articles..."}

    if state == "error":
        reason = extraction_cache.get(key, {}).get("error", "Unknown error")
        return {"status": "error", "message": reason}

    if state is None:
        return {"status": "not_started"}

    return {"status": "done", "result": extraction_cache[key]}
def get_summary(file_hash: str):
    """Get newspaper summary.

    Reports ``not_started``, ``running``, ``error`` (with ``message``) or, for
    any other recorded state, ``done`` with the headline count and the list of
    important articles.
    """
    state = summary_status.get(file_hash)

    if state == "error":
        reason = summary_results.get(file_hash, {}).get("error", "Unknown")
        return {"status": "error", "message": reason}
    if state == "running":
        return {"status": "running", "message": "Scanning newspaper..."}
    if state is None:
        return {"status": "not_started"}

    summary = summary_results.get(file_hash, {})
    headline_count = summary.get("total_headlines_found", 0)
    important = summary.get("important_articles", [])
    return {
        "status": "done",
        "total_headlines_found": headline_count,
        "important_articles": important,
    }
def get_queue_status(file_hash: str):
    """See what's in the queue for this PDF.

    Returns:
        A dict with two keys:
        - ``"queued"``: ``[{"page": ..., "priority": ...}, ...]`` in queue order;
        - ``"cached_pages"``: sorted page numbers whose extraction is "done".
    """
    # Cache keys look like "<hash>_page<N>" (see get_cache_key). Match on the
    # full prefix: a bare startswith(file_hash) also matched other uploads whose
    # hash merely begins with the given string, and an empty file_hash matched
    # every upload.
    prefix = f"{file_hash}_page"
    with queue_lock:
        queued = [
            {"page": p, "priority": pr}
            for h, p, pr in page_queue
            if h == file_hash
        ]
        # Snapshot instead of iterating the live dict, which other threads
        # (the queue worker, request handlers) write to concurrently.
        statuses = list(extraction_status.items())
    cached = sorted(
        int(key[len(prefix):])
        for key, status in statuses
        if key.startswith(prefix) and status == "done"
    )
    return {
        "queued": queued,
        "cached_pages": cached,
    }
def get_layout_image(file_hash: str, page_num: int):
    """Get annotated layout image as base64 JPEG.

    Renders the page, detects layout regions, draws them, shrinks the result to
    fit within 1400x1400 and encodes it as a JPEG at quality 80.

    Args:
        file_hash: Upload identifier.
        page_num: 1-based page number.

    Returns:
        ``{"image_base64": <base64-encoded JPEG>}``.

    Raises:
        HTTPException(404): no uploaded PDF for ``file_hash``.
        HTTPException(400): page number below 1 or beyond the PDF's page count.
        HTTPException(500): any other failure while rendering or encoding.
    """
    import base64
    import io

    file_path = str(UPLOAD_DIR / f"{file_hash}.pdf")
    if not os.path.exists(file_path):
        raise HTTPException(404, "PDF not found.")
    # page_num - 1 must not go negative; how the renderer treats negative
    # indices is not defined here (they may wrap to the last page).
    if page_num < 1:
        raise HTTPException(400, "Invalid page. Page numbers start at 1.")
    try:
        image, total = pipeline._pdf_page_to_image(file_path, page_num - 1)
        if image is None:
            raise HTTPException(400, f"Invalid page. PDF has {total} pages.")
        regions = pipeline._detect_layout(image)
        viz = pipeline._visualize_layout(image, regions)
        # viz is presumably a PIL image (it is thumbnailed and saved below).
        # JPEG cannot store alpha or palette modes, so normalise those first.
        if viz.mode not in ("RGB", "L"):
            viz = viz.convert("RGB")
        viz.thumbnail((1400, 1400))
        buf = io.BytesIO()
        viz.save(buf, format="JPEG", quality=80)
        return {"image_base64": base64.b64encode(buf.getvalue()).decode()}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e)) from e
def get_page_image(file_hash: str, page_num: int):
    """Get clean page image as base64 JPEG.

    Renders the page without annotations, shrinks it to fit within 1400x1400
    and encodes it as a JPEG at quality 80.

    Args:
        file_hash: Upload identifier.
        page_num: 1-based page number.

    Returns:
        ``{"image_base64": <base64-encoded JPEG>}``.

    Raises:
        HTTPException(404): no uploaded PDF for ``file_hash``.
        HTTPException(400): page number below 1 or beyond the PDF's page count.
        HTTPException(500): any other failure while rendering or encoding.
    """
    import base64
    import io

    file_path = str(UPLOAD_DIR / f"{file_hash}.pdf")
    if not os.path.exists(file_path):
        raise HTTPException(404, "PDF not found.")
    # page_num - 1 must not go negative; how the renderer treats negative
    # indices is not defined here (they may wrap to the last page).
    if page_num < 1:
        raise HTTPException(400, "Invalid page. Page numbers start at 1.")
    try:
        image, total = pipeline._pdf_page_to_image(file_path, page_num - 1)
        if image is None:
            raise HTTPException(400, f"Invalid page. PDF has {total} pages.")
        # image is presumably a PIL image (it is thumbnailed and saved below).
        # JPEG cannot store alpha or palette modes, so normalise those first.
        if image.mode not in ("RGB", "L"):
            image = image.convert("RGB")
        image.thumbnail((1400, 1400))
        buf = io.BytesIO()
        image.save(buf, format="JPEG", quality=80)
        return {"image_base64": base64.b64encode(buf.getvalue()).decode()}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, str(e)) from e
| # ---- Run ---- | |
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT, falling back to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, port=listen_port, host="0.0.0.0")