Spaces:
Sleeping
Sleeping
| # gradio_pipeline_testing.py | |
| """ | |
| Full Pipeline Testing Interface for Mimir Educational AI Assistant | |
| Tests the complete orchestration flow with comprehensive metrics at every step. | |
| Captures conditional model activation, token usage, timing, and quality metrics. | |
| UPDATED: Now correctly mirrors app.py orchestrate_turn() process | |
| - Tool decision uses decide() method with conversation history | |
| - Response agent invoked with input_data dict (not raw string) | |
| - Thinking agents process() method matches app.py | |
| - Graph generation included when tools are used | |
| Output: CSV file with ~110 columns capturing full pipeline journey | |
| """ | |
| import os | |
| import sys | |
| import io | |
| import csv | |
| import json | |
| import time | |
| import logging | |
| import warnings | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from collections import Counter | |
| # Core dependencies | |
| import torch | |
| import gradio as gr | |
| import numpy as np | |
# ============================================================================
# ENVIRONMENT SETUP
# ============================================================================
# Point the Hugging Face caches at a writable /tmp location (the default
# cache path may not be writable in a hosted/container deployment). These
# env vars should be set before any HF library resolves its cache dir.
HF_CACHE = "/tmp/huggingface"
os.makedirs(f"{HF_CACHE}/hub", exist_ok=True)
os.environ['HF_HOME'] = HF_CACHE
os.environ['HF_HUB_CACHE'] = f"{HF_CACHE}/hub"
| # ============================================================================ | |
| # IMPORTS FROM MIMIR APPLICATION | |
| # ============================================================================ | |
| try: | |
| from agents import ( | |
| ToolDecisionAgent, | |
| PromptRoutingAgents, | |
| ThinkingAgents, | |
| ResponseAgent, | |
| ) | |
| AGENTS_AVAILABLE = True | |
| except ImportError as e: | |
| print(f"β οΈ Warning: Could not import agents: {e}") | |
| AGENTS_AVAILABLE = False | |
| from model_manager import get_model as get_shared_llama | |
| try: | |
| from state_manager import GlobalStateManager, LogicalExpressions | |
| STATE_MANAGER_AVAILABLE = True | |
| except ImportError as e: | |
| print(f"β οΈ Warning: Could not import state_manager: {e}") | |
| STATE_MANAGER_AVAILABLE = False | |
| try: | |
| from prompt_library import ( | |
| CORE_IDENTITY, | |
| TOOL_DECISION, | |
| agent_1_system, | |
| agent_2_system, | |
| agent_3_system, | |
| agent_4_system, | |
| MATH_THINKING, | |
| QUESTION_ANSWER_DESIGN, | |
| REASONING_THINKING, | |
| VAUGE_INPUT, | |
| USER_UNDERSTANDING, | |
| GENERAL_FORMATTING, | |
| LATEX_FORMATTING, | |
| GUIDING_TEACHING, | |
| STRUCTURE_PRACTICE_QUESTIONS, | |
| PRACTICE_QUESTION_FOLLOWUP, | |
| TOOL_USE_ENHANCEMENT, | |
| ) | |
| PROMPTS_AVAILABLE = True | |
| except ImportError as e: | |
| print(f"β οΈ Warning: Could not import prompt_library: {e}") | |
| PROMPTS_AVAILABLE = False | |
| # Try to import post processor | |
| try: | |
| # Import the post processor class/module from app.py | |
| import importlib.util | |
| spec = importlib.util.spec_from_file_location("app_module", "app.py") | |
| app_module = importlib.util.module_from_spec(spec) | |
| spec.loader.exec_module(app_module) | |
| post_processor = app_module.post_processor | |
| POST_PROCESSOR_AVAILABLE = True | |
| except Exception as e: | |
| print(f"β οΈ Warning: Could not import post_processor: {e}") | |
| POST_PROCESSOR_AVAILABLE = False | |
| # Create dummy | |
| class DummyPostProcessor: | |
| def process_response(self, response, user_message): | |
| return response | |
| post_processor = DummyPostProcessor() | |
| # ZeroGPU support | |
| try: | |
| import spaces | |
| ZERO_GPU_AVAILABLE = True | |
| except ImportError: | |
| ZERO_GPU_AVAILABLE = False | |
| class DummySpaces: | |
| def GPU(duration=600): | |
| def decorator(func): | |
| return func | |
| return decorator | |
| spaces = DummySpaces() | |
| # Tiktoken for accurate token counting | |
| try: | |
| import tiktoken | |
| TIKTOKEN_AVAILABLE = True | |
| except ImportError: | |
| TIKTOKEN_AVAILABLE = False | |
| print("β οΈ Warning: tiktoken not available - using fallback token counting") | |
| # Textstat for readability metrics | |
| try: | |
| import textstat | |
| TEXTSTAT_AVAILABLE = True | |
| except ImportError: | |
| TEXTSTAT_AVAILABLE = False | |
| print("β οΈ Warning: textstat not available - using manual readability calculations") | |
# ============================================================================
# LOGGING SETUP
# ============================================================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Silence noisy library warnings that would clutter the test-run logs.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
# Captured once at import time; used for date-sensitive content.
CURRENT_YEAR = datetime.now().year
# ============================================================================
# GLOBAL INSTANCES
# ============================================================================
# Instantiate the shared state manager and all pipeline agents exactly once
# at module import. The harness cannot do anything useful without them, so
# any failure here is deliberately fatal (re-raised after logging).
if AGENTS_AVAILABLE and STATE_MANAGER_AVAILABLE:
    try:
        global_state_manager = GlobalStateManager()
        logical_expressions = LogicalExpressions()
        tool_agent = ToolDecisionAgent()
        routing_agents = PromptRoutingAgents()
        thinking_agents = ThinkingAgents()
        response_agent = ResponseAgent()
        logger.info("β All agents initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize agents: {e}")
        raise
else:
    # One or both of the guarded imports above failed; abort module import.
    logger.error("Cannot initialize - missing core dependencies")
    raise ImportError("Missing required modules: agents or state_manager")
# ============================================================================
# CSV SCHEMA DEFINITION
# ============================================================================
# Ordered column schema for the output CSV. One row is written per test
# prompt; each pipeline stage contributes its input/output text, token
# counts, timing and GPU-peak columns. Order here defines column order.
CSV_COLUMNS = [
    # Identification & Input
    "prompt_index",
    "timestamp",
    "user_prompt",
    "user_prompt_tokens",
    "user_prompt_chars",
    "user_prompt_words",
    # Conversation Context
    "conversation_history_length",
    "conversation_history_tokens",
    # Tool Decision Agent
    "tool_decision_input_template",
    "tool_decision_input_tokens",
    "tool_decision_output",
    "tool_decision_output_tokens",
    "tool_decision_result",
    "tool_decision_time_seconds",
    "tool_decision_gpu_peak_mb",
    # Regex Checks
    "regex_checks_applied",
    "regex_checks_time_seconds",
    # Routing Agent 1
    "agent1_input_template",
    "agent1_input_tokens",
    "agent1_output",
    "agent1_output_tokens",
    "agent1_decision",
    "agent1_time_seconds",
    "agent1_gpu_peak_mb",
    # Routing Agent 2
    "agent2_input_template",
    "agent2_input_tokens",
    "agent2_output",
    "agent2_output_tokens",
    "agent2_decision",
    "agent2_time_seconds",
    "agent2_gpu_peak_mb",
    # Routing Agent 3
    "agent3_input_template",
    "agent3_input_tokens",
    "agent3_output",
    "agent3_output_tokens",
    "agent3_decision",
    "agent3_time_seconds",
    "agent3_gpu_peak_mb",
    # Routing Agent 4 (note: plural "decisions" — agent 4 can pick several)
    "agent4_input_template",
    "agent4_input_tokens",
    "agent4_output",
    "agent4_output_tokens",
    "agent4_decisions",
    "agent4_time_seconds",
    "agent4_gpu_peak_mb",
    # Math Thinking
    "math_thinking_activated",
    "math_thinking_input_template",
    "math_thinking_input_tokens",
    "math_thinking_output",
    "math_thinking_output_tokens",
    "math_thinking_time_seconds",
    "math_thinking_gpu_peak_mb",
    # QA Design Thinking
    "qa_design_activated",
    "qa_design_input_template",
    "qa_design_input_tokens",
    "qa_design_output",
    "qa_design_output_tokens",
    "qa_design_time_seconds",
    "qa_design_gpu_peak_mb",
    # Reasoning Thinking
    "reasoning_activated",
    "reasoning_input_template",
    "reasoning_input_tokens",
    "reasoning_output",
    "reasoning_output_tokens",
    "reasoning_time_seconds",
    "reasoning_gpu_peak_mb",
    # Prompt Assembly
    "active_response_prompts",
    "final_prompt_template",
    "final_prompt_tokens",
    "final_prompt_chars",
    "final_prompt_words",
    "assembly_time_seconds",
    # Response Generation
    "response_input_template",
    "response_input_tokens",
    "response_raw",
    "response_raw_tokens",
    "response_raw_chars",
    "response_raw_words",
    "response_generation_time_seconds",
    "response_gpu_peak_mb",
    "response_tokens_per_second",
    # Post-processing
    "response_processed",
    "response_processed_tokens",
    "response_processed_chars",
    "response_processed_words",
    "postprocessing_time_seconds",
    # Quality Metrics
    "flesch_reading_ease",
    "flesch_kincaid_grade",
    "completeness_score",
    "specificity_score",
    "repetition_ratio",
    "unique_word_ratio",
    "avg_sentence_length",
    "question_answered",
    # Overall Metrics
    "total_pipeline_time_seconds",
    "total_input_tokens",
    "total_output_tokens",
    "total_gpu_peak_mb",
    "models_activated_count",
    "models_activated_list",
]
| # ============================================================================ | |
| # TOKEN COUNTING FUNCTIONS | |
| # ============================================================================ | |
def count_tokens_accurate(text: str) -> int:
    """Estimate the token count of *text*.

    Uses tiktoken's cl100k_base encoding (a good general-purpose
    estimator) when the library is installed; otherwise falls back to a
    simple whitespace word count. Empty/None input counts as 0.
    """
    if not text:
        return 0
    if TIKTOKEN_AVAILABLE:
        try:
            return len(tiktoken.get_encoding("cl100k_base").encode(text))
        except Exception as e:
            logger.warning(f"tiktoken encoding failed: {e}, using fallback")
    # Fallback: approximate tokens by whitespace-separated words.
    return len(text.split())
def count_words(text: str) -> int:
    """Return the number of whitespace-separated words in *text* (0 if empty)."""
    return len(text.split()) if text else 0
def count_sentences(text: str) -> int:
    """Count sentences via a simple terminator heuristic.

    Splits on runs of '.', '!' or '?' and counts the non-blank pieces.
    """
    if not text:
        return 0
    import re
    return sum(1 for piece in re.split(r'[.!?]+', text) if piece.strip())
| # ============================================================================ | |
| # GPU MEMORY TRACKING | |
| # ============================================================================ | |
def get_gpu_memory() -> Dict[str, float]:
    """Return current CUDA memory statistics in megabytes.

    Keys: ``allocated_mb``, ``reserved_mb``, ``peak_mb``. All values are
    0.0 when no CUDA device is available.
    """
    if not torch.cuda.is_available():
        return {"allocated_mb": 0.0, "reserved_mb": 0.0, "peak_mb": 0.0}
    mb = 1024 ** 2
    return {
        "allocated_mb": torch.cuda.memory_allocated() / mb,
        "reserved_mb": torch.cuda.memory_reserved() / mb,
        "peak_mb": torch.cuda.max_memory_allocated() / mb,
    }
def reset_gpu_stats():
    """Clear CUDA peak-memory counters and synchronize; no-op without a GPU."""
    if not torch.cuda.is_available():
        return
    torch.cuda.reset_peak_memory_stats()
    torch.cuda.synchronize()
| # ============================================================================ | |
| # TEMPLATE BUILDING FUNCTIONS | |
| # ============================================================================ | |
def format_history(history: List[Dict]) -> str:
    """Render the last 8 messages as newline-joined "role: content" lines.

    Each message's content is truncated to 100 characters; an empty
    history yields the sentinel string "No previous conversation".
    """
    if not history:
        return "No previous conversation"
    rendered = [
        f"{msg.get('role', 'unknown')}: {msg.get('content', '')[:100]}"
        for msg in history[-8:]
    ]
    return "\n".join(rendered)
def build_tool_decision_template(user_prompt: str, history: List) -> str:
    """Assemble the tool-decision agent's input (history + query), matching app.py."""
    return f"{format_history(history)}\n\nUser Query: {user_prompt}"
def build_agent1_template(user_prompt: str, history: List) -> str:
    """Build the [INST]-wrapped prompt for Agent 1: Practice Questions."""
    history_block = format_history(history)
    return (
        f"<s>[INST] {agent_1_system}\n\nConversation History:\n"
        f"{history_block}\n\nCurrent User Query: {user_prompt} [/INST]"
    )
def build_agent2_template(user_prompt: str) -> str:
    """Build the [INST]-wrapped prompt for Agent 2: Discovery Mode (no history)."""
    return f"<s>[INST] {agent_2_system}\n\nUser Query: {user_prompt} [/INST]"
def build_agent3_template(user_prompt: str, history: List) -> str:
    """Build the [INST]-wrapped prompt for Agent 3: Followup Assessment."""
    history_block = format_history(history)
    return (
        f"<s>[INST] {agent_3_system}\n\nConversation History:\n"
        f"{history_block}\n\nCurrent User Query: {user_prompt} [/INST]"
    )
def build_agent4_template(user_prompt: str, history: List) -> str:
    """Build the [INST]-wrapped prompt for Agent 4: Teaching Mode."""
    history_block = format_history(history)
    return (
        f"<s>[INST] {agent_4_system}\n\nConversation History:\n"
        f"{history_block}\n\nCurrent User Query: {user_prompt} [/INST]"
    )
def build_math_thinking_template(user_prompt: str) -> str:
    """Build the [INST]-wrapped prompt for the Math Thinking agent."""
    return f"<s>[INST] {MATH_THINKING}\n\nUser Query: {user_prompt} [/INST]"
def build_qa_design_template(user_prompt: str) -> str:
    """Build the [INST]-wrapped prompt for the QA Design Thinking agent."""
    return f"<s>[INST] {QUESTION_ANSWER_DESIGN}\n\nUser Query: {user_prompt} [/INST]"
def build_reasoning_template(user_prompt: str) -> str:
    """Build the [INST]-wrapped prompt for the Reasoning Thinking agent."""
    return f"<s>[INST] {REASONING_THINKING}\n\nUser Query: {user_prompt} [/INST]"
| # ============================================================================ | |
| # QUALITY METRICS FUNCTIONS | |
| # ============================================================================ | |
def estimate_syllables(text: str) -> int:
    """Rough syllable estimate: one syllable per vowel group ('y' counts
    as a vowel), with a minimum of one per word. Non-letter characters
    are stripped before counting; all-symbol "words" contribute nothing.
    """
    import re
    total = 0
    for raw in text.lower().split():
        word = re.sub(r'[^a-z]', '', raw)
        if word:
            total += max(1, len(re.findall(r'[aeiouy]+', word)))
    return total
def calculate_flesch_reading_ease(text: str) -> float:
    """Flesch Reading Ease score, clamped to [0, 100].

    Higher = easier to read:
        90-100: very easy (5th grade)
        60-70:  standard (8th-9th grade)
        0-30:   very difficult (college graduate)

    Prefers textstat when available; otherwise applies the standard
    formula manually:
        206.835 - 1.015*(words/sentences) - 84.6*(syllables/words)

    Returns 0.0 for empty or near-empty (< 10 chars) input.
    """
    if not text or len(text.strip()) < 10:
        return 0.0
    if TEXTSTAT_AVAILABLE:
        try:
            return textstat.flesch_reading_ease(text)
        # Was a bare `except:` — narrowed so Ctrl-C/SystemExit still propagate.
        except Exception:
            pass  # fall through to the manual computation
    # Manual calculation
    words = count_words(text)
    sentences = count_sentences(text)
    if sentences == 0 or words == 0:
        return 0.0
    syllables = estimate_syllables(text)
    score = 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)
    return max(0.0, min(100.0, score))
def calculate_flesch_kincaid_grade(text: str) -> float:
    """Flesch-Kincaid Grade Level (US school grade needed to understand text).

    Prefers textstat when available; otherwise applies the standard
    formula manually:
        0.39*(words/sentences) + 11.8*(syllables/words) - 15.59

    Returns 0.0 for empty or near-empty (< 10 chars) input; the result
    is floored at 0.0.
    """
    if not text or len(text.strip()) < 10:
        return 0.0
    if TEXTSTAT_AVAILABLE:
        try:
            return textstat.flesch_kincaid_grade(text)
        # Was a bare `except:` — narrowed so Ctrl-C/SystemExit still propagate.
        except Exception:
            pass  # fall through to the manual computation
    words = count_words(text)
    sentences = count_sentences(text)
    if sentences == 0 or words == 0:
        return 0.0
    syllables = estimate_syllables(text)
    grade = 0.39 * (words / sentences) + 11.8 * (syllables / words) - 15.59
    return max(0.0, grade)
def calculate_completeness_score(response: str, user_prompt: str) -> float:
    """Keyword-overlap heuristic: does the response address the prompt?

    Takes the prompt's non-stopword keywords, measures what fraction
    appear in the response, and scales the result down for responses
    shorter than 20 characters. Returns a score in [0, 1]; 0.5 (neutral)
    when the prompt contains no meaningful keywords.
    """
    if not response or not user_prompt:
        return 0.0
    import re
    stopwords = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been',
                 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
                 'would', 'should', 'could', 'may', 'might', 'can', 'what',
                 'how', 'why', 'when', 'where', 'who', 'which', 'i', 'you',
                 'we', 'they', 'he', 'she', 'it', 'me', 'him', 'her', 'us', 'them'}
    keywords = set(re.findall(r'\b\w+\b', user_prompt.lower())) - stopwords
    if not keywords:
        return 0.5  # Neutral: nothing meaningful to match against
    answer_words = set(re.findall(r'\b\w+\b', response.lower()))
    overlap = len(keywords & answer_words) / len(keywords)
    # Penalize responses shorter than a minimally reasonable answer.
    min_reasonable_length = 20
    length_factor = min(1.0, len(response) / min_reasonable_length)
    return min(1.0, overlap * length_factor)
def check_question_answered(response: str, user_prompt: str) -> bool:
    """Heuristic check: does the response attempt to answer the question?

    Returns False when the response is very short (< 10 chars), opens
    with a refusal phrase, or has too little keyword overlap with the
    prompt (completeness <= 0.3).
    """
    if not response or len(response) < 10:
        return False
    refusal_prefixes = (
        "i don't know",
        "i cannot",
        "i can't",
        "i'm not sure",
        "i don't have",
        "unable to",
        "sorry, i",
    )
    if response.lower().startswith(refusal_prefixes):
        return False
    # Require at least minimal topical overlap with the prompt.
    return calculate_completeness_score(response, user_prompt) > 0.3
def calculate_specificity_score(response: str) -> float:
    """Score 0-1 for how specific (vs. vague) the response is.

    One point each, out of five, for: digits present, capitalized words
    not following a sentence break, example phrases, average word length
    above 5 characters, and total length above 200 characters.
    """
    if not response:
        return 0.0
    import re
    lowered = response.lower()
    points = 0
    # 1. Numbers suggest concrete facts/dates/quantities
    if re.search(r'\d+', response):
        points += 1
    # 2. Proper-noun-like words (capitalized, not right after ". ")
    if re.findall(r'(?<!\. )\b[A-Z][a-z]+', response):
        points += 1
    # 3. Explicit example phrasing
    example_phrases = ('for example', 'such as', 'for instance', 'like', 'including')
    if any(phrase in lowered for phrase in example_phrases):
        points += 1
    # 4. Longer average word length hints at technical vocabulary
    tokens = response.split()
    if tokens and sum(len(t) for t in tokens) / len(tokens) > 5.0:
        points += 1
    # 5. Substantial overall length
    if len(response) > 200:
        points += 1
    return points / 5
def calculate_repetition_ratio(text: str) -> float:
    """Fraction of word occurrences that are repeats, in [0, 1].

    Lower is better (less repetitive). Texts with fewer than two words
    score 0.0. Case-insensitive, whitespace-tokenized.
    """
    if not text:
        return 0.0
    tokens = text.lower().split()
    if len(tokens) < 2:
        return 0.0
    # Each occurrence beyond a word's first counts as a repeat.
    extras = sum(n - 1 for n in Counter(tokens).values() if n > 1)
    return min(1.0, extras / len(tokens))
def calculate_unique_word_ratio(text: str) -> float:
    """Vocabulary diversity: unique words / total words, in [0, 1].

    Higher means more diverse vocabulary. Case-insensitive; empty or
    whitespace-only text scores 0.0.
    """
    tokens = text.lower().split() if text else []
    if not tokens:
        return 0.0
    return len(set(tokens)) / len(tokens)
def calculate_avg_sentence_length(text: str) -> float:
    """Average words per sentence; 0.0 when no sentences are detected."""
    sentence_count = count_sentences(text)
    if sentence_count == 0:
        return 0.0
    return count_words(text) / sentence_count
| # ============================================================================ | |
| # INSTRUMENTED PIPELINE RUNNER | |
| # ============================================================================ | |
| def run_full_pipeline_instrumented(user_prompt: str, prompt_index: int = 1) -> Dict: | |
| """ | |
| Run the complete orchestration pipeline with full instrumentation. | |
| Captures metrics at every step. | |
| β UPDATED: Now correctly mirrors app.py orchestrate_turn() process | |
| Args: | |
| user_prompt: User's input prompt | |
| prompt_index: Index number for this prompt in batch | |
| Returns: | |
| Dictionary with all metrics for CSV export | |
| """ | |
| result = { | |
| "prompt_index": prompt_index, | |
| "timestamp": datetime.now().isoformat(), | |
| "user_prompt": user_prompt, | |
| "user_prompt_tokens": count_tokens_accurate(user_prompt), | |
| "user_prompt_chars": len(user_prompt), | |
| "user_prompt_words": count_words(user_prompt), | |
| } | |
| # Track overall start time | |
| pipeline_start = time.time() | |
| try: | |
| # ============================================================ | |
| # STEP 1-2: SETUP | |
| # ============================================================ | |
| setup_start = time.time() | |
| # Reset state | |
| global_state_manager.reset_prompt_state() | |
| prompt_state = global_state_manager.get_prompt_state_manager() | |
| # Get conversation history (empty for testing) | |
| recent_history = [] | |
| recent_history_formatted = "No previous conversation" | |
| result["conversation_history_length"] = 0 | |
| result["conversation_history_tokens"] = 0 | |
| # ============================================================ | |
| # STEP 3: TOOL DECISION AGENT (β FIXED: Use decide() with history) | |
| # ============================================================ | |
| tool_start = time.time() | |
| tool_template = build_tool_decision_template(user_prompt, recent_history) | |
| tool_input_tokens = count_tokens_accurate(tool_template) | |
| reset_gpu_stats() | |
| # β FIXED: Use decide() method with conversation history (matches app.py) | |
| tool_decision_result = tool_agent.decide(user_prompt, recent_history) | |
| # Capture output | |
| tool_output = str(tool_decision_result) | |
| tool_output_tokens = count_tokens_accurate(tool_output) | |
| gpu_metrics = get_gpu_memory() | |
| tool_time = time.time() - tool_start | |
| # Record | |
| result.update({ | |
| "tool_decision_input_template": tool_template, | |
| "tool_decision_input_tokens": tool_input_tokens, | |
| "tool_decision_output": tool_output, | |
| "tool_decision_output_tokens": tool_output_tokens, | |
| "tool_decision_result": bool(tool_decision_result), | |
| "tool_decision_time_seconds": round(tool_time, 3), | |
| "tool_decision_gpu_peak_mb": round(gpu_metrics["peak_mb"], 2), | |
| }) | |
| # Update state | |
| tool_img_output = "" | |
| tool_context = "" | |
| if tool_decision_result: | |
| prompt_state.update("TOOL_USE_ENHANCEMENT", True) | |
| # Note: In real app.py, graph generation happens here | |
| # For testing, we'll just note that tools would be used | |
| tool_context = "Tool usage detected (graph would be generated in production)" | |
| # ============================================================ | |
| # STEP 4: REGEX CHECKS | |
| # ============================================================ | |
| regex_start = time.time() | |
| # Apply regex checks (returns list of activated prompts) | |
| regex_before = set(prompt_state.get_active_response_prompts()) | |
| logical_expressions.apply_all_checks(user_prompt, prompt_state) | |
| regex_after = set(prompt_state.get_active_response_prompts()) | |
| regex_applied = list(regex_after - regex_before) | |
| regex_time = time.time() - regex_start | |
| result.update({ | |
| "regex_checks_applied": ", ".join(regex_applied) if regex_applied else "None", | |
| "regex_checks_time_seconds": round(regex_time, 3), | |
| }) | |
| # ============================================================ | |
| # STEP 5: ROUTING AGENTS (β Unified Process - matches app.py) | |
| # ============================================================ | |
| routing_start = time.time() | |
| # Build template (simplified - just the user prompt) | |
| routing_template = f"User Query: {user_prompt}" | |
| routing_input_tokens = count_tokens_accurate(routing_template) | |
| reset_gpu_stats() | |
| # β Use unified process() method (matches app.py) | |
| response_prompts_str, thinking_prompts_str = routing_agents.process( | |
| user_input=user_prompt, | |
| tool_used=(tool_decision_result and bool(tool_img_output)) | |
| ) | |
| # Parse results | |
| response_prompts = [p.strip() for p in response_prompts_str.split('\n') if p.strip()] if response_prompts_str else [] | |
| thinking_prompts = [p.strip() for p in thinking_prompts_str.split('\n') if p.strip()] if thinking_prompts_str else [] | |
| routing_output = f"Response: {', '.join(response_prompts) if response_prompts else 'None'}\nThinking: {', '.join(thinking_prompts) if thinking_prompts else 'None'}" | |
| routing_output_tokens = count_tokens_accurate(routing_output) | |
| gpu_metrics = get_gpu_memory() | |
| routing_time = time.time() - routing_start | |
| # Update result with consolidated routing metrics | |
| result.update({ | |
| # Agent 1 metrics (legacy columns - use consolidated data) | |
| "agent1_input_template": routing_template, | |
| "agent1_input_tokens": routing_input_tokens // 4, # Divide among 4 agents | |
| "agent1_output": ", ".join([p for p in response_prompts if p in ["STRUCTURE_PRACTICE_QUESTIONS"]]) or "None", | |
| "agent1_output_tokens": routing_output_tokens // 4, | |
| "agent1_decision": "STRUCTURE_PRACTICE_QUESTIONS" in response_prompts, | |
| "agent1_time_seconds": round(routing_time / 4, 3), | |
| "agent1_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2), | |
| # Agent 2 metrics | |
| "agent2_input_template": routing_template, | |
| "agent2_input_tokens": routing_input_tokens // 4, | |
| "agent2_output": ", ".join([p for p in response_prompts if p in ["GENERAL_FORMATTING", "LATEX_FORMATTING", "GUIDING_TEACHING"]]) or "None", | |
| "agent2_output_tokens": routing_output_tokens // 4, | |
| "agent2_decision": ", ".join([p for p in response_prompts if p in ["GENERAL_FORMATTING", "LATEX_FORMATTING", "GUIDING_TEACHING"]]) or "NULL", | |
| "agent2_time_seconds": round(routing_time / 4, 3), | |
| "agent2_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2), | |
| # Agent 3 metrics | |
| "agent3_input_template": routing_template, | |
| "agent3_input_tokens": routing_input_tokens // 4, | |
| "agent3_output": ", ".join([p for p in response_prompts + thinking_prompts if p in ["PRACTICE_QUESTION_FOLLOWUP", "MATH_THINKING", "QUESTION_ANSWER_DESIGN", "REASONING_THINKING"]]) or "None", | |
| "agent3_output_tokens": routing_output_tokens // 4, | |
| "agent3_decision": any(p in ["PRACTICE_QUESTION_FOLLOWUP", "MATH_THINKING", "QUESTION_ANSWER_DESIGN", "REASONING_THINKING"] for p in response_prompts + thinking_prompts), | |
| "agent3_time_seconds": round(routing_time / 4, 3), | |
| "agent3_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2), | |
| # Agent 4 metrics | |
| "agent4_input_template": routing_template, | |
| "agent4_input_tokens": routing_input_tokens // 4, | |
| "agent4_output": ", ".join([p for p in response_prompts if p == "TOOL_USE_ENHANCEMENT"]) or "None", | |
| "agent4_output_tokens": routing_output_tokens // 4, | |
| "agent4_decisions": "TOOL_USE_ENHANCEMENT" if "TOOL_USE_ENHANCEMENT" in response_prompts else "NULL", | |
| "agent4_time_seconds": round(routing_time / 4, 3), | |
| "agent4_gpu_peak_mb": round(gpu_metrics["peak_mb"] / 4, 2), | |
| }) | |
| # Update prompt state with all activated prompts | |
| for prompt_name in response_prompts: | |
| prompt_state.update(prompt_name, True) | |
| for prompt_name in thinking_prompts: | |
| prompt_state.update(prompt_name, True) | |
| # ============================================================ | |
| # STEP 6: THINKING AGENTS (β FIXED: Use process() - matches app.py) | |
| # ============================================================ | |
| # Build thinking prompts list (matches app.py logic) | |
| thinking_prompts_list = [] | |
| for prompt_name in thinking_prompts: | |
| if prompt_name.strip(): | |
| thinking_prompts_list.append(prompt_name.strip()) | |
| # Additional heuristic: Add MATH_THINKING if LATEX_FORMATTING is active | |
| if prompt_state.is_active("LATEX_FORMATTING") and "MATH_THINKING" not in thinking_prompts_list: | |
| thinking_prompts_list.append("MATH_THINKING") | |
| prompt_state.update("MATH_THINKING", True) | |
| # Execute thinking agents if any are active | |
| thinking_context = "" | |
| if thinking_prompts_list: | |
| thinking_start = time.time() | |
| thinking_prompts_string = '\n'.join(thinking_prompts_list) | |
| reset_gpu_stats() | |
| # β FIXED: Use process() method (matches app.py) | |
| thinking_context = thinking_agents.process( | |
| user_input=user_prompt, | |
| conversation_history=recent_history_formatted, | |
| thinking_prompts=thinking_prompts_string, | |
| tool_img_output=tool_img_output, | |
| tool_context=tool_context | |
| ) | |
| thinking_time = time.time() - thinking_start | |
| gpu_metrics = get_gpu_memory() | |
| # Record metrics for activated thinking agents | |
| # Note: For simplicity, we're recording aggregate metrics | |
| # In production, you might want to separate these | |
| if "MATH_THINKING" in thinking_prompts_list: | |
| result.update({ | |
| "math_thinking_activated": True, | |
| "math_thinking_input_template": build_math_thinking_template(user_prompt), | |
| "math_thinking_input_tokens": count_tokens_accurate(user_prompt), | |
| "math_thinking_output": thinking_context[:500], # Truncate for CSV | |
| "math_thinking_output_tokens": count_tokens_accurate(thinking_context), | |
| "math_thinking_time_seconds": round(thinking_time / len(thinking_prompts_list), 3), | |
| "math_thinking_gpu_peak_mb": round(gpu_metrics["peak_mb"] / len(thinking_prompts_list), 2), | |
| }) | |
| else: | |
| result.update({ | |
| "math_thinking_activated": False, | |
| "math_thinking_input_template": "NULL", | |
| "math_thinking_input_tokens": 0, | |
| "math_thinking_output": "NULL", | |
| "math_thinking_output_tokens": 0, | |
| "math_thinking_time_seconds": 0.0, | |
| "math_thinking_gpu_peak_mb": 0.0, | |
| }) | |
| if "QUESTION_ANSWER_DESIGN" in thinking_prompts_list: | |
| result.update({ | |
| "qa_design_activated": True, | |
| "qa_design_input_template": build_qa_design_template(user_prompt), | |
| "qa_design_input_tokens": count_tokens_accurate(user_prompt), | |
| "qa_design_output": thinking_context[:500], | |
| "qa_design_output_tokens": count_tokens_accurate(thinking_context), | |
| "qa_design_time_seconds": round(thinking_time / len(thinking_prompts_list), 3), | |
| "qa_design_gpu_peak_mb": round(gpu_metrics["peak_mb"] / len(thinking_prompts_list), 2), | |
| }) | |
| else: | |
| result.update({ | |
| "qa_design_activated": False, | |
| "qa_design_input_template": "NULL", | |
| "qa_design_input_tokens": 0, | |
| "qa_design_output": "NULL", | |
| "qa_design_output_tokens": 0, | |
| "qa_design_time_seconds": 0.0, | |
| "qa_design_gpu_peak_mb": 0.0, | |
| }) | |
| if "REASONING_THINKING" in thinking_prompts_list: | |
| result.update({ | |
| "reasoning_activated": True, | |
| "reasoning_input_template": build_reasoning_template(user_prompt), | |
| "reasoning_input_tokens": count_tokens_accurate(user_prompt), | |
| "reasoning_output": thinking_context[:500], | |
| "reasoning_output_tokens": count_tokens_accurate(thinking_context), | |
| "reasoning_time_seconds": round(thinking_time / len(thinking_prompts_list), 3), | |
| "reasoning_gpu_peak_mb": round(gpu_metrics["peak_mb"] / len(thinking_prompts_list), 2), | |
| }) | |
| else: | |
| result.update({ | |
| "reasoning_activated": False, | |
| "reasoning_input_template": "NULL", | |
| "reasoning_input_tokens": 0, | |
| "reasoning_output": "NULL", | |
| "reasoning_output_tokens": 0, | |
| "reasoning_time_seconds": 0.0, | |
| "reasoning_gpu_peak_mb": 0.0, | |
| }) | |
| else: | |
| # No thinking agents activated | |
| result.update({ | |
| "math_thinking_activated": False, | |
| "math_thinking_input_template": "NULL", | |
| "math_thinking_input_tokens": 0, | |
| "math_thinking_output": "NULL", | |
| "math_thinking_output_tokens": 0, | |
| "math_thinking_time_seconds": 0.0, | |
| "math_thinking_gpu_peak_mb": 0.0, | |
| "qa_design_activated": False, | |
| "qa_design_input_template": "NULL", | |
| "qa_design_input_tokens": 0, | |
| "qa_design_output": "NULL", | |
| "qa_design_output_tokens": 0, | |
| "qa_design_time_seconds": 0.0, | |
| "qa_design_gpu_peak_mb": 0.0, | |
| "reasoning_activated": False, | |
| "reasoning_input_template": "NULL", | |
| "reasoning_input_tokens": 0, | |
| "reasoning_output": "NULL", | |
| "reasoning_output_tokens": 0, | |
| "reasoning_time_seconds": 0.0, | |
| "reasoning_gpu_peak_mb": 0.0, | |
| }) | |
| # ============================================================ | |
| # STEP 7-8: PROMPT ASSEMBLY (matches app.py) | |
| # ============================================================ | |
| assembly_start = time.time() | |
| # Get active response prompts | |
| active_prompts = prompt_state.get_active_response_prompts() | |
| assembly_time = time.time() - assembly_start | |
| result.update({ | |
| "active_response_prompts": ", ".join(active_prompts), | |
| "final_prompt_template": "Response input dict (see response_input_template)", | |
| "final_prompt_tokens": 0, # Will be calculated in response step | |
| "final_prompt_chars": 0, | |
| "final_prompt_words": 0, | |
| "assembly_time_seconds": round(assembly_time, 3), | |
| }) | |
| # ============================================================ | |
| # STEP 9: RESPONSE GENERATION (β FIXED: Use input_data dict) | |
| # ============================================================ | |
| response_start = time.time() | |
| reset_gpu_stats() | |
| # β FIXED: Build input_data dict (matches app.py Step 8) | |
| input_data = { | |
| 'user_query': user_prompt, | |
| 'conversation_history': recent_history, | |
| 'active_prompts': active_prompts, | |
| 'thinking_context': thinking_context, | |
| 'tool_context': tool_context, | |
| } | |
| # β FIXED: Invoke with dict and extract response (matches app.py) | |
| result_dict = response_agent.invoke(input_data) | |
| raw_response = result_dict.get('response', '') | |
| metadata = result_dict.get('metadata', {}) | |
| response_time = time.time() - response_start | |
| raw_tokens = count_tokens_accurate(raw_response) | |
| raw_chars = len(raw_response) | |
| raw_words = count_words(raw_response) | |
| tokens_per_sec = raw_tokens / response_time if response_time > 0 else 0 | |
| gpu_metrics = get_gpu_memory() | |
| # Calculate input template string for metrics | |
| input_template_str = f"user_query: {user_prompt[:100]}..., active_prompts: {active_prompts}, thinking: {len(thinking_context)} chars, tool: {len(tool_context)} chars" | |
| result.update({ | |
| "response_input_template": input_template_str, | |
| "response_input_tokens": count_tokens_accurate(input_template_str), | |
| "response_raw": raw_response, | |
| "response_raw_tokens": raw_tokens, | |
| "response_raw_chars": raw_chars, | |
| "response_raw_words": raw_words, | |
| "response_generation_time_seconds": round(response_time, 3), | |
| "response_gpu_peak_mb": round(gpu_metrics["peak_mb"], 2), | |
| "response_tokens_per_second": round(tokens_per_sec, 2), | |
| }) | |
| # ============================================================ | |
| # STEP 10: POST-PROCESSING (matches app.py) | |
| # ============================================================ | |
| postprocess_start = time.time() | |
| processed_response = post_processor.process_response(raw_response, user_prompt) | |
| postprocess_time = time.time() - postprocess_start | |
| processed_tokens = count_tokens_accurate(processed_response) | |
| processed_chars = len(processed_response) | |
| processed_words = count_words(processed_response) | |
| result.update({ | |
| "response_processed": processed_response, | |
| "response_processed_tokens": processed_tokens, | |
| "response_processed_chars": processed_chars, | |
| "response_processed_words": processed_words, | |
| "postprocessing_time_seconds": round(postprocess_time, 3), | |
| }) | |
| # ============================================================ | |
| # QUALITY METRICS | |
| # ============================================================ | |
| flesch_ease = calculate_flesch_reading_ease(processed_response) | |
| flesch_grade = calculate_flesch_kincaid_grade(processed_response) | |
| completeness = calculate_completeness_score(processed_response, user_prompt) | |
| specificity = calculate_specificity_score(processed_response) | |
| repetition = calculate_repetition_ratio(processed_response) | |
| unique_ratio = calculate_unique_word_ratio(processed_response) | |
| avg_sent_len = calculate_avg_sentence_length(processed_response) | |
| question_answered = check_question_answered(processed_response, user_prompt) | |
| result.update({ | |
| "flesch_reading_ease": round(flesch_ease, 2), | |
| "flesch_kincaid_grade": round(flesch_grade, 2), | |
| "completeness_score": round(completeness, 3), | |
| "specificity_score": round(specificity, 3), | |
| "repetition_ratio": round(repetition, 3), | |
| "unique_word_ratio": round(unique_ratio, 3), | |
| "avg_sentence_length": round(avg_sent_len, 2), | |
| "question_answered": question_answered, | |
| }) | |
| # ============================================================ | |
| # OVERALL METRICS | |
| # ============================================================ | |
| total_pipeline_time = time.time() - pipeline_start | |
| # Count activated models | |
| models_activated = [] | |
| if result["tool_decision_time_seconds"] > 0: | |
| models_activated.append("Tool Decision") | |
| if result["agent1_time_seconds"] > 0: | |
| models_activated.append("Routing Agents") | |
| if result["math_thinking_activated"]: | |
| models_activated.append("Math Thinking") | |
| if result["qa_design_activated"]: | |
| models_activated.append("QA Design") | |
| if result["reasoning_activated"]: | |
| models_activated.append("Reasoning") | |
| models_activated.append("Response Agent") | |
| # Sum all input tokens | |
| total_input_tokens = ( | |
| result["tool_decision_input_tokens"] + | |
| result["agent1_input_tokens"] * 4 + # Multiply back since we divided | |
| result.get("math_thinking_input_tokens", 0) + | |
| result.get("qa_design_input_tokens", 0) + | |
| result.get("reasoning_input_tokens", 0) + | |
| result["response_input_tokens"] | |
| ) | |
| # Sum all output tokens | |
| total_output_tokens = ( | |
| result["tool_decision_output_tokens"] + | |
| result["agent1_output_tokens"] * 4 + | |
| result.get("math_thinking_output_tokens", 0) + | |
| result.get("qa_design_output_tokens", 0) + | |
| result.get("reasoning_output_tokens", 0) + | |
| result["response_raw_tokens"] | |
| ) | |
| # Max GPU across all steps | |
| total_gpu_peak = max([ | |
| result["tool_decision_gpu_peak_mb"], | |
| result["agent1_gpu_peak_mb"], | |
| result.get("math_thinking_gpu_peak_mb", 0.0), | |
| result.get("qa_design_gpu_peak_mb", 0.0), | |
| result.get("reasoning_gpu_peak_mb", 0.0), | |
| result["response_gpu_peak_mb"], | |
| ]) | |
| result.update({ | |
| "total_pipeline_time_seconds": round(total_pipeline_time, 3), | |
| "total_input_tokens": total_input_tokens, | |
| "total_output_tokens": total_output_tokens, | |
| "total_gpu_peak_mb": round(total_gpu_peak, 2), | |
| "models_activated_count": len(models_activated), | |
| "models_activated_list": ", ".join(models_activated), | |
| }) | |
| logger.info(f"β Prompt {prompt_index} complete: {total_pipeline_time:.2f}s, {len(models_activated)} models activated") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Pipeline execution failed for prompt {prompt_index}: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| # Return error result with NULLs | |
| error_result = {col: "ERROR" for col in CSV_COLUMNS} | |
| error_result.update({ | |
| "prompt_index": prompt_index, | |
| "timestamp": datetime.now().isoformat(), | |
| "user_prompt": user_prompt, | |
| "user_prompt_tokens": count_tokens_accurate(user_prompt), | |
| "user_prompt_chars": len(user_prompt), | |
| "user_prompt_words": count_words(user_prompt), | |
| }) | |
| return error_result | |
| # ============================================================================ | |
| # BATCH PROCESSING | |
| # ============================================================================ | |
def process_batch_full_pipeline(
    user_prompts: List[str],
    progress_callback=None
) -> List[Dict]:
    """
    Process a batch of prompts through the FULL instrumented pipeline.

    Prompts run sequentially, one at a time. A failure on one prompt is
    recorded as an error row and does not abort the rest of the batch.

    Args:
        user_prompts: List of user prompts to test.
        progress_callback: Optional callable invoked as callback(done, total)
            after each prompt finishes.

    Returns:
        List of result dictionaries (one per prompt, in input order).
    """
    results = []
    total = len(user_prompts)
    logger.info("=" * 60)
    logger.info(f"Starting full pipeline batch: {total} prompts")
    logger.info("=" * 60)
    batch_start = time.time()
    for idx, user_prompt in enumerate(user_prompts, 1):
        logger.info(f"\n{'='*60}")
        logger.info(f"Processing prompt {idx}/{total}")
        logger.info(f"Prompt: {user_prompt[:80]}...")
        logger.info(f"{'='*60}")
        try:
            # Run full instrumented pipeline
            result = run_full_pipeline_instrumented(user_prompt, prompt_index=idx)
            results.append(result)
            logger.info(f"β Prompt {idx} complete")
            # BUGFIX: error rows carry the string "ERROR" in their metric
            # fields; formatting them with :.2f used to raise inside this
            # try-block and append a *second* error row for the same prompt.
            elapsed = result.get('total_pipeline_time_seconds', 0)
            if isinstance(elapsed, (int, float)):
                logger.info(f" Total time: {elapsed:.2f}s")
                logger.info(f" Models activated: {result.get('models_activated_count', 0)}")
                logger.info(f" Total tokens: {result.get('total_input_tokens', 0) + result.get('total_output_tokens', 0)}")
            if progress_callback:
                progress_callback(idx, total)
        except Exception as e:
            logger.error(f"β Prompt {idx} failed: {e}")
            import traceback
            traceback.print_exc()
            # Record an error row so output stays aligned with the input
            error_result = {col: "ERROR" for col in CSV_COLUMNS}
            error_result.update({
                "prompt_index": idx,
                "timestamp": datetime.now().isoformat(),
                "user_prompt": user_prompt,
                "user_prompt_tokens": count_tokens_accurate(user_prompt),
                # Consistency with run_full_pipeline_instrumented's error rows
                "user_prompt_chars": len(user_prompt),
                "user_prompt_words": count_words(user_prompt),
            })
            results.append(error_result)
    batch_duration = time.time() - batch_start
    logger.info(f"\n{'='*60}")
    logger.info("BATCH COMPLETE")
    logger.info(f"{'='*60}")
    logger.info(f"Processed: {len(results)}/{total} prompts")
    logger.info(f"Total batch time: {batch_duration:.2f}s")
    # Guard: an empty batch would otherwise divide by zero here
    if total:
        logger.info(f"Average per prompt: {batch_duration/total:.2f}s")
    logger.info(f"{'='*60}")
    return results
| # ============================================================================ | |
| # CSV EXPORT | |
| # ============================================================================ | |
def export_full_pipeline_csv(
    results: List[Dict],
    test_name: str = "pipeline_test"
) -> Optional[str]:
    """
    Export full pipeline results to a CSV file under /tmp.

    Rows missing a column are filled with the string "NULL" so every row
    matches the CSV_COLUMNS schema exactly.

    Args:
        results: List of result dictionaries (one per prompt).
        test_name: Name for the test (used in the filename).

    Returns:
        Filepath of the exported CSV, or None when there is nothing to
        export or the write fails.

    Note:
        The return annotation was corrected from ``str`` to
        ``Optional[str]`` — this function legitimately returns None.
    """
    if not results:
        logger.warning("No results to export")
        return None
    try:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"mimir_full_pipeline_{test_name}_{timestamp}.csv"
        filepath = os.path.join("/tmp", filename)  # Save to /tmp for ZeroGPU
        logger.info(f"Exporting {len(results)} results to CSV...")
        # Write CSV
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
            writer.writeheader()
            for result in results:
                # Fill missing keys with NULL so the row matches the schema
                row = {key: result.get(key, "NULL") for key in CSV_COLUMNS}
                writer.writerow(row)
        logger.info(f"β Full pipeline results exported to {filepath}")
        logger.info(f" Columns: {len(CSV_COLUMNS)}")
        logger.info(f" Rows: {len(results)}")
        return filepath
    except Exception as e:
        logger.error(f"CSV export failed: {e}")
        import traceback
        traceback.print_exc()
        return None
def calculate_summary_stats(results: List[Dict]) -> Dict:
    """
    Aggregate per-prompt pipeline results into batch summary statistics.

    Rows whose ``total_pipeline_time_seconds`` is not numeric are excluded
    from the averages: error rows carry the string "ERROR", and a
    partially-built row may be missing the key entirely.

    Args:
        results: Per-prompt result dictionaries from a pipeline batch run.

    Returns:
        Dict of aggregate metrics; empty dict when *results* is empty,
        or {"error": "No valid results"} when no row is usable.
    """
    if not results:
        return {}
    # BUGFIX: the old `!= "ERROR"` filter let rows *missing* the key pass
    # (`.get` -> None), which then crashed np.mean with a TypeError.
    valid_results = [
        r for r in results
        if isinstance(r.get("total_pipeline_time_seconds"), (int, float))
    ]
    if not valid_results:
        return {"error": "No valid results"}
    times = [r["total_pipeline_time_seconds"] for r in valid_results]
    return {
        "total_prompts": len(results),
        "successful_prompts": len(valid_results),
        "failed_prompts": len(results) - len(valid_results),
        "avg_pipeline_time_seconds": round(np.mean(times), 3),
        "min_pipeline_time_seconds": round(np.min(times), 3),
        "max_pipeline_time_seconds": round(np.max(times), 3),
        "avg_total_tokens": round(np.mean([r["total_input_tokens"] + r["total_output_tokens"] for r in valid_results]), 1),
        "avg_models_activated": round(np.mean([r["models_activated_count"] for r in valid_results]), 2),
        "avg_gpu_peak_mb": round(np.mean([r["total_gpu_peak_mb"] for r in valid_results]), 2),
        "avg_completeness_score": round(np.mean([r["completeness_score"] for r in valid_results]), 3),
        "avg_flesch_reading_ease": round(np.mean([r["flesch_reading_ease"] for r in valid_results]), 2),
        "questions_answered_pct": round(100 * sum(r["question_answered"] for r in valid_results) / len(valid_results), 1),
    }
| # ============================================================================ | |
| # GRADIO INTERFACE | |
| # ============================================================================ | |
# Gradio UI: two-column layout — configuration/inputs on the left,
# summary stats and CSV download on the right. Component creation order
# matters to Gradio, so the structure below is deliberate.
with gr.Blocks(title="Mimir - Full Pipeline Testing", theme=gr.themes.Soft()) as demo:
    # Page header and feature summary
    gr.Markdown("# π§ͺ Mimir Full Pipeline Testing")
    gr.Markdown("""
    Test the **complete orchestration flow** with comprehensive metrics at every step.
    **β UPDATED:** Now correctly mirrors app.py orchestrate_turn() process
    - Tool decision uses `decide()` method with conversation history
    - Response agent invoked with `input_data` dict (not raw string)
    - Thinking agents use `process()` method matching app.py
    **What this tests:**
    - β Tool Decision Agent
    - β All 4 Routing Agents (unified process)
    - β Thinking Agents (conditional: Math, QA Design, Reasoning)
    - β Response Agent (Llama-3.2-3B)
    - β Post-processing
    **Output:** CSV file with ~110 columns capturing the full pipeline journey
    """)
    with gr.Row():
        # Left column: test configuration and prompt input
        with gr.Column(scale=1):
            gr.Markdown("## π Test Configuration")
            # Test name becomes part of the exported CSV filename
            test_name = gr.Textbox(
                label="Test Name",
                value="pipeline_test",
                placeholder="Enter a name for this test run",
            )
            gr.Markdown("### Input Method")
            input_method = gr.Radio(
                choices=["CSV Upload", "Manual Entry"],
                value="Manual Entry",
                label="Choose Input Method"
            )
            # CSV upload section (hidden until "CSV Upload" is selected;
            # visibility is toggled by toggle_input_method)
            with gr.Group(visible=False) as csv_section:
                csv_file = gr.File(
                    label="Upload CSV File",
                    file_types=[".csv"],
                )
            # Manual entry section (shown by default)
            with gr.Group(visible=True) as manual_section:
                prompt_text = gr.Textbox(
                    label="Enter Prompts (one per line)",
                    lines=15,
                    placeholder="What is calculus?\nHelp me understand photosynthesis\nCan you create practice questions for algebra?\nExplain Newton's laws of motion",
                )
            process_btn = gr.Button(
                "π Run Full Pipeline Test",
                variant="primary",
                size="lg"
            )
            status = gr.Textbox(
                label="Status",
                interactive=False,
                lines=3
            )
        # Right column: results summary and CSV download
        with gr.Column(scale=1):
            gr.Markdown("## π Results")
            results_summary = gr.JSON(
                label="Summary Statistics",
                height=400
            )
            gr.Markdown("### Download Results")
            download_csv = gr.File(
                label="CSV Export",
                interactive=False
            )
            gr.Markdown("""
            **CSV contains ~110 columns:**
            - Input metrics (tokens, chars, words)
            - Template for each agent
            - Output for each agent
            - Timing for each step
            - GPU usage per step
            - Quality metrics (readability, completeness, etc.)
            - Overall pipeline metrics
            """)
    # Toggle between input methods
| def toggle_input_method(method): | |
| if method == "CSV Upload": | |
| return gr.update(visible=True), gr.update(visible=False) | |
| else: | |
| return gr.update(visible=False), gr.update(visible=True) | |
    # Swap the visible input section whenever the radio selection changes
    input_method.change(
        fn=toggle_input_method,
        inputs=[input_method],
        outputs=[csv_section, manual_section]
    )
    # Main processing function
| def run_pipeline_test(test_name, input_method, csv_file, prompt_text): | |
| """Run the full pipeline test""" | |
| # Parse prompts | |
| prompts = [] | |
| if input_method == "CSV Upload" and csv_file: | |
| try: | |
| # Read CSV | |
| content = csv_file.decode('utf-8') if isinstance(csv_file, bytes) else csv_file | |
| if hasattr(content, 'read'): | |
| content = content.read() | |
| if isinstance(content, bytes): | |
| content = content.decode('utf-8') | |
| reader = csv.reader(io.StringIO(str(content))) | |
| prompts = [row[0].strip() for row in reader if row and row[0].strip()] | |
| # Skip header if present | |
| if prompts and any(header in prompts[0].lower() for header in ['prompt', 'text', 'query', 'input']): | |
| prompts = prompts[1:] | |
| except Exception as e: | |
| return f"β CSV parsing error: {e}", {}, None | |
| elif input_method == "Manual Entry" and prompt_text: | |
| prompts = [p.strip() for p in prompt_text.split('\n') if p.strip()] | |
| if not prompts: | |
| return "β No prompts provided. Please enter at least one prompt.", {}, None | |
| status_msg = f"π Processing {len(prompts)} prompts through full pipeline...\n" | |
| status_msg += "This may take several minutes. Please wait...\n" | |
| try: | |
| # Run batch | |
| results = process_batch_full_pipeline(prompts) | |
| # Calculate summary | |
| summary = calculate_summary_stats(results) | |
| # Export CSV | |
| csv_path = export_full_pipeline_csv(results, test_name) | |
| status_msg = f"β Complete!\n" | |
| status_msg += f"Processed: {len(results)} prompts\n" | |
| status_msg += f"Successful: {summary.get('successful_prompts', 0)}\n" | |
| status_msg += f"Failed: {summary.get('failed_prompts', 0)}\n" | |
| status_msg += f"CSV ready for download!" | |
| return status_msg, summary, csv_path | |
| except Exception as e: | |
| error_msg = f"β Pipeline test failed: {str(e)}" | |
| logger.error(error_msg) | |
| import traceback | |
| traceback.print_exc() | |
| return error_msg, {}, None | |
    # Wire up event: button click runs the batch and fills the three outputs
    process_btn.click(
        fn=run_pipeline_test,
        inputs=[test_name, input_method, csv_file, prompt_text],
        outputs=[status, results_summary, download_csv]
    )
| # ============================================================================ | |
| # LAUNCH | |
| # ============================================================================ | |
| if __name__ == "__main__": | |
| logger.info("="*60) | |
| logger.info("LAUNCHING MIMIR FULL PIPELINE TESTING INTERFACE") | |
| logger.info("β UPDATED: Now correctly mirrors app.py orchestration") | |
| logger.info("="*60) | |
| logger.info(f"CSV Schema: {len(CSV_COLUMNS)} columns") | |
| logger.info(f"Agents initialized: {AGENTS_AVAILABLE}") | |
| logger.info(f"Tiktoken available: {TIKTOKEN_AVAILABLE}") | |
| logger.info(f"Textstat available: {TEXTSTAT_AVAILABLE}") | |
| logger.info(f"ZeroGPU available: {ZERO_GPU_AVAILABLE}") | |
| logger.info("="*60) | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7862, | |
| share=False, | |
| debug=True | |
| ) |