Spaces:
Sleeping
Sleeping
| # app.py | |
| """ | |
| Mimir Educational AI Assistant - Main Application | |
| Architecture: | |
| - Multi-page Gradio interface (Chatbot + Analytics with link to Mimir case study) | |
| - Agent-based orchestration (Tool, Routing, Thinking, Response) | |
| - Global state management with SQLite + HF dataset backup | |
| - Prompt state tracking per turn | |
| - LightEval for metrics tracking | |
| - Logger for timing functions | |
| - OPTIMIZED: Single Llama-3.2-3B model with lazy loading (loads on first use, ~1GB) | |
| """ | |
| import os | |
| import re | |
| import sys | |
| import time | |
| import json | |
| import base64 | |
| import logging | |
| import sqlite3 | |
| import subprocess | |
| import threading | |
| import warnings | |
| import uuid | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Any | |
| # ============================================================================ | |
| # HUGGINGFACE CACHE SETUP - Avoid Permission Errors | |
| # ============================================================================ | |
# All HuggingFace caches live under /tmp, which is writable at runtime.
HF_CACHE = "/tmp/huggingface"
os.makedirs(HF_CACHE + "/hub", exist_ok=True)
os.makedirs(HF_CACHE + "/modules", exist_ok=True)
os.makedirs(HF_CACHE + "/transformers", exist_ok=True)

# Point the HuggingFace libraries at those directories.
# NOTE(review): 'HF_CACHE' does not look like a variable the HF libraries
# read (HF_HOME already relocates everything) - confirm whether it is needed.
os.environ.update({
    'HF_HOME': HF_CACHE,
    'HF_HUB_CACHE': HF_CACHE + "/hub",
    'HF_MODULES_CACHE': HF_CACHE + "/modules",
    'HF_CACHE': HF_CACHE + "/transformers",
    'HF_HUB_ENABLE_HF_TRANSFER': '1',  # Faster downloads
})

# Matplotlib config/cache directory (avoids permission warnings).
os.environ['MPLCONFIGDIR'] = "/tmp/matplotlib"
os.makedirs(os.environ['MPLCONFIGDIR'], exist_ok=True)
| # ============================================================================ | |
| # CORE DEPENDENCIES | |
| # ============================================================================ | |
| import torch | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| # Agent architecture | |
| from agents import ( | |
| ToolDecisionAgent, | |
| PromptRoutingAgents, | |
| ThinkingAgents, | |
| ResponseAgent, | |
| ) | |
| # Lazy-loading model (optional pre-warm) | |
| from model_manager import get_model | |
| # State management | |
| from state_manager import ( | |
| GlobalStateManager, | |
| LogicalExpressions, | |
| ) | |
| # Prompt library | |
| from prompt_library import ( | |
| CORE_IDENTITY, | |
| VAUGE_INPUT, | |
| USER_UNDERSTANDING, | |
| GENERAL_FORMATTING, | |
| LATEX_FORMATTING, | |
| GUIDING_TEACHING, | |
| STRUCTURE_PRACTICE_QUESTIONS, | |
| PRACTICE_QUESTION_FOLLOWUP, | |
| TOOL_USE_ENHANCEMENT, | |
| ) | |
| # LangGraph imports | |
| from langgraph.graph import StateGraph, START, END | |
| from langgraph.graph.message import add_messages | |
| from langgraph.checkpoint.memory import MemorySaver | |
| # LangChain Core | |
| from langchain_core.tools import tool | |
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage, BaseMessage | |
| # Tool for graphing | |
| from graph_tool import generate_plot | |
| # ============================================================================ | |
| # LIGHTEVAL FOR METRICS | |
| # ============================================================================ | |
# LightEval is optional: LIGHTEVAL_AVAILABLE records whether it imported so
# the metric code can fall back to its heuristic scores.
try:
    from lighteval.logging.evaluation_tracker import EvaluationTracker
    from lighteval.models.transformers.transformers_model import TransformersModel
    from lighteval.metrics.metrics_sample import BertScore, ROUGE
    from lighteval.tasks.requests import Doc
    LIGHTEVAL_AVAILABLE = True
except ImportError:
    LIGHTEVAL_AVAILABLE = False
    # Deliberately NOT logging.warning(): the module-level helper calls
    # logging.basicConfig() implicitly when the root logger has no handlers,
    # which would turn the explicit basicConfig(level=INFO, ...) further down
    # in this file into a no-op and silently hide every INFO message.
    logging.getLogger(__name__).warning(
        "LightEval not available - metrics tracking limited"
    )
| # ============================================================================ | |
| # CONFIGURATION | |
| # ============================================================================ | |
# Silence noisy library warnings (user-facing and deprecation notices).
warnings.filterwarnings(action="ignore", category=UserWarning)
warnings.filterwarnings(action="ignore", category=FutureWarning)

# Pull settings from the local .env file into the process environment.
load_dotenv(".env")

# Hub token: primary variable first, legacy name as fallback.
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACEHUB_API_TOKEN")

# Debug flag (only the literal string "true", case-insensitive, enables it)
# and the year captured once at start-up.
DEBUG_STATE = os.environ.get("DEBUG_STATE", "false").lower() == "true"
CURRENT_YEAR = datetime.today().year
| # ============================================================================ | |
| # LOGGING SETUP | |
| # ============================================================================ | |
# Root logger: INFO and above, prefixed with time, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
def log_step(step_name: str, start_time: Optional[float] = None) -> float:
    """
    Log the start or the completion of a pipeline step.

    Call once without ``start_time`` to log "STARTING", keep the returned
    value, and pass it back in a second call to log "COMPLETED" together
    with the elapsed time.

    Args:
        step_name: Name of the step
        start_time: Value returned by the opening call (omit when starting)

    Returns:
        Current ``time.time()`` value, for use as the next ``start_time``
    """
    now = time.time()
    # Wall-clock stamp trimmed from microseconds to milliseconds.
    timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]

    # Compare against None explicitly: a start_time of 0.0 is a valid float
    # and must not be mistaken for "no start time supplied".
    if start_time is not None:
        logger.info(
            "[%s] COMPLETED: %s (%.2fs)", timestamp, step_name, now - start_time
        )
    else:
        logger.info("[%s] STARTING: %s", timestamp, step_name)
    return now
| # ============================================================================ | |
| # MODEL INFORMATION | |
| # ============================================================================ | |
# Start-up banner describing the model configuration (one item per line).
print(
    "=" * 60,
    "MIMIR - Using Llama-3.2-3B-Instruct",
    " Model: meta-llama/Llama-3.2-3B-Instruct",
    " Memory: ~1GB (4-bit quantized)",
    " Context: 128K tokens",
    " Architecture: Single unified model",
    "=" * 60,
    sep="\n",
)
| # ============================================================================ | |
| # GLOBAL INITIALIZATION | |
| # ============================================================================ | |
# Announce start-up and open the timing bracket that is closed by the final
# log_step() call of this section.
logger.info("=" * 60)
logger.info("INITIALIZING MIMIR APPLICATION")
logger.info("=" * 60)
init_start = log_step("Global Initialization")

# Shared state objects used across the whole application.
global_state_manager = GlobalStateManager()
logical_expressions = LogicalExpressions()
logger.info("State management initialized")

# One instance per agent role; construction order is kept as-is.
tool_agent = ToolDecisionAgent()
routing_agents = PromptRoutingAgents()
thinking_agents = ThinkingAgents()
response_agent = ResponseAgent()
logger.info("Agents initialized (using shared get_shared_llama)")

# No explicit pre-warm happens here; the message only states readiness.
logger.info("Shared Qwen3-Claude agent ready (loads on first use)")
log_step("Global Initialization", init_start)
| # ============================================================================ | |
| # ANALYTICS & DATABASE FUNCTIONS | |
| # ============================================================================ | |
def get_trackio_database_path(project_name: str) -> Optional[str]:
    """Return the first metrics SQLite file that exists, or None.

    Candidates are checked in order, relative to the working directory:
    the project-named file, the same name under ``trackio_data/`` and
    ``.trackio/``, and finally the fixed ``mimir_metrics.db``.
    """
    candidates = (
        f"./{project_name}.db",
        f"./trackio_data/{project_name}.db",
        f"./.trackio/{project_name}.db",
        "./mimir_metrics.db",
    )
    return next((candidate for candidate in candidates if os.path.exists(candidate)), None)
def get_project_statistics_with_nulls(cursor, project_name: str) -> Dict:
    """Collect headline statistics for one project from the ``metrics`` table.

    Rows are read by column name, so the cursor's connection is expected to
    use ``sqlite3.Row`` as its row factory. Each statistic is queried
    independently; a failing query only nulls its own entry.

    Returns:
        Dict with ``total_conversations`` (distinct run ids),
        ``avg_session_length`` (mean of the 'response_time' metric, 2 dp) and
        ``success_rate`` (percentage of 'quality_score' values above 3.5,
        1 dp). An entry is None when there is no data or its query failed.
    """
    def _first_row(sql: str):
        # Every query in this function takes the project name as its only parameter.
        cursor.execute(sql, (project_name,))
        return cursor.fetchone()

    try:
        summary = {}

        # Number of distinct runs (reported as conversations).
        try:
            row = _first_row("""
                SELECT COUNT(DISTINCT run_id) as total_runs
                FROM metrics
                WHERE project_name = ?
            """)
            if row and row["total_runs"] > 0:
                summary["total_conversations"] = row["total_runs"]
            else:
                summary["total_conversations"] = None
        except sqlite3.Error:
            summary["total_conversations"] = None

        # Mean response time, stored under the 'avg_session_length' key.
        try:
            row = _first_row("""
                SELECT AVG(CAST(value AS FLOAT)) as avg_response_time
                FROM metrics
                WHERE project_name = ? AND metric_name = 'response_time'
            """)
            mean_time = row["avg_response_time"] if row else None
            summary["avg_session_length"] = None if mean_time is None else round(mean_time, 2)
        except sqlite3.Error:
            summary["avg_session_length"] = None

        # Share of quality scores strictly above 3.5, as a percentage.
        try:
            row = _first_row("""
                SELECT
                    COUNT(*) as total_responses,
                    SUM(CASE WHEN CAST(value AS FLOAT) > 3.5 THEN 1 ELSE 0 END) as successful_responses
                FROM metrics
                WHERE project_name = ? AND metric_name = 'quality_score'
            """)
            if row and row["total_responses"] > 0:
                percentage = (row["successful_responses"] / row["total_responses"]) * 100
                summary["success_rate"] = round(percentage, 1)
            else:
                summary["success_rate"] = None
        except sqlite3.Error:
            summary["success_rate"] = None

        return summary
    except sqlite3.Error as e:
        logger.error(f"Database error: {e}")
        return {"total_conversations": None, "avg_session_length": None, "success_rate": None}
def _metric_to_float(raw) -> Optional[float]:
    """Convert a stored metric value to float; None when missing or unparsable."""
    if raw is None:
        return None
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def _metric_to_flag(raw) -> Optional[bool]:
    """Convert a stored metric value to bool; None when missing or unparsable.

    Metric values are written with ``str()``, so a boolean arrives as the text
    'True'/'False' rather than '1'/'0'; both spellings are accepted.
    """
    if raw is None:
        return None
    text = str(raw).strip().lower()
    if text in ("true", "yes"):
        return True
    if text in ("false", "no"):
        return False
    try:
        return bool(int(float(text)))
    except (ValueError, OverflowError):
        return None


def get_recent_interactions_with_nulls(cursor, project_name: str, limit: int = 10) -> List:
    """Return the most recent interactions of a project, newest first.

    One row is produced per 'conversation_start' metric; the other metrics of
    the same run are attached via LEFT JOINs, so missing ones come back as
    None. Rows are read by column name, so the cursor's connection is
    expected to use ``sqlite3.Row`` as its row factory.

    Args:
        cursor: SQLite cursor on the metrics database
        project_name: Project whose runs are listed
        limit: Maximum number of rows to return

    Returns:
        List of ``[timestamp (first 16 chars), response_time, prompt_mode,
        tools_used, quality_score, adapter_used]``. Values that cannot be
        parsed are returned as None instead of raising. An empty list is
        returned on a database error.
    """
    try:
        cursor.execute("""
            SELECT
                m1.timestamp,
                m2.value as response_time,
                m3.value as prompt_mode,
                m4.value as tools_used,
                m5.value as quality_score,
                m6.value as adapter_used,
                m1.run_id
            FROM metrics m1
            LEFT JOIN metrics m2 ON m1.run_id = m2.run_id AND m2.metric_name = 'response_time'
            LEFT JOIN metrics m3 ON m1.run_id = m3.run_id AND m3.metric_name = 'prompt_mode'
            LEFT JOIN metrics m4 ON m1.run_id = m4.run_id AND m4.metric_name = 'tools_used'
            LEFT JOIN metrics m5 ON m1.run_id = m5.run_id AND m5.metric_name = 'quality_score'
            LEFT JOIN metrics m6 ON m1.run_id = m6.run_id AND m6.metric_name = 'active_adapter'
            WHERE m1.project_name = ? AND m1.metric_name = 'conversation_start'
            ORDER BY m1.timestamp DESC
            LIMIT ?
        """, (project_name, limit))
        return [
            [
                # Keep date plus hours and minutes of the stored timestamp.
                row["timestamp"][:16] if row["timestamp"] else None,
                _metric_to_float(row["response_time"]),
                row["prompt_mode"] if row["prompt_mode"] else None,
                _metric_to_flag(row["tools_used"]),
                _metric_to_float(row["quality_score"]),
                row["adapter_used"] if row["adapter_used"] else None,
            ]
            for row in cursor.fetchall()
        ]
    except sqlite3.Error as e:
        logger.error(f"Database error: {e}")
        return []
def create_dashboard_html_with_nulls(project_name: str, project_stats: Dict) -> str:
    """Create dashboard HTML with enhanced agent-based metrics.

    Side effect: ``project_stats`` is updated in place with the keys
    ``ml_educational_quality``, ``user_satisfaction`` and ``active_sessions``
    read from the global state manager (all None when that lookup fails).

    Args:
        project_name: Heading text of the dashboard
        project_stats: Statistics dict; missing or None values are rendered
            as a greyed-out placeholder

    Returns:
        HTML fragment for the analytics dashboard
    """
    def format_stat(value, suffix="", no_data_text="No data"):
        # Inline statistic: italic grey placeholder when there is no value.
        if value is None:
            return f'<span style="color: #999; font-style: italic;">{no_data_text}</span>'
        return f"{value}{suffix}"

    def format_large_stat(value, suffix="", no_data_text="--"):
        # Headline statistic: light grey placeholder when there is no value.
        if value is None:
            return f'<span style="color: #ccc;">{no_data_text}</span>'
        return f"{value}{suffix}"

    # Get evaluation metrics from global state
    try:
        eval_summary = global_state_manager.get_evaluation_summary()
        cache_status = global_state_manager.get_cache_status()
        project_stats["ml_educational_quality"] = eval_summary['aggregate_metrics']['avg_educational_quality']
        project_stats["user_satisfaction"] = eval_summary['aggregate_metrics']['user_satisfaction_rate']
        project_stats["active_sessions"] = cache_status['total_conversation_sessions']
    except Exception as e:
        logger.warning(f"Could not get global state metrics: {e}")
        project_stats["ml_educational_quality"] = None
        project_stats["user_satisfaction"] = None
        project_stats["active_sessions"] = None

    # Status determination
    success_rate = project_stats.get("success_rate")
    if success_rate is None:
        status_color, status_text = "#999", "No data"
    elif success_rate >= 80:
        status_color, status_text = "#4CAF50", "Excellent"
    elif success_rate >= 60:
        status_color, status_text = "#FF9800", "Good"
    else:
        status_color, status_text = "#F44336", "Needs Improvement"

    # The unit suffix is always passed: the format helpers already drop it
    # when the value is None. Choosing it by truthiness (as before) lost the
    # unit for a legitimate value of 0 / 0.0.
    quality_html = format_stat(project_stats.get('ml_educational_quality'), '', 'N/A')
    satisfaction_html = format_stat(project_stats.get('user_satisfaction'), '%', 'N/A')
    sessions_html = format_stat(project_stats.get('active_sessions'), '', 'N/A')
    total_html = format_large_stat(project_stats.get('total_conversations'))
    response_time_html = format_large_stat(project_stats.get('avg_session_length'), 's')
    success_html = format_large_stat(success_rate, '%')
    model_html = format_stat(project_stats.get('model_type'), no_data_text='Unknown')
    last_updated = project_stats.get('last_updated', 'Unknown')

    # Agent-based metrics section
    agent_metrics_section = f"""
    <div style="margin: 15px 0; padding: 10px; background: #f0f8ff; border-radius: 4px; border-left: 4px solid #007bff;">
    <strong>π Agent Performance (Qwen3-Claude Single Model):</strong>
    Educational Quality: {quality_html} |
    User Satisfaction: {satisfaction_html} |
    Active Sessions: {sessions_html}
    </div>
    """

    dashboard_html = f'''
    <div style="text-align: center; padding: 20px; border: 1px solid #ddd; border-radius: 8px; background: #f9f9f9;">
    <h3>π {project_name} Analytics</h3>
    <div style="display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 15px; margin: 20px 0;">
    <div style="padding: 15px; background: white; border-radius: 6px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
    <div style="font-size: 24px; font-weight: bold; color: #2196F3;">{total_html}</div>
    <div style="color: #666; font-size: 12px;">Total Sessions</div>
    </div>
    <div style="padding: 15px; background: white; border-radius: 6px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
    <div style="font-size: 24px; font-weight: bold; color: #FF9800;">{response_time_html}</div>
    <div style="color: #666; font-size: 12px;">Avg Response Time</div>
    </div>
    <div style="padding: 15px; background: white; border-radius: 6px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
    <div style="font-size: 24px; font-weight: bold; color: {status_color};">{success_html}</div>
    <div style="color: #666; font-size: 12px;">Success Rate ({status_text})</div>
    </div>
    </div>
    {agent_metrics_section}
    <div style="margin: 15px 0; padding: 10px; background: #fff3cd; border-radius: 4px; font-size: 14px;">
    <strong>Model:</strong> {model_html} |
    <strong>Last Updated:</strong> {last_updated}
    </div>
    </div>
    '''
    return dashboard_html
def calculate_response_quality(response: str) -> float:
    """Calculate a heuristic response quality score.

    Responses shorter than 20 characters score 2.0 and responses longer than
    2000 characters score 3.5. Otherwise the score is 2.5, plus up to 1.5 for
    length (saturating at 200 characters), plus up to 1.0 for the share of
    educational keywords present, clamped to the range 1.0-5.0.

    Returns:
        The score, or the neutral 3.0 when ``response`` cannot be scored
        (for example when it is not a string).
    """
    educational_keywords = ('learn', 'understand', 'concept', 'example', 'practice')
    try:
        length = len(response)
        lowered = response.lower()
        length_score = min(length / 200, 1.0)
        keyword_score = sum(
            1 for keyword in educational_keywords if keyword in lowered
        ) / len(educational_keywords)

        if length < 20:
            return 2.0
        if length > 2000:
            return 3.5

        base_score = 2.5 + (length_score * 1.5) + (keyword_score * 1.0)
        return min(max(base_score, 1.0), 5.0)
    except Exception:
        # Best effort by design, but no longer a bare ``except:`` so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        return 3.0
def evaluate_educational_quality_with_tracking(
    user_query: str,
    response: str,
    thread_id: Optional[str] = None,
    session_id: Optional[str] = None,
):
    """Educational quality evaluation with state tracking using LightEval.

    Scores the response against six textual indicators, optionally replaces
    the length-based semantic score with a LightEval BertScore, and records
    the result in the global state manager.

    Args:
        user_query: The learner's message
        response: The generated answer being evaluated
        thread_id: Optional thread id, used to name the LightEval task
        session_id: Optional session id; passed to the state manager and used
            for the task name when ``thread_id`` is not given

    Returns:
        Dict with ``semantic_quality``, ``educational_score``,
        ``response_time`` and ``indicators``. When anything fails, neutral
        defaults with the same keys are returned (``indicators`` empty).
    """
    start_time = time.time()
    try:
        lowered = response.lower()

        # Educational indicators
        educational_indicators = {
            'has_examples': 'example' in lowered,
            'structured_explanation': '##' in response or '1.' in response,
            'appropriate_length': 100 < len(response) < 1500,
            'encourages_learning': any(
                phrase in lowered
                for phrase in ['practice', 'try', 'consider', 'think about']
            ),
            'uses_latex': '$' in response,
            'has_clear_sections': response.count('\n\n') >= 2
        }
        educational_score = sum(educational_indicators.values()) / len(educational_indicators)
        semantic_quality = min(len(response) / 500, 1.0)
        # This is the time spent computing the indicators above, taken before
        # the optional LightEval step.
        response_time = time.time() - start_time

        # Use LightEval if available
        if LIGHTEVAL_AVAILABLE:
            try:
                doc = Doc(
                    task_name=f"turn_{thread_id or session_id}",
                    query=user_query,
                    choices=[response],
                    gold_index=-1,
                    specific_output=response
                )
                # NOTE(review): confirm this call matches the installed
                # LightEval BertScore API; failures fall back to the
                # length-based score below.
                bert_score = BertScore().compute(doc)
                semantic_quality = bert_score if bert_score else semantic_quality
            except Exception as lighteval_error:
                logger.warning(f"LightEval computation failed: {lighteval_error}")

        metrics = {
            'semantic_quality': semantic_quality,
            'educational_score': educational_score,
            'response_time': response_time,
            'indicators': educational_indicators
        }

        # Track in global state
        global_state_manager.add_educational_quality_score(
            user_query=user_query,
            response=response,
            metrics=metrics,
            session_id=session_id
        )
        logger.info(f"Educational quality evaluated: {educational_score:.3f}")
        return metrics
    except Exception as e:
        logger.error(f"Educational quality evaluation failed: {e}")
        # Same keys as the success path, so callers can index 'indicators'
        # without special-casing a failed evaluation.
        return {
            'educational_score': 0.5,
            'semantic_quality': 0.5,
            'response_time': 0.0,
            'indicators': {},
        }
def log_metrics_to_database(project_name: str, run_id: str, metrics: Dict):
    """Log metrics to SQLite database for dashboard.

    Writes one row per entry of ``metrics`` (values stored as text via
    ``str()``), all sharing one timestamp, into the ``metrics`` table. The
    table is created on first use. The database is the first existing
    candidate from ``get_trackio_database_path``, else ``./mimir_metrics.db``.

    Failures are logged and swallowed: metric logging is best effort.

    Args:
        project_name: Project the metrics belong to
        run_id: Identifier grouping the metrics of one run
        metrics: Mapping of metric name to value
    """
    try:
        db_path = get_trackio_database_path(project_name)
        if db_path is None:
            db_path = "./mimir_metrics.db"

        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()

            # Create metrics table if not exists
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    project_name TEXT,
                    run_id TEXT,
                    metric_name TEXT,
                    value TEXT,
                    timestamp TEXT
                )
            """)

            # Insert metrics
            timestamp = datetime.now().isoformat()
            cursor.executemany(
                """
                INSERT INTO metrics (project_name, run_id, metric_name, value, timestamp)
                VALUES (?, ?, ?, ?, ?)
                """,
                [
                    (project_name, run_id, metric_name, str(metric_value), timestamp)
                    for metric_name, metric_value in metrics.items()
                ],
            )
            conn.commit()
        finally:
            # Always release the connection, even if a statement failed.
            conn.close()

        logger.info(f"Logged {len(metrics)} metrics to database")
    except Exception as e:
        logger.error(f"Failed to log metrics to database: {e}")
def sync_trackio_with_global_state():
    """Copy the global state manager's evaluation summary into the metrics database.

    The average educational quality, the user satisfaction rate and the total
    number of evaluations are written under project "Mimir" with a fresh
    random run id. Any failure is logged and swallowed.
    """
    try:
        summary = global_state_manager.get_evaluation_summary()
        aggregate = summary['aggregate_metrics']

        # Agent-based metrics only.
        payload = {
            "educational_quality_avg": aggregate['avg_educational_quality'],
            "user_satisfaction": aggregate['user_satisfaction_rate'],
            "total_evaluations": sum(summary['total_evaluations'].values()),
        }
        run_id = str(uuid.uuid4())
        log_metrics_to_database("Mimir", run_id, payload)
        logger.info("Synced global state metrics to database")
    except Exception as e:
        logger.error(f"Failed to sync metrics to database: {e}")
def refresh_analytics_data_persistent():
    """Refresh analytics data with global state persistence.

    Returns the cached snapshot when the last refresh is less than 30 seconds
    old. Otherwise the metrics database is queried (or an all-None snapshot
    is built when no database file exists), the result is stored in the
    global state manager, and it is returned.

    Returns:
        Tuple ``(project_stats, recent_interactions, dashboard_html)``. On
        failure the stats carry an ``error`` entry, the interaction list is
        empty and the HTML is an error panel.
    """
    project_name = "Mimir"
    model_label = "Qwen3-4B-Claude GGUF (Q6_K - Single Model)"
    try:
        analytics_state = global_state_manager.get_analytics_state()
        last_refresh = analytics_state.get('last_refresh')

        # If refreshed within last 30 seconds, return cached.
        # total_seconds() is required here: timedelta.seconds is only the
        # seconds *component* and wraps every day, so a day-old cache would
        # otherwise be treated as fresh.
        if last_refresh and 0 <= (datetime.now() - last_refresh).total_seconds() < 30:
            logger.info("Using cached analytics data (recent refresh)")
            return (
                analytics_state['project_stats'],
                analytics_state['recent_interactions'],
                analytics_state['dashboard_html']
            )

        db_path = get_trackio_database_path(project_name)
        if db_path is None:
            logger.warning("No metrics database found")
            project_stats = {
                "total_conversations": None,
                "avg_session_length": None,
                "success_rate": None,
                "model_type": model_label,
                "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            dashboard_html = create_dashboard_html_with_nulls(project_name, project_stats)
            recent_interactions = []
            global_state_manager.update_analytics_state(
                project_stats=project_stats,
                recent_interactions=recent_interactions,
                dashboard_html=dashboard_html
            )
            return project_stats, recent_interactions, dashboard_html

        conn = sqlite3.connect(db_path)
        try:
            # Rows are read by column name in the query helpers.
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            project_stats = get_project_statistics_with_nulls(cursor, project_name)
            project_stats["model_type"] = model_label
            project_stats["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            recent_data = get_recent_interactions_with_nulls(cursor, project_name, limit=10)
        finally:
            # Release the connection even when a query helper raises.
            conn.close()

        dashboard_html = create_dashboard_html_with_nulls(project_name, project_stats)
        global_state_manager.update_analytics_state(
            project_stats=project_stats,
            recent_interactions=recent_data,
            dashboard_html=dashboard_html
        )
        logger.info("Analytics data refreshed and cached successfully")
        return project_stats, recent_data, dashboard_html
    except Exception as e:
        logger.error(f"Error refreshing analytics: {e}")
        error_stats = {
            "error": str(e),
            "total_conversations": None,
            "avg_session_length": None,
            "success_rate": None,
            "model_type": "Error",
            "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        # The heading previously carried a mis-encoded warning sign.
        error_html = f"""
        <div style="text-align: center; padding: 40px; border: 2px dashed #f44336; border-radius: 8px; background: #ffebee;">
        <h3 style="color: #f44336;">⚠️ Analytics Error</h3>
        <p>Could not load analytics data: {str(e)[:100]}</p>
        </div>
        """
        global_state_manager.update_analytics_state(
            project_stats=error_stats,
            recent_interactions=[],
            dashboard_html=error_html,
            error_state=str(e)
        )
        return error_stats, [], error_html
def export_metrics_json_persistent():
    """Write the current analytics snapshot to a timestamped JSON file.

    The file lands in the working directory. The outcome is recorded with the
    global state manager and surfaced to the user through a Gradio
    info/warning toast. Nothing is returned.
    """
    try:
        stats, interactions, _dashboard = refresh_analytics_data_persistent()
        payload = {
            "project": "Mimir",
            "export_timestamp": datetime.now().isoformat(),
            "statistics": stats,
            "recent_interactions": interactions,
        }
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"mimir_metrics_{stamp}.json"
        with open(filename, 'w') as out_file:
            # default=str stringifies anything json cannot encode natively.
            json.dump(payload, out_file, indent=2, default=str)
    except Exception as e:
        global_state_manager.add_export_record("JSON", "failed", success=False)
        logger.error(f"Export failed: {e}")
        gr.Warning(f"Export failed: {str(e)}")
        return

    # Success bookkeeping; failures here are reported exactly like export failures.
    try:
        global_state_manager.add_export_record("JSON", filename, success=True)
        logger.info(f"Metrics exported to {filename}")
        gr.Info(f"Metrics exported successfully to {filename}")
    except Exception as e:
        global_state_manager.add_export_record("JSON", "failed", success=False)
        logger.error(f"Export failed: {e}")
        gr.Warning(f"Export failed: {str(e)}")
def export_metrics_csv_persistent():
    """Write the recent interactions to a timestamped CSV file.

    The file lands in the working directory with a fixed header row followed
    by one row per interaction. The outcome is recorded with the global state
    manager and surfaced through a Gradio info/warning toast. Nothing is
    returned.
    """
    header = ["Timestamp", "Response Time", "Mode", "Tools Used", "Quality Score", "Adapter"]
    try:
        import csv
        interactions = refresh_analytics_data_persistent()[1]
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"mimir_metrics_{stamp}.csv"
        with open(filename, 'w', newline='') as out_file:
            csv_out = csv.writer(out_file)
            csv_out.writerow(header)
            csv_out.writerows(interactions)
        global_state_manager.add_export_record("CSV", filename, success=True)
        logger.info(f"Metrics exported to {filename}")
        gr.Info(f"Metrics exported successfully to {filename}")
    except Exception as e:
        global_state_manager.add_export_record("CSV", "failed", success=False)
        logger.error(f"Export failed: {e}")
        gr.Warning(f"Export failed: {str(e)}")
def load_analytics_state():
    """Return the analytics snapshot cached in the global state manager.

    Returns:
        Tuple ``(project_stats, recent_interactions, dashboard_html)``. When
        no dashboard has been rendered yet (cached HTML is None) a placeholder
        panel prompting the user to refresh is returned instead.
    """
    cached = global_state_manager.get_analytics_state()
    stats = cached['project_stats']
    interactions = cached['recent_interactions']
    html_fragment = cached['dashboard_html']
    if html_fragment is not None:
        return stats, interactions, html_fragment

    placeholder = """
    <div style="text-align: center; padding: 40px; border: 2px dashed #ccc; border-radius: 8px; background: #f8f9fa;">
    <h3>π Analytics Dashboard</h3>
    <p>Click "Refresh Data" to load analytics.</p>
    </div>
    """
    return stats, interactions, placeholder
def get_global_state_debug_info():
    """Snapshot the global state manager for debugging.

    Returns:
        Dict with ``cache_status``, the current ``timestamp`` (ISO format)
        and ``sessions`` (everything the manager reports via
        ``get_all_sessions``).
    """
    status = global_state_manager.get_cache_status()
    captured_at = datetime.now().isoformat()
    return {
        "cache_status": status,
        "timestamp": captured_at,
        "sessions": global_state_manager.get_all_sessions(),
    }
| # ============================================================================ | |
| # POST-PROCESSING | |
| # ============================================================================ | |
class ResponsePostProcessor:
    """Post-processing pipeline for educational responses.

    The pipeline strips model artifacts, truncates at a logical end point or
    at the length budget, tidies spacing and list markers, and substitutes a
    generic fallback when the result is too short to be useful. Paragraph
    breaks and other newlines are preserved throughout.
    """

    def __init__(self, max_length: int = 1800, min_length: int = 10):
        """
        Args:
            max_length: Character budget applied when no logical stop
                pattern is found
            min_length: Minimum number of characters for a response to pass
                the quality check
        """
        self.max_length = max_length
        self.min_length = min_length
        # Section markers at which the response is cut (text before the
        # marker is kept).
        self.logical_stop_patterns = [
            r'\n\n---\n',
            r'\n\n## Summary\b',
            r'\n\nIn conclusion\b',
            r'\n\nTo summarize\b',
        ]

    def process_response(self, raw_response: str, user_query: str = "") -> str:
        """Main post-processing pipeline.

        Returns the cleaned response, the fallback message when the cleaned
        text fails the quality check, or ``raw_response`` unchanged when
        processing itself raises.
        """
        try:
            cleaned = self._enhanced_token_cleanup(raw_response)
            cleaned = self._truncate_intelligently(cleaned)
            cleaned = self._enhance_readability(cleaned)
            if not self._passes_quality_check(cleaned):
                return self._generate_fallback_response(user_query)
            return cleaned.strip()
        except Exception as e:
            logger.error(f"Post-processing error: {e}")
            return raw_response

    def _enhanced_token_cleanup(self, text: str) -> str:
        """Remove model artifacts and collapse excess blank lines."""
        artifacts = [
            r'<\|.*?\|>',       # special tokens such as <|eot_id|>
            r'###\s*$',         # dangling heading marker at end of line
            r'User:\s*$',       # dangling role labels at end of line
            r'Assistant:\s*$',
        ]
        for pattern in artifacts:
            text = re.sub(pattern, '', text, flags=re.MULTILINE)
        # Three or more line breaks become one paragraph break. (Deleting the
        # run outright, as before, glued adjacent paragraphs together.)
        return re.sub(r'\n\s*\n\s*\n+', '\n\n', text)

    def _truncate_intelligently(self, text: str) -> str:
        """Truncate at logical educational endpoints.

        Cuts before the first matching stop pattern. Otherwise, when the text
        exceeds ``max_length``, cuts after the last sentence that ends within
        the budget, keeping the original punctuation and line breaks. If no
        sentence ends within the budget the text is cut at the last word
        boundary instead of being discarded.
        """
        for pattern in self.logical_stop_patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                return text[:match.start()].strip()

        if len(text) <= self.max_length:
            return text

        # Position just after the last sentence terminator inside the budget.
        cut = 0
        for boundary in re.finditer(r'[.!?]+(?=\s)', text):
            if boundary.end() > self.max_length:
                break
            cut = boundary.end()
        if cut:
            return text[:cut].strip()

        head = text[:self.max_length]
        # Drop a trailing partial word unless the cut already falls on whitespace.
        if not (text[self.max_length].isspace() or head[-1:].isspace()):
            pieces = head.rsplit(None, 1)
            if len(pieces) == 2:
                head = pieces[0]
        return head.strip()

    def _enhance_readability(self, text: str) -> str:
        """Format for better presentation."""
        # Insert the missing space after sentence punctuation ("end.Next").
        text = re.sub(r'([.!?])([A-Z])', r'\1 \2', text)
        # Collapse runs of spaces/tabs only; newlines are left alone so that
        # paragraph breaks survive.
        text = re.sub(r'[ \t]{2,}', ' ', text)
        # Normalise bullet markers. Whitespace after the marker is required
        # so that "**bold**", "---" or "-5" at line start are not mangled.
        text = re.sub(r'\n[ \t]*[-*][ \t]+', '\n- ', text)
        return text

    def _passes_quality_check(self, text: str) -> bool:
        """Final quality validation: long enough and at least one real sentence."""
        if len(text.strip()) < self.min_length:
            return False
        sentences = re.split(r'[.!?]+', text)
        return any(len(sentence.strip()) > 5 for sentence in sentences)

    def _generate_fallback_response(self, user_query: str) -> str:
        """Generate safe fallback (currently independent of the query)."""
        return "I'd be happy to help you understand this better. Could you clarify what specific aspect you'd like me to focus on?"

    def process_and_stream_response(self, raw_response: str, user_query: str = ""):
        """Process response then stream word-by-word.

        Yields the growing output after each word with a short pause between
        words. The original whitespace after each word (including line
        breaks) is kept, so the final yield equals the processed response.
        """
        try:
            processed_response = self.process_response(raw_response, user_query)
            current_output = ""
            # Each chunk is one word plus the whitespace that follows it.
            for chunk in re.findall(r'\S+\s*', processed_response):
                current_output += chunk
                yield current_output
                time.sleep(0.015)
        except Exception as e:
            logger.error(f"Stream processing error: {e}")
            yield "I encountered an error processing the response."


post_processor = ResponsePostProcessor()
| # ============================================================================ | |
| # DATA EXTRACTION FOR GRAPHING | |
| # ============================================================================ | |
def extract_graph_data(user_input: str, conversation_history: Optional[List[Dict]] = None) -> Optional[Dict]:
    """
    Use LLM to extract graphable data from user input.

    Args:
        user_input: The current user message
        conversation_history: Optional list of message dicts with 'role' and
            'content'; only the last two are included as context, each
            truncated to 200 characters

    Returns:
        Dict with keys: data, plot_type, title, x_label, y_label, educational_context
        Or None if no data can be extracted (no JSON object in the model
        output, ``has_data`` false, ``data`` missing/empty/not a mapping, or
        any error during generation or parsing)
    """
    from model_manager import get_model
    model = get_model()

    # Format conversation context
    context = ""
    if conversation_history:  # None and empty list both mean "no context"
        recent = conversation_history[-2:]
        context = "\n".join([f"{msg['role']}: {msg['content'][:200]}" for msg in recent])

    extraction_prompt = f"""Extract graphable data from the user's message.
Previous context:
{context}
Current message: {user_input}
If the message contains data that can be graphed (numbers, comparisons, datasets, trends), extract:
1. The data as key-value pairs
2. The best plot type (bar, line, or pie)
3. A descriptive title
4. Axis labels (if applicable)
5. Educational context explaining what the graph shows
Respond in JSON format ONLY:
{{
"has_data": true/false,
"data": {{"label1": value1, "label2": value2, ...}},
"plot_type": "bar/line/pie",
"title": "Graph Title",
"x_label": "X Axis Label",
"y_label": "Y Axis Label",
"educational_context": "Brief explanation of what this graph represents"
}}
If no graphable data exists, respond: {{"has_data": false}}"""

    try:
        system_prompt = "You are a data extraction expert. Extract graphable data from text and respond in valid JSON only."
        response = model.generate(
            system_prompt=system_prompt,
            user_message=extraction_prompt,
            max_tokens=300,
            temperature=0.3
        )

        # The model may wrap the JSON in prose: take the span from the first
        # '{' to the last '}'.
        json_start = response.find('{')
        json_end = response.rfind('}') + 1
        if json_start == -1 or json_end == 0:
            return None
        result = json.loads(response[json_start:json_end])

        if not result.get('has_data', False):
            return None

        # Validate required fields: the plot tool takes ``data`` as a dict,
        # so anything else (list, string, number) is rejected here.
        data = result.get('data')
        if not isinstance(data, dict) or not data:
            return None
        return result
    except Exception as e:
        logger.error(f"Data extraction failed: {e}")
        return None
| # ============================================================================ | |
| # TOOL FUNCTIONS | |
| # ============================================================================ | |
def Create_Graph_Tool(
    data: dict,
    plot_type: str,
    title: str = "Generated Plot",
    x_label: str = "",
    y_label: str = "",
    educational_context: str = ""
) -> str:
    """Generate an educational graph and return it as an HTML snippet.

    The plot itself is produced by ``generate_plot``; this wrapper embeds the
    resulting base64 PNG in an ``<img>`` tag, optionally preceded by a styled
    box containing ``educational_context``.

    Args:
        data: Label -> value mapping forwarded to ``generate_plot``.
        plot_type: Plot kind forwarded to ``generate_plot``.
        title: Plot title; also used (escaped) as the image ``alt`` text.
        x_label: X axis label forwarded to ``generate_plot``.
        y_label: Y axis label forwarded to ``generate_plot``.
        educational_context: Optional explanatory text shown above the image.

    Returns:
        HTML for the rendered graph, or a red ``<p>`` error paragraph when
        ``generate_plot`` reports an error or any exception is raised.
    """
    # Local stdlib import, same style as the other function-scope imports in
    # this file. Title/context/error text can originate from model output
    # built on user input, so it must not be interpolated into HTML verbatim.
    from html import escape

    tool_start = log_step("Create_Graph_Tool")
    try:
        _, artifact = generate_plot(
            data=data,
            plot_type=plot_type,
            title=title,
            x_label=x_label,
            y_label=y_label
        )
        if "error" in artifact:
            return f'<p style="color:red;">Graph generation failed: {escape(str(artifact["error"]))}</p>'

        base64_image = artifact["base64_image"]

        context_html = ""
        if educational_context:
            context_html = f'<div style="margin: 10px 0; padding: 10px; background: #f8f9fa; border-left: 4px solid #007bff;">π‘ {escape(str(educational_context))}</div>'

        # quote=True so a '"' inside the title cannot terminate the alt attribute.
        safe_title = escape(str(title), quote=True)
        return (
            f"{context_html}\n"
            '<div style="text-align: center; margin: 20px 0;">\n'
            f'<img src="data:image/png;base64,{base64_image}"\n'
            'style="max-width: 100%; height: auto; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1);"\n'
            f'alt="{safe_title}" />\n'
            '</div>'
        )
    except Exception as e:
        logger.error(f"Graph tool error: {e}")
        return f'<p style="color:red;">Error: {escape(str(e))}</p>'
    finally:
        # Single exit point for the timing log, whichever path returned.
        log_step("Create_Graph_Tool", tool_start)
| # ============================================================================ | |
| # MAIN ORCHESTRATION WORKFLOW | |
| # ============================================================================ | |
def orchestrate_turn(user_input: str, conversation_history: Optional[list] = None, session_id: str = "default") -> str:
    """
    Run one complete conversational turn and return the post-processed reply.

    Steps (each one is timed with ``log_step``):
    1. Reset prompt state
    2. Trim and format the conversation history
    3. Tool decision, graph-data extraction and graph generation
    4. Regex checks on the user input
    5. Routing agents (select response prompts and thinking prompts)
    6. Thinking agents
    7. Response prompt assembly
    8. Build the input dict for the response agent
    9. Response generation
    10. Post-processing
    11. Metrics tracking (background thread)

    Args:
        user_input: The user's message for this turn.
        conversation_history: Prior messages as dicts with ``role`` and
            ``content`` keys; only the last 8 are used. ``None`` means empty.
        session_id: Identifier forwarded to the quality-metrics tracker.

    Returns:
        The post-processed response text. If any step raises, the exception is
        logged and ``"I encountered an error: ..."`` is returned instead.
    """
    turn_start = log_step("orchestrate_turn")
    # Own wall-clock anchor for the response_time metric, taken on this thread.
    turn_started_at = time.time()
    run_id = str(uuid.uuid4())
    try:
        # ====================================================================
        # STEP 1: RESET PROMPT STATE
        # ====================================================================
        step_start = log_step("Step 1: Reset prompt state")
        global_state_manager.reset_prompt_state()
        prompt_state = global_state_manager.get_prompt_state_manager()
        log_step("Step 1: Reset prompt state", step_start)

        # ====================================================================
        # STEP 2: USER INPUT PROCESSING
        # ====================================================================
        step_start = log_step("Step 2: Process user input")
        # History comes from Gradio state (no global state call); keep the
        # last 8 messages. None and empty both collapse to [].
        conversation_history = conversation_history[-8:] if conversation_history else []
        # Format history for agents
        conversation_history_formatted = "\n".join([
            f"{msg['role']}: {msg['content'][:100]}"
            for msg in conversation_history
        ]) if conversation_history else "No previous conversation"
        log_step("Step 2: Process user input", step_start)

        # ====================================================================
        # STEP 3: TOOL DECISION ENGINE & GRAPH GENERATION
        # ====================================================================
        step_start = log_step("Step 3: Tool decision & graph generation")
        tool_decision_result = tool_agent.decide(user_input, conversation_history)
        tool_img_output = ""
        tool_context = ""
        if tool_decision_result:
            logger.info("Tool decision: YES - visualization needed")
            prompt_state.update("TOOL_USE_ENHANCEMENT", True)
            logger.info("β Extracting graphable data from input...")
            graph_data = extract_graph_data(user_input, conversation_history)
            if graph_data:
                # The extractor only guarantees a non-empty 'data' entry, so
                # every other field is read with a fallback instead of
                # letting a KeyError abort the whole turn.
                plot_type = graph_data.get('plot_type', 'bar')
                logger.info(f"β Data extracted: {len(graph_data['data'])} data points")
                logger.info(f" Plot type: {plot_type}")
                tool_img_output = Create_Graph_Tool(
                    data=graph_data['data'],
                    plot_type=plot_type,
                    title=graph_data.get('title', 'Generated Plot'),
                    x_label=graph_data.get('x_label', ''),
                    y_label=graph_data.get('y_label', ''),
                    educational_context=graph_data.get('educational_context', '')
                )
                # Create_Graph_Tool reports failure as a red <p> paragraph
                # rather than raising; do not advertise that as a graph.
                if tool_img_output.lstrip().startswith('<p style="color:red;">'):
                    logger.warning("Graph generation failed - continuing without visualization")
                    tool_img_output = ""
                else:
                    # Store context for thinking agents
                    tool_context = f"Graph created: {plot_type} chart showing {graph_data.get('title', 'data visualization')}"
                    logger.info("β Graph generated successfully")
            else:
                logger.info("β οΈ No extractable data found - skipping graph generation")
        else:
            logger.info("Tool decision: NO - no visualization needed")
        log_step("Step 3: Tool decision & graph generation", step_start)

        # ====================================================================
        # STEP 4: REGEX LOGICAL EXPRESSIONS
        # ====================================================================
        step_start = log_step("Step 4: Regex checks")
        logical_expressions.apply_all_checks(user_input, prompt_state)
        log_step("Step 4: Regex checks", step_start)

        # ====================================================================
        # STEP 5: SEQUENTIAL AGENT EXECUTION (ROUTING)
        # ====================================================================
        step_start = log_step("Step 5: Routing agents")
        # Unified process() call; returns newline-separated prompt names.
        response_prompts_str, thinking_prompts_str = routing_agents.process(
            user_input=user_input,
            tool_used=bool(tool_decision_result and tool_img_output)
        )
        # Update prompt state with response prompts
        if response_prompts_str:
            for prompt_name in response_prompts_str.split('\n'):
                if prompt_name.strip():
                    prompt_state.update(prompt_name.strip(), True)
                    logger.info(f"Response prompt activated: {prompt_name.strip()}")
        # Keep thinking prompts for Step 6
        thinking_prompts_from_routing = thinking_prompts_str.split('\n') if thinking_prompts_str else []
        for prompt_name in thinking_prompts_from_routing:
            if prompt_name.strip():
                logger.info(f"Thinking prompt queued: {prompt_name.strip()}")
        log_step("Step 5: Routing agents", step_start)

        # ====================================================================
        # STEP 6: THINKING AGENT PROCESSING
        # ====================================================================
        step_start = log_step("Step 6: Thinking agents")
        thinking_prompts_list = []
        for prompt_name in thinking_prompts_from_routing:
            if prompt_name.strip():
                thinking_prompts_list.append(prompt_name.strip())
                prompt_state.update(prompt_name.strip(), True)
        # Heuristic: LATEX_FORMATTING implies MATH_THINKING even when the
        # routing agents did not select it.
        if prompt_state.is_active("LATEX_FORMATTING") and "MATH_THINKING" not in thinking_prompts_list:
            thinking_prompts_list.append("MATH_THINKING")
            prompt_state.update("MATH_THINKING", True)
        # Execute thinking agents if any are active
        thinking_context = ""
        if thinking_prompts_list:
            thinking_prompts_string = '\n'.join(thinking_prompts_list)
            logger.info(f"Active thinking agents: {thinking_prompts_list}")
            think_start = log_step("Thinking agents execution")
            thinking_context = thinking_agents.process(
                user_input=user_input,
                conversation_history=conversation_history_formatted,
                thinking_prompts=thinking_prompts_string,
                tool_img_output=tool_img_output,
                tool_context=tool_context
            )
            log_step("Thinking agents execution", think_start)
        log_step("Step 6: Thinking agents", step_start)

        # ====================================================================
        # STEP 7: RESPONSE PROMPT ASSEMBLY
        # ====================================================================
        step_start = log_step("Step 7: Prompt assembly")
        response_prompt_names = prompt_state.get_active_response_prompts()
        prompt_segments = [CORE_IDENTITY]
        prompt_map = {
            "VAUGE_INPUT": VAUGE_INPUT,
            "USER_UNDERSTANDING": USER_UNDERSTANDING,
            "GENERAL_FORMATTING": GENERAL_FORMATTING,
            "LATEX_FORMATTING": LATEX_FORMATTING,
            "GUIDING_TEACHING": GUIDING_TEACHING,
            "STRUCTURE_PRACTICE_QUESTIONS": STRUCTURE_PRACTICE_QUESTIONS,
            "PRACTICE_QUESTION_FOLLOWUP": PRACTICE_QUESTION_FOLLOWUP,
            "TOOL_USE_ENHANCEMENT": TOOL_USE_ENHANCEMENT,
        }
        for prompt_name in response_prompt_names:
            if prompt_name in prompt_map:
                prompt_segments.append(prompt_map[prompt_name])
        # NOTE(review): the assembled text is not consumed anywhere below;
        # the response agent receives only the prompt *names* (Step 8).
        prompt_segments_text = "\n\n".join(prompt_segments)
        logger.info(f"Active prompts: {response_prompt_names}")
        log_step("Step 7: Prompt assembly", step_start)

        # ====================================================================
        # STEP 8: PREPARE RESPONSE AGENT INPUT
        # ====================================================================
        step_start = log_step("Step 8: Prepare response input")
        # The graph HTML itself is not put in the prompt; only its presence
        # is mentioned in the tool context.
        combined_tool_context = tool_context
        if tool_img_output:
            if combined_tool_context:
                combined_tool_context += "\n\nNote: A visualization has been generated for the user."
            else:
                combined_tool_context = "A visualization has been generated for the user."
        # The response agent is invoked with a dict, not a string.
        input_data = {
            'user_query': user_input,
            'conversation_history': conversation_history,
            'active_prompts': response_prompt_names,
            'thinking_context': thinking_context,  # str (from thinking agents)
            'tool_context': combined_tool_context,  # str (tool usage info)
        }
        logger.info("Response input prepared:")
        logger.info(f" - User query: {len(user_input)} chars")
        logger.info(f" - History: {len(conversation_history)} messages")
        logger.info(f" - Active prompts: {len(response_prompt_names)} prompts")
        logger.info(f" - Thinking context: {len(thinking_context)} chars")
        logger.info(f" - Tool context: {len(combined_tool_context)} chars")
        log_step("Step 8: Prepare response input", step_start)

        # ====================================================================
        # STEP 9: RESPONSE GENERATION
        # ====================================================================
        step_start = log_step("Step 9: Response generation")
        try:
            result = response_agent.invoke(input_data)
            # Extract response from result dict
            raw_response = result.get('response', '')
            metadata = result.get('metadata', {})
            if not raw_response:
                logger.warning("ResponseAgent returned empty response")
                raw_response = "I apologize, but I wasn't able to generate a response. Please try again."
            logger.info(f"β Generated {len(raw_response)} chars")
            if metadata:
                logger.info(f" Metadata: {metadata}")
        except Exception as e:
            # A generation failure degrades to an apology instead of failing
            # the whole turn.
            logger.error(f"Response generation failed: {e}")
            import traceback
            logger.error(traceback.format_exc())
            raw_response = "I apologize, but I encountered an error while generating a response. Please try rephrasing your question or try again."
        log_step("Step 9: Response generation", step_start)

        # ====================================================================
        # STEP 10: POST-PROCESSING
        # ====================================================================
        step_start = log_step("Step 10: Post-processing")
        # NOTE(review): tool_img_output is never merged into the returned
        # text in this function - confirm the graph HTML reaches the user.
        processed_response = post_processor.process_response(raw_response, user_input)
        log_step("Step 10: Post-processing", step_start)

        # ====================================================================
        # STEP 11: METRICS TRACKING (BACKGROUND THREAD - NON-BLOCKING)
        # ====================================================================
        step_start = log_step("Step 11: Metrics tracking")
        # Snapshot everything turn-specific *before* the thread starts: the
        # prompt state is reset at the start of the next turn, and measuring
        # the elapsed time inside the thread would include the (slow)
        # quality evaluation in the reported response time.
        response_time = time.time() - turn_started_at
        tool_enhancement_active = prompt_state.is_active("TOOL_USE_ENHANCEMENT")
        response_prompts_snapshot = list(response_prompt_names)
        thinking_prompts_snapshot = list(thinking_prompts_list)

        def track_metrics_async():
            """Run metrics tracking in background to avoid blocking"""
            try:
                logger.info("[Background] Starting metrics tracking...")
                # Track educational quality
                quality_metrics = evaluate_educational_quality_with_tracking(
                    user_query=user_input,
                    response=processed_response,
                    thread_id=run_id,
                    session_id=session_id
                )
                # Log metrics to database
                metrics_to_log = {
                    "conversation_start": datetime.now().isoformat(),
                    "response_time": response_time,
                    "quality_score": calculate_response_quality(processed_response),
                    "educational_score": quality_metrics['educational_score'],
                    "prompt_mode": ",".join(response_prompts_snapshot),
                    "tools_used": 1 if tool_enhancement_active else 0,
                    "thinking_agents": ",".join(thinking_prompts_snapshot) if thinking_prompts_snapshot else "none",
                    "active_adapter": response_agent.model_type if response_agent.model_loaded else "not_loaded"
                }
                log_metrics_to_database("Mimir", run_id, metrics_to_log)
                logger.info("[Background] β Metrics tracking completed")
            except Exception as metrics_error:
                # Metrics are best-effort; never let them affect the reply.
                logger.warning(f"[Background] Metrics tracking failed: {metrics_error}")

        # daemon=True so the thread doesn't block shutdown
        metrics_thread = threading.Thread(
            target=track_metrics_async,
            daemon=True,
            name="MetricsTracking"
        )
        metrics_thread.start()
        log_step("Step 11: Metrics tracking", step_start)
        logger.info("β Metrics tracking started in background - continuing immediately")
        log_step("orchestrate_turn", turn_start)
        return processed_response
    except Exception as e:
        logger.error(f"Orchestration error: {e}")
        import traceback
        logger.error(traceback.format_exc())
        log_step("orchestrate_turn", turn_start)
        return f"I encountered an error: {str(e)}"
| # ============================================================================ | |
| # GRADIO CALLBACK FUNCTIONS (FIXED STATE MANAGEMENT) | |
| # ============================================================================ | |
def get_loading_animation_base64():
    """Load ``loading_animation.gif`` (relative to the working directory) as a data URI.

    Returns:
        A ``data:image/gif;base64,...`` string, or ``None`` when the file is
        missing or cannot be read. Callers fall back to a placeholder, so a
        read problem must never raise out of this helper.
    """
    try:
        with open("loading_animation.gif", "rb") as gif_file:
            gif_base64 = base64.b64encode(gif_file.read()).decode('utf-8')
    except FileNotFoundError:
        logger.warning("loading_animation.gif not found")
        return None
    except OSError as read_error:
        # Permission problems, the path being a directory, I/O errors, ...
        logger.warning(f"loading_animation.gif could not be read: {read_error}")
        return None
    return f"data:image/gif;base64,{gif_base64}"
def remove_loading_animations(chat_history):
    """Return a new list without the assistant loading-animation placeholders.

    An entry is dropped when its role is ``assistant`` and the string form of
    its content contains ``loading-animation``; all other entries are kept in
    their original order.
    """
    def _is_loading_placeholder(entry):
        if entry.get("role") != "assistant":
            return False
        return "loading-animation" in str(entry.get("content", ""))

    kept_entries = []
    for entry in chat_history:
        if _is_loading_placeholder(entry):
            continue
        kept_entries.append(entry)
    return kept_entries
def add_user_message(message, chat_history, conversation_state):
    """
    Append the submitted user message to the chat display and conversation state.

    Whitespace-only messages are ignored and the incoming states are handed
    back untouched. Otherwise both states are returned as NEW lists (the
    inputs are never mutated). The first element of the returned triple is
    always an empty string.
    """
    timer = log_step("add_user_message")

    if message.strip():
        # Build fresh lists; each gets its own message dict.
        updated_conversation = conversation_state + [{"role": "user", "content": message}]
        updated_chat = chat_history + [{"role": "user", "content": message}]
        outcome = ("", updated_chat, updated_conversation)
    else:
        outcome = ("", chat_history, conversation_state)

    log_step("add_user_message", timer)
    return outcome
def add_loading_animation(chat_history, conversation_state):
    """
    Show a loading placeholder as the last assistant entry of the chat display.

    Does nothing when ``conversation_state`` is empty. Otherwise any previous
    placeholder is removed and a new one is appended to a NEW chat list; the
    conversation state is passed through unchanged.
    """
    timer = log_step("add_loading_animation")

    if conversation_state:
        # Drop stale placeholders first, then look up the animation.
        cleaned_history = remove_loading_animations(chat_history)
        animation_src = get_loading_animation_base64()

        if animation_src:
            inner_html = f'<img src="{animation_src}" alt="Thinking..." style="height: 64px; width: auto; max-width: 80px;" />'
        else:
            # No GIF available: reserve the same space with an empty box.
            inner_html = '<div style="width: 64px; height: 64px;"></div>'

        placeholder_html = (
            '<div class="loading-animation" style="display: flex; align-items: center; justify-content: center; padding: 0.5px;">'
            + inner_html
            + '</div>'
        )
        shown_history = cleaned_history + [{"role": "assistant", "content": placeholder_html}]
    else:
        shown_history = chat_history

    log_step("add_loading_animation", timer)
    return shown_history, conversation_state
def generate_response(chat_history, conversation_state):
    """
    Generate the reply to the latest user message and stream it to the UI.

    This is a generator: every yielded value is a
    ``(chat_history, conversation_state)`` pair for Gradio. While streaming,
    only the chat display changes; the final yield also carries the
    conversation state extended with the assistant reply.

    The states passed in by Gradio are used as-is. The global state manager
    is consulted only when ``conversation_state`` arrives empty (recovery).
    On any exception the error text is shown as the assistant message and
    persisted.
    """
    callback_start = log_step("generate_response")

    if not conversation_state:
        logger.warning("Empty conversation_state in generate_response, attempting recovery from global state")
        current_state = global_state_manager.get_conversation_state()
        chat_history = current_state['chat_history']
        conversation_state = current_state['conversation_state']
        if not conversation_state:
            log_step("generate_response", callback_start)
            # In a generator, ``return value`` never reaches the consumer;
            # the states have to be yielded explicitly.
            yield chat_history, conversation_state
            return

    # Most recent user message ("" when there is none).
    last_user_message = next(
        (msg["content"] for msg in reversed(conversation_state) if msg["role"] == "user"),
        ""
    )
    if not last_user_message:
        log_step("generate_response", callback_start)
        yield chat_history, conversation_state
        return

    try:
        orch_start = log_step("orchestrate_turn call")
        raw_response = orchestrate_turn(last_user_message, conversation_state)
        log_step("orchestrate_turn call", orch_start)

        # Falls back to the raw response if the stream produces no chunks.
        final_response = raw_response
        streamed_any = False
        for chunk in post_processor.process_and_stream_response(raw_response, last_user_message):
            if not streamed_any:
                # Remove the loading animation on the FIRST chunk only
                chat_history = remove_loading_animations(chat_history)
                streamed_any = True
            # Always build a new list so Gradio detects the change.
            if chat_history and chat_history[-1]["role"] == "assistant":
                # Replace the assistant message being streamed
                chat_history = chat_history[:-1] + [{"role": "assistant", "content": chunk}]
            else:
                chat_history = chat_history + [{"role": "assistant", "content": chunk}]
            final_response = chunk
            yield chat_history, conversation_state

        if not streamed_any:
            # Nothing was streamed: still clear the placeholder and show the reply.
            chat_history = remove_loading_animations(chat_history) + [
                {"role": "assistant", "content": final_response}
            ]

        new_conversation_state = conversation_state + [{"role": "assistant", "content": final_response}]
        # Persist in the background; arguments are bound now, not when the
        # thread eventually runs.
        threading.Thread(
            target=global_state_manager.update_conversation_state,
            args=(chat_history, new_conversation_state),
            daemon=True
        ).start()
        # Final yield with complete states
        yield chat_history, new_conversation_state
    except Exception as e:
        logger.error(f"Response generation error: {e}")
        import traceback
        logger.error(traceback.format_exc())
        error_msg = f"I encountered an error: {str(e)}"
        # Clean up and show the error (new lists)
        new_chat_history = remove_loading_animations(chat_history) + [{"role": "assistant", "content": error_msg}]
        new_conversation_state = conversation_state + [{"role": "assistant", "content": error_msg}]
        global_state_manager.update_conversation_state(new_chat_history, new_conversation_state)
        yield new_chat_history, new_conversation_state
    log_step("generate_response", callback_start)
def reset_conversation():
    """
    Clear the conversation held by the global state manager.

    Returns a pair of fresh empty lists (chat history, conversation state)
    for the Gradio components.
    """
    timer = log_step("reset_conversation")
    global_state_manager.reset_conversation_state()
    log_step("reset_conversation", timer)

    empty_chat_history, empty_conversation_state = [], []
    return empty_chat_history, empty_conversation_state
def load_conversation_state():
    """
    Fetch the stored conversation from the global state manager.

    Returns the ``chat_history`` and ``conversation_state`` entries of the
    stored state, in that order, for the Gradio components.
    """
    timer = log_step("load_conversation_state")
    stored = global_state_manager.get_conversation_state()
    log_step("load_conversation_state", timer)

    chat_history = stored['chat_history']
    conversation_state = stored['conversation_state']
    return chat_history, conversation_state
| # ============================================================================ | |
| # MULTI-PAGE INTERFACE | |
| # ============================================================================ | |
def create_interface():
    """Build the multi-page Gradio app and return the root ``Blocks``.

    Pages: the chatbot (main page), "Analytics" and "Prompt Testing". Every
    page gets the same navbar with an external "Case Study" link.
    """
    logger.info("Creating Gradio interface...")

    # Page modules are imported here, inside the function.
    import gradio_chatbot
    import gradio_analytics
    import gradio_prompt_testing

    def _add_navbar():
        # Creates the shared navbar inside whichever page context is active.
        gr.Navbar(
            visible=True,
            main_page_name="Mimir Chatbot",
            value=[("Case Study", "https://github.com/Jdesiree112/Technical_Portfolio/tree/main/CaseStudy_Mimir")]
        )

    # NOTE(review): the route blocks are placed after the main Blocks context
    # closes - an assumption based on Gradio's multipage pattern, since the
    # original nesting is not recoverable here. Confirm before merging.
    with gr.Blocks(title="Mimir - Educational AI Assistant") as demo:
        _add_navbar()
        gradio_chatbot.demo.render()
    with demo.route("Analytics"):
        _add_navbar()
        gradio_analytics.demo.render()
    with demo.route("Prompt Testing"):
        _add_navbar()
        gradio_prompt_testing.demo.render()

    logger.info("Interface created successfully")
    return demo
| # ============================================================================ | |
| # MAIN EXECUTION | |
| # ============================================================================ | |
if __name__ == "__main__":
    # Script entry point: log a startup banner, build the interface, then
    # launch the Gradio server. Ctrl+C is treated as a normal shutdown; any
    # other failure is logged with its traceback and re-raised.
    try:
        logger.info("=" * 60)
        logger.info("STARTING MIMIR APPLICATION")
        logger.info("=" * 60)
        logger.info(f"LightEval available: {LIGHTEVAL_AVAILABLE}")
        logger.info(f"Current year: {CURRENT_YEAR}")
        logger.info("Lazy loading enabled: Model loads on first request β ")
        logger.info("=" * 60)

        # Build the interface and report how long it took.
        logger.info("β Creating Gradio interface...")
        build_started = time.time()
        interface = create_interface()
        build_seconds = time.time() - build_started
        logger.info(f"β Interface created in {build_seconds:.2f}s")

        logger.info("β Launching Gradio server on 0.0.0.0:7860...")
        logger.info("β Model will load on first user request (lazy loading)")

        # Use the favicon only when the file is actually present.
        favicon = "favicon.ico" if os.path.exists("favicon.ico") else None
        launch_options = {
            "server_name": "0.0.0.0",
            "server_port": 7860,
            "share": False,
            "debug": True,
            "favicon_path": favicon,
            "show_error": True,
            "ssr_mode": False,
            "quiet": False,
            "prevent_thread_lock": False,
            "max_threads": 40,
        }
        interface.launch(**launch_options)
        logger.info("β Gradio server started successfully")
    except KeyboardInterrupt:
        logger.info("Shutting down Mimir gracefully...")
    except Exception as e:
        import traceback
        logger.error("=" * 60)
        logger.error("CRITICAL ERROR IN MAIN EXECUTION")
        logger.error("=" * 60)
        logger.error(f"Error: {e}")
        logger.error(traceback.format_exc())
        logger.error("=" * 60)
        raise