Spaces:
Running
Running
| """ | |
| V4 API endpoint for structured summarization with streaming. | |
| """ | |
| import json | |
| import time | |
| from fastapi import APIRouter, HTTPException, Request | |
| from fastapi.responses import StreamingResponse | |
| from app.api.v4.schemas import StructuredSummaryRequest | |
| from app.core.logging import get_logger | |
| from app.services.article_scraper import article_scraper_service | |
| from app.services.structured_summarizer import structured_summarizer_service | |
# Module-level logger, named after this module.
logger = get_logger(__name__)

# Router object exposed by this module (route registration is not shown in
# this section of the file).
router = APIRouter()
async def scrape_and_summarize_stream(
    request: Request, payload: StructuredSummaryRequest
):
    """
    V4: Structured summarization with streaming support.

    Supports two modes:
    1. URL mode: Scrape article from URL then generate structured summary
    2. Text mode: Generate structured summary from provided text

    The structured JSON summary (produced by the summarizer service) is
    described as containing:
    - title: Click-worthy title
    - main_summary: 2-4 sentence summary
    - key_points: 3-5 bullet points
    - category: Topic category
    - sentiment: positive/negative/neutral
    - read_time_min: Estimated reading time

    Response format:
    Server-Sent Events stream with:
    - Metadata event (if include_metadata=true)
    - Content chunks (streaming JSON tokens)
    - Done event (final latency)

    Args:
        request: Incoming request; ``request.state.request_id`` is used for
            log correlation and echoed in the ``X-Request-ID`` header
            (falls back to "unknown").
        payload: Request body carrying either ``url`` or ``text`` plus the
            summarization options.

    Returns:
        StreamingResponse with media type ``text/event-stream``.

    Raises:
        HTTPException: 502 if the scraper raises; 422 if fewer than 100
            characters were extracted, or if neither ``url`` nor ``text``
            is present on the payload.
    """
    request_id = getattr(request.state, "request_id", "unknown")

    # Determine input mode and prepare data
    if payload.url:
        # URL Mode: Scrape + Summarize
        logger.info(f"[{request_id}] V4 URL mode: {payload.url[:80]}...")

        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted mid-request.
        scrape_start = time.perf_counter()
        try:
            article_data = await article_scraper_service.scrape_article(
                url=payload.url, use_cache=payload.use_cache
            )
        except Exception as e:
            logger.error(f"[{request_id}] Scraping failed: {e}")
            # Chain the cause so the original traceback is preserved.
            raise HTTPException(
                status_code=502, detail=f"Failed to scrape article: {str(e)}"
            ) from e
        scrape_latency_ms = (time.perf_counter() - scrape_start) * 1000

        # Treat a missing or None "text" as empty so it reaches the 422
        # branch below instead of failing with KeyError/TypeError (a 500).
        extracted_text = article_data.get("text") or ""

        logger.info(
            f"[{request_id}] Scraped in {scrape_latency_ms:.2f}ms, "
            f"extracted {len(extracted_text)} chars"
        )

        # Validate scraped content
        if len(extracted_text) < 100:
            raise HTTPException(
                status_code=422,
                detail="Insufficient content extracted from URL. "
                "Article may be behind paywall or site may block scrapers.",
            )

        text_to_summarize = extracted_text
        metadata = {
            "input_type": "url",
            "url": payload.url,
            "title": article_data.get("title"),
            "author": article_data.get("author"),
            "date": article_data.get("date"),
            "site_name": article_data.get("site_name"),
            "scrape_method": article_data.get("method", "static"),
            "scrape_latency_ms": scrape_latency_ms,
            "extracted_text_length": len(extracted_text),
            "style": payload.style.value,
        }
    else:
        # Text Mode: Direct Summarization
        # NOTE(review): schema validation presumably guarantees that one of
        # url/text is set; this guard only keeps a malformed payload from
        # surfacing as a 500 from len(None).
        if payload.text is None:
            raise HTTPException(
                status_code=422,
                detail="Either 'url' or 'text' must be provided.",
            )

        logger.info(f"[{request_id}] V4 text mode: {len(payload.text)} chars")

        text_to_summarize = payload.text
        metadata = {
            "input_type": "text",
            "text_length": len(payload.text),
            "style": payload.style.value,
        }

    # Stream structured summarization
    return StreamingResponse(
        _stream_generator(text_to_summarize, payload, metadata, request_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Ask nginx-style reverse proxies not to buffer the stream.
            "X-Accel-Buffering": "no",
            "X-Request-ID": request_id,
        },
    )
async def _stream_generator(text: str, payload, metadata: dict, request_id: str):
    """Generate SSE stream for structured summarization.

    Args:
        text: Text handed to the summarizer service.
        payload: Request payload; ``include_metadata``, ``style.value`` and
            ``max_tokens`` are read from it.
        metadata: Dict sent as the first event when ``include_metadata`` is
            truthy; ``input_type`` and ``scrape_latency_ms`` are also read
            from it for the completion log line.
        request_id: Identifier used as a prefix in log messages.

    Yields:
        SSE frames: the text ``data: `` followed by a JSON document and a
        blank line. Frames are, in order: an optional metadata event, each
        chunk from the summarizer service, and (only on failure) a single
        error event with ``done`` set to true.

    Note:
        Latency is only logged on successful completion; this function does
        not emit a latency event of its own.
    """
    # Send metadata event first
    if payload.include_metadata:
        metadata_event = {"type": "metadata", "data": metadata}
        yield f"data: {json.dumps(metadata_event)}\n\n"

    # Stream structured summarization chunks.
    # perf_counter is monotonic, so the elapsed time is immune to system
    # clock adjustments. The interval spans the whole loop, including time
    # spent suspended at each yield.
    summarization_start = time.perf_counter()

    try:
        async for chunk in structured_summarizer_service.summarize_structured_stream(
            text=text,
            style=payload.style.value,
            max_tokens=payload.max_tokens,
        ):
            # Forward chunks in SSE format
            yield f"data: {json.dumps(chunk)}\n\n"
    except Exception as e:
        # The response has already started, so the failure is reported
        # in-band as a terminal error event rather than raised.
        logger.error(f"[{request_id}] V4 summarization failed: {e}")
        error_event = {"type": "error", "error": str(e), "done": True}
        yield f"data: {json.dumps(error_event)}\n\n"
        return

    summarization_latency_ms = (time.perf_counter() - summarization_start) * 1000

    # Calculate total latency (include scrape time for URL mode)
    total_latency_ms = summarization_latency_ms
    if metadata.get("input_type") == "url":
        total_latency_ms += metadata.get("scrape_latency_ms", 0)
        logger.info(
            f"[{request_id}] V4 request completed in {total_latency_ms:.2f}ms "
            f"(scrape: {metadata.get('scrape_latency_ms', 0):.2f}ms, "
            f"summary: {summarization_latency_ms:.2f}ms)"
        )
    else:
        logger.info(
            f"[{request_id}] V4 text mode completed in {total_latency_ms:.2f}ms"
        )
async def scrape_and_summarize_stream_ndjson(
    request: Request, payload: StructuredSummaryRequest
):
    """
    V4: NDJSON patch-based structured summarization with streaming.

    This is the NEW streaming protocol that outputs NDJSON patches.
    Each event (produced by the summarizer service) is described as
    containing:
    - delta: The patch object (e.g., {"op": "set", "field": "title", "value": "..."})
    - state: The current accumulated state
    - done: Boolean indicating completion
    - tokens_used: Number of tokens generated
    - latency_ms: Total latency (final event only)

    Supports two modes:
    1. URL mode: Scrape article from URL then generate structured summary
    2. Text mode: Generate structured summary from provided text

    Response format:
    Server-Sent Events stream with:
    - Metadata event (if include_metadata=true)
    - NDJSON patch events (streaming state updates)
    - Final event (with latency)

    Args:
        request: Incoming request; ``request.state.request_id`` is used for
            log correlation and echoed in the ``X-Request-ID`` header
            (falls back to "unknown").
        payload: Request body carrying either ``url`` or ``text`` plus the
            summarization options.

    Returns:
        StreamingResponse with media type ``text/event-stream``.

    Raises:
        HTTPException: 502 if the scraper raises; 422 if fewer than 100
            characters were extracted, or if neither ``url`` nor ``text``
            is present on the payload.
    """
    request_id = getattr(request.state, "request_id", "unknown")

    # Determine input mode and prepare data
    if payload.url:
        # URL Mode: Scrape + Summarize
        logger.info(f"[{request_id}] V4 NDJSON URL mode: {payload.url[:80]}...")

        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted mid-request.
        scrape_start = time.perf_counter()
        try:
            article_data = await article_scraper_service.scrape_article(
                url=payload.url, use_cache=payload.use_cache
            )
        except Exception as e:
            logger.error(f"[{request_id}] Scraping failed: {e}")
            # Chain the cause so the original traceback is preserved.
            raise HTTPException(
                status_code=502, detail=f"Failed to scrape article: {str(e)}"
            ) from e
        scrape_latency_ms = (time.perf_counter() - scrape_start) * 1000

        # Treat a missing or None "text" as empty so it reaches the 422
        # branch below instead of failing with KeyError/TypeError (a 500).
        extracted_text = article_data.get("text") or ""

        logger.info(
            f"[{request_id}] Scraped in {scrape_latency_ms:.2f}ms, "
            f"extracted {len(extracted_text)} chars"
        )

        # Validate scraped content
        if len(extracted_text) < 100:
            raise HTTPException(
                status_code=422,
                detail="Insufficient content extracted from URL. "
                "Article may be behind paywall or site may block scrapers.",
            )

        text_to_summarize = extracted_text
        metadata = {
            "input_type": "url",
            "url": payload.url,
            "title": article_data.get("title"),
            "author": article_data.get("author"),
            "date": article_data.get("date"),
            "site_name": article_data.get("site_name"),
            "scrape_method": article_data.get("method", "static"),
            "scrape_latency_ms": scrape_latency_ms,
            "extracted_text_length": len(extracted_text),
            "style": payload.style.value,
        }
    else:
        # Text Mode: Direct Summarization
        # NOTE(review): schema validation presumably guarantees that one of
        # url/text is set; this guard only keeps a malformed payload from
        # surfacing as a 500 from len(None).
        if payload.text is None:
            raise HTTPException(
                status_code=422,
                detail="Either 'url' or 'text' must be provided.",
            )

        logger.info(f"[{request_id}] V4 NDJSON text mode: {len(payload.text)} chars")

        text_to_summarize = payload.text
        metadata = {
            "input_type": "text",
            "text_length": len(payload.text),
            "style": payload.style.value,
        }

    # Stream NDJSON structured summarization
    return StreamingResponse(
        _stream_generator_ndjson(text_to_summarize, payload, metadata, request_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Ask nginx-style reverse proxies not to buffer the stream.
            "X-Accel-Buffering": "no",
            "X-Request-ID": request_id,
        },
    )
async def _stream_generator_ndjson(text: str, payload, metadata: dict, request_id: str):
    """Generate SSE stream for NDJSON patch-based structured summarization.

    Args:
        text: Text handed to the summarizer service.
        payload: Request payload; ``include_metadata``, ``style.value`` and
            ``max_tokens`` are read from it.
        metadata: Dict sent as the first event when ``include_metadata`` is
            truthy; ``input_type`` and ``scrape_latency_ms`` are also read
            from it for the completion log line.
        request_id: Identifier used as a prefix in log messages.

    Yields:
        SSE frames: the text ``data: `` followed by a JSON document and a
        blank line. Frames are, in order: an optional metadata event, each
        event from the summarizer service, and (only on failure) a single
        error event with ``delta`` and ``state`` null and ``done`` true.

    Note:
        Latency is only logged on successful completion; this function does
        not emit a latency event of its own.
    """
    # Send metadata event first
    if payload.include_metadata:
        metadata_event = {"type": "metadata", "data": metadata}
        yield f"data: {json.dumps(metadata_event)}\n\n"

    # Stream NDJSON structured summarization.
    # perf_counter is monotonic, so the elapsed time is immune to system
    # clock adjustments. The interval spans the whole loop, including time
    # spent suspended at each yield.
    summarization_start = time.perf_counter()

    try:
        event_stream = structured_summarizer_service.summarize_structured_stream_ndjson(
            text=text,
            style=payload.style.value,
            max_tokens=payload.max_tokens,
        )
        async for event in event_stream:
            # Forward events in SSE format
            yield f"data: {json.dumps(event)}\n\n"
    except Exception as e:
        # The response has already started, so the failure is reported
        # in-band as a terminal error event rather than raised.
        logger.error(f"[{request_id}] V4 NDJSON summarization failed: {e}")
        error_event = {
            "delta": None,
            "state": None,
            "done": True,
            "error": str(e),
        }
        yield f"data: {json.dumps(error_event)}\n\n"
        return

    summarization_latency_ms = (time.perf_counter() - summarization_start) * 1000

    # Calculate total latency (include scrape time for URL mode)
    total_latency_ms = summarization_latency_ms
    if metadata.get("input_type") == "url":
        total_latency_ms += metadata.get("scrape_latency_ms", 0)
        logger.info(
            f"[{request_id}] V4 NDJSON request completed in {total_latency_ms:.2f}ms "
            f"(scrape: {metadata.get('scrape_latency_ms', 0):.2f}ms, "
            f"summary: {summarization_latency_ms:.2f}ms)"
        )
    else:
        logger.info(
            f"[{request_id}] V4 NDJSON text mode completed in {total_latency_ms:.2f}ms"
        )