#!/usr/bin/env python3 """ Tiny Scribe - HuggingFace Spaces Demo A Gradio app for summarizing transcripts using GGUF models with live streaming output. Optimized for HuggingFace Spaces Free CPU Tier (2 vCPUs). UI Version: 2.0 - Enhanced with modern styling and UX improvements """ import os import gc import time import logging import re import json from typing import Dict, List, Any, Optional, Generator, Tuple from datetime import datetime from opencc import OpenCC from llama_cpp import Llama import gradio as gr from huggingface_hub import list_repo_files, hf_hub_download from gradio_huggingfacehub_search import HuggingfaceHubSearch from meeting_summarizer.trace import Tracer from meeting_summarizer.extraction import ( EmbeddingModel, Window, preprocess_transcript, stream_extract_from_window, deduplicate_items, stream_synthesize_executive_summary ) logger = logging.getLogger(__name__) # Increase Hugging Face timeout to handle slow connections os.environ['HF_HUB_DOWNLOAD_TIMEOUT'] = '300' # 5 minutes # Global model instance llm = None converter = None current_model_key = None def parse_quantization(filename: str) -> Optional[str]: """Extract quantization level from GGUF filename. Examples: model-Q4_K_M.gguf -> Q4_K_M model.Q5_K_S.gguf -> Q5_K_S model-fp16.gguf -> fp16 Args: filename: GGUF filename Returns: Quantization string or None if not found """ # Common quantization patterns patterns = [ r'[.-](Q[0-9]_[A-Z]_[A-Z])\.gguf$', # Q4_K_M r'[.-](Q[0-9]_[A-Z]+)\.gguf$', # Q4_K r'[.-](fp16|fp32|q4_0|q4_1|q5_0|q5_1|q8_0)\.gguf$', # fp16, q4_0, etc. ] for pattern in patterns: match = re.search(pattern, filename, re.IGNORECASE) if match: return match.group(1).upper() return None def list_repo_gguf_files(repo_id: str) -> Tuple[List[Dict[str, Any]], str]: """List all GGUF files in a HuggingFace repository with metadata. Args: repo_id: HuggingFace repository ID (e.g., 'unsloth/DeepSeek-R1-Distill-Qwen-7B-GGUF') Returns: Tuple of (files_list, error_message) - files_list: List of dicts with name, size_mb, quant, params, downloads - error_message: Empty string on success, error description on failure """ if not repo_id or "/" not in repo_id: return [], "Invalid repo ID format. Use 'username/repo-name'" try: # List all files in repo files = list(list_repo_files(repo_id)) # Filter for GGUF files only gguf_files = [f for f in files if f.endswith('.gguf')] if not gguf_files: return [], f"No GGUF files found in repository '{repo_id}'" # Get repo info for downloads (optional, may fail for some repos) try: from huggingface_hub import model_info info = model_info(repo_id) repo_downloads = info.downloads except: repo_downloads = 0 # Build file metadata result = [] for filename in sorted(gguf_files): # Alphabetical sorting (preference C) quant = parse_quantization(filename) or "Unknown" # Estimate size (we'd need to fetch file info for exact size) # For now, use placeholder that will be updated when downloading size_mb = 0 # Try to extract parameter count from filename params = "Unknown" param_patterns = [ r'(\d+\.?\d*)b', # 7b, 1.5b r'(\d+\.?\d*)B', # 7B, 1.5B ] for pattern in param_patterns: match = re.search(pattern, filename, re.IGNORECASE) if match: params = f"{match.group(1)}B" break result.append({ "name": filename, "size_mb": size_mb, "quant": quant, "params": params, "downloads": repo_downloads, }) return result, "" except Exception as e: error_msg = str(e).lower() if "not found" in error_msg or "404" in error_msg: return [], f"Repository '{repo_id}' not found" elif "permission" in error_msg or "access" in error_msg: return [], f"Cannot access '{repo_id}' - may be private or gated" else: return [], f"Error listing files: {str(e)}" def format_file_choice(file_info: Dict[str, Any]) -> str: """Format a file info dict for display in dropdown. Args: file_info: Dict with name, size_mb, quant, params, downloads Returns: Formatted string for dropdown display """ name = file_info["name"] size = file_info["size_mb"] quant = file_info["quant"] params = file_info["params"] downloads = file_info.get("downloads", 0) # Format downloads nicely if downloads >= 1000000: dl_str = f"{downloads/1000000:.1f}M" elif downloads >= 1000: dl_str = f"{downloads/1000:.1f}K" else: dl_str = str(downloads) return f"๐Ÿ“„ {name} | {size} | {quant} | {params} params | โฌ‡๏ธ {dl_str}" def build_system_prompt(output_language: str, supports_toggle: bool, enable_reasoning: bool) -> str: """Build the system prompt for the summarization task. This function creates the system prompt that will be displayed in the debug field and sent to the LLM. It handles language-specific prompts and reasoning toggles. Args: output_language: Target language ("en" or "zh-TW") supports_toggle: Whether the model supports reasoning toggle (/think, /no_think) enable_reasoning: Whether reasoning mode is enabled Returns: The complete system prompt string """ if output_language == "zh-TW": if supports_toggle: reasoning_mode = "/think" if enable_reasoning else "/no_think" return f"ไฝ ๆ˜ฏไธ€ๅ€‹ๆœ‰ๅŠฉ็š„ๅŠฉๆ‰‹๏ผŒ่ฒ ่ฒฌ็ธฝ็ต่ฝ‰้Œ„ๅ…งๅฎนใ€‚{reasoning_mode}" else: return "ไฝ ๆ˜ฏไธ€ๅ€‹ๆœ‰ๅŠฉ็š„ๅŠฉๆ‰‹๏ผŒ่ฒ ่ฒฌ็ธฝ็ต่ฝ‰้Œ„ๅ…งๅฎนใ€‚" else: if supports_toggle: reasoning_mode = "/think" if enable_reasoning else "/no_think" return f"You are a helpful assistant that summarizes transcripts. {reasoning_mode}" else: return "You are a helpful assistant that summarizes transcripts." def build_user_prompt(transcript: str, output_language: str) -> str: """Build the user prompt containing the transcript to summarize. Args: transcript: The transcript content to summarize output_language: Target language ("en" or "zh-TW") Returns: The user prompt string with the transcript """ if output_language == "zh-TW": return f"่ซ‹็ธฝ็ตไปฅไธ‹ๅ…งๅฎน๏ผš\n\n{transcript}" else: return f"Please summarize the following content:\n\n{transcript}" def get_thread_count(thread_config: str, custom_threads: int) -> int: """Get the actual thread count based on configuration. Args: thread_config: Thread preset ("free", "upgrade", "custom") custom_threads: Custom thread count when preset is "custom" Returns: Number of threads to use """ if thread_config == "free": return 2 elif thread_config == "upgrade": return 8 else: # custom return max(1, min(32, custom_threads)) def load_custom_model_from_hf(repo_id: str, filename: str, n_threads: int) -> Tuple[Optional[Llama], str]: """Load a custom GGUF model from HuggingFace Hub. Args: repo_id: HuggingFace repository ID filename: GGUF filename to load n_threads: Number of CPU threads Returns: Tuple of (model_or_none, message) """ try: logger.info(f"Loading custom model from {repo_id}/{filename}") # Conservative defaults for custom models n_ctx = 8192 n_batch = 512 n_gpu_layers = 0 # CPU only for safety model = Llama.from_pretrained( repo_id=repo_id, filename=filename, n_ctx=n_ctx, n_batch=n_batch, n_threads=n_threads, n_gpu_layers=n_gpu_layers, verbose=False, ) return model, f"Successfully loaded {repo_id}/{filename}" except Exception as e: error_msg = str(e) logger.error(f"Failed to load custom model: {error_msg}") if "not found" in error_msg.lower(): return None, f"Model or file not found: {repo_id}/{filename}" elif "permission" in error_msg.lower(): return None, f"Access denied (model may be private/gated): {repo_id}" elif "memory" in error_msg.lower() or "oom" in error_msg.lower(): return None, f"Out of memory loading model. Try a smaller file or lower quantization." else: return None, f"Error loading model: {error_msg}" # Thread configuration from environment variable def _get_default_thread_config(): """Get default thread configuration from environment variable.""" env_threads = os.environ.get("DEFAULT_N_THREADS", "").strip() if env_threads: try: thread_count = int(env_threads) if 1 <= thread_count <= 32: logger.info(f"Using DEFAULT_N_THREADS={thread_count} from environment") return "custom", thread_count else: logger.warning(f"DEFAULT_N_THREADS={thread_count} out of range (1-32), using HF Free Tier") except ValueError: logger.warning(f"Invalid DEFAULT_N_THREADS='{env_threads}', using HF Free Tier") return "free", -1 # -1 = irrelevant when preset is not "custom" DEFAULT_THREAD_PRESET, DEFAULT_CUSTOM_THREADS = _get_default_thread_config() # Maximum context window to use (caps memory usage on 2 vCPUs) MAX_USABLE_CTX = 32768 # Available models registry - ordered by parameter count (smallest to largest) AVAILABLE_MODELS = { "falcon_h1_100m": { "name": "Falcon-H1 100M", "repo_id": "mradermacher/Falcon-H1-Tiny-Multilingual-100M-Instruct-GGUF", "filename": "*Q8_0.gguf", "max_context": 32768, "default_temperature": 0.6, "supports_reasoning": False, "inference_settings": { "temperature": 0.1, "top_p": 0.9, "top_k": 40, "repeat_penalty": 1.05, }, }, "gemma3_270m": { "name": "Gemma-3 270M", "repo_id": "unsloth/gemma-3-270m-it-qat-GGUF", "filename": "*Q8_0.gguf", "max_context": 32768, "default_temperature": 0.6, "supports_reasoning": False, "inference_settings": { "temperature": 1.0, "top_p": 0.95, "top_k": 64, "repeat_penalty": 1.0, }, }, "ernie_300m": { "name": "ERNIE-4.5 0.3B (131K Context)", "repo_id": "unsloth/ERNIE-4.5-0.3B-PT-GGUF", "filename": "*Q8_0.gguf", "max_context": 131072, "default_temperature": 0.6, "supports_reasoning": False, "inference_settings": { "temperature": 0.3, "top_p": 0.95, "top_k": 30, "repeat_penalty": 1.05, }, }, "granite_350m": { "name": "Granite-4.0 350M", "repo_id": "unsloth/granite-4.0-h-350m-GGUF", "filename": "*Q8_0.gguf", "max_context": 32768, "default_temperature": 0.6, "supports_reasoning": False, "inference_settings": { "temperature": 0.0, "top_p": 1.0, "top_k": 0, "repeat_penalty": 1.05, }, }, "lfm2_350m": { "name": "LFM2 350M", "repo_id": "LiquidAI/LFM2-350M-GGUF", "filename": "*Q8_0.gguf", "max_context": 32768, "default_temperature": 0.6, "supports_reasoning": False, "inference_settings": { "temperature": 0.1, "top_p": 0.1, "top_k": 50, "repeat_penalty": 1.05, }, }, "bitcpm4_500m": { "name": "BitCPM4 0.5B (128K Context)", "repo_id": "openbmb/BitCPM4-0.5B-GGUF", "filename": "*q4_0.gguf", "max_context": 131072, "default_temperature": 0.6, "supports_reasoning": False, "inference_settings": { "temperature": 0.3, "top_p": 0.95, "top_k": 30, "repeat_penalty": 1.05, }, }, "hunyuan_500m": { "name": "Hunyuan 0.5B (256K Context)", "repo_id": "mradermacher/Hunyuan-0.5B-Instruct-GGUF", "filename": "*Q8_0.gguf", "max_context": 262144, "default_temperature": 0.6, "supports_reasoning": False, "inference_settings": { "temperature": 0.3, "top_p": 0.95, "top_k": 30, "repeat_penalty": 1.05, }, }, "qwen3_600m_q4": { "name": "Qwen3 0.6B Q4 (32K Context)", "repo_id": "unsloth/Qwen3-0.6B-GGUF", "filename": "*Q4_0.gguf", "max_context": 32768, "default_temperature": 0.6, "supports_reasoning": True, "supports_toggle": True, "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 20, "repeat_penalty": 1.0, }, }, "granite_3_1_1b_q8": { "name": "Granite 3.1 1B-A400M Instruct (128K Context)", "repo_id": "bartowski/granite-3.1-1b-a400m-instruct-GGUF", "filename": "*Q8_0.gguf", "max_context": 131072, "default_temperature": 0.7, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.9, "top_k": 40, "repeat_penalty": 1.1, }, }, "falcon_h1_1.5b_q4": { "name": "Falcon-H1 1.5B Q4", "repo_id": "unsloth/Falcon-H1-1.5B-Deep-Instruct-GGUF", "filename": "*Q4_K_M.gguf", "max_context": 32768, "default_temperature": 0.6, "supports_reasoning": False, "inference_settings": { "temperature": 0.1, "top_p": 0.9, "top_k": 40, "repeat_penalty": 1.05, }, }, "qwen3_1.7b_q4": { "name": "Qwen3 1.7B Q4 (32K Context)", "repo_id": "unsloth/Qwen3-1.7B-GGUF", "filename": "*Q4_0.gguf", "max_context": 32768, "default_temperature": 0.6, "supports_reasoning": True, "supports_toggle": True, "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 20, "repeat_penalty": 1.0, }, }, "granite_3_3_2b_q4": { "name": "Granite 3.3 2B Instruct (128K Context)", "repo_id": "ibm-granite/granite-3.3-2b-instruct-GGUF", "filename": "*Q4_K_M.gguf", "max_context": 131072, "default_temperature": 0.7, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.9, "top_k": 40, "repeat_penalty": 1.1, }, }, "youtu_llm_2b_q8": { "name": "Youtu-LLM 2B (128K Context)", "repo_id": "tencent/Youtu-LLM-2B-GGUF", "filename": "*Q8_0.gguf", "max_context": 131072, "default_temperature": 0.7, "supports_reasoning": True, "supports_toggle": True, "inference_settings": { "temperature": 0.7, "top_p": 0.8, "top_k": 20, "repeat_penalty": 1.05, }, }, "lfm2_2_6b_transcript": { "name": "LFM2 2.6B Transcript (32K Context)", "repo_id": "LiquidAI/LFM-2.6B-Transcript-GGUF", "filename": "*Q4_0.gguf", "max_context": 32768, "default_temperature": 0.6, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 20, "repeat_penalty": 1.1, }, }, "breeze_3b_q4": { "name": "Breeze 3B Q4 (32K Context)", "repo_id": "mradermacher/breeze-3b-GGUF", "filename": "*Q4_K_M.gguf", "max_context": 32768, "default_temperature": 0.6, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 20, "repeat_penalty": 1.0, }, }, "granite_3_1_3b_q4": { "name": "Granite 3.1 3B-A800M Instruct (128K Context)", "repo_id": "bartowski/granite-3.1-3b-a800m-instruct-GGUF", "filename": "*Q4_K_M.gguf", "max_context": 131072, "default_temperature": 0.7, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.9, "top_k": 40, "repeat_penalty": 1.1, }, }, "qwen3_4b_thinking_q3": { "name": "Qwen3 4B Thinking (256K Context)", "repo_id": "unsloth/Qwen3-4B-Thinking-2507-GGUF", "filename": "*Q3_K_M.gguf", "max_context": 262144, "default_temperature": 0.6, "supports_reasoning": True, "supports_toggle": False, # Thinking-only mode "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 20, "repeat_penalty": 1.0, }, }, "granite4_tiny_q3": { "name": "Granite 4.0 Tiny 7B (128K Context)", "repo_id": "ibm-research/granite-4.0-Tiny-7B-Instruct-GGUF", "filename": "*Q3_K_M.gguf", "max_context": 131072, "default_temperature": 0.7, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.9, "top_k": 40, "repeat_penalty": 1.1, }, }, "ernie_21b_pt_q1": { "name": "ERNIE-4.5 21B PT (128K Context)", "repo_id": "unsloth/ERNIE-4.5-21B-A3B-PT-GGUF", "filename": "*TQ1_0.gguf", "max_context": 131072, "default_temperature": 0.7, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.9, "top_k": 40, "repeat_penalty": 1.1, }, }, "ernie_21b_thinking_q1": { "name": "ERNIE-4.5 21B Thinking (128K Context)", "repo_id": "unsloth/ERNIE-4.5-21B-A3B-Thinking-GGUF", "filename": "*TQ1_0.gguf", "max_context": 131072, "default_temperature": 0.8, "supports_reasoning": True, "supports_toggle": False, # Thinking-only mode "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 40, "repeat_penalty": 1.1, }, }, "glm_4_7_flash_reap_30b": { "name": "GLM-4.7-Flash-REAP-30B Thinking (128K Context)", "repo_id": "unsloth/GLM-4.7-Flash-REAP-23B-A3B-GGUF", "filename": "*TQ1_0.gguf", "max_context": 131072, "default_temperature": 0.6, "supports_reasoning": True, "supports_toggle": False, "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 20, "repeat_penalty": 1.05, }, }, "glm_4_7_flash_30b_iq2": { "name": "GLM-4.7-Flash-30B (Original) IQ2_XXS (128K Context)", "repo_id": "bartowski/zai-org_GLM-4.7-Flash-GGUF", "filename": "*IQ2_XXS.gguf", "max_context": 131072, "default_temperature": 0.6, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 20, "repeat_penalty": 1.05, }, }, "qwen3_30b_thinking_q1": { "name": "Qwen3 30B Thinking (256K Context)", "repo_id": "unsloth/Qwen3-30B-A3B-Thinking-2507-GGUF", "filename": "*TQ1_0.gguf", "max_context": 262144, "default_temperature": 0.6, "supports_reasoning": True, "supports_toggle": False, # Thinking-only mode "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 20, "repeat_penalty": 1.0, }, }, "qwen3_30b_instruct_q1": { "name": "Qwen3 30B Instruct (256K Context)", "repo_id": "unsloth/Qwen3-30B-A3B-Instruct-2507-GGUF", "filename": "*TQ1_0.gguf", "max_context": 262144, "default_temperature": 0.6, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 20, "repeat_penalty": 1.0, }, }, "custom_hf": { "name": "๐Ÿ”ง Custom HF GGUF...", "repo_id": None, "filename": None, "max_context": 8192, "default_temperature": 0.6, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.6, "top_p": 0.95, "top_k": 40, "repeat_penalty": 1.0, }, }, } DEFAULT_MODEL_KEY = "qwen3_600m_q4" # ===== ADVANCED MODE: EXTRACTION MODELS REGISTRY (13 models, โ‰ค1.7B) ===== # Used exclusively for Stage 1: Extraction (transcript windows โ†’ structured JSON) # Extraction-optimized settings: Low temperature (0.1-0.3) for deterministic output EXTRACTION_MODELS = { "qwen2.5_1.5b": { "name": "Qwen2.5 1.5B (128K Context)", "repo_id": "Qwen/Qwen2.5-1.5B-Instruct-GGUF", "filename": "qwen2.5-1.5b-instruct-q4_k_m.gguf", "max_context": 131072, "default_n_ctx": 4096, "params_size": "1.5B", "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.2, "top_p": 0.9, "top_k": 30, "repeat_penalty": 1.0, }, }, } DEFAULT_EXTRACTION_MODEL = "qwen2.5_1.5b" # ===== ADVANCED MODE: SYNTHESIS MODELS REGISTRY (16 models, 1B-30B) ===== # Used exclusively for Stage 3: Synthesis (deduplicated items โ†’ executive summary) # Synthesis-optimized settings: Higher temperature (0.7-0.9) for creative synthesis # FULLY INDEPENDENT from AVAILABLE_MODELS (no shared references) SYNTHESIS_MODELS = { "granite_3_1_1b_q8": { "name": "Granite 3.1 1B-A400M Instruct (128K Context)", "repo_id": "bartowski/granite-3.1-1b-a400m-instruct-GGUF", "filename": "*Q8_0.gguf", "max_context": 131072, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 50, "repeat_penalty": 1.05, }, }, "falcon_h1_1.5b_q4": { "name": "Falcon-H1 1.5B Q4", "repo_id": "unsloth/Falcon-H1-1.5B-Deep-Instruct-GGUF", "filename": "*Q4_K_M.gguf", "max_context": 32768, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.95, "top_k": 40, "repeat_penalty": 1.0, }, }, "qwen3_1.7b_q4": { "name": "Qwen3 1.7B Q4 (32K Context)", "repo_id": "unsloth/Qwen3-1.7B-GGUF", "filename": "*Q4_0.gguf", "max_context": 32768, "supports_reasoning": True, "supports_toggle": True, # Hybrid model "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 30, "repeat_penalty": 1.0, }, }, "granite_3_3_2b_q4": { "name": "Granite 3.3 2B Instruct (128K Context)", "repo_id": "ibm-granite/granite-3.3-2b-instruct-GGUF", "filename": "*Q4_K_M.gguf", "max_context": 131072, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 50, "repeat_penalty": 1.05, }, }, "youtu_llm_2b_q8": { "name": "Youtu-LLM 2B (128K Context)", "repo_id": "tencent/Youtu-LLM-2B-GGUF", "filename": "*Q8_0.gguf", "max_context": 131072, "supports_reasoning": True, "supports_toggle": True, # Hybrid model "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 40, "repeat_penalty": 1.0, }, }, "lfm2_2_6b_transcript": { "name": "LFM2 2.6B Transcript (32K Context)", "repo_id": "LiquidAI/LFM-2.6B-Transcript-GGUF", "filename": "*Q4_0.gguf", "max_context": 32768, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.95, "top_k": 40, "repeat_penalty": 1.05, }, }, "breeze_3b_q4": { "name": "Breeze 3B Q4 (32K Context)", "repo_id": "mradermacher/breeze-3b-GGUF", "filename": "*Q4_K_M.gguf", "max_context": 32768, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.95, "top_k": 40, "repeat_penalty": 1.0, }, }, "granite_3_1_3b_q4": { "name": "Granite 3.1 3B-A800M Instruct (128K Context)", "repo_id": "bartowski/granite-3.1-3b-a800m-instruct-GGUF", "filename": "*Q4_K_M.gguf", "max_context": 131072, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 50, "repeat_penalty": 1.05, }, }, "qwen3_4b_thinking_q3": { "name": "Qwen3 4B Thinking (256K Context)", "repo_id": "unsloth/Qwen3-4B-Thinking-2507-GGUF", "filename": "*Q3_K_M.gguf", "max_context": 262144, "supports_reasoning": True, "supports_toggle": False, # Thinking-only "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 30, "repeat_penalty": 1.0, }, }, "granite4_tiny_q3": { "name": "Granite 4.0 Tiny 7B (128K Context)", "repo_id": "ibm-research/granite-4.0-Tiny-7B-Instruct-GGUF", "filename": "*Q3_K_M.gguf", "max_context": 131072, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 50, "repeat_penalty": 1.05, }, }, "ernie_21b_pt_q1": { "name": "ERNIE-4.5 21B PT (128K Context)", "repo_id": "unsloth/ERNIE-4.5-21B-A3B-PT-GGUF", "filename": "*TQ1_0.gguf", "max_context": 131072, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 50, "repeat_penalty": 1.05, }, }, "ernie_21b_thinking_q1": { "name": "ERNIE-4.5 21B Thinking (128K Context)", "repo_id": "unsloth/ERNIE-4.5-21B-A3B-Thinking-GGUF", "filename": "*TQ1_0.gguf", "max_context": 131072, "supports_reasoning": True, "supports_toggle": False, # Thinking-only "inference_settings": { "temperature": 0.9, "top_p": 0.95, "top_k": 50, "repeat_penalty": 1.05, }, }, "glm_4_7_flash_reap_30b": { "name": "GLM-4.7-Flash-REAP-30B Thinking (128K Context)", "repo_id": "unsloth/GLM-4.7-Flash-REAP-23B-A3B-GGUF", "filename": "*TQ1_0.gguf", "max_context": 131072, "supports_reasoning": True, "supports_toggle": False, # Thinking-only "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 40, "repeat_penalty": 1.0, }, }, "glm_4_7_flash_30b_iq2": { "name": "GLM-4.7-Flash-30B (Original) IQ2_XXS (128K Context)", "repo_id": "bartowski/zai-org_GLM-4.7-Flash-GGUF", "filename": "*IQ2_XXS.gguf", "max_context": 131072, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.95, "top_k": 40, "repeat_penalty": 1.0, }, }, "qwen3_30b_thinking_q1": { "name": "Qwen3 30B Thinking (256K Context)", "repo_id": "unsloth/Qwen3-30B-A3B-Thinking-2507-GGUF", "filename": "*TQ1_0.gguf", "max_context": 262144, "supports_reasoning": True, "supports_toggle": False, # Thinking-only "inference_settings": { "temperature": 0.8, "top_p": 0.95, "top_k": 30, "repeat_penalty": 1.0, }, }, "qwen3_30b_instruct_q1": { "name": "Qwen3 30B Instruct (256K Context)", "repo_id": "unsloth/Qwen3-30B-A3B-Instruct-2507-GGUF", "filename": "*TQ1_0.gguf", "max_context": 262144, "supports_reasoning": False, "supports_toggle": False, "inference_settings": { "temperature": 0.7, "top_p": 0.95, "top_k": 30, "repeat_penalty": 1.0, }, }, } DEFAULT_SYNTHESIS_MODEL = "qwen3_1.7b_q4" def load_model(model_key: str = None, n_threads: int = 2) -> Tuple[Llama, str]: """ Load model with CPU optimizations. Only reloads if model changes. Args: model_key: Model identifier from AVAILABLE_MODELS n_threads: Number of CPU threads to use for inference Returns: Tuple of (loaded_model, info_message) """ global llm, converter, current_model_key # Default to current or default model if model_key is None: model_key = current_model_key if current_model_key else DEFAULT_MODEL_KEY model = AVAILABLE_MODELS[model_key] # Already loaded? if llm is not None and model_key == current_model_key: return llm, f"Model ready: {model['name']}" # Unload old model to free memory if llm is not None: logger.info(f"Unloading previous model: {AVAILABLE_MODELS[current_model_key]['name']}") del llm llm = None gc.collect() # Initialize OpenCC converter once if converter is None: converter = OpenCC('s2twp') # Calculate n_ctx: model max capped at MAX_USABLE_CTX n_ctx = min(model["max_context"], MAX_USABLE_CTX) logger.info(f"Loading {model['name']} with n_ctx={n_ctx}") # Detect GPU support and adjust n_gpu_layers requested_ngl = int(os.environ.get("N_GPU_LAYERS", 0)) n_gpu_layers = requested_ngl if requested_ngl != 0: # Check if GPU offload is actually supported try: from llama_cpp import llama_supports_gpu_offload gpu_available = llama_supports_gpu_offload() if not gpu_available: logger.warning(f"N_GPU_LAYERS={requested_ngl} requested but GPU offload not available. Falling back to CPU.") n_gpu_layers = 0 except Exception as e: logger.warning(f"Could not detect GPU support: {e}. Using CPU fallback.") n_gpu_layers = 0 try: llm = Llama.from_pretrained( repo_id=model["repo_id"], filename=model["filename"], n_ctx=n_ctx, n_batch=min(2048, n_ctx), # Batch size for throughput n_threads=n_threads, # Configurable thread count n_threads_batch=n_threads, # Parallel batch processing n_gpu_layers=n_gpu_layers, # 0=CPU only, -1=all GPU layers (if available) verbose=False, seed=1337, v_type=2, k_type=2, ) current_model_key = model_key info_msg = f"Loaded: {model['name']} ({n_ctx:,} context)" logger.info(info_msg) return llm, info_msg except Exception as e: logger.error(f"Error loading model: {e}") raise def update_reasoning_visibility(model_key): """ Update reasoning checkbox visibility, value, and interactivity based on model type. Three model types: - Non-reasoning: checkbox hidden - Thinking-only: checkbox visible, checked, locked (non-interactive), label "Reasoning Mode (Always On)" - Hybrid: checkbox visible, toggleable, label "Enable Reasoning Mode" Returns: Single gr.update() with all properties """ model = AVAILABLE_MODELS[model_key] supports_reasoning = model.get("supports_reasoning", False) supports_toggle = model.get("supports_toggle", False) if not supports_reasoning: # Non-reasoning model: hide checkbox return gr.update(visible=False, value=False, interactive=False, label="Enable Reasoning Mode") elif supports_reasoning and not supports_toggle: # Thinking-only model: show, check, lock return gr.update(visible=True, value=True, interactive=False, label="โšก Reasoning Mode (Always On)") else: # Hybrid model: show, toggleable return gr.update(visible=True, value=True, interactive=True, label="Enable Reasoning Mode") # ===== ADVANCED MODE: HELPER FUNCTIONS ===== def get_model_config(model_key: str, model_role: str) -> Dict[str, Any]: """ Get model configuration based on role. Ensures same model (e.g., qwen3_1.7b_q4) uses DIFFERENT settings for extraction vs synthesis. Args: model_key: Model identifier (e.g., "qwen3_1.7b_q4") model_role: "extraction" or "synthesis" Returns: Model configuration dict with role-specific settings Raises: ValueError: If model_key not available for specified role """ if model_role == "extraction": if model_key not in EXTRACTION_MODELS: available = ", ".join(list(EXTRACTION_MODELS.keys())[:3]) + "..." raise ValueError( f"Model '{model_key}' not available for extraction role. " f"Available: {available}" ) return EXTRACTION_MODELS[model_key] elif model_role == "synthesis": if model_key not in SYNTHESIS_MODELS: available = ", ".join(list(SYNTHESIS_MODELS.keys())[:3]) + "..." raise ValueError( f"Model '{model_key}' not available for synthesis role. " f"Available: {available}" ) return SYNTHESIS_MODELS[model_key] else: raise ValueError( f"Unknown model role: '{model_role}'. " f"Must be 'extraction' or 'synthesis'" ) def load_model_for_role( model_key: str, model_role: str, n_threads: int = 2, user_n_ctx: Optional[int] = None ) -> Tuple[Llama, str]: """ Load model with role-specific configuration. Args: model_key: Model identifier model_role: "extraction" or "synthesis" n_threads: CPU threads user_n_ctx: User-specified n_ctx (extraction only, from slider) Returns: (loaded_model, info_message) Raises: Exception: If model loading fails (graceful failure) """ try: config = get_model_config(model_key, model_role) # Calculate n_ctx if model_role == "extraction" and user_n_ctx is not None: n_ctx = min(user_n_ctx, config["max_context"], MAX_USABLE_CTX) else: # Synthesis or default extraction n_ctx = min(config.get("max_context", 8192), MAX_USABLE_CTX) # Detect GPU support requested_ngl = int(os.environ.get("N_GPU_LAYERS", 0)) n_gpu_layers = requested_ngl if requested_ngl != 0: try: from llama_cpp import llama_supports_gpu_offload gpu_available = llama_supports_gpu_offload() if not gpu_available: logger.warning("GPU requested but not available. Using CPU.") n_gpu_layers = 0 except Exception as e: logger.warning(f"Could not detect GPU: {e}. Using CPU.") n_gpu_layers = 0 # Load model logger.info(f"Loading {config['name']} for {model_role} role (n_ctx={n_ctx:,})") llm = Llama.from_pretrained( repo_id=config["repo_id"], filename=config["filename"], n_ctx=n_ctx, n_batch=min(2048, n_ctx), n_threads=n_threads, n_threads_batch=n_threads, n_gpu_layers=n_gpu_layers, verbose=False, seed=1337, ) info_msg = ( f"โœ… Loaded: {config['name']} for {model_role} " f"(n_ctx={n_ctx:,}, threads={n_threads})" ) logger.info(info_msg) return llm, info_msg except Exception as e: # Graceful failure - let user select different model error_msg = ( f"โŒ Failed to load {model_key} for {model_role}: {str(e)}\n\n" f"Please select a different model and try again." ) logger.error(error_msg, exc_info=True) raise Exception(error_msg) def unload_model(llm: Optional[Llama], model_name: str = "model") -> None: """Explicitly unload model and trigger garbage collection.""" if llm: logger.info(f"Unloading {model_name}") del llm gc.collect() time.sleep(0.5) # Allow OS to reclaim memory def get_extraction_model_info(model_key: str) -> str: """Generate markdown info for extraction model.""" config = EXTRACTION_MODELS.get(model_key, {}) if not config: return "**Extraction Model**\n\nSelect a model to see details" settings = config.get("inference_settings", {}) reasoning_support = "" if config.get("supports_toggle"): reasoning_support = "\n**Reasoning:** Hybrid (user-toggleable)" elif config.get("supports_reasoning"): reasoning_support = "\n**Reasoning:** Thinking-only (always on)" return f"""**{config.get('name', 'Unknown')}** **Size:** {config.get('params_size', 'N/A')} **Max Context:** {config.get('max_context', 0):,} tokens **Default n_ctx:** {config.get('default_n_ctx', 4096):,} tokens (user-adjustable via slider) **Repository:** `{config.get('repo_id', 'N/A')}`{reasoning_support} **Extraction-Optimized Settings:** - Temperature: {settings.get('temperature', 'N/A')} - Top P: {settings.get('top_p', 'N/A')} - Top K: {settings.get('top_k', 'N/A')} - Repeat Penalty: {settings.get('repeat_penalty', 'N/A')} """ def get_embedding_model_info(model_key: str) -> str: """Generate markdown info for embedding model.""" from meeting_summarizer.extraction import EMBEDDING_MODELS config = EMBEDDING_MODELS.get(model_key, {}) if not config: return "**Embedding Model**\n\nSelect a model to see details" return f"""**{config.get('name', 'Unknown')}** **Embedding Dimension:** {config.get('embedding_dim', 'N/A')} **Context:** {config.get('max_context', 0):,} tokens **Repository:** `{config.get('repo_id', 'N/A')}` **Description:** {config.get('description', 'N/A')} """ def get_synthesis_model_info(model_key: str) -> str: """Generate markdown info for synthesis model.""" config = SYNTHESIS_MODELS.get(model_key, {}) if not config: return "**Synthesis Model**\n\nSelect a model to see details" settings = config.get("inference_settings", {}) reasoning_support = "" if config.get("supports_toggle"): reasoning_support = "\n**Reasoning:** Hybrid (user-toggleable)" elif config.get("supports_reasoning"): reasoning_support = "\n**Reasoning:** Thinking-only (always on)" return f"""**{config.get('name', 'Unknown')}** **Max Context:** {config.get('max_context', 0):,} tokens **Repository:** `{config.get('repo_id', 'N/A')}`{reasoning_support} **Synthesis-Optimized Settings:** - Temperature: {settings.get('temperature', 'N/A')} - Top P: {settings.get('top_p', 'N/A')} - Top K: {settings.get('top_k', 'N/A')} - Repeat Penalty: {settings.get('repeat_penalty', 'N/A')} """ def summarize_advanced( transcript: str, extraction_model_key: str, embedding_model_key: str, synthesis_model_key: str, extraction_n_ctx: int, overlap_turns: int, similarity_threshold: float, enable_extraction_reasoning: bool, enable_synthesis_reasoning: bool, output_language: str, max_tokens: int, enable_logging: bool, n_threads: int = 2, temperature: float = 0.6, top_p: float = 0.95, top_k: int = 20 ) -> Generator[Dict[str, Any], None, None]: """ Advanced 3-stage pipeline: Extraction โ†’ Deduplication โ†’ Synthesis. Yields progress updates as dicts with keys: - stage: "extraction" | "deduplication" | "synthesis" | "complete" | "error" - ticker: Progress ticker text (for extraction) - thinking: Thinking/reasoning content - summary: Final summary (for synthesis/complete) - error: Error message (if any) - trace_stats: Summary statistics (on complete) """ from meeting_summarizer.trace import Tracer from meeting_summarizer.extraction import ( EmbeddingModel, Window, preprocess_transcript, stream_extract_from_window, deduplicate_items, stream_synthesize_executive_summary ) # Initialize tracer tracer = Tracer(enabled=enable_logging) extraction_llm = None embedding_model = None synthesis_llm = None try: # ===== STAGE 1: EXTRACTION ===== yield {"stage": "extraction", "ticker": "Loading extraction model...", "thinking": "", "summary": ""} extraction_llm, load_msg = load_model_for_role( model_key=extraction_model_key, model_role="extraction", n_threads=n_threads, user_n_ctx=extraction_n_ctx ) yield {"stage": "extraction", "ticker": load_msg, "thinking": "", "summary": ""} # Use the model's actual tokenizer for accurate token counting def count_tokens(text: str) -> int: """Count tokens using the extraction model's tokenizer.""" return len(extraction_llm.tokenize(text.encode('utf-8'))) # Preprocess transcript: strip CSV format, remove noise/repetition raw_line_count = len(transcript.split('\n')) raw_char_count = len(transcript) transcript, noise_phrases = preprocess_transcript(transcript) cleaned_line_count = len(transcript.split('\n')) cleaned_char_count = len(transcript) # Log preprocessing info to tracer tracer.log_preprocessing( original_line_count=raw_line_count, cleaned_line_count=cleaned_line_count, original_char_count=raw_char_count, cleaned_char_count=cleaned_char_count, noise_phrases_removed=noise_phrases ) # Create windows from preprocessed transcript lines = [l.strip() for l in transcript.split('\n') if l.strip()] # Reserve tokens for system prompt (~200) and output (~2048) max_window_tokens = extraction_n_ctx - 2300 # Target ~1800 tokens per window # Simple windowing: split into chunks based on token count windows = [] current_window = [] current_tokens = 0 window_id = 1 for line_num, line in enumerate(lines): line_tokens = count_tokens(line) if current_tokens + line_tokens > max_window_tokens and current_window: # Create window window_content = '\n'.join(current_window) windows.append(Window( id=window_id, content=window_content, start_turn=line_num - len(current_window), end_turn=line_num - 1, token_count=current_tokens )) # Log window to tracer for debugging tracer.log_window( window_id=window_id, content=window_content, token_count=current_tokens, start_turn=line_num - len(current_window), end_turn=line_num - 1 ) window_id += 1 # Start new window with overlap overlap_lines = current_window[-overlap_turns:] if len(current_window) >= overlap_turns else current_window current_window = overlap_lines + [line] current_tokens = sum(count_tokens(l) for l in current_window) else: current_window.append(line) current_tokens += line_tokens # Add final window if current_window: window_content = '\n'.join(current_window) windows.append(Window( id=window_id, content=window_content, start_turn=len(lines) - len(current_window), end_turn=len(lines) - 1, token_count=current_tokens )) # Log window to tracer for debugging tracer.log_window( window_id=window_id, content=window_content, token_count=current_tokens, start_turn=len(lines) - len(current_window), end_turn=len(lines) - 1 ) total_windows = len(windows) yield {"stage": "extraction", "ticker": f"Created {total_windows} windows", "thinking": "", "summary": ""} # Extract from each window all_items = {"action_items": [], "decisions": [], "key_points": [], "open_questions": []} extraction_config = get_model_config(extraction_model_key, "extraction") for window in windows: for ticker, thinking, partial_items, is_complete in stream_extract_from_window( extraction_llm=extraction_llm, window=window, window_id=window.id, total_windows=total_windows, tracer=tracer, model_config=extraction_config, enable_reasoning=enable_extraction_reasoning ): yield {"stage": "extraction", "ticker": ticker, "thinking": thinking, "summary": ""} if is_complete: # Merge items for category, items in partial_items.items(): all_items[category].extend(items) # Unload extraction model unload_model(extraction_llm, "extraction model") extraction_llm = None total_extracted = sum(len(v) for v in all_items.values()) yield {"stage": "extraction", "ticker": f"โœ… Extracted {total_extracted} total items", "thinking": "", "summary": ""} # ===== STAGE 2: DEDUPLICATION ===== yield {"stage": "deduplication", "ticker": "Loading embedding model...", "thinking": "", "summary": ""} embedding_model = EmbeddingModel(embedding_model_key, n_threads=n_threads) load_msg = embedding_model.load() yield {"stage": "deduplication", "ticker": load_msg, "thinking": "", "summary": ""} # Deduplicate - now a generator for progress updates deduplicated_items = {"action_items": [], "decisions": [], "key_points": [], "open_questions": []} categories_processed = 0 total_categories = len([k for k, v in all_items.items() if v]) for intermediate_dedup in deduplicate_items( all_items=all_items, embedding_model=embedding_model, similarity_threshold=similarity_threshold, tracer=tracer ): deduplicated_items = intermediate_dedup categories_processed += 1 current_total = sum(len(v) for v in deduplicated_items.values()) yield { "stage": "deduplication", "ticker": f"Deduplicating: {categories_processed}/{total_categories} categories processed ({current_total} items so far)...", "thinking": "", "summary": "" } # Unload embedding model embedding_model.unload() embedding_model = None total_deduplicated = sum(len(v) for v in deduplicated_items.values()) duplicates_removed = total_extracted - total_deduplicated yield { "stage": "deduplication", "ticker": f"โœ… Deduplication complete: {total_extracted} โ†’ {total_deduplicated} ({duplicates_removed} duplicates removed)", "thinking": "", "summary": "" } # ===== STAGE 3: SYNTHESIS ===== yield {"stage": "synthesis", "ticker": "", "thinking": "Loading synthesis model...", "summary": ""} synthesis_llm, load_msg = load_model_for_role( model_key=synthesis_model_key, model_role="synthesis", n_threads=n_threads ) yield {"stage": "synthesis", "ticker": "", "thinking": f"โœ… {load_msg}", "summary": ""} # Synthesize synthesis_config = get_model_config(synthesis_model_key, "synthesis") # Override inference settings with custom parameters synthesis_config["inference_settings"] = { "temperature": temperature, "top_p": top_p, "top_k": top_k, "repeat_penalty": 1.1 } final_summary = "" final_thinking = "" for summary_chunk, thinking_chunk, is_complete in stream_synthesize_executive_summary( synthesis_llm=synthesis_llm, deduplicated_items=deduplicated_items, model_config=synthesis_config, output_language=output_language, enable_reasoning=enable_synthesis_reasoning, max_tokens=max_tokens, tracer=tracer ): final_summary = summary_chunk final_thinking = thinking_chunk yield {"stage": "synthesis", "ticker": "", "thinking": thinking_chunk, "summary": summary_chunk} # Unload synthesis model unload_model(synthesis_llm, "synthesis model") synthesis_llm = None # Apply Chinese conversion if needed if output_language == "zh-TW": converter = OpenCC('s2twp') final_summary = converter.convert(final_summary) if final_thinking: final_thinking = converter.convert(final_thinking) # Get trace stats and add model names for download JSON trace_stats = tracer.get_summary_stats() debug_json = tracer.get_debug_json() ext_config = get_model_config(extraction_model_key, "extraction") syn_config = get_model_config(synthesis_model_key, "synthesis") trace_stats["extraction_model"] = ext_config.get("name", extraction_model_key) trace_stats["embedding_model"] = embedding_model_key trace_stats["synthesis_model"] = syn_config.get("name", synthesis_model_key) yield { "stage": "complete", "ticker": "", "thinking": final_thinking, "summary": final_summary, "trace_stats": trace_stats, "trace_json": tracer.get_trace_json(), "debug_json": debug_json } except Exception as e: logger.error(f"Advanced pipeline error: {e}", exc_info=True) # Cleanup if extraction_llm: unload_model(extraction_llm, "extraction model") if embedding_model: embedding_model.unload() if synthesis_llm: unload_model(synthesis_llm, "synthesis model") yield { "stage": "error", "ticker": "", "thinking": "", "summary": "", "error": str(e) } def download_summary_json(summary, thinking, model_key, language, metrics): """Generate JSON file with summary and metadata for both Standard and Advanced modes.""" import json from datetime import datetime is_advanced = isinstance(metrics, dict) and metrics.get("mode") == "advanced" if is_advanced: # Advanced Mode: embed trace data and use pipeline model names trace_stats = metrics.get("trace_stats", {}) debug_info = metrics.get("debug_json", {}) data = { "metadata": { "generated_at": datetime.now().isoformat(), "mode": "advanced", "pipeline": "extraction โ†’ deduplication โ†’ synthesis", "extraction_model": trace_stats.get("extraction_model", "unknown"), "embedding_model": trace_stats.get("embedding_model", "unknown"), "synthesis_model": trace_stats.get("synthesis_model", "unknown"), "language": language }, "thinking_process": thinking, "summary": summary, "pipeline_stats": { "total_windows": trace_stats.get("total_windows", 0), "successful_extractions": trace_stats.get("successful_extractions", 0), "total_items_extracted": trace_stats.get("total_items_extracted", 0), "total_items_after_dedup": trace_stats.get("total_items_after_dedup", 0), "total_duplicates_removed": trace_stats.get("total_duplicates_removed", 0), "duplicate_rate": trace_stats.get("duplicate_rate", 0), "synthesis_success": trace_stats.get("synthesis_success", False), "total_elapsed_seconds": trace_stats.get("total_elapsed_seconds", 0), }, "debug_info": debug_info, "trace": metrics.get("trace_json", []) } else: # Standard Mode: original behavior model_name = "unknown" if model_key and model_key in AVAILABLE_MODELS: model_name = AVAILABLE_MODELS[model_key]["name"] data = { "metadata": { "generated_at": datetime.now().isoformat(), "mode": "standard", "model": model_name, "model_id": model_key, "language": language }, "thinking_process": thinking, "summary": summary } # Add generation metrics if available if metrics and isinstance(metrics, dict): data["generation_metrics"] = { "settings_used": metrics.get("settings", {}), "timing": { "time_to_first_token_ms": round(metrics.get("time_to_first_token_ms", 0), 2) if metrics.get("time_to_first_token_ms") else None, "total_processing_time_ms": round(metrics.get("total_processing_time_ms", 0), 2) if metrics.get("total_processing_time_ms") else None, "model_load_time_ms": round(metrics.get("model_load_time_ms", 0), 2) if metrics.get("model_load_time_ms") else None, }, "tokens": { "n_ctx": metrics.get("n_ctx"), "input_tokens": metrics.get("input_tokens"), "output_tokens": metrics.get("output_tokens"), "thinking_tokens": metrics.get("thinking_tokens"), "total_tokens": metrics.get("total_tokens"), "generation_tokens": metrics.get("generation_tokens"), "prefill_tokens": metrics.get("prefill_tokens") }, "performance": { "generation_speed_tps": round(metrics.get("generation_speed_tps", 0), 2) if metrics.get("generation_speed_tps") else None, "prefill_speed_tps": round(metrics.get("prefill_speed_tps", 0), 2) if metrics.get("prefill_speed_tps") else None }, "file_info": metrics.get("file_info", {}), "truncation_info": metrics.get("truncation_info", {}) } filename = f"summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(filename, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) return gr.update(value=filename, visible=True) def estimate_tokens(text: str) -> int: """ Estimate token count for mixed CJK/English text. ~3 UTF-8 bytes per token for Chinese-heavy content. """ return len(text.encode('utf-8')) // 3 def calculate_n_ctx(model_key: str, transcript: str, max_tokens: int, enable_reasoning: bool = False) -> Tuple[int, str]: """ Calculate optimal n_ctx based on model limits and input size. Args: model_key: Model identifier from AVAILABLE_MODELS transcript: Input text content max_tokens: Maximum tokens to generate for summary enable_reasoning: If True, add extra buffer for thinking tokens Returns: Tuple of (n_ctx, warning_message) -- warning is "" if no issue """ model = AVAILABLE_MODELS[model_key] model_max = model["max_context"] usable_max = min(model_max, MAX_USABLE_CTX) input_tokens = estimate_tokens(transcript) # Calculate thinking buffer for reasoning models thinking_buffer = 0 if enable_reasoning: # Reserve 50% of max_tokens for thinking output thinking_buffer = int(max_tokens * 0.5) required = input_tokens + max_tokens + thinking_buffer + 512 # 512 for system prompt + buffer # Round up to nearest 512 for efficiency n_ctx = ((required // 512) + 1) * 512 n_ctx = max(2048, min(n_ctx, usable_max)) warning = "" if required > usable_max: available_input = usable_max - max_tokens - thinking_buffer - 512 warning = ( f"โš ๏ธ Warning: File too large for {model['name']} " f"(need ~{required:,} tokens, max {usable_max:,}). " f"Input will be truncated to ~{available_input:,} tokens. " f"Consider Hunyuan (256K) or ERNIE (131K) for large files." ) return n_ctx, warning def calculate_effective_max_tokens(model_key: str, max_tokens: int, enable_reasoning: bool) -> int: """ Calculate effective max_tokens with thinking headroom for reasoning models. When reasoning is enabled for thinking-capable models, adds 50% headroom to accommodate both thinking process and final output. Args: model_key: Model identifier from AVAILABLE_MODELS max_tokens: User-specified maximum tokens enable_reasoning: Whether reasoning mode is enabled Returns: Adjusted max_tokens value (1.5x for reasoning models, unchanged otherwise) """ if not enable_reasoning: return max_tokens model_config = AVAILABLE_MODELS.get(model_key) if not model_config: return max_tokens # Check if model supports reasoning/thinking supports_reasoning = model_config.get("supports_reasoning", False) if supports_reasoning: # Add 50% headroom for thinking process thinking_headroom = int(max_tokens * 0.5) effective_max = max_tokens + thinking_headroom logger.info(f"Reasoning enabled for {model_key}: extending max_tokens from {max_tokens} to {effective_max}") return effective_max return max_tokens def get_model_info(model_key: str, n_threads: int = 2, custom_metadata: Optional[dict] = None) -> Tuple[str, str, float, int]: """Get model information and inference settings for UI display. Args: model_key: Model identifier from AVAILABLE_MODELS n_threads: Number of CPU threads currently configured custom_metadata: Optional metadata for custom models (repo_id, filename, size_mb) Returns: Tuple of (info_text, temperature, top_p, top_k) """ # Handle custom model case if model_key == "custom_hf" and custom_metadata: repo_id = custom_metadata.get("repo_id", "Unknown") filename = custom_metadata.get("filename", "Unknown") size_mb = custom_metadata.get("size_mb", 0) size_str = f"{size_mb:.1f} MB" if size_mb > 0 else "Unknown" # Determine thread preset label if n_threads == 2: thread_label = "HF Free Tier" elif n_threads == 8: thread_label = "HF Upgrade Tier" else: thread_label = "Custom" info_text = ( f"## ๐Ÿค– Custom GGUF Model\n\n" f"### ๐Ÿ“Š Model Specs\n" f"| Property | Value |\n" f"|----------|-------|\n" f"| **Repository** | `{repo_id}` |\n" f"| **Quantization** | `{filename}` |\n" f"| **Size** | {size_str} |\n" f"| **Context** | Dynamic (up to 32K) |\n\n" f"### ๐Ÿ–ฅ๏ธ Hardware Configuration\n" f"| Property | Value |\n" f"|----------|-------|\n" f"| **CPU Threads** | {n_threads} ({thread_label}) |\n\n" f"### โš™๏ธ Inference Settings\n" f"| Property | Value |\n" f"|----------|-------|\n" f"| **Temperature** | 0.6 |\n" f"| **Top P** | 0.9 |\n" f"| **Top K** | 40 |\n" f"| **Repeat Penalty** | 1.0 |" ) return info_text, "0.6", 0.9, 40 # Handle predefined models m = AVAILABLE_MODELS[model_key] usable_ctx = min(m["max_context"], MAX_USABLE_CTX) settings = m["inference_settings"] # Determine thread preset label if n_threads == 2: thread_label = "HF Free Tier" elif n_threads == 8: thread_label = "HF Upgrade Tier" else: thread_label = "Custom" info_text = ( f"## ๐Ÿค– {m['name']}\n\n" f"### ๐Ÿ“Š Model Specs\n" f"| Property | Value |\n" f"|----------|-------|\n" f"| **Context** | {m['max_context']:,} tokens (capped at {usable_ctx:,}) |\n" f"| **Quantization** | `{m['filename']}` |\n" f"| **Repository** | `{m['repo_id']}` |\n\n" f"### ๐Ÿ–ฅ๏ธ Hardware Configuration\n" f"| Property | Value |\n" f"|----------|-------|\n" f"| **CPU Threads** | {n_threads} ({thread_label}) |\n\n" f"### โš™๏ธ Inference Settings\n" f"| Property | Value |\n" f"|----------|-------|\n" f"| **Temperature** | {settings['temperature']} |\n" f"| **Top P** | {settings['top_p']} |\n" f"| **Top K** | {settings['top_k']} |\n" f"| **Repeat Penalty** | {settings.get('repeat_penalty', 1.0)} |" ) return info_text, str(settings["temperature"]), settings["top_p"], settings["top_k"] def parse_thinking_blocks(content: str, streaming: bool = False) -> Tuple[str, str]: """ Parse thinking blocks from model output. Supports both and tags. Args: content: Full model response streaming: If True, handle unclosed tags for live display Returns: Tuple of (thinking_content, summary_content) """ closed_pattern = r'(.*?)' open_pattern = r'([^<]*)$' # Extract completed thinking blocks closed_matches = re.findall(closed_pattern, content, re.DOTALL) # Remove completed blocks to get summary remaining = re.sub(closed_pattern, '', content, flags=re.DOTALL).strip() thinking_parts = [m.strip() for m in closed_matches if m.strip()] if streaming: # Check for unclosed tag (model still generating thinking tokens) open_match = re.search(open_pattern, content, re.DOTALL) if open_match: partial = open_match.group(1).strip() if partial: thinking_parts.append(partial) # Nothing after the open tag counts as summary yet remaining = re.sub(r'[^<]*$', '', remaining, flags=re.DOTALL).strip() thinking = '\n\n'.join(thinking_parts) if not thinking and not closed_matches: # No thinking tags found at all return ("", content if not content.startswith(' Generator[Tuple[str, str, str, dict, str], None, None]: """ Stream summary generation from uploaded file or text input. Args: file_obj: Gradio file object text_input: Direct text input from user model_key: Model identifier from AVAILABLE_MODELS enable_reasoning: Whether to use reasoning mode (/think) for Qwen3 models max_tokens: Maximum tokens to generate top_p: Nucleus sampling parameter (uses model default if None) top_k: Top-k sampling parameter (uses model default if None) output_language: Target language for summary ("en" or "zh-TW") thread_config: Thread configuration preset ("free", "upgrade", "custom") custom_threads: Custom thread count when preset is "custom" custom_model_state: Pre-loaded custom model (if using custom_hf) Yields: Tuple of (thinking_text, summary_text, info_text, metrics_dict, system_prompt) """ import time metrics = { "start_time": None, "time_to_first_token_ms": None, "generation_start_time": None, "generation_end_time": None, "model_load_time_ms": None, "total_tokens": 0, "generation_tokens": 0, "prefill_tokens": 0, "input_tokens": 0, "output_tokens": 0, "thinking_tokens": 0, "n_ctx": 0, "settings": {}, "file_info": {}, "truncation_info": {}, } global llm, converter # Determine thread count based on configuration preset thread_preset_map = { "free": 2, # HF Spaces Free Tier: 2 vCPUs "upgrade": 8, # HF Spaces CPU Upgrade: 8 vCPUs "custom": custom_threads, # User-specified thread count } n_threads = thread_preset_map.get(thread_config, 2) logger.info(f"Using {n_threads} threads (config: {thread_config})") model = AVAILABLE_MODELS[model_key] usable_max = min(model["max_context"], MAX_USABLE_CTX) # Adjust max_tokens for thinking models when reasoning is enabled original_max_tokens = max_tokens max_tokens = calculate_effective_max_tokens(model_key, max_tokens, enable_reasoning) if max_tokens != original_max_tokens: logger.info(f"Adjusted max_tokens from {original_max_tokens} to {max_tokens} for reasoning mode") # Validate max_tokens fits in context if max_tokens > usable_max - 512: max_tokens = usable_max - 512 # Read input source (prioritize text_input) try: transcript = "" source_name = "Direct Input" source_size = 0 if text_input and text_input.strip(): transcript = text_input source_size = len(transcript.encode('utf-8')) elif file_obj is not None: path = file_obj.name if hasattr(file_obj, 'name') else file_obj source_name = os.path.basename(path) source_size = os.path.getsize(path) with open(path, 'r', encoding='utf-8') as f: transcript = f.read() else: system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning) yield ("", "Error: Please upload a file or paste text first", "", metrics, system_prompt_preview) return # Store input info metrics["file_info"] = { "source": source_name, "size_bytes": source_size, "original_char_count": len(transcript), } except Exception as e: system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning) yield ("", f"Error reading input: {e}", "", metrics, system_prompt_preview) return if not transcript.strip(): system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning) yield ("", "Error: File is empty", "", metrics, system_prompt_preview) return # Calculate context and check truncation (with reasoning buffer if enabled) n_ctx, warning = calculate_n_ctx(model_key, transcript, max_tokens, enable_reasoning) metrics["n_ctx"] = n_ctx # Truncate if needed (estimate max chars from available tokens) available_tokens = usable_max - max_tokens - 512 max_bytes = available_tokens * 3 # Reverse estimate: tokens * 3 bytes encoded = transcript.encode('utf-8') was_truncated = len(encoded) > max_bytes original_length = len(transcript) if was_truncated: transcript = encoded[:max_bytes].decode('utf-8', errors='ignore') transcript += "\n\n[Content truncated to fit model context]" # Store truncation info metrics["truncation_info"] = { "was_truncated": was_truncated, "original_char_count": original_length, "final_char_count": len(transcript), "original_token_estimate": estimate_tokens(transcript) if not was_truncated else estimate_tokens(encoded[:max_bytes].decode('utf-8', errors='ignore')), } # Get base model info with current thread configuration info_text, _, _, _ = get_model_info(model_key, n_threads=n_threads) # Build generation stats section input_tokens = estimate_tokens(transcript) max_output_text = f"{max_tokens:,} tokens" if max_tokens != original_max_tokens: max_output_text += f" (adjusted from {original_max_tokens:,} for thinking mode)" generation_stats = ( f"\n\n### ๐Ÿ“ˆ Generation Stats\n" f"| Property | Value |\n" f"|----------|-------|\n" f"| **Context Window** | {n_ctx:,} tokens |\n" f"| **Input Tokens** | ~{input_tokens:,} tokens |\n" f"| **Max Output** | {max_output_text} |" ) # Combine model info with generation stats info = info_text + generation_stats if warning: info += f"\n\nโš ๏ธ {warning}" # Load model (no-op if already loaded) with timing model_load_start = time.time() try: if model_key == "custom_hf": # Use pre-loaded custom model if custom_model_state is None: system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning) yield ("", "Error: No custom model loaded. Please load a custom model first.", "", metrics, system_prompt_preview) return llm = custom_model_state load_msg = "Using pre-loaded custom model" else: llm, load_msg = load_model(model_key, n_threads=n_threads) logger.info(load_msg) metrics["model_load_time_ms"] = (time.time() - model_load_start) * 1000 except Exception as e: system_prompt_preview = build_system_prompt(output_language, False, enable_reasoning) yield ("", f"Error loading model: {e}", "", metrics, system_prompt_preview) return # Prepare system prompt with reasoning toggle for Qwen3 models if model_key == "custom_hf": # Use default settings for custom models model = AVAILABLE_MODELS["custom_hf"] else: model = AVAILABLE_MODELS[model_key] # Calculate dynamic temperature for Qwen3 models if model.get("supports_toggle") and "temperature_thinking" in model.get("inference_settings", {}): if enable_reasoning: effective_temperature = model["inference_settings"]["temperature_thinking"] else: effective_temperature = model["inference_settings"]["temperature_no_thinking"] else: effective_temperature = temperature # Build system and user prompts using the extracted function system_content = build_system_prompt(output_language, model.get("supports_toggle", False), enable_reasoning) user_content = build_user_prompt(transcript, output_language) messages = [ {"role": "system", "content": system_content}, {"role": "user", "content": user_content}, ] # Get model-specific inference settings inference_settings = model["inference_settings"] temperature = inference_settings["temperature"] final_top_p = top_p if top_p is not None else inference_settings["top_p"] final_top_k = top_k if top_k is not None else inference_settings["top_k"] repeat_penalty = inference_settings["repeat_penalty"] # Stream - NO stop= parameter, let GGUF metadata handle it full_response = "" current_thinking = "" current_summary = "" try: # Record generation settings metrics["settings"] = { "model": model_key, "max_tokens": max_tokens, "temperature": effective_temperature, "top_p": final_top_p, "top_k": final_top_k, "repeat_penalty": repeat_penalty, "enable_reasoning": enable_reasoning, "output_language": output_language, "n_ctx": metrics["n_ctx"], } # Calculate exact input tokens (system + user prompts) system_tokens = estimate_tokens(system_content) user_tokens = estimate_tokens(user_content) metrics["input_tokens"] = system_tokens + user_tokens # Start timing metrics["start_time"] = time.time() first_token_time = None token_count = 0 # Apply model-specific inference settings stream = llm.create_chat_completion( messages=messages, max_tokens=max_tokens, temperature=effective_temperature, min_p=0.0, top_p=final_top_p, top_k=final_top_k, repeat_penalty=repeat_penalty, stream=True, ) metrics["generation_start_time"] = time.time() for chunk in stream: if 'choices' in chunk and len(chunk['choices']) > 0: delta = chunk['choices'][0].get('delta', {}) content = delta.get('content', '') if content: # Track time to first token if first_token_time is None: first_token_time = time.time() metrics["time_to_first_token_ms"] = (first_token_time - metrics["start_time"]) * 1000 token_count += 1 if output_language == "zh-TW": converted = converter.convert(content) full_response += converted else: full_response += content thinking, summary = parse_thinking_blocks(full_response, streaming=True) current_thinking = thinking or "" current_summary = summary or "" yield (current_thinking, current_summary, info, metrics, system_content) # Final timing calculations metrics["generation_end_time"] = time.time() metrics["generation_tokens"] = token_count metrics["total_tokens"] = token_count # Calculate speeds generation_duration = metrics["generation_end_time"] - metrics["generation_start_time"] if generation_duration > 0: metrics["generation_speed_tps"] = token_count / generation_duration else: metrics["generation_speed_tps"] = 0.0 # Prefill = time from start to first token if metrics["time_to_first_token_ms"]: prefill_seconds = metrics["time_to_first_token_ms"] / 1000 # Estimate prefill tokens (input tokens processed before first output) input_tokens = estimate_tokens(transcript) metrics["prefill_tokens"] = input_tokens if prefill_seconds > 0: metrics["prefill_speed_tps"] = input_tokens / prefill_seconds else: metrics["prefill_speed_tps"] = 0.0 # Total processing time metrics["total_processing_time_ms"] = (metrics["generation_end_time"] - metrics["start_time"]) * 1000 # Final parse and token counts thinking, summary = parse_thinking_blocks(full_response) # Calculate output tokens metrics["output_tokens"] = estimate_tokens(summary) if summary else 0 metrics["thinking_tokens"] = estimate_tokens(thinking) if thinking else 0 # Update totals metrics["total_tokens"] = metrics["input_tokens"] + metrics["output_tokens"] + metrics["thinking_tokens"] yield (thinking or "", summary or "", info, metrics, system_content) llm.reset() except Exception as e: logger.error(f"Generation error: {e}") metrics["error"] = str(e) yield (current_thinking, current_summary + f"\n\nError: {e}", info, metrics, system_content) # Custom CSS for better UI custom_css = """ :root { --primary-color: #6366f1; --primary-dark: #4f46e5; --primary-light: #c7d2fe; --accent-color: #8b5cf6; --bg-color: #f8fafc; --card-bg: rgba(255, 255, 255, 0.85); --text-color: #1e293b; --text-muted: #64748b; --border-color: #e2e8f0; --border-light: #f1f5f9; /* Semantic Colors */ --thinking-bg: #f5f3ff; --thinking-border: #ddd6fe; --thinking-accent: #8b5cf6; --summary-bg: #f0fdf4; --summary-border: #dcfce7; --summary-accent: #22c55e; --shadow-sm: 0 1px 2px rgba(0, 0, 0, 0.05); --shadow-md: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06); --shadow-lg: 0 10px 15px -3px rgba(0, 0, 0, 0.1), 0 4px 6px -2px rgba(0, 0, 0, 0.05); --radius-sm: 8px; --radius-md: 12px; --radius-lg: 20px; } /* ===== LAYOUT & BASE ===== */ .gradio-container { max-width: 1400px !important; background: radial-gradient(circle at top right, #eef2ff 0%, #f8fafc 40%) !important; } /* ===== HEADER ===== */ .app-header { text-align: center; padding: 2.5rem 1.5rem; background: linear-gradient(135deg, var(--primary-color) 0%, var(--accent-color) 100%); border-radius: var(--radius-lg); margin-bottom: 2rem; color: white; box-shadow: var(--shadow-lg); position: relative; overflow: hidden; } .app-header::before { content: ""; position: absolute; top: -50%; left: -50%; width: 200%; height: 200%; background: radial-gradient(circle, rgba(255,255,255,0.1) 0%, transparent 60%); animation: rotate 20s linear infinite; } @keyframes rotate { from { transform: rotate(0deg); } to { transform: rotate(360deg); } } .app-header h1 { margin: 0 0 0.5rem 0; font-size: 2.5rem; font-weight: 800; letter-spacing: -0.04em; position: relative; z-index: 1; } .app-header p { margin: 0; opacity: 0.9; font-size: 1.15rem; font-weight: 400; position: relative; z-index: 1; } .model-badge { display: inline-flex; align-items: center; gap: 0.5rem; background: rgba(255, 255, 255, 0.15); padding: 0.6rem 1.25rem; border-radius: 30px; font-size: 0.9rem; margin-top: 1.25rem; backdrop-filter: blur(8px); border: 1px solid rgba(255, 255, 255, 0.2); position: relative; z-index: 1; font-weight: 500; } /* ===== INSTRUCTIONS ===== */ .instructions { background: var(--card-bg); border-left: 5px solid var(--primary-color); padding: 1.25rem 1.5rem; border-radius: var(--radius-sm) var(--radius-md) var(--radius-md) var(--radius-sm); margin-bottom: 2rem; box-shadow: var(--shadow-sm); backdrop-filter: blur(10px); border: 1px solid var(--border-color); } /* ===== SECTION HEADERS ===== */ .section-header { font-size: 0.95rem; font-weight: 700; color: var(--text-color); margin-bottom: 1rem; display: flex; align-items: center; gap: 0.6rem; padding-bottom: 0.6rem; border-bottom: 2px solid var(--border-light); text-transform: uppercase; letter-spacing: 0.05em; } .section-icon { font-size: 1.2rem; } /* ===== TABS STYLING ===== */ .gradio-tabs { border: 1px solid var(--border-color) !important; border-radius: var(--radius-md) !important; overflow: hidden; box-shadow: var(--shadow-sm); background: var(--card-bg) !important; backdrop-filter: blur(10px); } .tab-nav { background: #f1f5f9 !important; padding: 0.25rem 0.25rem 0 0.25rem !important; gap: 4px !important; } .tab-nav button { border-radius: 8px 8px 0 0 !important; padding: 0.75rem 1rem !important; } /* ===== GROUPS & CARDS ===== */ .gradio-group { border: 1px solid var(--border-color) !important; border-radius: var(--radius-md) !important; padding: 1.25rem !important; background: var(--card-bg) !important; box-shadow: var(--shadow-sm) !important; margin-bottom: 1.5rem !important; backdrop-filter: blur(10px); transition: transform 0.2s ease, box-shadow 0.2s ease !important; } .gradio-group:hover { box-shadow: var(--shadow-md) !important; } /* ===== ACCORDION STYLING ===== */ .gradio-accordion { border: 1px solid var(--border-color) !important; border-radius: var(--radius-md) !important; background: var(--card-bg) !important; } /* ===== BUTTONS ===== */ .submit-btn { background: linear-gradient(135deg, var(--primary-color) 0%, var(--accent-color) 100%) !important; border: none !important; color: white !important; font-weight: 700 !important; padding: 1rem 2rem !important; border-radius: var(--radius-md) !important; cursor: pointer; transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important; box-shadow: 0 4px 15px rgba(99, 102, 241, 0.4) !important; width: 100% !important; font-size: 1.1rem !important; letter-spacing: 0.02em; } .submit-btn:hover { transform: translateY(-3px) scale(1.02); box-shadow: 0 8px 25px rgba(99, 102, 241, 0.5) !important; } /* ===== OUTPUT BOXES ===== */ .thinking-box { background: var(--thinking-bg) !important; border: 1px solid var(--thinking-border) !important; border-left: 4px solid var(--thinking-accent) !important; border-radius: var(--radius-md) !important; font-family: 'JetBrains Mono', 'Fira Code', monospace !important; transition: all 0.3s ease !important; } .thinking-box:focus-within { box-shadow: 0 0 0 3px rgba(139, 92, 246, 0.1) !important; } .summary-box { background: var(--summary-bg) !important; border: 1px solid var(--summary-border) !important; border-radius: var(--radius-md) !important; padding: 1.5rem !important; font-size: 1.1rem !important; line-height: 1.7 !important; color: #0f172a !important; box-shadow: var(--shadow-sm); } .completion-info { background: linear-gradient(135deg, #f8fafc 0%, #f1f5f9 100%) !important; border: 1px solid #cbd5e1 !important; border-left: 4px solid #10b981 !important; border-radius: var(--radius-md) !important; padding: 1.2rem !important; font-size: 0.95rem !important; line-height: 1.6 !important; color: #334155 !important; box-shadow: 0 2px 8px rgba(0, 0, 0, 0.08); } .completion-info h3 { color: #10b981 !important; font-size: 1.1rem !important; margin-bottom: 0.5rem !important; } .completion-info strong { color: #0f172a !important; } /* ===== RESPONSIVE ADJUSTMENTS ===== */ @media (max-width: 1024px) { .gradio-container { padding: 1rem !important; } .submit-btn { position: sticky; bottom: 1rem; z-index: 100; } } @media (max-width: 768px) { .app-header { padding: 1.5rem 1rem; } .app-header h1 { font-size: 1.8rem; } } /* ===== MODE VISUAL INDICATORS ===== */ /* Style for visible mode groups to indicate they are active */ .gradio-group:not([style*="display: none"]) { position: relative; } /* Add subtle highlight border to active mode group */ .gradio-group:not([style*="display: none"]) > .form { border-left: 3px solid var(--primary-color); padding-left: 12px; background: linear-gradient(90deg, rgba(99, 102, 241, 0.03) 0%, transparent 100%); } """ # Create Gradio interface def create_interface(): """Create and configure the Gradio interface.""" with gr.Blocks( title="Tiny Scribe - AI Transcript Summarizer" ) as demo: # Header section (simplified - no Row/Column wrapper needed for full-width) gr.HTML("""

๐Ÿ“„ Tiny Scribe

AI-Powered Transcript Summarization with Real-Time Streaming

Select a model below to get started
""") # Instructions (simplified) gr.HTML("""
๐Ÿ“‹ How to use:
  • Upload a .txt file containing your transcript, notes, or document
  • Click "Generate Summary" to start AI processing
  • Watch the Thinking Process (left) - see how the AI reasons
  • Read the Final Summary (right) - the polished result
  • Both outputs stream in real-time as the AI generates content
""") # Main content area with gr.Row(): # Left column - Configuration with gr.Column(scale=1): # ========================================== # Section 1: Output Configuration # ========================================== with gr.Group(): gr.HTML('
๐ŸŒ Output Settings
') language_selector = gr.Dropdown( choices=[("English", "en"), ("Traditional Chinese (zh-TW)", "zh-TW")], value="en", label="Output Language", info="Target language for the summary" ) with gr.Group(): gr.HTML('
๐Ÿ“ฅ Input Content
') with gr.Tabs() as input_tabs: with gr.TabItem("๐Ÿ“„ Upload File", id=0): file_input = gr.File( label="Transcript (.txt)", file_types=[".txt"], type="filepath", elem_classes=["file-upload-area"] ) with gr.TabItem("โœ๏ธ Paste Text", id=1): text_input = gr.Textbox( label="Paste Transcript", placeholder="Paste your transcript content here...", lines=10, max_lines=20 ) # ========================================== # Section 2: Hardware Configuration (Global) # ========================================== with gr.Group(): gr.HTML('
๐Ÿ–ฅ๏ธ Hardware Configuration
') thread_config_dropdown = gr.Dropdown( choices=[ ("HF Spaces Free Tier (2 vCPUs)", "free"), ("HF Spaces CPU Upgrade (8 vCPUs)", "upgrade"), ("Custom (manual)", "custom"), ], value=DEFAULT_THREAD_PRESET, label="CPU Thread Preset", info="Select hardware tier or specify custom thread count" ) custom_threads_slider = gr.Slider( minimum=1, maximum=32, value=DEFAULT_CUSTOM_THREADS if DEFAULT_CUSTOM_THREADS > 0 else 4, step=1, label="Custom Thread Count", info="Number of CPU threads for model inference (1-32)", visible=DEFAULT_THREAD_PRESET == "custom" ) # ========================================== # Section 3: Mode Selection (Standard vs Advanced) # ========================================== mode_radio = gr.Radio( choices=["Standard Mode", "Advanced Mode (3-Model Pipeline)"], value="Standard Mode", label="๐ŸŽฏ Summarization Mode", info="Select between single-model Standard or multi-model Advanced mode" ) # ===== STANDARD MODE ===== with gr.Group(visible=True) as standard_mode_group: gr.HTML('
๐Ÿ“Š Standard Mode - Single-model direct summarization
') # Model source selector model_source_radio = gr.Radio( choices=["Preset Models", "Custom GGUF"], value="Preset Models", label="Model Source", info="Choose between curated presets or custom HuggingFace models" ) # Preset Models Group with gr.Group(visible=True) as preset_models_group: # Filter out custom_hf from preset choices preset_choices = [ (info["name"] + (" โšก" if info.get("supports_reasoning", False) and not info.get("supports_toggle", False) else ""), key) for key, info in AVAILABLE_MODELS.items() if key != "custom_hf" ] model_dropdown = gr.Dropdown( choices=preset_choices, value=DEFAULT_MODEL_KEY, label="Select Model", info="Smaller = faster. โšก = Always-reasoning models." ) enable_reasoning = gr.Checkbox( value=True, label="Enable Reasoning Mode", info="Uses /think for deeper analysis (slower) or /no_think for direct output (faster).", interactive=True, visible=AVAILABLE_MODELS[DEFAULT_MODEL_KEY].get("supports_toggle", False) ) # Custom GGUF Group with gr.Group(visible=False) as custom_gguf_group: gr.HTML('
Load any GGUF model from HuggingFace Hub
') # HF Hub Search Component model_search_input = HuggingfaceHubSearch( label="๐Ÿ” Search HuggingFace Models", placeholder="Type model name (e.g., 'qwen', 'phi', 'llama')", search_type="model", ) # File dropdown (populated after repo discovery) custom_file_dropdown = gr.Dropdown( label="๐Ÿ“ฆ Select GGUF File", choices=[], value=None, info="GGUF files appear after selecting a model above", interactive=True, ) # Load button load_btn = gr.Button("โฌ‡๏ธ Load Selected Model", variant="primary", size="sm") # Status message custom_status = gr.Textbox( label="Status", interactive=False, value="", visible=False, ) retry_btn = gr.Button("๐Ÿ”„ Retry", variant="secondary", visible=False) # Inference Parameters (Standard Mode) gr.HTML('
๐ŸŽ›๏ธ Inference Parameters
') temperature_slider = gr.Slider( minimum=0.0, maximum=2.0, value=0.6, step=0.1, label="Temperature", info="Lower = more focused, Higher = more creative" ) max_tokens = gr.Slider( minimum=256, maximum=4096, value=2048, step=256, label="Max Output Tokens", info="Higher = more detailed summary" ) top_p = gr.Slider( minimum=0.0, maximum=1.0, value=0.95, step=0.05, label="Top P (Nucleus Sampling)", info="Lower = more focused, Higher = more diverse" ) top_k = gr.Slider( minimum=0, maximum=100, value=20, step=5, label="Top K", info="Limits token selection to top K tokens (0 = disabled)" ) # ===== ADVANCED MODE ===== with gr.Group(visible=False) as advanced_mode_group: gr.HTML('
๐Ÿง  Advanced Mode (3-Model Pipeline) - Extraction โ†’ Deduplication โ†’ Synthesis
') # ========== STAGE 1: EXTRACTION ========== gr.HTML('
๐Ÿ” Stage 1: Extraction
') extraction_model = gr.Dropdown( choices=[(EXTRACTION_MODELS[k]["name"], k) for k in EXTRACTION_MODELS.keys()], value=DEFAULT_EXTRACTION_MODEL, label="Extraction Model (โ‰ค1.7B)", info="Extracts structured items from transcript windows" ) with gr.Row(): extraction_n_ctx = gr.Slider( minimum=2048, maximum=8192, step=1024, value=4096, label="Context Window (n_ctx)", info="Smaller = more windows, Larger = fewer windows" ) overlap_turns = gr.Slider( minimum=1, maximum=5, step=1, value=2, label="Window Overlap (turns)", info="Speaker turns shared between consecutive windows" ) enable_extraction_reasoning = gr.Checkbox( value=False, visible=False, label="Enable Reasoning Mode", info="Thinking before JSON extraction (Qwen3 hybrid models only)" ) # ========== STAGE 2: DEDUPLICATION ========== gr.HTML('
๐Ÿงฌ Stage 2: Deduplication
') embedding_model = gr.Dropdown( choices=[("granite-107m", "granite-107m")], value="granite-107m", label="Embedding Model", info="Computes semantic similarity for duplicate detection (Granite-107M optimal)" ) similarity_threshold = gr.Slider( minimum=0.70, maximum=0.95, step=0.01, value=0.85, label="Similarity Threshold", info="Higher = stricter duplicate detection (items with similarity above this are merged)" ) # ========== STAGE 3: SYNTHESIS ========== gr.HTML('
โœจ Stage 3: Synthesis
') synthesis_model = gr.Dropdown( choices=[(SYNTHESIS_MODELS[k]["name"], k) for k in SYNTHESIS_MODELS.keys()], value=DEFAULT_SYNTHESIS_MODEL, label="Synthesis Model (1B-30B)", info="Generates executive summary from deduplicated items" ) enable_synthesis_reasoning = gr.Checkbox( value=True, visible=True, label="Enable Reasoning Mode", info="Uses thinking process for higher quality synthesis" ) adv_max_tokens = gr.Slider( minimum=512, maximum=4096, step=128, value=2048, label="Max Output Tokens", info="Maximum tokens for synthesis output" ) gr.HTML('
Inference Parameters
') with gr.Row(): adv_temperature_slider = gr.Slider( minimum=0.0, maximum=2.0, value=0.6, step=0.1, label="Temperature", info="Lower = focused, Higher = creative" ) adv_top_p = gr.Slider( minimum=0.0, maximum=1.0, value=0.95, step=0.05, label="Top P", info="Nucleus sampling threshold" ) adv_top_k = gr.Slider( minimum=0, maximum=100, value=20, step=5, label="Top K", info="Token selection limit" ) # ========== PIPELINE SETTINGS ========== gr.HTML('
โš™๏ธ Pipeline Settings
') enable_detailed_logging = gr.Checkbox( value=True, label="Enable Detailed Trace Logging", info="Save JSONL trace for debugging (embedded in download JSON)" ) # ========================================== # Debug Tools (optional) # ========================================== with gr.Accordion("๐Ÿ› Debug Tools", open=False): system_prompt_debug = gr.Textbox( label="System Prompt (Read-Only)", lines=5, max_lines=10, interactive=False, value="Select a model and click 'Generate Summary' to see the system prompt.", info="This shows the exact system prompt sent to the LLM" ) # ========================================== # Submit Button # ========================================== submit_btn = gr.Button( "โœจ Generate Summary", variant="primary", elem_classes=["submit-btn"] ) # ========================================== # State Components (invisible, outside visual groups) # ========================================== metrics_state = gr.State(value={}) custom_model_state = gr.State(value=None) custom_model_metadata = gr.State(value={ "repo_id": None, "filename": None, "size_mb": 0, }) custom_repo_files = gr.State([]) # Right column - Outputs with gr.Column(scale=2): # Model Information (shows selected model specs) with gr.Group(): gr.HTML('
๐Ÿ“Š Model Information
') _default_threads = DEFAULT_CUSTOM_THREADS if DEFAULT_CUSTOM_THREADS > 0 else 2 _default_info = get_model_info(DEFAULT_MODEL_KEY, n_threads=_default_threads)[0] model_info_output = gr.Markdown( value=_default_info, elem_classes=["info-box"] ) # Thinking Process with gr.Group(): gr.HTML('
๐Ÿง  Model Thinking Process
') thinking_output = gr.Textbox( label="", lines=12, max_lines=20, show_label=False, placeholder="The AI's reasoning process will appear here in real-time...", elem_classes=["thinking-box"] ) # Copy Thinking button - now in the correct group copy_thinking_btn = gr.Button("๐Ÿ“‹ Copy Thinking", size="sm") # Summary Output with gr.Group(): gr.HTML('
๐Ÿ“ Final Summary
') summary_output = gr.Markdown( value="*Your summarized content will appear here...*", elem_classes=["summary-box"] ) # Action buttons for summary with gr.Row(): copy_summary_btn = gr.Button("๐Ÿ“‹ Copy Summary", size="sm") download_btn = gr.Button("โฌ‡๏ธ Download (JSON)", size="sm") # File output component for download (hidden until generated) download_output = gr.File(label="Download JSON", visible=False) # Completion Metrics (separate section) with gr.Group(): gr.HTML('
๐Ÿ“Š Generation Metrics
') info_output = gr.Markdown( value="*Metrics will appear here after generation...*", elem_classes=["completion-info"] ) # Function to update settings when model changes def update_settings_on_model_change(model_key, thread_config, custom_threads, custom_metadata=None): """Update inference settings when model selection changes.""" # Calculate n_threads based on preset thread_preset_map = { "free": 2, "upgrade": 8, "custom": custom_threads if custom_threads > 0 else 4, } n_threads = thread_preset_map.get(thread_config, 2) info_text, temp_str, top_p_val, top_k_val = get_model_info(model_key, n_threads=n_threads, custom_metadata=custom_metadata) temperature = float(temp_str) if temp_str else 0.6 return temperature, top_p_val, top_k_val # Event handlers # Note: submit_btn.click is registered below (after custom model loader section) # with the full set of inputs including custom_model_state # Update settings when model changes model_dropdown.change( fn=update_settings_on_model_change, inputs=[model_dropdown, thread_config_dropdown, custom_threads_slider, custom_model_metadata], outputs=[temperature_slider, top_p, top_k] ) # Update reasoning checkbox when model changes model_dropdown.change( fn=update_reasoning_visibility, inputs=[model_dropdown], outputs=[enable_reasoning] ) # Show/hide custom thread slider based on selection def toggle_custom_threads(thread_config): return gr.update(visible=(thread_config == "custom")) thread_config_dropdown.change( fn=toggle_custom_threads, inputs=[thread_config_dropdown], outputs=[custom_threads_slider] ) # Toggle mode visibility based on radio selection def toggle_mode_visibility(mode_selection): is_standard = (mode_selection == "Standard Mode") return gr.update(visible=is_standard), gr.update(visible=not is_standard) mode_radio.change( fn=toggle_mode_visibility, inputs=[mode_radio], outputs=[standard_mode_group, advanced_mode_group] ) # Toggle model source visibility (Preset vs Custom GGUF) def toggle_model_source(model_source): is_preset = (model_source == "Preset Models") return gr.update(visible=is_preset), gr.update(visible=not is_preset) model_source_radio.change( fn=toggle_model_source, inputs=[model_source_radio], outputs=[preset_models_group, custom_gguf_group] ) # Update Model Information panel based on selected models def update_model_info_standard(model_key, custom_metadata): """Show info for selected Standard mode model.""" info_text, _, _, _ = get_model_info(model_key, n_threads=2, custom_metadata=custom_metadata) return info_text def update_model_info_advanced(extraction_key, embedding_key, synthesis_key): """Show info for all 3 Advanced mode models.""" ext_info = get_extraction_model_info(extraction_key) emb_info = get_embedding_model_info(embedding_key) syn_info = get_synthesis_model_info(synthesis_key) combined_info = f"""### Extraction Model {ext_info} ### Embedding Model {emb_info} ### Synthesis Model {syn_info}""" return combined_info # Update model info when Standard mode model changes model_dropdown.change( fn=update_model_info_standard, inputs=[model_dropdown, custom_model_metadata], outputs=[model_info_output] ) # Update model info when Advanced mode models change extraction_model.change( fn=update_model_info_advanced, inputs=[extraction_model, embedding_model, synthesis_model], outputs=[model_info_output] ) embedding_model.change( fn=update_model_info_advanced, inputs=[extraction_model, embedding_model, synthesis_model], outputs=[model_info_output] ) synthesis_model.change( fn=update_model_info_advanced, inputs=[extraction_model, embedding_model, synthesis_model], outputs=[model_info_output] ) # Update model info when mode changes mode_radio.change( fn=lambda mode, std_model, std_metadata, ext_model, emb_model, syn_model: ( update_model_info_standard(std_model, std_metadata) if mode == "Standard Mode" else update_model_info_advanced(ext_model, emb_model, syn_model) ), inputs=[mode_radio, model_dropdown, custom_model_metadata, extraction_model, embedding_model, synthesis_model], outputs=[model_info_output] ) # Copy buttons copy_summary_btn.click( fn=lambda x: x, inputs=[summary_output], outputs=[], js="(text) => { navigator.clipboard.writeText(text); return text; }" ) copy_thinking_btn.click( fn=lambda x: x, inputs=[thinking_output], outputs=[], js="(text) => { navigator.clipboard.writeText(text); return text; }" ) # Download button download_btn.click( fn=download_summary_json, inputs=[summary_output, thinking_output, model_dropdown, language_selector, metrics_state], outputs=[download_output] ) # ========================================== # NEW: Custom Model Loader Event Handlers # ========================================== # Note: toggle_custom_model_ui removed - now using Tabs instead of hidden Group # Update system prompt debug when model or reasoning changes def update_system_prompt_debug(model_key, enable_reasoning, language): """Update the system prompt debug display.""" if not model_key: return "Select a model to see the system prompt." model = AVAILABLE_MODELS.get(model_key, {}) supports_toggle = model.get("supports_toggle", False) prompt = build_system_prompt(language, supports_toggle, enable_reasoning) return prompt model_dropdown.change( fn=update_system_prompt_debug, inputs=[model_dropdown, enable_reasoning, language_selector], outputs=[system_prompt_debug], ) enable_reasoning.change( fn=update_system_prompt_debug, inputs=[model_dropdown, enable_reasoning, language_selector], outputs=[system_prompt_debug], ) language_selector.change( fn=update_system_prompt_debug, inputs=[model_dropdown, enable_reasoning, language_selector], outputs=[system_prompt_debug], ) # ===== ADVANCED MODE EVENT HANDLERS ===== # Update extraction reasoning checkbox visibility when extraction model changes def update_extraction_reasoning_visibility(model_key): """Show/hide extraction reasoning checkbox based on model capabilities.""" if model_key not in EXTRACTION_MODELS: return gr.update(visible=False, value=False) config = EXTRACTION_MODELS[model_key] supports_toggle = config.get("supports_toggle", False) if supports_toggle: # Hybrid model โ€” default reasoning ON for better extraction quality return gr.update(visible=True, value=True, interactive=True, label="๐Ÿง  Enable Reasoning for Extraction") elif config.get("supports_reasoning", False): # Thinking-only model (none currently in extraction) return gr.update(visible=True, value=True, interactive=False, label="๐Ÿง  Reasoning Mode (Always On)") else: # Non-reasoning model return gr.update(visible=False, value=False) # Update synthesis reasoning checkbox visibility when synthesis model changes def update_synthesis_reasoning_visibility(model_key): """Show/hide synthesis reasoning checkbox based on model capabilities.""" if model_key not in SYNTHESIS_MODELS: return gr.update(visible=False, value=False) config = SYNTHESIS_MODELS[model_key] supports_reasoning = config.get("supports_reasoning", False) supports_toggle = config.get("supports_toggle", False) if not supports_reasoning: # Non-reasoning model return gr.update(visible=False, value=False) elif supports_reasoning and not supports_toggle: # Thinking-only model return gr.update(visible=True, value=True, interactive=False, label="โšก Reasoning Mode (Always On)") else: # Hybrid model return gr.update(visible=True, value=True, interactive=True, label="๐Ÿง  Enable Reasoning for Synthesis") # Wire up Advanced Mode event handlers extraction_model.change( fn=update_extraction_reasoning_visibility, inputs=[extraction_model], outputs=[enable_extraction_reasoning] ) synthesis_model.change( fn=update_synthesis_reasoning_visibility, inputs=[synthesis_model], outputs=[enable_synthesis_reasoning] ) # Debounced auto-discovery for custom repo ID (500ms delay) import time as time_module def discover_custom_files(repo_id): """Discover GGUF files in the custom repo.""" if not repo_id or "/" not in repo_id: return ( gr.update(choices=[], value=None, interactive=True), [], gr.update(visible=True, value="Enter a valid HuggingFace Repo ID above (e.g., unsloth/DeepSeek-R1-Distill-Qwen-7B-GGUF)") ) # Show searching status yield ( gr.update(choices=["Searching..."], value=None, interactive=False), [], gr.update(visible=True, value="๐Ÿ” Searching for GGUF files...") ) # Small delay to simulate search time_module.sleep(0.5) files, error = list_repo_gguf_files(repo_id) if error: # Error - show empty dropdown with error message yield ( gr.update(choices=[], value=None, interactive=True), [], gr.update(visible=True, value=f"โŒ {error}") ) elif not files: # No files found yield ( gr.update(choices=[], value=None, interactive=True), [], gr.update(visible=True, value="โŒ No GGUF files found in this repository") ) else: # Success - format choices choices = [format_file_choice(f) for f in files] yield ( gr.update(choices=choices, value=choices[0] if choices else None, interactive=True), files, gr.update(visible=True, value="โœ… Files discovered! Select one and click 'Load Selected Model'") ) # ========================================== # NEW: Auto-Discovery Flow with HuggingfaceHubSearch # ========================================== def on_model_selected(repo_id): """Handle model selection from HuggingfaceHubSearch. Automatically discovers GGUF files in the selected repo. """ if not repo_id: return ( gr.update(choices=[], value=None), [], gr.update(visible=False), ) # Show searching status yield ( gr.update(choices=["๐Ÿ” Searching for GGUF files..."], value=None, interactive=False), [], gr.update(visible=True, value=f"Discovering GGUF files in {repo_id}..."), ) # Discover files files, error = list_repo_gguf_files(repo_id) if error: yield ( gr.update(choices=[], value=None, interactive=True), [], gr.update(visible=True, value=f"โŒ {error}"), ) elif not files: yield ( gr.update(choices=[], value=None, interactive=True), [], gr.update(visible=True, value=f"โŒ No GGUF files found in {repo_id}"), ) else: # Format and show files choices = [format_file_choice(f) for f in files] yield ( gr.update(choices=choices, value=choices[0] if choices else None, interactive=True), files, gr.update(visible=True, value=f"โœ… Found {len(files)} GGUF files! Select precision and click 'Load Model'"), ) # When user selects from search, auto-discover files model_search_input.change( fn=on_model_selected, inputs=[model_search_input], outputs=[custom_file_dropdown, custom_repo_files, custom_status], ) # Load selected custom model def load_custom_model_selected(repo_id, selected_file_display, files_data): """Load the selected custom model.""" if not repo_id or not selected_file_display: return "โŒ Please enter a Repo ID and select a file first", gr.update(visible=False), None, {} # Extract filename from the display string # Format: "๐Ÿ“„ filename | size | quant | params | downloads" filename = selected_file_display.split(" | ")[0].replace("๐Ÿ“„ ", "").strip() if not filename: return "โŒ Could not parse filename from selection", gr.update(visible=False), None, {} # Extract size from files_data size_mb = 0 for f in files_data: if f["name"] == filename: size_mb = f.get("size_mb", 0) break yield "โณ Loading model... (this may take a while for large files)", gr.update(visible=False), None, {} try: # Load the model n_threads = get_thread_count(thread_config_dropdown.value, custom_threads_slider.value) llm, load_msg = load_custom_model_from_hf(repo_id, filename, n_threads) if llm is None: # Load failed - show error and retry button yield f"โŒ {load_msg}", gr.update(visible=True), None, {} else: # Success - create metadata dict metadata = { "repo_id": repo_id, "filename": filename, "size_mb": size_mb, } size_info = f" ({size_mb:.1f} MB)" if size_mb else "" yield f"โœ… Model loaded successfully{size_info}! Ready to generate summaries.", gr.update(visible=False), llm, metadata except Exception as e: yield f"โŒ Error loading model: {str(e)}", gr.update(visible=True), None, {} load_btn.click( fn=load_custom_model_selected, inputs=[model_search_input, custom_file_dropdown, custom_repo_files], outputs=[custom_status, retry_btn, custom_model_state, custom_model_metadata], ).then( fn=lambda metadata, thread_config, custom_threads: get_model_info("custom_hf", n_threads=get_thread_count(thread_config, custom_threads), custom_metadata=metadata)[0], inputs=[custom_model_metadata, thread_config_dropdown, custom_threads_slider], outputs=[model_info_output], ) # Retry button - same as load retry_btn.click( fn=load_custom_model_selected, inputs=[model_search_input, custom_file_dropdown, custom_repo_files], outputs=[custom_status, retry_btn, custom_model_state, custom_model_metadata], ).then( fn=lambda metadata, thread_config, custom_threads: get_model_info("custom_hf", n_threads=get_thread_count(thread_config, custom_threads), custom_metadata=metadata)[0], inputs=[custom_model_metadata, thread_config_dropdown, custom_threads_slider], outputs=[model_info_output], ) # ===== SUBMIT BUTTON ROUTER ===== # Routes to Standard or Advanced mode based on active tab def route_summarize( # Standard mode inputs file_input_val, text_input_val, model_dropdown_val, enable_reasoning_val, max_tokens_val, temperature_val, top_p_val, top_k_val, language_val, thread_config_val, custom_threads_val, custom_model_val, # Advanced mode inputs extraction_model_val, embedding_model_val, synthesis_model_val, extraction_n_ctx_val, overlap_turns_val, similarity_threshold_val, enable_extraction_reasoning_val, enable_synthesis_reasoning_val, adv_max_tokens_val, enable_logging_val, adv_temperature_val, adv_top_p_val, adv_top_k_val, # Mode selector mode_radio_val ): """Route to Standard or Advanced mode based on selected mode radio button.""" # Determine active mode based on radio button value is_advanced_mode = (mode_radio_val == "Advanced Mode (3-Model Pipeline)") if is_advanced_mode: # Advanced Mode: Use summarize_advanced() # Get n_threads from global hardware settings (same for all modes) thread_map = {"free": 2, "upgrade": 8, "custom": max(1, custom_threads_val)} n_threads = thread_map.get(thread_config_val, 2) # Get transcript transcript = "" if file_input_val: with open(file_input_val, 'r', encoding='utf-8') as f: transcript = f.read() elif text_input_val: transcript = text_input_val else: yield ("", "โš ๏ธ Please upload a file or paste text", "", {}, "") return # Stream Advanced Mode pipeline for update in summarize_advanced( transcript=transcript, extraction_model_key=extraction_model_val, embedding_model_key=embedding_model_val, synthesis_model_key=synthesis_model_val, extraction_n_ctx=extraction_n_ctx_val, overlap_turns=overlap_turns_val, similarity_threshold=similarity_threshold_val, enable_extraction_reasoning=enable_extraction_reasoning_val, enable_synthesis_reasoning=enable_synthesis_reasoning_val, output_language=language_val, max_tokens=adv_max_tokens_val, enable_logging=enable_logging_val, n_threads=n_threads, temperature=adv_temperature_val, top_p=adv_top_p_val, top_k=adv_top_k_val ): stage = update.get("stage", "") if stage == "extraction": ticker = update.get("ticker", "") thinking = update.get("thinking", "") # Show progress ticker in thinking output, not summary combined_thinking = f"{thinking}\n\n{ticker}" if thinking else ticker yield (combined_thinking, "", "", {}, "") elif stage == "deduplication": ticker = update.get("ticker", "") # Show deduplication progress in thinking output yield (ticker, "", "", {}, "") elif stage == "synthesis": thinking = update.get("thinking", "") summary = update.get("summary", "") yield (thinking, summary, "", {}, "") elif stage == "complete": thinking = update.get("thinking", "") summary = update.get("summary", "") trace_stats = update.get("trace_stats", {}) # Format info message info_msg = f"""**Advanced Mode Complete** - Total Windows: {trace_stats.get('total_windows', 0)} - Items Extracted: {trace_stats.get('total_items_extracted', 0)} - Items After Dedup: {trace_stats.get('total_items_after_dedup', 0)} - Duplicates Removed: {trace_stats.get('total_duplicates_removed', 0)} - Total Time: {trace_stats.get('total_elapsed_seconds', 0):.1f}s""" # Store trace and debug info for download metrics = { "mode": "advanced", "trace_stats": trace_stats, "trace_json": update.get("trace_json", []), "debug_json": update.get("debug_json", {}) } yield (thinking, summary, info_msg, metrics, "Advanced Mode (3-Model Pipeline)") elif stage == "error": error = update.get("error", "Unknown error") yield ("", f"โŒ Error: {error}", "", {}, "") return else: # Standard Mode: Use existing summarize_streaming() for thinking, summary, info, metrics, system_prompt in summarize_streaming( file_input_val, text_input_val, model_dropdown_val, enable_reasoning_val, max_tokens_val, temperature_val, top_p_val, top_k_val, language_val, thread_config_val, custom_threads_val, custom_model_val ): yield (thinking, summary, info, metrics, system_prompt) # Wire up submit button with router submit_btn.click( fn=route_summarize, inputs=[ # Standard mode inputs file_input, text_input, model_dropdown, enable_reasoning, max_tokens, temperature_slider, top_p, top_k, language_selector, thread_config_dropdown, custom_threads_slider, custom_model_state, # Advanced mode inputs extraction_model, embedding_model, synthesis_model, extraction_n_ctx, overlap_turns, similarity_threshold, enable_extraction_reasoning, enable_synthesis_reasoning, adv_max_tokens, enable_detailed_logging, adv_temperature_slider, adv_top_p, adv_top_k, # Mode selector mode_radio ], outputs=[thinking_output, summary_output, info_output, metrics_state, system_prompt_debug], show_progress="full" ) # Footer gr.HTML(""" """) return demo # Main entry point if __name__ == "__main__": # No pre-load - model loads on first request to avoid HF Spaces timeout logger.info("Starting Tiny Scribe (model loads on first request)") # Create and launch interface demo = create_interface() demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )