# Keep only the Granite-107M embedding model (commit de2e4cb).
"""
Advanced Extraction Pipeline
Provides:
0. preprocess_transcript - Clean noisy CSV transcripts before extraction
1. EMBEDDING_MODELS registry (embedding models for deduplication; currently Granite-107M only)
2. NativeTokenizer - Count tokens without llama.cpp
3. EmbeddingModel - Load/compute embeddings
4. format_progress_ticker - Live UI updates
5. stream_extract_from_window - Stage 1: Extraction
6. deduplicate_items - Stage 2: Deduplication
7. stream_synthesize_executive_summary - Stage 3: Synthesis
"""
import csv
import io
import re
import json
import time
import logging
from typing import Dict, List, Any, Tuple, Generator, Optional, Set
from dataclasses import dataclass
import numpy as np
from llama_cpp import Llama
logger = logging.getLogger(__name__)
# ===== TRANSCRIPT PREPROCESSING =====
def preprocess_transcript(transcript_text: str) -> Tuple[str, List[str]]:
    """
    Clean noisy transcript text before extraction.

    Pipeline: CSV dialogue extraction -> consecutive-duplicate collapsing ->
    in-line repeated-phrase collapsing -> noise-line filtering.

    Args:
        transcript_text: Raw transcript (CSV or plain text)

    Returns:
        Tuple of (cleaned_dialogue_text, noise_phrases_list)
        - cleaned_dialogue_text: Cleaned dialogue text with speaker labels
        - noise_phrases_list: List of noise phrases detected and removed
    """
    stripped = transcript_text.strip()
    raw_lines = stripped.split('\n')
    if not raw_lines:
        return "", []
    # Stage 1: detect CSV format and pull out speaker-prefixed dialogue.
    dialogue_lines = _extract_dialogue_from_csv(raw_lines)
    # Stage 2: drop exact consecutive repeats.
    deduped_lines = _collapse_consecutive_duplicates(dialogue_lines)
    # Stage 3: collapse repeated phrases inside each surviving line.
    cleaned_lines = [
        collapsed
        for collapsed in (_collapse_repeated_phrases(l) for l in deduped_lines)
        if collapsed
    ]
    # Stage 4: discard lines that are pure noise.
    meaningful_lines, noise_phrases = _filter_noise_lines(cleaned_lines)
    result = '\n'.join(meaningful_lines)
    if result != stripped:
        original_len = len(stripped)
        cleaned_len = len(result)
        reduction = (original_len - cleaned_len) / original_len * 100 if original_len > 0 else 0
        logger.info(
            f"Transcript preprocessed: {original_len}{cleaned_len} chars "
            f"({reduction:.0f}% reduction, {len(meaningful_lines)} lines)"
        )
    return result, list(noise_phrases)
def _extract_dialogue_from_csv(lines: List[str]) -> List[str]:
"""
Detect CSV format and extract speaker-prefixed dialogue lines.
If the first line looks like a CSV header (start,end,speaker,text),
parse as CSV and return 'SPEAKER_XX: text' lines.
Otherwise return lines as-is.
"""
# Check for CSV header
first_line = lines[0].strip().lower()
is_csv = first_line.startswith('start,end,speaker,text') or (
',' in first_line and any(
kw in first_line for kw in ['speaker', 'start', 'text']
)
)
if not is_csv:
return [l.strip() for l in lines if l.strip()]
# Parse CSV, skipping header
dialogue = []
csv_text = '\n'.join(lines)
reader = csv.reader(io.StringIO(csv_text))
for i, row in enumerate(reader):
if i == 0:
# Skip header row
continue
if len(row) >= 4:
speaker = row[2].strip()
text = row[3].strip().strip('"')
if text:
dialogue.append(f"{speaker}: {text}")
elif len(row) >= 1:
# Fallback: take whatever text is there
text = ','.join(row).strip()
if text:
dialogue.append(text)
return dialogue
def _collapse_consecutive_duplicates(lines: List[str]) -> List[str]:
"""Remove consecutive duplicate lines (exact match)."""
if not lines:
return []
result = [lines[0]]
for line in lines[1:]:
if line != result[-1]:
result.append(line)
return result
def _collapse_repeated_phrases(line: str, max_repeats: int = 2) -> str:
"""
Collapse repeated phrases within a single line.
Detects patterns like 'ABC。ABC。ABC。' and reduces to 'ABC。'
Works with Chinese punctuation boundaries.
"""
if not line:
return line
# Split by Chinese/standard sentence boundaries
# Keep the delimiter attached to the preceding segment
segments = re.split(r'(?<=[。!?;\.\!\?\;])', line)
segments = [s.strip() for s in segments if s.strip()]
if len(segments) <= 1:
return line
# Collapse consecutive identical segments
deduped = [segments[0]]
repeat_count = 1
for seg in segments[1:]:
if seg == deduped[-1]:
repeat_count += 1
if repeat_count <= max_repeats:
deduped.append(seg)
else:
deduped.append(seg)
repeat_count = 1
return ''.join(deduped)
def _filter_noise_lines(
lines: List[str],
min_unique_chars: int = 5,
noise_phrase_threshold: int = 5
) -> Tuple[List[str], Set[str]]:
"""
Filter out lines that are pure noise (ASR hallucination loops).
A line is noise if:
- It has fewer than min_unique_chars unique non-punctuation characters
- Its content is entirely composed of a single phrase that repeats
across the transcript more than noise_phrase_threshold times
Args:
lines: Preprocessed dialogue lines
min_unique_chars: Minimum unique chars to keep a line
noise_phrase_threshold: A phrase appearing more than this many times
across the transcript is considered noise
Returns:
Tuple of (filtered_lines, noise_phrases)
- filtered_lines: Lines that are not pure noise
- noise_phrases: Set of noise phrases detected
"""
if not lines:
return [], set()
_punct_re = re.compile(
r'[\s\u3000\uff0c\u3002\uff01\uff1f\u3001\uff1b\uff1a'
r'\u201c\u201d\u2018\u2019'
r'\uff08\uff09()\.,!?;:"\'\s]'
)
def strip_speaker(line: str) -> str:
return re.sub(r'^SPEAKER_\d+:\s*', '', line)
def get_content(text: str) -> str:
return _punct_re.sub('', text)
# Step 1: Split each line into sentence-level segments and count
# how many times each segment appears across the entire transcript.
# This catches ASR hallucination like "並且請留意下方的資訊欄" which
# may repeat within a line and across many lines.
segment_counts: Dict[str, int] = {}
for line in lines:
text = strip_speaker(line)
# Split on Chinese sentence boundaries
segments = re.split(r'[。!?;\.\!\?\;]', text)
seen_in_line: set = set()
for seg in segments:
seg_content = get_content(seg)
if len(seg_content) >= 3 and seg_content not in seen_in_line:
seen_in_line.add(seg_content)
segment_counts[seg_content] = segment_counts.get(seg_content, 0) + 1
# Step 2: Find noise phrases (segments appearing in too many lines)
noise_phrases = {
phrase for phrase, count in segment_counts.items()
if count >= noise_phrase_threshold
}
# Step 3: For each line, check if it's purely noise
meaningful = []
for line in lines:
text = strip_speaker(line)
content = get_content(text)
# Skip if too few unique characters
if len(set(content)) < min_unique_chars:
continue
# Check if the line is entirely composed of noise phrases.
# Remove all noise phrase occurrences and see if anything meaningful remains.
remaining = content
for noise in noise_phrases:
remaining = remaining.replace(noise, '')
# If nothing meaningful remains after removing noise, skip this line
if len(remaining.strip()) < min_unique_chars:
continue
meaningful.append(line)
return meaningful, noise_phrases
# ===== EMBEDDING MODELS REGISTRY =====
# Registry of embedding models available for Stage 2 deduplication.
# Each entry describes a GGUF model on Hugging Face; `filename` is a glob
# that Llama.from_pretrained matches against the repo's files.
EMBEDDING_MODELS = {
    "granite-107m": {
        "name": "Granite 107M Multilingual (384-dim)",
        "repo_id": "bartowski/granite-embedding-107m-multilingual-GGUF",
        "filename": "*Q8_0.gguf",  # Q8_0 quantized build
        "embedding_dim": 384,      # output vector dimensionality
        "max_context": 2048,       # context window in tokens
        "description": "Fastest, multilingual, good for quick deduplication",
    },
}
# ===== NATIVE TOKENIZER =====
class NativeTokenizer:
    """
    Lightweight token counter that avoids loading llama.cpp.

    Approximates GPT-2 style tokenization: roughly one token per four
    non-CJK characters, and one token per CJK character.
    """

    def __init__(self):
        """Initialize the tokenizer with a conservative chars-per-token ratio."""
        self.chars_per_token = 4  # ~4 chars per token for non-CJK text

    def count(self, text: str) -> int:
        """
        Return the approximate token count for *text*.

        Args:
            text: Input text

        Returns:
            0 for empty input, otherwise at least 1
        """
        if not text:
            return 0
        # CJK scripts (Chinese / hiragana / katakana) tokenize close to
        # one token per character; everything else uses the 4:1 ratio.
        cjk = len(re.findall(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]', text))
        rest = len(text) - cjk
        return max(1, cjk + rest // self.chars_per_token)
# ===== EMBEDDING MODEL =====
class EmbeddingModel:
    """Thin wrapper around a llama.cpp embedding model used for deduplication."""

    def __init__(self, model_key: str, n_threads: int = 2):
        """
        Prepare the wrapper; the model itself is loaded lazily via load().

        Args:
            model_key: Key from the EMBEDDING_MODELS registry
            n_threads: CPU threads for inference

        Raises:
            ValueError: If *model_key* is not a registered model
        """
        if model_key not in EMBEDDING_MODELS:
            raise ValueError(f"Unknown embedding model: {model_key}")
        self.model_key = model_key
        self.config = EMBEDDING_MODELS[model_key]
        self.n_threads = n_threads
        self.llm: Optional[Llama] = None

    def load(self) -> str:
        """
        Download (if needed) and load the model in embedding mode.

        Returns:
            Human-readable info message on success

        Raises:
            Exception: Wrapping the underlying loader error on failure
        """
        logger.info(f"Loading embedding model: {self.config['name']}")
        try:
            self.llm = Llama.from_pretrained(
                repo_id=self.config["repo_id"],
                filename=self.config["filename"],
                n_ctx=self.config["max_context"],
                n_batch=512,
                n_threads=self.n_threads,
                n_threads_batch=self.n_threads,
                n_gpu_layers=0,   # embeddings stay on CPU
                verbose=False,
                embeddings=True,  # embedding mode, not generation
            )
            message = f"✅ Loaded: {self.config['name']} ({self.config['embedding_dim']}-dim)"
            logger.info(message)
            return message
        except Exception as e:
            failure = f"❌ Failed to load {self.model_key}: {str(e)}"
            logger.error(failure, exc_info=True)
            raise Exception(failure)

    def embed(self, text: str) -> np.ndarray:
        """
        Compute the L2-normalized embedding of *text*.

        Args:
            text: Input text (truncated to fit the context window)

        Returns:
            Normalized embedding vector

        Raises:
            RuntimeError: If load() has not been called yet
        """
        if self.llm is None:
            raise RuntimeError("Model not loaded. Call load() first.")
        # ~4 chars per token heuristic keeps us inside the context window.
        char_budget = self.config["max_context"] * 4
        if len(text) > char_budget:
            text = text[:char_budget]
        vector = self.llm.embed(text)
        magnitude = np.linalg.norm(vector)
        if magnitude > 0:
            vector = vector / magnitude
        return vector

    def unload(self) -> None:
        """Release the model and prompt the GC to reclaim native memory."""
        if self.llm:
            logger.info(f"Unloading embedding model: {self.config['name']}")
            del self.llm
            self.llm = None
            import gc
            gc.collect()
            time.sleep(0.5)  # brief pause lets llama.cpp free native buffers
# ===== HELPER FUNCTIONS =====
@dataclass
class Window:
    """Represents a transcript window (a contiguous slice of turns) for extraction."""
    id: int           # sequential window identifier
    content: str      # transcript text contained in this window
    start_turn: int   # index of the first turn included
    end_turn: int     # index of the last turn included
    token_count: int  # approximate token count of `content`
def format_progress_ticker(
    current_window: int,
    total_windows: int,
    window_tokens: int,
    max_tokens: int,
    items_found: Dict[str, int],
    tokens_per_sec: float,
    eta_seconds: int,
    current_snippet: str
) -> str:
    """
    Format the live progress ticker shown during extraction.

    Args:
        current_window: Current window number (1-indexed)
        total_windows: Total number of windows (0 is tolerated; progress
            reads 0% instead of raising ZeroDivisionError)
        window_tokens: Tokens in current window (currently display-reserved)
        max_tokens: Maximum tokens (currently display-reserved)
        items_found: Dict of {category: count}; missing categories count as 0
        tokens_per_sec: Generation speed
        eta_seconds: Estimated time to completion
        current_snippet: Last extracted item (truncated to 60 chars)

    Returns:
        Formatted multi-line ticker string
    """
    # Progress bar; guard against total_windows == 0 (empty transcript).
    progress_pct = (current_window / total_windows) * 100 if total_windows > 0 else 0.0
    bar_width = 20
    filled = int(bar_width * progress_pct / 100)
    bar = "█" * filled + "░" * (bar_width - filled)
    # Per-category item counts
    action_items = items_found.get("action_items", 0)
    decisions = items_found.get("decisions", 0)
    key_points = items_found.get("key_points", 0)
    questions = items_found.get("open_questions", 0)
    total_items = action_items + decisions + key_points + questions
    # Human-readable ETA
    if eta_seconds > 60:
        eta_str = f"{eta_seconds // 60}m {eta_seconds % 60}s"
    else:
        eta_str = f"{eta_seconds}s"
    # Keep the snippet to one short line
    snippet = current_snippet[:60] + "..." if len(current_snippet) > 60 else current_snippet
    ticker = f"""
🪟 Window {current_window}/{total_windows} | {bar} {progress_pct:.0f}%
📊 Extracted: {total_items} items
✓ Actions: {action_items} | Decisions: {decisions} | Points: {key_points} | Questions: {questions}
⚡ Speed: {tokens_per_sec:.1f} tok/s | ETA: {eta_str}
📝 Latest: {snippet}
"""
    return ticker.strip()
def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    """
    Return the cosine similarity of two already-normalized vectors.

    For unit vectors this is just their dot product, mathematically in
    [-1.0, 1.0] (in practice non-negative for the text embeddings used here).
    """
    return float(np.dot(vec1, vec2))
# ===== HELPER FUNCTIONS =====
def _repair_truncated_json(text: str) -> str:
"""
Attempt to repair truncated JSON by closing open brackets/strings.
Handles cases where max_tokens cuts off the response mid-JSON,
e.g. a string never closed, an array never closed, etc.
Args:
text: Truncated JSON string
Returns:
Repaired JSON string (best effort)
"""
in_string = False
escape_next = False
stack = [] # tracks open { and [
for char in text:
if escape_next:
escape_next = False
continue
if char == '\\' and in_string:
escape_next = True
continue
if char == '"' and not escape_next:
in_string = not in_string
continue
if in_string:
continue
if char in ('{', '['):
stack.append(char)
elif char == '}' and stack and stack[-1] == '{':
stack.pop()
elif char == ']' and stack and stack[-1] == '[':
stack.pop()
repair = ""
if in_string:
repair += '"'
for opener in reversed(stack):
if opener == '[':
repair += ']'
elif opener == '{':
repair += '}'
return text + repair
def _normalize_item_to_string(item: Any) -> str:
"""
Normalize an extracted item to a plain string.
Models may output items as strings or as dicts with various fields
(e.g. {"assigned_to": "X", "due_date": "Y"}). This flattens them
to a single descriptive string.
Args:
item: A string or dict from the extraction JSON
Returns:
A plain string representation
"""
if isinstance(item, str):
return item.strip()
if isinstance(item, dict):
parts = []
for key, value in item.items():
if value and isinstance(value, str) and value.strip():
parts.append(f"{key}: {value.strip()}")
return '; '.join(parts) if parts else str(item)
return str(item)
def _normalize_extraction_items(data: Dict[str, list]) -> Dict[str, List[str]]:
    """
    Coerce every category's items to a list of non-empty plain strings.

    Non-list category values are replaced with an empty list; dict items are
    flattened via _normalize_item_to_string and empty results are dropped.

    Args:
        data: Parsed extraction dict (values may contain dicts or strings)

    Returns:
        Dict with all four category keys mapped to lists of strings
    """
    categories = {"action_items", "decisions", "key_points", "open_questions"}
    result: Dict[str, List[str]] = {}
    for category in categories:
        raw = data.get(category, [])
        if isinstance(raw, list):
            result[category] = [
                text for text in map(_normalize_item_to_string, raw) if text
            ]
        else:
            result[category] = []
    return result
def _try_parse_extraction_json(
    text: str, log_repair: bool = False
) -> Optional[Dict[str, List[str]]]:
    """
    Parse (possibly truncated) extraction JSON from raw LLM output.

    Strips markdown code fences, then tries three strategies in order:
    parse as-is, parse after repairing unclosed brackets/strings, and parse
    a repaired substring starting at the first '{'. Missing category keys
    are filled with empty lists and items are flattened to plain strings.

    Args:
        text: Raw LLM output
        log_repair: If True, log when repair was needed (use only for the
            final parse, not streaming chunks)

    Returns:
        Parsed and normalized dict, or None if unrecoverable
    """
    cleaned = re.sub(r'```json\s*', '', text)
    cleaned = re.sub(r'```\s*$', '', cleaned)
    cleaned = cleaned.strip()
    parsed = None
    # Strategy 1: the output may already be valid JSON.
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        pass
    # Strategy 2: repair truncation (max_tokens cutoff) and retry.
    if parsed is None:
        try:
            parsed = json.loads(_repair_truncated_json(cleaned))
            if log_repair:
                logger.info("Successfully parsed JSON after repair (output was truncated)")
        except json.JSONDecodeError:
            pass
    # Strategy 3: restart from the first '{' and repair from there.
    if parsed is None:
        brace = re.search(r'\{', cleaned)
        if brace is None:
            return None
        try:
            parsed = json.loads(_repair_truncated_json(cleaned[brace.start():]))
            if log_repair:
                logger.info("Successfully parsed JSON from substring after repair")
        except json.JSONDecodeError:
            return None
    # Lenient schema validation: must be an object, but missing categories
    # are tolerated and filled in, and scalar strings are wrapped in lists.
    if not isinstance(parsed, dict):
        return None
    for key in ("action_items", "decisions", "key_points", "open_questions"):
        if key not in parsed:
            parsed[key] = []
        elif not isinstance(parsed[key], list):
            parsed[key] = [parsed[key]] if isinstance(parsed[key], str) else []
    # Flatten dict items to strings.
    return _normalize_extraction_items(parsed)
def _sample_llm_response(text: str, max_chars: int = 400) -> str:
"""Sample LLM response for trace logging."""
if not text:
return ""
return text[:max_chars] if len(text) > max_chars else text
# ===== EXTRACTION PROMPT BUILDERS =====
def _build_schema_extraction_prompt(output_language: str) -> str:
    """Build concise schema-based extraction prompt (optimized for non-reasoning models).

    Args:
        output_language: "zh-TW" selects the Traditional-Chinese prompt;
            any other value selects the English prompt.

    Returns:
        System prompt instructing the model to emit the four-category
        extraction JSON (action_items/decisions/key_points/open_questions).
    """
    # NOTE: the literals below are runtime prompts sent to the LLM; they are
    # intentionally written in the target output language.
    if output_language == "zh-TW":
        return """從會議逐字稿中提取關鍵資訊,以 JSON 格式返回。
範例輸出 (Example):
{
"action_items": ["與三星討論Q3產能分配", "確認LPDDR4供應數量"],
"decisions": ["優先供應大客戶浪潮", "暫停接受新訂單"],
"key_points": ["DDR4缺貨持續到2028年", "AI需求占全球產能45%", "美光可能跟進SanDisk付款條件"],
"open_questions": ["Q2價格漲幅預估", "深圳測試場良率確認"]
}
使用以下架構,必須返回有效的 JSON:
{
"action_items": ["具體行動項目1", "具體行動項目2"],
"decisions": ["決策1", "決策2"],
"key_points": ["要點1", "要點2", "要點3"],
"open_questions": ["問題1", "問題2"]
}
說明:
- action_items: 具體行動項目(包含負責人、時間、內容)
- decisions: 已做出的決策(包合理由)
- key_points: 重要討論要點(市場趨勢、供應狀況、策略調整)
- open_questions: 未解決的問題或疑慮
規則:
1. 必須返回有效的 JSON 格式
2. 每個類別至少提取1-3個項目,如果沒有則返回空陣列 []
3. 項目必須是具體的句子,不是關鍵詞
4. 專注於商業決策和行動,忽略重複詞句"""
    else:
        return """Return data as a JSON object with the following schema:
{
"action_items": ["action item 1"],
"decisions": ["decision 1"],
"key_points": ["point 1"],
"open_questions": ["question 1"]
}
action_items: Specific action items with owner and deadline
decisions: Decisions made with rationale
key_points: Important discussion points
open_questions: Unresolved questions or concerns
Extract from the transcript provided by the user. The transcript may contain repetitions, noise, or incomplete sentences - focus on meaningful dialogue content and ignore repetitive phrases."""
def _build_reasoning_extraction_prompt(output_language: str) -> str:
    """Build verbose extraction prompt with reasoning instructions (for hybrid models like Qwen3).

    Args:
        output_language: "zh-TW" selects the Traditional-Chinese prompt;
            any other value selects the English prompt.

    Returns:
        System prompt that asks the model to reason first, then emit the
        four-category extraction JSON.
    """
    # NOTE: the literals below are runtime prompts sent to the LLM; they are
    # intentionally written in the target output language.
    if output_language == "zh-TW":
        return """你是會議分析助手。分析會議逐字稿並提取關鍵資訊。
分析步驟:
1. 先理解對話內容:討論了什麼主題?涉及哪些公司/產品?
2. 識別決策點:有哪些明確的決定或策略調整?
3. 找出行動項目:需要執行的具體任務
4. 記錄重要資訊:市場趨勢、供應狀況、價格變化
5. 標記未解決問題:還不清楚或需要後續確認的事項
範例輸出:
{
"action_items": ["與三星討論Q3產能分配", "確認LPDDR4供應數量"],
"decisions": ["優先供應大客戶浪潮", "暫停接受新訂單"],
"key_points": ["DDR4缺貨持續到2028年", "AI需求占全球產能45%", "美光可能跟進SanDisk付款條件"],
"open_questions": ["Q2價格漲幅預估", "深圳測試場良率確認"]
}
返回格式(必須是有效的 JSON):
{
"action_items": ["行動項目1", "行動項目2"],
"decisions": ["決策1", "決策2"],
"key_points": ["要點1", "要點2", "要點3"],
"open_questions": ["問題1", "問題2"]
}
說明:
- action_items: 具體行動項目(誰要做什麼、何時完成)
- decisions: 已做出的決策(包合理由)
- key_points: 重要討論要點(市場趨勢、供應狀況、策略)
- open_questions: 未解決的問題或疑慮
規則:
1. 必須返回有效的 JSON 格式
2. 每個類別提取1-5個項目,如果沒有則返回空陣列 []
3. 項目必須是具體的完整句子
4. 專注於商業決策和行動,忽略重複詞句和閒聊
5. 僅輸出 JSON,不要推理過程"""
    else:
        return """You are a meeting analysis assistant.
Use your reasoning capabilities to analyze the content before extracting.
Your reasoning should:
1. Identify key decision points and action items
2. Distinguish explicit decisions from general discussion
3. Categorize information appropriately (action vs point vs question)
The transcript may contain repetitions, noise, or incomplete sentences - focus on meaningful dialogue content and ignore repetitive phrases.
After reasoning, return data as a JSON object with the following schema:
{
"action_items": ["action item 1", "action item 2"],
"decisions": ["decision 1", "decision 2"],
"key_points": ["point 1", "point 2"],
"open_questions": ["question 1", "question 2"]
}
action_items: Specific action items with owner and deadline
decisions: Decisions made with rationale
key_points: Important discussion points
open_questions: Unresolved questions or concerns
Rules:
- Each item must be a complete, standalone sentence
- Include context (who, what, when) in each item
- If a category has no items, use empty array []
- Output ONLY JSON, no markdown, no explanations"""
# ===== CORE PIPELINE FUNCTIONS =====
def stream_extract_from_window(
    extraction_llm: Llama,
    window: Window,
    window_id: int,
    total_windows: int,
    tracer: Any,
    model_config: Dict[str, Any],
    enable_reasoning: bool = False
) -> Generator[Tuple[str, str, Dict[str, List[str]], bool], None, None]:
    """
    Stream extraction from a single window with live progress + optional reasoning.

    Args:
        extraction_llm: Loaded llama.cpp chat model
        window: Transcript window to extract from
        window_id: 1-indexed window number (for progress display and tracing)
        total_windows: Total number of windows
        tracer: Trace logger (log_extraction / log_extraction_detail)
        model_config: Model registry entry (inference_settings, repo_id, ...)
        enable_reasoning: Parse <think>/<thinking> blocks for hybrid models

    Yields:
        (ticker_text, thinking_text, partial_items, is_complete)
        - ticker_text: Progress ticker for UI
        - thinking_text: Reasoning/thinking blocks (if model supports)
        - partial_items: Current extracted items
        - is_complete: True on final yield

    Raises:
        Re-raises any inference error after logging it to the tracer.
        A JSON parse failure does NOT raise: the window degrades to an
        empty result so the rest of the pipeline can continue.
    """
    # Auto-detect output language from window content
    has_cjk = bool(re.search(r'[\u4e00-\u9fff]', window.content))
    output_language = "zh-TW" if has_cjk else "en"
    supports_reasoning = model_config.get("supports_reasoning", False)
    # Reasoning-capable hybrid models (e.g. Qwen3) get the verbose prompt with
    # reasoning instructions; everything else gets the concise schema prompt
    # optimized for structured extraction.
    if enable_reasoning and supports_reasoning:
        system_prompt = _build_reasoning_extraction_prompt(output_language)
    else:
        system_prompt = _build_schema_extraction_prompt(output_language)
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Transcript:\n\n{window.content}"}
    ]

    def split_thinking(response: str) -> Tuple[str, str]:
        """Separate an inline <think>/<thinking> block from the JSON payload."""
        if enable_reasoning and supports_reasoning:
            m = re.search(r'<think(?:ing)?>(.*?)</think(?:ing)?>', response, re.DOTALL)
            if m:
                return m.group(1).strip(), response[:m.start()] + response[m.end():]
        return "", response

    full_response = ""
    thinking_content = ""
    start_time = time.time()
    first_token_time = None
    token_count = 0
    try:
        max_gen_tokens = max(2048, window.token_count // 2)
        settings = model_config["inference_settings"].copy()
        # Qwen3 models need greedy decoding for extraction (not synthesis)
        # to avoid empty JSON output on larger windows.
        if "qwen3" in model_config["repo_id"].lower():
            settings["temperature"] = 0.0
        stream = extraction_llm.create_chat_completion(
            messages=messages,
            max_tokens=max_gen_tokens,
            temperature=settings["temperature"],
            top_p=settings["top_p"],
            top_k=settings["top_k"],
            repeat_penalty=max(1.2, settings["repeat_penalty"]),
            response_format={"type": "json_object"},
            stream=True,
        )
        for chunk in stream:
            if 'choices' not in chunk or not chunk['choices']:
                continue
            content = chunk['choices'][0].get('delta', {}).get('content', '')
            if not content:
                continue
            if first_token_time is None:
                first_token_time = time.time()
            token_count += 1
            full_response += content
            thinking, json_text = split_thinking(full_response)
            if thinking:
                thinking_content = thinking
            # Best-effort parse of the partial output for live item counts
            partial_items = _try_parse_extraction_json(json_text) or {
                "action_items": [], "decisions": [], "key_points": [], "open_questions": []
            }
            # Live throughput metrics
            elapsed = time.time() - start_time
            tps = token_count / elapsed if elapsed > 0 else 0
            eta = int((max_gen_tokens - token_count) / tps) if tps > 0 else 0
            items_found = {k: len(v) for k, v in partial_items.items()}
            # Show the most recently populated category's last item
            last_item = ""
            for cat in ["action_items", "decisions", "key_points", "open_questions"]:
                if partial_items.get(cat):
                    last_item = partial_items[cat][-1]
                    break
            ticker = format_progress_ticker(
                current_window=window_id,
                total_windows=total_windows,
                window_tokens=window.token_count,
                max_tokens=4096,
                items_found=items_found,
                tokens_per_sec=tps,
                eta_seconds=eta,
                current_snippet=last_item
            )
            yield (ticker, thinking_content, partial_items, False)
        # Final parse over the complete response
        thinking, json_text = split_thinking(full_response)
        if thinking:
            thinking_content = thinking
        final_items = _try_parse_extraction_json(json_text, log_repair=True)
        if not final_items:
            # Graceful degradation: log once, yield an empty result, and let
            # the other windows continue — they may still produce useful data.
            error_msg = f"Failed to parse JSON from window {window_id}"
            debug_output = f"{error_msg}\n\nRaw LLM output:\n{full_response[:1000]}\n"
            logger.warning(debug_output)
            print(f"\n{'='*80}\n{debug_output}{'='*80}\n", flush=True)
            tracer.log_extraction(
                window_id=window_id,
                extraction=None,
                llm_response=_sample_llm_response(full_response),
                error=error_msg
            )
            empty_items = {
                "action_items": [], "decisions": [],
                "key_points": [], "open_questions": []
            }
            ticker = format_progress_ticker(
                current_window=window_id,
                total_windows=total_windows,
                window_tokens=window.token_count,
                max_tokens=4096,
                items_found={k: 0 for k in empty_items},
                tokens_per_sec=0,
                eta_seconds=0,
                current_snippet=f"⚠️ Window {window_id} parse failed, continuing..."
            )
            yield (ticker, thinking_content, empty_items, True)
            # Record the failed extraction for offline debugging
            tracer.log_extraction_detail(
                window_id=window_id,
                extracted_items=empty_items,
                full_llm_response=full_response,
                full_thinking=thinking_content,
                json_repaired=False,
                parse_attempts=1
            )
            return
        # Success: trace a sampled view of the response
        tracer.log_extraction(
            window_id=window_id,
            extraction=final_items,
            llm_response=_sample_llm_response(full_response),
            thinking=_sample_llm_response(thinking_content) if thinking_content else None,
            error=None
        )
        # Heuristic: if the raw response is not valid JSON on its own, the
        # parser must have repaired it (the exact attempt count is internal
        # to _try_parse_extraction_json).
        json_repaired = False
        parse_attempts = 1
        try:
            json.loads(full_response)
        except json.JSONDecodeError:
            json_repaired = True
            parse_attempts = 2
        tracer.log_extraction_detail(
            window_id=window_id,
            extracted_items=final_items,
            full_llm_response=full_response,
            full_thinking=thinking_content,
            json_repaired=json_repaired,
            parse_attempts=parse_attempts
        )
        # Final ticker
        elapsed = time.time() - start_time
        tps = token_count / elapsed if elapsed > 0 else 0
        ticker = format_progress_ticker(
            current_window=window_id,
            total_windows=total_windows,
            window_tokens=window.token_count,
            max_tokens=4096,
            items_found={k: len(v) for k, v in final_items.items()},
            tokens_per_sec=tps,
            eta_seconds=0,
            current_snippet="✅ Extraction complete"
        )
        yield (ticker, thinking_content, final_items, True)
    except Exception as e:
        tracer.log_extraction(
            window_id=window_id,
            extraction=None,
            llm_response=_sample_llm_response(full_response) if full_response else "",
            error=str(e)
        )
        raise
def deduplicate_items(
    all_items: Dict[str, List[str]],
    embedding_model: EmbeddingModel,
    similarity_threshold: float,
    tracer: Any
) -> Generator[Dict[str, List[str]], None, None]:
    """
    Deduplicate items across all categories using embeddings.

    Greedy first-kept-wins strategy: within each category, every item is
    compared against the items already kept; it is dropped as a duplicate
    when its cosine similarity with any kept item reaches the threshold.

    Args:
        all_items: Dict of {category: [items]}
        embedding_model: Loaded embedding model
        similarity_threshold: Cosine similarity threshold (0.0-1.0)
        tracer: Tracer instance

    Yields:
        Intermediate deduplication results for progress tracking (the dict
        grows by one category per yield; the final yield is complete)
    """
    deduplicated = {}
    for category, items in all_items.items():
        if not items:
            deduplicated[category] = []
            continue
        original_count = len(items)
        # Compute embeddings for all items (one model call per item)
        embeddings = []
        for item in items:
            emb = embedding_model.embed(item)
            embeddings.append(emb)
        # Mark duplicates and track duplicate groups
        keep_indices = []
        duplicate_groups = []
        for i in range(len(items)):
            is_duplicate = False
            duplicate_of_idx = -1
            similarity_score = 0.0
            # Compare with all previously kept items (first match wins)
            for j in keep_indices:
                similarity = cosine_similarity(embeddings[i], embeddings[j])
                if similarity >= similarity_threshold:
                    is_duplicate = True
                    duplicate_of_idx = j
                    similarity_score = similarity
                    break
            if not is_duplicate:
                keep_indices.append(i)
            else:
                # Record duplicate group for debugging
                duplicate_groups.append({
                    "duplicate_item": items[i],
                    "duplicate_index": i,
                    "kept_item": items[duplicate_of_idx],
                    "kept_index": duplicate_of_idx,
                    "similarity": round(similarity_score, 3),
                })
        # Keep only unique items (original order preserved)
        unique_items = [items[i] for i in keep_indices]
        deduplicated[category] = unique_items
        # Log deduplication with full details
        duplicates_removed = original_count - len(unique_items)
        tracer.log_deduplication(
            category=category,
            original_count=original_count,
            deduplicated_count=len(unique_items),
            duplicates_removed=duplicates_removed,
            similarity_threshold=similarity_threshold,
            embedding_model=embedding_model.model_key,
            original_items=items,
            deduplicated_items=unique_items,
            duplicate_groups=duplicate_groups
        )
        logger.info(f"Dedup {category}: {original_count}{len(unique_items)} ({duplicates_removed} removed)")
        # Yield intermediate results for progress tracking
        yield deduplicated
    return
def stream_synthesize_executive_summary(
    synthesis_llm: Llama,
    deduplicated_items: Dict[str, List[str]],
    model_config: Dict[str, Any],
    output_language: str,
    enable_reasoning: bool,
    max_tokens: int,
    tracer: Any
) -> Generator[Tuple[str, str, bool], None, None]:
    """
    Stream synthesis of executive summary from deduplicated items (Stage 3).

    Args:
        synthesis_llm: Loaded llama.cpp chat model used for synthesis
        deduplicated_items: {category: [items]} output of Stage 2
        model_config: Model registry entry (name, inference_settings, ...)
        output_language: "zh-TW" for Traditional Chinese, otherwise English
        enable_reasoning: Parse <think>/<thinking> blocks out of the output
        max_tokens: Generation budget for the summary
        tracer: Trace logger (log_synthesis, synthesis_details)

    Yields:
        (summary_text, thinking_text, is_complete)

    Raises:
        Re-raises any inference error after logging it to the tracer.
    """
    # Build synthesis prompt
    item_counts = {k: len(v) for k, v in deduplicated_items.items()}
    # Format items for prompt: one numbered section per non-empty category,
    # with labels localized to the output language.
    items_text = ""
    for category, items in deduplicated_items.items():
        if items:
            category_label = {
                "action_items": "Action Items" if output_language == "en" else "行動項目",
                "decisions": "Decisions" if output_language == "en" else "決策",
                "key_points": "Key Points" if output_language == "en" else "關鍵要點",
                "open_questions": "Open Questions" if output_language == "en" else "未解決問題"
            }.get(category, category)
            items_text += f"\n{category_label}:\n"
            for i, item in enumerate(items, 1):
                items_text += f"{i}. {item}\n"
    # NOTE: the prompt literals below are runtime strings sent to the LLM;
    # they are intentionally written in the target output language.
    if output_language == "zh-TW":
        system_prompt = """你是執行摘要專家。請根據提供的結構化資訊,生成簡潔、專業的執行摘要。
輸出格式要求:
- 使用 **粗體標題** 標記各部分
- 使用項目符號(bullet points)列出重點
- 每個重點簡短有力,不超過一句話
- 不要說明、不要解釋、不要自我分析
- 直接輸出執行摘要,不要說"首先..."、"我注意到..."等開頭語
示例格式:
**核心重點**
- DDR4 今年是供應鏈優先項目
- 預計 2027 年 Q1 開始產出
**主要行動**
- 協調三星、海力士等供應商
- 優先保障嵌入式市場供給
**未解決問題**
- DDR4 產能不足仍待解決"""
        user_prompt = f"基於以下結構化資訊生成執行摘要:\n{items_text}\n\n請按照上述格式要求,直接輸出執行摘要。"
    else:
        system_prompt = """You are an executive summary expert. Generate concise, professional executive summaries based on structured information.
Output format requirements:
- Use **BOLD HEADERS** for each section
- Use bullet points for key points
- Keep each point brief and powerful (one sentence max)
- NO explanations, NO analysis, NO self-reflection
- Start directly with the summary, do NOT use phrases like "First I need to...", "Let me analyze...", "I noticed..."
Example format:
**Key Priorities**
- DDR4 is supply chain priority this year
- Production expected to start in 2027 Q1
**Main Actions**
- Coordinate with Samsung, Hynix, and other suppliers
- Prioritize embedded market supply
**Open Issues**
- DDR4 capacity shortage still needs resolution"""
        user_prompt = f"Generate an executive summary based on these structured items:\n{items_text}\n\nPlease follow the format requirements above and output the executive summary directly."
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    # Stream synthesis
    full_summary = ""
    thinking_content = ""
    try:
        settings = model_config["inference_settings"]
        stream = synthesis_llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=settings["temperature"],
            top_p=settings["top_p"],
            top_k=settings["top_k"],
            repeat_penalty=settings["repeat_penalty"],
            stream=True,
        )
        for chunk in stream:
            if 'choices' in chunk and len(chunk['choices']) > 0:
                delta = chunk['choices'][0].get('delta', {})
                content = delta.get('content', '')
                if content:
                    full_summary += content
                    # Parse thinking if reasoning enabled; split the <think>
                    # block away from the visible summary text
                    if enable_reasoning and model_config.get("supports_reasoning"):
                        thinking_match = re.search(r'<think(?:ing)?>(.*?)</think(?:ing)?>', full_summary, re.DOTALL)
                        if thinking_match:
                            thinking_content = thinking_match.group(1).strip()
                            summary_text = full_summary[:thinking_match.start()] + full_summary[thinking_match.end():]
                        else:
                            summary_text = full_summary
                    else:
                        summary_text = full_summary
                    yield (summary_text, thinking_content, False)
        # Final parse over the complete output
        if enable_reasoning and model_config.get("supports_reasoning"):
            thinking_match = re.search(r'<think(?:ing)?>(.*?)</think(?:ing)?>', full_summary, re.DOTALL)
            if thinking_match:
                thinking_content = thinking_match.group(1).strip()
                summary_text = full_summary[:thinking_match.start()] + full_summary[thinking_match.end():]
            else:
                summary_text = full_summary
        else:
            summary_text = full_summary
        # Log synthesis with full details (prompts included for replay)
        tracer.log_synthesis(
            synthesis_model=model_config["name"],
            input_item_counts=item_counts,
            output_summary=_sample_llm_response(summary_text),
            thinking=_sample_llm_response(thinking_content) if thinking_content else None,
            error=None,
            input_items=deduplicated_items,
            system_prompt=system_prompt,
            user_prompt=user_prompt
        )
        # Also store full (unsampled) outputs in synthesis_details directly
        tracer.synthesis_details["full_output_summary"] = summary_text
        tracer.synthesis_details["full_thinking"] = thinking_content
        yield (summary_text, thinking_content, True)
    except Exception as e:
        tracer.log_synthesis(
            synthesis_model=model_config["name"],
            input_item_counts=item_counts,
            output_summary="",
            thinking=None,
            error=str(e),
            input_items=deduplicated_items,
            system_prompt=system_prompt,
            user_prompt=user_prompt
        )
        raise