Spaces:
Running
Running
| """ | |
| V4 Structured Summarization Service using Qwen-1.5B. | |
| """ | |
| import asyncio | |
| import json | |
| import threading | |
| import time | |
| from collections.abc import AsyncGenerator | |
| from typing import Any | |
| from app.core.config import settings | |
| from app.core.logging import get_logger | |
# Module-level logger obtained from the project's logging helper (app.core.logging).
logger = get_logger(__name__)
| # CRITICAL: Patch getpass.getuser() before importing bitsandbytes or transformers | |
| # HF Spaces containers don't have UID 1000 in /etc/passwd, causing KeyError | |
| import getpass | |
| import os | |
# Keep a reference to the real implementation so the wrapper can delegate to it.
_original_getuser = getpass.getuser


def _mock_getuser():
    """Return the current user name, tolerating a missing passwd entry.

    HF Spaces containers run under a UID that has no /etc/passwd entry, so the
    stdlib ``getpass.getuser`` fails once no login-name environment variable is
    set. Python < 3.13 lets the ``KeyError`` from the ``pwd`` lookup escape.
    Python >= 3.13 wraps lookup failures in ``OSError``. Both are handled here,
    falling back to USER / USERNAME and finally to ``"user"``.
    """
    try:
        return _original_getuser()
    except (KeyError, OSError):
        # Fallback for containerized environments without proper user database
        return os.environ.get("USER", os.environ.get("USERNAME", "user"))


# Install the wrapper globally so libraries that call getpass.getuser()
# during import see it.
getpass.getuser = _mock_getuser
# Try to import transformers
try:
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    logger.warning("Transformers library not available. V4 endpoints will be disabled.")

# Try bitsandbytes 4-bit config.
# `BitsAndBytesConfig` can be imported from transformers even when the
# bitsandbytes package itself is missing. Loading with it would then fail
# instead of taking the FP16/FP32 paths, so probe the real package too.
try:
    import bitsandbytes  # noqa: F401
    from transformers import BitsAndBytesConfig

    HAS_BITSANDBYTES = True
except Exception:  # any import-time failure means 4-bit loading is unusable
    HAS_BITSANDBYTES = False
| # Import Pydantic for schema definition | |
| from pydantic import BaseModel | |
class StructuredSummary(BaseModel):
    """Pydantic schema for structured summary output.

    Mirrors the object shape that the V4 system prompt describes to the model.
    NOTE(review): nothing in the visible source instantiates this model. The
    NDJSON stream builds a plain dict with the same keys instead, so confirm
    where (or whether) this schema is used.
    """

    # Short headline; the prompt asks for 6-10 words (not enforced here).
    title: str
    # Brief summary; the prompt asks for at most 2 sentences.
    main_summary: str
    # Key facts; the prompt asks for 3-5 items of 8-12 words each.
    key_points: list[str]
    # Topic label; the prompt asks for 1-2 words (e.g. "Crime", "Tech").
    category: str
    # The prompt restricts this to "positive", "negative" or "neutral", but the
    # type is a plain str, so other values still validate.
    sentiment: str
    # Estimated reading time in minutes.
    read_time_min: int
| class StructuredSummarizer: | |
| """Service for streaming structured summarization using Qwen-1.5B.""" | |
| def __init__(self): | |
| """Initialize the Qwen model and tokenizer with GPU/INT4 when possible.""" | |
| self.tokenizer: AutoTokenizer | None = None | |
| self.model: AutoModelForCausalLM | None = None | |
| if not TRANSFORMERS_AVAILABLE: | |
| logger.warning("β οΈ Transformers not available - V4 endpoints will not work") | |
| return | |
| logger.info(f"Initializing V4 model: {settings.v4_model_id}") | |
| try: | |
| # Load tokenizer | |
| self.tokenizer = AutoTokenizer.from_pretrained( | |
| settings.v4_model_id, | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| # Decide device / quantization strategy | |
| use_cuda = torch.cuda.is_available() | |
| use_mps = ( | |
| torch.backends.mps.is_available() | |
| if hasattr(torch.backends, "mps") | |
| else False | |
| ) | |
| use_gpu = use_cuda or use_mps | |
| quantization_desc = "None" | |
| if use_cuda: | |
| logger.info("CUDA is available. Using NVIDIA GPU for V4 model.") | |
| elif use_mps: | |
| logger.info( | |
| "MPS (Metal Performance Shaders) is available. Using Apple Silicon GPU for V4 model." | |
| ) | |
| else: | |
| logger.info("No GPU available. V4 model will run on CPU.") | |
| # ------------------------------------------------------------------ | |
| # Preferred path: 4-bit NF4 on CUDA GPU via bitsandbytes (memory efficient) | |
| # OR FP16 for speed (2-3x faster, uses more memory) | |
| # Note: bitsandbytes only works on CUDA, not MPS | |
| # ------------------------------------------------------------------ | |
| use_fp16_for_speed = getattr(settings, "v4_use_fp16_for_speed", False) | |
| if ( | |
| use_cuda | |
| and not use_fp16_for_speed | |
| and getattr(settings, "v4_enable_quantization", True) | |
| and HAS_BITSANDBYTES | |
| ): | |
| logger.info( | |
| "Applying 4-bit NF4 quantization (bitsandbytes) to V4 model..." | |
| ) | |
| quant_config = BitsAndBytesConfig( | |
| load_in_4bit=True, | |
| bnb_4bit_compute_dtype=torch.bfloat16, | |
| bnb_4bit_quant_type="nf4", | |
| bnb_4bit_use_double_quant=True, | |
| ) | |
| try: | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| device_map="auto", | |
| quantization_config=quant_config, | |
| attn_implementation="sdpa", | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| logger.info("β Using SDPA attention (optimized)") | |
| except Exception: | |
| logger.warning( | |
| "β οΈ SDPA not supported, falling back to default attention" | |
| ) | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| device_map="auto", | |
| quantization_config=quant_config, | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| quantization_desc = "4-bit NF4 (bitsandbytes, GPU)" | |
| elif use_gpu and use_fp16_for_speed: | |
| # Use FP16 for 2-3x faster inference | |
| # Note: MPS doesn't support BFloat16, so we avoid device_map="auto" for MPS | |
| logger.info( | |
| "Loading V4 model in FP16 for maximum speed (2-3x faster than FP32)..." | |
| ) | |
| if use_mps: | |
| # MPS: Load without device_map, then manually move to MPS | |
| try: | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| torch_dtype=torch.float16, | |
| attn_implementation="sdpa", | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| logger.info("β Using SDPA attention (optimized)") | |
| except Exception: | |
| logger.warning( | |
| "β οΈ SDPA not supported, falling back to default attention" | |
| ) | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| torch_dtype=torch.float16, | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| self.model = self.model.to("mps") | |
| else: | |
| # CUDA: Use device_map="auto" for multi-GPU support | |
| try: | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| torch_dtype=torch.float16, | |
| device_map="auto", | |
| attn_implementation="sdpa", | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| logger.info("β Using SDPA attention (optimized)") | |
| except Exception: | |
| logger.warning( | |
| "β οΈ SDPA not supported, falling back to default attention" | |
| ) | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| torch_dtype=torch.float16, | |
| device_map="auto", | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| quantization_desc = "FP16 (GPU, fast)" | |
| else: | |
| # ------------------------------------------------------------------ | |
| # Fallback path: | |
| # - GPU (CUDA/MPS) without quantization/FP16 -> FP16 | |
| # - CPU -> FP32 + optional dynamic INT8 | |
| # ------------------------------------------------------------------ | |
| base_dtype = torch.float16 if use_gpu else torch.float32 | |
| if use_mps: | |
| # MPS fallback: Load without device_map, manually move to MPS | |
| logger.info(f"Loading V4 model for MPS with dtype={base_dtype}") | |
| try: | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| torch_dtype=base_dtype, | |
| attn_implementation="sdpa", | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| logger.info("β Using SDPA attention (optimized)") | |
| except Exception: | |
| logger.warning( | |
| "β οΈ SDPA not supported, falling back to default attention" | |
| ) | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| torch_dtype=base_dtype, | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| self.model = self.model.to("mps") | |
| else: | |
| # CUDA or CPU | |
| device_strategy = "auto" if use_cuda else None | |
| logger.info( | |
| f"Loading V4 model with device_map='{device_strategy}', dtype={base_dtype}" | |
| ) | |
| try: | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| torch_dtype=base_dtype, | |
| device_map=device_strategy, | |
| attn_implementation="sdpa", | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| logger.info("β Using SDPA attention (optimized)") | |
| except Exception: | |
| logger.warning( | |
| "β οΈ SDPA not supported, falling back to default attention" | |
| ) | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| settings.v4_model_id, | |
| torch_dtype=base_dtype, | |
| device_map=device_strategy, | |
| cache_dir=settings.hf_cache_dir, | |
| trust_remote_code=True, | |
| ) | |
| # Optional dynamic INT8 quantization on CPU only (not supported on GPU) | |
| if getattr(settings, "v4_enable_quantization", True) and not use_gpu: | |
| try: | |
| logger.info( | |
| "Applying dynamic INT8 quantization to V4 model on CPU..." | |
| ) | |
| self.model = torch.quantization.quantize_dynamic( | |
| self.model, {torch.nn.Linear}, dtype=torch.qint8 | |
| ) | |
| quantization_desc = "INT8 dynamic (CPU)" | |
| except Exception as quant_error: | |
| logger.warning( | |
| f"β οΈ CPU INT8 quantization failed: {quant_error}. Using base dtype instead." | |
| ) | |
| quantization_desc = f"None ({base_dtype})" | |
| else: | |
| quantization_desc = f"None ({base_dtype})" | |
| # Set model to eval mode | |
| self.model.eval() | |
| logger.info("β V4 model initialized successfully") | |
| logger.info(f" Model ID: {settings.v4_model_id}") | |
| logger.info(f" Quantization: {quantization_desc}") | |
| logger.info(f" Model device: {next(self.model.parameters()).device}") | |
| logger.info(f" Torch dtype: {next(self.model.parameters()).dtype}") | |
| except Exception as e: | |
| logger.error(f"β Failed to initialize V4 model: {e}") | |
| logger.error(f"Model ID: {settings.v4_model_id}") | |
| logger.error(f"Cache dir: {settings.hf_cache_dir}") | |
| self.tokenizer = None | |
| self.model = None | |
| async def warm_up_model(self) -> None: | |
| """Warm up the model with a test input.""" | |
| if not self.model or not self.tokenizer: | |
| logger.warning("β οΈ V4 model not initialized, skipping warmup") | |
| return | |
| test_prompt = "<|system|>\nYou are a helpful assistant.\n<|end|>\n<|user|>\nHello\n<|end|>\n<|assistant|>" | |
| try: | |
| loop = asyncio.get_event_loop() | |
| await loop.run_in_executor(None, self._generate_test, test_prompt) | |
| logger.info("β V4 model warmup successful") | |
| except Exception as e: | |
| logger.error(f"β V4 model warmup failed: {e}") | |
| def _generate_test(self, prompt: str): | |
| """Test generation for warmup.""" | |
| inputs = self.tokenizer(prompt, return_tensors="pt") | |
| inputs = {k: v.to(self.model.device) for k, v in inputs.items()} | |
| with torch.no_grad(): | |
| _ = self.model.generate( | |
| **inputs, | |
| max_new_tokens=5, | |
| do_sample=False, | |
| pad_token_id=self.tokenizer.pad_token_id or self.tokenizer.eos_token_id, | |
| ) | |
| def _build_system_prompt(self) -> str: | |
| """ | |
| System prompt for NDJSON patch-style structured generation. | |
| The model must output ONLY newline-delimited JSON patch objects, no prose. | |
| """ | |
| return """You are a summarization engine that outputs ONLY newline-delimited JSON objects (NDJSON). | |
| Each line MUST be a single JSON object. Do NOT output any text that is not valid JSON. | |
| Do NOT add markdown code fences, comments, or explanations. | |
| Your goal is to produce a BRIEF, CONCISE structured summary of an article in the following logical shape: | |
| { | |
| "title": string, // 6-10 words MAX (e.g. "Couple Found Not Guilty in Homicide Case") | |
| "main_summary": string, // 2 sentences MAX (be extremely brief) | |
| "key_points": string[], // 3-5 items, each 8-12 words MAX | |
| "category": string, // 1-2 words ONLY (e.g. "Crime", "Tech", "Politics") | |
| "sentiment": string, // one of ["positive", "negative", "neutral"] | |
| "read_time_min": number | |
| } | |
| Instead of outputting this object directly, you MUST emit a SEQUENCE of JSON "patch" objects, one per line. | |
| Patch formats: | |
| 1) Set or overwrite a scalar field (title, main_summary, category, sentiment, read_time_min): | |
| {"op": "set", "field": "<field_name>", "value": <value>} | |
| Examples (NOTE: Keep titles SHORT): | |
| {"op": "set", "field": "title", "value": "Couple Acquitted in Homicide Case"} | |
| {"op": "set", "field": "title", "value": "AI Model Breakthrough"} | |
| {"op": "set", "field": "category", "value": "Crime"} | |
| {"op": "set", "field": "sentiment", "value": "neutral"} | |
| {"op": "set", "field": "read_time_min", "value": 3} | |
| 2) Append a key point to the key_points array: | |
| {"op": "append", "field": "key_points", "value": "<one concise key fact>"} | |
| Examples (NOTE: Keep each point SHORT): | |
| {"op": "append", "field": "key_points", "value": "Couple found not guilty of murder charges."} | |
| {"op": "append", "field": "key_points", "value": "New model optimized for efficiency."} | |
| 3) At the very end, output exactly one final line to signal completion: | |
| {"op": "done"} | |
| Rules: | |
| - You MUST always set all scalar fields before finishing: | |
| 1) First patch: {"op": "set", "field": "title", ...} [6-10 words MAX - be SHORT!] | |
| 2) Second patch: {"op": "set", "field": "main_summary", ...} [2 sentences MAX] | |
| 3) Third patch: {"op": "set", "field": "category", ...} [1-2 words ONLY] | |
| 4) Fourth patch: {"op": "set", "field": "sentiment", ...} | |
| 5) Fifth patch: {"op": "set", "field": "read_time_min", ...} | |
| 6) Then emit {"op": "append", "field": "key_points", ...} patches (3-5 items, each 8-12 words MAX). | |
| 7) Only AFTER all fields are set and 3-5 key_points have been appended, | |
| output exactly one final line: {"op": "done"}. | |
| - NEVER output {"op": "done"} if any of title, main_summary, category, | |
| sentiment or read_time_min is missing or null. | |
| - Output ONLY these JSON patch objects, one per line (NDJSON). | |
| - Never wrap them in an outer array. | |
| - Do NOT output the final combined object; only the patches. | |
| - CRITICAL BREVITY RULES: | |
| * Title MUST be 6-10 words. If longer, shorten it! | |
| * Main summary MUST be 2 sentences maximum. | |
| * Each key point MUST be 8-12 words maximum. | |
| * Category MUST be 1-2 words only. | |
| * NO verbose explanations. NO long descriptions. BE BRIEF! | |
| - CRITICAL JSON FORMATTING RULES: | |
| * ALL string values MUST have quotes properly escaped. | |
| * If a value contains a quote character, escape it as \\" | |
| * Example: "value": "TVNZ\\'s legacy" (escape the apostrophe/quote) | |
| * NEVER output unescaped quotes inside JSON string values. | |
| * Each JSON object MUST be on a single line and be valid JSON. | |
| * Test your JSON - it must parse correctly!""" | |
| def _build_style_instruction(self, style: str) -> str: | |
| """Build the style-specific instruction.""" | |
| style_prompts = { | |
| "skimmer": "Summarize concisely using only hard facts and data. Keep it extremely brief and to the point.", | |
| "executive": "Summarize for a CEO or executive. Focus on business impact, key takeaways, and strategic importance.", | |
| "eli5": "Explain like I'm 5 years old. Use simple words and analogies. Avoid jargon and technical terms.", | |
| } | |
| return style_prompts.get(style, style_prompts["executive"]) | |
| def _empty_state(self) -> dict[str, Any]: | |
| """Initial empty structured state that patches will build up.""" | |
| return { | |
| "title": None, | |
| "main_summary": None, | |
| "key_points": [], | |
| "category": None, | |
| "sentiment": None, | |
| "read_time_min": None, | |
| } | |
| def _apply_patch(self, state: dict[str, Any], patch: dict[str, Any]) -> bool: | |
| """ | |
| Apply a single patch to the state. | |
| Returns True if this is a 'done' patch (signals logical completion). | |
| """ | |
| op = patch.get("op") | |
| if op == "done": | |
| return True | |
| field = patch.get("field") | |
| if not field: | |
| return False | |
| if op == "set": | |
| state[field] = patch.get("value") | |
| elif op == "append": | |
| # Ensure list exists for list-like fields (e.g. key_points) | |
| if not isinstance(state.get(field), list): | |
| state[field] = [] | |
| state[field].append(patch.get("value")) | |
| return False | |
| def _fallback_fill_missing_fields( | |
| self, | |
| text: str, | |
| state: dict[str, Any], | |
| ) -> dict[str, Any]: | |
| """ | |
| Fallback to fill missing fields when the model stopped early | |
| and did not provide title, main_summary, or read_time_min. | |
| Strategy: | |
| - If title is missing, derive it from the main_summary or first key point. | |
| - If main_summary is missing, derive it from the first 2-3 key points. | |
| - If read_time_min is missing, estimate from text length. | |
| """ | |
| # Estimate reading time if missing | |
| if state.get("read_time_min") is None: | |
| # Simple heuristic: 200 words per minute | |
| words = text.split() | |
| minutes = max(1, round(len(words) / 200)) | |
| state["read_time_min"] = minutes | |
| # Build a lightweight summary from key_points if main_summary is missing | |
| if state.get("main_summary") is None: | |
| key_points = state.get("key_points") or [] | |
| if key_points: | |
| # Use up to first 3 key points to form a paragraph | |
| summary_parts = key_points[:3] | |
| state["main_summary"] = " ".join(summary_parts) | |
| else: | |
| # As a last resort, use the first 2-3 sentences from the article itself | |
| sentences = text.split(". ") | |
| state["main_summary"] = ". ".join(sentences[:3]).strip() | |
| # Derive title if missing | |
| if state.get("title") is None: | |
| # If we now have a main_summary, use its beginning as a title | |
| if state.get("main_summary"): | |
| summary_words = state["main_summary"].split() | |
| # Keep it short-ish; 10-14 words | |
| title_words = summary_words[:14] | |
| title = " ".join(title_words).strip() | |
| # Add ellipsis if we truncated | |
| if len(summary_words) > len(title_words): | |
| title += "..." | |
| state["title"] = title | |
| else: | |
| # Fallback: very short generic title | |
| state["title"] = "Article Summary" | |
| return state | |
| def _build_prompt(self, text: str, style: str) -> str: | |
| """Build the complete prompt for Qwen2.5 using its chat template.""" | |
| system_prompt = self._build_system_prompt() | |
| style_instruction = self._build_style_instruction(style) | |
| # Truncate text to prevent token overflow | |
| max_chars = 10000 | |
| if len(text) > max_chars: | |
| text = text[:max_chars] | |
| logger.warning(f"Truncated text from {len(text)} to {max_chars} chars") | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": system_prompt, | |
| }, | |
| { | |
| "role": "user", | |
| "content": ( | |
| f"{style_instruction}\n\n" | |
| f"Article:\n{text}\n\n" | |
| "Remember: respond ONLY with newline-delimited JSON patch objects " | |
| "as described in the system message. " | |
| "No explanations, no comments, no markdown, no code, no prose." | |
| ), | |
| }, | |
| ] | |
| # Let Qwen's tokenizer construct the correct special tokens and format | |
| return self.tokenizer.apply_chat_template( | |
| messages, | |
| tokenize=False, | |
| add_generation_prompt=True, | |
| ) | |
| async def summarize_structured_stream( | |
| self, | |
| text: str, | |
| style: str = "executive", | |
| max_tokens: int | None = None, | |
| ) -> AsyncGenerator[dict[str, Any], None]: | |
| """ | |
| Stream structured summarization using Phi-3. | |
| Args: | |
| text: Input text to summarize | |
| style: Summarization style (skimmer, executive, eli5) | |
| max_tokens: Maximum tokens to generate | |
| Yields: | |
| Dict containing streaming data in SSE format | |
| """ | |
| if not self.model or not self.tokenizer: | |
| error_msg = "V4 model not available. Please check model initialization." | |
| logger.error(f"β {error_msg}") | |
| yield { | |
| "content": "", | |
| "done": True, | |
| "error": error_msg, | |
| } | |
| return | |
| start_time = time.time() | |
| logger.info(f"V4 structured summarization: {len(text)} chars, style={style}") | |
| try: | |
| # Build prompt | |
| full_prompt = self._build_prompt(text, style) | |
| # Tokenize | |
| inputs = self.tokenizer(full_prompt, return_tensors="pt") | |
| inputs = {k: v.to(self.model.device) for k, v in inputs.items()} | |
| # Use config value or override | |
| max_new_tokens = max_tokens or settings.v4_max_tokens | |
| # Create streamer | |
| streamer = TextIteratorStreamer( | |
| self.tokenizer, skip_prompt=True, skip_special_tokens=True | |
| ) | |
| # Generation kwargs | |
| gen_kwargs = { | |
| **inputs, | |
| "streamer": streamer, | |
| "max_new_tokens": max_new_tokens, | |
| "do_sample": True, | |
| "temperature": settings.v4_temperature, | |
| "top_p": 0.9, | |
| "pad_token_id": self.tokenizer.pad_token_id | |
| or self.tokenizer.eos_token_id, | |
| "eos_token_id": self.tokenizer.eos_token_id, | |
| } | |
| # Start generation in background thread | |
| generation_thread = threading.Thread( | |
| target=self.model.generate, kwargs=gen_kwargs, daemon=True | |
| ) | |
| generation_thread.start() | |
| # Stream tokens as they arrive | |
| token_count = 0 | |
| for text_chunk in streamer: | |
| if text_chunk: | |
| token_count += 1 | |
| yield { | |
| "content": text_chunk, | |
| "done": False, | |
| "tokens_used": token_count, | |
| } | |
| # Yield control to event loop | |
| await asyncio.sleep(0) | |
| # Wait for generation to complete | |
| generation_thread.join() | |
| # Send final "done" chunk | |
| latency_ms = (time.time() - start_time) * 1000.0 | |
| yield { | |
| "content": "", | |
| "done": True, | |
| "tokens_used": token_count, | |
| "latency_ms": round(latency_ms, 2), | |
| } | |
| logger.info(f"β V4 summarization completed in {latency_ms:.2f}ms") | |
| except Exception: | |
| logger.exception("β V4 summarization failed") | |
| yield { | |
| "content": "", | |
| "done": True, | |
| "error": "V4 summarization failed. See server logs.", | |
| } | |
| async def summarize_structured_stream_ndjson( | |
| self, | |
| text: str, | |
| style: str = "executive", | |
| max_tokens: int | None = None, | |
| ) -> AsyncGenerator[dict[str, Any], None]: | |
| """ | |
| Stream structured summarization using NDJSON patch-based protocol. | |
| Args: | |
| text: Input text to summarize | |
| style: Summarization style (skimmer, executive, eli5) | |
| max_tokens: Maximum tokens to generate | |
| Yields: | |
| Dict containing: | |
| - delta: The patch object or None | |
| - state: Current combined state or None | |
| - done: Boolean indicating completion | |
| - tokens_used: Number of tokens generated | |
| - latency_ms: Latency in milliseconds (final event only) | |
| - error: Error message (only on error) | |
| """ | |
| if not self.model or not self.tokenizer: | |
| error_msg = "V4 model not available. Please check model initialization." | |
| logger.error(f"β {error_msg}") | |
| yield { | |
| "delta": None, | |
| "state": None, | |
| "done": True, | |
| "tokens_used": 0, | |
| "error": error_msg, | |
| } | |
| return | |
| start_time = time.time() | |
| logger.info(f"V4 NDJSON summarization: {len(text)} chars, style={style}") | |
| try: | |
| # Build prompt | |
| full_prompt = self._build_prompt(text, style) | |
| # DEBUG: Log the actual prompt being sent to model | |
| logger.info("=" * 80) | |
| logger.info("π DEBUG: Full prompt being sent to model:") | |
| logger.info(f"Prompt length: {len(full_prompt)} chars") | |
| logger.info(f"First 500 chars:\n{full_prompt[:500]}") | |
| logger.info(f"Last 200 chars:\n{full_prompt[-200:]}") | |
| logger.info("=" * 80) | |
| # Tokenize | |
| inputs = self.tokenizer(full_prompt, return_tensors="pt") | |
| inputs = {k: v.to(self.model.device) for k, v in inputs.items()} | |
| # Use config value or override | |
| max_new_tokens = max_tokens or settings.v4_max_tokens | |
| # Create streamer | |
| streamer = TextIteratorStreamer( | |
| self.tokenizer, skip_prompt=True, skip_special_tokens=True | |
| ) | |
| # Generation kwargs with greedy decoding for maximum speed | |
| gen_kwargs = { | |
| **inputs, | |
| "streamer": streamer, | |
| "max_new_tokens": max_new_tokens, | |
| "do_sample": False, | |
| "pad_token_id": self.tokenizer.pad_token_id | |
| or self.tokenizer.eos_token_id, | |
| "eos_token_id": self.tokenizer.eos_token_id, | |
| } | |
| # DEBUG: Log generation config | |
| logger.info("ποΈ Generation config:") | |
| logger.info(f" max_new_tokens: {max_new_tokens}") | |
| logger.info(" do_sample: False (greedy decoding for speed)") | |
| logger.info(f" eos_token_id: {self.tokenizer.eos_token_id}") | |
| logger.info(f" pad_token_id: {gen_kwargs['pad_token_id']}") | |
| # Start generation in background thread | |
| generation_thread = threading.Thread( | |
| target=self.model.generate, kwargs=gen_kwargs, daemon=True | |
| ) | |
| generation_thread.start() | |
| # Initialize streaming state | |
| buffer = "" | |
| token_count = 0 | |
| state = self._empty_state() | |
| done_received = False | |
| # Stream tokens and parse NDJSON patches | |
| for text_chunk in streamer: | |
| if text_chunk: | |
| token_count += 1 | |
| buffer += text_chunk | |
| # DEBUG: Log every raw token chunk | |
| logger.debug(f"π€ Token #{token_count}: {repr(text_chunk)}") | |
| # Process complete lines | |
| while "\n" in buffer: | |
| line, buffer = buffer.split("\n", 1) | |
| line = line.strip() | |
| if not line: | |
| continue | |
| # DEBUG: Log every line BEFORE filtering | |
| logger.info( | |
| f"π Raw line (at token #{token_count}): {line[:100]}..." | |
| ) | |
| # Heuristic: skip anything that clearly isn't a JSON patch object | |
| # This filters out lines like "#include <bits/stdc++.h>" or random prose. | |
| if not line.startswith("{") or "op" not in line: | |
| logger.warning( | |
| f"Skipping non-JSON-looking line: {line[:80]}..." | |
| ) | |
| continue | |
| # Try to parse JSON patch | |
| patch = None | |
| try: | |
| patch = json.loads(line) | |
| # Log each valid patch received from model | |
| op = patch.get("op") | |
| if op == "done": | |
| logger.info("β Model emitted done patch") | |
| elif op == "set": | |
| logger.info( | |
| f"π Model set: {patch.get('field')} = {str(patch.get('value'))[:50]}..." | |
| ) | |
| elif op == "append": | |
| logger.info( | |
| f"β Model append: {patch.get('field')} += {str(patch.get('value'))[:50]}..." | |
| ) | |
| except json.JSONDecodeError as e: | |
| logger.warning( | |
| f"Failed to parse NDJSON line: {line[:150]}... Error: {e}" | |
| ) | |
| # Try to extract valid JSON from the line | |
| # Common issues: incomplete lines, unescaped quotes, extra text | |
| try: | |
| # Strategy 1: Try to find the first complete JSON object | |
| brace_count = 0 | |
| end_pos = -1 | |
| for i, char in enumerate(line): | |
| if char == "{": | |
| brace_count += 1 | |
| elif char == "}": | |
| brace_count -= 1 | |
| if brace_count == 0: | |
| end_pos = i + 1 | |
| break | |
| if end_pos > 0: | |
| # Found a complete JSON object, try parsing just that part | |
| try: | |
| patch = json.loads(line[:end_pos]) | |
| logger.info( | |
| "β Extracted valid JSON from incomplete line" | |
| ) | |
| except: | |
| pass | |
| # Strategy 2: If still failed, try to fix common quote issues | |
| if patch is None and '"value":"' in line: | |
| # Try to escape unescaped quotes in the value field | |
| import re | |
| # Simple heuristic: if we see a pattern like "value":"...text with 'quote'..." | |
| # try to escape the inner quotes | |
| def try_fix_quotes(text): | |
| # Try to find and close the value string properly | |
| match = re.match( | |
| r'(\{"op":"[^"]+","field":"[^"]+","value":")(.*?)(.*)$', | |
| text, | |
| ) | |
| if match: | |
| prefix = match.group(1) | |
| value_content = match.group(2) | |
| rest = match.group(3) | |
| # Escape any unescaped quotes in the value | |
| value_content = value_content.replace( | |
| '\\"', "__TEMP__" | |
| ) | |
| value_content = value_content.replace( | |
| '"', '\\"' | |
| ) | |
| value_content = value_content.replace( | |
| "__TEMP__", '\\"' | |
| ) | |
| # Try to reconstruct: prefix + escaped_value + "}" | |
| if rest.startswith('"}'): | |
| try: | |
| return json.loads( | |
| prefix + value_content + rest | |
| ) | |
| except: | |
| pass | |
| return None | |
| repaired = try_fix_quotes(line) | |
| if repaired: | |
| patch = repaired | |
| logger.info( | |
| "β Repaired JSON by escaping quotes" | |
| ) | |
| except Exception as repair_error: | |
| logger.debug( | |
| f"JSON repair attempt failed: {repair_error}" | |
| ) | |
| if patch is None: | |
| continue | |
| # Apply patch to state | |
| is_done = self._apply_patch(state, patch) | |
| # Yield structured event | |
| yield { | |
| "delta": patch, | |
| "state": dict(state), # Copy state to avoid mutations | |
| "done": is_done, | |
| "tokens_used": token_count, | |
| } | |
| # If done, break out of loops | |
| if is_done: | |
| done_received = True | |
| break | |
| # Break outer loop if done | |
| if done_received: | |
| break | |
| # Yield control to event loop | |
| await asyncio.sleep(0) | |
| # Wait for generation to complete | |
| generation_thread.join() | |
| # Process any remaining buffer content (might contain {"op": "done"}) | |
| if buffer.strip(): | |
| logger.info(f"π¦ Processing remaining buffer: {repr(buffer[:200])}") | |
| # Try to parse the remaining buffer as a complete JSON object | |
| buffer_cleaned = buffer.strip() | |
| if buffer_cleaned.startswith("{") and "op" in buffer_cleaned: | |
| try: | |
| patch = json.loads(buffer_cleaned) | |
| is_done = self._apply_patch(state, patch) | |
| if is_done: | |
| done_received = True | |
| yield { | |
| "delta": patch, | |
| "state": dict(state), | |
| "done": True, | |
| "tokens_used": token_count, | |
| } | |
| else: | |
| yield { | |
| "delta": patch, | |
| "state": dict(state), | |
| "done": False, | |
| "tokens_used": token_count, | |
| } | |
| except json.JSONDecodeError: | |
| logger.warning( | |
| f"β οΈ Could not parse remaining buffer as JSON: {buffer_cleaned[:100]}" | |
| ) | |
| else: | |
| logger.warning( | |
| f"ποΈ Unparsed buffer remaining (not JSON): {repr(buffer[:200])}" | |
| ) | |
| else: | |
| logger.info("β Buffer was fully consumed (no partial lines)") | |
| logger.info( | |
| f"π Model generation completed: {token_count} tokens, " | |
| f"done_received={done_received}" | |
| ) | |
| # If the model never emitted {"op":"done"} OR left required fields missing, | |
| # run a fallback to fill the gaps and emit synthetic patch events. | |
| required_fields = [ | |
| "title", | |
| "main_summary", | |
| "category", | |
| "sentiment", | |
| "read_time_min", | |
| ] | |
| missing_required = [f for f in required_fields if state.get(f) is None] | |
| if missing_required: | |
| logger.warning( | |
| f"V4 NDJSON: Missing required fields from model: {missing_required}. " | |
| "Applying fallback to fill missing values." | |
| ) | |
| # Use fallback to fill in missing fields in-place | |
| state = self._fallback_fill_missing_fields(text, state) | |
| # For each field that was missing, emit a synthetic 'set' patch | |
| for field in missing_required: | |
| patch = { | |
| "op": "set", | |
| "field": field, | |
| "value": state.get(field), | |
| } | |
| # Apply patch (for consistency) and yield it as an event | |
| _ = self._apply_patch(state, patch) | |
| logger.info( | |
| f"π§ Fallback generated: {field} = {str(state.get(field))[:80]}..." | |
| ) | |
| yield { | |
| "delta": patch, | |
| "state": dict(state), | |
| "done": False, | |
| "tokens_used": token_count, | |
| } | |
| # Compute latency | |
| latency_ms = (time.time() - start_time) * 1000.0 | |
| # Emit final event (always mark done=True here) | |
| yield { | |
| "delta": None, | |
| "state": dict(state), | |
| "done": True, | |
| "tokens_used": token_count, | |
| "latency_ms": round(latency_ms, 2), | |
| } | |
| logger.info( | |
| f"β V4 NDJSON summarization completed in {latency_ms:.2f}ms. " | |
| f"Fields: title={'β ' if state.get('title') else 'β'}, " | |
| f"summary={'β ' if state.get('main_summary') else 'β'}, " | |
| f"category={'β ' if state.get('category') else 'β'}, " | |
| f"sentiment={'β ' if state.get('sentiment') else 'β'}, " | |
| f"read_time={'β ' if state.get('read_time_min') else 'β'}, " | |
| f"key_points={len(state.get('key_points', []))} items" | |
| ) | |
| logger.info(f"β V4 NDJSON summarization completed in {latency_ms:.2f}ms") | |
| except Exception: | |
| logger.exception("β V4 NDJSON summarization failed") | |
| yield { | |
| "delta": None, | |
| "state": None, | |
| "done": True, | |
| "tokens_used": 0, | |
| "error": "V4 NDJSON summarization failed. See server logs.", | |
| } | |
# Global service instance.
# Constructing it runs StructuredSummarizer.__init__ at import time. That loads
# the tokenizer and model, or leaves them as None if loading fails.
structured_summarizer_service = StructuredSummarizer()