Spaces:
Sleeping
Sleeping
| # agents.py | |
| """ | |
| Unified agent architecture for Mimir Educational AI Assistant. | |
| LAZY-LOADING LLAMA-3.2-3B-INSTRUCT | |
| Components: | |
| - LazyLlamaModel: Singleton lazy-loading model (loads on first use, cached thereafter) | |
| - ToolDecisionAgent: Uses lazy-loaded Llama for visualization decisions | |
| - PromptRoutingAgents: Uses lazy-loaded Llama for all 4 routing agents | |
| - ThinkingAgents: Uses lazy-loaded Llama for all reasoning (including math) | |
| - ResponseAgent: Uses lazy-loaded Llama for final responses | |
| Key optimization: Model loads on first generate() call and is cached for all | |
| subsequent requests. Single model architecture with ~1GB memory footprint. | |
| No compile or warmup scripts needed - fully automatic. | |
| """ | |
| import os | |
| import re | |
| import torch | |
| import logging | |
| import time | |
| import subprocess | |
| import threading | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Tuple, Type | |
| import warnings | |
| # Setup main logger first | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ============================================================================ | |
| # MEMORY PROFILING UTILITIES | |
| # ============================================================================ | |
| def log_memory(tag=""): | |
| """Log current GPU memory usage""" | |
| try: | |
| if torch.cuda.is_available(): | |
| allocated = torch.cuda.memory_allocated() / 1024**2 | |
| reserved = torch.cuda.memory_reserved() / 1024**2 | |
| max_allocated = torch.cuda.max_memory_allocated() / 1024**2 | |
| logger.info(f"[{tag}] GPU Memory - Allocated: {allocated:.2f} MB, Reserved: {reserved:.2f} MB, Peak: {max_allocated:.2f} MB") | |
| else: | |
| logger.info(f"[{tag}] No CUDA available") | |
| except Exception as e: | |
| logger.warning(f"[{tag}] Error logging GPU memory: {e}") | |
| def log_nvidia_smi(tag=""): | |
| """Log full nvidia-smi output for system-wide GPU view""" | |
| try: | |
| output = subprocess.check_output(['nvidia-smi', '--query-gpu=memory.used,memory.total', '--format=csv,noheader,nounits'], encoding='utf-8') | |
| logger.info(f"[{tag}] NVIDIA-SMI: {output.strip()}") | |
| except Exception as e: | |
| logger.warning(f"[{tag}] Error running nvidia-smi: {e}") | |
| def log_step(step_name, start_time=None): | |
| """Log a pipeline step with timestamp and duration""" | |
| now = time.time() | |
| timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] | |
| if start_time: | |
| duration = now - start_time | |
| logger.info(f"[{timestamp}] ✓ {step_name} completed in {duration:.2f}s") | |
| else: | |
| logger.info(f"[{timestamp}] → {step_name} starting...") | |
| return now | |
| def profile_generation(model, tokenizer, inputs, **gen_kwargs): | |
| """Profile memory and time for model.generate() call""" | |
| torch.cuda.empty_cache() | |
| torch.cuda.reset_peak_memory_stats() | |
| log_memory("Before generate()") | |
| start_time = time.time() | |
| with torch.no_grad(): | |
| outputs = model.generate(**inputs, **gen_kwargs) | |
| end_time = time.time() | |
| duration = end_time - start_time | |
| peak_memory = torch.cuda.max_memory_allocated() / 1024**2 | |
| log_memory("After generate()") | |
| logger.info(f"Generation completed in {duration:.2f}s. Peak GPU: {peak_memory:.2f} MB") | |
| return outputs, duration | |
| # ============================================================================ | |
| # IMPORTS | |
| # ============================================================================ | |
| # Transformers for standard models | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForCausalLM, | |
| BitsAndBytesConfig, | |
| ) | |
| # ZeroGPU support | |
| try: | |
| import spaces | |
| HF_SPACES_AVAILABLE = True | |
| except ImportError: | |
| HF_SPACES_AVAILABLE = False | |
| class DummySpaces: | |
| def GPU(duration=90): | |
| def decorator(func): | |
| return func | |
| return decorator | |
| spaces = DummySpaces() | |
| # Accelerate | |
| from accelerate import Accelerator | |
| from accelerate.utils import set_seed | |
| # LangChain Core for proper message handling | |
| from langchain_core.runnables import Runnable | |
| from langchain_core.runnables.utils import Input, Output | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| # Import ALL prompts from prompt library | |
| from prompt_library import ( | |
| # System prompts | |
| CORE_IDENTITY, | |
| TOOL_DECISION, | |
| agent_1_system, | |
| agent_2_system, | |
| agent_3_system, | |
| agent_4_system, | |
| # Thinking agent system prompts | |
| MATH_THINKING, | |
| QUESTION_ANSWER_DESIGN, | |
| REASONING_THINKING, | |
| # Response agent prompts (dynamically applied) | |
| VAUGE_INPUT, | |
| USER_UNDERSTANDING, | |
| GENERAL_FORMATTING, | |
| LATEX_FORMATTING, | |
| GUIDING_TEACHING, | |
| STRUCTURE_PRACTICE_QUESTIONS, | |
| PRACTICE_QUESTION_FOLLOWUP, | |
| TOOL_USE_ENHANCEMENT, | |
| ) | |
| # ============================================================================ | |
| # MODEL MANAGER - LAZY LOADING | |
| # ============================================================================ | |
| # Import the lazy-loading Llama-3.2-3B model manager | |
| from model_manager import get_model as get_shared_llama, LazyLlamaModel as LlamaSharedAgent | |
| # Backwards compatibility aliases | |
| get_shared_mistral = get_shared_llama | |
| MistralSharedAgent = LlamaSharedAgent | |
| # ============================================================================ | |
| # CONFIGURATION | |
| # ============================================================================ | |
| CACHE_DIR = "/tmp/compiled_models" | |
| HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN") | |
| # Suppress warnings | |
| warnings.filterwarnings("ignore", category=UserWarning) | |
| warnings.filterwarnings("ignore", category=FutureWarning) | |
| # Model info (for logging/diagnostics) | |
| LLAMA_MODEL_ID = "meta-llama/Llama-3.2-3B-Instruct" | |
| def check_model_cache() -> Dict[str, bool]: | |
| """Check model status (legacy function for compatibility)""" | |
| cache_status = { | |
| "llama": True, # Lazy-loaded on first use | |
| "all_compiled": True, | |
| } | |
| logger.info("✓ Llama-3.2-3B uses lazy loading (loads on first generate() call)") | |
| return cache_status | |
| # Call at module load | |
| _cache_status = check_model_cache() | |
| log_memory("Module load complete") | |
| # ============================================================================ | |
| # TOOL DECISION AGENT | |
| # ============================================================================ | |
| class ToolDecisionAgent: | |
| """ | |
| Analyzes if visualization/graphing tools should be used. | |
| Uses lazy-loaded Llama-3.2-3B for decision-making. | |
| Model loads automatically on first use. | |
| Returns: Boolean (True = use tools, False = skip tools) | |
| """ | |
| def __init__(self): | |
| """Initialize with lazy-loaded Llama model""" | |
| self.model = get_shared_llama() | |
| logger.info("ToolDecisionAgent initialized (using lazy-loaded Llama)") | |
| def decide(self, user_query: str, conversation_history: List[Dict]) -> bool: | |
| """ | |
| Decide if graphing tools should be used. | |
| Args: | |
| user_query: Current user message | |
| conversation_history: Full conversation context | |
| Returns: | |
| bool: True if tools should be used | |
| """ | |
| logger.info("→ ToolDecisionAgent: Analyzing query for tool usage") | |
| # Format conversation context | |
| context = "\n".join([ | |
| f"{msg['role']}: {msg['content']}" | |
| for msg in conversation_history[-3:] # Last 3 turns | |
| ]) | |
| # Decision prompt | |
| analysis_prompt = f"""Previous conversation: | |
| {context} | |
| Current query: {user_query} | |
| Should visualization tools (graphs, charts) be used?""" | |
| try: | |
| decision_start = time.time() | |
| # Use shared Llama for decision | |
| response = self.model.generate( | |
| system_prompt=TOOL_DECISION, | |
| user_message=analysis_prompt, | |
| max_tokens=10, | |
| temperature=0.1 | |
| ) | |
| decision_time = time.time() - decision_start | |
| # Parse decision | |
| decision = "YES" in response.upper() | |
| logger.info(f"✓ ToolDecision: {'USE TOOLS' if decision else 'NO TOOLS'} ({decision_time:.2f}s)") | |
| return decision | |
| except Exception as e: | |
| logger.error(f"ToolDecisionAgent error: {e}") | |
| return False # Default: no tools | |
| # ============================================================================ | |
| # PROMPT ROUTING AGENTS (4 Specialized Agents) | |
| # ============================================================================ | |
| class PromptRoutingAgents: | |
| """ | |
| Four specialized agents for prompt segment selection. | |
| All share the same Llama-3.2-3B instance for efficiency. | |
| Agents: | |
| 1. Practice Question Detector | |
| 2. Discovery Mode Classifier | |
| 3. Follow-up Assessment | |
| 4. Teaching Mode Assessor | |
| """ | |
| def __init__(self): | |
| """Initialize with shared Llama model""" | |
| self.model = get_shared_llama() | |
| logger.info("PromptRoutingAgents initialized (4 agents, shared Llama)") | |
| def agent_1_practice_question( | |
| self, | |
| user_query: str, | |
| conversation_history: List[Dict] | |
| ) -> bool: | |
| """Agent 1: Detect if practice questions should be generated""" | |
| logger.info("→ Agent 1: Analyzing for practice question opportunity") | |
| context = "\n".join([ | |
| f"{msg['role']}: {msg['content']}" | |
| for msg in conversation_history[-4:] | |
| ]) | |
| analysis_prompt = f"""Conversation: | |
| {context} | |
| New query: {user_query} | |
| Should I create practice questions?""" | |
| try: | |
| response = self.model.generate( | |
| system_prompt=agent_1_system, | |
| user_message=analysis_prompt, | |
| max_tokens=10, | |
| temperature=0.1 | |
| ) | |
| decision = "YES" in response.upper() | |
| logger.info(f"✓ Agent 1: {'PRACTICE QUESTIONS' if decision else 'NO PRACTICE'}") | |
| return decision | |
| except Exception as e: | |
| logger.error(f"Agent 1 error: {e}") | |
| return False | |
| def agent_2_discovery_mode( | |
| self, | |
| user_query: str, | |
| conversation_history: List[Dict] | |
| ) -> Tuple[bool, bool]: | |
| """Agent 2: Classify vague input and understanding level""" | |
| logger.info("→ Agent 2: Classifying discovery mode") | |
| context = "\n".join([ | |
| f"{msg['role']}: {msg['content']}" | |
| for msg in conversation_history[-3:] | |
| ]) | |
| analysis_prompt = f"""Conversation: | |
| {context} | |
| Query: {user_query} | |
| Classification: | |
| 1. Is input vague? (VAGUE/CLEAR) | |
| 2. Understanding level? (LOW/MEDIUM/HIGH)""" | |
| try: | |
| response = self.model.generate( | |
| system_prompt=agent_2_system, | |
| user_message=analysis_prompt, | |
| max_tokens=20, | |
| temperature=0.1 | |
| ) | |
| vague = "VAGUE" in response.upper() | |
| low_understanding = "LOW" in response.upper() | |
| logger.info(f"✓ Agent 2: Vague={vague}, LowUnderstanding={low_understanding}") | |
| return vague, low_understanding | |
| except Exception as e: | |
| logger.error(f"Agent 2 error: {e}") | |
| return False, False | |
| def agent_3_followup_assessment( | |
| self, | |
| user_query: str, | |
| conversation_history: List[Dict] | |
| ) -> bool: | |
| """Agent 3: Detect if user is responding to practice questions""" | |
| logger.info("→ Agent 3: Checking for practice question follow-up") | |
| # Check last bot message for practice question indicators | |
| if len(conversation_history) < 2: | |
| return False | |
| last_bot_msg = None | |
| for msg in reversed(conversation_history): | |
| if msg['role'] == 'assistant': | |
| last_bot_msg = msg['content'] | |
| break | |
| if not last_bot_msg: | |
| return False | |
| # Look for practice question markers | |
| has_practice = any(marker in last_bot_msg.lower() for marker in [ | |
| "practice", "try this", "solve", "calculate", "what is", "question" | |
| ]) | |
| if not has_practice: | |
| return False | |
| # Analyze if current query is an answer attempt | |
| analysis_prompt = f"""Previous message (from me): | |
| {last_bot_msg[:500]} | |
| User response: | |
| {user_query} | |
| Is user answering a practice question?""" | |
| try: | |
| response = self.model.generate( | |
| system_prompt=agent_3_system, | |
| user_message=analysis_prompt, | |
| max_tokens=10, | |
| temperature=0.1 | |
| ) | |
| is_followup = "YES" in response.upper() | |
| logger.info(f"✓ Agent 3: {'GRADING MODE' if is_followup else 'NOT FOLLOWUP'}") | |
| return is_followup | |
| except Exception as e: | |
| logger.error(f"Agent 3 error: {e}") | |
| return False | |
| def agent_4_teaching_mode( | |
| self, | |
| user_query: str, | |
| conversation_history: List[Dict] | |
| ) -> Tuple[bool, bool]: | |
| """Agent 4: Assess teaching vs practice mode""" | |
| logger.info("→ Agent 4: Assessing teaching mode") | |
| context = "\n".join([ | |
| f"{msg['role']}: {msg['content']}" | |
| for msg in conversation_history[-3:] | |
| ]) | |
| analysis_prompt = f"""Conversation: | |
| {context} | |
| Query: {user_query} | |
| Assessment: | |
| 1. Need direct teaching? (TEACH/PRACTICE) | |
| 2. Create practice questions? (YES/NO)""" | |
| try: | |
| response = self.model.generate( | |
| system_prompt=agent_4_system, | |
| user_message=analysis_prompt, | |
| max_tokens=15, | |
| temperature=0.1 | |
| ) | |
| teaching = "TEACH" in response.upper() | |
| practice = "YES" in response.upper() or "PRACTICE" in response.upper() | |
| logger.info(f"✓ Agent 4: Teaching={teaching}, Practice={practice}") | |
| return teaching, practice | |
| except Exception as e: | |
| logger.error(f"Agent 4 error: {e}") | |
| return False, False | |
| def process( | |
| self, | |
| user_input: str, | |
| tool_used: bool = False, | |
| conversation_history: Optional[List[Dict]] = None | |
| ) -> Tuple[str, str]: | |
| """ | |
| Unified process method - runs all 4 routing agents sequentially. | |
| Returns: | |
| Tuple[str, str]: (response_prompts, thinking_prompts) | |
| """ | |
| if conversation_history is None: | |
| conversation_history = [] | |
| response_prompts = [] | |
| thinking_prompts = [] | |
| # Agent 1: Practice Questions | |
| if self.agent_1_practice_question(user_input, conversation_history): | |
| response_prompts.append("STRUCTURE_PRACTICE_QUESTIONS") | |
| # Agent 2: Discovery Mode | |
| is_vague, low_understanding = self.agent_2_discovery_mode(user_input, conversation_history) | |
| if is_vague: | |
| response_prompts.append("VAUGE_INPUT") | |
| if low_understanding: | |
| response_prompts.append("USER_UNDERSTANDING") | |
| # Agent 3: Follow-up Assessment | |
| if self.agent_3_followup_assessment(user_input, conversation_history): | |
| response_prompts.append("PRACTICE_QUESTION_FOLLOWUP") | |
| # Agent 4: Teaching Mode | |
| needs_teaching, needs_practice = self.agent_4_teaching_mode(user_input, conversation_history) | |
| if needs_teaching: | |
| response_prompts.append("GUIDING_TEACHING") | |
| # Always add base formatting | |
| response_prompts.extend(["GENERAL_FORMATTING", "LATEX_FORMATTING"]) | |
| # Tool enhancement if used | |
| if tool_used: | |
| response_prompts.append("TOOL_USE_ENHANCEMENT") | |
| # Return as newline-separated strings | |
| response_prompts_str = "\n".join(response_prompts) | |
| thinking_prompts_str = "" # Thinking prompts decided elsewhere | |
| return response_prompts_str, thinking_prompts_str | |
| # ============================================================================ | |
| # THINKING AGENTS (Preprocessing Layer) | |
| # ============================================================================ | |
| class ThinkingAgents: | |
| """ | |
| Generates reasoning context before final response. | |
| Uses shared Llama-3.2-3B for all thinking (including math). | |
| Agents: | |
| 1. Math Thinking (Tree-of-Thought) | |
| 2. Q&A Design (Chain-of-Thought) | |
| 3. General Reasoning (Chain-of-Thought) | |
| """ | |
| def __init__(self): | |
| """Initialize with shared Llama model""" | |
| self.model = get_shared_llama() | |
| logger.info("ThinkingAgents initialized (using shared Llama for all thinking)") | |
| def math_thinking( | |
| self, | |
| user_query: str, | |
| conversation_history: List[Dict], | |
| tool_context: str = "" | |
| ) -> str: | |
| """ | |
| Generate mathematical reasoning using Tree-of-Thought. | |
| Now uses Llama-3.2-3B instead of GGUF. | |
| """ | |
| logger.info("→ Math Thinking Agent: Generating reasoning") | |
| context = "\n".join([ | |
| f"{msg['role']}: {msg['content']}" | |
| for msg in conversation_history[-3:] | |
| ]) | |
| thinking_prompt = f"""Conversation context: | |
| {context} | |
| Current query: {user_query} | |
| {f"Tool output: {tool_context}" if tool_context else ""} | |
| Generate mathematical reasoning:""" | |
| try: | |
| thinking_start = time.time() | |
| reasoning = self.model.generate( | |
| system_prompt=MATH_THINKING, | |
| user_message=thinking_prompt, | |
| max_tokens=300, | |
| temperature=0.7 | |
| ) | |
| thinking_time = time.time() - thinking_start | |
| logger.info(f"✓ Math Thinking: Generated {len(reasoning)} chars ({thinking_time:.2f}s)") | |
| return reasoning | |
| except Exception as e: | |
| logger.error(f"Math Thinking error: {e}") | |
| return "" | |
| def qa_design_thinking( | |
| self, | |
| user_query: str, | |
| conversation_history: List[Dict], | |
| tool_context: str = "" | |
| ) -> str: | |
| """Generate practice question design reasoning""" | |
| logger.info("→ Q&A Design Agent: Generating question strategy") | |
| context = "\n".join([ | |
| f"{msg['role']}: {msg['content']}" | |
| for msg in conversation_history[-3:] | |
| ]) | |
| thinking_prompt = f"""Context: | |
| {context} | |
| Query: {user_query} | |
| {f"Tool data: {tool_context}" if tool_context else ""} | |
| Design practice questions:""" | |
| try: | |
| reasoning = self.model.generate( | |
| system_prompt=QUESTION_ANSWER_DESIGN, | |
| user_message=thinking_prompt, | |
| max_tokens=250, | |
| temperature=0.7 | |
| ) | |
| logger.info(f"✓ Q&A Design: Generated {len(reasoning)} chars") | |
| return reasoning | |
| except Exception as e: | |
| logger.error(f"Q&A Design error: {e}") | |
| return "" | |
| def process( | |
| self, | |
| user_input: str, | |
| conversation_history: str = "", | |
| thinking_prompts: str = "", | |
| tool_img_output: str = "", | |
| tool_context: str = "" | |
| ) -> str: | |
| """ | |
| Unified process method - runs thinking agents based on active prompts. | |
| Args: | |
| user_input: User's query | |
| conversation_history: Formatted conversation history string | |
| thinking_prompts: Newline-separated list of thinking prompts to activate | |
| tool_img_output: HTML output from visualization tool | |
| tool_context: Context from tool usage | |
| Returns: | |
| str: Combined thinking context from all activated agents | |
| """ | |
| thinking_outputs = [] | |
| # Convert history string to list format for agent methods | |
| history_list = [] | |
| if conversation_history and conversation_history != "No previous conversation": | |
| for line in conversation_history.split('\n'): | |
| if ':' in line: | |
| role, content = line.split(':', 1) | |
| history_list.append({'role': role.strip(), 'content': content.strip()}) | |
| # Determine which thinking agents to run based on prompts | |
| prompt_list = [p.strip() for p in thinking_prompts.split('\n') if p.strip()] | |
| # Math Thinking | |
| if any('MATH' in p.upper() for p in prompt_list): | |
| math_output = self.math_thinking( | |
| user_query=user_input, | |
| conversation_history=history_list, | |
| tool_context=tool_context | |
| ) | |
| if math_output: | |
| thinking_outputs.append(f"[Mathematical Reasoning]\n{math_output}") | |
| # Q&A Design Thinking | |
| if any('PRACTICE' in p.upper() or 'QUESTION' in p.upper() for p in prompt_list): | |
| qa_output = self.qa_design_thinking( | |
| user_query=user_input, | |
| conversation_history=history_list, | |
| tool_context=tool_context | |
| ) | |
| if qa_output: | |
| thinking_outputs.append(f"[Practice Question Design]\n{qa_output}") | |
| # General Reasoning (fallback or when no specific thinking needed) | |
| if not thinking_outputs or any('REASONING' in p.upper() for p in prompt_list): | |
| general_output = self.general_reasoning( | |
| user_query=user_input, | |
| conversation_history=history_list, | |
| tool_context=tool_context | |
| ) | |
| if general_output: | |
| thinking_outputs.append(f"[General Reasoning]\n{general_output}") | |
| # Combine all thinking outputs | |
| combined_thinking = "\n\n".join(thinking_outputs) if thinking_outputs else "" | |
| if combined_thinking: | |
| logger.info(f"✓ Thinking complete: {len(combined_thinking)} chars from {len(thinking_outputs)} agents") | |
| return combined_thinking | |
| def general_reasoning( | |
| self, | |
| user_query: str, | |
| conversation_history: List[Dict], | |
| tool_context: str = "" | |
| ) -> str: | |
| """Generate general reasoning context""" | |
| logger.info("→ General Reasoning Agent: Generating context") | |
| context = "\n".join([ | |
| f"{msg['role']}: {msg['content']}" | |
| for msg in conversation_history[-4:] | |
| ]) | |
| thinking_prompt = f"""Conversation: | |
| {context} | |
| Query: {user_query} | |
| {f"Context: {tool_context}" if tool_context else ""} | |
| Analyze and provide reasoning:""" | |
| try: | |
| reasoning = self.model.generate( | |
| system_prompt=REASONING_THINKING, | |
| user_message=thinking_prompt, | |
| max_tokens=200, | |
| temperature=0.7 | |
| ) | |
| logger.info(f"✓ General Reasoning: Generated {len(reasoning)} chars") | |
| return reasoning | |
| except Exception as e: | |
| logger.error(f"General Reasoning error: {e}") | |
| return "" | |
| # ============================================================================ | |
| # RESPONSE AGENT (Final Response Generation) | |
| # ============================================================================ | |
| class ResponseAgent(Runnable): | |
| """ | |
| Generates final educational responses using lazy-loaded Llama-3.2-3B. | |
| Model loads automatically on first use. | |
| Features: | |
| - Dynamic prompt assembly based on agent decisions | |
| - Streaming word-by-word output | |
| - Educational tone enforcement | |
| - LaTeX support for math | |
| - Context integration (thinking outputs, tool outputs) | |
| """ | |
| def __init__(self): | |
| """Initialize with lazy-loaded Llama model""" | |
| super().__init__() | |
| self.model = get_shared_llama() | |
| logger.info("ResponseAgent initialized (using lazy-loaded Llama)") | |
| def invoke(self, input_data: Dict) -> Dict: | |
| """ | |
| Generate final response with streaming. | |
| Args: | |
| input_data: { | |
| 'user_query': str, | |
| 'conversation_history': List[Dict], | |
| 'active_prompts': List[str], | |
| 'thinking_context': str, | |
| 'tool_context': str, | |
| } | |
| Returns: | |
| {'response': str, 'metadata': Dict} | |
| """ | |
| logger.info("→ ResponseAgent: Generating final response") | |
| # Extract inputs | |
| user_query = input_data.get('user_query', '') | |
| conversation_history = input_data.get('conversation_history', []) | |
| active_prompts = input_data.get('active_prompts', []) | |
| thinking_context = input_data.get('thinking_context', '') | |
| tool_context = input_data.get('tool_context', '') | |
| # Build system prompt from active segments | |
| system_prompt = self._build_system_prompt(active_prompts) | |
| # Build user message with context | |
| user_message = self._build_user_message( | |
| user_query, | |
| conversation_history, | |
| thinking_context, | |
| tool_context | |
| ) | |
| try: | |
| response_start = time.time() | |
| # Generate response (streaming handled at app.py level) | |
| response = self.model.generate( | |
| system_prompt=system_prompt, | |
| user_message=user_message, | |
| max_tokens=600, | |
| temperature=0.7 | |
| ) | |
| response_time = time.time() - response_start | |
| # Clean up response | |
| response = self._clean_response(response) | |
| logger.info(f"✓ ResponseAgent: Generated {len(response)} chars ({response_time:.2f}s)") | |
| return { | |
| 'response': response, | |
| 'metadata': { | |
| 'generation_time': response_time, | |
| 'model': LLAMA_MODEL_ID, | |
| 'active_prompts': active_prompts | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"ResponseAgent error: {e}") | |
| return { | |
| 'response': "I apologize, but I encountered an error generating a response. Please try again.", | |
| 'metadata': {'error': str(e)} | |
| } | |
| def _build_system_prompt(self, active_prompts: List[str]) -> str: | |
| """Assemble system prompt from active segments""" | |
| prompt_map = { | |
| 'CORE_IDENTITY': CORE_IDENTITY, | |
| 'GENERAL_FORMATTING': GENERAL_FORMATTING, | |
| 'LATEX_FORMATTING': LATEX_FORMATTING, | |
| 'VAUGE_INPUT': VAUGE_INPUT, | |
| 'USER_UNDERSTANDING': USER_UNDERSTANDING, | |
| 'GUIDING_TEACHING': GUIDING_TEACHING, | |
| 'STRUCTURE_PRACTICE_QUESTIONS': STRUCTURE_PRACTICE_QUESTIONS, | |
| 'PRACTICE_QUESTION_FOLLOWUP': PRACTICE_QUESTION_FOLLOWUP, | |
| 'TOOL_USE_ENHANCEMENT': TOOL_USE_ENHANCEMENT, | |
| } | |
| # Always include core identity | |
| segments = [CORE_IDENTITY, GENERAL_FORMATTING] | |
| # Add active prompts | |
| for prompt_name in active_prompts: | |
| if prompt_name in prompt_map and prompt_map[prompt_name] not in segments: | |
| segments.append(prompt_map[prompt_name]) | |
| return "\n\n".join(segments) | |
| def _build_user_message( | |
| self, | |
| user_query: str, | |
| conversation_history: List[Dict], | |
| thinking_context: str, | |
| tool_context: str | |
| ) -> str: | |
| """Build user message with all context""" | |
| parts = [] | |
| # Conversation history (last 3 turns) | |
| if conversation_history: | |
| history_text = "\n".join([ | |
| f"{msg['role']}: {msg['content'][:200]}" | |
| for msg in conversation_history[-3:] | |
| ]) | |
| parts.append(f"Recent conversation:\n{history_text}") | |
| # Thinking context (invisible to user, guides response) | |
| if thinking_context: | |
| parts.append(f"[Internal reasoning context]: {thinking_context}") | |
| # Tool context | |
| if tool_context: | |
| parts.append(f"[Tool output]: {tool_context}") | |
| # Current query | |
| parts.append(f"Student query: {user_query}") | |
| return "\n\n".join(parts) | |
| def _clean_response(self, response: str) -> str: | |
| """Clean up response artifacts""" | |
| # Remove common artifacts | |
| artifacts = ['<|im_end|>', '<|endoftext|>', '###', '<|end|>'] | |
| for artifact in artifacts: | |
| response = response.replace(artifact, '') | |
| # Remove trailing incomplete sentences | |
| if response and response[-1] not in '.!?': | |
| # Find last complete sentence | |
| for delimiter in ['. ', '! ', '? ']: | |
| if delimiter in response: | |
| response = response.rsplit(delimiter, 1)[0] + delimiter[0] | |
| break | |
| return response.strip() | |
| def stream(self, input_data: Dict): | |
| """ | |
| Stream response word-by-word. | |
| Yields: | |
| str: Response chunks | |
| """ | |
| logger.info("→ ResponseAgent: Streaming response") | |
| # Build prompts | |
| system_prompt = self._build_system_prompt(input_data.get('active_prompts', [])) | |
| user_message = self._build_user_message( | |
| input_data.get('user_query', ''), | |
| input_data.get('conversation_history', []), | |
| input_data.get('thinking_context', ''), | |
| input_data.get('tool_context', '') | |
| ) | |
| try: | |
| # Use streaming generation from shared model | |
| for chunk in self.model.generate_streaming( | |
| system_prompt=system_prompt, | |
| user_message=user_message, | |
| max_tokens=600, | |
| temperature=0.7 | |
| ): | |
| yield chunk | |
| except Exception as e: | |
| logger.error(f"Streaming error: {e}") | |
| yield "I apologize, but I encountered an error. Please try again." | |
| # ============================================================================ | |
| # MODULE INITIALIZATION | |
| # ============================================================================ | |
| logger.info("="*60) | |
| logger.info("MIMIR AGENTS MODULE INITIALIZED") | |
| logger.info("="*60) | |
| logger.info(f" Model: Llama-3.2-3B-Instruct (lazy-loaded)") | |
| logger.info(f" Agents: Tool, Routing (4x), Thinking (3x), Response") | |
| logger.info(f" Memory: ~1GB (loads on first use)") | |
| logger.info(f" Architecture: Single unified model with caching") | |
| logger.info("="*60) |