Mimir / gradio_prompt_testing.py
jdesiree's picture
Update gradio_prompt_testing.py
de8bc14 verified
# gradio_prompt_testing.py
"""
Full Pipeline Testing Interface for Mimir Educational AI Assistant
Tests the complete orchestration flow with comprehensive metrics at every step.
Captures conditional model activation, token usage, timing, and quality metrics.
UPDATED: Now correctly mirrors app.py orchestrate_turn() process
- Tool decision uses decide() method with conversation history
- Response agent invoked with input_data dict (not raw string)
- Thinking agents process() method matches app.py
- Graph generation included when tools are used
Output: CSV file with 100 columns (see CSV_COLUMNS) capturing full pipeline journey
"""
import os
import sys
import io
import csv
import json
import time
import logging
import warnings
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
from collections import Counter
# Core dependencies
import torch
import gradio as gr
import numpy as np
# ============================================================================
# ENVIRONMENT SETUP
# ============================================================================
# Root directory used for the Hugging Face cache (under /tmp).
HF_CACHE = "/tmp/huggingface"
# Create the hub cache directory up front; no error if it already exists.
os.makedirs(f"{HF_CACHE}/hub", exist_ok=True)
# Point the Hugging Face environment variables at the cache directories.
# NOTE(review): presumably these must be set before any Hugging Face library
# is imported for them to take effect - confirm.
os.environ['HF_HOME'] = HF_CACHE
os.environ['HF_HUB_CACHE'] = f"{HF_CACHE}/hub"
# ============================================================================
# IMPORTS FROM MIMIR APPLICATION
# ============================================================================
try:
from agents import (
ToolDecisionAgent,
PromptRoutingAgents,
ThinkingAgents,
ResponseAgent,
)
AGENTS_AVAILABLE = True
except ImportError as e:
print(f"⚠️ Warning: Could not import agents: {e}")
AGENTS_AVAILABLE = False
from model_manager import get_model as get_shared_llama
try:
from state_manager import GlobalStateManager, LogicalExpressions
STATE_MANAGER_AVAILABLE = True
except ImportError as e:
print(f"⚠️ Warning: Could not import state_manager: {e}")
STATE_MANAGER_AVAILABLE = False
try:
from prompt_library import (
CORE_IDENTITY,
TOOL_DECISION,
agent_1_system,
agent_2_system,
agent_3_system,
agent_4_system,
MATH_THINKING,
QUESTION_ANSWER_DESIGN,
REASONING_THINKING,
VAUGE_INPUT,
USER_UNDERSTANDING,
GENERAL_FORMATTING,
LATEX_FORMATTING,
GUIDING_TEACHING,
STRUCTURE_PRACTICE_QUESTIONS,
PRACTICE_QUESTION_FOLLOWUP,
TOOL_USE_ENHANCEMENT,
)
PROMPTS_AVAILABLE = True
except ImportError as e:
print(f"⚠️ Warning: Could not import prompt_library: {e}")
PROMPTS_AVAILABLE = False
# Try to import post processor
try:
    # Load app.py (relative path, so resolved against the working directory)
    # as a standalone module named "app_module" and take its module-level
    # `post_processor` object.
    # NOTE(review): exec_module runs all of app.py's top-level code, so any
    # import-time side effects in app.py happen here - confirm this is intended.
    import importlib.util
    spec = importlib.util.spec_from_file_location("app_module", "app.py")
    app_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(app_module)
    post_processor = app_module.post_processor
    POST_PROCESSOR_AVAILABLE = True
except Exception as e:
    # Any failure while loading app.py (not only ImportError) falls back to a
    # pass-through post processor so the rest of the harness can still run.
    print(f"⚠️ Warning: Could not import post_processor: {e}")
    POST_PROCESSOR_AVAILABLE = False
    # Create dummy
    class DummyPostProcessor:
        """Fallback post processor that leaves responses untouched."""
        def process_response(self, response, user_message):
            """Return `response` unchanged; `user_message` is ignored."""
            return response
    post_processor = DummyPostProcessor()
# ZeroGPU support
try:
    import spaces
    ZERO_GPU_AVAILABLE = True
except ImportError:
    ZERO_GPU_AVAILABLE = False

    class DummySpaces:
        """No-op stand-in for the `spaces` package when it is not installed."""

        @staticmethod
        def GPU(func=None, duration=600):
            """Decorator that returns the decorated function unchanged.

            Supports both decorator forms:

            * ``@spaces.GPU`` - `func` is the decorated callable and is
              returned as-is.
            * ``@spaces.GPU(duration=...)`` - returns a decorator that
              returns its argument as-is.

            Args:
                func: The callable being decorated when used without
                    parentheses; otherwise None (a non-callable positional
                    value is treated like the parenthesised form).
                duration: Accepted for call compatibility; ignored.
            """
            if callable(func):
                # Bare ``@spaces.GPU`` usage: nothing to wrap.
                return func

            def decorator(wrapped):
                return wrapped

            return decorator

    spaces = DummySpaces()
# Tiktoken for accurate token counting
try:
import tiktoken
TIKTOKEN_AVAILABLE = True
except ImportError:
TIKTOKEN_AVAILABLE = False
print("⚠️ Warning: tiktoken not available - using fallback token counting")
# Textstat for readability metrics
try:
import textstat
TEXTSTAT_AVAILABLE = True
except ImportError:
TEXTSTAT_AVAILABLE = False
print("⚠️ Warning: textstat not available - using manual readability calculations")
# ============================================================================
# LOGGING SETUP
# ============================================================================
# Configure logging at INFO level; each record shows timestamp, logger name,
# level and message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# Silence UserWarning and FutureWarning for the whole process.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
# Calendar year, captured once at import time.
CURRENT_YEAR = datetime.now().year
# ============================================================================
# GLOBAL INSTANCES
# ============================================================================
# Build the shared state manager and agent instances once at import time.
# The harness cannot run without them, so any failure here aborts the import.
if AGENTS_AVAILABLE and STATE_MANAGER_AVAILABLE:
    try:
        global_state_manager = GlobalStateManager()
        logical_expressions = LogicalExpressions()
        tool_agent = ToolDecisionAgent()
        routing_agents = PromptRoutingAgents()
        thinking_agents = ThinkingAgents()
        response_agent = ResponseAgent()
        logger.info("✓ All agents initialized successfully")
    except Exception as e:
        # logger.exception keeps the traceback in the log before re-raising.
        logger.exception(f"Failed to initialize agents: {e}")
        raise
else:
    logger.error("Cannot initialize - missing core dependencies")
    raise ImportError("Missing required modules: agents or state_manager")
# ============================================================================
# CSV SCHEMA DEFINITION
# ============================================================================
# Names of every metric column recorded per prompt (100 entries), grouped by
# pipeline stage. The pipeline runner iterates this list to build the
# "ERROR" placeholder row when a prompt fails.
# NOTE(review): presumably the list order is also the CSV column order -
# confirm against the CSV export code.
CSV_COLUMNS = [
    # Identification & Input
    "prompt_index",
    "timestamp",
    "user_prompt",
    "user_prompt_tokens",
    "user_prompt_chars",
    "user_prompt_words",
    # Conversation Context
    "conversation_history_length",
    "conversation_history_tokens",
    # Tool Decision Agent
    "tool_decision_input_template",
    "tool_decision_input_tokens",
    "tool_decision_output",
    "tool_decision_output_tokens",
    "tool_decision_result",
    "tool_decision_time_seconds",
    "tool_decision_gpu_peak_mb",
    # Regex Checks
    "regex_checks_applied",
    "regex_checks_time_seconds",
    # Routing Agent 1
    "agent1_input_template",
    "agent1_input_tokens",
    "agent1_output",
    "agent1_output_tokens",
    "agent1_decision",
    "agent1_time_seconds",
    "agent1_gpu_peak_mb",
    # Routing Agent 2
    "agent2_input_template",
    "agent2_input_tokens",
    "agent2_output",
    "agent2_output_tokens",
    "agent2_decision",
    "agent2_time_seconds",
    "agent2_gpu_peak_mb",
    # Routing Agent 3
    "agent3_input_template",
    "agent3_input_tokens",
    "agent3_output",
    "agent3_output_tokens",
    "agent3_decision",
    "agent3_time_seconds",
    "agent3_gpu_peak_mb",
    # Routing Agent 4 (note: its decision column is plural, "agent4_decisions")
    "agent4_input_template",
    "agent4_input_tokens",
    "agent4_output",
    "agent4_output_tokens",
    "agent4_decisions",
    "agent4_time_seconds",
    "agent4_gpu_peak_mb",
    # Math Thinking
    "math_thinking_activated",
    "math_thinking_input_template",
    "math_thinking_input_tokens",
    "math_thinking_output",
    "math_thinking_output_tokens",
    "math_thinking_time_seconds",
    "math_thinking_gpu_peak_mb",
    # QA Design Thinking
    "qa_design_activated",
    "qa_design_input_template",
    "qa_design_input_tokens",
    "qa_design_output",
    "qa_design_output_tokens",
    "qa_design_time_seconds",
    "qa_design_gpu_peak_mb",
    # Reasoning Thinking
    "reasoning_activated",
    "reasoning_input_template",
    "reasoning_input_tokens",
    "reasoning_output",
    "reasoning_output_tokens",
    "reasoning_time_seconds",
    "reasoning_gpu_peak_mb",
    # Prompt Assembly
    "active_response_prompts",
    "final_prompt_template",
    "final_prompt_tokens",
    "final_prompt_chars",
    "final_prompt_words",
    "assembly_time_seconds",
    # Response Generation
    "response_input_template",
    "response_input_tokens",
    "response_raw",
    "response_raw_tokens",
    "response_raw_chars",
    "response_raw_words",
    "response_generation_time_seconds",
    "response_gpu_peak_mb",
    "response_tokens_per_second",
    # Post-processing
    "response_processed",
    "response_processed_tokens",
    "response_processed_chars",
    "response_processed_words",
    "postprocessing_time_seconds",
    # Quality Metrics
    "flesch_reading_ease",
    "flesch_kincaid_grade",
    "completeness_score",
    "specificity_score",
    "repetition_ratio",
    "unique_word_ratio",
    "avg_sentence_length",
    "question_answered",
    # Overall Metrics
    "total_pipeline_time_seconds",
    "total_input_tokens",
    "total_output_tokens",
    "total_gpu_peak_mb",
    "models_activated_count",
    "models_activated_list",
]
# ============================================================================
# TOKEN COUNTING FUNCTIONS
# ============================================================================
def count_tokens_accurate(text: str) -> int:
    """
    Estimate the token count of `text`.

    Uses tiktoken's "cl100k_base" encoding when tiktoken is available; if it
    is missing or encoding fails, falls back to a whitespace word count.

    Args:
        text: Input text to tokenize
    Returns:
        Token count (0 for empty input)
    """
    if not text:
        return 0

    if TIKTOKEN_AVAILABLE:
        try:
            # cl100k_base is used here as a general-purpose estimator.
            return len(tiktoken.get_encoding("cl100k_base").encode(text))
        except Exception as e:
            logger.warning(f"tiktoken encoding failed: {e}, using fallback")

    # Fallback: word count approximation
    return len(text.split())
def count_words(text: str) -> int:
    """Return the number of whitespace-separated words in `text` (0 if empty)."""
    return len(text.split()) if text else 0
def count_sentences(text: str) -> int:
    """
    Count sentences with a simple heuristic.

    The text is split on runs of '.', '!' and '?'; every non-blank fragment
    counts as one sentence.
    """
    if not text:
        return 0
    import re
    fragments = re.split(r'[.!?]+', text)
    return sum(1 for fragment in fragments if fragment.strip())
# ============================================================================
# GPU MEMORY TRACKING
# ============================================================================
def get_gpu_memory() -> Dict[str, float]:
    """
    Report CUDA memory statistics as seen by torch.

    Returns:
        Dict with "allocated_mb", "reserved_mb" and "peak_mb" (bytes divided
        by 1024**2); all three are 0.0 when CUDA is not available.
    """
    if not torch.cuda.is_available():
        return {"allocated_mb": 0.0, "reserved_mb": 0.0, "peak_mb": 0.0}

    allocated = torch.cuda.memory_allocated() / 1024**2
    reserved = torch.cuda.memory_reserved() / 1024**2
    peak = torch.cuda.max_memory_allocated() / 1024**2
    return {
        "allocated_mb": allocated,
        "reserved_mb": reserved,
        "peak_mb": peak,
    }
def reset_gpu_stats():
    """Reset torch's CUDA peak-memory counters, then synchronize; no-op without CUDA."""
    if not torch.cuda.is_available():
        return
    torch.cuda.reset_peak_memory_stats()
    torch.cuda.synchronize()
# ============================================================================
# TEMPLATE BUILDING FUNCTIONS
# ============================================================================
def format_history(history: List[Dict]) -> str:
    """
    Format conversation history for templates.

    Only the last 8 messages are used, and each message's content is
    truncated to 100 characters. A message whose content is missing or None
    is rendered with empty content instead of raising.

    Args:
        history: List of message dicts read via the 'role' and 'content' keys
    Returns:
        One "role: content" line per message, or "No previous conversation"
        when `history` is empty.
    """
    if not history:
        return "No previous conversation"
    lines = []
    for msg in history[-8:]:  # Last 8 messages
        role = msg.get('role', 'unknown')
        # `or ''` also covers an explicit None value, which the .get()
        # default alone would not.
        content = (msg.get('content') or '')[:100]  # Truncate
        lines.append(f"{role}: {content}")
    return "\n".join(lines)
def build_tool_decision_template(user_prompt: str, history: List) -> str:
    """Return the formatted history, a blank line, then "User Query: <prompt>"."""
    past_turns = format_history(history)
    query_line = f"User Query: {user_prompt}"
    return f"{past_turns}\n\n{query_line}"
def build_agent1_template(user_prompt: str, history: List) -> str:
    """Build the [INST]-wrapped prompt for Agent 1 (practice questions)."""
    past_turns = format_history(history)
    body = f"{agent_1_system}\n\nConversation History:\n{past_turns}\n\nCurrent User Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_agent2_template(user_prompt: str) -> str:
    """Build the [INST]-wrapped prompt for Agent 2 (discovery mode)."""
    body = f"{agent_2_system}\n\nUser Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_agent3_template(user_prompt: str, history: List) -> str:
    """Build the [INST]-wrapped prompt for Agent 3 (followup assessment)."""
    past_turns = format_history(history)
    body = f"{agent_3_system}\n\nConversation History:\n{past_turns}\n\nCurrent User Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_agent4_template(user_prompt: str, history: List) -> str:
    """Build the [INST]-wrapped prompt for Agent 4 (teaching mode)."""
    past_turns = format_history(history)
    body = f"{agent_4_system}\n\nConversation History:\n{past_turns}\n\nCurrent User Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_math_thinking_template(user_prompt: str) -> str:
    """Build the [INST]-wrapped prompt for the math thinking agent."""
    body = f"{MATH_THINKING}\n\nUser Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_qa_design_template(user_prompt: str) -> str:
    """Build the [INST]-wrapped prompt for the QA design thinking agent."""
    body = f"{QUESTION_ANSWER_DESIGN}\n\nUser Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
def build_reasoning_template(user_prompt: str) -> str:
    """Build the [INST]-wrapped prompt for the reasoning thinking agent."""
    body = f"{REASONING_THINKING}\n\nUser Query: {user_prompt}"
    return f"<s>[INST] {body} [/INST]"
# ============================================================================
# QUALITY METRICS FUNCTIONS
# ============================================================================
def estimate_syllables(text: str) -> int:
    """
    Roughly estimate the number of syllables in `text`.

    Each whitespace-separated word is lower-cased and stripped of everything
    but a-z; it then contributes one syllable per run of vowels (a, e, i, o,
    u, y), with a minimum of one. Words left empty after stripping add nothing.
    """
    import re
    total = 0
    for token in text.lower().split():
        letters = re.sub(r'[^a-z]', '', token)
        if letters:
            vowel_runs = re.findall(r'[aeiouy]+', letters)
            total += max(1, len(vowel_runs))
    return total
def calculate_flesch_reading_ease(text: str) -> float:
    """
    Calculate Flesch Reading Ease score.

    Score 0-100: Higher = easier to read
    90-100: Very easy (5th grade)
    60-70: Standard (8th-9th grade)
    0-30: Very difficult (college graduate)
    Formula: 206.835 - 1.015(words/sentences) - 84.6(syllables/words)

    Uses textstat when available. If textstat raises, the failure is logged
    and the manual formula is used; only the manual result is clamped to
    the 0-100 range.

    Args:
        text: Text to score
    Returns:
        The score, or 0.0 for empty text, text shorter than 10 characters
        after stripping, or text with no words or sentences.
    """
    if not text or len(text.strip()) < 10:
        return 0.0
    if TEXTSTAT_AVAILABLE:
        try:
            return textstat.flesch_reading_ease(text)
        except Exception as e:
            # Fall through to the manual calculation, but leave a trace
            # instead of swallowing the failure silently.
            logger.warning(f"textstat flesch_reading_ease failed: {e}, using manual calculation")
    # Manual calculation
    words = count_words(text)
    sentences = count_sentences(text)
    if sentences == 0 or words == 0:
        return 0.0
    syllables = estimate_syllables(text)
    score = 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)
    return max(0.0, min(100.0, score))
def calculate_flesch_kincaid_grade(text: str) -> float:
    """
    Calculate Flesch-Kincaid Grade Level.

    Returns US grade level needed to understand text.
    Formula: 0.39(words/sentences) + 11.8(syllables/words) - 15.59

    Uses textstat when available. If textstat raises, the failure is logged
    and the manual formula is used; only the manual result is floored at 0.

    Args:
        text: Text to score
    Returns:
        The grade level, or 0.0 for empty text, text shorter than 10
        characters after stripping, or text with no words or sentences.
    """
    if not text or len(text.strip()) < 10:
        return 0.0
    if TEXTSTAT_AVAILABLE:
        try:
            return textstat.flesch_kincaid_grade(text)
        except Exception as e:
            # Fall through to the manual calculation, but leave a trace
            # instead of swallowing the failure silently.
            logger.warning(f"textstat flesch_kincaid_grade failed: {e}, using manual calculation")
    # Manual calculation
    words = count_words(text)
    sentences = count_sentences(text)
    if sentences == 0 or words == 0:
        return 0.0
    syllables = estimate_syllables(text)
    grade = 0.39 * (words / sentences) + 11.8 * (syllables / words) - 15.59
    return max(0.0, grade)
def calculate_completeness_score(response: str, user_prompt: str) -> float:
    """
    Heuristically score how well `response` addresses `user_prompt`.

    The score is the fraction of non-stopword prompt keywords that also occur
    in the response, scaled down linearly for responses shorter than 20
    characters.

    Returns: Score 0-1 (1 = complete answer). 0.0 if either argument is
    empty; 0.5 if the prompt has no keywords left after stopword removal.
    """
    if not (response and user_prompt):
        return 0.0
    import re
    word_pattern = r'\b\w+\b'
    # Common stopwords carry no topical signal
    stopwords = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been',
                 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
                 'would', 'should', 'could', 'may', 'might', 'can', 'what',
                 'how', 'why', 'when', 'where', 'who', 'which', 'i', 'you',
                 'we', 'they', 'he', 'she', 'it', 'me', 'him', 'her', 'us', 'them'}
    keywords = set(re.findall(word_pattern, user_prompt.lower())) - stopwords
    if not keywords:
        return 0.5  # Neutral if no meaningful keywords
    answer_vocab = set(re.findall(word_pattern, response.lower()))
    coverage = len(keywords & answer_vocab) / len(keywords)
    # Very short responses are penalised proportionally to their length
    min_reasonable_length = 20
    length_factor = min(1.0, len(response) / min_reasonable_length)
    return min(1.0, coverage * length_factor)
def check_question_answered(response: str, user_prompt: str) -> bool:
    """
    Heuristic check of whether `response` attempts to answer the question.

    Returns False when the response is empty or under 10 characters, or when
    it opens with one of the refusal phrases (case-insensitive). Otherwise
    returns True only if the completeness score exceeds 0.3.
    """
    if not response or len(response) < 10:
        return False
    # Openers that mark the response as a refusal
    refusal_openers = (
        "i don't know",
        "i cannot",
        "i can't",
        "i'm not sure",
        "i don't have",
        "unable to",
        "sorry, i",
    )
    if response.lower().startswith(refusal_openers):
        return False
    # Require a minimum keyword overlap with the prompt
    return calculate_completeness_score(response, user_prompt) > 0.3
def calculate_specificity_score(response: str) -> float:
"""
Measure how specific vs vague the response is.
Indicators of specificity:
- Numbers, dates, names
- Technical terms
- Examples
- Concrete nouns
Returns: Score 0-1 (1 = very specific)
"""
if not response:
return 0.0
import re
specificity_indicators = 0
total_possible = 5
# 1. Contains numbers
if re.search(r'\d+', response):
specificity_indicators += 1
# 2. Contains proper nouns
proper_nouns = len(re.findall(r'(?<!\. )\b[A-Z][a-z]+', response))
if proper_nouns > 0:
specificity_indicators += 1
# 3. Contains example phrases
example_phrases = ['for example', 'such as', 'for instance', 'like', 'including']
if any(phrase in response.lower() for phrase in example_phrases):
specificity_indicators += 1
# 4. Average word length
words = response.split()
if words:
avg_word_length = sum(len(w) for w in words) / len(words)
if avg_word_length > 5.0:
specificity_indicators += 1
# 5. Response length
if len(response) > 200:
specificity_indicators += 1
return specificity_indicators / total_possible
def calculate_repetition_ratio(text: str) -> float:
    """
    Measure word repetition (case-insensitive, whitespace-split).

    Lower = better (less repetitive)
    Returns: Share of words that repeat an earlier word (0-1); 0.0 for empty
    text or text with fewer than two words.
    """
    if not text:
        return 0.0
    tokens = text.lower().split()
    total = len(tokens)
    if total < 2:
        return 0.0
    # Every occurrence beyond a word's first one counts as a repeat
    repeats = sum(occurrences - 1 for occurrences in Counter(tokens).values())
    return min(1.0, repeats / total)
def calculate_unique_word_ratio(text: str) -> float:
    """
    Measure vocabulary diversity (case-insensitive, whitespace-split).

    Higher = more diverse vocabulary
    Returns: Ratio of unique words to total words (0-1); 0.0 when there are
    no words.
    """
    tokens = text.lower().split() if text else []
    if not tokens:
        return 0.0
    return len(set(tokens)) / len(tokens)
def calculate_avg_sentence_length(text: str) -> float:
    """Return words per sentence for `text`, or 0.0 when no sentence is found."""
    sentence_total = count_sentences(text)
    word_total = count_words(text)
    return word_total / sentence_total if sentence_total else 0.0
# ============================================================================
# INSTRUMENTED PIPELINE RUNNER
# ============================================================================
def _inactive_thinking_metrics(prefix: str) -> Dict[str, Any]:
    """Return the placeholder columns recorded for a thinking agent that did not run."""
    return {
        f"{prefix}_activated": False,
        f"{prefix}_input_template": "NULL",
        f"{prefix}_input_tokens": 0,
        f"{prefix}_output": "NULL",
        f"{prefix}_output_tokens": 0,
        f"{prefix}_time_seconds": 0.0,
        f"{prefix}_gpu_peak_mb": 0.0,
    }


def _routing_agent_metrics(
    agent_number: int,
    shared: Dict[str, Any],
    output: str,
    decision: Any,
    decision_column: str = "decision",
) -> Dict[str, Any]:
    """Return the seven legacy per-agent routing columns for one routing agent."""
    prefix = f"agent{agent_number}"
    return {
        f"{prefix}_input_template": shared["input_template"],
        f"{prefix}_input_tokens": shared["input_tokens"],
        f"{prefix}_output": output,
        f"{prefix}_output_tokens": shared["output_tokens"],
        f"{prefix}_{decision_column}": decision,
        f"{prefix}_time_seconds": shared["time_seconds"],
        f"{prefix}_gpu_peak_mb": shared["gpu_peak_mb"],
    }


def run_full_pipeline_instrumented(user_prompt: str, prompt_index: int = 1) -> Dict:
    """
    Run the complete orchestration pipeline with full instrumentation.

    Stages, in order: state reset, tool decision, regex checks, routing
    agents, thinking agents (only if any were selected), prompt assembly,
    response generation, post-processing, quality metrics and overall totals.
    Metrics are captured after every stage.

    The routing agents run as a single unified call, so the legacy
    agent1..agent4 columns each hold a quarter of the measured routing
    tokens, time and GPU peak. Likewise the thinking agents run as a single
    call: each activated agent's columns hold the shared output (truncated to
    500 characters), its full token count, and an equal share of the time and
    GPU peak.

    Overall totals are computed from the undivided stage measurements:
    routing token counts are not rebuilt from the integer-divided per-agent
    values, the shared thinking output is counted once, and the GPU peak is
    the maximum of the real per-stage peaks.

    Args:
        user_prompt: User's input prompt
        prompt_index: Index number for this prompt in batch
    Returns:
        Dictionary with all metrics for CSV export. If any stage raises, the
        exception is logged and a row is returned in which every column is
        "ERROR" except the identification/input columns and
        "total_pipeline_time_seconds" (elapsed time until the failure).
    """
    result = {
        "prompt_index": prompt_index,
        "timestamp": datetime.now().isoformat(),
        "user_prompt": user_prompt,
        "user_prompt_tokens": count_tokens_accurate(user_prompt),
        "user_prompt_chars": len(user_prompt),
        "user_prompt_words": count_words(user_prompt),
    }
    # Track overall start time
    pipeline_start = time.time()
    try:
        # ============================================================
        # STEP 1-2: SETUP
        # ============================================================
        # Reset state
        global_state_manager.reset_prompt_state()
        prompt_state = global_state_manager.get_prompt_state_manager()
        # Conversation history is always empty in this test harness
        recent_history = []
        recent_history_formatted = "No previous conversation"
        result["conversation_history_length"] = 0
        result["conversation_history_tokens"] = 0

        # ============================================================
        # STEP 3: TOOL DECISION AGENT (decide() with conversation history)
        # ============================================================
        tool_start = time.time()
        tool_template = build_tool_decision_template(user_prompt, recent_history)
        tool_input_tokens = count_tokens_accurate(tool_template)
        reset_gpu_stats()
        tool_decision_result = tool_agent.decide(user_prompt, recent_history)
        # Capture output
        tool_output = str(tool_decision_result)
        tool_output_tokens = count_tokens_accurate(tool_output)
        tool_gpu_peak = get_gpu_memory()["peak_mb"]
        tool_time = time.time() - tool_start
        # Real (undivided) GPU peak of every stage, used for the overall maximum
        stage_gpu_peaks = [tool_gpu_peak]
        result.update({
            "tool_decision_input_template": tool_template,
            "tool_decision_input_tokens": tool_input_tokens,
            "tool_decision_output": tool_output,
            "tool_decision_output_tokens": tool_output_tokens,
            "tool_decision_result": bool(tool_decision_result),
            "tool_decision_time_seconds": round(tool_time, 3),
            "tool_decision_gpu_peak_mb": round(tool_gpu_peak, 2),
        })
        # Update state
        tool_img_output = ""
        tool_context = ""
        if tool_decision_result:
            prompt_state.update("TOOL_USE_ENHANCEMENT", True)
            # Note: In real app.py, graph generation happens here
            # For testing, we'll just note that tools would be used
            tool_context = "Tool usage detected (graph would be generated in production)"

        # ============================================================
        # STEP 4: REGEX CHECKS
        # ============================================================
        regex_start = time.time()
        # Diff the active prompts before/after to see what the checks enabled
        regex_before = set(prompt_state.get_active_response_prompts())
        logical_expressions.apply_all_checks(user_prompt, prompt_state)
        regex_after = set(prompt_state.get_active_response_prompts())
        regex_applied = list(regex_after - regex_before)
        regex_time = time.time() - regex_start
        result.update({
            "regex_checks_applied": ", ".join(regex_applied) if regex_applied else "None",
            "regex_checks_time_seconds": round(regex_time, 3),
        })

        # ============================================================
        # STEP 5: ROUTING AGENTS (unified process() call)
        # ============================================================
        routing_start = time.time()
        # Build template (simplified - just the user prompt)
        routing_template = f"User Query: {user_prompt}"
        routing_input_tokens = count_tokens_accurate(routing_template)
        reset_gpu_stats()
        # tool_img_output is always empty in this harness, so tool_used is
        # always False here.
        response_prompts_str, thinking_prompts_str = routing_agents.process(
            user_input=user_prompt,
            tool_used=bool(tool_decision_result) and bool(tool_img_output)
        )
        # Parse results: one prompt name per line
        response_prompts = [p.strip() for p in response_prompts_str.split('\n') if p.strip()] if response_prompts_str else []
        thinking_prompts = [p.strip() for p in thinking_prompts_str.split('\n') if p.strip()] if thinking_prompts_str else []
        routing_output = f"Response: {', '.join(response_prompts) if response_prompts else 'None'}\nThinking: {', '.join(thinking_prompts) if thinking_prompts else 'None'}"
        routing_output_tokens = count_tokens_accurate(routing_output)
        routing_gpu_peak = get_gpu_memory()["peak_mb"]
        routing_time = time.time() - routing_start
        stage_gpu_peaks.append(routing_gpu_peak)

        # Legacy per-agent columns: the consolidated measurements are split
        # evenly among the four routing agents.
        routing_share = {
            "input_template": routing_template,
            "input_tokens": routing_input_tokens // 4,
            "output_tokens": routing_output_tokens // 4,
            "time_seconds": round(routing_time / 4, 3),
            "gpu_peak_mb": round(routing_gpu_peak / 4, 2),
        }
        agent2_names = ["GENERAL_FORMATTING", "LATEX_FORMATTING", "GUIDING_TEACHING"]
        agent3_names = ["PRACTICE_QUESTION_FOLLOWUP", "MATH_THINKING", "QUESTION_ANSWER_DESIGN", "REASONING_THINKING"]
        agent1_selected = ", ".join(p for p in response_prompts if p in ["STRUCTURE_PRACTICE_QUESTIONS"])
        agent2_selected = ", ".join(p for p in response_prompts if p in agent2_names)
        agent3_selected = ", ".join(p for p in response_prompts + thinking_prompts if p in agent3_names)
        agent4_selected = ", ".join(p for p in response_prompts if p == "TOOL_USE_ENHANCEMENT")
        result.update(_routing_agent_metrics(
            1, routing_share,
            output=agent1_selected or "None",
            decision="STRUCTURE_PRACTICE_QUESTIONS" in response_prompts,
        ))
        result.update(_routing_agent_metrics(
            2, routing_share,
            output=agent2_selected or "None",
            decision=agent2_selected or "NULL",
        ))
        result.update(_routing_agent_metrics(
            3, routing_share,
            output=agent3_selected or "None",
            decision=any(p in agent3_names for p in response_prompts + thinking_prompts),
        ))
        result.update(_routing_agent_metrics(
            4, routing_share,
            output=agent4_selected or "None",
            decision="TOOL_USE_ENHANCEMENT" if "TOOL_USE_ENHANCEMENT" in response_prompts else "NULL",
            decision_column="decisions",
        ))
        # Update prompt state with all activated prompts
        for prompt_name in response_prompts:
            prompt_state.update(prompt_name, True)
        for prompt_name in thinking_prompts:
            prompt_state.update(prompt_name, True)

        # ============================================================
        # STEP 6: THINKING AGENTS (unified process() call)
        # ============================================================
        thinking_prompts_list = [name.strip() for name in thinking_prompts if name.strip()]
        # Additional heuristic: Add MATH_THINKING if LATEX_FORMATTING is active
        if prompt_state.is_active("LATEX_FORMATTING") and "MATH_THINKING" not in thinking_prompts_list:
            thinking_prompts_list.append("MATH_THINKING")
            prompt_state.update("MATH_THINKING", True)

        # (prompt name, column prefix, template builder) per thinking agent
        thinking_columns = (
            ("MATH_THINKING", "math_thinking", build_math_thinking_template),
            ("QUESTION_ANSWER_DESIGN", "qa_design", build_qa_design_template),
            ("REASONING_THINKING", "reasoning", build_reasoning_template),
        )
        # Start from "not activated" for all three, then overwrite the active ones
        for _, prefix, _ in thinking_columns:
            result.update(_inactive_thinking_metrics(prefix))

        thinking_context = ""
        thinking_output_tokens = 0
        if thinking_prompts_list:
            thinking_start = time.time()
            thinking_prompts_string = '\n'.join(thinking_prompts_list)
            reset_gpu_stats()
            # `or ""` guards the len()/slicing below should process() return None
            thinking_context = thinking_agents.process(
                user_input=user_prompt,
                conversation_history=recent_history_formatted,
                thinking_prompts=thinking_prompts_string,
                tool_img_output=tool_img_output,
                tool_context=tool_context
            ) or ""
            thinking_time = time.time() - thinking_start
            thinking_gpu_peak = get_gpu_memory()["peak_mb"]
            stage_gpu_peaks.append(thinking_gpu_peak)
            thinking_output_tokens = count_tokens_accurate(thinking_context)
            active_count = len(thinking_prompts_list)
            # Aggregate metrics: every activated agent records the shared
            # output and an equal share of the time and GPU peak.
            # NOTE(review): *_input_tokens counts the raw user prompt, not the
            # recorded *_input_template - confirm which one is intended.
            for prompt_name, prefix, build_template in thinking_columns:
                if prompt_name not in thinking_prompts_list:
                    continue
                result.update({
                    f"{prefix}_activated": True,
                    f"{prefix}_input_template": build_template(user_prompt),
                    f"{prefix}_input_tokens": count_tokens_accurate(user_prompt),
                    f"{prefix}_output": thinking_context[:500],  # Truncate for CSV
                    f"{prefix}_output_tokens": thinking_output_tokens,
                    f"{prefix}_time_seconds": round(thinking_time / active_count, 3),
                    f"{prefix}_gpu_peak_mb": round(thinking_gpu_peak / active_count, 2),
                })

        # ============================================================
        # STEP 7-8: PROMPT ASSEMBLY
        # ============================================================
        assembly_start = time.time()
        # Get active response prompts
        active_prompts = prompt_state.get_active_response_prompts()
        assembly_time = time.time() - assembly_start
        result.update({
            "active_response_prompts": ", ".join(active_prompts),
            "final_prompt_template": "Response input dict (see response_input_template)",
            "final_prompt_tokens": 0,  # Not measured; see response_input_tokens
            "final_prompt_chars": 0,
            "final_prompt_words": 0,
            "assembly_time_seconds": round(assembly_time, 3),
        })

        # ============================================================
        # STEP 9: RESPONSE GENERATION (response agent invoked with a dict)
        # ============================================================
        response_start = time.time()
        reset_gpu_stats()
        input_data = {
            'user_query': user_prompt,
            'conversation_history': recent_history,
            'active_prompts': active_prompts,
            'thinking_context': thinking_context,
            'tool_context': tool_context,
        }
        result_dict = response_agent.invoke(input_data)
        raw_response = result_dict.get('response', '')
        response_time = time.time() - response_start
        raw_tokens = count_tokens_accurate(raw_response)
        raw_chars = len(raw_response)
        raw_words = count_words(raw_response)
        tokens_per_sec = raw_tokens / response_time if response_time > 0 else 0
        response_gpu_peak = get_gpu_memory()["peak_mb"]
        stage_gpu_peaks.append(response_gpu_peak)
        # Compact summary of the input dict, recorded in place of a prompt string
        input_template_str = f"user_query: {user_prompt[:100]}..., active_prompts: {active_prompts}, thinking: {len(thinking_context)} chars, tool: {len(tool_context)} chars"
        response_input_tokens = count_tokens_accurate(input_template_str)
        result.update({
            "response_input_template": input_template_str,
            "response_input_tokens": response_input_tokens,
            "response_raw": raw_response,
            "response_raw_tokens": raw_tokens,
            "response_raw_chars": raw_chars,
            "response_raw_words": raw_words,
            "response_generation_time_seconds": round(response_time, 3),
            "response_gpu_peak_mb": round(response_gpu_peak, 2),
            "response_tokens_per_second": round(tokens_per_sec, 2),
        })

        # ============================================================
        # STEP 10: POST-PROCESSING
        # ============================================================
        postprocess_start = time.time()
        processed_response = post_processor.process_response(raw_response, user_prompt)
        postprocess_time = time.time() - postprocess_start
        result.update({
            "response_processed": processed_response,
            "response_processed_tokens": count_tokens_accurate(processed_response),
            "response_processed_chars": len(processed_response),
            "response_processed_words": count_words(processed_response),
            "postprocessing_time_seconds": round(postprocess_time, 3),
        })

        # ============================================================
        # QUALITY METRICS (computed on the post-processed response)
        # ============================================================
        result.update({
            "flesch_reading_ease": round(calculate_flesch_reading_ease(processed_response), 2),
            "flesch_kincaid_grade": round(calculate_flesch_kincaid_grade(processed_response), 2),
            "completeness_score": round(calculate_completeness_score(processed_response, user_prompt), 3),
            "specificity_score": round(calculate_specificity_score(processed_response), 3),
            "repetition_ratio": round(calculate_repetition_ratio(processed_response), 3),
            "unique_word_ratio": round(calculate_unique_word_ratio(processed_response), 3),
            "avg_sentence_length": round(calculate_avg_sentence_length(processed_response), 2),
            "question_answered": check_question_answered(processed_response, user_prompt),
        })

        # ============================================================
        # OVERALL METRICS
        # ============================================================
        total_pipeline_time = time.time() - pipeline_start
        # Count activated models
        models_activated = []
        if result["tool_decision_time_seconds"] > 0:
            models_activated.append("Tool Decision")
        if result["agent1_time_seconds"] > 0:
            models_activated.append("Routing Agents")
        if result["math_thinking_activated"]:
            models_activated.append("Math Thinking")
        if result["qa_design_activated"]:
            models_activated.append("QA Design")
        if result["reasoning_activated"]:
            models_activated.append("Reasoning")
        models_activated.append("Response Agent")
        # Sum all input tokens. The undivided routing count is used because
        # the per-agent columns were integer-divided by 4 and lose the
        # remainder when multiplied back.
        total_input_tokens = (
            tool_input_tokens +
            routing_input_tokens +
            result["math_thinking_input_tokens"] +
            result["qa_design_input_tokens"] +
            result["reasoning_input_tokens"] +
            response_input_tokens
        )
        # Sum all output tokens. The thinking output is one shared text, so it
        # is counted once however many thinking agents recorded it.
        total_output_tokens = (
            tool_output_tokens +
            routing_output_tokens +
            thinking_output_tokens +
            raw_tokens
        )
        # Max GPU across all stages, taken from the real stage peaks rather
        # than the per-agent shares.
        total_gpu_peak = max(stage_gpu_peaks)
        result.update({
            "total_pipeline_time_seconds": round(total_pipeline_time, 3),
            "total_input_tokens": total_input_tokens,
            "total_output_tokens": total_output_tokens,
            "total_gpu_peak_mb": round(total_gpu_peak, 2),
            "models_activated_count": len(models_activated),
            "models_activated_list": ", ".join(models_activated),
        })
        logger.info(f"✓ Prompt {prompt_index} complete: {total_pipeline_time:.2f}s, {len(models_activated)} models activated")
        return result
    except Exception as e:
        # logger.exception records the traceback together with the message
        logger.exception(f"Pipeline execution failed for prompt {prompt_index}: {e}")
        # Return an error row: every column "ERROR" except the inputs and the
        # elapsed time, which stays numeric so that numeric formatting of it
        # (e.g. ':.2f') keeps working.
        error_result = {col: "ERROR" for col in CSV_COLUMNS}
        error_result.update({
            "prompt_index": prompt_index,
            "timestamp": datetime.now().isoformat(),
            "user_prompt": user_prompt,
            "user_prompt_tokens": count_tokens_accurate(user_prompt),
            "user_prompt_chars": len(user_prompt),
            "user_prompt_words": count_words(user_prompt),
            "total_pipeline_time_seconds": round(time.time() - pipeline_start, 3),
        })
        return error_result
# ============================================================================
# BATCH PROCESSING
# ============================================================================
@spaces.GPU(duration=600)
def process_batch_full_pipeline(
    user_prompts: List[str],
    progress_callback=None
) -> List[Dict]:
    """
    Process batch of prompts through FULL PIPELINE.

    Sequential processing - one at a time. Exactly one result row is
    appended per prompt, whether the pipeline succeeded, returned an
    "ERROR" row, or raised.

    Args:
        user_prompts: List of user prompts to test
        progress_callback: Optional callable invoked as
            ``progress_callback(idx, total)`` after every prompt
            (successful or not). Exceptions raised by the callback are
            logged and do not affect the recorded results.

    Returns:
        List of result dictionaries (one per prompt)
    """
    results = []
    total = len(user_prompts)
    logger.info("="*60)
    logger.info(f"Starting full pipeline batch: {total} prompts")
    logger.info("="*60)
    batch_start = time.time()

    for idx, user_prompt in enumerate(user_prompts, 1):
        logger.info(f"\n{'='*60}")
        logger.info(f"Processing prompt {idx}/{total}")
        logger.info(f"Prompt: {user_prompt[:80]}...")
        logger.info(f"{'='*60}")

        try:
            # Run full instrumented pipeline
            result = run_full_pipeline_instrumented(user_prompt, prompt_index=idx)
        except Exception as e:
            logger.error(f"❌ Prompt {idx} failed: {e}")
            import traceback
            traceback.print_exc()
            # Build the error row; it is appended below like any other result
            result = {col: "ERROR" for col in CSV_COLUMNS}
            result.update({
                "prompt_index": idx,
                "timestamp": datetime.now().isoformat(),
                "user_prompt": user_prompt,
                "user_prompt_tokens": count_tokens_accurate(user_prompt),
                "user_prompt_chars": len(user_prompt),
            })
        else:
            # The pipeline reports its own failures by returning a row whose
            # metric cells are the string "ERROR"; formatting that with
            # ':.2f' would raise, so only log numeric metrics.
            elapsed = result.get('total_pipeline_time_seconds', 0)
            if isinstance(elapsed, (int, float)):
                logger.info(f"βœ“ Prompt {idx} complete")
                logger.info(f" Total time: {elapsed:.2f}s")
                logger.info(f" Models activated: {result.get('models_activated_count', 0)}")
                logger.info(f" Total tokens: {result.get('total_input_tokens', 0) + result.get('total_output_tokens', 0)}")
            else:
                logger.error(f"❌ Prompt {idx} failed: pipeline returned an error result")

        # Single append point -> never more than one row per prompt
        results.append(result)

        if progress_callback:
            try:
                progress_callback(idx, total)
            except Exception as e:
                # A broken progress reporter must not corrupt the result set
                logger.warning(f"Progress callback failed for prompt {idx}: {e}")

    batch_duration = time.time() - batch_start
    logger.info(f"\n{'='*60}")
    logger.info("BATCH COMPLETE")
    logger.info(f"{'='*60}")
    logger.info(f"Processed: {len(results)}/{total} prompts")
    logger.info(f"Total batch time: {batch_duration:.2f}s")
    if total:
        # Guard: an empty batch has no per-prompt average
        logger.info(f"Average per prompt: {batch_duration/total:.2f}s")
    logger.info(f"{'='*60}")
    return results
# ============================================================================
# CSV EXPORT
# ============================================================================
def export_full_pipeline_csv(
    results: List[Dict],
    test_name: str = "pipeline_test"
) -> Optional[str]:
    """
    Export full pipeline results to CSV.

    Args:
        results: List of result dictionaries
        test_name: Name for the test (used in filename). Characters other
            than letters, digits, '-', '_' and '.' are replaced by '_' so
            the name can never introduce a path separator; an empty name
            falls back to "pipeline_test".

    Returns:
        Filepath of exported CSV, or None when there is nothing to export
        or the export fails (the failure is logged).
    """
    try:
        if not results:
            logger.warning("No results to export")
            return None

        # test_name is free text typed into the UI; a '/' in it would make
        # open() fail and lose the results of the whole batch run.
        cleaned = "".join(
            ch if (ch.isalnum() or ch in "-_.") else "_"
            for ch in str(test_name or "").strip()
        )
        safe_name = cleaned or "pipeline_test"

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"mimir_full_pipeline_{safe_name}_{timestamp}.csv"
        filepath = os.path.join("/tmp", filename)  # Save to /tmp for ZeroGPU

        logger.info(f"Exporting {len(results)} results to CSV...")
        # Write CSV
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            # restval fills columns a row is missing with NULL;
            # extrasaction drops keys that are not part of the schema.
            writer = csv.DictWriter(
                f,
                fieldnames=CSV_COLUMNS,
                restval="NULL",
                extrasaction="ignore",
            )
            writer.writeheader()
            writer.writerows(results)

        logger.info(f"βœ“ Full pipeline results exported to {filepath}")
        logger.info(f" Columns: {len(CSV_COLUMNS)}")
        logger.info(f" Rows: {len(results)}")
        return filepath
    except Exception as e:
        logger.error(f"CSV export failed: {e}")
        import traceback
        traceback.print_exc()
        return None
def calculate_summary_stats(results: List[Dict]) -> Dict:
    """Summarise a batch: prompt counts plus aggregates over the non-error rows.

    Returns an empty dict for an empty batch, and ``{"error": "No valid results"}``
    when every row carries the "ERROR" marker in its pipeline-time cell.
    """
    if not results:
        return {}

    # A row counts as failed when its pipeline time is the "ERROR" marker
    ok_rows = [
        row for row in results
        if row.get("total_pipeline_time_seconds") != "ERROR"
    ]
    if not ok_rows:
        return {"error": "No valid results"}

    def column(key):
        """Collect one metric across all successful rows."""
        return [row[key] for row in ok_rows]

    n_all = len(results)
    n_ok = len(ok_rows)
    times = column("total_pipeline_time_seconds")
    token_totals = [
        row["total_input_tokens"] + row["total_output_tokens"]
        for row in ok_rows
    ]
    answered = sum(column("question_answered"))

    summary = {}
    summary["total_prompts"] = n_all
    summary["successful_prompts"] = n_ok
    summary["failed_prompts"] = n_all - n_ok
    summary["avg_pipeline_time_seconds"] = round(np.mean(times), 3)
    summary["min_pipeline_time_seconds"] = round(np.min(times), 3)
    summary["max_pipeline_time_seconds"] = round(np.max(times), 3)
    summary["avg_total_tokens"] = round(np.mean(token_totals), 1)
    summary["avg_models_activated"] = round(np.mean(column("models_activated_count")), 2)
    summary["avg_gpu_peak_mb"] = round(np.mean(column("total_gpu_peak_mb")), 2)
    summary["avg_completeness_score"] = round(np.mean(column("completeness_score")), 3)
    summary["avg_flesch_reading_ease"] = round(np.mean(column("flesch_reading_ease")), 2)
    summary["questions_answered_pct"] = round(100 * answered / n_ok, 1)
    return summary
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
# Top-level Gradio app object; `demo` is what the __main__ guard launches.
# NOTE(review): several emoji in the labels below look mis-encoded
# (e.g. 'βœ…' is probably a check-mark emoji) — confirm the file's encoding.
with gr.Blocks(title="Mimir - Full Pipeline Testing", theme=gr.themes.Soft()) as demo:
# Page heading and introductory description
gr.Markdown("# πŸ§ͺ Mimir Full Pipeline Testing")
gr.Markdown("""
Test the **complete orchestration flow** with comprehensive metrics at every step.
**βœ… UPDATED:** Now correctly mirrors app.py orchestrate_turn() process
- Tool decision uses `decide()` method with conversation history
- Response agent invoked with `input_data` dict (not raw string)
- Thinking agents use `process()` method matching app.py
**What this tests:**
- βœ… Tool Decision Agent
- βœ… All 4 Routing Agents (unified process)
- βœ… Thinking Agents (conditional: Math, QA Design, Reasoning)
- βœ… Response Agent (Llama-3.2-3B)
- βœ… Post-processing
**Output:** CSV file with ~110 columns capturing the full pipeline journey
""")
# One row holding two equally weighted columns (scale=1 each):
# test configuration / inputs first, results second.
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## πŸ“ Test Configuration")
# Free-text run label; passed to run_pipeline_test as `test_name`
test_name = gr.Textbox(
label="Test Name",
value="pipeline_test",
placeholder="Enter a name for this test run",
)
gr.Markdown("### Input Method")
# Selects which of the two input groups below is shown; "Manual Entry" is the default
input_method = gr.Radio(
choices=["CSV Upload", "Manual Entry"],
value="Manual Entry",
label="Choose Input Method"
)
# CSV upload group — starts hidden; made visible by toggle_input_method
with gr.Group(visible=False) as csv_section:
csv_file = gr.File(
label="Upload CSV File",
file_types=[".csv"],
)
# Manual entry group — starts visible, matching the radio's default value
with gr.Group(visible=True) as manual_section:
prompt_text = gr.Textbox(
label="Enter Prompts (one per line)",
lines=15,
placeholder="What is calculus?\nHelp me understand photosynthesis\nCan you create practice questions for algebra?\nExplain Newton's laws of motion",
)
# Button whose click event is wired to run_pipeline_test further down
process_btn = gr.Button(
"πŸš€ Run Full Pipeline Test",
variant="primary",
size="lg"
)
# Read-only box that receives the status message returned by run_pipeline_test
status = gr.Textbox(
label="Status",
interactive=False,
lines=3
)
with gr.Column(scale=1):
gr.Markdown("## πŸ“Š Results")
# Receives the summary dict returned by run_pipeline_test
results_summary = gr.JSON(
label="Summary Statistics",
height=400
)
gr.Markdown("### Download Results")
# Output-only file slot; receives the exported CSV path (or None)
download_csv = gr.File(
label="CSV Export",
interactive=False
)
# Static description of the exported CSV's contents
gr.Markdown("""
**CSV contains ~110 columns:**
- Input metrics (tokens, chars, words)
- Template for each agent
- Output for each agent
- Timing for each step
- GPU usage per step
- Quality metrics (readability, completeness, etc.)
- Overall pipeline metrics
""")
# Show only the input group that belongs to the selected input method
def toggle_input_method(method):
    """Return visibility updates for (csv_section, manual_section), in that order."""
    csv_selected = method == "CSV Upload"
    csv_visibility = gr.update(visible=csv_selected)
    manual_visibility = gr.update(visible=not csv_selected)
    return csv_visibility, manual_visibility


# Re-evaluate the visibility of both groups whenever the radio selection changes
input_method.change(
    outputs=[csv_section, manual_section],
    inputs=[input_method],
    fn=toggle_input_method,
)
# Main processing function
def run_pipeline_test(test_name, input_method, csv_file, prompt_text):
"""Run the full pipeline test"""
# Parse prompts
prompts = []
if input_method == "CSV Upload" and csv_file:
try:
# Read CSV
content = csv_file.decode('utf-8') if isinstance(csv_file, bytes) else csv_file
if hasattr(content, 'read'):
content = content.read()
if isinstance(content, bytes):
content = content.decode('utf-8')
reader = csv.reader(io.StringIO(str(content)))
prompts = [row[0].strip() for row in reader if row and row[0].strip()]
# Skip header if present
if prompts and any(header in prompts[0].lower() for header in ['prompt', 'text', 'query', 'input']):
prompts = prompts[1:]
except Exception as e:
return f"❌ CSV parsing error: {e}", {}, None
elif input_method == "Manual Entry" and prompt_text:
prompts = [p.strip() for p in prompt_text.split('\n') if p.strip()]
if not prompts:
return "❌ No prompts provided. Please enter at least one prompt.", {}, None
status_msg = f"πŸ”„ Processing {len(prompts)} prompts through full pipeline...\n"
status_msg += "This may take several minutes. Please wait...\n"
try:
# Run batch
results = process_batch_full_pipeline(prompts)
# Calculate summary
summary = calculate_summary_stats(results)
# Export CSV
csv_path = export_full_pipeline_csv(results, test_name)
status_msg = f"βœ… Complete!\n"
status_msg += f"Processed: {len(results)} prompts\n"
status_msg += f"Successful: {summary.get('successful_prompts', 0)}\n"
status_msg += f"Failed: {summary.get('failed_prompts', 0)}\n"
status_msg += f"CSV ready for download!"
return status_msg, summary, csv_path
except Exception as e:
error_msg = f"❌ Pipeline test failed: {str(e)}"
logger.error(error_msg)
import traceback
traceback.print_exc()
return error_msg, {}, None
# Wire up event
# Clicking the button calls run_pipeline_test with the four input widgets (in
# parameter order) and routes its 3-tuple return value to the status box, the
# summary JSON view and the CSV download slot respectively.
process_btn.click(
fn=run_pipeline_test,
inputs=[test_name, input_method, csv_file, prompt_text],
outputs=[status, results_summary, download_csv]
)
# ============================================================================
# LAUNCH
# ============================================================================
if __name__ == "__main__":
    # Startup banner: schema size and which optional components were detected
    rule = "="*60
    startup_lines = (
        rule,
        "LAUNCHING MIMIR FULL PIPELINE TESTING INTERFACE",
        "βœ… UPDATED: Now correctly mirrors app.py orchestration",
        rule,
        f"CSV Schema: {len(CSV_COLUMNS)} columns",
        f"Agents initialized: {AGENTS_AVAILABLE}",
        f"Tiktoken available: {TIKTOKEN_AVAILABLE}",
        f"Textstat available: {TEXTSTAT_AVAILABLE}",
        f"ZeroGPU available: {ZERO_GPU_AVAILABLE}",
        rule,
    )
    for line in startup_lines:
        logger.info(line)

    # Listen on all interfaces on port 7862, without a public share link
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7862,
        "share": False,
        "debug": True,
    }
    demo.launch(**launch_options)