# Mimir — gradio_prompt_testing.py
# (uploaded by jdesiree: "Upload 7 files", commit 79845af verified, 57.8 kB)
# gradio_pipeline_testing.py
"""
Full Pipeline Testing Interface for Mimir Educational AI Assistant
Tests the complete orchestration flow with comprehensive metrics at every step.
Captures conditional model activation, token usage, timing, and quality metrics.
Output: CSV file with ~110 columns capturing full pipeline journey
"""
import os
import sys
import io
import csv
import json
import time
import logging
import warnings
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
from collections import Counter
# Core dependencies
import torch
import gradio as gr
import numpy as np
# ============================================================================
# ENVIRONMENT SETUP
# ============================================================================
# Redirect the Hugging Face cache into /tmp so model downloads still work on
# hosts with a read-only or ephemeral home directory (e.g. HF Spaces).
HF_CACHE = "/tmp/huggingface"
os.makedirs(f"{HF_CACHE}/hub", exist_ok=True)
os.environ['HF_HOME'] = HF_CACHE
os.environ['HF_HUB_CACHE'] = f"{HF_CACHE}/hub"
# ============================================================================
# IMPORTS FROM MIMIR APPLICATION
# ============================================================================
# Each optional dependency sets an *_AVAILABLE flag; hard requirements are
# enforced later in the GLOBAL INSTANCES section.
try:
    from agents import (
        ToolDecisionAgent,
        PromptRoutingAgents,
        ThinkingAgents,
        ResponseAgent,
        get_shared_qwen3
    )
    AGENTS_AVAILABLE = True
except ImportError as e:
    print(f"⚠️ Warning: Could not import agents: {e}")
    AGENTS_AVAILABLE = False
try:
    from state_manager import GlobalStateManager, LogicalExpressions
    STATE_MANAGER_AVAILABLE = True
except ImportError as e:
    print(f"⚠️ Warning: Could not import state_manager: {e}")
    STATE_MANAGER_AVAILABLE = False
try:
    from prompt_library import (
        CORE_IDENTITY,
        TOOL_DECISION,
        agent_1_system,
        agent_2_system,
        agent_3_system,
        agent_4_system,
        MATH_THINKING,
        QUESTION_ANSWER_DESIGN,
        REASONING_THINKING,
        VAUGE_INPUT,
        USER_UNDERSTANDING,
        GENERAL_FORMATTING,
        LATEX_FORMATTING,
        GUIDING_TEACHING,
        STRUCTURE_PRACTICE_QUESTIONS,
        PRACTICE_QUESTION_FOLLOWUP,
        TOOL_USE_ENHANCEMENT,
    )
    PROMPTS_AVAILABLE = True
except ImportError as e:
    print(f"⚠️ Warning: Could not import prompt_library: {e}")
    PROMPTS_AVAILABLE = False
# Try to import post processor
try:
    # Import the post processor class/module from app.py
    # (loaded by path so app.py's module-level post_processor can be reused)
    import importlib.util
    spec = importlib.util.spec_from_file_location("app_module", "app.py")
    app_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(app_module)
    post_processor = app_module.post_processor
    POST_PROCESSOR_AVAILABLE = True
except Exception as e:
    print(f"⚠️ Warning: Could not import post_processor: {e}")
    POST_PROCESSOR_AVAILABLE = False
    # Create dummy
    class DummyPostProcessor:
        # Fallback: pass the raw response through unchanged.
        def process_response(self, response, user_message):
            return response
    post_processor = DummyPostProcessor()
# ZeroGPU support
try:
    import spaces
    ZERO_GPU_AVAILABLE = True
except ImportError:
    ZERO_GPU_AVAILABLE = False
    class DummySpaces:
        # Fallback: @spaces.GPU(...) becomes a no-op decorator.
        @staticmethod
        def GPU(duration=600):
            def decorator(func):
                return func
            return decorator
    spaces = DummySpaces()
# Tiktoken for accurate token counting
try:
    import tiktoken
    TIKTOKEN_AVAILABLE = True
except ImportError:
    TIKTOKEN_AVAILABLE = False
    print("⚠️ Warning: tiktoken not available - using fallback token counting")
# Textstat for readability metrics
try:
    import textstat
    TEXTSTAT_AVAILABLE = True
except ImportError:
    TEXTSTAT_AVAILABLE = False
    print("⚠️ Warning: textstat not available - using manual readability calculations")
# ============================================================================
# LOGGING SETUP
# ============================================================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Silence noisy third-party deprecation chatter in the test console.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
# Captured once at import; used by build_final_prompt's knowledge-cutoff notice.
CURRENT_YEAR = datetime.now().year
# ============================================================================
# GLOBAL INSTANCES
# ============================================================================
# Fail fast at import time: this testing interface is unusable without the
# agents and the state manager, so raise instead of degrading silently.
if AGENTS_AVAILABLE and STATE_MANAGER_AVAILABLE:
    try:
        global_state_manager = GlobalStateManager()
        logical_expressions = LogicalExpressions()
        tool_agent = ToolDecisionAgent()
        routing_agents = PromptRoutingAgents()
        thinking_agents = ThinkingAgents()
        response_agent = ResponseAgent()
        logger.info("βœ“ All agents initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize agents: {e}")
        raise
else:
    logger.error("Cannot initialize - missing core dependencies")
    raise ImportError("Missing required modules: agents or state_manager")
# ============================================================================
# CSV SCHEMA DEFINITION
# ============================================================================
# Flat per-prompt schema: list order here defines the CSV column order.
CSV_COLUMNS = [
    # Identification & Input
    "prompt_index",
    "timestamp",
    "user_prompt",
    "user_prompt_tokens",
    "user_prompt_chars",
    "user_prompt_words",
    # Conversation Context
    "conversation_history_length",
    "conversation_history_tokens",
    # Tool Decision Agent
    "tool_decision_input_template",
    "tool_decision_input_tokens",
    "tool_decision_output",
    "tool_decision_output_tokens",
    "tool_decision_result",
    "tool_decision_time_seconds",
    "tool_decision_gpu_peak_mb",
    # Regex Checks
    "regex_checks_applied",
    "regex_checks_time_seconds",
    # Routing Agent 1
    "agent1_input_template",
    "agent1_input_tokens",
    "agent1_output",
    "agent1_output_tokens",
    "agent1_decision",
    "agent1_time_seconds",
    "agent1_gpu_peak_mb",
    # Routing Agent 2
    "agent2_input_template",
    "agent2_input_tokens",
    "agent2_output",
    "agent2_output_tokens",
    "agent2_decision",
    "agent2_time_seconds",
    "agent2_gpu_peak_mb",
    # Routing Agent 3
    "agent3_input_template",
    "agent3_input_tokens",
    "agent3_output",
    "agent3_output_tokens",
    "agent3_decision",
    "agent3_time_seconds",
    "agent3_gpu_peak_mb",
    # Routing Agent 4
    "agent4_input_template",
    "agent4_input_tokens",
    "agent4_output",
    "agent4_output_tokens",
    "agent4_decisions",
    "agent4_time_seconds",
    "agent4_gpu_peak_mb",
    # Math Thinking
    "math_thinking_activated",
    "math_thinking_input_template",
    "math_thinking_input_tokens",
    "math_thinking_output",
    "math_thinking_output_tokens",
    "math_thinking_time_seconds",
    "math_thinking_gpu_peak_mb",
    # QA Design Thinking
    "qa_design_activated",
    "qa_design_input_template",
    "qa_design_input_tokens",
    "qa_design_output",
    "qa_design_output_tokens",
    "qa_design_time_seconds",
    "qa_design_gpu_peak_mb",
    # Reasoning Thinking
    "reasoning_activated",
    "reasoning_input_template",
    "reasoning_input_tokens",
    "reasoning_output",
    "reasoning_output_tokens",
    "reasoning_time_seconds",
    "reasoning_gpu_peak_mb",
    # Prompt Assembly
    "active_response_prompts",
    "final_prompt_template",
    "final_prompt_tokens",
    "final_prompt_chars",
    "final_prompt_words",
    "assembly_time_seconds",
    # Response Generation
    "response_input_template",
    "response_input_tokens",
    "response_raw",
    "response_raw_tokens",
    "response_raw_chars",
    "response_raw_words",
    "response_generation_time_seconds",
    "response_gpu_peak_mb",
    "response_tokens_per_second",
    # Post-processing
    "response_processed",
    "response_processed_tokens",
    "response_processed_chars",
    "response_processed_words",
    "postprocessing_time_seconds",
    # Quality Metrics
    "flesch_reading_ease",
    "flesch_kincaid_grade",
    "completeness_score",
    "specificity_score",
    "repetition_ratio",
    "unique_word_ratio",
    "avg_sentence_length",
    "question_answered",
    # Overall Metrics
    "total_pipeline_time_seconds",
    "total_input_tokens",
    "total_output_tokens",
    "total_gpu_peak_mb",
    "models_activated_count",
    "models_activated_list",
]
# ============================================================================
# TOKEN COUNTING FUNCTIONS
# ============================================================================
def count_tokens_accurate(text: str) -> int:
    """
    Count tokens in *text*, preferring tiktoken's cl100k_base encoding.

    Falls back to a whitespace word count when tiktoken is unavailable or
    the encode step raises.

    Args:
        text: Input text to tokenize
    Returns:
        Estimated token count (0 for empty input)
    """
    if not text:
        return 0
    if TIKTOKEN_AVAILABLE:
        try:
            # cl100k_base (GPT-3.5/4) is a good general-purpose estimator.
            return len(tiktoken.get_encoding("cl100k_base").encode(text))
        except Exception as e:
            logger.warning(f"tiktoken encoding failed: {e}, using fallback")
    # Fallback: word count approximation
    return len(text.split())
def count_words(text: str) -> int:
    """Return the whitespace-delimited word count (0 for empty input)."""
    return len(text.split()) if text else 0
def count_sentences(text: str) -> int:
    """Count sentences by splitting on runs of terminal punctuation (heuristic)."""
    if not text:
        return 0
    import re
    return sum(1 for segment in re.split(r'[.!?]+', text) if segment.strip())
# ============================================================================
# GPU MEMORY TRACKING
# ============================================================================
def get_gpu_memory() -> Dict[str, float]:
    """
    Report current CUDA memory usage.

    Returns:
        Dict with "allocated_mb", "reserved_mb" and "peak_mb" in megabytes;
        all zeros when no CUDA device is available.
    """
    if not torch.cuda.is_available():
        return {"allocated_mb": 0.0, "reserved_mb": 0.0, "peak_mb": 0.0}
    mb = 1024 ** 2
    return {
        "allocated_mb": torch.cuda.memory_allocated() / mb,
        "reserved_mb": torch.cuda.memory_reserved() / mb,
        "peak_mb": torch.cuda.max_memory_allocated() / mb,
    }
def reset_gpu_stats():
    """Clear CUDA peak-memory counters and synchronize; no-op on CPU-only hosts."""
    if not torch.cuda.is_available():
        return
    torch.cuda.reset_peak_memory_stats()
    torch.cuda.synchronize()
# ============================================================================
# TEMPLATE BUILDING FUNCTIONS
# ============================================================================
def format_history(history: List[Dict]) -> str:
    """
    Render the last 8 messages as "role: content" lines for prompt templates.

    Content is truncated to 100 characters per message; an empty history
    yields the sentinel string "No previous conversation".
    """
    if not history:
        return "No previous conversation"
    lines = [
        f"{msg.get('role', 'unknown')}: {msg.get('content', '')[:100]}"
        for msg in history[-8:]  # keep only the most recent 8 messages
    ]
    return "\n".join(lines)
def build_tool_decision_template(user_prompt: str) -> str:
    """Wrap the tool-decision system prompt and user query in Mistral [INST] format."""
    body = f"{TOOL_DECISION}\n\nUser Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_agent1_template(user_prompt: str, history: List) -> str:
    """Template for Agent 1 (practice questions), including recent history."""
    body = (
        f"{agent_1_system}\n\nConversation History:\n{format_history(history)}"
        f"\n\nCurrent User Query: {user_prompt}"
    )
    return f"<s>[INST] {body} [/INST]"
def build_agent2_template(user_prompt: str) -> str:
    """Template for Agent 2 (discovery mode); history-free."""
    body = f"{agent_2_system}\n\nUser Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_agent3_template(user_prompt: str, history: List) -> str:
    """Template for Agent 3 (followup assessment), including recent history."""
    body = (
        f"{agent_3_system}\n\nConversation History:\n{format_history(history)}"
        f"\n\nCurrent User Query: {user_prompt}"
    )
    return f"<s>[INST] {body} [/INST]"
def build_agent4_template(user_prompt: str, history: List) -> str:
    """Template for Agent 4 (teaching mode), including recent history."""
    body = (
        f"{agent_4_system}\n\nConversation History:\n{format_history(history)}"
        f"\n\nCurrent User Query: {user_prompt}"
    )
    return f"<s>[INST] {body} [/INST]"
def build_math_thinking_template(user_prompt: str) -> str:
    """Template for the math-thinking agent; history-free."""
    body = f"{MATH_THINKING}\n\nUser Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_qa_design_template(user_prompt: str) -> str:
    """Template for the question/answer-design thinking agent; history-free."""
    body = f"{QUESTION_ANSWER_DESIGN}\n\nUser Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_reasoning_template(user_prompt: str) -> str:
    """Template for the reasoning-thinking agent; history-free."""
    body = f"{REASONING_THINKING}\n\nUser Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_final_prompt(
    user_prompt: str,
    active_prompts: List[str],
    thinking_context: str,
    recent_history_formatted: str,
    tool_img_output: str = "",
    tool_context: str = ""
) -> str:
    """
    Assemble the final ResponseAgent (Qwen3-Claude) prompt.

    Mirrors the orchestration logic in app.py: CORE_IDENTITY always leads,
    followed by any recognized active prompt segments, tool output/context,
    conversation history, thinking context, the user query, and a
    knowledge-cutoff notice.
    """
    prompt_map = {
        "VAUGE_INPUT": VAUGE_INPUT,
        "USER_UNDERSTANDING": USER_UNDERSTANDING,
        "GENERAL_FORMATTING": GENERAL_FORMATTING,
        "LATEX_FORMATTING": LATEX_FORMATTING,
        "GUIDING_TEACHING": GUIDING_TEACHING,
        "STRUCTURE_PRACTICE_QUESTIONS": STRUCTURE_PRACTICE_QUESTIONS,
        "PRACTICE_QUESTION_FOLLOWUP": PRACTICE_QUESTION_FOLLOWUP,
        "TOOL_USE_ENHANCEMENT": TOOL_USE_ENHANCEMENT,
    }
    # Unknown prompt names are silently skipped.
    segments = [CORE_IDENTITY] + [
        prompt_map[name] for name in active_prompts if name in prompt_map
    ]
    prompt_segments_text = "\n\n".join(segments)
    knowledge_cutoff = f"""
The current year is {CURRENT_YEAR}. Your knowledge cutoff date is October 2023. If the user asks about recent events or dynamic facts, inform them you may not have the most up-to-date information and suggest referencing direct sources."""
    return f"""
{prompt_segments_text}
If tools were used, context and output will be here. Ignore if empty:
Image output: {tool_img_output}
Image context: {tool_context}
Conversation history, if available:
{recent_history_formatted}
Consider any context available to you:
{thinking_context}
Here is the user's current query:
{user_prompt}
{knowledge_cutoff}
"""
# ============================================================================
# QUALITY METRICS FUNCTIONS
# ============================================================================
def estimate_syllables(text: str) -> int:
    """
    Rough syllable estimate: count vowel groups per word, subtract one for a
    trailing 'e' (silent-e heuristic), and floor each word at one syllable.
    """
    import re
    total = 0
    for raw_word in text.lower().split():
        word = re.sub(r'[^a-z]', '', raw_word)  # strip non-letters
        if not word:
            continue
        groups = len(re.findall(r'[aeiouy]+', word))
        if word.endswith('e'):
            groups -= 1
        total += max(1, groups)  # every word has at least one syllable
    return total
def calculate_flesch_reading_ease(text: str) -> float:
    """
    Calculate Flesch Reading Ease score.
    Score 0-100: Higher = easier to read
    90-100: Very easy (5th grade)
    60-70: Standard (8th-9th grade)
    0-30: Very difficult (college graduate)
    Formula: 206.835 - 1.015(words/sentences) - 84.6(syllables/words)

    Prefers textstat when available; otherwise uses the manual formula with
    count_words / count_sentences / estimate_syllables. Clamped to [0, 100].
    """
    if not text or len(text.strip()) < 10:
        return 0.0
    if TEXTSTAT_AVAILABLE:
        try:
            return textstat.flesch_reading_ease(text)
        except Exception:
            # Narrowed from a bare except; fall through to the manual formula.
            pass
    # Manual calculation
    words = count_words(text)
    sentences = count_sentences(text)
    if sentences == 0 or words == 0:
        return 0.0  # also guards the divisions below
    syllables = estimate_syllables(text)
    score = 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)
    return max(0.0, min(100.0, score))
def calculate_flesch_kincaid_grade(text: str) -> float:
    """
    Calculate Flesch-Kincaid Grade Level.
    Returns US grade level needed to understand text.
    Formula: 0.39(words/sentences) + 11.8(syllables/words) - 15.59

    Prefers textstat when available; otherwise uses the manual formula.
    Result is floored at 0.
    """
    if not text or len(text.strip()) < 10:
        return 0.0
    if TEXTSTAT_AVAILABLE:
        try:
            return textstat.flesch_kincaid_grade(text)
        except Exception:
            # Narrowed from a bare except; fall through to the manual formula.
            pass
    words = count_words(text)
    sentences = count_sentences(text)
    if sentences == 0 or words == 0:
        return 0.0  # also guards the divisions below
    syllables = estimate_syllables(text)
    grade = 0.39 * (words / sentences) + 11.8 * (syllables / words) - 15.59
    return max(0.0, grade)
def calculate_completeness_score(response: str, user_prompt: str) -> float:
    """
    Heuristic 0-1 score for whether the response addresses the prompt:
    stopword-filtered keyword overlap, discounted for very short responses.
    Returns 0.5 (neutral) when the prompt has no meaningful keywords.
    """
    if not response or not user_prompt:
        return 0.0
    import re
    stopwords = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been',
                 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
                 'would', 'should', 'could', 'may', 'might', 'can', 'what',
                 'how', 'why', 'when', 'where', 'who', 'which', 'i', 'you',
                 'we', 'they', 'he', 'she', 'it', 'me', 'him', 'her', 'us', 'them'}
    keywords = set(re.findall(r'\b\w+\b', user_prompt.lower())) - stopwords
    if not keywords:
        return 0.5
    response_vocab = set(re.findall(r'\b\w+\b', response.lower()))
    overlap = len(keywords & response_vocab) / len(keywords)
    # Penalize responses shorter than a minimal reasonable length (20 chars).
    min_reasonable_length = 20
    length_factor = min(1.0, len(response) / min_reasonable_length)
    return min(1.0, overlap * length_factor)
def check_question_answered(response: str, user_prompt: str) -> bool:
    """
    Does the response appear to attempt an answer?
    Requires: minimum length (>= 10 chars), no leading refusal phrase,
    and a completeness score above 0.3.
    """
    if not response or len(response) < 10:
        return False
    refusal_prefixes = (
        "i don't know",
        "i cannot",
        "i can't",
        "i'm not sure",
        "i don't have",
        "unable to",
        "sorry, i",
    )
    # str.startswith accepts a tuple: one call covers all refusal patterns.
    if response.lower().startswith(refusal_prefixes):
        return False
    return calculate_completeness_score(response, user_prompt) > 0.3
def calculate_specificity_score(response: str) -> float:
    """
    0-1 heuristic for how concrete (vs. vague) a response is.
    One point each (out of 5) for: digits present, a capitalized word that
    does not open a sentence, an example phrase, average word length > 5,
    and overall length > 200 characters.
    """
    if not response:
        return 0.0
    import re
    lowered = response.lower()
    words = response.split()
    avg_word_length = (sum(len(w) for w in words) / len(words)) if words else 0.0
    example_phrases = ('for example', 'such as', 'for instance', 'like', 'including')
    indicators = [
        bool(re.search(r'\d+', response)),                    # 1. numbers
        bool(re.search(r'(?<!\. )\b[A-Z][a-z]+', response)),  # 2. proper nouns
        any(phrase in lowered for phrase in example_phrases), # 3. example phrasing
        avg_word_length > 5.0,                                # 4. longer words
        len(response) > 200,                                  # 5. substantial length
    ]
    return sum(indicators) / 5
def calculate_repetition_ratio(text: str) -> float:
    """
    Fraction of word occurrences that repeat an earlier word (lowercased,
    whitespace-tokenized). Lower = less repetitive; capped at 1.0.
    """
    if not text:
        return 0.0
    words = text.lower().split()
    if len(words) < 2:
        return 0.0
    # For each word seen k times, k-1 occurrences are repeats.
    repeats = sum(count - 1 for count in Counter(words).values() if count > 1)
    return min(1.0, repeats / len(words))
def calculate_unique_word_ratio(text: str) -> float:
    """Vocabulary diversity: unique lowercased words / total words (0-1)."""
    words = text.lower().split() if text else []
    if not words:
        return 0.0
    return len(set(words)) / len(words)
def calculate_avg_sentence_length(text: str) -> float:
    """Mean words per sentence; 0.0 when no sentences are detected."""
    sentence_count = count_sentences(text)
    if sentence_count == 0:
        return 0.0
    return count_words(text) / sentence_count
# ============================================================================
# INSTRUMENTED PIPELINE RUNNER
# ============================================================================
def run_full_pipeline_instrumented(user_prompt: str, prompt_index: int = 1) -> Dict:
"""
Run the complete orchestration pipeline with full instrumentation.
Captures metrics at every step.
Args:
user_prompt: User's input prompt
prompt_index: Index number for this prompt in batch
Returns:
Dictionary with all metrics for CSV export
"""
result = {
"prompt_index": prompt_index,
"timestamp": datetime.now().isoformat(),
"user_prompt": user_prompt,
"user_prompt_tokens": count_tokens_accurate(user_prompt),
"user_prompt_chars": len(user_prompt),
"user_prompt_words": count_words(user_prompt),
}
# Track overall start time
pipeline_start = time.time()
try:
# ============================================================
# STEP 1-2: SETUP
# ============================================================
setup_start = time.time()
# Reset state
global_state_manager.reset_prompt_state()
prompt_state = global_state_manager.get_prompt_state_manager()
# Get conversation history (empty for testing)
recent_history = []
recent_history_formatted = "No previous conversation"
result["conversation_history_length"] = 0
result["conversation_history_tokens"] = 0
# ============================================================
# STEP 3: TOOL DECISION AGENT
# ============================================================
tool_start = time.time()
tool_template = build_tool_decision_template(user_prompt)
tool_input_tokens = count_tokens_accurate(tool_template)
reset_gpu_stats()
# Execute
tool_decision_result = tool_agent.should_use_visualization(user_prompt)
# Capture output
tool_output = str(tool_decision_result)
tool_output_tokens = count_tokens_accurate(tool_output)
gpu_metrics = get_gpu_memory()
tool_time = time.time() - tool_start
# Record
result.update({
"tool_decision_input_template": tool_template,
"tool_decision_input_tokens": tool_input_tokens,
"tool_decision_output": tool_output,
"tool_decision_output_tokens": tool_output_tokens,
"tool_decision_result": bool(tool_decision_result),
"tool_decision_time_seconds": round(tool_time, 3),
"tool_decision_gpu_peak_mb": round(gpu_metrics["peak_mb"], 2),
})
# Update state
if tool_decision_result:
prompt_state.update("TOOL_USE_ENHANCEMENT", True)
# ============================================================
# STEP 4: REGEX CHECKS
# ============================================================
regex_start = time.time()
# Apply regex checks (returns list of activated prompts)
regex_before = set(prompt_state.get_active_response_prompts())
logical_expressions.apply_all_checks(user_prompt, prompt_state)
regex_after = set(prompt_state.get_active_response_prompts())
regex_applied = list(regex_after - regex_before)
regex_time = time.time() - regex_start
result.update({
"regex_checks_applied": ", ".join(regex_applied) if regex_applied else "None",
"regex_checks_time_seconds": round(regex_time, 3),
})
# ============================================================
# STEP 5: ROUTING AGENTS (Unified Process - Qwen3-Claude)
# ============================================================
routing_start = time.time()
# Build template (simplified - just the user prompt)
routing_template = f"User Query: {user_prompt}"
routing_input_tokens = count_tokens_accurate(routing_template)
reset_gpu_stats()
# Use unified process() method
response_prompts_str, thinking_prompts_str = routing_agents.process(
user_input=user_prompt,
tool_used=tool_decision_result
)
# Parse results
response_prompts = [p.strip() for p in response_prompts_str.split('\n') if p.strip()] if response_prompts_str else []
thinking_prompts = [p.strip() for p in thinking_prompts_str.split('\n') if p.strip()] if thinking_prompts_str else []
routing_output = f"Response: {', '.join(response_prompts) if response_prompts else 'None'}\nThinking: {', '.join(thinking_prompts) if thinking_prompts else 'None'}"
routing_output_tokens = count_tokens_accurate(routing_output)
gpu_metrics = get_gpu_memory()
routing_time = time.time() - routing_start
# Update result with consolidated routing metrics
result.update({
# Agent 1 metrics (legacy columns - use consolidated data)
"agent1_input_template": routing_template,
"agent1_input_tokens": routing_input_tokens // 4, # Divide among 4 agents
"agent1_output": ", ".join([p for p in response_prompts if p in ["STRUCTURE_PRACTICE_QUESTIONS"]]) or "None",
"agent1_output_tokens": routing_output_tokens // 4,
"agent1_decision": "STRUCTURE_PRACTICE_QUESTIONS" in response_prompts,
"agent1_time_seconds": round(routing_time / 4, 3),
"agent1_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2),
# Agent 2 metrics
"agent2_input_template": routing_template,
"agent2_input_tokens": routing_input_tokens // 4,
"agent2_output": ", ".join([p for p in response_prompts if p in ["GENERAL_FORMATTING", "LATEX_FORMATTING", "GUIDING_TEACHING"]]) or "None",
"agent2_output_tokens": routing_output_tokens // 4,
"agent2_decision": ", ".join([p for p in response_prompts if p in ["GENERAL_FORMATTING", "LATEX_FORMATTING", "GUIDING_TEACHING"]]) or "NULL",
"agent2_time_seconds": round(routing_time / 4, 3),
"agent2_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2),
# Agent 3 metrics
"agent3_input_template": routing_template,
"agent3_input_tokens": routing_input_tokens // 4,
"agent3_output": ", ".join([p for p in response_prompts + thinking_prompts if p in ["PRACTICE_QUESTION_FOLLOWUP", "MATH_THINKING", "QUESTION_ANSWER_DESIGN", "REASONING_THINKING"]]) or "None",
"agent3_output_tokens": routing_output_tokens // 4,
"agent3_decision": any(p in ["PRACTICE_QUESTION_FOLLOWUP", "MATH_THINKING", "QUESTION_ANSWER_DESIGN", "REASONING_THINKING"] for p in response_prompts + thinking_prompts),
"agent3_time_seconds": round(routing_time / 4, 3),
"agent3_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2),
# Agent 4 metrics
"agent4_input_template": routing_template,
"agent4_input_tokens": routing_input_tokens // 4,
"agent4_output": ", ".join([p for p in response_prompts if p == "TOOL_USE_ENHANCEMENT"]) or "None",
"agent4_output_tokens": routing_output_tokens // 4,
"agent4_decisions": "TOOL_USE_ENHANCEMENT" if "TOOL_USE_ENHANCEMENT" in response_prompts else "NULL",
"agent4_time_seconds": round(routing_time / 4, 3),
"agent4_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2),
})
# Update prompt state with all activated prompts
for prompt_name in response_prompts:
prompt_state.update(prompt_name, True)
for prompt_name in thinking_prompts:
prompt_state.update(prompt_name, True)
# ============================================================
# STEP 6: THINKING AGENTS (Conditional)
# ============================================================
thinking_outputs = []
# Determine which thinking agents to activate
math_activated = prompt_state.is_active("LATEX_FORMATTING")
qa_activated = prompt_state.is_active("STRUCTURE_PRACTICE_QUESTIONS")
reasoning_activated = (
prompt_state.is_active("TOOL_USE_ENHANCEMENT") or
prompt_state.is_active("PRACTICE_QUESTION_FOLLOWUP") or
prompt_state.is_active("GUIDING_TEACHING")
)
# --- Math Thinking (GGUF) ---
if math_activated:
math_start = time.time()
math_template = build_math_thinking_template(user_prompt)
math_input_tokens = count_tokens_accurate(math_template)
reset_gpu_stats()
math_output = thinking_agents.math_thinking(
user_input=user_prompt,
conversation_history=recent_history_formatted
)
math_output_tokens = count_tokens_accurate(math_output)
gpu_metrics = get_gpu_memory()
math_time = time.time() - math_start
result.update({
"math_thinking_activated": True,
"math_thinking_input_template": math_template,
"math_thinking_input_tokens": math_input_tokens,
"math_thinking_output": math_output,
"math_thinking_output_tokens": math_output_tokens,
"math_thinking_time_seconds": round(math_time, 3),
"math_thinking_gpu_peak_mb": round(gpu_metrics["peak_mb"], 2),
})
thinking_outputs.append(math_output)
else:
result.update({
"math_thinking_activated": False,
"math_thinking_input_template": "NULL",
"math_thinking_input_tokens": 0,
"math_thinking_output": "NULL",
"math_thinking_output_tokens": 0,
"math_thinking_time_seconds": 0.0,
"math_thinking_gpu_peak_mb": 0.0,
})
# --- QA Design Thinking (Qwen3-Claude) ---
if qa_activated:
qa_start = time.time()
qa_template = build_qa_design_template(user_prompt)
qa_input_tokens = count_tokens_accurate(qa_template)
reset_gpu_stats()
qa_output = thinking_agents.question_answer_design(
user_input=user_prompt,
conversation_history=recent_history_formatted
)
qa_output_tokens = count_tokens_accurate(qa_output)
gpu_metrics = get_gpu_memory()
qa_time = time.time() - qa_start
result.update({
"qa_design_activated": True,
"qa_design_input_template": qa_template,
"qa_design_input_tokens": qa_input_tokens,
"qa_design_output": qa_output,
"qa_design_output_tokens": qa_output_tokens,
"qa_design_time_seconds": round(qa_time, 3),
"qa_design_gpu_peak_mb": round(gpu_metrics["peak_mb"], 2),
})
thinking_outputs.append(qa_output)
else:
result.update({
"qa_design_activated": False,
"qa_design_input_template": "NULL",
"qa_design_input_tokens": 0,
"qa_design_output": "NULL",
"qa_design_output_tokens": 0,
"qa_design_time_seconds": 0.0,
"qa_design_gpu_peak_mb": 0.0,
})
# --- Reasoning Thinking (Qwen3-Claude) ---
if reasoning_activated:
reasoning_start = time.time()
reasoning_template = build_reasoning_template(user_prompt)
reasoning_input_tokens = count_tokens_accurate(reasoning_template)
reset_gpu_stats()
reasoning_output = thinking_agents.reasoning_thinking(
user_input=user_prompt,
conversation_history=recent_history_formatted
)
reasoning_output_tokens = count_tokens_accurate(reasoning_output)
gpu_metrics = get_gpu_memory()
reasoning_time = time.time() - reasoning_start
result.update({
"reasoning_activated": True,
"reasoning_input_template": reasoning_template,
"reasoning_input_tokens": reasoning_input_tokens,
"reasoning_output": reasoning_output,
"reasoning_output_tokens": reasoning_output_tokens,
"reasoning_time_seconds": round(reasoning_time, 3),
"reasoning_gpu_peak_mb": round(gpu_metrics["peak_mb"], 2),
})
thinking_outputs.append(reasoning_output)
else:
result.update({
"reasoning_activated": False,
"reasoning_input_template": "NULL",
"reasoning_input_tokens": 0,
"reasoning_output": "NULL",
"reasoning_output_tokens": 0,
"reasoning_time_seconds": 0.0,
"reasoning_gpu_peak_mb": 0.0,
})
# Combine thinking outputs
thinking_context = "\n\n".join(thinking_outputs) if thinking_outputs else ""
# ============================================================
# STEP 7-8: PROMPT ASSEMBLY
# ============================================================
assembly_start = time.time()
# Get active response prompts
active_prompts = prompt_state.get_active_response_prompts()
# Build final prompt
final_prompt = build_final_prompt(
user_prompt=user_prompt,
active_prompts=active_prompts,
thinking_context=thinking_context,
recent_history_formatted=recent_history_formatted,
tool_img_output="",
tool_context=""
)
final_prompt_tokens = count_tokens_accurate(final_prompt)
final_prompt_chars = len(final_prompt)
final_prompt_words = count_words(final_prompt)
assembly_time = time.time() - assembly_start
result.update({
"active_response_prompts": ", ".join(active_prompts),
"final_prompt_template": final_prompt,
"final_prompt_tokens": final_prompt_tokens,
"final_prompt_chars": final_prompt_chars,
"final_prompt_words": final_prompt_words,
"assembly_time_seconds": round(assembly_time, 3),
})
# ============================================================
# STEP 9: RESPONSE GENERATION (Qwen3-Claude)
# ============================================================
response_start = time.time()
reset_gpu_stats()
raw_response = response_agent.invoke(final_prompt)
response_time = time.time() - response_start
raw_tokens = count_tokens_accurate(raw_response)
raw_chars = len(raw_response)
raw_words = count_words(raw_response)
tokens_per_sec = raw_tokens / response_time if response_time > 0 else 0
gpu_metrics = get_gpu_memory()
result.update({
"response_input_template": final_prompt, # Same as final_prompt
"response_input_tokens": final_prompt_tokens,
"response_raw": raw_response,
"response_raw_tokens": raw_tokens,
"response_raw_chars": raw_chars,
"response_raw_words": raw_words,
"response_generation_time_seconds": round(response_time, 3),
"response_gpu_peak_mb": round(gpu_metrics["peak_mb"], 2),
"response_tokens_per_second": round(tokens_per_sec, 2),
})
# ============================================================
# STEP 10: POST-PROCESSING
# ============================================================
postprocess_start = time.time()
processed_response = post_processor.process_response(raw_response, user_prompt)
postprocess_time = time.time() - postprocess_start
processed_tokens = count_tokens_accurate(processed_response)
processed_chars = len(processed_response)
processed_words = count_words(processed_response)
result.update({
"response_processed": processed_response,
"response_processed_tokens": processed_tokens,
"response_processed_chars": processed_chars,
"response_processed_words": processed_words,
"postprocessing_time_seconds": round(postprocess_time, 3),
})
# ============================================================
# QUALITY METRICS
# ============================================================
flesch_ease = calculate_flesch_reading_ease(processed_response)
flesch_grade = calculate_flesch_kincaid_grade(processed_response)
completeness = calculate_completeness_score(processed_response, user_prompt)
specificity = calculate_specificity_score(processed_response)
repetition = calculate_repetition_ratio(processed_response)
unique_ratio = calculate_unique_word_ratio(processed_response)
avg_sent_len = calculate_avg_sentence_length(processed_response)
question_answered = check_question_answered(processed_response, user_prompt)
result.update({
"flesch_reading_ease": round(flesch_ease, 2),
"flesch_kincaid_grade": round(flesch_grade, 2),
"completeness_score": round(completeness, 3),
"specificity_score": round(specificity, 3),
"repetition_ratio": round(repetition, 3),
"unique_word_ratio": round(unique_ratio, 3),
"avg_sentence_length": round(avg_sent_len, 2),
"question_answered": question_answered,
})
# ============================================================
# OVERALL METRICS
# ============================================================
total_pipeline_time = time.time() - pipeline_start
# Count activated models
models_activated = []
if result["tool_decision_time_seconds"] > 0:
models_activated.append("Tool Decision")
if result["agent1_time_seconds"] > 0:
models_activated.append("Agent 1")
if result["agent2_time_seconds"] > 0:
models_activated.append("Agent 2")
if result["agent3_time_seconds"] > 0:
models_activated.append("Agent 3")
if result["agent4_time_seconds"] > 0:
models_activated.append("Agent 4")
if result["math_thinking_activated"]:
models_activated.append("Math Thinking")
if result["qa_design_activated"]:
models_activated.append("QA Design")
if result["reasoning_activated"]:
models_activated.append("Reasoning")
models_activated.append("Response Agent")
# Sum all input tokens
total_input_tokens = (
result["tool_decision_input_tokens"] +
result["agent1_input_tokens"] +
result["agent2_input_tokens"] +
result["agent3_input_tokens"] +
result["agent4_input_tokens"] +
result.get("math_thinking_input_tokens", 0) +
result.get("qa_design_input_tokens", 0) +
result.get("reasoning_input_tokens", 0) +
result["response_input_tokens"]
)
# Sum all output tokens
total_output_tokens = (
result["tool_decision_output_tokens"] +
result["agent1_output_tokens"] +
result["agent2_output_tokens"] +
result["agent3_output_tokens"] +
result["agent4_output_tokens"] +
result.get("math_thinking_output_tokens", 0) +
result.get("qa_design_output_tokens", 0) +
result.get("reasoning_output_tokens", 0) +
result["response_raw_tokens"]
)
# Max GPU across all steps
total_gpu_peak = max([
result["tool_decision_gpu_peak_mb"],
result["agent1_gpu_peak_mb"],
result["agent2_gpu_peak_mb"],
result["agent3_gpu_peak_mb"],
result["agent4_gpu_peak_mb"],
result.get("math_thinking_gpu_peak_mb", 0.0),
result.get("qa_design_gpu_peak_mb", 0.0),
result.get("reasoning_gpu_peak_mb", 0.0),
result["response_gpu_peak_mb"],
])
result.update({
"total_pipeline_time_seconds": round(total_pipeline_time, 3),
"total_input_tokens": total_input_tokens,
"total_output_tokens": total_output_tokens,
"total_gpu_peak_mb": round(total_gpu_peak, 2),
"models_activated_count": len(models_activated),
"models_activated_list": ", ".join(models_activated),
})
logger.info(f"βœ“ Prompt {prompt_index} complete: {total_pipeline_time:.2f}s, {len(models_activated)} models activated")
return result
except Exception as e:
logger.error(f"Pipeline execution failed for prompt {prompt_index}: {e}")
import traceback
traceback.print_exc()
# Return error result with NULLs
error_result = {col: "ERROR" for col in CSV_COLUMNS}
error_result.update({
"prompt_index": prompt_index,
"timestamp": datetime.now().isoformat(),
"user_prompt": user_prompt,
"user_prompt_tokens": count_tokens_accurate(user_prompt),
"user_prompt_chars": len(user_prompt),
"user_prompt_words": count_words(user_prompt),
})
return error_result
# ============================================================================
# BATCH PROCESSING
# ============================================================================
@spaces.GPU(duration=600)
def process_batch_full_pipeline(
    user_prompts: List[str],
    progress_callback=None
) -> List[Dict]:
    """
    Process a batch of prompts through the FULL instrumented pipeline.

    Prompts are processed sequentially (one at a time). A failure on any
    single prompt is recorded as an "ERROR" row so the output stays aligned
    with the input prompts, and the batch continues.

    Args:
        user_prompts: List of user prompts to test
        progress_callback: Optional callable(completed, total) invoked after
            each successful prompt

    Returns:
        List of result dictionaries (one per prompt, in input order)
    """
    results = []
    total = len(user_prompts)
    logger.info("=" * 60)
    logger.info(f"Starting full pipeline batch: {total} prompts")
    logger.info("=" * 60)
    batch_start = time.time()
    for idx, user_prompt in enumerate(user_prompts, 1):
        logger.info(f"\n{'='*60}")
        logger.info(f"Processing prompt {idx}/{total}")
        logger.info(f"Prompt: {user_prompt[:80]}...")
        logger.info(f"{'='*60}")
        try:
            # Run the full instrumented pipeline for this prompt
            result = run_full_pipeline_instrumented(user_prompt, prompt_index=idx)
            results.append(result)
            logger.info(f"βœ“ Prompt {idx} complete")
            logger.info(f"  Total time: {result.get('total_pipeline_time_seconds', 0):.2f}s")
            logger.info(f"  Models activated: {result.get('models_activated_count', 0)}")
            logger.info(f"  Total tokens: {result.get('total_input_tokens', 0) + result.get('total_output_tokens', 0)}")
            if progress_callback:
                progress_callback(idx, total)
        except Exception as e:
            logger.error(f"❌ Prompt {idx} failed: {e}")
            import traceback
            traceback.print_exc()
            # Add an error row so CSV rows stay aligned with input prompts.
            # Include char/word counts to match the pipeline runner's own
            # error path (previously only the token count was recorded).
            error_result = {col: "ERROR" for col in CSV_COLUMNS}
            error_result.update({
                "prompt_index": idx,
                "timestamp": datetime.now().isoformat(),
                "user_prompt": user_prompt,
                "user_prompt_tokens": count_tokens_accurate(user_prompt),
                "user_prompt_chars": len(user_prompt),
                "user_prompt_words": count_words(user_prompt),
            })
            results.append(error_result)
    batch_duration = time.time() - batch_start
    # Guard the average against division by zero on an empty prompt list
    avg_per_prompt = batch_duration / total if total else 0.0
    logger.info(f"\n{'='*60}")
    logger.info("BATCH COMPLETE")
    logger.info(f"{'='*60}")
    logger.info(f"Processed: {len(results)}/{total} prompts")
    logger.info(f"Total batch time: {batch_duration:.2f}s")
    logger.info(f"Average per prompt: {avg_per_prompt:.2f}s")
    logger.info(f"{'='*60}")
    return results
# ============================================================================
# CSV EXPORT
# ============================================================================
def export_full_pipeline_csv(
    results: List[Dict],
    test_name: str = "pipeline_test"
) -> Optional[str]:
    """
    Export full pipeline results to a CSV file under /tmp.

    Rows are written against the fixed CSV_COLUMNS schema; any key missing
    from a result dictionary is written as the string "NULL".

    Args:
        results: List of result dictionaries
        test_name: Name for the test (used in the filename)

    Returns:
        Filepath of the exported CSV, or None when there is nothing to
        export or the write fails. (Annotation fixed: this function has
        always been able to return None.)
    """
    try:
        if not results:
            logger.warning("No results to export")
            return None
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"mimir_full_pipeline_{test_name}_{timestamp}.csv"
        filepath = os.path.join("/tmp", filename)  # Save to /tmp for ZeroGPU
        logger.info(f"Exporting {len(results)} results to CSV...")
        # Write CSV
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
            writer.writeheader()
            for result in results:
                # Fill missing keys with NULL so every row matches the schema
                row = {key: result.get(key, "NULL") for key in CSV_COLUMNS}
                writer.writerow(row)
        logger.info(f"βœ“ Full pipeline results exported to {filepath}")
        logger.info(f"  Columns: {len(CSV_COLUMNS)}")
        logger.info(f"  Rows: {len(results)}")
        return filepath
    except Exception as e:
        logger.error(f"CSV export failed: {e}")
        import traceback
        traceback.print_exc()
        return None
def calculate_summary_stats(results: List[Dict]) -> Dict:
    """Aggregate per-prompt pipeline results into batch-level summary stats.

    Returns {} for an empty batch and {"error": ...} when every row is an
    error row (sentinel string "ERROR" in total_pipeline_time_seconds).
    """
    if not results:
        return {}
    # Error rows carry the sentinel string "ERROR"; keep only real runs.
    valid = [row for row in results if row.get("total_pipeline_time_seconds") != "ERROR"]
    if not valid:
        return {"error": "No valid results"}

    times = [row["total_pipeline_time_seconds"] for row in valid]
    token_totals = [row["total_input_tokens"] + row["total_output_tokens"] for row in valid]
    answered = sum(row["question_answered"] for row in valid)

    return {
        "total_prompts": len(results),
        "successful_prompts": len(valid),
        "failed_prompts": len(results) - len(valid),
        "avg_pipeline_time_seconds": round(np.mean(times), 3),
        "min_pipeline_time_seconds": round(np.min(times), 3),
        "max_pipeline_time_seconds": round(np.max(times), 3),
        "avg_total_tokens": round(np.mean(token_totals), 1),
        "avg_models_activated": round(np.mean([row["models_activated_count"] for row in valid]), 2),
        "avg_gpu_peak_mb": round(np.mean([row["total_gpu_peak_mb"] for row in valid]), 2),
        "avg_completeness_score": round(np.mean([row["completeness_score"] for row in valid]), 3),
        "avg_flesch_reading_ease": round(np.mean([row["flesch_reading_ease"] for row in valid]), 2),
        "questions_answered_pct": round(100 * answered / len(valid), 1),
    }
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
# Build the Gradio dashboard: test configuration on the left, results and
# CSV download on the right. Event handlers are defined inside the Blocks
# context so they can reference the components directly.
with gr.Blocks(title="Mimir - Full Pipeline Testing", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# πŸ§ͺ Mimir Full Pipeline Testing")
    gr.Markdown("""
    Test the **complete orchestration flow** with comprehensive metrics at every step.
    **What this tests:**
    - βœ… Tool Decision Agent
    - βœ… All 4 Routing Agents (sequential)
    - βœ… Thinking Agents (conditional: Math, QA Design, Reasoning)
    - βœ… Response Agent (Qwen3-Claude)
    - βœ… Post-processing
    **Output:** CSV file with ~110 columns capturing the full pipeline journey
    """)
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## πŸ“ Test Configuration")
            test_name = gr.Textbox(
                label="Test Name",
                value="pipeline_test",
                placeholder="Enter a name for this test run",
                info="Used in filename"
            )
            gr.Markdown("### Input Method")
            input_method = gr.Radio(
                choices=["CSV Upload", "Manual Entry"],
                value="Manual Entry",
                label="Choose Input Method"
            )
            # CSV upload (hidden until "CSV Upload" is selected by the radio)
            with gr.Group(visible=False) as csv_section:
                # NOTE(review): gr.File may not accept an `info` kwarg on all
                # Gradio versions — confirm against the pinned version.
                csv_file = gr.File(
                    label="Upload CSV File",
                    file_types=[".csv"],
                    info="One prompt per line, first column only"
                )
            # Manual entry (visible by default, matching the radio's value)
            with gr.Group(visible=True) as manual_section:
                prompt_text = gr.Textbox(
                    label="Enter Prompts (one per line)",
                    lines=15,
                    placeholder="What is calculus?\nHelp me understand photosynthesis\nCan you create practice questions for algebra?\nExplain Newton's laws of motion",
                    info="Enter multiple prompts, one per line"
                )
            process_btn = gr.Button(
                "πŸš€ Run Full Pipeline Test",
                variant="primary",
                size="lg"
            )
            status = gr.Textbox(
                label="Status",
                interactive=False,
                lines=3
            )
        with gr.Column(scale=1):
            gr.Markdown("## πŸ“Š Results")
            results_summary = gr.JSON(
                label="Summary Statistics",
                height=400
            )
            gr.Markdown("### Download Results")
            download_csv = gr.File(
                label="CSV Export",
                interactive=False
            )
            gr.Markdown("""
            **CSV contains ~110 columns:**
            - Input metrics (tokens, chars, words)
            - Template for each agent
            - Output for each agent
            - Timing for each step
            - GPU usage per step
            - Quality metrics (readability, completeness, etc.)
            - Overall pipeline metrics
            """)
    # Toggle between input methods: show exactly one of the two input groups
    def toggle_input_method(method):
        if method == "CSV Upload":
            return gr.update(visible=True), gr.update(visible=False)
        else:
            return gr.update(visible=False), gr.update(visible=True)
    input_method.change(
        fn=toggle_input_method,
        inputs=[input_method],
        outputs=[csv_section, manual_section]
    )
    # Main processing function
    def run_pipeline_test(test_name, input_method, csv_file, prompt_text):
        """Run the full pipeline test.

        Returns a (status_message, summary_dict, csv_filepath) tuple matching
        the [status, results_summary, download_csv] outputs wired below.
        """
        # Parse prompts from whichever input method is active
        prompts = []
        if input_method == "CSV Upload" and csv_file:
            try:
                # Read CSV. Handles bytes, file-like objects, and raw strings.
                # NOTE(review): on recent Gradio versions gr.File yields a
                # filepath string, in which case this branch would parse the
                # *path* as CSV content rather than opening the file —
                # confirm against the pinned Gradio version.
                content = csv_file.decode('utf-8') if isinstance(csv_file, bytes) else csv_file
                if hasattr(content, 'read'):
                    content = content.read()
                if isinstance(content, bytes):
                    content = content.decode('utf-8')
                reader = csv.reader(io.StringIO(str(content)))
                # First column only; blank rows are dropped
                prompts = [row[0].strip() for row in reader if row and row[0].strip()]
                # Skip header if present (heuristic match on common names)
                if prompts and any(header in prompts[0].lower() for header in ['prompt', 'text', 'query', 'input']):
                    prompts = prompts[1:]
            except Exception as e:
                return f"❌ CSV parsing error: {e}", {}, None
        elif input_method == "Manual Entry" and prompt_text:
            prompts = [p.strip() for p in prompt_text.split('\n') if p.strip()]
        if not prompts:
            return "❌ No prompts provided. Please enter at least one prompt.", {}, None
        status_msg = f"πŸ”„ Processing {len(prompts)} prompts through full pipeline...\n"
        status_msg += "This may take several minutes. Please wait...\n"
        try:
            # Run batch (sequential, instrumented pipeline per prompt)
            results = process_batch_full_pipeline(prompts)
            # Calculate summary
            summary = calculate_summary_stats(results)
            # Export CSV
            csv_path = export_full_pipeline_csv(results, test_name)
            status_msg = f"βœ… Complete!\n"
            status_msg += f"Processed: {len(results)} prompts\n"
            status_msg += f"Successful: {summary.get('successful_prompts', 0)}\n"
            status_msg += f"Failed: {summary.get('failed_prompts', 0)}\n"
            status_msg += f"CSV ready for download!"
            return status_msg, summary, csv_path
        except Exception as e:
            error_msg = f"❌ Pipeline test failed: {str(e)}"
            logger.error(error_msg)
            import traceback
            traceback.print_exc()
            return error_msg, {}, None
    # Wire up event
    process_btn.click(
        fn=run_pipeline_test,
        inputs=[test_name, input_method, csv_file, prompt_text],
        outputs=[status, results_summary, download_csv]
    )
# ============================================================================
# LAUNCH
# ============================================================================
if __name__ == "__main__":
    # Emit a startup banner summarizing the runtime environment, then serve
    # the Gradio app on all interfaces.
    banner = "=" * 60
    logger.info(banner)
    logger.info("LAUNCHING MIMIR FULL PIPELINE TESTING INTERFACE")
    logger.info(banner)
    logger.info("CSV Schema: %s columns", len(CSV_COLUMNS))
    logger.info("Agents initialized: %s", AGENTS_AVAILABLE)
    logger.info("Tiktoken available: %s", TIKTOKEN_AVAILABLE)
    logger.info("Textstat available: %s", TEXTSTAT_AVAILABLE)
    logger.info("ZeroGPU available: %s", ZERO_GPU_AVAILABLE)
    logger.info(banner)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7862,
        share=False,
        debug=True
    )