diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,927 +1,232 @@ -# app.py -""" -Mimir Educational AI Assistant - Main Application -Architecture: -- Multi-page Gradio interface (Chatbot + Analytics with link to Mimir case study) -- Agent-based orchestration (Tool, Routing, Thinking, Response) -- Global state management with SQLite + HF dataset backup -- Prompt state tracking per turn -- LightEval for metrics tracking -- Logger for timing functions -- OPTIMIZED: Single Llama-3.2-3B model with lazy loading (loads on first use, ~1GB) -""" import os import re -import sys -import time import json -import base64 -import logging -import sqlite3 -import subprocess +import time +import torch +import gradio as gr import threading +import logging +import platform import warnings -import uuid + from datetime import datetime -from pathlib import Path -from typing import Dict, List, Optional, Tuple, Any - -# ============================================================================ -# HUGGINGFACE CACHE SETUP - Avoid Permission Errors -# ============================================================================ -# Use /tmp for all HuggingFace operations (writable at runtime) -HF_CACHE = "/tmp/huggingface" -os.makedirs(f"{HF_CACHE}/hub", exist_ok=True) -os.makedirs(f"{HF_CACHE}/modules", exist_ok=True) -os.makedirs(f"{HF_CACHE}/transformers", exist_ok=True) - -# Configure HuggingFace cache locations -os.environ['HF_HOME'] = HF_CACHE -os.environ['HF_HUB_CACHE'] = f"{HF_CACHE}/hub" -os.environ['HF_MODULES_CACHE'] = f"{HF_CACHE}/modules" -os.environ['HF_CACHE'] = f"{HF_CACHE}/transformers" -os.environ['HF_HUB_ENABLE_HF_TRANSFER'] = '1' # Faster downloads - -# Matplotlib cache (avoid permission warnings) -os.environ['MPLCONFIGDIR'] = "/tmp/matplotlib" -os.makedirs("/tmp/matplotlib", exist_ok=True) - -# ============================================================================ -# CORE DEPENDENCIES -# 
============================================================================ -import torch -import gradio as gr from dotenv import load_dotenv +from typing import Annotated, Sequence, TypedDict, List, Optional, Any, Type -# Agent architecture -from agents import ( - ToolDecisionAgent, - PromptRoutingAgents, - ThinkingAgents, - ResponseAgent, -) - -# Lazy-loading model (optional pre-warm) -from model_manager import get_model - -# State management -from state_manager import ( - GlobalStateManager, - LogicalExpressions, -) +from pydantic import BaseModel, Field -# Prompt library -from prompt_library import ( - CORE_IDENTITY, - VAUGE_INPUT, - USER_UNDERSTANDING, - GENERAL_FORMATTING, - LATEX_FORMATTING, - GUIDING_TEACHING, - STRUCTURE_PRACTICE_QUESTIONS, - PRACTICE_QUESTION_FOLLOWUP, - TOOL_USE_ENHANCEMENT, -) +# Gradio Spaces decorator (for @spaces.GPU) +import spaces # LangGraph imports from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from langgraph.checkpoint.memory import MemorySaver +from langgraph.prebuilt import ToolNode -# LangChain Core +# LangChain Core imports from langchain_core.tools import tool from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage, BaseMessage +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder +from langchain_core.runnables import Runnable +from langchain_core.runnables.utils import Input, Output + +# Transformers imports +from transformers import ( + AutoTokenizer, + AutoModelForCausalLM, + TextIteratorStreamer, + StoppingCriteria, + StoppingCriteriaList, + BitsAndBytesConfig, +) -# Tool for graphing from graph_tool import generate_plot +from loading_animations import create_thinking_indicator, get_thinking_dots + + +# Updated environment variables +os.environ['HF_HOME'] = '/tmp/huggingface' +os.environ['HF_DATASETS_CACHE'] = '/tmp/huggingface' -# ============================================================================ -# 
LIGHTEVAL FOR METRICS -# ============================================================================ -try: - from lighteval.logging.evaluation_tracker import EvaluationTracker - from lighteval.models.transformers.transformers_model import TransformersModel - from lighteval.metrics.metrics_sample import BertScore, ROUGE - from lighteval.tasks.requests import Doc - LIGHTEVAL_AVAILABLE = True -except ImportError: - LIGHTEVAL_AVAILABLE = False - logging.warning("LightEval not available - metrics tracking limited") - -# ============================================================================ -# CONFIGURATION -# ============================================================================ # Suppress warnings -warnings.filterwarnings("ignore", category=UserWarning) -warnings.filterwarnings("ignore", category=FutureWarning) +warnings.filterwarnings("ignore", message="Special tokens have been added") +warnings.filterwarnings("ignore", category=UserWarning, module="transformers") +warnings.filterwarnings("ignore", category=FutureWarning, module="huggingface_hub") +warnings.filterwarnings("ignore", message=".*TracerWarning.*") +warnings.filterwarnings("ignore", message=".*flash-attention.*") -# Load environment load_dotenv(".env") HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN") +print("Environment variables loaded.") -# Debug and runtime settings DEBUG_STATE = os.getenv("DEBUG_STATE", "false").lower() == "true" -CURRENT_YEAR = datetime.now().year - - -# ============================================================================ -# LOGGING SETUP -# ============================================================================ -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - - -def log_step(step_name: str, start_time: Optional[float] = None) -> float: - """ - Log a pipeline step with timestamp and duration. 
- - Args: - step_name: Name of the step - start_time: Start time from previous call (if completing a step) - - Returns: - Current time for next call - """ - now = time.time() - timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] +def debug_state(conversation_state, event_name="", force_debug=False): + """Debug function to inspect current conversation state""" + if not (DEBUG_STATE or force_debug): + return conversation_state - if start_time: - duration = now - start_time - logger.info(f"[{timestamp}] COMPLETED: {step_name} ({duration:.2f}s)") - else: - logger.info(f"[{timestamp}] STARTING: {step_name}") + timestamp = datetime.now().strftime("%H:%M:%S") + logger.info(f"[{timestamp}] DEBUG STATE - {event_name}") + logger.info(f"Total messages: {len(conversation_state)}") - return now - -# ============================================================================ -# MODEL INFORMATION -# ============================================================================ -print("="*60) -print("MIMIR - Using Llama-3.2-3B-Instruct") -print(" Model: meta-llama/Llama-3.2-3B-Instruct") -print(" Memory: ~1GB (4-bit quantized)") -print(" Context: 128K tokens") -print(" Architecture: Single unified model") -print("="*60) - -# ============================================================================ -# GLOBAL INITIALIZATION -# ============================================================================ - -logger.info("="*60) -logger.info("INITIALIZING MIMIR APPLICATION") -logger.info("="*60) - -init_start = log_step("Global Initialization") - -# Initialize state management -global_state_manager = GlobalStateManager() -logical_expressions = LogicalExpressions() -logger.info("State management initialized") - -# Initialize agents (lazy loading - models load on first use) -tool_agent = ToolDecisionAgent() -routing_agents = PromptRoutingAgents() -thinking_agents = ThinkingAgents() -response_agent = ResponseAgent() -logger.info("Agents initialized (using shared get_shared_llama)") - 
-# Pre-warm shared Qwen3-Claude (optional - happens on first agent call anyway) -logger.info("Shared Qwen3-Claude agent ready (loads on first use)") - -log_step("Global Initialization", init_start) - - -# ============================================================================ -# ANALYTICS & DATABASE FUNCTIONS -# ============================================================================ - -def get_trackio_database_path(project_name: str) -> Optional[str]: - """Get path to metrics SQLite database""" - possible_paths = [ - f"./{project_name}.db", - f"./trackio_data/{project_name}.db", - f"./.trackio/{project_name}.db", - "./mimir_metrics.db" - ] + for i, msg in enumerate(conversation_state): + role = msg["role"] + content_preview = msg["content"][:100] + "..." if len(msg["content"]) > 100 else msg["content"] + logger.info(f" {i+1}. {role}: {content_preview}") - for path in possible_paths: - if os.path.exists(path): - return path + # Log to file for later analysis + if DEBUG_STATE: + debug_log_file = "debug_state.log" + with open(debug_log_file, "a", encoding="utf-8") as f: + f.write(f"\n=== {timestamp} - {event_name} ===\n") + f.write(f"Total messages: {len(conversation_state)}\n") + for i, msg in enumerate(conversation_state): + f.write(f"{i+1}. {msg['role']}: {msg['content'][:200]}...\n") + f.write("=" * 40 + "\n") - return None - - -def get_project_statistics_with_nulls(cursor, project_name: str) -> Dict: - """Query metrics database for project statistics""" - try: - stats = {} - - # Total conversations - try: - cursor.execute(""" - SELECT COUNT(DISTINCT run_id) as total_runs - FROM metrics - WHERE project_name = ? 
- """, (project_name,)) - result = cursor.fetchone() - stats["total_conversations"] = result["total_runs"] if result and result["total_runs"] > 0 else None - except sqlite3.Error: - stats["total_conversations"] = None - - # Average response time - try: - cursor.execute(""" - SELECT AVG(CAST(value AS FLOAT)) as avg_response_time - FROM metrics - WHERE project_name = ? AND metric_name = 'response_time' - """, (project_name,)) - result = cursor.fetchone() - if result and result["avg_response_time"] is not None: - stats["avg_session_length"] = round(result["avg_response_time"], 2) - else: - stats["avg_session_length"] = None - except sqlite3.Error: - stats["avg_session_length"] = None - - # Success rate - try: - cursor.execute(""" - SELECT - COUNT(*) as total_responses, - SUM(CASE WHEN CAST(value AS FLOAT) > 3.5 THEN 1 ELSE 0 END) as successful_responses - FROM metrics - WHERE project_name = ? AND metric_name = 'quality_score' - """, (project_name,)) - result = cursor.fetchone() - if result and result["total_responses"] > 0: - success_rate = (result["successful_responses"] / result["total_responses"]) * 100 - stats["success_rate"] = round(success_rate, 1) - else: - stats["success_rate"] = None - except sqlite3.Error: - stats["success_rate"] = None - - return stats - - except sqlite3.Error as e: - logger.error(f"Database error: {e}") - return {"total_conversations": None, "avg_session_length": None, "success_rate": None} + return conversation_state +# Setup main logger first +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) -def get_recent_interactions_with_nulls(cursor, project_name: str, limit: int = 10) -> List: - """Query for recent interactions""" - try: - cursor.execute(""" - SELECT - m1.timestamp, - m2.value as response_time, - m3.value as prompt_mode, - m4.value as tools_used, - m5.value as quality_score, - m6.value as adapter_used, - m1.run_id - FROM metrics m1 - LEFT JOIN metrics m2 ON m1.run_id = m2.run_id AND m2.metric_name = 
'response_time' - LEFT JOIN metrics m3 ON m1.run_id = m3.run_id AND m3.metric_name = 'prompt_mode' - LEFT JOIN metrics m4 ON m1.run_id = m4.run_id AND m4.metric_name = 'tools_used' - LEFT JOIN metrics m5 ON m1.run_id = m5.run_id AND m5.metric_name = 'quality_score' - LEFT JOIN metrics m6 ON m1.run_id = m6.run_id AND m6.metric_name = 'active_adapter' - WHERE m1.project_name = ? AND m1.metric_name = 'conversation_start' - ORDER BY m1.timestamp DESC - LIMIT ? - """, (project_name, limit)) - - results = cursor.fetchall() - recent_data = [] - - for row in results: - recent_data.append([ - row["timestamp"][:16] if row["timestamp"] else None, - float(row["response_time"]) if row["response_time"] is not None else None, - row["prompt_mode"] if row["prompt_mode"] else None, - bool(int(row["tools_used"])) if row["tools_used"] is not None else None, - float(row["quality_score"]) if row["quality_score"] is not None else None, - row["adapter_used"] if row["adapter_used"] else None - ]) - - return recent_data - - except sqlite3.Error as e: - logger.error(f"Database error: {e}") - return [] +# MISSING HTML CONTENT DEFINITIONS - FIX FOR UNDEFINED VARIABLES +html_head_content = """ + + +Mimir - Educational AI Assistant +""" +force_light_mode = """ + +""" -def create_dashboard_html_with_nulls(project_name: str, project_stats: Dict) -> str: - """Create dashboard HTML with enhanced agent-based metrics""" - def format_stat(value, suffix="", no_data_text="No data"): - if value is None: - return f'{no_data_text}' - return f"{value}{suffix}" +# Environment and Logging Setup +def setup_metrics_logger(): + """Setup a simple file logger for human-readable metrics""" + metrics_logger = logging.getLogger('metrics') + metrics_logger.setLevel(logging.INFO) - def format_large_stat(value, suffix="", no_data_text="--"): - if value is None: - return f'{no_data_text}' - return f"{value}{suffix}" + # Avoid duplicate handlers + if metrics_logger.handlers: + return metrics_logger - # Get evaluation 
metrics from global state - try: - eval_summary = global_state_manager.get_evaluation_summary() - cache_status = global_state_manager.get_cache_status() - - project_stats["ml_educational_quality"] = eval_summary['aggregate_metrics']['avg_educational_quality'] - project_stats["user_satisfaction"] = eval_summary['aggregate_metrics']['user_satisfaction_rate'] - project_stats["active_sessions"] = cache_status['total_conversation_sessions'] - - except Exception as e: - logger.warning(f"Could not get global state metrics: {e}") - project_stats["ml_educational_quality"] = None - project_stats["user_satisfaction"] = None - project_stats["active_sessions"] = None + # Create file handler + log_file = 'performance_metrics.log' + handler = logging.FileHandler(log_file) - # Status determination - success_rate = project_stats.get("success_rate") - if success_rate is not None: - if success_rate >= 80: - status_color = "#4CAF50" - status_text = "Excellent" - elif success_rate >= 60: - status_color = "#FF9800" - status_text = "Good" - else: - status_color = "#F44336" - status_text = "Needs Improvement" - else: - status_color = "#999" - status_text = "No data" + # Create formatter for clean output + formatter = logging.Formatter('%(message)s') + handler.setFormatter(formatter) - # Agent-based metrics section - agent_metrics_section = f""" -
- πŸš€ Agent Performance (Qwen3-Claude Single Model): - Educational Quality: {format_stat(project_stats.get('ml_educational_quality'), '', 'N/A')} | - User Satisfaction: {format_stat(project_stats.get('user_satisfaction'), '%' if project_stats.get('user_satisfaction') else '', 'N/A')} | - Active Sessions: {format_stat(project_stats.get('active_sessions'), '', 'N/A')} -
- """ - - dashboard_html = f''' -
-

πŸ“Š {project_name} Analytics

- -
-
-
{format_large_stat(project_stats.get('total_conversations'))}
-
Total Sessions
-
-
-
{format_large_stat(project_stats.get('avg_session_length'), 's' if project_stats.get('avg_session_length') else '')}
-
Avg Response Time
-
-
-
{format_large_stat(success_rate, '%' if success_rate else '')}
-
Success Rate ({status_text})
-
-
- - {agent_metrics_section} - -
- Model: {format_stat(project_stats.get('model_type'), no_data_text='Unknown')} | - Last Updated: {project_stats.get('last_updated', 'Unknown')} -
-
- ''' - - return dashboard_html - - -def calculate_response_quality(response: str) -> float: - """Calculate response quality score""" - try: - length_score = min(len(response) / 200, 1.0) - educational_keywords = ['learn', 'understand', 'concept', 'example', 'practice'] - keyword_score = sum(1 for keyword in educational_keywords if keyword in response.lower()) / len(educational_keywords) - - if len(response) < 20: - return 2.0 - elif len(response) > 2000: - return 3.5 - - base_score = 2.5 + (length_score * 1.5) + (keyword_score * 1.0) - return min(max(base_score, 1.0), 5.0) - except: - return 3.0 - - -def evaluate_educational_quality_with_tracking(user_query: str, response: str, thread_id: str = None, session_id: str = None): - """Educational quality evaluation with state tracking using LightEval""" - start_time = time.time() + metrics_logger.addHandler(handler) + return metrics_logger + +# Initialize the logger +metrics_logger = setup_metrics_logger() + +def log_metric(message): + """Log a human-readable metric message with automatic timestamp""" + current_time = datetime.now() + timestamped_message = f"{message} | Logged: {current_time:%Y-%m-%d %H:%M:%S}" + metrics_logger.info(timestamped_message) + logger.info(timestamped_message) + +# Support both token names for flexibility +hf_token = HF_TOKEN +if not hf_token: + logger.warning("Neither HF_TOKEN nor HUGGINGFACEHUB_API_TOKEN is set, the application may not work.") + +# Tool Decision Engine (Updated for LangGraph) +class Tool_Decision_Engine: + """Uses LLM to intelligently decide when visualization tools would be beneficial""" - try: - # Educational indicators - educational_indicators = { - 'has_examples': 'example' in response.lower(), - 'structured_explanation': '##' in response or '1.' 
in response, - 'appropriate_length': 100 < len(response) < 1500, - 'encourages_learning': any(phrase in response.lower() - for phrase in ['practice', 'try', 'consider', 'think about']), - 'uses_latex': '$' in response, - 'has_clear_sections': response.count('\n\n') >= 2 - } - - educational_score = sum(educational_indicators.values()) / len(educational_indicators) - semantic_quality = min(len(response) / 500, 1.0) - response_time = time.time() - start_time - - # Use LightEval if available - if LIGHTEVAL_AVAILABLE: - try: - doc = Doc( - task_name=f"turn_{thread_id or session_id}", - query=user_query, - choices=[response], - gold_index=-1, - specific_output=response - ) - - bert_score = BertScore().compute(doc) - semantic_quality = bert_score if bert_score else semantic_quality - - except Exception as lighteval_error: - logger.warning(f"LightEval computation failed: {lighteval_error}") - - metrics = { - 'semantic_quality': semantic_quality, - 'educational_score': educational_score, - 'response_time': response_time, - 'indicators': educational_indicators - } - - # Track in global state - global_state_manager.add_educational_quality_score( - user_query=user_query, - response=response, - metrics=metrics, - session_id=session_id - ) - - logger.info(f"Educational quality evaluated: {educational_score:.3f}") - return metrics - - except Exception as e: - logger.error(f"Educational quality evaluation failed: {e}") - return {'educational_score': 0.5, 'semantic_quality': 0.5, 'response_time': 0.0} - -def log_metrics_to_database(project_name: str, run_id: str, metrics: Dict): - """Log metrics to SQLite database for dashboard""" - try: - db_path = get_trackio_database_path(project_name) + def __init__(self, llm): + self.decision_llm = llm + self.decision_prompt = """Analyze this educational query and determine if creating a graph, chart, or visual representation would significantly enhance learning and understanding. 
+Query: "{query}" + +EXCLUDE if query is: +- Greetings or casual conversation (hello, hi, hey) +- Simple definitions without data +- Test/warmup messages +- General explanations that don't involve data + +INCLUDE if query involves: +- Mathematical functions or relationships +- Data analysis or statistics +- Comparisons that benefit from charts +- Trends or patterns over time +- Creating practice questions with data + +Answer with exactly: YES or NO +Decision:""" + + def should_use_visualization(self, query: str) -> bool: + """Enhanced decision logic with explicit exclusions""" + start_graph_decision_time = time.perf_counter() + current_time = datetime.now() - if db_path is None: - db_path = "./mimir_metrics.db" - - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - # Create metrics table if not exists - cursor.execute(""" - CREATE TABLE IF NOT EXISTS metrics ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - project_name TEXT, - run_id TEXT, - metric_name TEXT, - value TEXT, - timestamp TEXT - ) - """) - - # Insert metrics - timestamp = datetime.now().isoformat() - for metric_name, metric_value in metrics.items(): - cursor.execute(""" - INSERT INTO metrics (project_name, run_id, metric_name, value, timestamp) - VALUES (?, ?, ?, ?, ?) 
- """, (project_name, run_id, metric_name, str(metric_value), timestamp)) - - conn.commit() - conn.close() - - logger.info(f"Logged {len(metrics)} metrics to database") - - except Exception as e: - logger.error(f"Failed to log metrics to database: {e}") - - -def sync_trackio_with_global_state(): - """Sync metrics database with global state manager data""" - try: - eval_summary = global_state_manager.get_evaluation_summary() - - # Log to database (agent-based metrics only) - metrics = { - "educational_quality_avg": eval_summary['aggregate_metrics']['avg_educational_quality'], - "user_satisfaction": eval_summary['aggregate_metrics']['user_satisfaction_rate'], - "total_evaluations": sum(eval_summary['total_evaluations'].values()) - } - - log_metrics_to_database("Mimir", str(uuid.uuid4()), metrics) - - logger.info("Synced global state metrics to database") - - except Exception as e: - logger.error(f"Failed to sync metrics to database: {e}") - - -def refresh_analytics_data_persistent(): - """Refresh analytics data with global state persistence""" - project_name = "Mimir" - - try: - analytics_state = global_state_manager.get_analytics_state() - last_refresh = analytics_state.get('last_refresh') - - # If refreshed within last 30 seconds, return cached - if last_refresh and (datetime.now() - last_refresh).seconds < 30: - logger.info("Using cached analytics data (recent refresh)") - return ( - analytics_state['project_stats'], - analytics_state['recent_interactions'], - analytics_state['dashboard_html'] - ) - - db_path = get_trackio_database_path(project_name) - - if db_path is None: - logger.warning("No metrics database found") - project_stats = { - "total_conversations": None, - "avg_session_length": None, - "success_rate": None, - "model_type": "Qwen3-4B-Claude GGUF (Q6_K - Single Model)", - "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S") - } - - dashboard_html = create_dashboard_html_with_nulls(project_name, project_stats) - recent_interactions = [] - - 
global_state_manager.update_analytics_state( - project_stats=project_stats, - recent_interactions=recent_interactions, - dashboard_html=dashboard_html - ) + try: + # Explicit exclusions for common non-visual queries + exclusion_patterns = [ + r'^(hello|hi|hey)\b', + r'warmup.*test', + r'(what is|define|explain)\s+\w+\s*(of|the)?', + r'capital\s+of', + r'^(greet|greeting)' + ] - return project_stats, recent_interactions, dashboard_html - - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - project_stats = get_project_statistics_with_nulls(cursor, project_name) - project_stats["model_type"] = "Qwen3-4B-Claude GGUF (Q6_K - Single Model)" - project_stats["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - recent_data = get_recent_interactions_with_nulls(cursor, project_name, limit=10) - dashboard_html = create_dashboard_html_with_nulls(project_name, project_stats) - - conn.close() - - global_state_manager.update_analytics_state( - project_stats=project_stats, - recent_interactions=recent_data, - dashboard_html=dashboard_html - ) - - logger.info("Analytics data refreshed and cached successfully") - return project_stats, recent_data, dashboard_html - - except Exception as e: - logger.error(f"Error refreshing analytics: {e}") - - error_stats = { - "error": str(e), - "total_conversations": None, - "avg_session_length": None, - "success_rate": None, - "model_type": "Error", - "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S") - } - - error_html = f""" -
-

⚠️ Analytics Error

-

Could not load analytics data: {str(e)[:100]}

-
- """ - - global_state_manager.update_analytics_state( - project_stats=error_stats, - recent_interactions=[], - dashboard_html=error_html, - error_state=str(e) - ) - - return error_stats, [], error_html - - -def export_metrics_json_persistent(): - """Export metrics as JSON file""" - try: - project_stats, recent_data, _ = refresh_analytics_data_persistent() - - export_data = { - "project": "Mimir", - "export_timestamp": datetime.now().isoformat(), - "statistics": project_stats, - "recent_interactions": recent_data - } - - filename = f"mimir_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - - with open(filename, 'w') as f: - json.dump(export_data, f, indent=2, default=str) - - global_state_manager.add_export_record("JSON", filename, success=True) - - logger.info(f"Metrics exported to {filename}") - gr.Info(f"Metrics exported successfully to {filename}") - - except Exception as e: - global_state_manager.add_export_record("JSON", "failed", success=False) - logger.error(f"Export failed: {e}") - gr.Warning(f"Export failed: {str(e)}") - - -def export_metrics_csv_persistent(): - """Export metrics as CSV file""" - try: - import csv - - _, recent_data, _ = refresh_analytics_data_persistent() - - filename = f"mimir_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" - - with open(filename, 'w', newline='') as f: - writer = csv.writer(f) - writer.writerow(["Timestamp", "Response Time", "Mode", "Tools Used", "Quality Score", "Adapter"]) + query_lower = query.lower().strip() - for row in recent_data: - writer.writerow(row) - - global_state_manager.add_export_record("CSV", filename, success=True) - - logger.info(f"Metrics exported to {filename}") - gr.Info(f"Metrics exported successfully to {filename}") - - except Exception as e: - global_state_manager.add_export_record("CSV", "failed", success=False) - logger.error(f"Export failed: {e}") - gr.Warning(f"Export failed: {str(e)}") - - -def load_analytics_state(): - """Load analytics state from global manager""" - 
analytics_state = global_state_manager.get_analytics_state() - - project_stats = analytics_state['project_stats'] - recent_interactions = analytics_state['recent_interactions'] - dashboard_html = analytics_state['dashboard_html'] - - if dashboard_html is None: - dashboard_html = """ -
-

πŸ“Š Analytics Dashboard

-

Click "Refresh Data" to load analytics.

-
- """ - - return project_stats, recent_interactions, dashboard_html - - -def get_global_state_debug_info(): - """Get debug information about global state""" - cache_status = global_state_manager.get_cache_status() - - debug_info = { - "cache_status": cache_status, - "timestamp": datetime.now().isoformat(), - "sessions": global_state_manager.get_all_sessions() - } - - return debug_info - - -# ============================================================================ -# POST-PROCESSING -# ============================================================================ - -class ResponsePostProcessor: - """Post-processing pipeline for educational responses""" - - def __init__(self, max_length: int = 1800, min_length: int = 10): - self.max_length = max_length - self.min_length = min_length - - self.logical_stop_patterns = [ - r'\n\n---\n', - r'\n\n## Summary\b', - r'\n\nIn conclusion\b', - r'\n\nTo summarize\b', - ] - - def process_response(self, raw_response: str, user_query: str = "") -> str: - """Main post-processing pipeline""" - try: - cleaned = self._enhanced_token_cleanup(raw_response) - cleaned = self._truncate_intelligently(cleaned) - cleaned = self._enhance_readability(cleaned) + # Check exclusions first + for pattern in exclusion_patterns: + if re.search(pattern, query_lower): + end_graph_decision_time = time.perf_counter() + graph_decision_time = end_graph_decision_time - start_graph_decision_time + log_metric(f"Tool decision time (excluded): {graph_decision_time:0.4f} seconds. 
Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + return False - if not self._passes_quality_check(cleaned): - return self._generate_fallback_response(user_query) + # Create decision prompt + decision_query = self.decision_prompt.format(query=query) - return cleaned.strip() + # Get LLM decision + decision_response = self.decision_llm.invoke(decision_query) - except Exception as e: - logger.error(f"Post-processing error: {e}") - return raw_response - - def _enhanced_token_cleanup(self, text: str) -> str: - """Remove model artifacts""" - artifacts = [ - r'<\|.*?\|>', - r'###\s*$', - r'User:\s*$', - r'Assistant:\s*$', - r'\n\s*\n\s*\n+', - ] - - for pattern in artifacts: - text = re.sub(pattern, '', text, flags=re.MULTILINE) - - return text - - def _truncate_intelligently(self, text: str) -> str: - """Truncate at logical educational endpoints""" - for pattern in self.logical_stop_patterns: - match = re.search(pattern, text, re.IGNORECASE) - if match: - return text[:match.start()].strip() - - if len(text) <= self.max_length: - return text - - sentences = re.split(r'[.!?]+\s+', text) - truncated = "" - - for sentence in sentences: - test_length = len(truncated + sentence + ". ") - if test_length <= self.max_length: - truncated += sentence + ". " - else: - break - - return truncated.strip() - - def _enhance_readability(self, text: str) -> str: - """Format for better presentation""" - text = re.sub(r'([.!?])([A-Z])', r'\1 \2', text) - text = re.sub(r'\s{2,}', ' ', text) - text = re.sub(r'\n\s*[-*]\s*', '\n- ', text) - - return text - - def _passes_quality_check(self, text: str) -> bool: - """Final quality validation""" - if len(text.strip()) < self.min_length: - return False - - sentences = re.split(r'[.!?]+', text) - valid_sentences = [s for s in sentences if len(s.strip()) > 5] - - return len(valid_sentences) > 0 - - def _generate_fallback_response(self, user_query: str) -> str: - """Generate safe fallback""" - return "I'd be happy to help you understand this better. 
Could you clarify what specific aspect you'd like me to focus on?" - - def process_and_stream_response(self, raw_response: str, user_query: str = ""): - """Process response then stream word-by-word""" - try: - processed_response = self.process_response(raw_response, user_query) + # Parse response - look for YES/NO + decision_text = decision_response.strip().upper() - words = processed_response.split() - current_output = "" + # Log the decision for debugging + logger.info(f"Tool decision for '{query[:50]}...': {decision_text}") - for i, word in enumerate(words): - current_output += word - if i < len(words) - 1: - current_output += " " - - yield current_output - time.sleep(0.015) - - except Exception as e: - logger.error(f"Stream processing error: {e}") - yield "I encountered an error processing the response." - - -post_processor = ResponsePostProcessor() - -# ============================================================================ -# DATA EXTRACTION FOR GRAPHING -# ============================================================================ - -def extract_graph_data(user_input: str, conversation_history: Optional[List[Dict]] = None) -> Optional[Dict]: - """ - Use LLM to extract graphable data from user input. - - Returns: - Dict with keys: data, plot_type, title, x_label, y_label, educational_context - Or None if no data can be extracted - """ - from model_manager import get_model - - model = get_model() - - # Format conversation context - context = "" - if conversation_history: # This now handles None safely - recent = conversation_history[-2:] - context = "\n".join([f"{msg['role']}: {msg['content'][:200]}" for msg in recent]) - - - extraction_prompt = f"""Extract graphable data from the user's message. - -Previous context: -{context} - -Current message: {user_input} - -If the message contains data that can be graphed (numbers, comparisons, datasets, trends), extract: -1. The data as key-value pairs -2. The best plot type (bar, line, or pie) -3. 
A descriptive title -4. Axis labels (if applicable) -5. Educational context explaining what the graph shows - -Respond in JSON format ONLY: -{{ - "has_data": true/false, - "data": {{"label1": value1, "label2": value2, ...}}, - "plot_type": "bar/line/pie", - "title": "Graph Title", - "x_label": "X Axis Label", - "y_label": "Y Axis Label", - "educational_context": "Brief explanation of what this graph represents" -}} - -If no graphable data exists, respond: {{"has_data": false}}""" - - try: - system_prompt = "You are a data extraction expert. Extract graphable data from text and respond in valid JSON only." - - response = model.generate( - system_prompt=system_prompt, - user_message=extraction_prompt, - max_tokens=300, - temperature=0.3 - ) - - # Parse JSON response - import json - # Try to extract JSON from response - json_start = response.find('{') - json_end = response.rfind('}') + 1 - - if json_start == -1 or json_end == 0: - return None + # More strict parsing + result = "YES" in decision_text and "NO" not in decision_text - json_str = response[json_start:json_end] - result = json.loads(json_str) - - if not result.get('has_data', False): - return None + end_graph_decision_time = time.perf_counter() + graph_decision_time = end_graph_decision_time - start_graph_decision_time + log_metric(f"Tool decision time: {graph_decision_time:0.4f} seconds. Decision: {result}. Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") - # Validate required fields - if 'data' not in result or not result['data']: - return None + return result - return result - - except Exception as e: - logger.error(f"Data extraction failed: {e}") - return None - - + except Exception as e: + logger.error(f"Error in tool decision making: {e}") + end_graph_decision_time = time.perf_counter() + graph_decision_time = end_graph_decision_time - start_graph_decision_time + log_metric(f"Tool decision time (error): {graph_decision_time:0.4f} seconds. 
Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + return False - -# ============================================================================ -# TOOL FUNCTIONS -# ============================================================================ +# LangGraph State Definition +class EducationalAgentState(TypedDict): + messages: Annotated[Sequence[BaseMessage], add_messages] + needs_tools: bool + educational_context: Optional[str] @tool(return_direct=False) def Create_Graph_Tool( @@ -932,10 +237,30 @@ def Create_Graph_Tool( y_label: str = "", educational_context: str = "" ) -> str: - """Generate educational graphs""" - tool_start = log_step("Create_Graph_Tool") + """ + Creates educational graphs and charts to help explain concepts to students. + + Use this tool ONLY when teaching concepts that would benefit from visual representation, such as: + - Mathematical functions and relationships (quadratic equations, exponential growth) + - Statistical distributions and data analysis (normal curves, survey results) + - Scientific trends and comparisons (temperature changes, population growth) + - Economic models and business metrics (profit over time, market shares) + - Grade distributions or performance analysis (test score ranges) + - Any quantitative concept that's clearer with visualization + + Args: + data: Dictionary with string keys and numeric values {"Category A": 25, "Category B": 40} + plot_type: "bar", "line", or "pie" + title: Title for the chart + x_label: X-axis label + y_label: Y-axis label + educational_context: Explanation of why this visualization helps learning + """ + start_create_graph_tool_time = time.perf_counter() + current_time = datetime.now() try: + # Call the generate_plot function directly content, artifact = generate_plot( data=data, plot_type=plot_type, @@ -944,16 +269,19 @@ def Create_Graph_Tool( y_label=y_label ) + # Check if there was an error if "error" in artifact: - log_step("Create_Graph_Tool", tool_start) return f'

Graph generation failed: {artifact["error"]}

' + # Convert the base64 image to HTML base64_image = artifact["base64_image"] + # Add educational context if provided context_html = "" if educational_context: - context_html = f'
πŸ’‘ {educational_context}
' + context_html = f'
πŸ’‘ {educational_context}
' + # Create the complete HTML with image result = f"""{context_html}
{title}
""" - log_step("Create_Graph_Tool", tool_start) + end_create_graph_tool_time = time.perf_counter() + graph_create_graph_tool_time = end_create_graph_tool_time - start_create_graph_tool_time + log_metric(f"Graph tool creation time: {graph_create_graph_tool_time:0.4f} seconds. Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + return result except Exception as e: - logger.error(f"Graph tool error: {e}") - log_step("Create_Graph_Tool", tool_start) - return f'

Error: {str(e)}

' + logger.error(f"Error in graph generation: {e}") + return f'

Error creating graph: {str(e)}

' + +# System Prompt with ReAct Framework for Phi-3-mini +SYSTEM_PROMPT = """You are Mimir, an expert multi-concept tutor designed to facilitate genuine learning and understanding. Your primary mission is to guide students through the learning process. You do so concisely, without excessive filler language or flowery content. + +## Core Educational Principles +- Provide comprehensive, educational responses that help students truly understand concepts +- Prioritize teaching methodology over answer delivery +- Foster critical thinking and independent problem-solving skills + +## Formatting +- You have access to LaTeX and markdown rendering. +- Use ## and ## headings when needed. If only one heading level is needed, use ##. +- For inline math, use $ ... $, e.g. $\sum_{i=0}^n i^2$ +- For centered display math, use $$ ... $$ on its own line. +- To show a literal dollar sign, use `\$` (e.g., \$5.00). +- To show literal parentheses in LaTeX, use `\(` and `\)` (e.g., \(a+b\)). +- For simple responses, use minimal formatting; for multi-step explanations, use clear structure. +- Separate sections and paragraphs with a full black line. +- Emojis are disabled. + +## Tone and Communication Style +- Write at a reading level that is accessible yet intellectually stimulating +- Be supportive and encouraging without being condescending +- Never use crude language or content inappropriate for an educational setting +- Avoid preachy, judgmental, or accusatory language +- Skip flattery and respond directly to questions +- Do not use emojis or actions in asterisks unless specifically requested +- Present critiques and corrections kindly as educational opportunities +- Keep responses between **1 and 4 sentences** unless step-by-step reasoning is required. +- Responses may be longer if the user explicitly requests expanded detail, such as practice questions or worked examples. 
+ +## Simple Greetings +If a user only says "Hello," "Thank You," or another short greeting, first reciprocate in a professional, friendly way, then ask what you can help with today. + +### Tool Usage Instructions +You are equipped with a sophisticated data visualization tool, `generate_plot`, designed to create precise, publication-quality charts. Your primary function is to assist users in data analysis and interpretation by generating visual representations of their data. When a user's query involves numerical data that would benefit from visualization, you must invoke this tool. + +**Tool Signature:** +`generate_plot(data: Dict[str, float], plot_type: Literal["bar", "line", "pie"], title: str, labels: List[str], x_label: str, y_label: str)` + +**Parameter Guide:** +* `data` **(Required)**: A dictionary where keys are string labels and values are the corresponding numeric data points. + * *Example:* `{"Experiment A": 88.5, "Experiment B": 92.1}` +* `plot_type` **(Required)**: The specific type of chart to generate. This **must** be one of `"bar"`, `"line"`, or `"pie"`. +* `title` (Optional): A formal title for the plot. +* `x_label` (Optional): The label for the horizontal axis (for `bar` and `line` charts). +* `y_label` (Optional): The label for the vertical axis (for `bar` and `line` charts). +* `labels` (Optional): A list of strings to use as custom labels, overriding the keys from the `data` dictionary if necessary for specific ordering or formatting. + +**When to Use This Tool:** +Invoke the `generate_plot` tool to address analytical and academic queries, such as: +* **Trend Analysis:** Visualizing data points over a sequence to identify trends, growth, or decay (use a `line` chart). +* **Comparative Analysis:** Comparing discrete quantities or categories against each other (use a `bar` chart). +* **Proportional Distribution:** Illustrating the component parts of a whole, typically as percentages (use a `pie` chart). 
+ +**Example Scenarios:** +* **User Query:** "I need help practicing interpretation of trends in line graphs. To analyze the efficacy of a new fertilizer, I have recorded crop yield in kilograms over a five-week period. Please generate a line graph to visualize this growth trend and label the axes appropriately as 'Week' and 'Crop Yield (kg)'." +* **Your Tool Call:** + * `data`: `{"Week 1": 120, "Week 2": 155, "Week 3": 190, "Week 4": 210, "Week 5": 245}` + * `plot_type`: `"line"` + * `title`: `"Efficacy of New Fertilizer on Crop Yield"` + * `x_label`: `"Week"` + * `y_label`: `"Crop Yield (kg)"` + +* **User Query:** "I am studying for my ACT, and I am at a loss on interpreting the charts. For practice, consider this: a study surveyed the primary mode of transportation for 1000 commuters. The results were: 450 drive, 300 use public transit, 150 cycle, and 100 walk. Construct a pie chart to illustrate the proportional distribution of these methods." +* **Your Tool Call:** + * `data`: `{"Driving": 450, "Public Transit": 300, "Cycling": 150, "Walking": 100}` + * `plot_type`: `"pie"` + * `title`: `"Proportional Distribution of Commuter Transportation Methods"` + +NOTE: If specific data to use is not supplied, create reasonable data to create your charts. + +## Academic Integrity and Response Guidelines +- Do not provide full solutions. 
Instead: + - **Guide through processes**: Break down problems into conceptual components + - **Ask clarifying questions**: Understand what the student knows + - **Provide similar examples**: Work through analogous problems + - **Encourage original thinking**: Help students develop reasoning skills + - **Suggest study strategies**: Recommend effective learning approaches +- **Math problems**: Explain concepts and guide through steps without computing final answers +- **Multiple-choice questions**: Discuss concepts being tested rather than identifying correct choices +- **Essays**: Discuss research strategies and organizational techniques +- **Factual questions**: Provide educational context and encourage synthesis + +## Practice Question Templates + +**Multiple Choice** + +1. 1 to 4 sentence question +OPTIONAL, IF NEEDED. only INCLUDE A GRAPH, LINKED AS IMAGE, OR TABLE, NEVER BOTH. +![Chart, Graph](my_image.png "Scenic View") + +| Example C1 | Example C2 |... +| :---------------: | :----------------: |... +| Content...... | Content....... |... + +A. Option +B. Option +C. Option +D. Option + +--- + +**All That Apply** + +1. 1 to 4 sentence question +OPTIONAL, IF NEEDED. only INCLUDE A GRAPH, LINKED AS IMAGE, OR TABLE, NEVER BOTH. +![Chart, Graph](my_image.png "Scenic View") + +| Example C1 | Example C2 |... +| :---------------: | :----------------: |... +| Content...... | Content....... |... + +- [ ] A. Option +- [ ] B. Option +- [ ] C. Option +- [ ] D. Option + +--- + +**Written Response** + +1. 1 to 4 sentence question +OPTIONAL, IF NEEDED. only INCLUDE A GRAPH, LINKED AS IMAGE, OR TABLE, NEVER BOTH. +![Chart, Graph](my_image.png "Scenic View") + +| Example C1 | Example C2 |... +| :---------------: | :----------------: |... +| Content...... | Content....... |... 
+ +Prompt the user, in one sentence, to write their response +""" +# --- Stop Criteria --- -# ============================================================================ -# MAIN ORCHESTRATION WORKFLOW -# ============================================================================ +class StopOnSequence(StoppingCriteria): + def __init__(self, tokenizer, stop_sequence): + self.tokenizer = tokenizer + self.stop_sequence = tokenizer.encode(stop_sequence, add_special_tokens=False) -def orchestrate_turn(user_input: str, conversation_history: list = None, session_id: str = "default") -> str: - """ - Main orchestration function implementing the redesign workflow. - + def __call__(self, input_ids, scores, **kwargs): + if input_ids[0, -len(self.stop_sequence):].tolist() == self.stop_sequence: + return True + return False + +# --- LLM Class with Phi-3 Mini --- +class Phi3MiniEducationalLLM(Runnable): + """LLM class optimized for Microsoft Phi-3-mini-4k-instruct with 4-bit quantization""" - Steps: - 1. Reset prompt state - 2. Process user input (history) - 3. Tool decision - 4. Regex checks - 5. Agent execution (Qwen3-Claude) - 6. Thinking agents (Qwen3-Claude) - 7. Prompt assembly - 8. Response generation (Qwen3-Claude) - 9. Post-processing - 10. 
Metrics tracking (background thread) - """ - turn_start = log_step("orchestrate_turn") - run_id = str(uuid.uuid4()) + def __init__(self, model_path: str = "microsoft/Phi-3-mini-4k-instruct"): + super().__init__() + logger.info(f"Loading Phi-3-mini model with 4-bit quantization: {model_path}") + start_Loading_Model_time = time.perf_counter() + current_time = datetime.now() + + self.model_name = model_path - try: - # ==================================================================== - # STEP 1: RESET PROMPT STATE - # ==================================================================== - step_start = log_step("Step 1: Reset prompt state") - global_state_manager.reset_prompt_state() - prompt_state = global_state_manager.get_prompt_state_manager() - log_step("Step 1: Reset prompt state", step_start) - - # ==================================================================== - # STEP 2: USER INPUT PROCESSING - # ==================================================================== - step_start = log_step("Step 2: Process user input") - - # Use conversation_history passed from Gradio state (no global state call) - if conversation_history is None: - conversation_history = [] - - # Take last 8 messages - conversation_history = conversation_history[-8:] if conversation_history else [] - - # Format history for agents - conversation_history_formatted = "\n".join([ - f"{msg['role']}: {msg['content'][:100]}" - for msg in conversation_history - ]) if conversation_history else "No previous conversation" - - log_step("Step 2: Process user input", step_start) - - # ==================================================================== - # STEP 3: TOOL DECISION ENGINE & GRAPH GENERATION - # ==================================================================== - step_start = log_step("Step 3: Tool decision & graph generation") - - # Check if visualization is needed - tool_decision_result = tool_agent.decide(user_input, conversation_history) - - tool_img_output = "" - tool_context = "" - 
- if tool_decision_result: - logger.info("Tool decision: YES - visualization needed") - prompt_state.update("TOOL_USE_ENHANCEMENT", True) - - # Extract data and generate graph - logger.info("β†’ Extracting graphable data from input...") - graph_data = extract_graph_data(user_input, conversation_history) + try: + # Load tokenizer (can be done on CPU) + self.tokenizer = AutoTokenizer.from_pretrained( + model_path, + trust_remote_code=True, + token=hf_token, + use_fast=False + ) + + # Configure 4-bit quantization + self.quantization_config = BitsAndBytesConfig( + load_in_4bit=True, + bnb_4bit_compute_dtype=torch.bfloat16, + bnb_4bit_quant_type="nf4", # NormalFloat 4-bit + bnb_4bit_use_double_quant=True, # Nested quantization for extra savings + ) + + # Store model path - model will be loaded inside GPU context + self.model_path = model_path + self.model = None - if graph_data: - logger.info(f"βœ“ Data extracted: {len(graph_data['data'])} data points") - logger.info(f" Plot type: {graph_data['plot_type']}") - - # Generate the graph - tool_img_output = Create_Graph_Tool( - data=graph_data['data'], - plot_type=graph_data['plot_type'], - title=graph_data.get('title', 'Generated Plot'), - x_label=graph_data.get('x_label', ''), - y_label=graph_data.get('y_label', ''), - educational_context=graph_data.get('educational_context', '') + except Exception as e: + logger.error(f"Failed to initialize Phi-3-mini model {model_path}: {e}") + raise + + # Ensure pad token exists + if self.tokenizer.pad_token is None: + self.tokenizer.pad_token = self.tokenizer.eos_token + + self.streamer = None + + def _load_model_if_needed(self): + """Load model with 4-bit quantization only when needed inside GPU context""" + if self.model is None: + logger.info("Loading model with 4-bit quantization...") + try: + self.model = AutoModelForCausalLM.from_pretrained( + self.model_path, + quantization_config=self.quantization_config, + torch_dtype=torch.bfloat16, + trust_remote_code=True, + 
low_cpu_mem_usage=True, + token=hf_token, + attn_implementation="eager", + device_map="auto" ) - - # Store context for thinking agents - tool_context = f"Graph created: {graph_data['plot_type']} chart showing {graph_data.get('title', 'data visualization')}" - - logger.info("βœ“ Graph generated successfully") + logger.info(f"Model loaded successfully. Memory footprint reduced to ~2.2GB with 4-bit quantization") + except Exception as e: + logger.error(f"Failed to load quantized model: {e}") + raise + return self.model + + def _format_chat_template(self, prompt: str) -> str: + """Format prompt using Phi-3's chat template""" + try: + messages = [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": prompt} + ] + # Use Phi-3's chat template + formatted_text = self.tokenizer.apply_chat_template( + messages, + tokenize=False, + add_generation_prompt=True + ) + return formatted_text + except Exception as e: + logger.warning(f"Chat template failed, using fallback format: {e}") + # Fallback to manual Phi-3 format + return f"<|system|>\n{SYSTEM_PROMPT}<|end|>\n<|user|>\n{prompt}<|end|>\n<|assistant|>\n" + + @spaces.GPU(duration=180) + def invoke(self, input: Input, config=None) -> Output: + """Main invoke method optimized for 4-bit quantized Phi‑3‑mini""" + start_invoke_time = time.perf_counter() + current_time = datetime.now() + + # Handle different input types + if isinstance(input, dict): + if 'input' in input: + prompt = input['input'] + elif 'messages' in input: + prompt = str(input['messages']) else: - logger.info("⚠️ No extractable data found - skipping graph generation") + prompt = str(input) else: - logger.info("Tool decision: NO - no visualization needed") - - log_step("Step 3: Tool decision & graph generation", step_start) - # ==================================================================== - # STEP 4: REGEX LOGICAL EXPRESSIONS - # ==================================================================== - step_start = log_step("Step 4: 
Regex checks") - logical_expressions.apply_all_checks(user_input, prompt_state) - log_step("Step 4: Regex checks", step_start) - - # ==================================================================== - # STEP 5: SEQUENTIAL AGENT EXECUTION (Qwen3-Claude) - # ==================================================================== - step_start = log_step("Step 5: Routing agents") - - # Use unified process() method that handles all 4 routing agents - response_prompts_str, thinking_prompts_str = routing_agents.process( - user_input=user_input, - tool_used=(tool_decision_result and bool(tool_img_output)) - ) - - # Update prompt state with response prompts - if response_prompts_str: - for prompt_name in response_prompts_str.split('\n'): - if prompt_name.strip(): - prompt_state.update(prompt_name.strip(), True) - logger.info(f"Response prompt activated: {prompt_name.strip()}") - - # Store thinking prompts for Step 6 (will be processed by ThinkingAgents) - thinking_prompts_from_routing = thinking_prompts_str.split('\n') if thinking_prompts_str else [] - for prompt_name in thinking_prompts_from_routing: - if prompt_name.strip(): - logger.info(f"Thinking prompt queued: {prompt_name.strip()}") - - log_step("Step 5: Routing agents", step_start) - - # ==================================================================== - # STEP 6: THINKING AGENT PROCESSING (Qwen3-Claude) - # ==================================================================== - step_start = log_step("Step 6: Thinking agents") - - # Use thinking prompts identified by routing agents in Step 5 - thinking_prompts_list = [] - - # Add thinking prompts from routing agents - for prompt_name in thinking_prompts_from_routing: - if prompt_name.strip(): - thinking_prompts_list.append(prompt_name.strip()) - prompt_state.update(prompt_name.strip(), True) - - # Additional heuristic: Add MATH_THINKING if LATEX_FORMATTING is active - # (This ensures math thinking is triggered even if routing agents didn't detect it) - if 
prompt_state.is_active("LATEX_FORMATTING") and "MATH_THINKING" not in thinking_prompts_list: - thinking_prompts_list.append("MATH_THINKING") - prompt_state.update("MATH_THINKING", True) - - # Execute thinking agents if any are active - thinking_context = "" - if thinking_prompts_list: - thinking_prompts_string = '\n'.join(thinking_prompts_list) - logger.info(f"Active thinking agents: {thinking_prompts_list}") - - think_start = log_step("Thinking agents execution") - thinking_context = thinking_agents.process( - user_input=user_input, - conversation_history=conversation_history_formatted, - thinking_prompts=thinking_prompts_string, - tool_img_output=tool_img_output, - tool_context=tool_context + prompt = str(input) + + try: + model = self._load_model_if_needed() + text = self._format_chat_template(prompt) + + try: + max_input_length = 2000 - 400 + inputs = self.tokenizer( + text, + return_tensors="pt", + padding=True, + truncation=True, + max_length=max_input_length + ) + if 'input_ids' not in inputs: + logger.error("Tokenizer did not return input_ids") + return "I encountered an error processing your request. Please try again." + except Exception as tokenizer_error: + logger.error(f"Tokenization error: {tokenizer_error}") + return "I encountered an error processing your request. Please try again." + + try: + inputs = {k: v.to(model.device) for k, v in inputs.items()} + except Exception as device_error: + logger.error(f"Device transfer error: {device_error}") + return "I encountered an error processing your request. Please try again." 
+ + # Define stopping criteria after tokenizer initialization + stop_criteria = StoppingCriteriaList([StopOnSequence(self.tokenizer, "User:")]) + + with torch.no_grad(): + try: + outputs = model.generate( + input_ids=inputs['input_ids'], + attention_mask=inputs.get('attention_mask', None), + max_new_tokens=250, + do_sample=True, + temperature=0.4, + top_p=0.9, + top_k=50, + repetition_penalty=1.1, + pad_token_id=self.tokenizer.eos_token_id, + use_cache=False, + past_key_values=None, + stopping_criteria=stop_criteria + ) + except Exception as generation_error: + logger.error(f"Generation error: {generation_error}") + return "I encountered an error generating the response. Please try again." + + try: + new_tokens = outputs[0][len(inputs['input_ids'][0]):] + result = self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip() + + # Apply soft-stop cleanup + for stop_word in ["User:", "\n\n", "###"]: + if stop_word in result: + result = result.split(stop_word)[0].strip() + break + except Exception as decode_error: + logger.error(f"Decoding error: {decode_error}") + return "I encountered an error processing the response. Please try again." + + end_invoke_time = time.perf_counter() + invoke_time = end_invoke_time - start_invoke_time + log_metric( + f"LLM Invoke time (4‑bit): {invoke_time:0.4f} seconds. " + f"Input length: {len(prompt)} chars. " + f"Model: {self.model_name}. 
" + f"Timestamp: {current_time:%Y‑%m‑%d %H:%M:%S}" ) - log_step("Thinking agents execution", think_start) - - log_step("Step 6: Thinking agents", step_start) - - # ==================================================================== - # STEP 7: RESPONSE PROMPT ASSEMBLY - # ==================================================================== - step_start = log_step("Step 7: Prompt assembly") - - # Get active response prompts - response_prompt_names = prompt_state.get_active_response_prompts() - - # Build prompt segments - prompt_segments = [CORE_IDENTITY] - - prompt_map = { - "VAUGE_INPUT": VAUGE_INPUT, - "USER_UNDERSTANDING": USER_UNDERSTANDING, - "GENERAL_FORMATTING": GENERAL_FORMATTING, - "LATEX_FORMATTING": LATEX_FORMATTING, - "GUIDING_TEACHING": GUIDING_TEACHING, - "STRUCTURE_PRACTICE_QUESTIONS": STRUCTURE_PRACTICE_QUESTIONS, - "PRACTICE_QUESTION_FOLLOWUP": PRACTICE_QUESTION_FOLLOWUP, - "TOOL_USE_ENHANCEMENT": TOOL_USE_ENHANCEMENT, - } - - for prompt_name in response_prompt_names: - if prompt_name in prompt_map: - prompt_segments.append(prompt_map[prompt_name]) - - prompt_segments_text = "\n\n".join(prompt_segments) - - logger.info(f"Active prompts: {response_prompt_names}") - log_step("Step 7: Prompt assembly", step_start) - - # ==================================================================== - # STEP 8: PREPARE RESPONSE AGENT INPUT - # ==================================================================== - step_start = log_step("Step 8: Prepare response input") - - # Get active response prompts - response_prompt_names = prompt_state.get_active_response_prompts() - logger.info(f"Active prompts: {response_prompt_names}") - - # Combine tool outputs for context - # If we have tool_img_output, mention it in tool_context - combined_tool_context = tool_context - if tool_img_output: - # Note: tool_img_output is HTML that will be embedded separately - # Just note its presence in the context - if combined_tool_context: - combined_tool_context += "\n\nNote: A 
visualization has been generated for the user." - else: - combined_tool_context = "A visualization has been generated for the user." - - # Build input dictionary for ResponseAgent - # CRITICAL: Must be a Dict, NOT a string! - input_data = { - 'user_query': user_input, - 'conversation_history': conversation_history, - 'active_prompts': response_prompt_names, - 'thinking_context': thinking_context, # str (from thinking agents) - 'tool_context': combined_tool_context, # str (tool usage info) - } - - logger.info(f"Response input prepared:") - logger.info(f" - User query: {len(user_input)} chars") - logger.info(f" - History: {len(conversation_history)} messages") - logger.info(f" - Active prompts: {len(response_prompt_names)} prompts") - logger.info(f" - Thinking context: {len(thinking_context)} chars") - logger.info(f" - Tool context: {len(combined_tool_context)} chars") - - log_step("Step 8: Prepare response input", step_start) - - # ==================================================================== - # STEP 9: RESPONSE GENERATION (Llama-3.2-3B) - # ==================================================================== - step_start = log_step("Step 9: Response generation") - + + return result if result else "I'm still learning how to respond to that properly." + + except Exception as e: + logger.error(f"Generation error with 4‑bit model: {e}") + end_invoke_time = time.perf_counter() + invoke_time = end_invoke_time - start_invoke_time + log_metric( + f"LLM Invoke time (error): {invoke_time:0.4f} seconds. " + f"Model: {self.model_name}. 
" + f"Timestamp: {current_time:%Y‑%m‑%d %H:%M:%S}" + ) + return f"I encountered an error: {str(e)}" + + @spaces.GPU(duration=240) + def stream_generate(self, input: Input, config=None): + """Streaming generation with 4‑bit quantized model and expanded context""" + start_stream_time = time.perf_counter() + current_time = datetime.now() + logger.info("Starting stream_generate with 4‑bit quantized model...") + + # Handle input properly + if isinstance(input, dict): + prompt = input.get('input', str(input)) + else: + prompt = str(input) + try: - result = response_agent.invoke(input_data) + model = self._load_model_if_needed() + if torch.cuda.is_available(): + torch.cuda.empty_cache() + text = self._format_chat_template(prompt) + + try: + inputs = self.tokenizer( + text, + return_tensors="pt", + padding=True, + truncation=True, + max_length=2000 + ) + if 'input_ids' not in inputs: + yield "I encountered an error processing your request. Please try again." + return + except Exception as tokenizer_error: + logger.error(f"Streaming tokenization error: {tokenizer_error}") + yield "I encountered an error processing your request. Please try again." + return + + try: + inputs = {k: v.to(model.device) for k, v in inputs.items()} + except Exception as device_error: + logger.error(f"Streaming device transfer error: {device_error}") + yield "I encountered an error processing your request. Please try again." 
+ return + + streamer = TextIteratorStreamer( + self.tokenizer, + skip_prompt=True, + skip_special_tokens=True + ) + + generation_kwargs = { + "input_ids": inputs['input_ids'], + "attention_mask": inputs.get('attention_mask', None), + "max_new_tokens": 250, + "do_sample": True, + "temperature": 0.7, + "top_p": 0.9, + "top_k": 50, + "repetition_penalty": 1.2, + "pad_token_id": self.tokenizer.eos_token_id, + "streamer": streamer, + "use_cache": False, + "past_key_values": None + } + + generation_thread = threading.Thread( + target=model.generate, + kwargs=generation_kwargs + ) + generation_thread.start() + + generated_text = "" + consecutive_repeats = 0 + last_chunk = "" + + try: + for new_token_text in streamer: + if not new_token_text: + continue + generated_text += new_token_text + if new_token_text == last_chunk: + consecutive_repeats += 1 + if consecutive_repeats >= 5: + logger.warning("Repetitive generation detected, stopping early") + break + else: + consecutive_repeats = 0 + last_chunk = new_token_text + yield generated_text + except Exception as e: + logger.error(f"Error in streaming iteration: {e}") + if not generated_text.strip(): + generated_text = "I apologize, but I'm having trouble generating a response. Please try rephrasing your question." + yield generated_text + + generation_thread.join() + if not generated_text.strip(): + generated_text = "I apologize, but I'm having trouble generating a response. Please try rephrasing your question." + yield generated_text + + end_stream_time = time.perf_counter() + stream_time = end_stream_time - start_stream_time + log_metric( + f"LLM Stream time (4‑bit): {stream_time:0.4f} seconds. " + f"Generated length: {len(generated_text)} chars. " + f"Model: {self.model_name}. 
" + f"Timestamp: {current_time:%Y‑%m‑%d %H:%M:%S}" + ) + except Exception as e: + logger.error(f"4‑bit streaming generation error: {e}") + end_stream_time = time.perf_counter() + stream_time = end_stream_time - start_stream_time + log_metric( + f"LLM Stream time (error): {stream_time:0.4f} seconds. " + f"Model: {self.model_name}. " + f"Timestamp: {current_time:%Y‑%m‑%d %H:%M:%S}" + ) + yield "I encountered an error generating the response. Please try again." + + @property + def InputType(self) -> Type[Input]: + return str + + @property + def OutputType(self) -> Type[Output]: + return str + +# LangGraph Agent Implementation with Tool Calling +class Educational_Agent: + """Modern LangGraph-based educational agent with Phi-3-mini and improved tool calling""" + + def __init__(self): + start_init_and_langgraph_time = time.perf_counter() + current_time = datetime.now() + + self.llm = Phi3MiniEducationalLLM(model_path="microsoft/Phi-3-mini-4k-instruct") + self.tool_decision_engine = Tool_Decision_Engine(self.llm) + + # Create LangGraph workflow + self.app = self._create_langgraph_workflow() + + end_init_and_langgraph_time = time.perf_counter() + init_and_langgraph_time = end_init_and_langgraph_time - start_init_and_langgraph_time + log_metric(f"Init and LangGraph workflow setup time: {init_and_langgraph_time:0.4f} seconds. 
Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + + def _create_langgraph_workflow(self): + """Create the complete LangGraph workflow with improved tool calling""" + # Use the updated Create_Graph_Tool + tools = [Create_Graph_Tool] + tool_node = ToolNode(tools) + + def call_model(state: EducationalAgentState) -> dict: + """Call the LLM to generate a response""" + start_call_model_time = time.perf_counter() + current_time = datetime.now() - # Extract response from result dict - raw_response = result.get('response', '') - metadata = result.get('metadata', {}) + messages = state["messages"] - if not raw_response: - logger.warning("ResponseAgent returned empty response") - raw_response = "I apologize, but I wasn't able to generate a response. Please try again." + # Get the latest human message + user_query = "" + for msg in reversed(messages): + if isinstance(msg, HumanMessage): + user_query = msg.content + break - logger.info(f"βœ“ Generated {len(raw_response)} chars") - if metadata: - logger.info(f" Metadata: {metadata}") + if not user_query: + return {"messages": [AIMessage(content="I didn't receive a question. Please ask me something!")]} - except Exception as e: - logger.error(f"Response generation failed: {e}") - import traceback - logger.error(traceback.format_exc()) - raw_response = "I apologize, but I encountered an error while generating a response. Please try rephrasing your question or try again." 
- - log_step("Step 9: Response generation", step_start) - - # ==================================================================== - # STEP 10: POST-PROCESSING - # ==================================================================== - step_start = log_step("Step 10: Post-processing") - processed_response = post_processor.process_response(raw_response, user_input) - log_step("Step 10: Post-processing", step_start) - - # ==================================================================== - # STEP 11: METRICS TRACKING (BACKGROUND THREAD - NON-BLOCKING) - # ==================================================================== - step_start = log_step("Step 11: Metrics tracking") - - def track_metrics_async(): - """Run metrics tracking in background to avoid blocking""" try: - logger.info("[Background] Starting metrics tracking...") + # Check if tools are needed based on state + needs_tools = state.get("needs_tools", False) - # Track educational quality - quality_metrics = evaluate_educational_quality_with_tracking( - user_query=user_input, - response=processed_response, - thread_id=run_id, - session_id=session_id - ) + if needs_tools: + # Create tool prompt that guides the model to use structured parameters + tool_prompt = f""" + You are an educational AI assistant. The user has asked: "{user_query}" + + This query would benefit from a visualization. Please call the Create_Graph_Tool with appropriate structured parameters. + + For the data parameter, create a meaningful dictionary with string keys and numeric values that illustrate the concept being discussed. + + Choose the appropriate plot_type: + - "bar" for comparing categories or discrete data + - "line" for showing trends over time or continuous relationships + - "pie" for showing parts of a whole or proportions + + Create a descriptive title and appropriate axis labels. Include an educational_context explaining why this visualization helps learning. 
+ + Call the tool with these structured parameters, don't format as JSON. + """ + prompt = tool_prompt + else: + prompt = user_query - # Log metrics to database - metrics_to_log = { - "conversation_start": datetime.now().isoformat(), - "response_time": time.time() - turn_start, - "quality_score": calculate_response_quality(processed_response), - "educational_score": quality_metrics['educational_score'], - "prompt_mode": ",".join(response_prompt_names), - "tools_used": 1 if prompt_state.is_active("TOOL_USE_ENHANCEMENT") else 0, - "thinking_agents": ",".join(thinking_prompts_list) if thinking_prompts_list else "none", - "active_adapter": response_agent.model_type if response_agent.model_loaded else "not_loaded" - } + # Bind tools to LLM if needed + if needs_tools: + model_with_tools = self.llm + # For Phi-3, we need to manually bind tools if supported + try: + if hasattr(self.llm, 'bind_tools'): + model_with_tools = self.llm.bind_tools(tools) + response = model_with_tools.invoke(prompt) + except: + # Fallback if tool binding not supported + response = self.llm.invoke(prompt) + else: + response = self.llm.invoke(prompt) - log_metrics_to_database("Mimir", run_id, metrics_to_log) - logger.info("[Background] βœ“ Metrics tracking completed") + # Create AI message + ai_message = AIMessage(content=response) - except Exception as metrics_error: - logger.warning(f"[Background] Metrics tracking failed: {metrics_error}") - - # Start background thread (daemon=True so it doesn't block shutdown) - metrics_thread = threading.Thread( - target=track_metrics_async, - daemon=True, - name="MetricsTracking" + end_call_model_time = time.perf_counter() + call_model_time = end_call_model_time - start_call_model_time + log_metric(f"Call model time: {call_model_time:0.4f} seconds. 
Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + + return {"messages": [ai_message]} + + except Exception as e: + logger.error(f"Error in call_model: {e}") + end_call_model_time = time.perf_counter() + call_model_time = end_call_model_time - start_call_model_time + log_metric(f"Call model time (error): {call_model_time:0.4f} seconds. Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + + error_message = AIMessage(content=f"I encountered an error generating a response: {str(e)}") + return {"messages": [error_message]} + + def should_continue(state: EducationalAgentState) -> str: + """Route to tools or end based on the last message""" + last_message = state["messages"][-1] + + # Check if the last message has tool calls + if hasattr(last_message, "tool_calls") and last_message.tool_calls: + return "tools" + else: + return END + + def make_tool_decision(state: EducationalAgentState) -> dict: + """Decide whether tools are needed and update state""" + start_tool_decision_time = time.perf_counter() + current_time = datetime.now() + + messages = state["messages"] + + # Get the latest human message + user_query = "" + for msg in reversed(messages): + if isinstance(msg, HumanMessage): + user_query = msg.content + break + + if not user_query: + return {"needs_tools": False} + + # Use the tool decision engine + needs_visualization = self.tool_decision_engine.should_use_visualization(user_query) + + end_tool_decision_time = time.perf_counter() + tool_decision_time = end_tool_decision_time - start_tool_decision_time + log_metric(f"Tool decision workflow time: {tool_decision_time:0.4f} seconds. Decision: {needs_visualization}. 
Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + + return {"needs_tools": needs_visualization} + + # Create the workflow graph + workflow = StateGraph(EducationalAgentState) + + # Add nodes + workflow.add_node("decide_tools", make_tool_decision) + workflow.add_node("call_model", call_model) + workflow.add_node("tools", tool_node) + + # Add edges + workflow.add_edge(START, "decide_tools") + workflow.add_edge("decide_tools", "call_model") + + # Add conditional edge from call_model + workflow.add_conditional_edges( + "call_model", + should_continue, + {"tools": "tools", END: END} ) - metrics_thread.start() - log_step("Step 11: Metrics tracking", step_start) - logger.info("βœ“ Metrics tracking started in background - continuing immediately") + # After tools, go back to call_model for final response + workflow.add_edge("tools", "call_model") - log_step("orchestrate_turn", turn_start) - return processed_response + # Compile the workflow + return workflow.compile(checkpointer=MemorySaver()) + + def process_query(self, user_input: str, thread_id: str = "default") -> str: + """Process a user query through the LangGraph workflow""" + start_process_query_time = time.perf_counter() + current_time = datetime.now() - except Exception as e: - logger.error(f"Orchestration error: {e}") - import traceback - logger.error(traceback.format_exc()) - log_step("orchestrate_turn", turn_start) - return f"I encountered an error: {str(e)}" + try: + # Create initial state + initial_state = { + "messages": [HumanMessage(content=user_input)], + "needs_tools": False, + "educational_context": None + } + + # Run the workflow + config = {"configurable": {"thread_id": thread_id}} + result = self.app.invoke(initial_state, config) + + # Extract the final response + messages = result["messages"] + + # Combine AI message and tool results + response_parts = [] + + for msg in messages: + if isinstance(msg, AIMessage): + # Clean up the response - remove JSON blocks if tools were used + content = msg.content 
+ if "```json" in content and result.get("needs_tools", False): + # Remove JSON blocks from display since tools handle visualization + content = re.sub(r'```json.*?```', '', content, flags=re.DOTALL) + content = content.strip() + response_parts.append(content) + elif isinstance(msg, ToolMessage): + response_parts.append(msg.content) + + final_response = "\n\n".join(response_parts).strip() + + end_process_query_time = time.perf_counter() + process_query_time = end_process_query_time - start_process_query_time + log_metric(f"Total query processing time: {process_query_time:0.4f} seconds. Input: '{user_input[:50]}...'. Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + + return final_response if final_response else "I'm having trouble generating a response. Please try rephrasing your question." + + except Exception as e: + logger.error(f"Error in process_query: {e}") + end_process_query_time = time.perf_counter() + process_query_time = end_process_query_time - start_process_query_time + log_metric(f"Total query processing time (error): {process_query_time:0.4f} seconds. Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + return f"I encountered an error processing your request: {str(e)}" + def stream_query(self, user_input: str, thread_id: str = "default"): + """Stream a response for a user query""" + start_stream_query_time = time.perf_counter() + current_time = datetime.now() + + try: + # For streaming, we'll use the LLM directly with tool decision + needs_tools = self.tool_decision_engine.should_use_visualization(user_input) + + if needs_tools: + # Create tool prompt + tool_prompt = f""" +You are an educational AI assistant. The user has asked: "{user_input}" -# ============================================================================ -# GRADIO CALLBACK FUNCTIONS (FIXED STATE MANAGEMENT) -# ============================================================================ +This query would benefit from a visualization. 
Please provide a helpful educational response AND include a JSON configuration for creating a graph or chart. -def get_loading_animation_base64(): - """Load animated GIF as base64""" - try: - with open("loading_animation.gif", "rb") as gif_file: - gif_data = gif_file.read() - gif_base64 = base64.b64encode(gif_data).decode('utf-8') - return f"data:image/gif;base64,{gif_base64}" - except FileNotFoundError: - logger.warning("loading_animation.gif not found") - return None +Format your response with explanatory text followed by a JSON block like this: +```json +{{ +"data": {{"Category 1": value1, "Category 2": value2}}, +"plot_type": "bar|line|pie", +"title": "Descriptive Title", +"x_label": "X Axis Label", +"y_label": "Y Axis Label", +"educational_context": "Explanation of why this visualization helps learning" +}} +``` -def remove_loading_animations(chat_history): - """Remove loading animations from chat""" - return [msg for msg in chat_history if not ( - msg.get("role") == "assistant" and - "loading-animation" in str(msg.get("content", "")) - )] +Provide your educational response followed by the JSON configuration. 
+""" + prompt = tool_prompt + else: + prompt = user_input + + # Stream the response + full_response = "" + for chunk in self.llm.stream_generate(prompt): + full_response = chunk + yield chunk + + # Process tools if needed after streaming completes + if needs_tools and "```json" in full_response: + json_pattern = r'```json\s*\n?(.*?)\n?```' + json_matches = re.findall(json_pattern, full_response, re.DOTALL) + + if json_matches: + json_config = json_matches[0].strip() + try: + # Validate and process the JSON + config_dict = json.loads(json_config) + required_keys = ['data', 'plot_type', 'title'] + + if all(key in config_dict for key in required_keys): + # Generate the visualization + tool_result = Create_Graph_Tool.invoke({"graph_config": json_config}) + + # Clean response and add visualization + cleaned_response = re.sub(r'```json.*?```', '', full_response, flags=re.DOTALL).strip() + final_response = f"{cleaned_response}\n\n{tool_result}" + yield final_response + except (json.JSONDecodeError, Exception) as e: + logger.error(f"Error processing streamed JSON: {e}") + + end_stream_query_time = time.perf_counter() + stream_query_time = end_stream_query_time - start_stream_query_time + log_metric(f"Stream query total time: {stream_query_time:0.4f} seconds. Input: '{user_input[:50]}...'. Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + + except Exception as e: + logger.error(f"Error in stream_query: {e}") + end_stream_query_time = time.perf_counter() + stream_query_time = end_stream_query_time - start_stream_query_time + log_metric(f"Stream query total time (error): {stream_query_time:0.4f} seconds. 
Timestamp: {current_time:%Y-%m-%d %H:%M:%S}") + yield f"I encountered an error: {str(e)}" + +# Gradio Interface Functions +def warmup_agent(): + """Warm up the agent with a simple test query""" + try: + logger.info("Warming up Phi-3-mini educational agent...") + start_warmup_time = time.perf_counter() + + # Simple warmup query + warmup_response = agent.process_query("Hello", thread_id="warmup") + + end_warmup_time = time.perf_counter() + warmup_time = end_warmup_time - start_warmup_time + log_metric(f"Agent warmup completed in {warmup_time:.2f} seconds") + logger.info(f"Warmup response: {warmup_response[:100]}...") + + except Exception as e: + logger.error(f"Warmup failed: {e}") def add_user_message(message, chat_history, conversation_state): - """ - Add user message with proper state management. - βœ… FIXED: Creates new lists to avoid reference issues with Gradio state. - """ - callback_start = log_step("add_user_message") - + """Add user message to state and display immediately""" if not message.strip(): - log_step("add_user_message", callback_start) return "", chat_history, conversation_state - # Create new lists with the user message appended - new_conversation_state = conversation_state + [{"role": "user", "content": message}] - new_chat_history = chat_history + [{"role": "user", "content": message}] + # Add to conversation state + conversation_state.append({"role": "user", "content": message}) - # Update global state for persistence - # global_state_manager.update_conversation_state(new_chat_history, new_conversation_state) + # Update chat display + chat_history.append({"role": "user", "content": message}) - log_step("add_user_message", callback_start) - - # Return NEW states to Gradio - return "", new_chat_history, new_conversation_state - + return "", chat_history, conversation_state -def add_loading_animation(chat_history, conversation_state): - """ - Add loading animation with proper state management. 
def _with_assistant_message(chat_history, content):
    """Return a copy of ``chat_history`` whose trailing assistant bubble shows ``content``.

    A trailing assistant message (the thinking indicator or a partial reply)
    is replaced; otherwise a new assistant message is appended. The list and
    the dicts passed in are left untouched.
    """
    updated_history = list(chat_history or [])
    if updated_history and updated_history[-1]["role"] == "assistant":
        updated_history[-1] = {**updated_history[-1], "content": content}
    else:
        updated_history.append({"role": "assistant", "content": content})
    return updated_history


def add_thinking_indicator(chat_history, conversation_state):
    """Show the animated thinking dots as a temporary assistant message.

    The indicator goes into the chat display only, never into
    ``conversation_state``. Nothing is added while the conversation is empty.

    Returns:
        ``(chat_history, conversation_state)``; the chat history is a new list
        when the indicator was added.
    """
    if not conversation_state:
        return chat_history, conversation_state

    # Add simple animated dots to chat display (not permanent state)
    thinking_html = get_thinking_dots()
    new_chat_history = list(chat_history or []) + [{"role": "assistant", "content": thinking_html}]

    return new_chat_history, conversation_state


def generate_response(chat_history, conversation_state):
    """Stream the agent's answer to the latest user message into the chat display.

    Generator used as a Gradio event handler. Each yield is a
    ``(chat_history, conversation_state)`` pair: one per streamed chunk with
    the trailing assistant bubble (the thinking indicator) replaced by the text
    so far, then a final pair in which the finished answer has also been added
    to the conversation state. Errors are shown in the chat and recorded in
    the state the same way.
    """
    # ``return value`` inside a generator discards the value, so the early
    # exits yield the unchanged state explicitly.
    if not conversation_state:
        yield chat_history, conversation_state
        return

    # Get the last user message
    last_user_message = ""
    for msg in reversed(conversation_state):
        if msg["role"] == "user":
            last_user_message = msg["content"]
            break

    if not last_user_message:
        yield chat_history, conversation_state
        return

    display_history = chat_history
    try:
        # Stream the response; every chunk carries the full text so far
        full_response = ""
        for chunk in agent.stream_query(last_user_message):
            full_response = chunk
            # Update the last message in chat display (replace thinking indicator)
            display_history = _with_assistant_message(display_history, full_response)
            yield display_history, conversation_state

        if not full_response.strip():
            # Nothing usable was streamed: replace the indicator with a fallback
            # rather than storing an empty assistant turn.
            full_response = "I'm having trouble generating a response. Please try rephrasing your question."
            display_history = _with_assistant_message(display_history, full_response)

        # Add final response to permanent conversation state
        new_conversation_state = list(conversation_state) + [{"role": "assistant", "content": full_response}]
        yield display_history, new_conversation_state

    except Exception as e:
        logger.error(f"Error in generate_response: {e}")
        error_msg = f"I encountered an error: {str(e)}"

        # Update display
        display_history = _with_assistant_message(display_history, error_msg)

        # Add to permanent state
        new_conversation_state = list(conversation_state) + [{"role": "assistant", "content": error_msg}]
        yield display_history, new_conversation_state


def reset_conversation():
    """Reset both chat display and conversation state.

    Returns:
        Two fresh empty lists: the chat display value and the conversation state.
    """
    return [], []
# --- UI: Interface Creation ---
def _load_custom_css(css_path="styles.css"):
    """Read the stylesheet once and return its text, or "" when it cannot be read.

    The file used to be opened twice, and the second read only handled
    FileNotFoundError, so an unreadable or mis-encoded file aborted interface
    creation right after the first read had logged a warning for it.
    """
    try:
        with open(css_path, "r", encoding="utf-8") as css_file:
            custom_css = css_file.read()
    except FileNotFoundError:
        logger.warning("styles.css file not found, using default styling")
        return ""
    except (OSError, UnicodeError) as e:
        logger.warning(f"Error reading styles.css: {e}")
        return ""

    logger.info(f"CSS loaded successfully, length: {len(custom_css)} characters")
    # Check if dots CSS is actually in the file
    if ".thinking-indicator" in custom_css:
        logger.info("Dots CSS found in file")
    else:
        logger.warning("Dots CSS NOT found in file")
    return custom_css


def _chain_chat_events(trigger, msg, chatbot, conversation_state):
    """Wire ``trigger`` (``msg.submit`` or ``send.click``) to the three-step chat pipeline.

    Steps: add the user message, show the thinking indicator, stream the reply.
    Returns the event object of the last step.
    """
    return trigger(
        add_user_message,
        inputs=[msg, chatbot, conversation_state],
        outputs=[msg, chatbot, conversation_state],
        show_progress="hidden"
    ).then(
        add_thinking_indicator,
        inputs=[chatbot, conversation_state],
        outputs=[chatbot, conversation_state],
        show_progress="hidden"
    ).then(
        generate_response,
        inputs=[chatbot, conversation_state],
        outputs=[chatbot, conversation_state],
        show_progress="hidden"
    )


def create_interface():
    """Creates and configures the complete Gradio interface with proper state management.

    Returns:
        The ``gr.Blocks`` app containing the chatbot, the input row with Send
        and Clear buttons, and a ``gr.State`` holding the conversation.
    """
    start_create_interface_time = time.perf_counter()
    current_time = datetime.now()

    # Read CSS file
    custom_css = _load_custom_css()

    with gr.Blocks(
        title="Mimir",
        fill_width=True,
        fill_height=True,
        theme=gr.themes.Origin()
    ) as demo:
        # Add head content
        gr.HTML(html_head_content)
        gr.HTML(force_light_mode)

        # State management - this is the key addition
        conversation_state = gr.State([])  # Persistent conversation memory

        with gr.Column(elem_classes=["main-container"]):
            # Title Section
            # NOTE(review): the markup of this literal was stripped in the source
            # under review (only the title text survived); confirm the wrapper
            # element and class name against styles.css.
            gr.HTML('<div class="title-header"><h1>🎓 Mimir</h1></div>')

            # Chat Section
            with gr.Row():
                chatbot = gr.Chatbot(
                    type="messages",
                    show_copy_button=True,
                    show_share_button=False,
                    layout="bubble",
                    autoscroll=True,
                    avatar_images=None,
                    elem_id="main-chatbot",
                    scale=1,
                    height="70vh",
                    value=[],  # Initialize with empty list
                    latex_delimiters=[
                        {"left": "$$", "right": "$$", "display": True},
                        {"left": "$", "right": "$", "display": False},
                    ]
                )

            # Input Section
            with gr.Row(elem_classes=["input-controls"]):
                msg = gr.Textbox(
                    placeholder="Ask me about math, research, study strategies, or any educational topic...",
                    show_label=False,
                    lines=6,
                    max_lines=8,
                    elem_classes=["input-textbox"],
                    container=False,
                    scale=4
                )
                with gr.Column(elem_classes=["button-column"], scale=1):
                    send = gr.Button("Send", elem_classes=["send-button"], size="sm")
                    clear = gr.Button("Clear", elem_classes=["clear-button"], size="sm")

        # event chaining with state management (Enter key and Send button share one pipeline)
        submit_event = _chain_chat_events(msg.submit, msg, chatbot, conversation_state)
        send_event = _chain_chat_events(send.click, msg, chatbot, conversation_state)

        # Clear button
        clear.click(
            reset_conversation,
            inputs=None,
            outputs=[chatbot, conversation_state],
            show_progress="hidden"
        )

        # Apply CSS
        # NOTE(review): the literal was empty in the source under review, which
        # would leave custom_css unused; a <style> wrapper is assumed - confirm.
        gr.HTML(f'<style>{custom_css}</style>')

    create_interface_time = time.perf_counter() - start_create_interface_time
    log_metric(f"Create interface time: {create_interface_time:0.4f} seconds. Timestamp: {current_time:%Y-%m-%d %H:%M:%S}")

    return demo


# --- Main Execution ---
if __name__ == "__main__":
    try:
        logger.info("=" * 50)
        logger.info("Starting Mimir Application with Microsoft Phi-3-mini-4k-instruct")
        logger.info("=" * 50)

        # Step 1: Preload the model and agent
        # ``agent`` is a module-level name read by warmup_agent and generate_response.
        logger.info("Loading Phi-3-mini model and LangGraph workflow...")
        start_time = time.time()
        agent = Educational_Agent()
        load_time = time.time() - start_time
        logger.info(f"Phi-3-mini LangGraph agent loaded successfully in {load_time:.2f} seconds")

        # Step 2: Warm up the model
        logger.info("Warming up Phi-3-mini model...")
        warmup_agent()

        interface = create_interface()
        interface.launch(
            server_name="0.0.0.0",
            share=False,
            debug=True,
            # Only pass the favicon when the file actually exists.
            favicon_path="favicon.ico" if os.path.exists("favicon.ico") else None,
            ssr_mode=False
        )

    except Exception as e:
        logger.error(f"❌ Failed to launch Mimir with Phi-3-mini: {e}")
        raise