# gradio_pipeline_testing.py """ Full Pipeline Testing Interface for Mimir Educational AI Assistant Tests the complete orchestration flow with comprehensive metrics at every step. Captures conditional model activation, token usage, timing, and quality metrics. UPDATED: Now correctly mirrors app.py orchestrate_turn() process - Tool decision uses decide() method with conversation history - Response agent invoked with input_data dict (not raw string) - Thinking agents process() method matches app.py - Graph generation included when tools are used Output: CSV file with ~110 columns capturing full pipeline journey """ import os import sys import io import csv import json import time import logging import warnings from datetime import datetime from typing import Dict, List, Optional, Tuple, Any from collections import Counter # Core dependencies import torch import gradio as gr import numpy as np # ============================================================================ # ENVIRONMENT SETUP # ============================================================================ HF_CACHE = "/tmp/huggingface" os.makedirs(f"{HF_CACHE}/hub", exist_ok=True) os.environ['HF_HOME'] = HF_CACHE os.environ['HF_HUB_CACHE'] = f"{HF_CACHE}/hub" # ============================================================================ # IMPORTS FROM MIMIR APPLICATION # ============================================================================ try: from agents import ( ToolDecisionAgent, PromptRoutingAgents, ThinkingAgents, ResponseAgent, ) AGENTS_AVAILABLE = True except ImportError as e: print(f"⚠️ Warning: Could not import agents: {e}") AGENTS_AVAILABLE = False from model_manager import get_model as get_shared_llama try: from state_manager import GlobalStateManager, LogicalExpressions STATE_MANAGER_AVAILABLE = True except ImportError as e: print(f"⚠️ Warning: Could not import state_manager: {e}") STATE_MANAGER_AVAILABLE = False try: from prompt_library import ( CORE_IDENTITY, TOOL_DECISION, agent_1_system, agent_2_system, agent_3_system, agent_4_system, MATH_THINKING, QUESTION_ANSWER_DESIGN, REASONING_THINKING, VAUGE_INPUT, USER_UNDERSTANDING, GENERAL_FORMATTING, LATEX_FORMATTING, GUIDING_TEACHING, STRUCTURE_PRACTICE_QUESTIONS, PRACTICE_QUESTION_FOLLOWUP, TOOL_USE_ENHANCEMENT, ) PROMPTS_AVAILABLE = True except ImportError as e: print(f"⚠️ Warning: Could not import prompt_library: {e}") PROMPTS_AVAILABLE = False # Try to import post processor try: # Import the post processor class/module from app.py import importlib.util spec = importlib.util.spec_from_file_location("app_module", "app.py") app_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(app_module) post_processor = app_module.post_processor POST_PROCESSOR_AVAILABLE = True except Exception as e: print(f"⚠️ Warning: Could not import post_processor: {e}") POST_PROCESSOR_AVAILABLE = False # Create dummy class DummyPostProcessor: def process_response(self, response, user_message): return response post_processor = DummyPostProcessor() # ZeroGPU support try: import spaces ZERO_GPU_AVAILABLE = True except ImportError: ZERO_GPU_AVAILABLE = False class DummySpaces: @staticmethod def GPU(duration=600): def decorator(func): return func return decorator spaces = DummySpaces() # Tiktoken for accurate token counting try: import tiktoken TIKTOKEN_AVAILABLE = True except ImportError: TIKTOKEN_AVAILABLE = False print("⚠️ Warning: tiktoken not available - using fallback token counting") # Textstat for readability metrics try: import textstat TEXTSTAT_AVAILABLE = True except ImportError: TEXTSTAT_AVAILABLE = False print("⚠️ Warning: textstat not available - using manual readability calculations") # ============================================================================ # LOGGING SETUP # ============================================================================ logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) warnings.filterwarnings("ignore", category=UserWarning) warnings.filterwarnings("ignore", category=FutureWarning) CURRENT_YEAR = datetime.now().year # ============================================================================ # GLOBAL INSTANCES # ============================================================================ if AGENTS_AVAILABLE and STATE_MANAGER_AVAILABLE: try: global_state_manager = GlobalStateManager() logical_expressions = LogicalExpressions() tool_agent = ToolDecisionAgent() routing_agents = PromptRoutingAgents() thinking_agents = ThinkingAgents() response_agent = ResponseAgent() logger.info("✓ All agents initialized successfully") except Exception as e: logger.error(f"Failed to initialize agents: {e}") raise else: logger.error("Cannot initialize - missing core dependencies") raise ImportError("Missing required modules: agents or state_manager") # ============================================================================ # CSV SCHEMA DEFINITION # ============================================================================ CSV_COLUMNS = [ # Identification & Input "prompt_index", "timestamp", "user_prompt", "user_prompt_tokens", "user_prompt_chars", "user_prompt_words", # Conversation Context "conversation_history_length", "conversation_history_tokens", # Tool Decision Agent "tool_decision_input_template", "tool_decision_input_tokens", "tool_decision_output", "tool_decision_output_tokens", "tool_decision_result", "tool_decision_time_seconds", "tool_decision_gpu_peak_mb", # Regex Checks "regex_checks_applied", "regex_checks_time_seconds", # Routing Agent 1 "agent1_input_template", "agent1_input_tokens", "agent1_output", "agent1_output_tokens", "agent1_decision", "agent1_time_seconds", "agent1_gpu_peak_mb", # Routing Agent 2 "agent2_input_template", "agent2_input_tokens", "agent2_output", "agent2_output_tokens", "agent2_decision", "agent2_time_seconds", "agent2_gpu_peak_mb", # Routing Agent 3 "agent3_input_template", "agent3_input_tokens", "agent3_output", "agent3_output_tokens", "agent3_decision", "agent3_time_seconds", "agent3_gpu_peak_mb", # Routing Agent 4 "agent4_input_template", "agent4_input_tokens", "agent4_output", "agent4_output_tokens", "agent4_decisions", "agent4_time_seconds", "agent4_gpu_peak_mb", # Math Thinking "math_thinking_activated", "math_thinking_input_template", "math_thinking_input_tokens", "math_thinking_output", "math_thinking_output_tokens", "math_thinking_time_seconds", "math_thinking_gpu_peak_mb", # QA Design Thinking "qa_design_activated", "qa_design_input_template", "qa_design_input_tokens", "qa_design_output", "qa_design_output_tokens", "qa_design_time_seconds", "qa_design_gpu_peak_mb", # Reasoning Thinking "reasoning_activated", "reasoning_input_template", "reasoning_input_tokens", "reasoning_output", "reasoning_output_tokens", "reasoning_time_seconds", "reasoning_gpu_peak_mb", # Prompt Assembly "active_response_prompts", "final_prompt_template", "final_prompt_tokens", "final_prompt_chars", "final_prompt_words", "assembly_time_seconds", # Response Generation "response_input_template", "response_input_tokens", "response_raw", "response_raw_tokens", "response_raw_chars", "response_raw_words", "response_generation_time_seconds", "response_gpu_peak_mb", "response_tokens_per_second", # Post-processing "response_processed", "response_processed_tokens", "response_processed_chars", "response_processed_words", "postprocessing_time_seconds", # Quality Metrics "flesch_reading_ease", "flesch_kincaid_grade", "completeness_score", "specificity_score", "repetition_ratio", "unique_word_ratio", "avg_sentence_length", "question_answered", # Overall Metrics "total_pipeline_time_seconds", "total_input_tokens", "total_output_tokens", "total_gpu_peak_mb", "models_activated_count", "models_activated_list", ] # ============================================================================ # TOKEN COUNTING FUNCTIONS # ============================================================================ def count_tokens_accurate(text: str) -> int: """ Count tokens using tiktoken library for accurate estimation. Args: text: Input text to tokenize Returns: Accurate token count """ if not text: return 0 if not TIKTOKEN_AVAILABLE: # Fallback: word count approximation return len(text.split()) try: # Use cl100k_base encoding (used by GPT-3.5/4, good general estimator) encoding = tiktoken.get_encoding("cl100k_base") tokens = encoding.encode(text) return len(tokens) except Exception as e: logger.warning(f"tiktoken encoding failed: {e}, using fallback") return len(text.split()) def count_words(text: str) -> int: """Count words in text""" if not text: return 0 return len(text.split()) def count_sentences(text: str) -> int: """Count sentences in text (simple heuristic)""" if not text: return 0 import re sentences = re.split(r'[.!?]+', text) return len([s for s in sentences if s.strip()]) # ============================================================================ # GPU MEMORY TRACKING # ============================================================================ def get_gpu_memory() -> Dict[str, float]: """ Get current GPU memory statistics. Returns: Dictionary with allocated, reserved, and peak memory in MB """ if torch.cuda.is_available(): return { "allocated_mb": torch.cuda.memory_allocated() / 1024**2, "reserved_mb": torch.cuda.memory_reserved() / 1024**2, "peak_mb": torch.cuda.max_memory_allocated() / 1024**2 } return { "allocated_mb": 0.0, "reserved_mb": 0.0, "peak_mb": 0.0 } def reset_gpu_stats(): """Reset GPU memory statistics""" if torch.cuda.is_available(): torch.cuda.reset_peak_memory_stats() torch.cuda.synchronize() # ============================================================================ # TEMPLATE BUILDING FUNCTIONS # ============================================================================ def format_history(history: List[Dict]) -> str: """Format conversation history for templates""" if not history: return "No previous conversation" formatted = [] for msg in history[-8:]: # Last 8 messages role = msg.get('role', 'unknown') content = msg.get('content', '')[:100] # Truncate formatted.append(f"{role}: {content}") return "\n".join(formatted) def build_tool_decision_template(user_prompt: str, history: List) -> str: """Build template for tool decision agent - matches app.py""" history_str = format_history(history) return f"{history_str}\n\nUser Query: {user_prompt}" def build_agent1_template(user_prompt: str, history: List) -> str: """Build template for Agent 1: Practice Questions""" history_str = format_history(history) return f"[INST] {agent_1_system}\n\nConversation History:\n{history_str}\n\nCurrent User Query: {user_prompt} [/INST]" def build_agent2_template(user_prompt: str) -> str: """Build template for Agent 2: Discovery Mode""" return f"[INST] {agent_2_system}\n\nUser Query: {user_prompt} [/INST]" def build_agent3_template(user_prompt: str, history: List) -> str: """Build template for Agent 3: Followup Assessment""" history_str = format_history(history) return f"[INST] {agent_3_system}\n\nConversation History:\n{history_str}\n\nCurrent User Query: {user_prompt} [/INST]" def build_agent4_template(user_prompt: str, history: List) -> str: """Build template for Agent 4: Teaching Mode""" history_str = format_history(history) return f"[INST] {agent_4_system}\n\nConversation History:\n{history_str}\n\nCurrent User Query: {user_prompt} [/INST]" def build_math_thinking_template(user_prompt: str) -> str: """Build template for Math Thinking""" return f"[INST] {MATH_THINKING}\n\nUser Query: {user_prompt} [/INST]" def build_qa_design_template(user_prompt: str) -> str: """Build template for QA Design Thinking""" return f"[INST] {QUESTION_ANSWER_DESIGN}\n\nUser Query: {user_prompt} [/INST]" def build_reasoning_template(user_prompt: str) -> str: """Build template for Reasoning Thinking""" return f"[INST] {REASONING_THINKING}\n\nUser Query: {user_prompt} [/INST]" # ============================================================================ # QUALITY METRICS FUNCTIONS # ============================================================================ def estimate_syllables(text: str) -> int: """ Estimate syllable count (rough heuristic). Counts vowel groups. """ import re words = text.lower().split() syllable_count = 0 for word in words: # Remove non-letters word = re.sub(r'[^a-z]', '', word) if not word: continue # Count vowel groups vowel_groups = len(re.findall(r'[aeiouy]+', word)) # Ensure at least 1 syllable per word syllable_count += max(1, vowel_groups) return syllable_count def calculate_flesch_reading_ease(text: str) -> float: """ Calculate Flesch Reading Ease score. Score 0-100: Higher = easier to read 90-100: Very easy (5th grade) 60-70: Standard (8th-9th grade) 0-30: Very difficult (college graduate) Formula: 206.835 - 1.015(words/sentences) - 84.6(syllables/words) """ if not text or len(text.strip()) < 10: return 0.0 if TEXTSTAT_AVAILABLE: try: return textstat.flesch_reading_ease(text) except: pass # Manual calculation words = count_words(text) sentences = count_sentences(text) if sentences == 0 or words == 0: return 0.0 syllables = estimate_syllables(text) if words == 0: return 0.0 score = 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words) return max(0.0, min(100.0, score)) def calculate_flesch_kincaid_grade(text: str) -> float: """ Calculate Flesch-Kincaid Grade Level. Returns US grade level needed to understand text. Formula: 0.39(words/sentences) + 11.8(syllables/words) - 15.59 """ if not text or len(text.strip()) < 10: return 0.0 if TEXTSTAT_AVAILABLE: try: return textstat.flesch_kincaid_grade(text) except: pass words = count_words(text) sentences = count_sentences(text) if sentences == 0 or words == 0: return 0.0 syllables = estimate_syllables(text) if words == 0: return 0.0 grade = 0.39 * (words / sentences) + 11.8 * (syllables / words) - 15.59 return max(0.0, grade) def calculate_completeness_score(response: str, user_prompt: str) -> float: """ Estimate if response addresses the prompt. Uses keyword overlap and length heuristics. Returns: Score 0-1 (1 = complete answer) """ if not response or not user_prompt: return 0.0 import re # Extract keywords from prompt prompt_words = set(re.findall(r'\b\w+\b', user_prompt.lower())) # Remove common stopwords stopwords = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'should', 'could', 'may', 'might', 'can', 'what', 'how', 'why', 'when', 'where', 'who', 'which', 'i', 'you', 'we', 'they', 'he', 'she', 'it', 'me', 'him', 'her', 'us', 'them'} prompt_words -= stopwords response_words = set(re.findall(r'\b\w+\b', response.lower())) if not prompt_words: return 0.5 # Neutral if no meaningful keywords # Calculate keyword overlap overlap = len(prompt_words & response_words) / len(prompt_words) # Length factor min_reasonable_length = 20 if len(response) < min_reasonable_length: length_factor = len(response) / min_reasonable_length else: length_factor = 1.0 score = overlap * length_factor return min(1.0, score) def check_question_answered(response: str, user_prompt: str) -> bool: """ Boolean check: does response attempt to answer the question? Heuristics: - Response has minimum length - Response doesn't start with refusal - Response contains relevant keywords """ if not response or len(response) < 10: return False # Check for refusal patterns refusal_patterns = [ "i don't know", "i cannot", "i can't", "i'm not sure", "i don't have", "unable to", "sorry, i" ] response_lower = response.lower() for pattern in refusal_patterns: if response_lower.startswith(pattern): return False # Check for minimum completeness completeness = calculate_completeness_score(response, user_prompt) return completeness > 0.3 def calculate_specificity_score(response: str) -> float: """ Measure how specific vs vague the response is. Indicators of specificity: - Numbers, dates, names - Technical terms - Examples - Concrete nouns Returns: Score 0-1 (1 = very specific) """ if not response: return 0.0 import re specificity_indicators = 0 total_possible = 5 # 1. Contains numbers if re.search(r'\d+', response): specificity_indicators += 1 # 2. Contains proper nouns proper_nouns = len(re.findall(r'(? 0: specificity_indicators += 1 # 3. Contains example phrases example_phrases = ['for example', 'such as', 'for instance', 'like', 'including'] if any(phrase in response.lower() for phrase in example_phrases): specificity_indicators += 1 # 4. Average word length words = response.split() if words: avg_word_length = sum(len(w) for w in words) / len(words) if avg_word_length > 5.0: specificity_indicators += 1 # 5. Response length if len(response) > 200: specificity_indicators += 1 return specificity_indicators / total_possible def calculate_repetition_ratio(text: str) -> float: """ Measure token/word repetition. Lower = better (less repetitive) Returns: Ratio of repeated tokens to total tokens (0-1) """ if not text: return 0.0 words = text.lower().split() if len(words) < 2: return 0.0 word_counts = Counter(words) # Count words that appear more than once repeated_words = sum(count - 1 for count in word_counts.values() if count > 1) ratio = repeated_words / len(words) return min(1.0, ratio) def calculate_unique_word_ratio(text: str) -> float: """ Measure vocabulary diversity. Higher = more diverse vocabulary Returns: Ratio of unique words to total words (0-1) """ if not text: return 0.0 words = text.lower().split() if not words: return 0.0 unique_words = len(set(words)) return unique_words / len(words) def calculate_avg_sentence_length(text: str) -> float: """Calculate average sentence length in words""" sentences = count_sentences(text) words = count_words(text) if sentences == 0: return 0.0 return words / sentences # ============================================================================ # INSTRUMENTED PIPELINE RUNNER # ============================================================================ def run_full_pipeline_instrumented(user_prompt: str, prompt_index: int = 1) -> Dict: """ Run the complete orchestration pipeline with full instrumentation. Captures metrics at every step. ✅ UPDATED: Now correctly mirrors app.py orchestrate_turn() process Args: user_prompt: User's input prompt prompt_index: Index number for this prompt in batch Returns: Dictionary with all metrics for CSV export """ result = { "prompt_index": prompt_index, "timestamp": datetime.now().isoformat(), "user_prompt": user_prompt, "user_prompt_tokens": count_tokens_accurate(user_prompt), "user_prompt_chars": len(user_prompt), "user_prompt_words": count_words(user_prompt), } # Track overall start time pipeline_start = time.time() try: # ============================================================ # STEP 1-2: SETUP # ============================================================ setup_start = time.time() # Reset state global_state_manager.reset_prompt_state() prompt_state = global_state_manager.get_prompt_state_manager() # Get conversation history (empty for testing) recent_history = [] recent_history_formatted = "No previous conversation" result["conversation_history_length"] = 0 result["conversation_history_tokens"] = 0 # ============================================================ # STEP 3: TOOL DECISION AGENT (✅ FIXED: Use decide() with history) # ============================================================ tool_start = time.time() tool_template = build_tool_decision_template(user_prompt, recent_history) tool_input_tokens = count_tokens_accurate(tool_template) reset_gpu_stats() # ✅ FIXED: Use decide() method with conversation history (matches app.py) tool_decision_result = tool_agent.decide(user_prompt, recent_history) # Capture output tool_output = str(tool_decision_result) tool_output_tokens = count_tokens_accurate(tool_output) gpu_metrics = get_gpu_memory() tool_time = time.time() - tool_start # Record result.update({ "tool_decision_input_template": tool_template, "tool_decision_input_tokens": tool_input_tokens, "tool_decision_output": tool_output, "tool_decision_output_tokens": tool_output_tokens, "tool_decision_result": bool(tool_decision_result), "tool_decision_time_seconds": round(tool_time, 3), "tool_decision_gpu_peak_mb": round(gpu_metrics["peak_mb"], 2), }) # Update state tool_img_output = "" tool_context = "" if tool_decision_result: prompt_state.update("TOOL_USE_ENHANCEMENT", True) # Note: In real app.py, graph generation happens here # For testing, we'll just note that tools would be used tool_context = "Tool usage detected (graph would be generated in production)" # ============================================================ # STEP 4: REGEX CHECKS # ============================================================ regex_start = time.time() # Apply regex checks (returns list of activated prompts) regex_before = set(prompt_state.get_active_response_prompts()) logical_expressions.apply_all_checks(user_prompt, prompt_state) regex_after = set(prompt_state.get_active_response_prompts()) regex_applied = list(regex_after - regex_before) regex_time = time.time() - regex_start result.update({ "regex_checks_applied": ", ".join(regex_applied) if regex_applied else "None", "regex_checks_time_seconds": round(regex_time, 3), }) # ============================================================ # STEP 5: ROUTING AGENTS (✅ Unified Process - matches app.py) # ============================================================ routing_start = time.time() # Build template (simplified - just the user prompt) routing_template = f"User Query: {user_prompt}" routing_input_tokens = count_tokens_accurate(routing_template) reset_gpu_stats() # ✅ Use unified process() method (matches app.py) response_prompts_str, thinking_prompts_str = routing_agents.process( user_input=user_prompt, tool_used=(tool_decision_result and bool(tool_img_output)) ) # Parse results response_prompts = [p.strip() for p in response_prompts_str.split('\n') if p.strip()] if response_prompts_str else [] thinking_prompts = [p.strip() for p in thinking_prompts_str.split('\n') if p.strip()] if thinking_prompts_str else [] routing_output = f"Response: {', '.join(response_prompts) if response_prompts else 'None'}\nThinking: {', '.join(thinking_prompts) if thinking_prompts else 'None'}" routing_output_tokens = count_tokens_accurate(routing_output) gpu_metrics = get_gpu_memory() routing_time = time.time() - routing_start # Update result with consolidated routing metrics result.update({ # Agent 1 metrics (legacy columns - use consolidated data) "agent1_input_template": routing_template, "agent1_input_tokens": routing_input_tokens // 4, # Divide among 4 agents "agent1_output": ", ".join([p for p in response_prompts if p in ["STRUCTURE_PRACTICE_QUESTIONS"]]) or "None", "agent1_output_tokens": routing_output_tokens // 4, "agent1_decision": "STRUCTURE_PRACTICE_QUESTIONS" in response_prompts, "agent1_time_seconds": round(routing_time / 4, 3), "agent1_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2), # Agent 2 metrics "agent2_input_template": routing_template, "agent2_input_tokens": routing_input_tokens // 4, "agent2_output": ", ".join([p for p in response_prompts if p in ["GENERAL_FORMATTING", "LATEX_FORMATTING", "GUIDING_TEACHING"]]) or "None", "agent2_output_tokens": routing_output_tokens // 4, "agent2_decision": ", ".join([p for p in response_prompts if p in ["GENERAL_FORMATTING", "LATEX_FORMATTING", "GUIDING_TEACHING"]]) or "NULL", "agent2_time_seconds": round(routing_time / 4, 3), "agent2_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2), # Agent 3 metrics "agent3_input_template": routing_template, "agent3_input_tokens": routing_input_tokens // 4, "agent3_output": ", ".join([p for p in response_prompts + thinking_prompts if p in ["PRACTICE_QUESTION_FOLLOWUP", "MATH_THINKING", "QUESTION_ANSWER_DESIGN", "REASONING_THINKING"]]) or "None", "agent3_output_tokens": routing_output_tokens // 4, "agent3_decision": any(p in ["PRACTICE_QUESTION_FOLLOWUP", "MATH_THINKING", "QUESTION_ANSWER_DESIGN", "REASONING_THINKING"] for p in response_prompts + thinking_prompts), "agent3_time_seconds": round(routing_time / 4, 3), "agent3_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2), # Agent 4 metrics "agent4_input_template": routing_template, "agent4_input_tokens": routing_input_tokens // 4, "agent4_output": ", ".join([p for p in response_prompts if p == "TOOL_USE_ENHANCEMENT"]) or "None", "agent4_output_tokens": routing_output_tokens // 4, "agent4_decisions": "TOOL_USE_ENHANCEMENT" if "TOOL_USE_ENHANCEMENT" in response_prompts else "NULL", "agent4_time_seconds": round(routing_time / 4, 3), "agent4_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2), }) # Update prompt state with all activated prompts for prompt_name in response_prompts: prompt_state.update(prompt_name, True) for prompt_name in thinking_prompts: prompt_state.update(prompt_name, True) # ============================================================ # STEP 6: THINKING AGENTS (✅ FIXED: Use process() - matches app.py) # ============================================================ # Build thinking prompts list (matches app.py logic) thinking_prompts_list = [] for prompt_name in thinking_prompts: if prompt_name.strip(): thinking_prompts_list.append(prompt_name.strip()) # Additional heuristic: Add MATH_THINKING if LATEX_FORMATTING is active if prompt_state.is_active("LATEX_FORMATTING") and "MATH_THINKING" not in thinking_prompts_list: thinking_prompts_list.append("MATH_THINKING") prompt_state.update("MATH_THINKING", True) # Execute thinking agents if any are active thinking_context = "" if thinking_prompts_list: thinking_start = time.time() thinking_prompts_string = '\n'.join(thinking_prompts_list) reset_gpu_stats() # ✅ FIXED: Use process() method (matches app.py) thinking_context = thinking_agents.process( user_input=user_prompt, conversation_history=recent_history_formatted, thinking_prompts=thinking_prompts_string, tool_img_output=tool_img_output, tool_context=tool_context ) thinking_time = time.time() - thinking_start gpu_metrics = get_gpu_memory() # Record metrics for activated thinking agents # Note: For simplicity, we're recording aggregate metrics # In production, you might want to separate these if "MATH_THINKING" in thinking_prompts_list: result.update({ "math_thinking_activated": True, "math_thinking_input_template": build_math_thinking_template(user_prompt), "math_thinking_input_tokens": count_tokens_accurate(user_prompt), "math_thinking_output": thinking_context[:500], # Truncate for CSV "math_thinking_output_tokens": count_tokens_accurate(thinking_context), "math_thinking_time_seconds": round(thinking_time / len(thinking_prompts_list), 3), "math_thinking_gpu_peak_mb": round(gpu_metrics["peak_mb"] / len(thinking_prompts_list), 2), }) else: result.update({ "math_thinking_activated": False, "math_thinking_input_template": "NULL", "math_thinking_input_tokens": 0, "math_thinking_output": "NULL", "math_thinking_output_tokens": 0, "math_thinking_time_seconds": 0.0, "math_thinking_gpu_peak_mb": 0.0, }) if "QUESTION_ANSWER_DESIGN" in thinking_prompts_list: result.update({ "qa_design_activated": True, "qa_design_input_template": build_qa_design_template(user_prompt), "qa_design_input_tokens": count_tokens_accurate(user_prompt), "qa_design_output": thinking_context[:500], "qa_design_output_tokens": count_tokens_accurate(thinking_context), "qa_design_time_seconds": round(thinking_time / len(thinking_prompts_list), 3), "qa_design_gpu_peak_mb": round(gpu_metrics["peak_mb"] / len(thinking_prompts_list), 2), }) else: result.update({ "qa_design_activated": False, "qa_design_input_template": "NULL", "qa_design_input_tokens": 0, "qa_design_output": "NULL", "qa_design_output_tokens": 0, "qa_design_time_seconds": 0.0, "qa_design_gpu_peak_mb": 0.0, }) if "REASONING_THINKING" in thinking_prompts_list: result.update({ "reasoning_activated": True, "reasoning_input_template": build_reasoning_template(user_prompt), "reasoning_input_tokens": count_tokens_accurate(user_prompt), "reasoning_output": thinking_context[:500], "reasoning_output_tokens": count_tokens_accurate(thinking_context), "reasoning_time_seconds": round(thinking_time / len(thinking_prompts_list), 3), "reasoning_gpu_peak_mb": round(gpu_metrics["peak_mb"] / len(thinking_prompts_list), 2), }) else: result.update({ "reasoning_activated": False, "reasoning_input_template": "NULL", "reasoning_input_tokens": 0, "reasoning_output": "NULL", "reasoning_output_tokens": 0, "reasoning_time_seconds": 0.0, "reasoning_gpu_peak_mb": 0.0, }) else: # No thinking agents activated result.update({ "math_thinking_activated": False, "math_thinking_input_template": "NULL", "math_thinking_input_tokens": 0, "math_thinking_output": "NULL", "math_thinking_output_tokens": 0, "math_thinking_time_seconds": 0.0, "math_thinking_gpu_peak_mb": 0.0, "qa_design_activated": False, "qa_design_input_template": "NULL", "qa_design_input_tokens": 0, "qa_design_output": "NULL", "qa_design_output_tokens": 0, "qa_design_time_seconds": 0.0, "qa_design_gpu_peak_mb": 0.0, "reasoning_activated": False, "reasoning_input_template": "NULL", "reasoning_input_tokens": 0, "reasoning_output": "NULL", "reasoning_output_tokens": 0, "reasoning_time_seconds": 0.0, "reasoning_gpu_peak_mb": 0.0, }) # ============================================================ # STEP 7-8: PROMPT ASSEMBLY (matches app.py) # ============================================================ assembly_start = time.time() # Get active response prompts active_prompts = prompt_state.get_active_response_prompts() assembly_time = time.time() - assembly_start result.update({ "active_response_prompts": ", ".join(active_prompts), "final_prompt_template": "Response input dict (see response_input_template)", "final_prompt_tokens": 0, # Will be calculated in response step "final_prompt_chars": 0, "final_prompt_words": 0, "assembly_time_seconds": round(assembly_time, 3), }) # ============================================================ # STEP 9: RESPONSE GENERATION (✅ FIXED: Use input_data dict) # ============================================================ response_start = time.time() reset_gpu_stats() # ✅ FIXED: Build input_data dict (matches app.py Step 8) input_data = { 'user_query': user_prompt, 'conversation_history': recent_history, 'active_prompts': active_prompts, 'thinking_context': thinking_context, 'tool_context': tool_context, } # ✅ FIXED: Invoke with dict and extract response (matches app.py) result_dict = response_agent.invoke(input_data) raw_response = result_dict.get('response', '') metadata = result_dict.get('metadata', {}) response_time = time.time() - response_start raw_tokens = count_tokens_accurate(raw_response) raw_chars = len(raw_response) raw_words = count_words(raw_response) tokens_per_sec = raw_tokens / response_time if response_time > 0 else 0 gpu_metrics = get_gpu_memory() # Calculate input template string for metrics input_template_str = f"user_query: {user_prompt[:100]}..., active_prompts: {active_prompts}, thinking: {len(thinking_context)} chars, tool: {len(tool_context)} chars" result.update({ "response_input_template": input_template_str, "response_input_tokens": count_tokens_accurate(input_template_str), "response_raw": raw_response, "response_raw_tokens": raw_tokens, "response_raw_chars": raw_chars, "response_raw_words": raw_words, "response_generation_time_seconds": round(response_time, 3), "response_gpu_peak_mb": round(gpu_metrics["peak_mb"], 2), "response_tokens_per_second": round(tokens_per_sec, 2), }) # ============================================================ # STEP 10: POST-PROCESSING (matches app.py) # ============================================================ postprocess_start = time.time() processed_response = post_processor.process_response(raw_response, user_prompt) postprocess_time = time.time() - postprocess_start processed_tokens = count_tokens_accurate(processed_response) processed_chars = len(processed_response) processed_words = count_words(processed_response) result.update({ "response_processed": processed_response, "response_processed_tokens": processed_tokens, "response_processed_chars": processed_chars, "response_processed_words": processed_words, "postprocessing_time_seconds": round(postprocess_time, 3), }) # ============================================================ # QUALITY METRICS # ============================================================ flesch_ease = calculate_flesch_reading_ease(processed_response) flesch_grade = calculate_flesch_kincaid_grade(processed_response) completeness = calculate_completeness_score(processed_response, user_prompt) specificity = calculate_specificity_score(processed_response) repetition = calculate_repetition_ratio(processed_response) unique_ratio = calculate_unique_word_ratio(processed_response) avg_sent_len = calculate_avg_sentence_length(processed_response) question_answered = check_question_answered(processed_response, user_prompt) result.update({ "flesch_reading_ease": round(flesch_ease, 2), "flesch_kincaid_grade": round(flesch_grade, 2), "completeness_score": round(completeness, 3), "specificity_score": round(specificity, 3), "repetition_ratio": round(repetition, 3), "unique_word_ratio": round(unique_ratio, 3), "avg_sentence_length": round(avg_sent_len, 2), "question_answered": question_answered, }) # ============================================================ # OVERALL METRICS # ============================================================ total_pipeline_time = time.time() - pipeline_start # Count activated models models_activated = [] if result["tool_decision_time_seconds"] > 0: models_activated.append("Tool Decision") if result["agent1_time_seconds"] > 0: models_activated.append("Routing Agents") if result["math_thinking_activated"]: models_activated.append("Math Thinking") if result["qa_design_activated"]: models_activated.append("QA Design") if result["reasoning_activated"]: models_activated.append("Reasoning") models_activated.append("Response Agent") # Sum all input tokens total_input_tokens = ( result["tool_decision_input_tokens"] + result["agent1_input_tokens"] * 4 + # Multiply back since we divided result.get("math_thinking_input_tokens", 0) + result.get("qa_design_input_tokens", 0) + result.get("reasoning_input_tokens", 0) + result["response_input_tokens"] ) # Sum all output tokens total_output_tokens = ( result["tool_decision_output_tokens"] + result["agent1_output_tokens"] * 4 + result.get("math_thinking_output_tokens", 0) + result.get("qa_design_output_tokens", 0) + result.get("reasoning_output_tokens", 0) + result["response_raw_tokens"] ) # Max GPU across all steps total_gpu_peak = max([ result["tool_decision_gpu_peak_mb"], result["agent1_gpu_peak_mb"], result.get("math_thinking_gpu_peak_mb", 0.0), result.get("qa_design_gpu_peak_mb", 0.0), result.get("reasoning_gpu_peak_mb", 0.0), result["response_gpu_peak_mb"], ]) result.update({ "total_pipeline_time_seconds": round(total_pipeline_time, 3), "total_input_tokens": total_input_tokens, "total_output_tokens": total_output_tokens, "total_gpu_peak_mb": round(total_gpu_peak, 2), "models_activated_count": len(models_activated), "models_activated_list": ", ".join(models_activated), }) logger.info(f"✓ Prompt {prompt_index} complete: {total_pipeline_time:.2f}s, {len(models_activated)} models activated") return result except Exception as e: logger.error(f"Pipeline execution failed for prompt {prompt_index}: {e}") import traceback traceback.print_exc() # Return error result with NULLs error_result = {col: "ERROR" for col in CSV_COLUMNS} error_result.update({ "prompt_index": prompt_index, "timestamp": datetime.now().isoformat(), "user_prompt": user_prompt, "user_prompt_tokens": count_tokens_accurate(user_prompt), "user_prompt_chars": len(user_prompt), "user_prompt_words": count_words(user_prompt), }) return error_result # ============================================================================ # BATCH PROCESSING # ============================================================================ @spaces.GPU(duration=600) def process_batch_full_pipeline( user_prompts: List[str], progress_callback=None ) -> List[Dict]: """ Process batch of prompts through FULL PIPELINE. Sequential processing - one at a time. Args: user_prompts: List of user prompts to test progress_callback: Optional callback for progress updates Returns: List of result dictionaries (one per prompt) """ results = [] total = len(user_prompts) logger.info(f"="*60) logger.info(f"Starting full pipeline batch: {total} prompts") logger.info(f"="*60) batch_start = time.time() for idx, user_prompt in enumerate(user_prompts, 1): logger.info(f"\n{'='*60}") logger.info(f"Processing prompt {idx}/{total}") logger.info(f"Prompt: {user_prompt[:80]}...") logger.info(f"{'='*60}") try: # Run full instrumented pipeline result = run_full_pipeline_instrumented(user_prompt, prompt_index=idx) results.append(result) logger.info(f"✓ Prompt {idx} complete") logger.info(f" Total time: {result.get('total_pipeline_time_seconds', 0):.2f}s") logger.info(f" Models activated: {result.get('models_activated_count', 0)}") logger.info(f" Total tokens: {result.get('total_input_tokens', 0) + result.get('total_output_tokens', 0)}") if progress_callback: progress_callback(idx, total) except Exception as e: logger.error(f"❌ Prompt {idx} failed: {e}") import traceback traceback.print_exc() # Add error result error_result = {col: "ERROR" for col in CSV_COLUMNS} error_result.update({ "prompt_index": idx, "timestamp": datetime.now().isoformat(), "user_prompt": user_prompt, "user_prompt_tokens": count_tokens_accurate(user_prompt), }) results.append(error_result) batch_duration = time.time() - batch_start logger.info(f"\n{'='*60}") logger.info(f"BATCH COMPLETE") logger.info(f"{'='*60}") logger.info(f"Processed: {len(results)}/{total} prompts") logger.info(f"Total batch time: {batch_duration:.2f}s") logger.info(f"Average per prompt: {batch_duration/total:.2f}s") logger.info(f"{'='*60}") return results # ============================================================================ # CSV EXPORT # ============================================================================ def export_full_pipeline_csv( results: List[Dict], test_name: str = "pipeline_test" ) -> str: """ Export full pipeline results to CSV. Args: results: List of result dictionaries test_name: Name for the test (used in filename) Returns: Filepath of exported CSV """ try: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f"mimir_full_pipeline_{test_name}_{timestamp}.csv" filepath = os.path.join("/tmp", filename) # Save to /tmp for ZeroGPU if not results: logger.warning("No results to export") return None logger.info(f"Exporting {len(results)} results to CSV...") # Write CSV with open(filepath, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS) writer.writeheader() for result in results: # Fill missing keys with NULL row = {key: result.get(key, "NULL") for key in CSV_COLUMNS} writer.writerow(row) logger.info(f"✓ Full pipeline results exported to {filepath}") logger.info(f" Columns: {len(CSV_COLUMNS)}") logger.info(f" Rows: {len(results)}") return filepath except Exception as e: logger.error(f"CSV export failed: {e}") import traceback traceback.print_exc() return None def calculate_summary_stats(results: List[Dict]) -> Dict: """Calculate summary statistics from results""" if not results: return {} valid_results = [r for r in results if r.get("total_pipeline_time_seconds") != "ERROR"] if not valid_results: return {"error": "No valid results"} return { "total_prompts": len(results), "successful_prompts": len(valid_results), "failed_prompts": len(results) - len(valid_results), "avg_pipeline_time_seconds": round(np.mean([r["total_pipeline_time_seconds"] for r in valid_results]), 3), "min_pipeline_time_seconds": round(np.min([r["total_pipeline_time_seconds"] for r in valid_results]), 3), "max_pipeline_time_seconds": round(np.max([r["total_pipeline_time_seconds"] for r in valid_results]), 3), "avg_total_tokens": round(np.mean([r["total_input_tokens"] + r["total_output_tokens"] for r in valid_results]), 1), "avg_models_activated": round(np.mean([r["models_activated_count"] for r in valid_results]), 2), "avg_gpu_peak_mb": round(np.mean([r["total_gpu_peak_mb"] for r in valid_results]), 2), "avg_completeness_score": round(np.mean([r["completeness_score"] for r in valid_results]), 3), "avg_flesch_reading_ease": round(np.mean([r["flesch_reading_ease"] for r in valid_results]), 2), "questions_answered_pct": round(100 * sum([r["question_answered"] for r in valid_results]) / len(valid_results), 1), } # ============================================================================ # GRADIO INTERFACE # ============================================================================ with gr.Blocks(title="Mimir - Full Pipeline Testing", theme=gr.themes.Soft()) as demo: gr.Markdown("# 🧪 Mimir Full Pipeline Testing") gr.Markdown(""" Test the **complete orchestration flow** with comprehensive metrics at every step. **✅ UPDATED:** Now correctly mirrors app.py orchestrate_turn() process - Tool decision uses `decide()` method with conversation history - Response agent invoked with `input_data` dict (not raw string) - Thinking agents use `process()` method matching app.py **What this tests:** - ✅ Tool Decision Agent - ✅ All 4 Routing Agents (unified process) - ✅ Thinking Agents (conditional: Math, QA Design, Reasoning) - ✅ Response Agent (Llama-3.2-3B) - ✅ Post-processing **Output:** CSV file with ~110 columns capturing the full pipeline journey """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("## 📝 Test Configuration") test_name = gr.Textbox( label="Test Name", value="pipeline_test", placeholder="Enter a name for this test run", ) gr.Markdown("### Input Method") input_method = gr.Radio( choices=["CSV Upload", "Manual Entry"], value="Manual Entry", label="Choose Input Method" ) # CSV upload with gr.Group(visible=False) as csv_section: csv_file = gr.File( label="Upload CSV File", file_types=[".csv"], ) # Manual entry with gr.Group(visible=True) as manual_section: prompt_text = gr.Textbox( label="Enter Prompts (one per line)", lines=15, placeholder="What is calculus?\nHelp me understand photosynthesis\nCan you create practice questions for algebra?\nExplain Newton's laws of motion", ) process_btn = gr.Button( "🚀 Run Full Pipeline Test", variant="primary", size="lg" ) status = gr.Textbox( label="Status", interactive=False, lines=3 ) with gr.Column(scale=1): gr.Markdown("## 📊 Results") results_summary = gr.JSON( label="Summary Statistics", height=400 ) gr.Markdown("### Download Results") download_csv = gr.File( label="CSV Export", interactive=False ) gr.Markdown(""" **CSV contains ~110 columns:** - Input metrics (tokens, chars, words) - Template for each agent - Output for each agent - Timing for each step - GPU usage per step - Quality metrics (readability, completeness, etc.) - Overall pipeline metrics """) # Toggle between input methods def toggle_input_method(method): if method == "CSV Upload": return gr.update(visible=True), gr.update(visible=False) else: return gr.update(visible=False), gr.update(visible=True) input_method.change( fn=toggle_input_method, inputs=[input_method], outputs=[csv_section, manual_section] ) # Main processing function def run_pipeline_test(test_name, input_method, csv_file, prompt_text): """Run the full pipeline test""" # Parse prompts prompts = [] if input_method == "CSV Upload" and csv_file: try: # Read CSV content = csv_file.decode('utf-8') if isinstance(csv_file, bytes) else csv_file if hasattr(content, 'read'): content = content.read() if isinstance(content, bytes): content = content.decode('utf-8') reader = csv.reader(io.StringIO(str(content))) prompts = [row[0].strip() for row in reader if row and row[0].strip()] # Skip header if present if prompts and any(header in prompts[0].lower() for header in ['prompt', 'text', 'query', 'input']): prompts = prompts[1:] except Exception as e: return f"❌ CSV parsing error: {e}", {}, None elif input_method == "Manual Entry" and prompt_text: prompts = [p.strip() for p in prompt_text.split('\n') if p.strip()] if not prompts: return "❌ No prompts provided. Please enter at least one prompt.", {}, None status_msg = f"🔄 Processing {len(prompts)} prompts through full pipeline...\n" status_msg += "This may take several minutes. Please wait...\n" try: # Run batch results = process_batch_full_pipeline(prompts) # Calculate summary summary = calculate_summary_stats(results) # Export CSV csv_path = export_full_pipeline_csv(results, test_name) status_msg = f"✅ Complete!\n" status_msg += f"Processed: {len(results)} prompts\n" status_msg += f"Successful: {summary.get('successful_prompts', 0)}\n" status_msg += f"Failed: {summary.get('failed_prompts', 0)}\n" status_msg += f"CSV ready for download!" return status_msg, summary, csv_path except Exception as e: error_msg = f"❌ Pipeline test failed: {str(e)}" logger.error(error_msg) import traceback traceback.print_exc() return error_msg, {}, None # Wire up event process_btn.click( fn=run_pipeline_test, inputs=[test_name, input_method, csv_file, prompt_text], outputs=[status, results_summary, download_csv] ) # ============================================================================ # LAUNCH # ============================================================================ if __name__ == "__main__": logger.info("="*60) logger.info("LAUNCHING MIMIR FULL PIPELINE TESTING INTERFACE") logger.info("✅ UPDATED: Now correctly mirrors app.py orchestration") logger.info("="*60) logger.info(f"CSV Schema: {len(CSV_COLUMNS)} columns") logger.info(f"Agents initialized: {AGENTS_AVAILABLE}") logger.info(f"Tiktoken available: {TIKTOKEN_AVAILABLE}") logger.info(f"Textstat available: {TEXTSTAT_AVAILABLE}") logger.info(f"ZeroGPU available: {ZERO_GPU_AVAILABLE}") logger.info("="*60) demo.launch( server_name="0.0.0.0", server_port=7862, share=False, debug=True )