# =============================================================================
# 📰 Newspaper Summarizer
# Scans all pages for headlines using font size, then ranks via LLM.
# No YOLO, no vision model, no OCR. Fast and standalone.
#
# Usage:
#   from summarizer import NewspaperSummarizer
#   summarizer = NewspaperSummarizer(api_key="...")
#   result = summarizer.summarize("newspaper.pdf")
# =============================================================================

import json
import logging
import re
import time

import fitz
from openai import OpenAI

from config import (
    LLM_BASE_URL,
    TEXT_MODEL,
    HEADLINE_MIN_FONT_SIZE,
    TOP_ARTICLES_COUNT,
    SUMMARY_PROMPT,
    SECTION_NAMES,
    SKIP_SECTIONS,
    SECTION_TIERS,
)

logger = logging.getLogger("newspaper_summarizer")

# Fraction of the page height (from the top) searched for a section header.
_HEADER_HEIGHT_FRACTION = 0.12
# Headlines shorter than this are treated as page numbers / labels.
_MIN_HEADLINE_CHARS = 10
# Snippet limits: characters kept, and how many spans after the headline are inspected.
_SNIPPET_MAX_CHARS = 200
_SNIPPET_MAX_SPANS = 15
# Fallback wait (seconds) when a rate-limit error carries no usable delay hint.
_DEFAULT_RATE_LIMIT_WAIT = 60

# Text that looks like a headline by font size but is not an article.
# Compiled once instead of being rebuilt for every candidate headline.
_SKIP_HEADLINE_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"^the\s+hindu$",
        r"^page\s+\d+",
        r"^\d+$",
        r"^www\.",
        r"^https?://",
        r"continued\s+on",
        r"continued\s+from",
    )
)

# "rate" not preceded by a letter: matches "rate limit", "Rate exceeded",
# "tokens_rate_limit", but not "generate" or "moderate".
_RATE_WORD = re.compile(r"(?<![a-z])rate", re.IGNORECASE)
# A delay hint such as "7.66s", "20 sec" or "30 seconds". The trailing word
# boundary keeps "429 status" from being read as a 429-second delay.
_RETRY_DELAY = re.compile(r"(\d+(?:\.\d+)?)\s*(?:seconds?|secs?|s)\b", re.IGNORECASE)


class NewspaperSummarizer:
    """Scans a newspaper PDF for headlines and generates importance-ranked summaries."""

    def __init__(self, api_key):
        """Create the LLM client used for ranking.

        Args:
            api_key: API key passed to the OpenAI-compatible client at LLM_BASE_URL.
        """
        self.llm_client = OpenAI(base_url=LLM_BASE_URL, api_key=api_key)
        logger.info("✅ Summarizer initialized")

    def summarize(self, pdf_path):
        """
        Full pipeline: detect sections → scan headlines → rank via LLM.

        Args:
            pdf_path: Path of the newspaper PDF to process.

        Returns:
            {"important_articles": [...], "total_headlines_found": int, "sections": {...}}
            When no headlines are found the LLM is not called and
            "important_articles" is an empty list.

        Raises:
            RuntimeError: If the LLM is still rate limited after all retries.
            Exception: Any other LLM or response-parsing error is re-raised as is.
        """
        sections = self._detect_page_sections(pdf_path)
        logger.info("Detected sections: %s", sections)

        headlines = self._scan_headlines(pdf_path, sections)
        logger.info("Scanned %d headlines from PDF", len(headlines))

        if not headlines:
            return {
                "important_articles": [],
                "total_headlines_found": 0,
                "sections": sections,
            }

        ranked = self._rank_headlines(headlines)
        ranked["total_headlines_found"] = len(headlines)
        ranked["sections"] = sections
        return ranked

    def _detect_page_sections(self, pdf_path):
        """
        Detect section names from page headers.

        The top strip of every page is searched for spans containing one of
        SECTION_NAMES (case-insensitive substring match); the match with the
        largest font size wins. Page 1 falls back to "FRONT PAGE" when nothing
        matches; other pages without a match are left out of the result.

        Returns dict: {page_num (1-indexed): section_name}
        """
        sections = {}
        section_names_lower = {s.lower(): s for s in SECTION_NAMES}

        # The context manager closes the document even if text extraction raises.
        with fitz.open(pdf_path) as doc:
            for page_index in range(doc.page_count):
                page = doc[page_index]
                header_rect = fitz.Rect(
                    0, 0, page.rect.width, page.rect.height * _HEADER_HEIGHT_FRACTION
                )
                blocks = page.get_text("dict", clip=header_rect)["blocks"]

                best_match = None
                best_size = 0
                for block in blocks:
                    if block.get("type") != 0:  # type 0 = text block
                        continue
                    for line in block.get("lines", []):
                        for span in line.get("spans", []):
                            size = span.get("size", 0)
                            if size <= best_size:
                                # Cannot beat the current best; skip the name scan.
                                continue
                            text = span.get("text", "").strip().lower()
                            for known_lower, known_original in section_names_lower.items():
                                if known_lower in text:
                                    best_match = known_original
                                    best_size = size
                                    break

                if best_match:
                    sections[page_index + 1] = best_match
                elif page_index == 0:
                    sections[1] = "FRONT PAGE"

        return sections

    def _get_tier(self, section_name):
        """Get the tier for a section from SECTION_TIERS (case-insensitive).

        Returns 3 as default when the name is empty or not listed in any tier.
        """
        if not section_name:
            return 3
        upper = section_name.upper()
        for tier, names in SECTION_TIERS.items():
            if any(upper == name.upper() for name in names):
                return tier
        return 3

    def _scan_headlines(self, pdf_path, sections=None):
        """
        Extract headlines from all pages using font size detection.

        Includes section context from detected sections. Skips pages whose
        section (lowercased) is listed in SKIP_SECTIONS.

        Args:
            pdf_path: Path of the PDF to scan.
            sections: Optional {page_num (1-indexed): section_name}; pages
                missing from it are labelled "UNKNOWN".

        Returns:
            List of dicts with keys "page", "headline", "snippet", "font_size",
            "section" and "tier", de-duplicated per page and sorted by font
            size, largest first.
        """
        if sections is None:
            sections = {}

        headlines = []
        # The context manager closes the document even if a page fails to parse.
        with fitz.open(pdf_path) as doc:
            for page_index in range(doc.page_count):
                page_number = page_index + 1
                section = sections.get(page_number, "UNKNOWN")

                # Skip low-value sections entirely
                if section.lower() in SKIP_SECTIONS:
                    continue

                spans = self._collect_spans(doc[page_index])
                headlines.extend(
                    self._extract_page_headlines(spans, page_number, section)
                )

        # Deduplicate headlines (same text on same page), keeping the first occurrence
        seen = set()
        unique = []
        for h in headlines:
            key = (h["page"], h["headline"].lower())
            if key not in seen:
                seen.add(key)
                unique.append(h)

        # Sort by font size descending (bigger headline = more important);
        # the sort is stable, so equal sizes keep page order.
        unique.sort(key=lambda h: h["font_size"], reverse=True)
        return unique

    @staticmethod
    def _collect_spans(page):
        """Return the page's non-empty text spans with rounded size and baseline y."""
        spans = []
        for block in page.get_text("dict")["blocks"]:
            if block.get("type") != 0:  # type 0 = text block
                continue
            for line in block.get("lines", []):
                for span in line.get("spans", []):
                    text = span.get("text", "").strip()
                    if not text:
                        continue
                    spans.append({
                        "text": text,
                        "size": round(span.get("size", 0), 1),
                        "y": round(span.get("origin", [0, 0])[1], 1),
                        "flags": span.get("flags", 0),  # bold, italic etc.
                    })
        return spans

    def _extract_page_headlines(self, spans, page_number, section):
        """Group headline-sized spans of one page into headline records."""
        tier = self._get_tier(section)
        total = len(spans)
        found = []

        i = 0
        while i < total:
            first = spans[i]
            if first["size"] < HEADLINE_MIN_FONT_SIZE:
                i += 1
                continue

            # Merge following spans into the same headline while the font size
            # stays similar and each span is vertically close to the previous one.
            headline_size = first["size"]
            parts = [first["text"]]
            j = i + 1
            while (
                j < total
                and abs(spans[j]["size"] - headline_size) < 2
                and abs(spans[j]["y"] - spans[j - 1]["y"]) < 20
            ):
                parts.append(spans[j]["text"])
                j += 1
            i = j  # resume after the merged run whether or not it is kept

            headline_text = " ".join(parts).strip()

            # Skip very short headlines (likely page numbers, labels)
            if len(headline_text) < _MIN_HEADLINE_CHARS:
                continue
            # Skip common non-article text
            if any(p.search(headline_text) for p in _SKIP_HEADLINE_PATTERNS):
                continue

            found.append({
                "page": page_number,
                "headline": headline_text,
                "snippet": self._snippet_after(spans, j),
                "font_size": headline_size,
                "section": section,
                "tier": tier,
            })

        return found

    @staticmethod
    def _snippet_after(spans, start):
        """Return up to ~200 chars of body-sized text from the spans following `start`."""
        snippet = ""
        for body_span in spans[start:start + _SNIPPET_MAX_SPANS]:
            if body_span["size"] < HEADLINE_MIN_FONT_SIZE:
                snippet += body_span["text"] + " "
            if len(snippet) >= _SNIPPET_MAX_CHARS:
                break
        return snippet[:_SNIPPET_MAX_CHARS].strip()

    @staticmethod
    def _strip_code_fence(raw):
        """Remove a surrounding markdown code fence (``` or ```json) if present."""
        text = raw.strip()
        if not text.startswith("```"):
            return text
        _, newline, rest = text.partition("\n")
        if not newline:
            # Single-line fenced payload: there is no fence line to drop,
            # only the leading backticks.
            rest = text[3:]
        return rest.rsplit("```", 1)[0].strip()

    @staticmethod
    def _is_rate_limit_error(error):
        """Tell whether an exception looks like an HTTP 429 / rate-limit failure."""
        if getattr(error, "status_code", None) == 429:
            return True
        message = str(error)
        return "429" in message or _RATE_WORD.search(message) is not None

    @staticmethod
    def _retry_delay(message):
        """Return seconds to wait: the delay hinted in `message` plus 2, else the default."""
        match = _RETRY_DELAY.search(message)
        if match:
            return float(match.group(1)) + 2
        return _DEFAULT_RATE_LIMIT_WAIT

    def _rank_headlines(self, headlines, max_retries=3):
        """Send headlines to text LLM for importance ranking and summary.

        Args:
            headlines: Headline dicts as produced by _scan_headlines.
            max_retries: Maximum number of LLM calls; only rate-limit failures
                are retried.

        Returns:
            The JSON object returned by the LLM, parsed into a dict.

        Raises:
            RuntimeError: If every attempt was rate limited (or max_retries <= 0).
            Exception: Any non-rate-limit failure (API error, invalid JSON,
                empty or non-object response) is logged and re-raised.
        """
        # Build headline list with section and tier context
        lines = []
        for h in headlines:
            tier = h.get("tier", 3)
            section = h.get("section", "UNKNOWN")
            page = h["page"]
            headline = h["headline"]
            line = f'[Tier {tier}] Page {page} — {section}: "{headline}"'
            snippet = h.get("snippet")
            if snippet:
                line += f" — {snippet}"
            lines.append(line)

        prompt = SUMMARY_PROMPT.format(
            headlines_list="\n".join(lines),
            count=TOP_ARTICLES_COUNT,
        )

        # Call LLM with retry
        last_error = None
        for attempt in range(max_retries):
            try:
                response = self.llm_client.chat.completions.create(
                    model=TEXT_MODEL,
                    messages=[
                        {
                            "role": "system",
                            "content": "You are a newspaper editor. Respond with valid JSON only, no markdown fences.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    temperature=0.1,
                    max_tokens=4096,
                )
                content = response.choices[0].message.content
                if content is None:
                    raise ValueError("LLM returned an empty response")
                result = json.loads(self._strip_code_fence(content))
                if not isinstance(result, dict):
                    # summarize() adds keys to the result, so it must be an object.
                    raise ValueError("LLM response is not a JSON object")
            except Exception as e:
                if not self._is_rate_limit_error(e):
                    logger.error(f"LLM ranking failed: {e}")
                    raise
                last_error = e
                if attempt + 1 >= max_retries:
                    # No attempt left: sleeping would only delay the failure.
                    break
                wait = self._retry_delay(str(e))
                logger.warning(
                    "Rate limited, waiting %.0fs (attempt %d)", wait, attempt + 1
                )
                time.sleep(wait)
                continue

            logger.info(
                "LLM ranked %d important articles",
                len(result.get("important_articles", [])),
            )
            return result

        raise RuntimeError("Summary LLM failed after retries") from last_error