Spaces:
Running
Running
| """ | |
| Advanced Extraction Pipeline | |
| Provides: | |
| 0. preprocess_transcript - Clean noisy CSV transcripts before extraction | |
| 1. EMBEDDING_MODELS registry (4 models for deduplication) | |
| 2. NativeTokenizer - Count tokens without llama.cpp | |
| 3. EmbeddingModel - Load/compute embeddings | |
| 4. format_progress_ticker - Live UI updates | |
| 5. stream_extract_from_window - Stage 1: Extraction | |
| 6. deduplicate_items - Stage 2: Deduplication | |
| 7. stream_synthesize_executive_summary - Stage 3: Synthesis | |
| """ | |
| import csv | |
| import io | |
| import re | |
| import json | |
| import time | |
| import logging | |
| from typing import Dict, List, Any, Tuple, Generator, Optional, Set | |
| from dataclasses import dataclass | |
| import numpy as np | |
| from llama_cpp import Llama | |
| logger = logging.getLogger(__name__) | |
| # ===== TRANSCRIPT PREPROCESSING ===== | |
def preprocess_transcript(transcript_text: str) -> Tuple[str, List[str]]:
    """
    Clean a noisy transcript before extraction.

    The cleaning pipeline is:
      1. Detect CSV input and pull out speaker-prefixed dialogue lines.
      2. Drop consecutive duplicate lines.
      3. Collapse phrases repeated within a single line.
      4. Remove lines composed entirely of noise phrases.

    Args:
        transcript_text: Raw transcript (CSV or plain text).

    Returns:
        Tuple of (cleaned_dialogue_text, noise_phrases_list):
        the cleaned dialogue (speaker labels preserved) and the list of
        noise phrases that were detected and removed.
    """
    raw_lines = transcript_text.strip().split('\n')
    if not raw_lines:
        return "", []

    # CSV detection + dialogue extraction, then line-level dedup.
    dialogue = _extract_dialogue_from_csv(raw_lines)
    dialogue = _collapse_consecutive_duplicates(dialogue)

    # Collapse in-line phrase repetition; drop lines that clean to nothing.
    cleaned_lines = [
        cleaned
        for cleaned in (_collapse_repeated_phrases(line) for line in dialogue)
        if cleaned
    ]

    # Remove lines that are pure ASR noise.
    meaningful_lines, noise_phrases = _filter_noise_lines(cleaned_lines)
    result = '\n'.join(meaningful_lines)

    if result != transcript_text.strip():
        original_len = len(transcript_text.strip())
        cleaned_len = len(result)
        reduction = ((original_len - cleaned_len) / original_len * 100) if original_len > 0 else 0
        logger.info(
            f"Transcript preprocessed: {original_len} → {cleaned_len} chars "
            f"({reduction:.0f}% reduction, {len(meaningful_lines)} lines)"
        )
    return result, list(noise_phrases)
| def _extract_dialogue_from_csv(lines: List[str]) -> List[str]: | |
| """ | |
| Detect CSV format and extract speaker-prefixed dialogue lines. | |
| If the first line looks like a CSV header (start,end,speaker,text), | |
| parse as CSV and return 'SPEAKER_XX: text' lines. | |
| Otherwise return lines as-is. | |
| """ | |
| # Check for CSV header | |
| first_line = lines[0].strip().lower() | |
| is_csv = first_line.startswith('start,end,speaker,text') or ( | |
| ',' in first_line and any( | |
| kw in first_line for kw in ['speaker', 'start', 'text'] | |
| ) | |
| ) | |
| if not is_csv: | |
| return [l.strip() for l in lines if l.strip()] | |
| # Parse CSV, skipping header | |
| dialogue = [] | |
| csv_text = '\n'.join(lines) | |
| reader = csv.reader(io.StringIO(csv_text)) | |
| for i, row in enumerate(reader): | |
| if i == 0: | |
| # Skip header row | |
| continue | |
| if len(row) >= 4: | |
| speaker = row[2].strip() | |
| text = row[3].strip().strip('"') | |
| if text: | |
| dialogue.append(f"{speaker}: {text}") | |
| elif len(row) >= 1: | |
| # Fallback: take whatever text is there | |
| text = ','.join(row).strip() | |
| if text: | |
| dialogue.append(text) | |
| return dialogue | |
| def _collapse_consecutive_duplicates(lines: List[str]) -> List[str]: | |
| """Remove consecutive duplicate lines (exact match).""" | |
| if not lines: | |
| return [] | |
| result = [lines[0]] | |
| for line in lines[1:]: | |
| if line != result[-1]: | |
| result.append(line) | |
| return result | |
| def _collapse_repeated_phrases(line: str, max_repeats: int = 2) -> str: | |
| """ | |
| Collapse repeated phrases within a single line. | |
| Detects patterns like 'ABC。ABC。ABC。' and reduces to 'ABC。' | |
| Works with Chinese punctuation boundaries. | |
| """ | |
| if not line: | |
| return line | |
| # Split by Chinese/standard sentence boundaries | |
| # Keep the delimiter attached to the preceding segment | |
| segments = re.split(r'(?<=[。!?;\.\!\?\;])', line) | |
| segments = [s.strip() for s in segments if s.strip()] | |
| if len(segments) <= 1: | |
| return line | |
| # Collapse consecutive identical segments | |
| deduped = [segments[0]] | |
| repeat_count = 1 | |
| for seg in segments[1:]: | |
| if seg == deduped[-1]: | |
| repeat_count += 1 | |
| if repeat_count <= max_repeats: | |
| deduped.append(seg) | |
| else: | |
| deduped.append(seg) | |
| repeat_count = 1 | |
| return ''.join(deduped) | |
| def _filter_noise_lines( | |
| lines: List[str], | |
| min_unique_chars: int = 5, | |
| noise_phrase_threshold: int = 5 | |
| ) -> Tuple[List[str], Set[str]]: | |
| """ | |
| Filter out lines that are pure noise (ASR hallucination loops). | |
| A line is noise if: | |
| - It has fewer than min_unique_chars unique non-punctuation characters | |
| - Its content is entirely composed of a single phrase that repeats | |
| across the transcript more than noise_phrase_threshold times | |
| Args: | |
| lines: Preprocessed dialogue lines | |
| min_unique_chars: Minimum unique chars to keep a line | |
| noise_phrase_threshold: A phrase appearing more than this many times | |
| across the transcript is considered noise | |
| Returns: | |
| Tuple of (filtered_lines, noise_phrases) | |
| - filtered_lines: Lines that are not pure noise | |
| - noise_phrases: Set of noise phrases detected | |
| """ | |
| if not lines: | |
| return [], set() | |
| _punct_re = re.compile( | |
| r'[\s\u3000\uff0c\u3002\uff01\uff1f\u3001\uff1b\uff1a' | |
| r'\u201c\u201d\u2018\u2019' | |
| r'\uff08\uff09()\.,!?;:"\'\s]' | |
| ) | |
| def strip_speaker(line: str) -> str: | |
| return re.sub(r'^SPEAKER_\d+:\s*', '', line) | |
| def get_content(text: str) -> str: | |
| return _punct_re.sub('', text) | |
| # Step 1: Split each line into sentence-level segments and count | |
| # how many times each segment appears across the entire transcript. | |
| # This catches ASR hallucination like "並且請留意下方的資訊欄" which | |
| # may repeat within a line and across many lines. | |
| segment_counts: Dict[str, int] = {} | |
| for line in lines: | |
| text = strip_speaker(line) | |
| # Split on Chinese sentence boundaries | |
| segments = re.split(r'[。!?;\.\!\?\;]', text) | |
| seen_in_line: set = set() | |
| for seg in segments: | |
| seg_content = get_content(seg) | |
| if len(seg_content) >= 3 and seg_content not in seen_in_line: | |
| seen_in_line.add(seg_content) | |
| segment_counts[seg_content] = segment_counts.get(seg_content, 0) + 1 | |
| # Step 2: Find noise phrases (segments appearing in too many lines) | |
| noise_phrases = { | |
| phrase for phrase, count in segment_counts.items() | |
| if count >= noise_phrase_threshold | |
| } | |
| # Step 3: For each line, check if it's purely noise | |
| meaningful = [] | |
| for line in lines: | |
| text = strip_speaker(line) | |
| content = get_content(text) | |
| # Skip if too few unique characters | |
| if len(set(content)) < min_unique_chars: | |
| continue | |
| # Check if the line is entirely composed of noise phrases. | |
| # Remove all noise phrase occurrences and see if anything meaningful remains. | |
| remaining = content | |
| for noise in noise_phrases: | |
| remaining = remaining.replace(noise, '') | |
| # If nothing meaningful remains after removing noise, skip this line | |
| if len(remaining.strip()) < min_unique_chars: | |
| continue | |
| meaningful.append(line) | |
| return meaningful, noise_phrases | |
| # ===== EMBEDDING MODELS REGISTRY ===== | |
# Registry of embedding models available for deduplication.
# NOTE(review): the module docstring advertises "4 models", but only one is
# registered here — confirm whether the others were removed intentionally.
EMBEDDING_MODELS = {
    "granite-107m": {
        "name": "Granite 107M Multilingual (384-dim)",
        "repo_id": "bartowski/granite-embedding-107m-multilingual-GGUF",
        # Glob pattern passed to Llama.from_pretrained (see EmbeddingModel.load).
        "filename": "*Q8_0.gguf",
        "embedding_dim": 384,
        "max_context": 2048,
        "description": "Fastest, multilingual, good for quick deduplication",
    },
}
| # ===== NATIVE TOKENIZER ===== | |
class NativeTokenizer:
    """
    Lightweight token counter that avoids loading llama.cpp.

    Uses a GPT-2 style approximation: roughly one token per 4 non-CJK
    characters, while each CJK character counts as one token.
    """

    def __init__(self):
        """Initialize with the conservative chars-per-token ratio."""
        self.chars_per_token = 4  # conservative estimate for non-CJK text

    def count(self, text: str) -> int:
        """
        Estimate the number of tokens in *text*.

        Args:
            text: Input text.

        Returns:
            Approximate token count (at least 1 for non-empty input,
            0 for empty input).
        """
        if not text:
            return 0
        # CJK characters (CJK ideographs plus Japanese kana ranges)
        # tokenize roughly 1:1; everything else at ~4 chars per token.
        cjk = len(re.findall(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]', text))
        non_cjk = len(text) - cjk
        return max(1, cjk + non_cjk // self.chars_per_token)
| # ===== EMBEDDING MODEL ===== | |
class EmbeddingModel:
    """Wrapper around a llama.cpp embedding model used for deduplication."""

    def __init__(self, model_key: str, n_threads: int = 2):
        """
        Initialize embedding model.

        Args:
            model_key: Key from the EMBEDDING_MODELS registry
            n_threads: CPU threads for inference

        Raises:
            ValueError: If model_key is not in EMBEDDING_MODELS.
        """
        if model_key not in EMBEDDING_MODELS:
            raise ValueError(f"Unknown embedding model: {model_key}")
        self.model_key = model_key
        self.config = EMBEDDING_MODELS[model_key]
        self.n_threads = n_threads
        self.llm: Optional[Llama] = None

    def load(self) -> str:
        """
        Download (if needed) and load the embedding model.

        Returns:
            Human-readable info message on success.

        Raises:
            RuntimeError: If the model fails to load.
        """
        logger.info(f"Loading embedding model: {self.config['name']}")
        try:
            self.llm = Llama.from_pretrained(
                repo_id=self.config["repo_id"],
                filename=self.config["filename"],
                n_ctx=self.config["max_context"],
                n_batch=512,
                n_threads=self.n_threads,
                n_threads_batch=self.n_threads,
                n_gpu_layers=0,  # CPU only for embeddings
                verbose=False,
                embeddings=True,  # Enable embedding mode
            )
            msg = f"✅ Loaded: {self.config['name']} ({self.config['embedding_dim']}-dim)"
            logger.info(msg)
            return msg
        except Exception as e:
            error_msg = f"❌ Failed to load {self.model_key}: {str(e)}"
            logger.error(error_msg, exc_info=True)
            # RuntimeError is a subclass of Exception, so existing
            # `except Exception` callers still catch it.
            raise RuntimeError(error_msg) from e

    def embed(self, text: str) -> np.ndarray:
        """
        Compute an L2-normalized embedding for *text*.

        Args:
            text: Input text (truncated to the model's context window).

        Returns:
            1-D numpy array (unit norm unless the raw embedding is all-zero).

        Raises:
            RuntimeError: If the model has not been loaded yet.
        """
        if self.llm is None:
            raise RuntimeError("Model not loaded. Call load() first.")
        # Truncate to max context; rough approximation of 1 token ≈ 4 chars.
        max_chars = self.config["max_context"] * 4
        if len(text) > max_chars:
            text = text[:max_chars]
        # llama-cpp-python returns a plain Python list; convert to ndarray so
        # the normalization below (array / scalar) is valid and the declared
        # return type actually holds.
        embedding = np.asarray(self.llm.embed(text), dtype=np.float32)
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        return embedding

    def unload(self) -> None:
        """Unload the model and free memory."""
        if self.llm:
            logger.info(f"Unloading embedding model: {self.config['name']}")
            del self.llm
            self.llm = None
            import gc
            gc.collect()
            # Brief pause to let llama.cpp release resources.
            time.sleep(0.5)
| # ===== HELPER FUNCTIONS ===== | |
@dataclass
class Window:
    """A contiguous slice of the transcript fed to one extraction pass.

    Was declared with bare field annotations but no @dataclass decorator,
    so instances could not be constructed with field values and lacked
    these attributes entirely; the decorator (already imported at the top
    of the file) restores the intended value-object behavior.
    """
    id: int           # 1-indexed window number
    content: str      # transcript text for this window
    start_turn: int   # presumably index of the first dialogue turn — TODO confirm against the window builder
    end_turn: int     # presumably index of the last dialogue turn — TODO confirm
    token_count: int  # approximate token count of `content`
def format_progress_ticker(
    current_window: int,
    total_windows: int,
    window_tokens: int,
    max_tokens: int,
    items_found: Dict[str, int],
    tokens_per_sec: float,
    eta_seconds: int,
    current_snippet: str
) -> str:
    """
    Build the live progress ticker shown in the extraction UI.

    Args:
        current_window: Current window number (1-indexed).
        total_windows: Total number of windows.
        window_tokens: Tokens in the current window.
        max_tokens: Maximum tokens (for percentage).
        items_found: Mapping of category -> count found so far.
        tokens_per_sec: Generation speed.
        eta_seconds: Estimated seconds to completion.
        current_snippet: Last extracted item (truncated to 60 chars).

    Returns:
        Multi-line ticker string (surrounding whitespace stripped).
    """
    # Unicode progress bar, 20 cells wide.
    pct = (current_window / total_windows) * 100
    width = 20
    done = int(width * pct / 100)
    bar = "█" * done + "░" * (width - done)

    # Per-category counts and total.
    actions = items_found.get("action_items", 0)
    decided = items_found.get("decisions", 0)
    points = items_found.get("key_points", 0)
    open_qs = items_found.get("open_questions", 0)
    total = actions + decided + points + open_qs

    # Human-readable ETA.
    eta_str = f"{eta_seconds // 60}m {eta_seconds % 60}s" if eta_seconds > 60 else f"{eta_seconds}s"

    # Clip long snippets to keep the ticker one screen wide.
    snippet = current_snippet if len(current_snippet) <= 60 else current_snippet[:60] + "..."

    ticker = f"""
🪟 Window {current_window}/{total_windows} | {bar} {pct:.0f}%
📊 Extracted: {total} items
✓ Actions: {actions} | Decisions: {decided} | Points: {points} | Questions: {open_qs}
⚡ Speed: {tokens_per_sec:.1f} tok/s | ETA: {eta_str}
📝 Latest: {snippet}
"""
    return ticker.strip()
def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    """
    Cosine similarity of two already-normalized vectors.

    Args:
        vec1: First vector (unit norm expected).
        vec2: Second vector (unit norm expected).

    Returns:
        Their dot product as a float — equal to the cosine similarity
        when both inputs are unit vectors.
    """
    return float(np.dot(vec1, vec2))
| # ===== HELPER FUNCTIONS ===== | |
| def _repair_truncated_json(text: str) -> str: | |
| """ | |
| Attempt to repair truncated JSON by closing open brackets/strings. | |
| Handles cases where max_tokens cuts off the response mid-JSON, | |
| e.g. a string never closed, an array never closed, etc. | |
| Args: | |
| text: Truncated JSON string | |
| Returns: | |
| Repaired JSON string (best effort) | |
| """ | |
| in_string = False | |
| escape_next = False | |
| stack = [] # tracks open { and [ | |
| for char in text: | |
| if escape_next: | |
| escape_next = False | |
| continue | |
| if char == '\\' and in_string: | |
| escape_next = True | |
| continue | |
| if char == '"' and not escape_next: | |
| in_string = not in_string | |
| continue | |
| if in_string: | |
| continue | |
| if char in ('{', '['): | |
| stack.append(char) | |
| elif char == '}' and stack and stack[-1] == '{': | |
| stack.pop() | |
| elif char == ']' and stack and stack[-1] == '[': | |
| stack.pop() | |
| repair = "" | |
| if in_string: | |
| repair += '"' | |
| for opener in reversed(stack): | |
| if opener == '[': | |
| repair += ']' | |
| elif opener == '{': | |
| repair += '}' | |
| return text + repair | |
| def _normalize_item_to_string(item: Any) -> str: | |
| """ | |
| Normalize an extracted item to a plain string. | |
| Models may output items as strings or as dicts with various fields | |
| (e.g. {"assigned_to": "X", "due_date": "Y"}). This flattens them | |
| to a single descriptive string. | |
| Args: | |
| item: A string or dict from the extraction JSON | |
| Returns: | |
| A plain string representation | |
| """ | |
| if isinstance(item, str): | |
| return item.strip() | |
| if isinstance(item, dict): | |
| parts = [] | |
| for key, value in item.items(): | |
| if value and isinstance(value, str) and value.strip(): | |
| parts.append(f"{key}: {value.strip()}") | |
| return '; '.join(parts) if parts else str(item) | |
| return str(item) | |
def _normalize_extraction_items(data: Dict[str, list]) -> Dict[str, List[str]]:
    """
    Coerce every category's items to a clean list of strings.

    Args:
        data: Parsed extraction dict; category values may mix strings
            and dicts (or be missing / malformed).

    Returns:
        Dict with exactly the four expected category keys, each mapped
        to a list of non-empty strings (non-list values become []).
    """
    categories = {"action_items", "decisions", "key_points", "open_questions"}
    normalized: Dict[str, List[str]] = {}
    for category in categories:
        raw = data.get(category, [])
        if isinstance(raw, list):
            normalized[category] = [
                text for text in map(_normalize_item_to_string, raw) if text
            ]
        else:
            normalized[category] = []
    return normalized
def _try_parse_extraction_json(
    text: str, log_repair: bool = False
) -> Optional[Dict[str, List[str]]]:
    """
    Parse extraction JSON out of raw LLM output, tolerating damage.

    Strips markdown fences, then tries three parses in order: the text
    as-is, the text after bracket/string repair, and the repaired
    substring starting at the first '{'. A successful parse is then
    schema-checked leniently (missing categories become [], a stray
    string becomes a one-element list) and normalized so every item is
    a plain string.

    Args:
        text: Raw LLM output.
        log_repair: Log when repair was needed (use only for the final
            parse, not per-streaming-chunk).

    Returns:
        Parsed and normalized dict, or None if unrecoverable.
    """
    # Drop markdown code fences around the JSON.
    text = re.sub(r'```json\s*', '', text)
    text = re.sub(r'```\s*$', '', text)
    text = text.strip()

    data = None
    # Attempt 1: the text may already be valid JSON.
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        pass

    # Attempt 2: close dangling strings/brackets and retry.
    if data is None:
        try:
            data = json.loads(_repair_truncated_json(text))
        except json.JSONDecodeError:
            pass
        else:
            if log_repair:
                logger.info("Successfully parsed JSON after repair (output was truncated)")

    # Attempt 3: start from the first '{' and repair from there.
    if data is None:
        brace = re.search(r'\{', text)
        if not brace:
            return None
        try:
            data = json.loads(_repair_truncated_json(text[brace.start():]))
        except json.JSONDecodeError:
            return None
        if log_repair:
            logger.info("Successfully parsed JSON from substring after repair")

    if not isinstance(data, dict):
        return None

    # Lenient schema check: fill gaps, wrap stray strings, drop other types.
    for key in ("action_items", "decisions", "key_points", "open_questions"):
        if key not in data:
            data[key] = []
        elif not isinstance(data[key], list):
            data[key] = [data[key]] if isinstance(data[key], str) else []

    # Flatten dict items to strings.
    return _normalize_extraction_items(data)
| def _sample_llm_response(text: str, max_chars: int = 400) -> str: | |
| """Sample LLM response for trace logging.""" | |
| if not text: | |
| return "" | |
| return text[:max_chars] if len(text) > max_chars else text | |
| # ===== EXTRACTION PROMPT BUILDERS ===== | |
| def _build_schema_extraction_prompt(output_language: str) -> str: | |
| """Build concise schema-based extraction prompt (optimized for non-reasoning models).""" | |
| if output_language == "zh-TW": | |
| return """從會議逐字稿中提取關鍵資訊,以 JSON 格式返回。 | |
| 範例輸出 (Example): | |
| { | |
| "action_items": ["與三星討論Q3產能分配", "確認LPDDR4供應數量"], | |
| "decisions": ["優先供應大客戶浪潮", "暫停接受新訂單"], | |
| "key_points": ["DDR4缺貨持續到2028年", "AI需求占全球產能45%", "美光可能跟進SanDisk付款條件"], | |
| "open_questions": ["Q2價格漲幅預估", "深圳測試場良率確認"] | |
| } | |
| 使用以下架構,必須返回有效的 JSON: | |
| { | |
| "action_items": ["具體行動項目1", "具體行動項目2"], | |
| "decisions": ["決策1", "決策2"], | |
| "key_points": ["要點1", "要點2", "要點3"], | |
| "open_questions": ["問題1", "問題2"] | |
| } | |
| 說明: | |
| - action_items: 具體行動項目(包含負責人、時間、內容) | |
| - decisions: 已做出的決策(包合理由) | |
| - key_points: 重要討論要點(市場趨勢、供應狀況、策略調整) | |
| - open_questions: 未解決的問題或疑慮 | |
| 規則: | |
| 1. 必須返回有效的 JSON 格式 | |
| 2. 每個類別至少提取1-3個項目,如果沒有則返回空陣列 [] | |
| 3. 項目必須是具體的句子,不是關鍵詞 | |
| 4. 專注於商業決策和行動,忽略重複詞句""" | |
| else: | |
| return """Return data as a JSON object with the following schema: | |
| { | |
| "action_items": ["action item 1"], | |
| "decisions": ["decision 1"], | |
| "key_points": ["point 1"], | |
| "open_questions": ["question 1"] | |
| } | |
| action_items: Specific action items with owner and deadline | |
| decisions: Decisions made with rationale | |
| key_points: Important discussion points | |
| open_questions: Unresolved questions or concerns | |
| Extract from the transcript provided by the user. The transcript may contain repetitions, noise, or incomplete sentences - focus on meaningful dialogue content and ignore repetitive phrases.""" | |
| def _build_reasoning_extraction_prompt(output_language: str) -> str: | |
| """Build verbose extraction prompt with reasoning instructions (for hybrid models like Qwen3).""" | |
| if output_language == "zh-TW": | |
| return """你是會議分析助手。分析會議逐字稿並提取關鍵資訊。 | |
| 分析步驟: | |
| 1. 先理解對話內容:討論了什麼主題?涉及哪些公司/產品? | |
| 2. 識別決策點:有哪些明確的決定或策略調整? | |
| 3. 找出行動項目:需要執行的具體任務 | |
| 4. 記錄重要資訊:市場趨勢、供應狀況、價格變化 | |
| 5. 標記未解決問題:還不清楚或需要後續確認的事項 | |
| 範例輸出: | |
| { | |
| "action_items": ["與三星討論Q3產能分配", "確認LPDDR4供應數量"], | |
| "decisions": ["優先供應大客戶浪潮", "暫停接受新訂單"], | |
| "key_points": ["DDR4缺貨持續到2028年", "AI需求占全球產能45%", "美光可能跟進SanDisk付款條件"], | |
| "open_questions": ["Q2價格漲幅預估", "深圳測試場良率確認"] | |
| } | |
| 返回格式(必須是有效的 JSON): | |
| { | |
| "action_items": ["行動項目1", "行動項目2"], | |
| "decisions": ["決策1", "決策2"], | |
| "key_points": ["要點1", "要點2", "要點3"], | |
| "open_questions": ["問題1", "問題2"] | |
| } | |
| 說明: | |
| - action_items: 具體行動項目(誰要做什麼、何時完成) | |
| - decisions: 已做出的決策(包合理由) | |
| - key_points: 重要討論要點(市場趨勢、供應狀況、策略) | |
| - open_questions: 未解決的問題或疑慮 | |
| 規則: | |
| 1. 必須返回有效的 JSON 格式 | |
| 2. 每個類別提取1-5個項目,如果沒有則返回空陣列 [] | |
| 3. 項目必須是具體的完整句子 | |
| 4. 專注於商業決策和行動,忽略重複詞句和閒聊 | |
| 5. 僅輸出 JSON,不要推理過程""" | |
| else: | |
| return """You are a meeting analysis assistant. | |
| Use your reasoning capabilities to analyze the content before extracting. | |
| Your reasoning should: | |
| 1. Identify key decision points and action items | |
| 2. Distinguish explicit decisions from general discussion | |
| 3. Categorize information appropriately (action vs point vs question) | |
| The transcript may contain repetitions, noise, or incomplete sentences - focus on meaningful dialogue content and ignore repetitive phrases. | |
| After reasoning, return data as a JSON object with the following schema: | |
| { | |
| "action_items": ["action item 1", "action item 2"], | |
| "decisions": ["decision 1", "decision 2"], | |
| "key_points": ["point 1", "point 2"], | |
| "open_questions": ["question 1", "question 2"] | |
| } | |
| action_items: Specific action items with owner and deadline | |
| decisions: Decisions made with rationale | |
| key_points: Important discussion points | |
| open_questions: Unresolved questions or concerns | |
| Rules: | |
| - Each item must be a complete, standalone sentence | |
| - Include context (who, what, when) in each item | |
| - If a category has no items, use empty array [] | |
| - Output ONLY JSON, no markdown, no explanations""" | |
| # ===== CORE PIPELINE FUNCTIONS ===== | |
def _split_thinking(response: str) -> Tuple[Optional[str], str]:
    """Split a <think>/<thinking> block out of an LLM response.

    Returns:
        (thinking_text or None if no closed block was found,
         response with the thinking block removed).
    """
    match = re.search(r'<think(?:ing)?>(.*?)</think(?:ing)?>', response, re.DOTALL)
    if not match:
        return None, response
    return match.group(1).strip(), response[:match.start()] + response[match.end():]


def stream_extract_from_window(
    extraction_llm: Llama,
    window: Window,
    window_id: int,
    total_windows: int,
    tracer: Any,
    model_config: Dict[str, Any],
    enable_reasoning: bool = False
) -> Generator[Tuple[str, str, Dict[str, List[str]], bool], None, None]:
    """
    Stream extraction from a single window with live progress + optional reasoning.

    Fix over the previous version: the parse-failure path logged, printed
    and traced the same error twice (the whole block was duplicated);
    it now runs once. The duplicated think-block splitting is factored
    into _split_thinking.

    Args:
        extraction_llm: Loaded llama.cpp chat model.
        window: Transcript window to extract from.
        window_id: 1-indexed window number (for the progress display).
        total_windows: Total number of windows in this run.
        tracer: Trace logger exposing log_extraction / log_extraction_detail.
        model_config: Model registry entry (inference_settings, repo_id,
            supports_reasoning, ...).
        enable_reasoning: Use the verbose reasoning prompt when the model
            supports it.

    Yields:
        (ticker_text, thinking_text, partial_items, is_complete) tuples;
        is_complete is True only on the final yield.
    """
    # Auto-detect output language from the window content (CJK -> zh-TW).
    has_cjk = bool(re.search(r'[\u4e00-\u9fff]', window.content))
    output_language = "zh-TW" if has_cjk else "en"

    # Verbose reasoning prompt for hybrid models (e.g. Qwen3); otherwise the
    # concise schema prompt tuned for structured extraction.
    use_reasoning = enable_reasoning and model_config.get("supports_reasoning", False)
    if use_reasoning:
        system_prompt = _build_reasoning_extraction_prompt(output_language)
    else:
        system_prompt = _build_schema_extraction_prompt(output_language)

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Transcript:\n\n{window.content}"}
    ]

    full_response = ""
    thinking_content = ""
    start_time = time.time()
    first_token_time = None  # recorded for potential TTFT metrics (currently unused)
    token_count = 0

    try:
        max_gen_tokens = max(2048, window.token_count // 2)
        settings = model_config["inference_settings"].copy()
        # Qwen3 models need greedy decoding for extraction (not synthesis)
        # to avoid empty JSON output on larger windows.
        if "qwen3" in model_config["repo_id"].lower():
            settings["temperature"] = 0.0

        stream = extraction_llm.create_chat_completion(
            messages=messages,
            max_tokens=max_gen_tokens,
            temperature=settings["temperature"],
            top_p=settings["top_p"],
            top_k=settings["top_k"],
            repeat_penalty=max(1.2, settings["repeat_penalty"]),
            response_format={"type": "json_object"},
            stream=True,
        )

        for chunk in stream:
            if 'choices' in chunk and len(chunk['choices']) > 0:
                delta = chunk['choices'][0].get('delta', {})
                content = delta.get('content', '')
                if content:
                    if first_token_time is None:
                        first_token_time = time.time()
                    token_count += 1
                    full_response += content

                    # Separate any completed thinking block from the JSON.
                    if use_reasoning:
                        thinking, json_text = _split_thinking(full_response)
                        if thinking is not None:
                            thinking_content = thinking
                    else:
                        json_text = full_response

                    # Best-effort parse of the partial JSON for live counts.
                    partial_items = _try_parse_extraction_json(json_text)
                    if not partial_items:
                        partial_items = {"action_items": [], "decisions": [], "key_points": [], "open_questions": []}

                    # Throughput / ETA metrics.
                    elapsed = time.time() - start_time
                    tps = token_count / elapsed if elapsed > 0 else 0
                    eta = int((max_gen_tokens - token_count) / tps) if tps > 0 else 0
                    items_found = {k: len(v) for k, v in partial_items.items()}

                    # Show the most recently extracted item as the snippet.
                    last_item = ""
                    for cat in ["action_items", "decisions", "key_points", "open_questions"]:
                        if partial_items.get(cat):
                            last_item = partial_items[cat][-1]
                            break

                    ticker = format_progress_ticker(
                        current_window=window_id,
                        total_windows=total_windows,
                        window_tokens=window.token_count,
                        max_tokens=4096,
                        items_found=items_found,
                        tokens_per_sec=tps,
                        eta_seconds=eta,
                        current_snippet=last_item
                    )
                    yield (ticker, thinking_content, partial_items, False)

        # Final parse of the complete response.
        if use_reasoning:
            thinking, json_text = _split_thinking(full_response)
            if thinking is not None:
                thinking_content = thinking
        else:
            json_text = full_response

        final_items = _try_parse_extraction_json(json_text, log_repair=True)
        if not final_items:
            # Graceful degradation: log the failure once (previously this
            # whole block ran twice), yield an empty result, and let the
            # rest of the pipeline continue — other windows may still
            # produce useful data.
            error_msg = f"Failed to parse JSON from window {window_id}"
            debug_output = f"{error_msg}\n\nRaw LLM output:\n{full_response[:1000]}\n"
            logger.warning(debug_output)
            print(f"\n{'='*80}\n{debug_output}{'='*80}\n", flush=True)
            tracer.log_extraction(
                window_id=window_id,
                extraction=None,
                llm_response=_sample_llm_response(full_response),
                error=error_msg
            )
            empty_items = {
                "action_items": [], "decisions": [],
                "key_points": [], "open_questions": []
            }
            ticker = format_progress_ticker(
                current_window=window_id,
                total_windows=total_windows,
                window_tokens=window.token_count,
                max_tokens=4096,
                items_found={k: 0 for k in empty_items},
                tokens_per_sec=0,
                eta_seconds=0,
                current_snippet=f"⚠️ Window {window_id} parse failed, continuing..."
            )
            yield (ticker, thinking_content, empty_items, True)
            # Record the failed extraction for offline debugging.
            tracer.log_extraction_detail(
                window_id=window_id,
                extracted_items=empty_items,
                full_llm_response=full_response,
                full_thinking=thinking_content,
                json_repaired=False,
                parse_attempts=1
            )
            return

        # Log success.
        tracer.log_extraction(
            window_id=window_id,
            extraction=final_items,
            llm_response=_sample_llm_response(full_response),
            thinking=_sample_llm_response(thinking_content) if thinking_content else None,
            error=None
        )

        # Heuristic repair detection: if the raw response alone is not valid
        # JSON but parsing succeeded, the repair path must have been used.
        json_repaired = False
        parse_attempts = 1
        try:
            json.loads(full_response)
        except json.JSONDecodeError:
            json_repaired = True
            parse_attempts = 2

        tracer.log_extraction_detail(
            window_id=window_id,
            extracted_items=final_items,
            full_llm_response=full_response,
            full_thinking=thinking_content,
            json_repaired=json_repaired,
            parse_attempts=parse_attempts
        )

        # Final ticker.
        elapsed = time.time() - start_time
        tps = token_count / elapsed if elapsed > 0 else 0
        items_found = {k: len(v) for k, v in final_items.items()}
        ticker = format_progress_ticker(
            current_window=window_id,
            total_windows=total_windows,
            window_tokens=window.token_count,
            max_tokens=4096,
            items_found=items_found,
            tokens_per_sec=tps,
            eta_seconds=0,
            current_snippet="✅ Extraction complete"
        )
        yield (ticker, thinking_content, final_items, True)

    except Exception as e:
        # Trace the failure with whatever output we have, then re-raise so
        # the caller can decide how to handle a hard error.
        tracer.log_extraction(
            window_id=window_id,
            extraction=None,
            llm_response=_sample_llm_response(full_response) if full_response else "",
            error=str(e)
        )
        raise
def deduplicate_items(
    all_items: Dict[str, List[str]],
    embedding_model: EmbeddingModel,
    similarity_threshold: float,
    tracer: Any
) -> Generator[Dict[str, List[str]], None, None]:
    """
    Remove near-duplicate items within each category via embedding similarity.

    Uses a greedy first-seen-wins pass: an item is kept unless its cosine
    similarity to an already-kept item in the same category meets the
    threshold, in which case it is recorded as a duplicate of that item.

    Args:
        all_items: Dict of {category: [items]}.
        embedding_model: Loaded embedding model (provides .embed / .model_key).
        similarity_threshold: Cosine similarity cutoff (0.0-1.0).
        tracer: Tracer instance; receives a log_deduplication call per category.

    Yields:
        The (progressively filled) {category: [unique items]} dict after each
        category is processed, for progress tracking.
    """
    result: Dict[str, List[str]] = {}
    for category, originals in all_items.items():
        # Nothing to embed or compare for an empty category.
        if not originals:
            result[category] = []
            continue

        total = len(originals)

        # One embedding per item, computed up front.
        vectors = [embedding_model.embed(text) for text in originals]

        kept: List[int] = []          # indices of surviving items
        dup_groups: List[Dict[str, Any]] = []  # debug records of removals

        for idx, text in enumerate(originals):
            match_idx = -1
            match_score = 0.0
            # Greedy scan against everything we have decided to keep so far;
            # stop at the first sufficiently-similar survivor.
            for survivor_idx in kept:
                score = cosine_similarity(vectors[idx], vectors[survivor_idx])
                if score >= similarity_threshold:
                    match_idx = survivor_idx
                    match_score = score
                    break

            if match_idx < 0:
                kept.append(idx)
            else:
                # Record which kept item absorbed this duplicate (for debugging).
                dup_groups.append({
                    "duplicate_item": text,
                    "duplicate_index": idx,
                    "kept_item": originals[match_idx],
                    "kept_index": match_idx,
                    "similarity": round(match_score, 3),
                })

        survivors = [originals[idx] for idx in kept]
        result[category] = survivors
        removed = total - len(survivors)

        # Full before/after detail goes to the tracer for inspection.
        tracer.log_deduplication(
            category=category,
            original_count=total,
            deduplicated_count=len(survivors),
            duplicates_removed=removed,
            similarity_threshold=similarity_threshold,
            embedding_model=embedding_model.model_key,
            original_items=originals,
            deduplicated_items=survivors,
            duplicate_groups=dup_groups
        )
        logger.info(f"Dedup {category}: {total} → {len(survivors)} ({removed} removed)")

        # Intermediate snapshot so callers can show live progress.
        yield result
def stream_synthesize_executive_summary(
    synthesis_llm: Llama,
    deduplicated_items: Dict[str, List[str]],
    model_config: Dict[str, Any],
    output_language: str,
    enable_reasoning: bool,
    max_tokens: int,
    tracer: Any
) -> Generator[Tuple[str, str, bool], None, None]:
    """
    Stream synthesis of an executive summary from deduplicated items.

    Builds a localized (en / zh-TW) prompt from the deduplicated items,
    streams a chat completion from the model, and — when reasoning is
    enabled and supported — splits <think>/<thinking> blocks out of the
    summary into a separate thinking channel.

    Args:
        synthesis_llm: Loaded llama.cpp chat model.
        deduplicated_items: Dict of {category: [items]} (Stage 2 output).
        model_config: Model registry entry; must provide "name" and
            "inference_settings" (temperature/top_p/top_k/repeat_penalty),
            optionally "supports_reasoning".
        output_language: "en" or "zh-TW"; selects prompt text and labels.
        enable_reasoning: Whether to parse <think> blocks from the output.
        max_tokens: Generation token cap.
        tracer: Tracer instance; receives log_synthesis plus raw outputs.

    Yields:
        (summary_text, thinking_text, is_complete) — intermediate yields have
        is_complete=False; the final yield has is_complete=True.

    Raises:
        Re-raises any exception from the LLM call after logging it.
    """
    # Compiled once per call; the search runs on every streamed chunk.
    think_pattern = re.compile(r'<think(?:ing)?>(.*?)</think(?:ing)?>', re.DOTALL)

    def _split_thinking(text: str) -> Tuple[str, str]:
        """Split the first complete <think>/<thinking> block out of text.

        Returns (text_without_block, thinking). thinking is "" when no
        complete block is present (e.g. the closing tag hasn't streamed yet).
        """
        match = think_pattern.search(text)
        if not match:
            return text, ""
        return text[:match.start()] + text[match.end():], match.group(1).strip()

    # Logged so the trace records what the summary was built from.
    item_counts = {k: len(v) for k, v in deduplicated_items.items()}

    # Render non-empty categories as numbered lists under localized headers.
    items_text = ""
    for category, items in deduplicated_items.items():
        if items:
            category_label = {
                "action_items": "Action Items" if output_language == "en" else "行動項目",
                "decisions": "Decisions" if output_language == "en" else "決策",
                "key_points": "Key Points" if output_language == "en" else "關鍵要點",
                "open_questions": "Open Questions" if output_language == "en" else "未解決問題"
            }.get(category, category)
            items_text += f"\n{category_label}:\n"
            for i, item in enumerate(items, 1):
                items_text += f"{i}. {item}\n"

    if output_language == "zh-TW":
        system_prompt = """你是執行摘要專家。請根據提供的結構化資訊,生成簡潔、專業的執行摘要。
輸出格式要求:
- 使用 **粗體標題** 標記各部分
- 使用項目符號(bullet points)列出重點
- 每個重點簡短有力,不超過一句話
- 不要說明、不要解釋、不要自我分析
- 直接輸出執行摘要,不要說"首先..."、"我注意到..."等開頭語
示例格式:
**核心重點**
- DDR4 今年是供應鏈優先項目
- 預計 2027 年 Q1 開始產出
**主要行動**
- 協調三星、海力士等供應商
- 優先保障嵌入式市場供給
**未解決問題**
- DDR4 產能不足仍待解決"""
        user_prompt = f"基於以下結構化資訊生成執行摘要:\n{items_text}\n\n請按照上述格式要求,直接輸出執行摘要。"
    else:
        system_prompt = """You are an executive summary expert. Generate concise, professional executive summaries based on structured information.
Output format requirements:
- Use **BOLD HEADERS** for each section
- Use bullet points for key points
- Keep each point brief and powerful (one sentence max)
- NO explanations, NO analysis, NO self-reflection
- Start directly with the summary, do NOT use phrases like "First I need to...", "Let me analyze...", "I noticed..."
Example format:
**Key Priorities**
- DDR4 is supply chain priority this year
- Production expected to start in 2027 Q1
**Main Actions**
- Coordinate with Samsung, Hynix, and other suppliers
- Prioritize embedded market supply
**Open Issues**
- DDR4 capacity shortage still needs resolution"""
        user_prompt = f"Generate an executive summary based on these structured items:\n{items_text}\n\nPlease follow the format requirements above and output the executive summary directly."

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]

    # Thinking is only split out when requested AND the model supports it.
    parse_thinking = bool(enable_reasoning and model_config.get("supports_reasoning"))

    full_summary = ""
    thinking_content = ""
    try:
        settings = model_config["inference_settings"]
        stream = synthesis_llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=settings["temperature"],
            top_p=settings["top_p"],
            top_k=settings["top_k"],
            repeat_penalty=settings["repeat_penalty"],
            stream=True,
        )
        for chunk in stream:
            if 'choices' in chunk and len(chunk['choices']) > 0:
                delta = chunk['choices'][0].get('delta', {})
                content = delta.get('content', '')
                if content:
                    full_summary += content
                    if parse_thinking:
                        summary_text, parsed = _split_thinking(full_summary)
                        if parsed:
                            thinking_content = parsed
                    else:
                        summary_text = full_summary
                    yield (summary_text, thinking_content, False)

        # Final parse over the complete output (same rule as streaming).
        if parse_thinking:
            summary_text, parsed = _split_thinking(full_summary)
            if parsed:
                thinking_content = parsed
        else:
            summary_text = full_summary

        # Log synthesis with full details (sampled summary, full items/prompts).
        tracer.log_synthesis(
            synthesis_model=model_config["name"],
            input_item_counts=item_counts,
            output_summary=_sample_llm_response(summary_text),
            thinking=_sample_llm_response(thinking_content) if thinking_content else None,
            error=None,
            input_items=deduplicated_items,
            system_prompt=system_prompt,
            user_prompt=user_prompt
        )
        # Also store full (unsampled) outputs in synthesis_details directly.
        tracer.synthesis_details["full_output_summary"] = summary_text
        tracer.synthesis_details["full_thinking"] = thinking_content
        yield (summary_text, thinking_content, True)
    except Exception as e:
        # Log the failure with the same context, then propagate to the caller.
        tracer.log_synthesis(
            synthesis_model=model_config["name"],
            input_item_counts=item_counts,
            output_summary="",
            thinking=None,
            error=str(e),
            input_items=deduplicated_items,
            system_prompt=system_prompt,
            user_prompt=user_prompt
        )
        raise