tiny-scribe / app.py
Luigi's picture
fix: add missing meeting_summarizer module to Dockerfile for HF Spaces deployment
bc6516c
#!/usr/bin/env python3
"""
Tiny Scribe - HuggingFace Spaces Demo
A Gradio app for summarizing transcripts using GGUF models with live streaming output.
Optimized for HuggingFace Spaces Free CPU Tier (2 vCPUs).
UI Version: 2.0 - Enhanced with modern styling and UX improvements
"""
import os
import gc
import time
import logging
import re
import json
from typing import Dict, List, Any, Optional, Generator, Tuple
from datetime import datetime
from opencc import OpenCC
from llama_cpp import Llama
import gradio as gr
from huggingface_hub import list_repo_files, hf_hub_download
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from meeting_summarizer.trace import Tracer
from meeting_summarizer.extraction import (
EmbeddingModel, Window, preprocess_transcript,
stream_extract_from_window, deduplicate_items, stream_synthesize_executive_summary
)
logger = logging.getLogger(__name__)
# Increase Hugging Face timeout to handle slow connections
os.environ['HF_HUB_DOWNLOAD_TIMEOUT'] = '300' # 5 minutes

# Global model instance, lazily created/replaced by load_model()
llm: Optional[Llama] = None
# OpenCC Simplified->Traditional Chinese converter, created on first load_model() call
converter: Optional[OpenCC] = None
# Key into AVAILABLE_MODELS for the currently loaded model, or None before first load
current_model_key: Optional[str] = None
def parse_quantization(filename: str) -> Optional[str]:
    """Extract quantization level from GGUF filename.

    Examples:
        model-Q4_K_M.gguf -> Q4_K_M
        model.Q5_K_S.gguf -> Q5_K_S
        model-IQ2_XXS.gguf -> IQ2_XXS
        model-fp16.gguf -> FP16  (result is always upper-cased)

    Args:
        filename: GGUF filename

    Returns:
        Upper-cased quantization string, or None if no known pattern matches
    """
    # Ordered most- to least-specific.  The optional I/T prefix covers the
    # importance-matrix (IQ2_XXS) and ternary (TQ1_0) quants used by several
    # entries in AVAILABLE_MODELS, which the old patterns missed.
    patterns = [
        r'[.-]([IT]?Q[0-9]_[A-Z0-9]+(?:_[A-Z0-9]+)*)\.gguf$',  # Q4_K_M, Q8_0, IQ2_XXS, TQ1_0
        r'[.-](fp16|fp32|q4_0|q4_1|q5_0|q5_1|q8_0)\.gguf$',    # fp16, fp32, legacy quants
    ]
    for pattern in patterns:
        match = re.search(pattern, filename, re.IGNORECASE)
        if match:
            return match.group(1).upper()
    return None
def list_repo_gguf_files(repo_id: str) -> Tuple[List[Dict[str, Any]], str]:
    """List all GGUF files in a HuggingFace repository with metadata.

    Args:
        repo_id: HuggingFace repository ID (e.g., 'unsloth/DeepSeek-R1-Distill-Qwen-7B-GGUF')

    Returns:
        Tuple of (files_list, error_message)
        - files_list: List of dicts with name, size_mb, quant, params, downloads
        - error_message: Empty string on success, error description on failure
    """
    if not repo_id or "/" not in repo_id:
        return [], "Invalid repo ID format. Use 'username/repo-name'"
    try:
        # List all files in repo
        files = list(list_repo_files(repo_id))
        # Filter for GGUF files only
        gguf_files = [f for f in files if f.endswith('.gguf')]
        if not gguf_files:
            return [], f"No GGUF files found in repository '{repo_id}'"
        # Get repo info for downloads (optional, may fail for some repos).
        # Best-effort only: download counts are cosmetic, so fall back to 0.
        try:
            from huggingface_hub import model_info
            repo_downloads = model_info(repo_id).downloads
        except Exception:
            repo_downloads = 0
        # Build file metadata
        result = []
        for filename in sorted(gguf_files):  # Alphabetical sorting (preference C)
            quant = parse_quantization(filename) or "Unknown"
            # Exact size would need per-file metadata from the hub;
            # 0 is a placeholder updated when the file is downloaded.
            size_mb = 0
            # Try to extract parameter count from the filename (e.g. 7b, 1.5B).
            # One case-insensitive pattern suffices; the former second pattern
            # was redundant under re.IGNORECASE.
            params = "Unknown"
            match = re.search(r'(\d+\.?\d*)b', filename, re.IGNORECASE)
            if match:
                params = f"{match.group(1)}B"
            result.append({
                "name": filename,
                "size_mb": size_mb,
                "quant": quant,
                "params": params,
                "downloads": repo_downloads,
            })
        return result, ""
    except Exception as e:
        # Map common hub failures to friendlier user-facing messages
        error_msg = str(e).lower()
        if "not found" in error_msg or "404" in error_msg:
            return [], f"Repository '{repo_id}' not found"
        elif "permission" in error_msg or "access" in error_msg:
            return [], f"Cannot access '{repo_id}' - may be private or gated"
        else:
            return [], f"Error listing files: {str(e)}"
def format_file_choice(file_info: Dict[str, Any]) -> str:
    """Render one GGUF file's metadata as a single dropdown label.

    Args:
        file_info: Dict with name, size_mb, quant, params, downloads

    Returns:
        Formatted string for dropdown display
    """
    downloads = file_info.get("downloads", 0)
    # Compact, human-readable download count: 2.5M / 1.5K / 512
    if downloads >= 1000000:
        dl_str = f"{downloads/1000000:.1f}M"
    elif downloads >= 1000:
        dl_str = f"{downloads/1000:.1f}K"
    else:
        dl_str = str(downloads)
    return (
        f"📄 {file_info['name']} | {file_info['size_mb']} | "
        f"{file_info['quant']} | {file_info['params']} params | ⬇️ {dl_str}"
    )
def build_system_prompt(output_language: str, supports_toggle: bool, enable_reasoning: bool) -> str:
    """Build the system prompt for the summarization task.

    This is the prompt shown in the debug field and sent to the LLM.
    For models that support the /think // /no_think toggle, the chosen
    directive is appended to the language-specific base prompt.

    Args:
        output_language: Target language ("en" or "zh-TW")
        supports_toggle: Whether the model supports reasoning toggle (/think, /no_think)
        enable_reasoning: Whether reasoning mode is enabled

    Returns:
        The complete system prompt string
    """
    if output_language == "zh-TW":
        base = "你是一個有助的助手,負責總結轉錄內容。"
        separator = ""  # no space before the directive in Chinese
    else:
        base = "You are a helpful assistant that summarizes transcripts."
        separator = " "
    if not supports_toggle:
        return base
    directive = "/think" if enable_reasoning else "/no_think"
    return f"{base}{separator}{directive}"
def build_user_prompt(transcript: str, output_language: str) -> str:
    """Build the user prompt containing the transcript to summarize.

    Args:
        transcript: The transcript content to summarize
        output_language: Target language ("en" or "zh-TW")

    Returns:
        The user prompt string with the transcript
    """
    instruction = (
        "請總結以下內容:" if output_language == "zh-TW"
        else "Please summarize the following content:"
    )
    return f"{instruction}\n\n{transcript}"
def get_thread_count(thread_config: str, custom_threads: int) -> int:
    """Resolve the effective CPU thread count for a preset.

    Args:
        thread_config: Thread preset ("free", "upgrade", "custom")
        custom_threads: Custom thread count when preset is "custom"

    Returns:
        Number of threads to use
    """
    presets = {"free": 2, "upgrade": 8}
    if thread_config in presets:
        return presets[thread_config]
    # "custom" (or anything else): clamp to the supported 1-32 range
    return max(1, min(32, custom_threads))
def load_custom_model_from_hf(repo_id: str, filename: str, n_threads: int) -> Tuple[Optional[Llama], str]:
    """Load a custom GGUF model from HuggingFace Hub.

    Args:
        repo_id: HuggingFace repository ID
        filename: GGUF filename to load
        n_threads: Number of CPU threads

    Returns:
        Tuple of (model_or_none, message) — model is None on failure and the
        message describes the error in user-facing terms.
    """
    try:
        # Fix: messages previously printed a literal "(unknown)" instead of
        # the actual filename.
        logger.info(f"Loading custom model from {repo_id}/{filename}")
        # Conservative defaults for custom models
        n_ctx = 8192
        n_batch = 512
        n_gpu_layers = 0  # CPU only for safety
        model = Llama.from_pretrained(
            repo_id=repo_id,
            filename=filename,
            n_ctx=n_ctx,
            n_batch=n_batch,
            n_threads=n_threads,
            n_gpu_layers=n_gpu_layers,
            verbose=False,
        )
        return model, f"Successfully loaded {repo_id}/{filename}"
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Failed to load custom model: {error_msg}")
        # Map common failures to actionable user-facing messages
        if "not found" in error_msg.lower():
            return None, f"Model or file not found: {repo_id}/{filename}"
        elif "permission" in error_msg.lower():
            return None, f"Access denied (model may be private/gated): {repo_id}"
        elif "memory" in error_msg.lower() or "oom" in error_msg.lower():
            return None, f"Out of memory loading model. Try a smaller file or lower quantization."
        else:
            return None, f"Error loading model: {error_msg}"
# Thread configuration from environment variable
def _get_default_thread_config():
"""Get default thread configuration from environment variable."""
env_threads = os.environ.get("DEFAULT_N_THREADS", "").strip()
if env_threads:
try:
thread_count = int(env_threads)
if 1 <= thread_count <= 32:
logger.info(f"Using DEFAULT_N_THREADS={thread_count} from environment")
return "custom", thread_count
else:
logger.warning(f"DEFAULT_N_THREADS={thread_count} out of range (1-32), using HF Free Tier")
except ValueError:
logger.warning(f"Invalid DEFAULT_N_THREADS='{env_threads}', using HF Free Tier")
return "free", -1 # -1 = irrelevant when preset is not "custom"
DEFAULT_THREAD_PRESET, DEFAULT_CUSTOM_THREADS = _get_default_thread_config()
# Maximum context window to use (caps memory usage on 2 vCPUs).
# load_model and load_model_for_role clamp every model's max_context to this.
MAX_USABLE_CTX = 32768
# Available models registry - ordered by parameter count (smallest to largest)
# Schema per entry:
#   name: display label shown in the UI
#   repo_id / filename: HuggingFace repo and GGUF filename glob, passed to
#       Llama.from_pretrained in load_model
#   max_context: model's native context window; clamped to MAX_USABLE_CTX at load
#   default_temperature: UI default temperature
#   supports_reasoning / supports_toggle: consumed by update_reasoning_visibility
#       and build_system_prompt (absent supports_toggle is treated as False)
#   inference_settings: recommended sampler parameters for this model
AVAILABLE_MODELS = {
    "falcon_h1_100m": {
        "name": "Falcon-H1 100M",
        "repo_id": "mradermacher/Falcon-H1-Tiny-Multilingual-100M-Instruct-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 32768,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "inference_settings": {
            "temperature": 0.1,
            "top_p": 0.9,
            "top_k": 40,
            "repeat_penalty": 1.05,
        },
    },
    "gemma3_270m": {
        "name": "Gemma-3 270M",
        "repo_id": "unsloth/gemma-3-270m-it-qat-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 32768,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "inference_settings": {
            "temperature": 1.0,
            "top_p": 0.95,
            "top_k": 64,
            "repeat_penalty": 1.0,
        },
    },
    "ernie_300m": {
        "name": "ERNIE-4.5 0.3B (131K Context)",
        "repo_id": "unsloth/ERNIE-4.5-0.3B-PT-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 131072,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "inference_settings": {
            "temperature": 0.3,
            "top_p": 0.95,
            "top_k": 30,
            "repeat_penalty": 1.05,
        },
    },
    "granite_350m": {
        "name": "Granite-4.0 350M",
        "repo_id": "unsloth/granite-4.0-h-350m-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 32768,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "inference_settings": {
            "temperature": 0.0,
            "top_p": 1.0,
            "top_k": 0,
            "repeat_penalty": 1.05,
        },
    },
    "lfm2_350m": {
        "name": "LFM2 350M",
        "repo_id": "LiquidAI/LFM2-350M-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 32768,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "inference_settings": {
            "temperature": 0.1,
            "top_p": 0.1,
            "top_k": 50,
            "repeat_penalty": 1.05,
        },
    },
    "bitcpm4_500m": {
        "name": "BitCPM4 0.5B (128K Context)",
        "repo_id": "openbmb/BitCPM4-0.5B-GGUF",
        "filename": "*q4_0.gguf",
        "max_context": 131072,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "inference_settings": {
            "temperature": 0.3,
            "top_p": 0.95,
            "top_k": 30,
            "repeat_penalty": 1.05,
        },
    },
    "hunyuan_500m": {
        "name": "Hunyuan 0.5B (256K Context)",
        "repo_id": "mradermacher/Hunyuan-0.5B-Instruct-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 262144,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "inference_settings": {
            "temperature": 0.3,
            "top_p": 0.95,
            "top_k": 30,
            "repeat_penalty": 1.05,
        },
    },
    "qwen3_600m_q4": {
        "name": "Qwen3 0.6B Q4 (32K Context)",
        "repo_id": "unsloth/Qwen3-0.6B-GGUF",
        "filename": "*Q4_0.gguf",
        "max_context": 32768,
        "default_temperature": 0.6,
        "supports_reasoning": True,
        "supports_toggle": True,
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 20,
            "repeat_penalty": 1.0,
        },
    },
    "granite_3_1_1b_q8": {
        "name": "Granite 3.1 1B-A400M Instruct (128K Context)",
        "repo_id": "bartowski/granite-3.1-1b-a400m-instruct-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 131072,
        "default_temperature": 0.7,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.9,
            "top_k": 40,
            "repeat_penalty": 1.1,
        },
    },
    "falcon_h1_1.5b_q4": {
        "name": "Falcon-H1 1.5B Q4",
        "repo_id": "unsloth/Falcon-H1-1.5B-Deep-Instruct-GGUF",
        "filename": "*Q4_K_M.gguf",
        "max_context": 32768,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "inference_settings": {
            "temperature": 0.1,
            "top_p": 0.9,
            "top_k": 40,
            "repeat_penalty": 1.05,
        },
    },
    "qwen3_1.7b_q4": {
        "name": "Qwen3 1.7B Q4 (32K Context)",
        "repo_id": "unsloth/Qwen3-1.7B-GGUF",
        "filename": "*Q4_0.gguf",
        "max_context": 32768,
        "default_temperature": 0.6,
        "supports_reasoning": True,
        "supports_toggle": True,
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 20,
            "repeat_penalty": 1.0,
        },
    },
    "granite_3_3_2b_q4": {
        "name": "Granite 3.3 2B Instruct (128K Context)",
        "repo_id": "ibm-granite/granite-3.3-2b-instruct-GGUF",
        "filename": "*Q4_K_M.gguf",
        "max_context": 131072,
        "default_temperature": 0.7,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.9,
            "top_k": 40,
            "repeat_penalty": 1.1,
        },
    },
    "youtu_llm_2b_q8": {
        "name": "Youtu-LLM 2B (128K Context)",
        "repo_id": "tencent/Youtu-LLM-2B-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 131072,
        "default_temperature": 0.7,
        "supports_reasoning": True,
        "supports_toggle": True,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.8,
            "top_k": 20,
            "repeat_penalty": 1.05,
        },
    },
    "lfm2_2_6b_transcript": {
        "name": "LFM2 2.6B Transcript (32K Context)",
        "repo_id": "LiquidAI/LFM-2.6B-Transcript-GGUF",
        "filename": "*Q4_0.gguf",
        "max_context": 32768,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 20,
            "repeat_penalty": 1.1,
        },
    },
    "breeze_3b_q4": {
        "name": "Breeze 3B Q4 (32K Context)",
        "repo_id": "mradermacher/breeze-3b-GGUF",
        "filename": "*Q4_K_M.gguf",
        "max_context": 32768,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 20,
            "repeat_penalty": 1.0,
        },
    },
    "granite_3_1_3b_q4": {
        "name": "Granite 3.1 3B-A800M Instruct (128K Context)",
        "repo_id": "bartowski/granite-3.1-3b-a800m-instruct-GGUF",
        "filename": "*Q4_K_M.gguf",
        "max_context": 131072,
        "default_temperature": 0.7,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.9,
            "top_k": 40,
            "repeat_penalty": 1.1,
        },
    },
    "qwen3_4b_thinking_q3": {
        "name": "Qwen3 4B Thinking (256K Context)",
        "repo_id": "unsloth/Qwen3-4B-Thinking-2507-GGUF",
        "filename": "*Q3_K_M.gguf",
        "max_context": 262144,
        "default_temperature": 0.6,
        "supports_reasoning": True,
        "supports_toggle": False, # Thinking-only mode
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 20,
            "repeat_penalty": 1.0,
        },
    },
    "granite4_tiny_q3": {
        "name": "Granite 4.0 Tiny 7B (128K Context)",
        "repo_id": "ibm-research/granite-4.0-Tiny-7B-Instruct-GGUF",
        "filename": "*Q3_K_M.gguf",
        "max_context": 131072,
        "default_temperature": 0.7,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.9,
            "top_k": 40,
            "repeat_penalty": 1.1,
        },
    },
    "ernie_21b_pt_q1": {
        "name": "ERNIE-4.5 21B PT (128K Context)",
        "repo_id": "unsloth/ERNIE-4.5-21B-A3B-PT-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 131072,
        "default_temperature": 0.7,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.9,
            "top_k": 40,
            "repeat_penalty": 1.1,
        },
    },
    "ernie_21b_thinking_q1": {
        "name": "ERNIE-4.5 21B Thinking (128K Context)",
        "repo_id": "unsloth/ERNIE-4.5-21B-A3B-Thinking-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 131072,
        "default_temperature": 0.8,
        "supports_reasoning": True,
        "supports_toggle": False, # Thinking-only mode
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 40,
            "repeat_penalty": 1.1,
        },
    },
    "glm_4_7_flash_reap_30b": {
        "name": "GLM-4.7-Flash-REAP-30B Thinking (128K Context)",
        "repo_id": "unsloth/GLM-4.7-Flash-REAP-23B-A3B-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 131072,
        "default_temperature": 0.6,
        "supports_reasoning": True,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 20,
            "repeat_penalty": 1.05,
        },
    },
    "glm_4_7_flash_30b_iq2": {
        "name": "GLM-4.7-Flash-30B (Original) IQ2_XXS (128K Context)",
        "repo_id": "bartowski/zai-org_GLM-4.7-Flash-GGUF",
        "filename": "*IQ2_XXS.gguf",
        "max_context": 131072,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 20,
            "repeat_penalty": 1.05,
        },
    },
    "qwen3_30b_thinking_q1": {
        "name": "Qwen3 30B Thinking (256K Context)",
        "repo_id": "unsloth/Qwen3-30B-A3B-Thinking-2507-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 262144,
        "default_temperature": 0.6,
        "supports_reasoning": True,
        "supports_toggle": False, # Thinking-only mode
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 20,
            "repeat_penalty": 1.0,
        },
    },
    "qwen3_30b_instruct_q1": {
        "name": "Qwen3 30B Instruct (256K Context)",
        "repo_id": "unsloth/Qwen3-30B-A3B-Instruct-2507-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 262144,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 20,
            "repeat_penalty": 1.0,
        },
    },
    # Sentinel entry: selecting this in the UI switches to the custom
    # HF-repo flow (load_custom_model_from_hf); repo_id/filename are None.
    "custom_hf": {
        "name": "🔧 Custom HF GGUF...",
        "repo_id": None,
        "filename": None,
        "max_context": 8192,
        "default_temperature": 0.6,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.6,
            "top_p": 0.95,
            "top_k": 40,
            "repeat_penalty": 1.0,
        },
    },
}
# Model loaded when no explicit selection has been made yet
DEFAULT_MODEL_KEY = "qwen3_600m_q4"
# ===== ADVANCED MODE: EXTRACTION MODELS REGISTRY (13 models, ≤1.7B) =====
# Used exclusively for Stage 1: Extraction (transcript windows → structured JSON)
# Extraction-optimized settings: Low temperature (0.1-0.3) for deterministic output
# NOTE(review): header says "13 models" but only one entry is present — confirm
# whether the registry was trimmed intentionally.
EXTRACTION_MODELS = {
    "qwen2.5_1.5b": {
        "name": "Qwen2.5 1.5B (128K Context)",
        # Exact filename here (not a glob like the AVAILABLE_MODELS entries)
        "repo_id": "Qwen/Qwen2.5-1.5B-Instruct-GGUF",
        "filename": "qwen2.5-1.5b-instruct-q4_k_m.gguf",
        "max_context": 131072,
        # Starting value for the extraction n_ctx slider (user-adjustable)
        "default_n_ctx": 4096,
        "params_size": "1.5B",
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.2,
            "top_p": 0.9,
            "top_k": 30,
            "repeat_penalty": 1.0,
        },
    },
}
DEFAULT_EXTRACTION_MODEL = "qwen2.5_1.5b"
# ===== ADVANCED MODE: SYNTHESIS MODELS REGISTRY (16 models, 1B-30B) =====
# Used exclusively for Stage 3: Synthesis (deduplicated items → executive summary)
# Synthesis-optimized settings: Higher temperature (0.7-0.9) for creative synthesis
# FULLY INDEPENDENT from AVAILABLE_MODELS (no shared references) — entries that
# share a key with AVAILABLE_MODELS deliberately carry different sampler settings.
SYNTHESIS_MODELS = {
    "granite_3_1_1b_q8": {
        "name": "Granite 3.1 1B-A400M Instruct (128K Context)",
        "repo_id": "bartowski/granite-3.1-1b-a400m-instruct-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 131072,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 50,
            "repeat_penalty": 1.05,
        },
    },
    "falcon_h1_1.5b_q4": {
        "name": "Falcon-H1 1.5B Q4",
        "repo_id": "unsloth/Falcon-H1-1.5B-Deep-Instruct-GGUF",
        "filename": "*Q4_K_M.gguf",
        "max_context": 32768,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 40,
            "repeat_penalty": 1.0,
        },
    },
    "qwen3_1.7b_q4": {
        "name": "Qwen3 1.7B Q4 (32K Context)",
        "repo_id": "unsloth/Qwen3-1.7B-GGUF",
        "filename": "*Q4_0.gguf",
        "max_context": 32768,
        "supports_reasoning": True,
        "supports_toggle": True, # Hybrid model
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 30,
            "repeat_penalty": 1.0,
        },
    },
    "granite_3_3_2b_q4": {
        "name": "Granite 3.3 2B Instruct (128K Context)",
        "repo_id": "ibm-granite/granite-3.3-2b-instruct-GGUF",
        "filename": "*Q4_K_M.gguf",
        "max_context": 131072,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 50,
            "repeat_penalty": 1.05,
        },
    },
    "youtu_llm_2b_q8": {
        "name": "Youtu-LLM 2B (128K Context)",
        "repo_id": "tencent/Youtu-LLM-2B-GGUF",
        "filename": "*Q8_0.gguf",
        "max_context": 131072,
        "supports_reasoning": True,
        "supports_toggle": True, # Hybrid model
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 40,
            "repeat_penalty": 1.0,
        },
    },
    "lfm2_2_6b_transcript": {
        "name": "LFM2 2.6B Transcript (32K Context)",
        "repo_id": "LiquidAI/LFM-2.6B-Transcript-GGUF",
        "filename": "*Q4_0.gguf",
        "max_context": 32768,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 40,
            "repeat_penalty": 1.05,
        },
    },
    "breeze_3b_q4": {
        "name": "Breeze 3B Q4 (32K Context)",
        "repo_id": "mradermacher/breeze-3b-GGUF",
        "filename": "*Q4_K_M.gguf",
        "max_context": 32768,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 40,
            "repeat_penalty": 1.0,
        },
    },
    "granite_3_1_3b_q4": {
        "name": "Granite 3.1 3B-A800M Instruct (128K Context)",
        "repo_id": "bartowski/granite-3.1-3b-a800m-instruct-GGUF",
        "filename": "*Q4_K_M.gguf",
        "max_context": 131072,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 50,
            "repeat_penalty": 1.05,
        },
    },
    "qwen3_4b_thinking_q3": {
        "name": "Qwen3 4B Thinking (256K Context)",
        "repo_id": "unsloth/Qwen3-4B-Thinking-2507-GGUF",
        "filename": "*Q3_K_M.gguf",
        "max_context": 262144,
        "supports_reasoning": True,
        "supports_toggle": False, # Thinking-only
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 30,
            "repeat_penalty": 1.0,
        },
    },
    "granite4_tiny_q3": {
        "name": "Granite 4.0 Tiny 7B (128K Context)",
        "repo_id": "ibm-research/granite-4.0-Tiny-7B-Instruct-GGUF",
        "filename": "*Q3_K_M.gguf",
        "max_context": 131072,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 50,
            "repeat_penalty": 1.05,
        },
    },
    "ernie_21b_pt_q1": {
        "name": "ERNIE-4.5 21B PT (128K Context)",
        "repo_id": "unsloth/ERNIE-4.5-21B-A3B-PT-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 131072,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 50,
            "repeat_penalty": 1.05,
        },
    },
    "ernie_21b_thinking_q1": {
        "name": "ERNIE-4.5 21B Thinking (128K Context)",
        "repo_id": "unsloth/ERNIE-4.5-21B-A3B-Thinking-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 131072,
        "supports_reasoning": True,
        "supports_toggle": False, # Thinking-only
        "inference_settings": {
            "temperature": 0.9,
            "top_p": 0.95,
            "top_k": 50,
            "repeat_penalty": 1.05,
        },
    },
    "glm_4_7_flash_reap_30b": {
        "name": "GLM-4.7-Flash-REAP-30B Thinking (128K Context)",
        "repo_id": "unsloth/GLM-4.7-Flash-REAP-23B-A3B-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 131072,
        "supports_reasoning": True,
        "supports_toggle": False, # Thinking-only
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 40,
            "repeat_penalty": 1.0,
        },
    },
    "glm_4_7_flash_30b_iq2": {
        "name": "GLM-4.7-Flash-30B (Original) IQ2_XXS (128K Context)",
        "repo_id": "bartowski/zai-org_GLM-4.7-Flash-GGUF",
        "filename": "*IQ2_XXS.gguf",
        "max_context": 131072,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 40,
            "repeat_penalty": 1.0,
        },
    },
    "qwen3_30b_thinking_q1": {
        "name": "Qwen3 30B Thinking (256K Context)",
        "repo_id": "unsloth/Qwen3-30B-A3B-Thinking-2507-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 262144,
        "supports_reasoning": True,
        "supports_toggle": False, # Thinking-only
        "inference_settings": {
            "temperature": 0.8,
            "top_p": 0.95,
            "top_k": 30,
            "repeat_penalty": 1.0,
        },
    },
    "qwen3_30b_instruct_q1": {
        "name": "Qwen3 30B Instruct (256K Context)",
        "repo_id": "unsloth/Qwen3-30B-A3B-Instruct-2507-GGUF",
        "filename": "*TQ1_0.gguf",
        "max_context": 262144,
        "supports_reasoning": False,
        "supports_toggle": False,
        "inference_settings": {
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 30,
            "repeat_penalty": 1.0,
        },
    },
}
DEFAULT_SYNTHESIS_MODEL = "qwen3_1.7b_q4"
def load_model(model_key: Optional[str] = None, n_threads: int = 2) -> Tuple[Llama, str]:
    """
    Load model with CPU optimizations. Only reloads if model changes.

    Mutates module globals: ``llm`` (the active Llama instance),
    ``current_model_key`` (its AVAILABLE_MODELS key) and ``converter``
    (OpenCC instance, created once on first call).

    Args:
        model_key: Model identifier from AVAILABLE_MODELS; None reuses the
            current model, or DEFAULT_MODEL_KEY if nothing is loaded yet
        n_threads: Number of CPU threads to use for inference

    Returns:
        Tuple of (loaded_model, info_message)

    Raises:
        Exception: re-raised from Llama.from_pretrained on load failure
            (the old model has already been unloaded at that point).
    """
    global llm, converter, current_model_key
    # Default to current or default model
    if model_key is None:
        model_key = current_model_key if current_model_key else DEFAULT_MODEL_KEY
    model = AVAILABLE_MODELS[model_key]
    # Already loaded? Reuse the live instance instead of reloading.
    if llm is not None and model_key == current_model_key:
        return llm, f"Model ready: {model['name']}"
    # Unload old model to free memory before pulling in the new one
    if llm is not None:
        logger.info(f"Unloading previous model: {AVAILABLE_MODELS[current_model_key]['name']}")
        del llm
        llm = None
        gc.collect()
    # Initialize OpenCC converter once (Simplified -> Traditional Chinese)
    if converter is None:
        converter = OpenCC('s2twp')
    # Calculate n_ctx: model max capped at MAX_USABLE_CTX
    n_ctx = min(model["max_context"], MAX_USABLE_CTX)
    logger.info(f"Loading {model['name']} with n_ctx={n_ctx}")
    # Detect GPU support and adjust n_gpu_layers; any failure falls back to CPU
    requested_ngl = int(os.environ.get("N_GPU_LAYERS", 0))
    n_gpu_layers = requested_ngl
    if requested_ngl != 0:
        # Check if GPU offload is actually supported
        try:
            from llama_cpp import llama_supports_gpu_offload
            gpu_available = llama_supports_gpu_offload()
            if not gpu_available:
                logger.warning(f"N_GPU_LAYERS={requested_ngl} requested but GPU offload not available. Falling back to CPU.")
                n_gpu_layers = 0
        except Exception as e:
            logger.warning(f"Could not detect GPU support: {e}. Using CPU fallback.")
            n_gpu_layers = 0
    try:
        llm = Llama.from_pretrained(
            repo_id=model["repo_id"],
            filename=model["filename"],
            n_ctx=n_ctx,
            n_batch=min(2048, n_ctx), # Batch size for throughput
            n_threads=n_threads, # Configurable thread count
            n_threads_batch=n_threads, # Parallel batch processing
            n_gpu_layers=n_gpu_layers, # 0=CPU only, -1=all GPU layers (if available)
            verbose=False,
            seed=1337,
            # NOTE(review): llama-cpp-python documents KV-cache quantization
            # kwargs as type_k/type_v; v_type/k_type may be silently swallowed
            # by **kwargs — verify against the installed llama_cpp version.
            v_type=2,
            k_type=2,
        )
        current_model_key = model_key
        info_msg = f"Loaded: {model['name']} ({n_ctx:,} context)"
        logger.info(info_msg)
        return llm, info_msg
    except Exception as e:
        logger.error(f"Error loading model: {e}")
        raise
def update_reasoning_visibility(model_key):
    """
    Update reasoning checkbox visibility, value, and interactivity based on model type.

    Three model types:
    - Non-reasoning: checkbox hidden
    - Thinking-only: checkbox visible, checked, locked (non-interactive),
      label "Reasoning Mode (Always On)"
    - Hybrid: checkbox visible, toggleable, label "Enable Reasoning Mode"

    Returns: Single gr.update() with all properties
    """
    entry = AVAILABLE_MODELS[model_key]
    has_reasoning = entry.get("supports_reasoning", False)
    has_toggle = entry.get("supports_toggle", False)
    if not has_reasoning:
        # Non-reasoning model: hide the checkbox entirely
        return gr.update(visible=False, value=False, interactive=False, label="Enable Reasoning Mode")
    if has_toggle:
        # Hybrid model: visible and user-toggleable
        return gr.update(visible=True, value=True, interactive=True, label="Enable Reasoning Mode")
    # Thinking-only model: visible, checked, and locked on
    return gr.update(visible=True, value=True, interactive=False, label="⚡ Reasoning Mode (Always On)")
# ===== ADVANCED MODE: HELPER FUNCTIONS =====
def get_model_config(model_key: str, model_role: str) -> Dict[str, Any]:
    """
    Get model configuration based on role.

    Ensures same model (e.g., qwen3_1.7b_q4) uses DIFFERENT settings
    for extraction vs synthesis.

    Args:
        model_key: Model identifier (e.g., "qwen3_1.7b_q4")
        model_role: "extraction" or "synthesis"

    Returns:
        Model configuration dict with role-specific settings

    Raises:
        ValueError: If model_key not available for specified role, or the
            role itself is unknown
    """
    registries = {
        "extraction": EXTRACTION_MODELS,
        "synthesis": SYNTHESIS_MODELS,
    }
    if model_role not in registries:
        raise ValueError(
            f"Unknown model role: '{model_role}'. "
            f"Must be 'extraction' or 'synthesis'"
        )
    registry = registries[model_role]
    if model_key not in registry:
        available = ", ".join(list(registry.keys())[:3]) + "..."
        raise ValueError(
            f"Model '{model_key}' not available for {model_role} role. "
            f"Available: {available}"
        )
    return registry[model_key]
def load_model_for_role(
    model_key: str,
    model_role: str,
    n_threads: int = 2,
    user_n_ctx: Optional[int] = None
) -> Tuple[Llama, str]:
    """
    Load model with role-specific configuration.

    Args:
        model_key: Model identifier
        model_role: "extraction" or "synthesis"
        n_threads: CPU threads
        user_n_ctx: User-specified n_ctx (extraction only, from slider)

    Returns:
        (loaded_model, info_message)

    Raises:
        RuntimeError: If model loading fails, carrying a user-facing message
            and chained to the original cause. RuntimeError is a subclass of
            Exception, so existing `except Exception` callers still work.
    """
    try:
        config = get_model_config(model_key, model_role)
        # Calculate n_ctx: honor the slider for extraction, otherwise use the
        # model's max context — always capped at MAX_USABLE_CTX.
        if model_role == "extraction" and user_n_ctx is not None:
            n_ctx = min(user_n_ctx, config["max_context"], MAX_USABLE_CTX)
        else:
            # Synthesis or default extraction
            n_ctx = min(config.get("max_context", 8192), MAX_USABLE_CTX)
        # Detect GPU support; any failure falls back to CPU-only
        requested_ngl = int(os.environ.get("N_GPU_LAYERS", 0))
        n_gpu_layers = requested_ngl
        if requested_ngl != 0:
            try:
                from llama_cpp import llama_supports_gpu_offload
                if not llama_supports_gpu_offload():
                    logger.warning("GPU requested but not available. Using CPU.")
                    n_gpu_layers = 0
            except Exception as e:
                logger.warning(f"Could not detect GPU: {e}. Using CPU.")
                n_gpu_layers = 0
        # Load model
        logger.info(f"Loading {config['name']} for {model_role} role (n_ctx={n_ctx:,})")
        llm = Llama.from_pretrained(
            repo_id=config["repo_id"],
            filename=config["filename"],
            n_ctx=n_ctx,
            n_batch=min(2048, n_ctx),
            n_threads=n_threads,
            n_threads_batch=n_threads,
            n_gpu_layers=n_gpu_layers,
            verbose=False,
            seed=1337,
        )
        info_msg = (
            f"✅ Loaded: {config['name']} for {model_role} "
            f"(n_ctx={n_ctx:,}, threads={n_threads})"
        )
        logger.info(info_msg)
        return llm, info_msg
    except Exception as e:
        # Graceful failure - let user select different model; chain the
        # original cause instead of raising a bare generic Exception.
        error_msg = (
            f"❌ Failed to load {model_key} for {model_role}: {str(e)}\n\n"
            f"Please select a different model and try again."
        )
        logger.error(error_msg, exc_info=True)
        raise RuntimeError(error_msg) from e
def unload_model(llm: Optional[Llama], model_name: str = "model") -> None:
    """Explicitly unload model and trigger garbage collection.

    NOTE(review): ``del llm`` only removes this function's local reference;
    the model is actually freed only once the caller also drops its own
    reference (e.g. sets the variable to None) — confirm call sites do so.

    Args:
        llm: Model instance to release; falsy values are a no-op.
        model_name: Human-readable name used in the log message.
    """
    if llm:
        logger.info(f"Unloading {model_name}")
        del llm
        gc.collect()
        time.sleep(0.5) # Allow OS to reclaim memory
def get_extraction_model_info(model_key: str) -> str:
    """Generate markdown info for extraction model."""
    cfg = EXTRACTION_MODELS.get(model_key, {})
    if not cfg:
        return "**Extraction Model**\n\nSelect a model to see details"
    sampler = cfg.get("inference_settings", {})
    # Reasoning line only for models that support it
    if cfg.get("supports_toggle"):
        reasoning_note = "\n**Reasoning:** Hybrid (user-toggleable)"
    elif cfg.get("supports_reasoning"):
        reasoning_note = "\n**Reasoning:** Thinking-only (always on)"
    else:
        reasoning_note = ""
    return f"""**{cfg.get('name', 'Unknown')}**
**Size:** {cfg.get('params_size', 'N/A')}
**Max Context:** {cfg.get('max_context', 0):,} tokens
**Default n_ctx:** {cfg.get('default_n_ctx', 4096):,} tokens (user-adjustable via slider)
**Repository:** `{cfg.get('repo_id', 'N/A')}`{reasoning_note}
**Extraction-Optimized Settings:**
- Temperature: {sampler.get('temperature', 'N/A')}
- Top P: {sampler.get('top_p', 'N/A')}
- Top K: {sampler.get('top_k', 'N/A')}
- Repeat Penalty: {sampler.get('repeat_penalty', 'N/A')}
"""
def get_embedding_model_info(model_key: str) -> str:
    """Generate markdown info for embedding model."""
    # Imported lazily to avoid paying for the extraction module at startup
    from meeting_summarizer.extraction import EMBEDDING_MODELS
    cfg = EMBEDDING_MODELS.get(model_key, {})
    if not cfg:
        return "**Embedding Model**\n\nSelect a model to see details"
    return f"""**{cfg.get('name', 'Unknown')}**
**Embedding Dimension:** {cfg.get('embedding_dim', 'N/A')}
**Context:** {cfg.get('max_context', 0):,} tokens
**Repository:** `{cfg.get('repo_id', 'N/A')}`
**Description:** {cfg.get('description', 'N/A')}
"""
def get_synthesis_model_info(model_key: str) -> str:
    """Generate markdown info for synthesis model."""
    cfg = SYNTHESIS_MODELS.get(model_key, {})
    if not cfg:
        return "**Synthesis Model**\n\nSelect a model to see details"
    sampler = cfg.get("inference_settings", {})
    # Reasoning line only for models that support it
    if cfg.get("supports_toggle"):
        reasoning_note = "\n**Reasoning:** Hybrid (user-toggleable)"
    elif cfg.get("supports_reasoning"):
        reasoning_note = "\n**Reasoning:** Thinking-only (always on)"
    else:
        reasoning_note = ""
    return f"""**{cfg.get('name', 'Unknown')}**
**Max Context:** {cfg.get('max_context', 0):,} tokens
**Repository:** `{cfg.get('repo_id', 'N/A')}`{reasoning_note}
**Synthesis-Optimized Settings:**
- Temperature: {sampler.get('temperature', 'N/A')}
- Top P: {sampler.get('top_p', 'N/A')}
- Top K: {sampler.get('top_k', 'N/A')}
- Repeat Penalty: {sampler.get('repeat_penalty', 'N/A')}
"""
def summarize_advanced(
    transcript: str,
    extraction_model_key: str,
    embedding_model_key: str,
    synthesis_model_key: str,
    extraction_n_ctx: int,
    overlap_turns: int,
    similarity_threshold: float,
    enable_extraction_reasoning: bool,
    enable_synthesis_reasoning: bool,
    output_language: str,
    max_tokens: int,
    enable_logging: bool,
    n_threads: int = 2,
    temperature: float = 0.6,
    top_p: float = 0.95,
    top_k: int = 20
) -> Generator[Dict[str, Any], None, None]:
    """
    Advanced 3-stage pipeline: Extraction → Deduplication → Synthesis.

    Args:
        transcript: Raw meeting transcript text.
        extraction_model_key: LLM key used to extract items per window.
        embedding_model_key: Embedding model key used for deduplication.
        synthesis_model_key: LLM key used for the executive summary.
        extraction_n_ctx: Context window (tokens) for the extraction model.
        overlap_turns: Trailing lines carried over into the next window.
        similarity_threshold: Similarity cutoff for deduplication.
        enable_extraction_reasoning: Enable thinking mode during extraction.
        enable_synthesis_reasoning: Enable thinking mode during synthesis.
        output_language: Target language; "zh-TW" triggers OpenCC conversion.
        max_tokens: Max tokens for the synthesis output.
        enable_logging: Enable the Tracer for debug traces.
        n_threads: CPU threads for inference.
        temperature / top_p / top_k: Sampling overrides applied to synthesis.

    Yields progress updates as dicts with keys:
    - stage: "extraction" | "deduplication" | "synthesis" | "complete" | "error"
    - ticker: Progress ticker text (for extraction)
    - thinking: Thinking/reasoning content
    - summary: Final summary (for synthesis/complete)
    - error: Error message (if any)
    - trace_stats: Summary statistics (on complete)
    """
    # NOTE: Tracer and the extraction helpers (EmbeddingModel, Window,
    # preprocess_transcript, stream_extract_from_window, deduplicate_items,
    # stream_synthesize_executive_summary) are imported at module level;
    # the previous redundant function-local re-imports were removed.

    # Initialize tracer
    tracer = Tracer(enabled=enable_logging)
    extraction_llm = None
    embedding_model = None
    synthesis_llm = None
    try:
        # ===== STAGE 1: EXTRACTION =====
        yield {"stage": "extraction", "ticker": "Loading extraction model...", "thinking": "", "summary": ""}
        extraction_llm, load_msg = load_model_for_role(
            model_key=extraction_model_key,
            model_role="extraction",
            n_threads=n_threads,
            user_n_ctx=extraction_n_ctx
        )
        yield {"stage": "extraction", "ticker": load_msg, "thinking": "", "summary": ""}

        # Use the model's actual tokenizer for accurate token counting
        def count_tokens(text: str) -> int:
            """Count tokens using the extraction model's tokenizer."""
            return len(extraction_llm.tokenize(text.encode('utf-8')))

        # Preprocess transcript: strip CSV format, remove noise/repetition
        raw_line_count = len(transcript.split('\n'))
        raw_char_count = len(transcript)
        transcript, noise_phrases = preprocess_transcript(transcript)
        cleaned_line_count = len(transcript.split('\n'))
        cleaned_char_count = len(transcript)
        # Log preprocessing info to tracer
        tracer.log_preprocessing(
            original_line_count=raw_line_count,
            cleaned_line_count=cleaned_line_count,
            original_char_count=raw_char_count,
            cleaned_char_count=cleaned_char_count,
            noise_phrases_removed=noise_phrases
        )

        # Create windows from preprocessed transcript
        lines = [l.strip() for l in transcript.split('\n') if l.strip()]
        # Reserve tokens for system prompt (~200) and output (~2048)
        max_window_tokens = extraction_n_ctx - 2300  # Target ~1800 tokens per window

        # Simple windowing: split into chunks based on token count
        windows = []
        current_window = []
        current_tokens = 0
        window_id = 1
        for line_num, line in enumerate(lines):
            line_tokens = count_tokens(line)
            if current_tokens + line_tokens > max_window_tokens and current_window:
                # Create window
                window_content = '\n'.join(current_window)
                windows.append(Window(
                    id=window_id,
                    content=window_content,
                    start_turn=line_num - len(current_window),
                    end_turn=line_num - 1,
                    token_count=current_tokens
                ))
                # Log window to tracer for debugging
                tracer.log_window(
                    window_id=window_id,
                    content=window_content,
                    token_count=current_tokens,
                    start_turn=line_num - len(current_window),
                    end_turn=line_num - 1
                )
                window_id += 1
                # Start new window with overlap so context spans window edges
                overlap_lines = current_window[-overlap_turns:] if len(current_window) >= overlap_turns else current_window
                current_window = overlap_lines + [line]
                current_tokens = sum(count_tokens(l) for l in current_window)
            else:
                current_window.append(line)
                current_tokens += line_tokens
        # Add final window
        if current_window:
            window_content = '\n'.join(current_window)
            windows.append(Window(
                id=window_id,
                content=window_content,
                start_turn=len(lines) - len(current_window),
                end_turn=len(lines) - 1,
                token_count=current_tokens
            ))
            # Log window to tracer for debugging
            tracer.log_window(
                window_id=window_id,
                content=window_content,
                token_count=current_tokens,
                start_turn=len(lines) - len(current_window),
                end_turn=len(lines) - 1
            )
        total_windows = len(windows)
        yield {"stage": "extraction", "ticker": f"Created {total_windows} windows", "thinking": "", "summary": ""}

        # Extract from each window
        all_items = {"action_items": [], "decisions": [], "key_points": [], "open_questions": []}
        extraction_config = get_model_config(extraction_model_key, "extraction")
        for window in windows:
            for ticker, thinking, partial_items, is_complete in stream_extract_from_window(
                extraction_llm=extraction_llm,
                window=window,
                window_id=window.id,
                total_windows=total_windows,
                tracer=tracer,
                model_config=extraction_config,
                enable_reasoning=enable_extraction_reasoning
            ):
                yield {"stage": "extraction", "ticker": ticker, "thinking": thinking, "summary": ""}
                if is_complete:
                    # Merge items
                    for category, items in partial_items.items():
                        all_items[category].extend(items)

        # Unload extraction model to free RAM before the next stage
        unload_model(extraction_llm, "extraction model")
        extraction_llm = None
        total_extracted = sum(len(v) for v in all_items.values())
        yield {"stage": "extraction", "ticker": f"✅ Extracted {total_extracted} total items", "thinking": "", "summary": ""}

        # ===== STAGE 2: DEDUPLICATION =====
        yield {"stage": "deduplication", "ticker": "Loading embedding model...", "thinking": "", "summary": ""}
        embedding_model = EmbeddingModel(embedding_model_key, n_threads=n_threads)
        load_msg = embedding_model.load()
        yield {"stage": "deduplication", "ticker": load_msg, "thinking": "", "summary": ""}

        # Deduplicate - now a generator for progress updates
        deduplicated_items = {"action_items": [], "decisions": [], "key_points": [], "open_questions": []}
        categories_processed = 0
        total_categories = len([k for k, v in all_items.items() if v])
        for intermediate_dedup in deduplicate_items(
            all_items=all_items,
            embedding_model=embedding_model,
            similarity_threshold=similarity_threshold,
            tracer=tracer
        ):
            deduplicated_items = intermediate_dedup
            categories_processed += 1
            current_total = sum(len(v) for v in deduplicated_items.values())
            yield {
                "stage": "deduplication",
                "ticker": f"Deduplicating: {categories_processed}/{total_categories} categories processed ({current_total} items so far)...",
                "thinking": "",
                "summary": ""
            }

        # Unload embedding model
        embedding_model.unload()
        embedding_model = None
        total_deduplicated = sum(len(v) for v in deduplicated_items.values())
        duplicates_removed = total_extracted - total_deduplicated
        yield {
            "stage": "deduplication",
            # Fixed ticker: the before/after counts were previously run
            # together with no separator between them.
            "ticker": f"✅ Deduplication complete: {total_extracted} → {total_deduplicated} ({duplicates_removed} duplicates removed)",
            "thinking": "",
            "summary": ""
        }

        # ===== STAGE 3: SYNTHESIS =====
        yield {"stage": "synthesis", "ticker": "", "thinking": "Loading synthesis model...", "summary": ""}
        synthesis_llm, load_msg = load_model_for_role(
            model_key=synthesis_model_key,
            model_role="synthesis",
            n_threads=n_threads
        )
        yield {"stage": "synthesis", "ticker": "", "thinking": f"✅ {load_msg}", "summary": ""}

        # Synthesize
        synthesis_config = get_model_config(synthesis_model_key, "synthesis")
        # Override inference settings with custom parameters
        synthesis_config["inference_settings"] = {
            "temperature": temperature,
            "top_p": top_p,
            "top_k": top_k,
            "repeat_penalty": 1.1
        }
        final_summary = ""
        final_thinking = ""
        for summary_chunk, thinking_chunk, is_complete in stream_synthesize_executive_summary(
            synthesis_llm=synthesis_llm,
            deduplicated_items=deduplicated_items,
            model_config=synthesis_config,
            output_language=output_language,
            enable_reasoning=enable_synthesis_reasoning,
            max_tokens=max_tokens,
            tracer=tracer
        ):
            final_summary = summary_chunk
            final_thinking = thinking_chunk
            yield {"stage": "synthesis", "ticker": "", "thinking": thinking_chunk, "summary": summary_chunk}

        # Unload synthesis model
        unload_model(synthesis_llm, "synthesis model")
        synthesis_llm = None

        # Apply Chinese conversion if needed (Simplified -> Traditional Taiwan)
        if output_language == "zh-TW":
            converter = OpenCC('s2twp')
            final_summary = converter.convert(final_summary)
            if final_thinking:
                final_thinking = converter.convert(final_thinking)

        # Get trace stats and add model names for download JSON
        trace_stats = tracer.get_summary_stats()
        debug_json = tracer.get_debug_json()
        ext_config = get_model_config(extraction_model_key, "extraction")
        syn_config = get_model_config(synthesis_model_key, "synthesis")
        trace_stats["extraction_model"] = ext_config.get("name", extraction_model_key)
        trace_stats["embedding_model"] = embedding_model_key
        trace_stats["synthesis_model"] = syn_config.get("name", synthesis_model_key)
        yield {
            "stage": "complete",
            "ticker": "",
            "thinking": final_thinking,
            "summary": final_summary,
            "trace_stats": trace_stats,
            "trace_json": tracer.get_trace_json(),
            "debug_json": debug_json
        }
    except Exception as e:
        logger.error(f"Advanced pipeline error: {e}", exc_info=True)
        # Cleanup: unload whichever model is still resident so a failed run
        # does not leak memory.
        if extraction_llm:
            unload_model(extraction_llm, "extraction model")
        if embedding_model:
            embedding_model.unload()
        if synthesis_llm:
            unload_model(synthesis_llm, "synthesis model")
        yield {
            "stage": "error",
            "ticker": "",
            "thinking": "",
            "summary": "",
            "error": str(e)
        }
def download_summary_json(summary, thinking, model_key, language, metrics):
    """Generate JSON file with summary and metadata for both Standard and Advanced modes.

    Args:
        summary: Final summary text.
        thinking: Captured thinking/reasoning text.
        model_key: Selected model key (Standard mode) — looked up in AVAILABLE_MODELS.
        language: Output language code stored in the metadata.
        metrics: Metrics dict; Advanced mode is detected via
            metrics["mode"] == "advanced" and embeds trace data instead.

    Returns:
        gr.update(...) pointing the download component at the written file.
    """
    # `json` and `datetime` are imported at module level; the previous
    # redundant function-local imports were removed.
    is_advanced = isinstance(metrics, dict) and metrics.get("mode") == "advanced"
    if is_advanced:
        # Advanced Mode: embed trace data and use pipeline model names
        trace_stats = metrics.get("trace_stats", {})
        debug_info = metrics.get("debug_json", {})
        data = {
            "metadata": {
                "generated_at": datetime.now().isoformat(),
                "mode": "advanced",
                "pipeline": "extraction → deduplication → synthesis",
                "extraction_model": trace_stats.get("extraction_model", "unknown"),
                "embedding_model": trace_stats.get("embedding_model", "unknown"),
                "synthesis_model": trace_stats.get("synthesis_model", "unknown"),
                "language": language
            },
            "thinking_process": thinking,
            "summary": summary,
            "pipeline_stats": {
                "total_windows": trace_stats.get("total_windows", 0),
                "successful_extractions": trace_stats.get("successful_extractions", 0),
                "total_items_extracted": trace_stats.get("total_items_extracted", 0),
                "total_items_after_dedup": trace_stats.get("total_items_after_dedup", 0),
                "total_duplicates_removed": trace_stats.get("total_duplicates_removed", 0),
                "duplicate_rate": trace_stats.get("duplicate_rate", 0),
                "synthesis_success": trace_stats.get("synthesis_success", False),
                "total_elapsed_seconds": trace_stats.get("total_elapsed_seconds", 0),
            },
            "debug_info": debug_info,
            "trace": metrics.get("trace_json", [])
        }
    else:
        # Standard Mode: original behavior
        model_name = "unknown"
        if model_key and model_key in AVAILABLE_MODELS:
            model_name = AVAILABLE_MODELS[model_key]["name"]
        data = {
            "metadata": {
                "generated_at": datetime.now().isoformat(),
                "mode": "standard",
                "model": model_name,
                "model_id": model_key,
                "language": language
            },
            "thinking_process": thinking,
            "summary": summary
        }
        # Add generation metrics if available
        if metrics and isinstance(metrics, dict):
            data["generation_metrics"] = {
                "settings_used": metrics.get("settings", {}),
                "timing": {
                    "time_to_first_token_ms": round(metrics.get("time_to_first_token_ms", 0), 2) if metrics.get("time_to_first_token_ms") else None,
                    "total_processing_time_ms": round(metrics.get("total_processing_time_ms", 0), 2) if metrics.get("total_processing_time_ms") else None,
                    "model_load_time_ms": round(metrics.get("model_load_time_ms", 0), 2) if metrics.get("model_load_time_ms") else None,
                },
                "tokens": {
                    "n_ctx": metrics.get("n_ctx"),
                    "input_tokens": metrics.get("input_tokens"),
                    "output_tokens": metrics.get("output_tokens"),
                    "thinking_tokens": metrics.get("thinking_tokens"),
                    "total_tokens": metrics.get("total_tokens"),
                    "generation_tokens": metrics.get("generation_tokens"),
                    "prefill_tokens": metrics.get("prefill_tokens")
                },
                "performance": {
                    "generation_speed_tps": round(metrics.get("generation_speed_tps", 0), 2) if metrics.get("generation_speed_tps") else None,
                    "prefill_speed_tps": round(metrics.get("prefill_speed_tps", 0), 2) if metrics.get("prefill_speed_tps") else None
                },
                "file_info": metrics.get("file_info", {}),
                "truncation_info": metrics.get("truncation_info", {})
            }
    # Write to a timestamped file in the working directory for Gradio download
    filename = f"summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    return gr.update(value=filename, visible=True)
def estimate_tokens(text: str) -> int:
    """Rough token-count estimate for mixed CJK/English text.

    Uses the heuristic of ~3 UTF-8 bytes per token, which suits
    Chinese-heavy content.
    """
    byte_length = len(text.encode('utf-8'))
    return byte_length // 3
def calculate_n_ctx(model_key: str, transcript: str, max_tokens: int, enable_reasoning: bool = False) -> Tuple[int, str]:
    """
    Calculate optimal n_ctx based on model limits and input size.

    Args:
        model_key: Model identifier from AVAILABLE_MODELS
        transcript: Input text content
        max_tokens: Maximum tokens to generate for summary
        enable_reasoning: If True, add extra buffer for thinking tokens

    Returns:
        Tuple of (n_ctx, warning_message) -- warning is "" if no issue
    """
    model = AVAILABLE_MODELS[model_key]
    usable_max = min(model["max_context"], MAX_USABLE_CTX)
    input_tokens = estimate_tokens(transcript)
    # Reasoning models get 50% of max_tokens reserved for thinking output.
    thinking_buffer = int(max_tokens * 0.5) if enable_reasoning else 0
    # 512 extra covers the system prompt plus slack.
    required = input_tokens + max_tokens + thinking_buffer + 512
    # Round up to the next 512 multiple, then clamp to [2048, usable_max].
    n_ctx = ((required // 512) + 1) * 512
    n_ctx = min(n_ctx, usable_max)
    n_ctx = max(n_ctx, 2048)
    if required <= usable_max:
        return n_ctx, ""
    # Input will not fit: tell the user how much survives truncation.
    available_input = usable_max - max_tokens - thinking_buffer - 512
    warning = (
        f"⚠️ Warning: File too large for {model['name']} "
        f"(need ~{required:,} tokens, max {usable_max:,}). "
        f"Input will be truncated to ~{available_input:,} tokens. "
        f"Consider Hunyuan (256K) or ERNIE (131K) for large files."
    )
    return n_ctx, warning
def calculate_effective_max_tokens(model_key: str, max_tokens: int, enable_reasoning: bool) -> int:
    """
    Calculate effective max_tokens with thinking headroom for reasoning models.

    When reasoning is enabled for thinking-capable models, adds 50% headroom
    to accommodate both thinking process and final output.

    Args:
        model_key: Model identifier from AVAILABLE_MODELS
        max_tokens: User-specified maximum tokens
        enable_reasoning: Whether reasoning mode is enabled

    Returns:
        Adjusted max_tokens value (1.5x for reasoning models, unchanged otherwise)
    """
    # Guard clauses: no adjustment unless reasoning is on for a known,
    # thinking-capable model.
    if not enable_reasoning:
        return max_tokens
    model_config = AVAILABLE_MODELS.get(model_key)
    if not model_config:
        return max_tokens
    if not model_config.get("supports_reasoning", False):
        return max_tokens
    # Add 50% headroom for the thinking process.
    effective_max = max_tokens + int(max_tokens * 0.5)
    logger.info(f"Reasoning enabled for {model_key}: extending max_tokens from {max_tokens} to {effective_max}")
    return effective_max
def get_model_info(model_key: str, n_threads: int = 2, custom_metadata: Optional[dict] = None) -> Tuple[str, str, float, int]:
    """Get model information and inference settings for UI display.

    Args:
        model_key: Model identifier from AVAILABLE_MODELS ("custom_hf" selects
            the custom-model branch when custom_metadata is provided)
        n_threads: Number of CPU threads currently configured
        custom_metadata: Optional metadata for custom models (repo_id, filename, size_mb)

    Returns:
        Tuple of (info_text, temperature, top_p, top_k).
        Note: temperature is returned as a *string* (for direct UI display),
        while top_p and top_k keep their numeric types.
    """
    # Handle custom model case
    if model_key == "custom_hf" and custom_metadata:
        repo_id = custom_metadata.get("repo_id", "Unknown")
        # NOTE(review): `filename` is read but not rendered in the table below
        # (quantization is shown as "(unknown)") — confirm whether it should be.
        filename = custom_metadata.get("filename", "Unknown")
        size_mb = custom_metadata.get("size_mb", 0)
        size_str = f"{size_mb:.1f} MB" if size_mb > 0 else "Unknown"
        # Determine thread preset label (2 and 8 match the HF Spaces tiers)
        if n_threads == 2:
            thread_label = "HF Free Tier"
        elif n_threads == 8:
            thread_label = "HF Upgrade Tier"
        else:
            thread_label = "Custom"
        info_text = (
            f"## 🤖 Custom GGUF Model\n\n"
            f"### 📊 Model Specs\n"
            f"| Property | Value |\n"
            f"|----------|-------|\n"
            f"| **Repository** | `{repo_id}` |\n"
            f"| **Quantization** | `(unknown)` |\n"
            f"| **Size** | {size_str} |\n"
            f"| **Context** | Dynamic (up to 32K) |\n\n"
            f"### 🖥️ Hardware Configuration\n"
            f"| Property | Value |\n"
            f"|----------|-------|\n"
            f"| **CPU Threads** | {n_threads} ({thread_label}) |\n\n"
            f"### ⚙️ Inference Settings\n"
            f"| Property | Value |\n"
            f"|----------|-------|\n"
            f"| **Temperature** | 0.6 |\n"
            f"| **Top P** | 0.9 |\n"
            f"| **Top K** | 40 |\n"
            f"| **Repeat Penalty** | 1.0 |"
        )
        # Custom models use fixed default sampling settings.
        return info_text, "0.6", 0.9, 40
    # Handle predefined models
    m = AVAILABLE_MODELS[model_key]
    # Advertised context is capped by the app-wide usable maximum.
    usable_ctx = min(m["max_context"], MAX_USABLE_CTX)
    settings = m["inference_settings"]
    # Determine thread preset label (2 and 8 match the HF Spaces tiers)
    if n_threads == 2:
        thread_label = "HF Free Tier"
    elif n_threads == 8:
        thread_label = "HF Upgrade Tier"
    else:
        thread_label = "Custom"
    info_text = (
        f"## 🤖 {m['name']}\n\n"
        f"### 📊 Model Specs\n"
        f"| Property | Value |\n"
        f"|----------|-------|\n"
        f"| **Context** | {m['max_context']:,} tokens (capped at {usable_ctx:,}) |\n"
        f"| **Quantization** | `{m['filename']}` |\n"
        f"| **Repository** | `{m['repo_id']}` |\n\n"
        f"### 🖥️ Hardware Configuration\n"
        f"| Property | Value |\n"
        f"|----------|-------|\n"
        f"| **CPU Threads** | {n_threads} ({thread_label}) |\n\n"
        f"### ⚙️ Inference Settings\n"
        f"| Property | Value |\n"
        f"|----------|-------|\n"
        f"| **Temperature** | {settings['temperature']} |\n"
        f"| **Top P** | {settings['top_p']} |\n"
        f"| **Top K** | {settings['top_k']} |\n"
        f"| **Repeat Penalty** | {settings.get('repeat_penalty', 1.0)} |"
    )
    return info_text, str(settings["temperature"]), settings["top_p"], settings["top_k"]
def parse_thinking_blocks(content: str, streaming: bool = False) -> Tuple[str, str]:
    """
    Split model output into (thinking, summary) parts.

    Recognizes both <think>...</think> and <thinking>...</thinking> blocks.
    When *streaming* is True, a still-open <think> tag at the end of the
    text is treated as in-progress thinking rather than summary.

    Args:
        content: Full model response
        streaming: If True, handle unclosed <think> tags for live display

    Returns:
        Tuple of (thinking_content, summary_content)
    """
    block_re = r'<think(?:ing)?>(.*?)</think(?:ing)?>'
    completed = re.findall(block_re, content, re.DOTALL)
    # Whatever remains after stripping closed blocks is candidate summary.
    summary = re.sub(block_re, '', content, flags=re.DOTALL).strip()
    pieces = []
    for chunk in completed:
        chunk = chunk.strip()
        if chunk:
            pieces.append(chunk)
    if streaming:
        # Model may still be inside an unclosed thinking block.
        tail = re.search(r'<think(?:ing)?>([^<]*)$', content, re.DOTALL)
        if tail:
            in_progress = tail.group(1).strip()
            if in_progress:
                pieces.append(in_progress)
            # Nothing after an open tag counts as summary yet.
            summary = re.sub(r'<think(?:ing)?>[^<]*$', '', summary, flags=re.DOTALL).strip()
    thinking = '\n\n'.join(pieces)
    if not thinking and not completed:
        # No thinking tags at all: the whole content is summary, unless the
        # text opens with a tag that produced no usable thinking content.
        return ("", "" if content.startswith('<think') else content)
    return (thinking, summary)
def summarize_streaming(
    file_obj,
    text_input: str = "",
    model_key: str = "qwen3_600m_q4",
    enable_reasoning: bool = True,
    max_tokens: int = 2048,
    temperature: float = 0.6,
    top_p: float = None,
    top_k: int = None,
    output_language: str = "en",
    thread_config: str = "free",
    custom_threads: int = 4,
    custom_model_state: Any = None,
) -> Generator[Tuple[str, str, str, dict, str], None, None]:
    """
    Stream summary generation from uploaded file or text input.

    Args:
        file_obj: Gradio file object
        text_input: Direct text input from user (takes priority over file_obj)
        model_key: Model identifier from AVAILABLE_MODELS
        enable_reasoning: Whether to use reasoning mode (/think) for Qwen3 models
        max_tokens: Maximum tokens to generate
        temperature: Sampling temperature (overridden by per-model thinking /
            no-thinking presets for toggle-capable models)
        top_p: Nucleus sampling parameter (uses model default if None)
        top_k: Top-k sampling parameter (uses model default if None)
        output_language: Target language for summary ("en" or "zh-TW")
        thread_config: Thread configuration preset ("free", "upgrade", "custom")
        custom_threads: Custom thread count when preset is "custom"
        custom_model_state: Pre-loaded custom model (if using custom_hf)

    Yields:
        Tuple of (thinking_text, summary_text, info_text, metrics_dict, system_prompt)
    """
    # `time` is imported at module level; the redundant local import was removed.
    metrics = {
        "start_time": None,
        "time_to_first_token_ms": None,
        "generation_start_time": None,
        "generation_end_time": None,
        "model_load_time_ms": None,
        "total_tokens": 0,
        "generation_tokens": 0,
        "prefill_tokens": 0,
        "input_tokens": 0,
        "output_tokens": 0,
        "thinking_tokens": 0,
        "n_ctx": 0,
        "settings": {},
        "file_info": {},
        "truncation_info": {},
    }
    global llm, converter
    # Determine thread count based on configuration preset
    thread_preset_map = {
        "free": 2,       # HF Spaces Free Tier: 2 vCPUs
        "upgrade": 8,    # HF Spaces CPU Upgrade: 8 vCPUs
        "custom": custom_threads,  # User-specified thread count
    }
    n_threads = thread_preset_map.get(thread_config, 2)
    logger.info(f"Using {n_threads} threads (config: {thread_config})")
    model = AVAILABLE_MODELS[model_key]
    usable_max = min(model["max_context"], MAX_USABLE_CTX)
    # Adjust max_tokens for thinking models when reasoning is enabled
    original_max_tokens = max_tokens
    max_tokens = calculate_effective_max_tokens(model_key, max_tokens, enable_reasoning)
    if max_tokens != original_max_tokens:
        logger.info(f"Adjusted max_tokens from {original_max_tokens} to {max_tokens} for reasoning mode")
    # Validate max_tokens fits in context (512 reserved for prompt overhead)
    if max_tokens > usable_max - 512:
        max_tokens = usable_max - 512
    # Read input source (prioritize text_input)
    try:
        transcript = ""
        source_name = "Direct Input"
        source_size = 0
        if text_input and text_input.strip():
            transcript = text_input
            source_size = len(transcript.encode('utf-8'))
        elif file_obj is not None:
            path = file_obj.name if hasattr(file_obj, 'name') else file_obj
            source_name = os.path.basename(path)
            source_size = os.path.getsize(path)
            with open(path, 'r', encoding='utf-8') as f:
                transcript = f.read()
        else:
            system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning)
            yield ("", "Error: Please upload a file or paste text first", "", metrics, system_prompt_preview)
            return
        # Store input info
        metrics["file_info"] = {
            "source": source_name,
            "size_bytes": source_size,
            "original_char_count": len(transcript),
        }
    except Exception as e:
        system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning)
        yield ("", f"Error reading input: {e}", "", metrics, system_prompt_preview)
        return
    if not transcript.strip():
        system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning)
        yield ("", "Error: File is empty", "", metrics, system_prompt_preview)
        return
    # Calculate context and check truncation (with reasoning buffer if enabled)
    n_ctx, warning = calculate_n_ctx(model_key, transcript, max_tokens, enable_reasoning)
    metrics["n_ctx"] = n_ctx
    # Truncate if needed (estimate max chars from available tokens)
    available_tokens = usable_max - max_tokens - 512
    max_bytes = available_tokens * 3  # Reverse estimate: tokens * 3 bytes
    encoded = transcript.encode('utf-8')
    was_truncated = len(encoded) > max_bytes
    original_length = len(transcript)
    if was_truncated:
        transcript = encoded[:max_bytes].decode('utf-8', errors='ignore')
        transcript += "\n\n[Content truncated to fit model context]"
    # Store truncation info
    metrics["truncation_info"] = {
        "was_truncated": was_truncated,
        "original_char_count": original_length,
        "final_char_count": len(transcript),
        # Estimate of the *pre-truncation* input: `encoded` still holds the
        # full original bytes even after `transcript` was truncated above.
        # (Previously this was computed from the truncated bytes, which
        # contradicted the field name.)
        "original_token_estimate": len(encoded) // 3,
    }
    # Get base model info with current thread configuration
    info_text, _, _, _ = get_model_info(model_key, n_threads=n_threads)
    # Build generation stats section
    input_tokens = estimate_tokens(transcript)
    max_output_text = f"{max_tokens:,} tokens"
    if max_tokens != original_max_tokens:
        max_output_text += f" (adjusted from {original_max_tokens:,} for thinking mode)"
    generation_stats = (
        f"\n\n### 📈 Generation Stats\n"
        f"| Property | Value |\n"
        f"|----------|-------|\n"
        f"| **Context Window** | {n_ctx:,} tokens |\n"
        f"| **Input Tokens** | ~{input_tokens:,} tokens |\n"
        f"| **Max Output** | {max_output_text} |"
    )
    # Combine model info with generation stats
    info = info_text + generation_stats
    if warning:
        info += f"\n\n⚠️ {warning}"
    # Load model (no-op if already loaded) with timing
    model_load_start = time.time()
    try:
        if model_key == "custom_hf":
            # Use pre-loaded custom model
            if custom_model_state is None:
                system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning)
                yield ("", "Error: No custom model loaded. Please load a custom model first.", "", metrics, system_prompt_preview)
                return
            llm = custom_model_state
            load_msg = "Using pre-loaded custom model"
        else:
            llm, load_msg = load_model(model_key, n_threads=n_threads)
        logger.info(load_msg)
        metrics["model_load_time_ms"] = (time.time() - model_load_start) * 1000
    except Exception as e:
        system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning)
        yield ("", f"Error loading model: {e}", "", metrics, system_prompt_preview)
        return
    # Prepare system prompt with reasoning toggle for Qwen3 models
    if model_key == "custom_hf":
        # Use default settings for custom models
        model = AVAILABLE_MODELS["custom_hf"]
    else:
        model = AVAILABLE_MODELS[model_key]
    # Calculate dynamic temperature for Qwen3 models: toggle-capable models
    # carry separate thinking / no-thinking temperature presets.
    if model.get("supports_toggle") and "temperature_thinking" in model.get("inference_settings", {}):
        if enable_reasoning:
            effective_temperature = model["inference_settings"]["temperature_thinking"]
        else:
            effective_temperature = model["inference_settings"]["temperature_no_thinking"]
    else:
        effective_temperature = temperature
    # Build system and user prompts using the extracted function
    system_content = build_system_prompt(output_language, model.get("supports_toggle", False), enable_reasoning)
    user_content = build_user_prompt(transcript, output_language)
    messages = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_content},
    ]
    # Get model-specific inference settings. effective_temperature (computed
    # above) is what actually drives sampling; the previous dead reassignment
    # of `temperature` from the model defaults was removed.
    inference_settings = model["inference_settings"]
    final_top_p = top_p if top_p is not None else inference_settings["top_p"]
    final_top_k = top_k if top_k is not None else inference_settings["top_k"]
    repeat_penalty = inference_settings["repeat_penalty"]
    # Stream - NO stop= parameter, let GGUF metadata handle it
    full_response = ""
    current_thinking = ""
    current_summary = ""
    try:
        # Record generation settings
        metrics["settings"] = {
            "model": model_key,
            "max_tokens": max_tokens,
            "temperature": effective_temperature,
            "top_p": final_top_p,
            "top_k": final_top_k,
            "repeat_penalty": repeat_penalty,
            "enable_reasoning": enable_reasoning,
            "output_language": output_language,
            "n_ctx": metrics["n_ctx"],
        }
        # Calculate exact input tokens (system + user prompts)
        system_tokens = estimate_tokens(system_content)
        user_tokens = estimate_tokens(user_content)
        metrics["input_tokens"] = system_tokens + user_tokens
        # Start timing
        metrics["start_time"] = time.time()
        first_token_time = None
        token_count = 0
        # Apply model-specific inference settings
        stream = llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=effective_temperature,
            min_p=0.0,
            top_p=final_top_p,
            top_k=final_top_k,
            repeat_penalty=repeat_penalty,
            stream=True,
        )
        metrics["generation_start_time"] = time.time()
        for chunk in stream:
            if 'choices' in chunk and len(chunk['choices']) > 0:
                delta = chunk['choices'][0].get('delta', {})
                content = delta.get('content', '')
                if content:
                    # Track time to first token
                    if first_token_time is None:
                        first_token_time = time.time()
                        metrics["time_to_first_token_ms"] = (first_token_time - metrics["start_time"]) * 1000
                    token_count += 1
                    # Convert Simplified -> Traditional on the fly for zh-TW
                    if output_language == "zh-TW":
                        converted = converter.convert(content)
                        full_response += converted
                    else:
                        full_response += content
                    thinking, summary = parse_thinking_blocks(full_response, streaming=True)
                    current_thinking = thinking or ""
                    current_summary = summary or ""
                    yield (current_thinking, current_summary, info, metrics, system_content)
        # Final timing calculations
        metrics["generation_end_time"] = time.time()
        metrics["generation_tokens"] = token_count
        metrics["total_tokens"] = token_count
        # Calculate speeds
        generation_duration = metrics["generation_end_time"] - metrics["generation_start_time"]
        if generation_duration > 0:
            metrics["generation_speed_tps"] = token_count / generation_duration
        else:
            metrics["generation_speed_tps"] = 0.0
        # Prefill = time from start to first token
        if metrics["time_to_first_token_ms"]:
            prefill_seconds = metrics["time_to_first_token_ms"] / 1000
            # Estimate prefill tokens (input tokens processed before first output)
            input_tokens = estimate_tokens(transcript)
            metrics["prefill_tokens"] = input_tokens
            if prefill_seconds > 0:
                metrics["prefill_speed_tps"] = input_tokens / prefill_seconds
            else:
                metrics["prefill_speed_tps"] = 0.0
        # Total processing time
        metrics["total_processing_time_ms"] = (metrics["generation_end_time"] - metrics["start_time"]) * 1000
        # Final parse and token counts
        thinking, summary = parse_thinking_blocks(full_response)
        # Calculate output tokens
        metrics["output_tokens"] = estimate_tokens(summary) if summary else 0
        metrics["thinking_tokens"] = estimate_tokens(thinking) if thinking else 0
        # Update totals
        metrics["total_tokens"] = metrics["input_tokens"] + metrics["output_tokens"] + metrics["thinking_tokens"]
        yield (thinking or "", summary or "", info, metrics, system_content)
        # Clear the KV cache so the next request starts fresh
        llm.reset()
    except Exception as e:
        logger.error(f"Generation error: {e}")
        metrics["error"] = str(e)
        yield (current_thinking, current_summary + f"\n\nError: {e}", info, metrics, system_content)
# Custom CSS for better UI.
# Defines the design tokens (:root custom properties) and component styling
# for the whole app: header, instructions card, tabs, groups, buttons,
# the thinking/summary output boxes, metrics panel, and responsive rules.
# NOTE(review): in the visible code below, gr.Blocks(...) is created without
# a css= argument — confirm custom_css is actually passed to Gradio
# somewhere else, otherwise this stylesheet never reaches the browser.
custom_css: str = """
:root {
--primary-color: #6366f1;
--primary-dark: #4f46e5;
--primary-light: #c7d2fe;
--accent-color: #8b5cf6;
--bg-color: #f8fafc;
--card-bg: rgba(255, 255, 255, 0.85);
--text-color: #1e293b;
--text-muted: #64748b;
--border-color: #e2e8f0;
--border-light: #f1f5f9;
/* Semantic Colors */
--thinking-bg: #f5f3ff;
--thinking-border: #ddd6fe;
--thinking-accent: #8b5cf6;
--summary-bg: #f0fdf4;
--summary-border: #dcfce7;
--summary-accent: #22c55e;
--shadow-sm: 0 1px 2px rgba(0, 0, 0, 0.05);
--shadow-md: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06);
--shadow-lg: 0 10px 15px -3px rgba(0, 0, 0, 0.1), 0 4px 6px -2px rgba(0, 0, 0, 0.05);
--radius-sm: 8px;
--radius-md: 12px;
--radius-lg: 20px;
}
/* ===== LAYOUT & BASE ===== */
.gradio-container {
max-width: 1400px !important;
background: radial-gradient(circle at top right, #eef2ff 0%, #f8fafc 40%) !important;
}
/* ===== HEADER ===== */
.app-header {
text-align: center;
padding: 2.5rem 1.5rem;
background: linear-gradient(135deg, var(--primary-color) 0%, var(--accent-color) 100%);
border-radius: var(--radius-lg);
margin-bottom: 2rem;
color: white;
box-shadow: var(--shadow-lg);
position: relative;
overflow: hidden;
}
.app-header::before {
content: "";
position: absolute;
top: -50%;
left: -50%;
width: 200%;
height: 200%;
background: radial-gradient(circle, rgba(255,255,255,0.1) 0%, transparent 60%);
animation: rotate 20s linear infinite;
}
@keyframes rotate {
from { transform: rotate(0deg); }
to { transform: rotate(360deg); }
}
.app-header h1 {
margin: 0 0 0.5rem 0;
font-size: 2.5rem;
font-weight: 800;
letter-spacing: -0.04em;
position: relative;
z-index: 1;
}
.app-header p {
margin: 0;
opacity: 0.9;
font-size: 1.15rem;
font-weight: 400;
position: relative;
z-index: 1;
}
.model-badge {
display: inline-flex;
align-items: center;
gap: 0.5rem;
background: rgba(255, 255, 255, 0.15);
padding: 0.6rem 1.25rem;
border-radius: 30px;
font-size: 0.9rem;
margin-top: 1.25rem;
backdrop-filter: blur(8px);
border: 1px solid rgba(255, 255, 255, 0.2);
position: relative;
z-index: 1;
font-weight: 500;
}
/* ===== INSTRUCTIONS ===== */
.instructions {
background: var(--card-bg);
border-left: 5px solid var(--primary-color);
padding: 1.25rem 1.5rem;
border-radius: var(--radius-sm) var(--radius-md) var(--radius-md) var(--radius-sm);
margin-bottom: 2rem;
box-shadow: var(--shadow-sm);
backdrop-filter: blur(10px);
border: 1px solid var(--border-color);
}
/* ===== SECTION HEADERS ===== */
.section-header {
font-size: 0.95rem;
font-weight: 700;
color: var(--text-color);
margin-bottom: 1rem;
display: flex;
align-items: center;
gap: 0.6rem;
padding-bottom: 0.6rem;
border-bottom: 2px solid var(--border-light);
text-transform: uppercase;
letter-spacing: 0.05em;
}
.section-icon {
font-size: 1.2rem;
}
/* ===== TABS STYLING ===== */
.gradio-tabs {
border: 1px solid var(--border-color) !important;
border-radius: var(--radius-md) !important;
overflow: hidden;
box-shadow: var(--shadow-sm);
background: var(--card-bg) !important;
backdrop-filter: blur(10px);
}
.tab-nav {
background: #f1f5f9 !important;
padding: 0.25rem 0.25rem 0 0.25rem !important;
gap: 4px !important;
}
.tab-nav button {
border-radius: 8px 8px 0 0 !important;
padding: 0.75rem 1rem !important;
}
/* ===== GROUPS & CARDS ===== */
.gradio-group {
border: 1px solid var(--border-color) !important;
border-radius: var(--radius-md) !important;
padding: 1.25rem !important;
background: var(--card-bg) !important;
box-shadow: var(--shadow-sm) !important;
margin-bottom: 1.5rem !important;
backdrop-filter: blur(10px);
transition: transform 0.2s ease, box-shadow 0.2s ease !important;
}
.gradio-group:hover {
box-shadow: var(--shadow-md) !important;
}
/* ===== ACCORDION STYLING ===== */
.gradio-accordion {
border: 1px solid var(--border-color) !important;
border-radius: var(--radius-md) !important;
background: var(--card-bg) !important;
}
/* ===== BUTTONS ===== */
.submit-btn {
background: linear-gradient(135deg, var(--primary-color) 0%, var(--accent-color) 100%) !important;
border: none !important;
color: white !important;
font-weight: 700 !important;
padding: 1rem 2rem !important;
border-radius: var(--radius-md) !important;
cursor: pointer;
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important;
box-shadow: 0 4px 15px rgba(99, 102, 241, 0.4) !important;
width: 100% !important;
font-size: 1.1rem !important;
letter-spacing: 0.02em;
}
.submit-btn:hover {
transform: translateY(-3px) scale(1.02);
box-shadow: 0 8px 25px rgba(99, 102, 241, 0.5) !important;
}
/* ===== OUTPUT BOXES ===== */
.thinking-box {
background: var(--thinking-bg) !important;
border: 1px solid var(--thinking-border) !important;
border-left: 4px solid var(--thinking-accent) !important;
border-radius: var(--radius-md) !important;
font-family: 'JetBrains Mono', 'Fira Code', monospace !important;
transition: all 0.3s ease !important;
}
.thinking-box:focus-within {
box-shadow: 0 0 0 3px rgba(139, 92, 246, 0.1) !important;
}
.summary-box {
background: var(--summary-bg) !important;
border: 1px solid var(--summary-border) !important;
border-radius: var(--radius-md) !important;
padding: 1.5rem !important;
font-size: 1.1rem !important;
line-height: 1.7 !important;
color: #0f172a !important;
box-shadow: var(--shadow-sm);
}
.completion-info {
background: linear-gradient(135deg, #f8fafc 0%, #f1f5f9 100%) !important;
border: 1px solid #cbd5e1 !important;
border-left: 4px solid #10b981 !important;
border-radius: var(--radius-md) !important;
padding: 1.2rem !important;
font-size: 0.95rem !important;
line-height: 1.6 !important;
color: #334155 !important;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.08);
}
.completion-info h3 {
color: #10b981 !important;
font-size: 1.1rem !important;
margin-bottom: 0.5rem !important;
}
.completion-info strong {
color: #0f172a !important;
}
/* ===== RESPONSIVE ADJUSTMENTS ===== */
@media (max-width: 1024px) {
.gradio-container {
padding: 1rem !important;
}
.submit-btn {
position: sticky;
bottom: 1rem;
z-index: 100;
}
}
@media (max-width: 768px) {
.app-header {
padding: 1.5rem 1rem;
}
.app-header h1 {
font-size: 1.8rem;
}
}
/* ===== MODE VISUAL INDICATORS ===== */
/* Style for visible mode groups to indicate they are active */
.gradio-group:not([style*="display: none"]) {
position: relative;
}
/* Add subtle highlight border to active mode group */
.gradio-group:not([style*="display: none"]) > .form {
border-left: 3px solid var(--primary-color);
padding-left: 12px;
background: linear-gradient(90deg, rgba(99, 102, 241, 0.03) 0%, transparent 100%);
}
"""
# Create Gradio interface
def create_interface():
"""Create and configure the Gradio interface."""
with gr.Blocks(
title="Tiny Scribe - AI Transcript Summarizer"
) as demo:
# Header section (simplified - no Row/Column wrapper needed for full-width)
gr.HTML("""
<div class="app-header">
<h1>📄 Tiny Scribe</h1>
<p>AI-Powered Transcript Summarization with Real-Time Streaming</p>
<div class="model-badge">
<span>Select a model below to get started</span>
</div>
</div>
""")
# Instructions (simplified)
gr.HTML("""
<div class="instructions">
<strong>📋 How to use:</strong>
<ul>
<li>Upload a .txt file containing your transcript, notes, or document</li>
<li>Click "Generate Summary" to start AI processing</li>
<li>Watch the <strong>Thinking Process</strong> (left) - see how the AI reasons</li>
<li>Read the <strong>Final Summary</strong> (right) - the polished result</li>
<li>Both outputs stream in real-time as the AI generates content</li>
</ul>
</div>
""")
# Main content area
with gr.Row():
# Left column - Configuration
with gr.Column(scale=1):
# ==========================================
# Section 1: Output Configuration
# ==========================================
with gr.Group():
gr.HTML('<div class="section-header"><span class="section-icon">🌐</span> Output Settings</div>')
language_selector = gr.Dropdown(
choices=[("English", "en"), ("Traditional Chinese (zh-TW)", "zh-TW")],
value="en",
label="Output Language",
info="Target language for the summary"
)
with gr.Group():
gr.HTML('<div class="section-header"><span class="section-icon">📥</span> Input Content</div>')
with gr.Tabs() as input_tabs:
with gr.TabItem("📄 Upload File", id=0):
file_input = gr.File(
label="Transcript (.txt)",
file_types=[".txt"],
type="filepath",
elem_classes=["file-upload-area"]
)
with gr.TabItem("✍️ Paste Text", id=1):
text_input = gr.Textbox(
label="Paste Transcript",
placeholder="Paste your transcript content here...",
lines=10,
max_lines=20
)
# ==========================================
# Section 2: Hardware Configuration (Global)
# ==========================================
with gr.Group():
gr.HTML('<div class="section-header"><span class="section-icon">🖥️</span> Hardware Configuration</div>')
thread_config_dropdown = gr.Dropdown(
choices=[
("HF Spaces Free Tier (2 vCPUs)", "free"),
("HF Spaces CPU Upgrade (8 vCPUs)", "upgrade"),
("Custom (manual)", "custom"),
],
value=DEFAULT_THREAD_PRESET,
label="CPU Thread Preset",
info="Select hardware tier or specify custom thread count"
)
custom_threads_slider = gr.Slider(
minimum=1,
maximum=32,
value=DEFAULT_CUSTOM_THREADS if DEFAULT_CUSTOM_THREADS > 0 else 4,
step=1,
label="Custom Thread Count",
info="Number of CPU threads for model inference (1-32)",
visible=DEFAULT_THREAD_PRESET == "custom"
)
# ==========================================
# Section 3: Mode Selection (Standard vs Advanced)
# ==========================================
mode_radio = gr.Radio(
choices=["Standard Mode", "Advanced Mode (3-Model Pipeline)"],
value="Standard Mode",
label="🎯 Summarization Mode",
info="Select between single-model Standard or multi-model Advanced mode"
)
# ===== STANDARD MODE =====
with gr.Group(visible=True) as standard_mode_group:
gr.HTML('<div style="font-size: 0.9em; color: #64748b; margin-bottom: 10px;">📊 <strong>Standard Mode</strong> - Single-model direct summarization</div>')
# Model source selector
model_source_radio = gr.Radio(
choices=["Preset Models", "Custom GGUF"],
value="Preset Models",
label="Model Source",
info="Choose between curated presets or custom HuggingFace models"
)
# Preset Models Group
with gr.Group(visible=True) as preset_models_group:
# Filter out custom_hf from preset choices
preset_choices = [
(info["name"] + (" ⚡" if info.get("supports_reasoning", False) and not info.get("supports_toggle", False) else ""), key)
for key, info in AVAILABLE_MODELS.items()
if key != "custom_hf"
]
model_dropdown = gr.Dropdown(
choices=preset_choices,
value=DEFAULT_MODEL_KEY,
label="Select Model",
info="Smaller = faster. ⚡ = Always-reasoning models."
)
enable_reasoning = gr.Checkbox(
value=True,
label="Enable Reasoning Mode",
info="Uses /think for deeper analysis (slower) or /no_think for direct output (faster).",
interactive=True,
visible=AVAILABLE_MODELS[DEFAULT_MODEL_KEY].get("supports_toggle", False)
)
# Custom GGUF Group
with gr.Group(visible=False) as custom_gguf_group:
gr.HTML('<div style="font-size: 0.85em; color: #64748b; margin-bottom: 10px;">Load any GGUF model from HuggingFace Hub</div>')
# HF Hub Search Component
model_search_input = HuggingfaceHubSearch(
label="🔍 Search HuggingFace Models",
placeholder="Type model name (e.g., 'qwen', 'phi', 'llama')",
search_type="model",
)
# File dropdown (populated after repo discovery)
custom_file_dropdown = gr.Dropdown(
label="📦 Select GGUF File",
choices=[],
value=None,
info="GGUF files appear after selecting a model above",
interactive=True,
)
# Load button
load_btn = gr.Button("⬇️ Load Selected Model", variant="primary", size="sm")
# Status message
custom_status = gr.Textbox(
label="Status",
interactive=False,
value="",
visible=False,
)
retry_btn = gr.Button("🔄 Retry", variant="secondary", visible=False)
# Inference Parameters (Standard Mode)
gr.HTML('<div class="section-header" style="margin-top: 16px;"><span class="section-icon">🎛️</span> Inference Parameters</div>')
temperature_slider = gr.Slider(
minimum=0.0,
maximum=2.0,
value=0.6,
step=0.1,
label="Temperature",
info="Lower = more focused, Higher = more creative"
)
max_tokens = gr.Slider(
minimum=256,
maximum=4096,
value=2048,
step=256,
label="Max Output Tokens",
info="Higher = more detailed summary"
)
top_p = gr.Slider(
minimum=0.0,
maximum=1.0,
value=0.95,
step=0.05,
label="Top P (Nucleus Sampling)",
info="Lower = more focused, Higher = more diverse"
)
top_k = gr.Slider(
minimum=0,
maximum=100,
value=20,
step=5,
label="Top K",
info="Limits token selection to top K tokens (0 = disabled)"
)
# ===== ADVANCED MODE =====
with gr.Group(visible=False) as advanced_mode_group:
gr.HTML('<div style="font-size: 0.9em; color: #64748b; margin-bottom: 16px;">🧠 <strong>Advanced Mode (3-Model Pipeline)</strong> - Extraction → Deduplication → Synthesis</div>')
# ========== STAGE 1: EXTRACTION ==========
gr.HTML('<div class="section-header"><span class="section-icon">🔍</span> Stage 1: Extraction</div>')
extraction_model = gr.Dropdown(
choices=[(EXTRACTION_MODELS[k]["name"], k) for k in EXTRACTION_MODELS.keys()],
value=DEFAULT_EXTRACTION_MODEL,
label="Extraction Model (≤1.7B)",
info="Extracts structured items from transcript windows"
)
with gr.Row():
extraction_n_ctx = gr.Slider(
minimum=2048,
maximum=8192,
step=1024,
value=4096,
label="Context Window (n_ctx)",
info="Smaller = more windows, Larger = fewer windows"
)
overlap_turns = gr.Slider(
minimum=1,
maximum=5,
step=1,
value=2,
label="Window Overlap (turns)",
info="Speaker turns shared between consecutive windows"
)
enable_extraction_reasoning = gr.Checkbox(
value=False,
visible=False,
label="Enable Reasoning Mode",
info="Thinking before JSON extraction (Qwen3 hybrid models only)"
)
# ========== STAGE 2: DEDUPLICATION ==========
gr.HTML('<div class="section-header" style="margin-top: 20px;"><span class="section-icon">🧬</span> Stage 2: Deduplication</div>')
embedding_model = gr.Dropdown(
choices=[("granite-107m", "granite-107m")],
value="granite-107m",
label="Embedding Model",
info="Computes semantic similarity for duplicate detection (Granite-107M optimal)"
)
similarity_threshold = gr.Slider(
minimum=0.70,
maximum=0.95,
step=0.01,
value=0.85,
label="Similarity Threshold",
info="Higher = stricter duplicate detection (items with similarity above this are merged)"
)
# ========== STAGE 3: SYNTHESIS ==========
gr.HTML('<div class="section-header" style="margin-top: 20px;"><span class="section-icon">✨</span> Stage 3: Synthesis</div>')
synthesis_model = gr.Dropdown(
choices=[(SYNTHESIS_MODELS[k]["name"], k) for k in SYNTHESIS_MODELS.keys()],
value=DEFAULT_SYNTHESIS_MODEL,
label="Synthesis Model (1B-30B)",
info="Generates executive summary from deduplicated items"
)
enable_synthesis_reasoning = gr.Checkbox(
value=True,
visible=True,
label="Enable Reasoning Mode",
info="Uses thinking process for higher quality synthesis"
)
adv_max_tokens = gr.Slider(
minimum=512,
maximum=4096,
step=128,
value=2048,
label="Max Output Tokens",
info="Maximum tokens for synthesis output"
)
gr.HTML('<div style="font-size: 0.85em; color: #94a3b8; margin-top: 8px; margin-bottom: 8px;">Inference Parameters</div>')
with gr.Row():
adv_temperature_slider = gr.Slider(
minimum=0.0,
maximum=2.0,
value=0.6,
step=0.1,
label="Temperature",
info="Lower = focused, Higher = creative"
)
adv_top_p = gr.Slider(
minimum=0.0,
maximum=1.0,
value=0.95,
step=0.05,
label="Top P",
info="Nucleus sampling threshold"
)
adv_top_k = gr.Slider(
minimum=0,
maximum=100,
value=20,
step=5,
label="Top K",
info="Token selection limit"
)
# ========== PIPELINE SETTINGS ==========
gr.HTML('<div class="section-header" style="margin-top: 20px;"><span class="section-icon">⚙️</span> Pipeline Settings</div>')
enable_detailed_logging = gr.Checkbox(
value=True,
label="Enable Detailed Trace Logging",
info="Save JSONL trace for debugging (embedded in download JSON)"
)
# ==========================================
# Debug Tools (optional)
# ==========================================
with gr.Accordion("🐛 Debug Tools", open=False):
system_prompt_debug = gr.Textbox(
label="System Prompt (Read-Only)",
lines=5,
max_lines=10,
interactive=False,
value="Select a model and click 'Generate Summary' to see the system prompt.",
info="This shows the exact system prompt sent to the LLM"
)
# ==========================================
# Submit Button
# ==========================================
submit_btn = gr.Button(
"✨ Generate Summary",
variant="primary",
elem_classes=["submit-btn"]
)
# ==========================================
# State Components (invisible, outside visual groups)
# ==========================================
metrics_state = gr.State(value={})
custom_model_state = gr.State(value=None)
custom_model_metadata = gr.State(value={
"repo_id": None,
"filename": None,
"size_mb": 0,
})
custom_repo_files = gr.State([])
# Right column - Outputs
with gr.Column(scale=2):
# Model Information (shows selected model specs)
with gr.Group():
gr.HTML('<div class="section-header"><span class="section-icon">📊</span> Model Information</div>')
_default_threads = DEFAULT_CUSTOM_THREADS if DEFAULT_CUSTOM_THREADS > 0 else 2
_default_info = get_model_info(DEFAULT_MODEL_KEY, n_threads=_default_threads)[0]
model_info_output = gr.Markdown(
value=_default_info,
elem_classes=["info-box"]
)
# Thinking Process
with gr.Group():
gr.HTML('<div class="section-header"><span class="section-icon">🧠</span> Model Thinking Process</div>')
thinking_output = gr.Textbox(
label="",
lines=12,
max_lines=20,
show_label=False,
placeholder="The AI's reasoning process will appear here in real-time...",
elem_classes=["thinking-box"]
)
# Copy Thinking button - now in the correct group
copy_thinking_btn = gr.Button("📋 Copy Thinking", size="sm")
# Summary Output
with gr.Group():
gr.HTML('<div class="section-header"><span class="section-icon">📝</span> Final Summary</div>')
summary_output = gr.Markdown(
value="*Your summarized content will appear here...*",
elem_classes=["summary-box"]
)
# Action buttons for summary
with gr.Row():
copy_summary_btn = gr.Button("📋 Copy Summary", size="sm")
download_btn = gr.Button("⬇️ Download (JSON)", size="sm")
# File output component for download (hidden until generated)
download_output = gr.File(label="Download JSON", visible=False)
# Completion Metrics (separate section)
with gr.Group():
gr.HTML('<div class="section-header"><span class="section-icon">📊</span> Generation Metrics</div>')
info_output = gr.Markdown(
value="*Metrics will appear here after generation...*",
elem_classes=["completion-info"]
)
# Function to update settings when model changes
def update_settings_on_model_change(model_key, thread_config, custom_threads, custom_metadata=None):
    """Refresh the inference sliders after a model selection change.

    Returns a (temperature, top_p, top_k) tuple of recommended defaults
    looked up from the model table for the newly selected model.
    """
    # Resolve the thread count for the chosen hardware preset; a custom
    # count of 0 or less falls back to 4 threads, an unknown preset to 2.
    custom_count = custom_threads if custom_threads > 0 else 4
    if thread_config == "free":
        n_threads = 2
    elif thread_config == "upgrade":
        n_threads = 8
    elif thread_config == "custom":
        n_threads = custom_count
    else:
        n_threads = 2
    _, temp_str, top_p_val, top_k_val = get_model_info(
        model_key, n_threads=n_threads, custom_metadata=custom_metadata
    )
    # Temperature comes back as a string; empty/None means "use 0.6".
    temperature = float(temp_str) if temp_str else 0.6
    return temperature, top_p_val, top_k_val
# Event handlers
# Note: submit_btn.click is registered below (after custom model loader section)
# with the full set of inputs including custom_model_state
# Update settings when model changes
model_dropdown.change(
fn=update_settings_on_model_change,
inputs=[model_dropdown, thread_config_dropdown, custom_threads_slider, custom_model_metadata],
outputs=[temperature_slider, top_p, top_k]
)
# Update reasoning checkbox when model changes
model_dropdown.change(
fn=update_reasoning_visibility,
inputs=[model_dropdown],
outputs=[enable_reasoning]
)
# Show/hide custom thread slider based on selection
def toggle_custom_threads(thread_config):
    """Show the custom thread slider only when the 'custom' preset is active."""
    is_custom = thread_config == "custom"
    return gr.update(visible=is_custom)
thread_config_dropdown.change(
fn=toggle_custom_threads,
inputs=[thread_config_dropdown],
outputs=[custom_threads_slider]
)
# Toggle mode visibility based on radio selection
def toggle_mode_visibility(mode_selection):
    """Swap visibility between the Standard and Advanced mode groups."""
    show_standard = mode_selection == "Standard Mode"
    # Exactly one of the two groups is visible at any time.
    return gr.update(visible=show_standard), gr.update(visible=not show_standard)
mode_radio.change(
fn=toggle_mode_visibility,
inputs=[mode_radio],
outputs=[standard_mode_group, advanced_mode_group]
)
# Toggle model source visibility (Preset vs Custom GGUF)
def toggle_model_source(model_source):
    """Swap visibility between the preset-model and custom-GGUF groups."""
    show_presets = model_source == "Preset Models"
    # Preset picker and custom loader are mutually exclusive.
    return gr.update(visible=show_presets), gr.update(visible=not show_presets)
model_source_radio.change(
fn=toggle_model_source,
inputs=[model_source_radio],
outputs=[preset_models_group, custom_gguf_group]
)
# Update Model Information panel based on selected models
def update_model_info_standard(model_key, custom_metadata):
    """Render the info panel markdown for the selected Standard-mode model."""
    # Only the markdown text is needed here; the recommended sampling
    # values that get_model_info also returns are ignored.
    return get_model_info(model_key, n_threads=2, custom_metadata=custom_metadata)[0]
def update_model_info_advanced(extraction_key, embedding_key, synthesis_key):
    """Show info for all 3 Advanced mode models."""
    # One markdown section per pipeline stage, concatenated into a single
    # string for the shared "Model Information" panel.
    ext_info = get_extraction_model_info(extraction_key)
    emb_info = get_embedding_model_info(embedding_key)
    syn_info = get_synthesis_model_info(synthesis_key)
    # The continuation lines of this f-string stay flush-left so the
    # "###" markdown headers render without leading whitespace.
    combined_info = f"""### Extraction Model
{ext_info}
### Embedding Model
{emb_info}
### Synthesis Model
{syn_info}"""
    return combined_info
# Update model info when Standard mode model changes
model_dropdown.change(
fn=update_model_info_standard,
inputs=[model_dropdown, custom_model_metadata],
outputs=[model_info_output]
)
# Update model info when Advanced mode models change
extraction_model.change(
fn=update_model_info_advanced,
inputs=[extraction_model, embedding_model, synthesis_model],
outputs=[model_info_output]
)
embedding_model.change(
fn=update_model_info_advanced,
inputs=[extraction_model, embedding_model, synthesis_model],
outputs=[model_info_output]
)
synthesis_model.change(
fn=update_model_info_advanced,
inputs=[extraction_model, embedding_model, synthesis_model],
outputs=[model_info_output]
)
# Update model info when mode changes
mode_radio.change(
fn=lambda mode, std_model, std_metadata, ext_model, emb_model, syn_model: (
update_model_info_standard(std_model, std_metadata)
if mode == "Standard Mode"
else update_model_info_advanced(ext_model, emb_model, syn_model)
),
inputs=[mode_radio, model_dropdown, custom_model_metadata, extraction_model, embedding_model, synthesis_model],
outputs=[model_info_output]
)
# Copy buttons
copy_summary_btn.click(
fn=lambda x: x,
inputs=[summary_output],
outputs=[],
js="(text) => { navigator.clipboard.writeText(text); return text; }"
)
copy_thinking_btn.click(
fn=lambda x: x,
inputs=[thinking_output],
outputs=[],
js="(text) => { navigator.clipboard.writeText(text); return text; }"
)
# Download button
download_btn.click(
fn=download_summary_json,
inputs=[summary_output, thinking_output, model_dropdown, language_selector, metrics_state],
outputs=[download_output]
)
# ==========================================
# NEW: Custom Model Loader Event Handlers
# ==========================================
# Note: toggle_custom_model_ui removed - now using Tabs instead of hidden Group
# Update system prompt debug when model or reasoning changes
def update_system_prompt_debug(model_key, enable_reasoning, language):
    """Rebuild the read-only system-prompt preview shown in the debug panel."""
    if not model_key:
        return "Select a model to see the system prompt."
    # Hybrid models expose a reasoning on/off toggle; others do not.
    supports_toggle = AVAILABLE_MODELS.get(model_key, {}).get("supports_toggle", False)
    return build_system_prompt(language, supports_toggle, enable_reasoning)
model_dropdown.change(
fn=update_system_prompt_debug,
inputs=[model_dropdown, enable_reasoning, language_selector],
outputs=[system_prompt_debug],
)
enable_reasoning.change(
fn=update_system_prompt_debug,
inputs=[model_dropdown, enable_reasoning, language_selector],
outputs=[system_prompt_debug],
)
language_selector.change(
fn=update_system_prompt_debug,
inputs=[model_dropdown, enable_reasoning, language_selector],
outputs=[system_prompt_debug],
)
# ===== ADVANCED MODE EVENT HANDLERS =====
# Update extraction reasoning checkbox visibility when extraction model changes
def update_extraction_reasoning_visibility(model_key):
    """Show/hide the extraction reasoning checkbox based on model capabilities.

    Returns a gr.update() dict configuring the checkbox's visibility,
    default value, interactivity, and label.
    """
    # Unknown model key: hide the checkbox entirely.
    if model_key not in EXTRACTION_MODELS:
        return gr.update(visible=False, value=False)
    config = EXTRACTION_MODELS[model_key]
    if config.get("supports_toggle", False):
        # Hybrid model — reasoning defaults ON for better extraction quality.
        return gr.update(visible=True, value=True, interactive=True,
                         label="🧠 Enable Reasoning for Extraction")
    if config.get("supports_reasoning", False):
        # Always-thinking model (none currently among the extraction presets):
        # checkbox shown but locked ON.
        return gr.update(visible=True, value=True, interactive=False,
                         label="🧠 Reasoning Mode (Always On)")
    # Plain non-reasoning model.
    return gr.update(visible=False, value=False)
# Update synthesis reasoning checkbox visibility when synthesis model changes
def update_synthesis_reasoning_visibility(model_key):
    """Show/hide the synthesis reasoning checkbox based on model capabilities.

    Returns a gr.update() dict configuring the checkbox's visibility,
    default value, interactivity, and label.
    """
    if model_key not in SYNTHESIS_MODELS:
        # Unknown model key: hide the checkbox.
        return gr.update(visible=False, value=False)
    config = SYNTHESIS_MODELS[model_key]
    if not config.get("supports_reasoning", False):
        # Model cannot reason at all — no checkbox needed.
        return gr.update(visible=False, value=False)
    if config.get("supports_toggle", False):
        # Hybrid model: the user may switch reasoning on/off.
        return gr.update(visible=True, value=True, interactive=True,
                         label="🧠 Enable Reasoning for Synthesis")
    # Thinking-only model: reasoning is always active, checkbox locked ON.
    return gr.update(visible=True, value=True, interactive=False,
                     label="⚡ Reasoning Mode (Always On)")
# Wire up Advanced Mode event handlers
extraction_model.change(
fn=update_extraction_reasoning_visibility,
inputs=[extraction_model],
outputs=[enable_extraction_reasoning]
)
synthesis_model.change(
fn=update_synthesis_reasoning_visibility,
inputs=[synthesis_model],
outputs=[enable_synthesis_reasoning]
)
# Debounced auto-discovery for custom repo ID (500ms delay)
import time as time_module
def discover_custom_files(repo_id):
    """Discover GGUF files in a custom HuggingFace repository.

    Generator event handler: yields (file_dropdown_update, files_state,
    status_update) tuples so Gradio can show an intermediate "searching"
    status before the final result.
    """
    # BUG FIX: this function is a generator (it uses `yield` below), so the
    # original early `return (tuple)` was swallowed — a generator's return
    # value becomes StopIteration.value, which Gradio never shows. Invalid
    # input must be *yielded* for the UI to update.
    if not repo_id or "/" not in repo_id:
        yield (
            gr.update(choices=[], value=None, interactive=True),
            [],
            gr.update(visible=True, value="Enter a valid HuggingFace Repo ID above (e.g., unsloth/DeepSeek-R1-Distill-Qwen-7B-GGUF)")
        )
        return
    # Show searching status while the Hub API call is in flight.
    yield (
        gr.update(choices=["Searching..."], value=None, interactive=False),
        [],
        gr.update(visible=True, value="🔍 Searching for GGUF files...")
    )
    # Small delay so the "searching" state is visible to the user.
    time_module.sleep(0.5)
    files, error = list_repo_gguf_files(repo_id)
    if error:
        # Error - show empty dropdown with error message.
        yield (
            gr.update(choices=[], value=None, interactive=True),
            [],
            gr.update(visible=True, value=f"❌ {error}")
        )
    elif not files:
        # Repo exists but contains no GGUF files.
        yield (
            gr.update(choices=[], value=None, interactive=True),
            [],
            gr.update(visible=True, value="❌ No GGUF files found in this repository")
        )
    else:
        # Success - format choices and preselect the first file.
        choices = [format_file_choice(f) for f in files]
        yield (
            gr.update(choices=choices, value=choices[0] if choices else None, interactive=True),
            files,
            gr.update(visible=True, value="✅ Files discovered! Select one and click 'Load Selected Model'")
        )
# ==========================================
# NEW: Auto-Discovery Flow with HuggingfaceHubSearch
# ==========================================
def on_model_selected(repo_id):
    """Handle model selection from HuggingfaceHubSearch.

    Generator event handler that automatically discovers GGUF files in the
    selected repo, yielding (file_dropdown_update, files_state,
    status_update) tuples for live UI feedback.
    """
    # BUG FIX: this function is a generator (it uses `yield` below), so the
    # original early `return (tuple)` discarded the updates — Gradio ignores
    # a generator's return value. The empty-selection reset must be yielded.
    if not repo_id:
        yield (
            gr.update(choices=[], value=None),
            [],
            gr.update(visible=False),
        )
        return
    # Show searching status while the Hub API call is in flight.
    yield (
        gr.update(choices=["🔍 Searching for GGUF files..."], value=None, interactive=False),
        [],
        gr.update(visible=True, value=f"Discovering GGUF files in {repo_id}..."),
    )
    # Discover files
    files, error = list_repo_gguf_files(repo_id)
    if error:
        yield (
            gr.update(choices=[], value=None, interactive=True),
            [],
            gr.update(visible=True, value=f"❌ {error}"),
        )
    elif not files:
        yield (
            gr.update(choices=[], value=None, interactive=True),
            [],
            gr.update(visible=True, value=f"❌ No GGUF files found in {repo_id}"),
        )
    else:
        # Format and show files, preselecting the first one.
        choices = [format_file_choice(f) for f in files]
        yield (
            gr.update(choices=choices, value=choices[0] if choices else None, interactive=True),
            files,
            gr.update(visible=True, value=f"✅ Found {len(files)} GGUF files! Select precision and click 'Load Model'"),
        )
# When user selects from search, auto-discover files
model_search_input.change(
fn=on_model_selected,
inputs=[model_search_input],
outputs=[custom_file_dropdown, custom_repo_files, custom_status],
)
# Load selected custom model
def load_custom_model_selected(repo_id, selected_file_display, files_data):
    """Load the selected custom GGUF model.

    Generator event handler: yields (status_text, retry_button_update,
    model_or_None, metadata_dict) tuples so loading progress streams to
    the UI.
    """
    # BUG FIX: this function is a generator (it uses `yield` below), so the
    # original early `return value, ...` statements were swallowed — Gradio
    # ignores a generator's return value. Validation errors must be yielded.
    if not repo_id or not selected_file_display:
        yield "❌ Please enter a Repo ID and select a file first", gr.update(visible=False), None, {}
        return
    # Extract filename from the display string.
    # Format: "📄 filename | size | quant | params | downloads"
    filename = selected_file_display.split(" | ")[0].replace("📄 ", "").strip()
    if not filename:
        yield "❌ Could not parse filename from selection", gr.update(visible=False), None, {}
        return
    # Look up the file size from the discovery results.
    size_mb = 0
    for f in files_data:
        if f["name"] == filename:
            size_mb = f.get("size_mb", 0)
            break
    yield "⏳ Loading model... (this may take a while for large files)", gr.update(visible=False), None, {}
    try:
        # NOTE(review): reading `.value` off Gradio components returns their
        # *initial* values, not the user's current selections — the thread
        # settings should arrive via the event's inputs list instead.
        # Confirm against the load_btn.click wiring.
        n_threads = get_thread_count(thread_config_dropdown.value, custom_threads_slider.value)
        llm, load_msg = load_custom_model_from_hf(repo_id, filename, n_threads)
        if llm is None:
            # Load failed - show error and reveal the retry button.
            yield f"❌ {load_msg}", gr.update(visible=True), None, {}
        else:
            # Success - record metadata so the info panel can describe the model.
            metadata = {
                "repo_id": repo_id,
                "filename": filename,
                "size_mb": size_mb,
            }
            size_info = f" ({size_mb:.1f} MB)" if size_mb else ""
            yield f"✅ Model loaded successfully{size_info}! Ready to generate summaries.", gr.update(visible=False), llm, metadata
    except Exception as e:
        yield f"❌ Error loading model: {str(e)}", gr.update(visible=True), None, {}
load_btn.click(
fn=load_custom_model_selected,
inputs=[model_search_input, custom_file_dropdown, custom_repo_files],
outputs=[custom_status, retry_btn, custom_model_state, custom_model_metadata],
).then(
fn=lambda metadata, thread_config, custom_threads: get_model_info("custom_hf", n_threads=get_thread_count(thread_config, custom_threads), custom_metadata=metadata)[0],
inputs=[custom_model_metadata, thread_config_dropdown, custom_threads_slider],
outputs=[model_info_output],
)
# Retry button - same as load
retry_btn.click(
fn=load_custom_model_selected,
inputs=[model_search_input, custom_file_dropdown, custom_repo_files],
outputs=[custom_status, retry_btn, custom_model_state, custom_model_metadata],
).then(
fn=lambda metadata, thread_config, custom_threads: get_model_info("custom_hf", n_threads=get_thread_count(thread_config, custom_threads), custom_metadata=metadata)[0],
inputs=[custom_model_metadata, thread_config_dropdown, custom_threads_slider],
outputs=[model_info_output],
)
# ===== SUBMIT BUTTON ROUTER =====
# Routes to Standard or Advanced mode based on active tab
def route_summarize(
    # Standard mode inputs
    file_input_val, text_input_val, model_dropdown_val, enable_reasoning_val,
    max_tokens_val, temperature_val, top_p_val, top_k_val, language_val,
    thread_config_val, custom_threads_val, custom_model_val,
    # Advanced mode inputs
    extraction_model_val, embedding_model_val, synthesis_model_val,
    extraction_n_ctx_val, overlap_turns_val, similarity_threshold_val,
    enable_extraction_reasoning_val, enable_synthesis_reasoning_val,
    adv_max_tokens_val, enable_logging_val,
    adv_temperature_val, adv_top_p_val, adv_top_k_val,
    # Mode selector
    mode_radio_val
):
    """Dispatch a summarize request to the Standard or Advanced pipeline.

    Yields 5-tuples of (thinking, summary, info, metrics, system_prompt)
    matching the shared output components wired to the submit button.
    """
    # Standard mode: delegate to the single-model streaming path and
    # re-emit each update unchanged.
    if mode_radio_val != "Advanced Mode (3-Model Pipeline)":
        for think, summ, info, metr, sys_prompt in summarize_streaming(
            file_input_val, text_input_val, model_dropdown_val, enable_reasoning_val,
            max_tokens_val, temperature_val, top_p_val, top_k_val, language_val,
            thread_config_val, custom_threads_val, custom_model_val
        ):
            yield (think, summ, info, metr, sys_prompt)
        return

    # --- Advanced Mode (3-model pipeline) ---
    # Resolve thread count from the global hardware settings (shared across
    # modes); unknown configs fall back to the free-tier default of 2.
    if thread_config_val == "upgrade":
        n_threads = 8
    elif thread_config_val == "custom":
        n_threads = max(1, custom_threads_val)
    else:
        n_threads = 2

    # Resolve the transcript: an uploaded file takes priority over pasted text.
    if file_input_val:
        with open(file_input_val, 'r', encoding='utf-8') as fh:
            transcript = fh.read()
    elif text_input_val:
        transcript = text_input_val
    else:
        yield ("", "⚠️ Please upload a file or paste text", "", {}, "")
        return

    # Stream pipeline events and translate each stage into UI updates.
    for event in summarize_advanced(
        transcript=transcript,
        extraction_model_key=extraction_model_val,
        embedding_model_key=embedding_model_val,
        synthesis_model_key=synthesis_model_val,
        extraction_n_ctx=extraction_n_ctx_val,
        overlap_turns=overlap_turns_val,
        similarity_threshold=similarity_threshold_val,
        enable_extraction_reasoning=enable_extraction_reasoning_val,
        enable_synthesis_reasoning=enable_synthesis_reasoning_val,
        output_language=language_val,
        max_tokens=adv_max_tokens_val,
        enable_logging=enable_logging_val,
        n_threads=n_threads,
        temperature=adv_temperature_val,
        top_p=adv_top_p_val,
        top_k=adv_top_k_val
    ):
        stage = event.get("stage", "")
        if stage == "extraction":
            # Progress ticker belongs in the thinking pane, not the summary.
            thinking = event.get("thinking", "")
            ticker = event.get("ticker", "")
            progress = f"{thinking}\n\n{ticker}" if thinking else ticker
            yield (progress, "", "", {}, "")
        elif stage == "deduplication":
            # Deduplication progress also goes to the thinking pane.
            yield (event.get("ticker", ""), "", "", {}, "")
        elif stage == "synthesis":
            yield (event.get("thinking", ""), event.get("summary", ""), "", {}, "")
        elif stage == "complete":
            trace_stats = event.get("trace_stats", {})
            info_msg = f"""**Advanced Mode Complete**
- Total Windows: {trace_stats.get('total_windows', 0)}
- Items Extracted: {trace_stats.get('total_items_extracted', 0)}
- Items After Dedup: {trace_stats.get('total_items_after_dedup', 0)}
- Duplicates Removed: {trace_stats.get('total_duplicates_removed', 0)}
- Total Time: {trace_stats.get('total_elapsed_seconds', 0):.1f}s"""
            # Metrics carry the trace/debug payloads for later download.
            metrics = {
                "mode": "advanced",
                "trace_stats": trace_stats,
                "trace_json": event.get("trace_json", []),
                "debug_json": event.get("debug_json", {})
            }
            yield (
                event.get("thinking", ""),
                event.get("summary", ""),
                info_msg,
                metrics,
                "Advanced Mode (3-Model Pipeline)"
            )
        elif stage == "error":
            yield ("", f"❌ Error: {event.get('error', 'Unknown error')}", "", {}, "")
            return
# Wire up submit button with router
# NOTE: the inputs list order must exactly match route_summarize's positional
# parameter order — Gradio passes component values positionally.
submit_btn.click(
    fn=route_summarize,
    inputs=[
        # Standard mode inputs
        file_input, text_input, model_dropdown, enable_reasoning,
        max_tokens, temperature_slider, top_p, top_k, language_selector,
        thread_config_dropdown, custom_threads_slider, custom_model_state,
        # Advanced mode inputs
        extraction_model, embedding_model, synthesis_model,
        extraction_n_ctx, overlap_turns, similarity_threshold,
        enable_extraction_reasoning, enable_synthesis_reasoning,
        adv_max_tokens, enable_detailed_logging,
        adv_temperature_slider, adv_top_p, adv_top_k,
        # Mode selector
        mode_radio
    ],
    # Outputs correspond to the 5-tuples yielded by route_summarize.
    outputs=[thinking_output, summary_output, info_output, metrics_state, system_prompt_debug],
    show_progress="full"  # show the full progress indicator while streaming
)
# Footer
# Static HTML; the .footer class is styled by the app-level CSS.
gr.HTML("""
<div class="footer">
Bilingual summaries (English &amp; zh-TW) • Powered by <strong>llama-cpp-python</strong> • Running on <strong>HuggingFace Spaces Free Tier</strong><br>
Traditional Chinese conversion via <strong>OpenCC</strong>
</div>
""")
# Hand the assembled Blocks app back to the caller (launched under __main__).
return demo
# Main entry point
if __name__ == "__main__":
    # Model loading is deferred to the first request so the Space starts
    # within the HF Spaces startup timeout.
    logger.info("Starting Tiny Scribe (model loads on first request)")

    # Build the Gradio UI and serve it on the standard HF Spaces port,
    # bound to all interfaces (required inside the container).
    demo = create_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860, share=False, show_error=True)