# app.py
"""
Mimir Educational AI Assistant - Main Application
Architecture:
- Multi-page Gradio interface (Chatbot, Analytics, and Prompt Testing, with a navbar link to the Mimir case study)
- Agent-based orchestration (Tool, Routing, Thinking, Response)
- Global state management with SQLite + HF dataset backup
- Prompt state tracking per turn
- LightEval for metrics tracking
- Logger for timing functions
- OPTIMIZED: Single Llama-3.2-3B model with lazy loading (loads on first use, ~1GB)
"""
import os
import re
import sys
import time
import json
import base64
import logging
import sqlite3
import subprocess
import threading
import warnings
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
# ============================================================================
# HUGGINGFACE CACHE SETUP - Avoid Permission Errors
# ============================================================================
# Use /tmp for all HuggingFace operations (writable at runtime)
HF_CACHE = "/tmp/huggingface"
os.makedirs(f"{HF_CACHE}/hub", exist_ok=True)
os.makedirs(f"{HF_CACHE}/modules", exist_ok=True)
os.makedirs(f"{HF_CACHE}/transformers", exist_ok=True)
# Configure HuggingFace cache locations
os.environ['HF_HOME'] = HF_CACHE
os.environ['HF_HUB_CACHE'] = f"{HF_CACHE}/hub"
os.environ['HF_MODULES_CACHE'] = f"{HF_CACHE}/modules"
os.environ['TRANSFORMERS_CACHE'] = f"{HF_CACHE}/transformers"  # legacy variable, still honored by transformers
os.environ['HF_HUB_ENABLE_HF_TRANSFER'] = '1'  # Faster downloads (requires the hf_transfer package)
# Matplotlib cache (avoid permission warnings)
os.environ['MPLCONFIGDIR'] = "/tmp/matplotlib"
os.makedirs("/tmp/matplotlib", exist_ok=True)
# ============================================================================
# CORE DEPENDENCIES
# ============================================================================
import torch
import gradio as gr
from dotenv import load_dotenv
# Agent architecture
from agents import (
ToolDecisionAgent,
PromptRoutingAgents,
ThinkingAgents,
ResponseAgent,
)
# Lazy-loading model (optional pre-warm)
from model_manager import get_model
# State management
from state_manager import (
GlobalStateManager,
LogicalExpressions,
)
# Prompt library
from prompt_library import (
CORE_IDENTITY,
VAUGE_INPUT,  # (sic) identifier is spelled this way in prompt_library
USER_UNDERSTANDING,
GENERAL_FORMATTING,
LATEX_FORMATTING,
GUIDING_TEACHING,
STRUCTURE_PRACTICE_QUESTIONS,
PRACTICE_QUESTION_FOLLOWUP,
TOOL_USE_ENHANCEMENT,
)
# LangGraph imports
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
# LangChain Core
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage, BaseMessage
# Tool for graphing
from graph_tool import generate_plot
# ============================================================================
# LIGHTEVAL FOR METRICS
# ============================================================================
try:
from lighteval.logging.evaluation_tracker import EvaluationTracker
from lighteval.models.transformers.transformers_model import TransformersModel
from lighteval.metrics.metrics_sample import BertScore, ROUGE
from lighteval.tasks.requests import Doc
LIGHTEVAL_AVAILABLE = True
except ImportError:
LIGHTEVAL_AVAILABLE = False
logging.warning("LightEval not available - metrics tracking limited")
# ============================================================================
# CONFIGURATION
# ============================================================================
# Suppress warnings
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
# Load environment
load_dotenv(".env")
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")
# Debug and runtime settings
DEBUG_STATE = os.getenv("DEBUG_STATE", "false").lower() == "true"
CURRENT_YEAR = datetime.now().year
# ============================================================================
# LOGGING SETUP
# ============================================================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def log_step(step_name: str, start_time: Optional[float] = None) -> float:
"""
Log a pipeline step with timestamp and duration.
Args:
step_name: Name of the step
start_time: Start time from previous call (if completing a step)
Returns:
Current time for next call
"""
now = time.time()
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
if start_time is not None:
duration = now - start_time
logger.info(f"[{timestamp}] COMPLETED: {step_name} ({duration:.2f}s)")
else:
logger.info(f"[{timestamp}] STARTING: {step_name}")
return now
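# Usage sketch for log_step (illustrative):
#   t = log_step("load model")     # logs "STARTING: load model"
#   ... do the work ...
#   log_step("load model", t)      # logs "COMPLETED: load model (1.23s)"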
# ============================================================================
# MODEL INFORMATION
# ============================================================================
print("="*60)
print("MIMIR - Using Llama-3.2-3B-Instruct")
print(" Model: meta-llama/Llama-3.2-3B-Instruct")
print(" Memory: ~1GB (4-bit quantized)")
print(" Context: 128K tokens")
print(" Architecture: Single unified model")
print("="*60)
# ============================================================================
# GLOBAL INITIALIZATION
# ============================================================================
logger.info("="*60)
logger.info("INITIALIZING MIMIR APPLICATION")
logger.info("="*60)
init_start = log_step("Global Initialization")
# Initialize state management
global_state_manager = GlobalStateManager()
logical_expressions = LogicalExpressions()
logger.info("State management initialized")
# Initialize agents (lazy loading - models load on first use)
tool_agent = ToolDecisionAgent()
routing_agents = PromptRoutingAgents()
thinking_agents = ThinkingAgents()
response_agent = ResponseAgent()
logger.info("Agents initialized (using shared get_shared_llama)")
# Pre-warm shared Qwen3-Claude (optional - happens on first agent call anyway)
logger.info("Shared Qwen3-Claude agent ready (loads on first use)")
log_step("Global Initialization", init_start)
# ============================================================================
# ANALYTICS & DATABASE FUNCTIONS
# ============================================================================
def get_trackio_database_path(project_name: str) -> Optional[str]:
"""Get path to metrics SQLite database"""
possible_paths = [
f"./{project_name}.db",
f"./trackio_data/{project_name}.db",
f"./.trackio/{project_name}.db",
"./mimir_metrics.db"
]
for path in possible_paths:
if os.path.exists(path):
return path
return None
def get_project_statistics_with_nulls(cursor, project_name: str) -> Dict:
"""Query metrics database for project statistics"""
try:
stats = {}
# Total conversations
try:
cursor.execute("""
SELECT COUNT(DISTINCT run_id) as total_runs
FROM metrics
WHERE project_name = ?
""", (project_name,))
result = cursor.fetchone()
stats["total_conversations"] = result["total_runs"] if result and result["total_runs"] > 0 else None
except sqlite3.Error:
stats["total_conversations"] = None
# Average response time
try:
cursor.execute("""
SELECT AVG(CAST(value AS FLOAT)) as avg_response_time
FROM metrics
WHERE project_name = ? AND metric_name = 'response_time'
""", (project_name,))
result = cursor.fetchone()
if result and result["avg_response_time"] is not None:
stats["avg_session_length"] = round(result["avg_response_time"], 2)
else:
stats["avg_session_length"] = None
except sqlite3.Error:
stats["avg_session_length"] = None
# Success rate
try:
cursor.execute("""
SELECT
COUNT(*) as total_responses,
SUM(CASE WHEN CAST(value AS FLOAT) > 3.5 THEN 1 ELSE 0 END) as successful_responses
FROM metrics
WHERE project_name = ? AND metric_name = 'quality_score'
""", (project_name,))
result = cursor.fetchone()
if result and result["total_responses"] > 0:
success_rate = (result["successful_responses"] / result["total_responses"]) * 100
stats["success_rate"] = round(success_rate, 1)
else:
stats["success_rate"] = None
except sqlite3.Error:
stats["success_rate"] = None
return stats
except sqlite3.Error as e:
logger.error(f"Database error: {e}")
return {"total_conversations": None, "avg_session_length": None, "success_rate": None}
def get_recent_interactions_with_nulls(cursor, project_name: str, limit: int = 10) -> List:
"""Query for recent interactions"""
try:
cursor.execute("""
SELECT
m1.timestamp,
m2.value as response_time,
m3.value as prompt_mode,
m4.value as tools_used,
m5.value as quality_score,
m6.value as adapter_used,
m1.run_id
FROM metrics m1
LEFT JOIN metrics m2 ON m1.run_id = m2.run_id AND m2.metric_name = 'response_time'
LEFT JOIN metrics m3 ON m1.run_id = m3.run_id AND m3.metric_name = 'prompt_mode'
LEFT JOIN metrics m4 ON m1.run_id = m4.run_id AND m4.metric_name = 'tools_used'
LEFT JOIN metrics m5 ON m1.run_id = m5.run_id AND m5.metric_name = 'quality_score'
LEFT JOIN metrics m6 ON m1.run_id = m6.run_id AND m6.metric_name = 'active_adapter'
WHERE m1.project_name = ? AND m1.metric_name = 'conversation_start'
ORDER BY m1.timestamp DESC
LIMIT ?
""", (project_name, limit))
results = cursor.fetchall()
recent_data = []
for row in results:
recent_data.append([
row["timestamp"][:16] if row["timestamp"] else None,
float(row["response_time"]) if row["response_time"] is not None else None,
row["prompt_mode"] if row["prompt_mode"] else None,
bool(int(row["tools_used"])) if row["tools_used"] is not None else None,
float(row["quality_score"]) if row["quality_score"] is not None else None,
row["adapter_used"] if row["adapter_used"] else None
])
return recent_data
except sqlite3.Error as e:
logger.error(f"Database error: {e}")
return []
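# Minimal wiring sketch for the two query helpers above (illustrative; the
# real call site is refresh_analytics_data_persistent further down). Both
# helpers index rows by column name, so the connection must use sqlite3.Row:
def _example_query_metrics(db_path: str = "./mimir_metrics.db"):
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row  # enables row["column_name"] access
    cursor = conn.cursor()
    stats = get_project_statistics_with_nulls(cursor, "Mimir")
    recent = get_recent_interactions_with_nulls(cursor, "Mimir", limit=5)
    conn.close()
    return stats, recent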
def create_dashboard_html_with_nulls(project_name: str, project_stats: Dict) -> str:
"""Create dashboard HTML with enhanced agent-based metrics"""
def format_stat(value, suffix="", no_data_text="No data"):
if value is None:
return f'<span style="color: #999; font-style: italic;">{no_data_text}</span>'
return f"{value}{suffix}"
def format_large_stat(value, suffix="", no_data_text="--"):
if value is None:
return f'<span style="color: #ccc;">{no_data_text}</span>'
return f"{value}{suffix}"
# Get evaluation metrics from global state
try:
eval_summary = global_state_manager.get_evaluation_summary()
cache_status = global_state_manager.get_cache_status()
project_stats["ml_educational_quality"] = eval_summary['aggregate_metrics']['avg_educational_quality']
project_stats["user_satisfaction"] = eval_summary['aggregate_metrics']['user_satisfaction_rate']
project_stats["active_sessions"] = cache_status['total_conversation_sessions']
except Exception as e:
logger.warning(f"Could not get global state metrics: {e}")
project_stats["ml_educational_quality"] = None
project_stats["user_satisfaction"] = None
project_stats["active_sessions"] = None
# Status determination
success_rate = project_stats.get("success_rate")
if success_rate is not None:
if success_rate >= 80:
status_color = "#4CAF50"
status_text = "Excellent"
elif success_rate >= 60:
status_color = "#FF9800"
status_text = "Good"
else:
status_color = "#F44336"
status_text = "Needs Improvement"
else:
status_color = "#999"
status_text = "No data"
# Agent-based metrics section
agent_metrics_section = f"""
<div style="margin: 15px 0; padding: 10px; background: #f0f8ff; border-radius: 4px; border-left: 4px solid #007bff;">
<strong>πŸš€ Agent Performance (Llama-3.2-3B Single Model):</strong>
Educational Quality: {format_stat(project_stats.get('ml_educational_quality'), '', 'N/A')} |
User Satisfaction: {format_stat(project_stats.get('user_satisfaction'), '%' if project_stats.get('user_satisfaction') is not None else '', 'N/A')} |
Active Sessions: {format_stat(project_stats.get('active_sessions'), '', 'N/A')}
</div>
"""
dashboard_html = f'''
<div style="text-align: center; padding: 20px; border: 1px solid #ddd; border-radius: 8px; background: #f9f9f9;">
<h3>πŸ“Š {project_name} Analytics</h3>
<div style="display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 15px; margin: 20px 0;">
<div style="padding: 15px; background: white; border-radius: 6px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
<div style="font-size: 24px; font-weight: bold; color: #2196F3;">{format_large_stat(project_stats.get('total_conversations'))}</div>
<div style="color: #666; font-size: 12px;">Total Sessions</div>
</div>
<div style="padding: 15px; background: white; border-radius: 6px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
<div style="font-size: 24px; font-weight: bold; color: #FF9800;">{format_large_stat(project_stats.get('avg_session_length'), 's' if project_stats.get('avg_session_length') else '')}</div>
<div style="color: #666; font-size: 12px;">Avg Response Time</div>
</div>
<div style="padding: 15px; background: white; border-radius: 6px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
<div style="font-size: 24px; font-weight: bold; color: {status_color};">{format_large_stat(success_rate, '%' if success_rate else '')}</div>
<div style="color: #666; font-size: 12px;">Success Rate ({status_text})</div>
</div>
</div>
{agent_metrics_section}
<div style="margin: 15px 0; padding: 10px; background: #fff3cd; border-radius: 4px; font-size: 14px;">
<strong>Model:</strong> {format_stat(project_stats.get('model_type'), no_data_text='Unknown')} |
<strong>Last Updated:</strong> {project_stats.get('last_updated', 'Unknown')}
</div>
</div>
'''
return dashboard_html
def calculate_response_quality(response: str) -> float:
"""Calculate response quality score"""
try:
length_score = min(len(response) / 200, 1.0)
educational_keywords = ['learn', 'understand', 'concept', 'example', 'practice']
keyword_score = sum(1 for keyword in educational_keywords if keyword in response.lower()) / len(educational_keywords)
if len(response) < 20:
return 2.0
elif len(response) > 2000:
return 3.5
base_score = 2.5 + (length_score * 1.5) + (keyword_score * 1.0)
return min(max(base_score, 1.0), 5.0)
except Exception:
return 3.0
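# Illustrative scoring behaviour of the heuristic above (approximate):
#   calculate_response_quality("ok")        -> 2.0  (under 20 chars)
#   calculate_response_quality("x" * 2500)  -> 3.5  (over 2000 chars)
#   otherwise: 2.5 + up to 1.5 for length + up to 1.0 for educational
#   keywords, clamped to the range [1.0, 5.0]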
def evaluate_educational_quality_with_tracking(user_query: str, response: str, thread_id: Optional[str] = None, session_id: Optional[str] = None):
"""Educational quality evaluation with state tracking using LightEval"""
start_time = time.time()
try:
# Educational indicators
educational_indicators = {
'has_examples': 'example' in response.lower(),
'structured_explanation': '##' in response or '1.' in response,
'appropriate_length': 100 < len(response) < 1500,
'encourages_learning': any(phrase in response.lower()
for phrase in ['practice', 'try', 'consider', 'think about']),
'uses_latex': '$' in response,
'has_clear_sections': response.count('\n\n') >= 2
}
educational_score = sum(educational_indicators.values()) / len(educational_indicators)
semantic_quality = min(len(response) / 500, 1.0)
response_time = time.time() - start_time
# Use LightEval if available
if LIGHTEVAL_AVAILABLE:
try:
doc = Doc(
task_name=f"turn_{thread_id or session_id}",
query=user_query,
choices=[response],
gold_index=-1,
specific_output=response
)
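# NOTE: BertScore's exact compute() signature varies across LightEval
# versions; any mismatch is caught below and the length-based heuristic
# for semantic_quality is kept instead.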
bert_score = BertScore().compute(doc)
semantic_quality = bert_score if bert_score else semantic_quality
except Exception as lighteval_error:
logger.warning(f"LightEval computation failed: {lighteval_error}")
metrics = {
'semantic_quality': semantic_quality,
'educational_score': educational_score,
'response_time': response_time,
'indicators': educational_indicators
}
# Track in global state
global_state_manager.add_educational_quality_score(
user_query=user_query,
response=response,
metrics=metrics,
session_id=session_id
)
logger.info(f"Educational quality evaluated: {educational_score:.3f}")
return metrics
except Exception as e:
logger.error(f"Educational quality evaluation failed: {e}")
return {'educational_score': 0.5, 'semantic_quality': 0.5, 'response_time': 0.0}
def log_metrics_to_database(project_name: str, run_id: str, metrics: Dict):
"""Log metrics to SQLite database for dashboard"""
try:
db_path = get_trackio_database_path(project_name)
if db_path is None:
db_path = "./mimir_metrics.db"
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create metrics table if not exists
cursor.execute("""
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_name TEXT,
run_id TEXT,
metric_name TEXT,
value TEXT,
timestamp TEXT
)
""")
# Insert metrics
timestamp = datetime.now().isoformat()
for metric_name, metric_value in metrics.items():
cursor.execute("""
INSERT INTO metrics (project_name, run_id, metric_name, value, timestamp)
VALUES (?, ?, ?, ?, ?)
""", (project_name, run_id, metric_name, str(metric_value), timestamp))
conn.commit()
conn.close()
logger.info(f"Logged {len(metrics)} metrics to database")
except Exception as e:
logger.error(f"Failed to log metrics to database: {e}")
def sync_trackio_with_global_state():
"""Sync metrics database with global state manager data"""
try:
eval_summary = global_state_manager.get_evaluation_summary()
# Log to database (agent-based metrics only)
metrics = {
"educational_quality_avg": eval_summary['aggregate_metrics']['avg_educational_quality'],
"user_satisfaction": eval_summary['aggregate_metrics']['user_satisfaction_rate'],
"total_evaluations": sum(eval_summary['total_evaluations'].values())
}
log_metrics_to_database("Mimir", str(uuid.uuid4()), metrics)
logger.info("Synced global state metrics to database")
except Exception as e:
logger.error(f"Failed to sync metrics to database: {e}")
def refresh_analytics_data_persistent():
"""Refresh analytics data with global state persistence"""
project_name = "Mimir"
try:
analytics_state = global_state_manager.get_analytics_state()
last_refresh = analytics_state.get('last_refresh')
# If refreshed within last 30 seconds, return cached
if last_refresh and (datetime.now() - last_refresh).total_seconds() < 30:
logger.info("Using cached analytics data (recent refresh)")
return (
analytics_state['project_stats'],
analytics_state['recent_interactions'],
analytics_state['dashboard_html']
)
db_path = get_trackio_database_path(project_name)
if db_path is None:
logger.warning("No metrics database found")
project_stats = {
"total_conversations": None,
"avg_session_length": None,
"success_rate": None,
"model_type": "Qwen3-4B-Claude GGUF (Q6_K - Single Model)",
"last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
dashboard_html = create_dashboard_html_with_nulls(project_name, project_stats)
recent_interactions = []
global_state_manager.update_analytics_state(
project_stats=project_stats,
recent_interactions=recent_interactions,
dashboard_html=dashboard_html
)
return project_stats, recent_interactions, dashboard_html
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
project_stats = get_project_statistics_with_nulls(cursor, project_name)
project_stats["model_type"] = "Qwen3-4B-Claude GGUF (Q6_K - Single Model)"
project_stats["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
recent_data = get_recent_interactions_with_nulls(cursor, project_name, limit=10)
dashboard_html = create_dashboard_html_with_nulls(project_name, project_stats)
conn.close()
global_state_manager.update_analytics_state(
project_stats=project_stats,
recent_interactions=recent_data,
dashboard_html=dashboard_html
)
logger.info("Analytics data refreshed and cached successfully")
return project_stats, recent_data, dashboard_html
except Exception as e:
logger.error(f"Error refreshing analytics: {e}")
error_stats = {
"error": str(e),
"total_conversations": None,
"avg_session_length": None,
"success_rate": None,
"model_type": "Error",
"last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
error_html = f"""
<div style="text-align: center; padding: 40px; border: 2px dashed #f44336; border-radius: 8px; background: #ffebee;">
<h3 style="color: #f44336;">⚠️ Analytics Error</h3>
<p>Could not load analytics data: {str(e)[:100]}</p>
</div>
"""
global_state_manager.update_analytics_state(
project_stats=error_stats,
recent_interactions=[],
dashboard_html=error_html,
error_state=str(e)
)
return error_stats, [], error_html
def export_metrics_json_persistent():
"""Export metrics as JSON file"""
try:
project_stats, recent_data, _ = refresh_analytics_data_persistent()
export_data = {
"project": "Mimir",
"export_timestamp": datetime.now().isoformat(),
"statistics": project_stats,
"recent_interactions": recent_data
}
filename = f"mimir_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2, default=str)
global_state_manager.add_export_record("JSON", filename, success=True)
logger.info(f"Metrics exported to {filename}")
gr.Info(f"Metrics exported successfully to {filename}")
except Exception as e:
global_state_manager.add_export_record("JSON", "failed", success=False)
logger.error(f"Export failed: {e}")
gr.Warning(f"Export failed: {str(e)}")
def export_metrics_csv_persistent():
"""Export metrics as CSV file"""
try:
import csv
_, recent_data, _ = refresh_analytics_data_persistent()
filename = f"mimir_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["Timestamp", "Response Time", "Mode", "Tools Used", "Quality Score", "Adapter"])
for row in recent_data:
writer.writerow(row)
global_state_manager.add_export_record("CSV", filename, success=True)
logger.info(f"Metrics exported to {filename}")
gr.Info(f"Metrics exported successfully to {filename}")
except Exception as e:
global_state_manager.add_export_record("CSV", "failed", success=False)
logger.error(f"Export failed: {e}")
gr.Warning(f"Export failed: {str(e)}")
def load_analytics_state():
"""Load analytics state from global manager"""
analytics_state = global_state_manager.get_analytics_state()
project_stats = analytics_state['project_stats']
recent_interactions = analytics_state['recent_interactions']
dashboard_html = analytics_state['dashboard_html']
if dashboard_html is None:
dashboard_html = """
<div style="text-align: center; padding: 40px; border: 2px dashed #ccc; border-radius: 8px; background: #f8f9fa;">
<h3>πŸ“Š Analytics Dashboard</h3>
<p>Click "Refresh Data" to load analytics.</p>
</div>
"""
return project_stats, recent_interactions, dashboard_html
def get_global_state_debug_info():
"""Get debug information about global state"""
cache_status = global_state_manager.get_cache_status()
debug_info = {
"cache_status": cache_status,
"timestamp": datetime.now().isoformat(),
"sessions": global_state_manager.get_all_sessions()
}
return debug_info
# ============================================================================
# POST-PROCESSING
# ============================================================================
class ResponsePostProcessor:
"""Post-processing pipeline for educational responses"""
def __init__(self, max_length: int = 1800, min_length: int = 10):
self.max_length = max_length
self.min_length = min_length
self.logical_stop_patterns = [
r'\n\n---\n',
r'\n\n## Summary\b',
r'\n\nIn conclusion\b',
r'\n\nTo summarize\b',
]
def process_response(self, raw_response: str, user_query: str = "") -> str:
"""Main post-processing pipeline"""
try:
cleaned = self._enhanced_token_cleanup(raw_response)
cleaned = self._truncate_intelligently(cleaned)
cleaned = self._enhance_readability(cleaned)
if not self._passes_quality_check(cleaned):
return self._generate_fallback_response(user_query)
return cleaned.strip()
except Exception as e:
logger.error(f"Post-processing error: {e}")
return raw_response
def _enhanced_token_cleanup(self, text: str) -> str:
"""Remove model artifacts"""
artifacts = [
r'<\|.*?\|>',
r'###\s*$',
r'User:\s*$',
r'Assistant:\s*$',
r'\n\s*\n\s*\n+',
]
for pattern in artifacts:
text = re.sub(pattern, '', text, flags=re.MULTILINE)
return text
def _truncate_intelligently(self, text: str) -> str:
"""Truncate at logical educational endpoints"""
for pattern in self.logical_stop_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return text[:match.start()].strip()
if len(text) <= self.max_length:
return text
sentences = re.split(r'[.!?]+\s+', text)
truncated = ""
for sentence in sentences:
test_length = len(truncated + sentence + ". ")
if test_length <= self.max_length:
truncated += sentence + ". "
else:
break
return truncated.strip()
def _enhance_readability(self, text: str) -> str:
"""Format for better presentation"""
text = re.sub(r'([.!?])([A-Z])', r'\1 \2', text)
text = re.sub(r'\s{2,}', ' ', text)
text = re.sub(r'\n\s*[-*]\s*', '\n- ', text)
return text
def _passes_quality_check(self, text: str) -> bool:
"""Final quality validation"""
if len(text.strip()) < self.min_length:
return False
sentences = re.split(r'[.!?]+', text)
valid_sentences = [s for s in sentences if len(s.strip()) > 5]
return len(valid_sentences) > 0
def _generate_fallback_response(self, user_query: str) -> str:
"""Generate safe fallback"""
return "I'd be happy to help you understand this better. Could you clarify what specific aspect you'd like me to focus on?"
def process_and_stream_response(self, raw_response: str, user_query: str = ""):
"""Process response then stream word-by-word"""
try:
processed_response = self.process_response(raw_response, user_query)
words = processed_response.split()
current_output = ""
for i, word in enumerate(words):
current_output += word
if i < len(words) - 1:
current_output += " "
yield current_output
time.sleep(0.015)
except Exception as e:
logger.error(f"Stream processing error: {e}")
yield "I encountered an error processing the response."
post_processor = ResponsePostProcessor()
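# Usage sketch for the module-level post-processor (illustrative):
#   clean = post_processor.process_response(raw_text, user_query="explain slope")
#   for partial in post_processor.process_and_stream_response(raw_text):
#       ...push `partial` to the UI...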
# ============================================================================
# DATA EXTRACTION FOR GRAPHING
# ============================================================================
def extract_graph_data(user_input: str, conversation_history: Optional[List[Dict]] = None) -> Optional[Dict]:
"""
Use LLM to extract graphable data from user input.
Returns:
Dict with keys: data, plot_type, title, x_label, y_label, educational_context
Or None if no data can be extracted
"""
model = get_model()  # shared lazy-loaded model (imported at module top)
# Format conversation context
context = ""
if conversation_history:  # safely handles None
recent = conversation_history[-2:]
context = "\n".join([f"{msg['role']}: {msg['content'][:200]}" for msg in recent])
extraction_prompt = f"""Extract graphable data from the user's message.
Previous context:
{context}
Current message: {user_input}
If the message contains data that can be graphed (numbers, comparisons, datasets, trends), extract:
1. The data as key-value pairs
2. The best plot type (bar, line, or pie)
3. A descriptive title
4. Axis labels (if applicable)
5. Educational context explaining what the graph shows
Respond in JSON format ONLY:
{{
"has_data": true/false,
"data": {{"label1": value1, "label2": value2, ...}},
"plot_type": "bar/line/pie",
"title": "Graph Title",
"x_label": "X Axis Label",
"y_label": "Y Axis Label",
"educational_context": "Brief explanation of what this graph represents"
}}
If no graphable data exists, respond: {{"has_data": false}}"""
try:
system_prompt = "You are a data extraction expert. Extract graphable data from text and respond in valid JSON only."
response = model.generate(
system_prompt=system_prompt,
user_message=extraction_prompt,
max_tokens=300,
temperature=0.3
)
# Parse the JSON object embedded in the model output (json is imported at module top)
json_start = response.find('{')
json_end = response.rfind('}') + 1
if json_start == -1 or json_end == 0:
return None
json_str = response[json_start:json_end]
result = json.loads(json_str)
if not result.get('has_data', False):
return None
# Validate required fields
if 'data' not in result or not result['data']:
return None
return result
except Exception as e:
logger.error(f"Data extraction failed: {e}")
return None
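# Example of a successful extraction result (illustrative values):
# {
#     "has_data": True,
#     "data": {"2022": 10, "2023": 14, "2024": 21},
#     "plot_type": "bar",
#     "title": "Widgets Sold per Year",
#     "x_label": "Year",
#     "y_label": "Units",
#     "educational_context": "Shows year-over-year growth."
# }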
# ============================================================================
# TOOL FUNCTIONS
# ============================================================================
@tool(return_direct=False)
def Create_Graph_Tool(
data: dict,
plot_type: str,
title: str = "Generated Plot",
x_label: str = "",
y_label: str = "",
educational_context: str = ""
) -> str:
"""Generate educational graphs"""
tool_start = log_step("Create_Graph_Tool")
try:
content, artifact = generate_plot(
data=data,
plot_type=plot_type,
title=title,
x_label=x_label,
y_label=y_label
)
if "error" in artifact:
log_step("Create_Graph_Tool", tool_start)
return f'<p style="color:red;">Graph generation failed: {artifact["error"]}</p>'
base64_image = artifact["base64_image"]
context_html = ""
if educational_context:
context_html = f'<div style="margin: 10px 0; padding: 10px; background: #f8f9fa; border-left: 4px solid #007bff;">πŸ’‘ {educational_context}</div>'
result = f"""{context_html}
<div style="text-align: center; margin: 20px 0;">
<img src="data:image/png;base64,{base64_image}"
style="max-width: 100%; height: auto; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1);"
alt="{title}" />
</div>"""
log_step("Create_Graph_Tool", tool_start)
return result
except Exception as e:
logger.error(f"Graph tool error: {e}")
log_step("Create_Graph_Tool", tool_start)
return f'<p style="color:red;">Error: {str(e)}</p>'
# ============================================================================
# MAIN ORCHESTRATION WORKFLOW
# ============================================================================
def orchestrate_turn(user_input: str, conversation_history: Optional[list] = None, session_id: str = "default") -> str:
"""
Main orchestration function implementing the redesign workflow.
Steps:
1. Reset prompt state
2. Process user input (history)
3. Tool decision
4. Regex checks
5. Routing agent execution
6. Thinking agents
7. Prompt assembly
8. Response generation (Llama-3.2-3B)
9. Post-processing
10. Metrics tracking (background thread)
"""
turn_start = log_step("orchestrate_turn")
run_id = str(uuid.uuid4())
try:
# ====================================================================
# STEP 1: RESET PROMPT STATE
# ====================================================================
step_start = log_step("Step 1: Reset prompt state")
global_state_manager.reset_prompt_state()
prompt_state = global_state_manager.get_prompt_state_manager()
log_step("Step 1: Reset prompt state", step_start)
# ====================================================================
# STEP 2: USER INPUT PROCESSING
# ====================================================================
step_start = log_step("Step 2: Process user input")
# Use conversation_history passed from Gradio state (no global state call)
if conversation_history is None:
conversation_history = []
# Keep only the last 8 messages for context
conversation_history = conversation_history[-8:]
# Format history for agents
conversation_history_formatted = "\n".join([
f"{msg['role']}: {msg['content'][:100]}"
for msg in conversation_history
]) if conversation_history else "No previous conversation"
log_step("Step 2: Process user input", step_start)
# ====================================================================
# STEP 3: TOOL DECISION ENGINE & GRAPH GENERATION
# ====================================================================
step_start = log_step("Step 3: Tool decision & graph generation")
# Check if visualization is needed
tool_decision_result = tool_agent.decide(user_input, conversation_history)
tool_img_output = ""
tool_context = ""
if tool_decision_result:
logger.info("Tool decision: YES - visualization needed")
prompt_state.update("TOOL_USE_ENHANCEMENT", True)
# Extract data and generate graph
logger.info("β†’ Extracting graphable data from input...")
graph_data = extract_graph_data(user_input, conversation_history)
if graph_data:
logger.info(f"βœ“ Data extracted: {len(graph_data['data'])} data points")
logger.info(f" Plot type: {graph_data['plot_type']}")
# Generate the graph (@tool-decorated functions are invoked via .invoke)
tool_img_output = Create_Graph_Tool.invoke({
    "data": graph_data['data'],
    "plot_type": graph_data['plot_type'],
    "title": graph_data.get('title', 'Generated Plot'),
    "x_label": graph_data.get('x_label', ''),
    "y_label": graph_data.get('y_label', ''),
    "educational_context": graph_data.get('educational_context', '')
})
# Store context for thinking agents
tool_context = f"Graph created: {graph_data['plot_type']} chart showing {graph_data.get('title', 'data visualization')}"
logger.info("βœ“ Graph generated successfully")
else:
logger.info("⚠️ No extractable data found - skipping graph generation")
else:
logger.info("Tool decision: NO - no visualization needed")
log_step("Step 3: Tool decision & graph generation", step_start)
# ====================================================================
# STEP 4: REGEX LOGICAL EXPRESSIONS
# ====================================================================
step_start = log_step("Step 4: Regex checks")
logical_expressions.apply_all_checks(user_input, prompt_state)
log_step("Step 4: Regex checks", step_start)
# ====================================================================
# STEP 5: SEQUENTIAL AGENT EXECUTION (Llama-3.2-3B)
# ====================================================================
step_start = log_step("Step 5: Routing agents")
# Use unified process() method that handles all 4 routing agents
response_prompts_str, thinking_prompts_str = routing_agents.process(
user_input=user_input,
tool_used=bool(tool_decision_result and tool_img_output)
)
# Update prompt state with response prompts
if response_prompts_str:
for prompt_name in response_prompts_str.split('\n'):
if prompt_name.strip():
prompt_state.update(prompt_name.strip(), True)
logger.info(f"Response prompt activated: {prompt_name.strip()}")
# Store thinking prompts for Step 6 (will be processed by ThinkingAgents)
thinking_prompts_from_routing = thinking_prompts_str.split('\n') if thinking_prompts_str else []
for prompt_name in thinking_prompts_from_routing:
if prompt_name.strip():
logger.info(f"Thinking prompt queued: {prompt_name.strip()}")
log_step("Step 5: Routing agents", step_start)
# ====================================================================
# STEP 6: THINKING AGENT PROCESSING (Llama-3.2-3B)
# ====================================================================
step_start = log_step("Step 6: Thinking agents")
# Use thinking prompts identified by routing agents in Step 5
thinking_prompts_list = []
# Add thinking prompts from routing agents
for prompt_name in thinking_prompts_from_routing:
if prompt_name.strip():
thinking_prompts_list.append(prompt_name.strip())
prompt_state.update(prompt_name.strip(), True)
# Additional heuristic: Add MATH_THINKING if LATEX_FORMATTING is active
# (This ensures math thinking is triggered even if routing agents didn't detect it)
if prompt_state.is_active("LATEX_FORMATTING") and "MATH_THINKING" not in thinking_prompts_list:
thinking_prompts_list.append("MATH_THINKING")
prompt_state.update("MATH_THINKING", True)
# Execute thinking agents if any are active
thinking_context = ""
if thinking_prompts_list:
thinking_prompts_string = '\n'.join(thinking_prompts_list)
logger.info(f"Active thinking agents: {thinking_prompts_list}")
think_start = log_step("Thinking agents execution")
thinking_context = thinking_agents.process(
user_input=user_input,
conversation_history=conversation_history_formatted,
thinking_prompts=thinking_prompts_string,
tool_img_output=tool_img_output,
tool_context=tool_context
)
log_step("Thinking agents execution", think_start)
log_step("Step 6: Thinking agents", step_start)
# ====================================================================
# STEP 7: RESPONSE PROMPT ASSEMBLY
# ====================================================================
step_start = log_step("Step 7: Prompt assembly")
# Get active response prompts
response_prompt_names = prompt_state.get_active_response_prompts()
# Build prompt segments
prompt_segments = [CORE_IDENTITY]
prompt_map = {
"VAUGE_INPUT": VAUGE_INPUT,
"USER_UNDERSTANDING": USER_UNDERSTANDING,
"GENERAL_FORMATTING": GENERAL_FORMATTING,
"LATEX_FORMATTING": LATEX_FORMATTING,
"GUIDING_TEACHING": GUIDING_TEACHING,
"STRUCTURE_PRACTICE_QUESTIONS": STRUCTURE_PRACTICE_QUESTIONS,
"PRACTICE_QUESTION_FOLLOWUP": PRACTICE_QUESTION_FOLLOWUP,
"TOOL_USE_ENHANCEMENT": TOOL_USE_ENHANCEMENT,
}
for prompt_name in response_prompt_names:
if prompt_name in prompt_map:
prompt_segments.append(prompt_map[prompt_name])
prompt_segments_text = "\n\n".join(prompt_segments)
logger.info(f"Active prompts: {response_prompt_names}")
log_step("Step 7: Prompt assembly", step_start)
# ====================================================================
# STEP 8: PREPARE RESPONSE AGENT INPUT
# ====================================================================
step_start = log_step("Step 8: Prepare response input")
# response_prompt_names is already populated from Step 7 and still in scope
# Combine tool outputs for context
# If we have tool_img_output, mention it in tool_context
combined_tool_context = tool_context
if tool_img_output:
# Note: tool_img_output is HTML that will be embedded separately
# Just note its presence in the context
if combined_tool_context:
combined_tool_context += "\n\nNote: A visualization has been generated for the user."
else:
combined_tool_context = "A visualization has been generated for the user."
# Build input dictionary for ResponseAgent
# CRITICAL: Must be a Dict, NOT a string!
input_data = {
'user_query': user_input,
'conversation_history': conversation_history,
'active_prompts': response_prompt_names,
'thinking_context': thinking_context, # str (from thinking agents)
'tool_context': combined_tool_context, # str (tool usage info)
}
logger.info(f"Response input prepared:")
logger.info(f" - User query: {len(user_input)} chars")
logger.info(f" - History: {len(conversation_history)} messages")
logger.info(f" - Active prompts: {len(response_prompt_names)} prompts")
logger.info(f" - Thinking context: {len(thinking_context)} chars")
logger.info(f" - Tool context: {len(combined_tool_context)} chars")
log_step("Step 8: Prepare response input", step_start)
# ====================================================================
# STEP 9: RESPONSE GENERATION (Llama-3.2-3B)
# ====================================================================
step_start = log_step("Step 9: Response generation")
try:
result = response_agent.invoke(input_data)
# Extract response from result dict
raw_response = result.get('response', '')
metadata = result.get('metadata', {})
if not raw_response:
logger.warning("ResponseAgent returned empty response")
raw_response = "I apologize, but I wasn't able to generate a response. Please try again."
logger.info(f"βœ“ Generated {len(raw_response)} chars")
if metadata:
logger.info(f" Metadata: {metadata}")
except Exception as e:
logger.error(f"Response generation failed: {e}")
import traceback
logger.error(traceback.format_exc())
raw_response = "I apologize, but I encountered an error while generating a response. Please try rephrasing your question or try again."
log_step("Step 9: Response generation", step_start)
# ====================================================================
# STEP 10: POST-PROCESSING
# ====================================================================
step_start = log_step("Step 10: Post-processing")
processed_response = post_processor.process_response(raw_response, user_input)
log_step("Step 10: Post-processing", step_start)
# ====================================================================
# STEP 11: METRICS TRACKING (BACKGROUND THREAD - NON-BLOCKING)
# ====================================================================
step_start = log_step("Step 11: Metrics tracking")
def track_metrics_async():
"""Run metrics tracking in background to avoid blocking"""
try:
logger.info("[Background] Starting metrics tracking...")
# Track educational quality
quality_metrics = evaluate_educational_quality_with_tracking(
user_query=user_input,
response=processed_response,
thread_id=run_id,
session_id=session_id
)
# Log metrics to database
metrics_to_log = {
"conversation_start": datetime.now().isoformat(),
"response_time": time.time() - turn_start,
"quality_score": calculate_response_quality(processed_response),
"educational_score": quality_metrics['educational_score'],
"prompt_mode": ",".join(response_prompt_names),
"tools_used": 1 if prompt_state.is_active("TOOL_USE_ENHANCEMENT") else 0,
"thinking_agents": ",".join(thinking_prompts_list) if thinking_prompts_list else "none",
"active_adapter": response_agent.model_type if response_agent.model_loaded else "not_loaded"
}
log_metrics_to_database("Mimir", run_id, metrics_to_log)
logger.info("[Background] βœ“ Metrics tracking completed")
except Exception as metrics_error:
logger.warning(f"[Background] Metrics tracking failed: {metrics_error}")
# Start background thread (daemon=True so it doesn't block shutdown)
metrics_thread = threading.Thread(
target=track_metrics_async,
daemon=True,
name="MetricsTracking"
)
metrics_thread.start()
log_step("Step 11: Metrics tracking", step_start)
logger.info("βœ“ Metrics tracking started in background - continuing immediately")
log_step("orchestrate_turn", turn_start)
return processed_response
except Exception as e:
logger.error(f"Orchestration error: {e}")
import traceback
logger.error(traceback.format_exc())
log_step("orchestrate_turn", turn_start)
return f"I encountered an error: {str(e)}"
# ============================================================================
# GRADIO CALLBACK FUNCTIONS
# ============================================================================
def get_loading_animation_base64():
"""Load animated GIF as base64"""
try:
with open("loading_animation.gif", "rb") as gif_file:
gif_data = gif_file.read()
gif_base64 = base64.b64encode(gif_data).decode('utf-8')
return f"data:image/gif;base64,{gif_base64}"
except FileNotFoundError:
logger.warning("loading_animation.gif not found")
return None
def remove_loading_animations(chat_history):
"""Remove loading animations from chat"""
return [msg for msg in chat_history if not (
msg.get("role") == "assistant" and
"loading-animation" in str(msg.get("content", ""))
)]
def add_user_message(message, chat_history, conversation_state):
"""
Add user message with proper state management.
Creates new lists to avoid reference issues with Gradio state.
"""
callback_start = log_step("add_user_message")
if not message.strip():
log_step("add_user_message", callback_start)
return "", chat_history, conversation_state
# Create new lists with the user message appended
new_conversation_state = conversation_state + [{"role": "user", "content": message}]
new_chat_history = chat_history + [{"role": "user", "content": message}]
# Update global state for persistence
# global_state_manager.update_conversation_state(new_chat_history, new_conversation_state)
log_step("add_user_message", callback_start)
# Return NEW states to Gradio
return "", new_chat_history, new_conversation_state
def add_loading_animation(chat_history, conversation_state):
"""
Add loading animation with proper state management.
Creates new lists to avoid reference issues.
"""
callback_start = log_step("add_loading_animation")
if not conversation_state:
log_step("add_loading_animation", callback_start)
return chat_history, conversation_state
# Remove any existing loading animations
new_chat_history = remove_loading_animations(chat_history)
# Add loading animation to NEW list
gif_data = get_loading_animation_base64()
if gif_data:
loading_html = f'<div class="loading-animation" style="display: flex; align-items: center; justify-content: center; padding: 0.5px;"><img src="{gif_data}" alt="Thinking..." style="height: 64px; width: auto; max-width: 80px;" /></div>'
else:
loading_html = '<div class="loading-animation" style="display: flex; align-items: center; justify-content: center; padding: 0.5px;"><div style="width: 64px; height: 64px;"></div></div>'
new_chat_history = new_chat_history + [{"role": "assistant", "content": loading_html}]
# Update global state for persistence
# global_state_manager.update_conversation_state(new_chat_history, conversation_state)
log_step("add_loading_animation", callback_start)
# Return NEW states to Gradio
return new_chat_history, conversation_state
def generate_response(chat_history, conversation_state):
"""
Generate response using orchestration with proper streaming.
Uses the state passed by Gradio; falls back to the global manager only for error recovery.
"""
callback_start = log_step("generate_response")
# Use the state passed by Gradio
# Only pull from global manager if state is empty (error recovery)
if not conversation_state:
logger.warning("Empty conversation_state in generate_response, attempting recovery from global state")
current_state = global_state_manager.get_conversation_state()
chat_history = current_state['chat_history']
conversation_state = current_state['conversation_state']
if not conversation_state:
log_step("generate_response", callback_start)
return chat_history, conversation_state
# Get last user message
last_user_message = ""
for msg in reversed(conversation_state):
if msg["role"] == "user":
last_user_message = msg["content"]
break
if not last_user_message:
log_step("generate_response", callback_start)
return chat_history, conversation_state
try:
# Call orchestration
orch_start = log_step("orchestrate_turn call")
raw_response = orchestrate_turn(last_user_message, conversation_state)
log_step("orchestrate_turn call", orch_start)
# Stream the processed response
first_chunk = True
final_response = raw_response  # fallback if the stream yields nothing
for chunk in post_processor.process_and_stream_response(raw_response, last_user_message):
    # Remove loading animation on FIRST chunk only
    if first_chunk:
        chat_history = remove_loading_animations(chat_history)
        first_chunk = False
    # Update chat display - create new list for Gradio to detect change
    if chat_history and chat_history[-1]["role"] == "assistant":
        # Update existing assistant message
        chat_history = chat_history[:-1] + [{"role": "assistant", "content": chunk}]
    else:
        # Add new assistant message
        chat_history = chat_history + [{"role": "assistant", "content": chunk}]
    final_response = chunk
    # Yield to update UI during streaming
    yield chat_history, conversation_state
# Add final response to conversation state (create new list)
new_conversation_state = conversation_state + [{"role": "assistant", "content": final_response}]
threading.Thread(
target=lambda: global_state_manager.update_conversation_state(chat_history, new_conversation_state),
daemon=True
).start()
# Final yield with complete states
yield chat_history, new_conversation_state
except Exception as e:
logger.error(f"Response generation error: {e}")
import traceback
logger.error(traceback.format_exc())
error_msg = f"I encountered an error: {str(e)}"
# Clean up and show error (create new lists)
new_chat_history = remove_loading_animations(chat_history) + [{"role": "assistant", "content": error_msg}]
new_conversation_state = conversation_state + [{"role": "assistant", "content": error_msg}]
global_state_manager.update_conversation_state(new_chat_history, new_conversation_state)
yield new_chat_history, new_conversation_state
log_step("generate_response", callback_start)
def reset_conversation():
"""
Reset conversation with global state persistence.
Returns empty states to the Gradio components.
"""
callback_start = log_step("reset_conversation")
global_state_manager.reset_conversation_state()
log_step("reset_conversation", callback_start)
return [], []
def load_conversation_state():
"""
Load conversation state from global manager.
Returns the current states to the Gradio components.
"""
callback_start = log_step("load_conversation_state")
current_state = global_state_manager.get_conversation_state()
log_step("load_conversation_state", callback_start)
# Extract and return both states
return current_state['chat_history'], current_state['conversation_state']
# ============================================================================
# MULTI-PAGE INTERFACE
# ============================================================================
def create_interface():
"""Create multi-page Gradio interface"""
logger.info("Creating Gradio interface...")
# Pages
import gradio_chatbot
import gradio_analytics
import gradio_prompt_testing
with gr.Blocks(title="Mimir - Educational AI Assistant") as demo:
navbar = gr.Navbar(
visible=True,
main_page_name="Mimir Chatbot",
value=[("Case Study", "https://github.com/Jdesiree112/Technical_Portfolio/tree/main/CaseStudy_Mimir")]
)
gradio_chatbot.demo.render()
with demo.route("Analytics"):
navbar = gr.Navbar(
visible=True,
main_page_name="Mimir Chatbot",
value=[("Case Study", "https://github.com/Jdesiree112/Technical_Portfolio/tree/main/CaseStudy_Mimir")]
)
gradio_analytics.demo.render()
with demo.route("Prompt Testing"):
navbar = gr.Navbar(
visible=True,
main_page_name="Mimir Chatbot",
value=[("Case Study", "https://github.com/Jdesiree112/Technical_Portfolio/tree/main/CaseStudy_Mimir")]
)
gradio_prompt_testing.demo.render()
logger.info("Interface created successfully")
return demo
# ============================================================================
# MAIN EXECUTION
# ============================================================================
if __name__ == "__main__":
try:
logger.info("="*60)
logger.info("STARTING MIMIR APPLICATION")
logger.info("="*60)
logger.info(f"LightEval available: {LIGHTEVAL_AVAILABLE}")
logger.info(f"Current year: {CURRENT_YEAR}")
logger.info(f"Lazy loading enabled: Model loads on first request βœ…")
logger.info("="*60)
# Create and launch interface
logger.info("β†’ Creating Gradio interface...")
interface_start = time.time()
interface = create_interface()
interface_duration = time.time() - interface_start
logger.info(f"βœ“ Interface created in {interface_duration:.2f}s")
logger.info("β†’ Launching Gradio server on 0.0.0.0:7860...")
logger.info("β†’ Model will load on first user request (lazy loading)")
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
debug=True,
favicon_path="favicon.ico" if os.path.exists("favicon.ico") else None,
show_error=True,
ssr_mode=False,
quiet=False,
prevent_thread_lock=False,
max_threads=40
)
logger.info("βœ“ Gradio server started successfully")
except KeyboardInterrupt:
logger.info("Shutting down Mimir gracefully...")
except Exception as e:
logger.error("="*60)
logger.error("CRITICAL ERROR IN MAIN EXECUTION")
logger.error("="*60)
logger.error(f"Error: {e}")
import traceback
logger.error(traceback.format_exc())
logger.error("="*60)
raise