| """ | |
| Model orchestrator for the AI Learning Path Generator. | |
| Handles interactions with language models and embeddings. | |
| """ | |
| from langchain.prompts import PromptTemplate, ChatPromptTemplate | |
| from src.utils.observability import get_observability_manager, estimate_cost | |
| from src.utils.cache import cache, cached | |
| from src.utils.helpers import optimize_prompt, count_tokens, estimate_api_cost | |
| from src.utils.config import ( | |
| OPENAI_API_KEY, | |
| DEEPSEEK_API_KEY, # Kept for legacy compatibility | |
| OPENROUTER_API_KEY, # OpenRouter support | |
| DEFAULT_PROVIDER, | |
| DEFAULT_MODEL, | |
| OPENROUTER_FREE_MODEL, # Free model from OpenRouter | |
| MAX_TOKENS, | |
| TEMPERATURE | |
| ) | |
| from langchain.chains import LLMChain | |
| from typing import List, Dict, Any, Optional, Union, TypeVar, Type | |
| import json | |
| import os | |
| # Using Pydantic v1 | |
| import pydantic | |
| from pydantic import BaseModel as PydanticBaseModel | |
| # Import from langchain (older version compatible with Pydantic v1) | |
| from langchain.llms import OpenAI | |
| from langchain.chat_models import ChatOpenAI | |
| # For type hints | |
| T = TypeVar('T', bound='BaseModel') | |
| class BaseModel(PydanticBaseModel): | |
| """Base model using Pydantic v1.""" | |
| class Config: | |
| arbitrary_types_allowed = True | |
| # We'll use only OpenAI for now to make the application work | |
| # Both providers will default to using OpenAI | |
| # Import token optimization utilities for cost savings | |
| # Import caching utilities to avoid repeated API calls | |
| # Import observability utilities for LLM monitoring | |
class ModelOrchestrator:
    """
    Manages AI model interactions with RAG capabilities.
    """

    def __init__(self, api_key: Optional[str] = None, provider: Optional[str] = None):
        """
        Initialize the model orchestrator with RAG capabilities.

        Args:
            api_key: Optional API key (if not provided, read from the environment)
            provider: Optional provider name ('openai', 'openrouter', or 'deepseek')
        """
        print("--- ModelOrchestrator.__init__ started ---")
        self.provider = provider.lower() if provider else DEFAULT_PROVIDER
        self.context = []
        self.goal = None
        self.planning_enabled = True
        self.memory = []
        # Set up the API key based on the selected provider
        if self.provider == 'openai':
            self.api_key = api_key or OPENAI_API_KEY
            if not self.api_key:
                raise ValueError(
                    "OpenAI API key is required. Please provide it or set the OPENAI_API_KEY environment variable.")
            print("--- ModelOrchestrator.__init__: Preparing to initialize ChatOpenAI ---")
            print(
                f"--- ModelOrchestrator.__init__: API Key: {str(self.api_key)[:15]}..., Model: {DEFAULT_MODEL}, Temp: {TEMPERATURE}, Max Tokens: {MAX_TOKENS} ---")
            # self.llm = ChatOpenAI(
            #     api_key=self.api_key,
            #     model_name=DEFAULT_MODEL,
            #     temperature=TEMPERATURE,
            #     max_tokens=MAX_TOKENS
            # )
            print("--- ModelOrchestrator.__init__: ChatOpenAI initialization SKIPPED ---")
            print("--- ModelOrchestrator.__init__: Preparing to initialize OpenAI (base_llm) ---")
            # self.base_llm = OpenAI(
            #     api_key=self.api_key,
            #     model_name=DEFAULT_MODEL,
            #     temperature=TEMPERATURE,
            #     max_tokens=MAX_TOKENS
            # )
            print("--- ModelOrchestrator.__init__: OpenAI (base_llm) initialization SKIPPED ---")
        elif self.provider == 'deepseek':
            self.api_key = api_key or DEEPSEEK_API_KEY
            if not self.api_key:
                raise ValueError(
                    "DeepSeek API key is required. Please provide it or set the DEEPSEEK_API_KEY environment variable.")
            print("--- ModelOrchestrator.__init__: DeepSeek provider selected, client initialization SKIPPED for now ---")
        elif self.provider == 'openrouter':
            self.api_key = api_key or OPENROUTER_API_KEY
            if not self.api_key:
                raise ValueError(
                    "OpenRouter API key is required. Please provide it or set the OPENROUTER_API_KEY environment variable.")
            print("--- ModelOrchestrator.__init__: OpenRouter provider selected (free models available) ---")
        # Only the OpenAI, OpenRouter and DeepSeek providers are supported
        # (OpenAI is the primary and recommended provider).
        else:
            raise ValueError(
                f"Unsupported provider: {self.provider}. Use 'openai', 'openrouter', or 'deepseek'.")
        # Track the current model name
        self.model_name = DEFAULT_MODEL
        # Initialize the observability manager
        self.obs_manager = get_observability_manager()
        # Override the default model if the DeepSeek provider is selected
        if self.provider == 'deepseek':
            # Allow an environment variable override, but default to the official DeepSeek chat model
            self.model_name = os.getenv("DEEPSEEK_MODEL", "deepseek-chat")
            print(
                f"--- ModelOrchestrator.__init__: DeepSeek provider detected, using model: {self.model_name} ---")
        # Initialize the language model based on the provider
        print("--- ModelOrchestrator.__init__: Calling init_language_model ---")
        self.init_language_model()
        print("--- ModelOrchestrator.__init__ finished (LLM initialized) ---")
    def init_language_model(self, model_name: Optional[str] = None, temperature: Optional[float] = None):
        """
        Initialize or switch the language model.

        Args:
            model_name: Name of the model to use
            temperature: Temperature setting for the model
        """
        print(
            f"--- ModelOrchestrator.init_language_model started (provider: {self.provider}, model: {model_name or self.model_name}) ---")
        # Update the model name if one was provided
        if model_name:
            self.model_name = model_name
        temp = temperature if temperature is not None else TEMPERATURE
        # Initialize based on the provider
        try:
            if self.provider == 'openai':
                print(
                    f"--- ModelOrchestrator.init_language_model: Initializing ChatOpenAI for {self.provider} ---")
                self.llm = ChatOpenAI(
                    openai_api_key=self.api_key,
                    model=self.model_name,
                    temperature=temp,
                    max_tokens=MAX_TOKENS,
                )
                print(
                    f"--- ModelOrchestrator.init_language_model: ChatOpenAI for {self.provider} initialized ---")
            elif self.provider == 'openrouter':
                print("--- ModelOrchestrator.init_language_model: Initializing ChatOpenAI for OpenRouter ---")
                # Use the OpenRouter free model for this provider
                model_to_use = OPENROUTER_FREE_MODEL
                self.model_name = model_to_use  # Update the model name
                # OpenRouter exposes an OpenAI-compatible API at a different endpoint
                self.llm = ChatOpenAI(
                    openai_api_key=self.api_key,
                    openai_api_base="https://openrouter.ai/api/v1",
                    model=model_to_use,
                    temperature=temp,
                    max_tokens=MAX_TOKENS,
                )
                print(
                    f"--- ModelOrchestrator.init_language_model: ChatOpenAI for OpenRouter initialized with model: {model_to_use} ---")
            elif self.provider == 'deepseek':
                print(
                    f"--- ModelOrchestrator.init_language_model: Initializing ChatOpenAI for {self.provider} ---")
                # DeepSeek exposes an OpenAI-compatible API
                self.llm = ChatOpenAI(
                    openai_api_key=self.api_key,
                    openai_api_base="https://api.deepseek.com/v1",
                    model=self.model_name,
                    temperature=temp,
                    max_tokens=MAX_TOKENS,
                )
                print("--- ModelOrchestrator.init_language_model: ChatOpenAI for DeepSeek initialized ---")
        except Exception as e:
            print(f"Error initializing language model: {str(e)}")
            raise
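    # Sketch of switching models in place (the model name below is a hypothetical
    # example; any chat model supported by the active provider should work):
    #
    #     orchestrator.init_language_model(model_name="gpt-4o-mini", temperature=0.3)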
    def switch_provider(self, provider: str, api_key: Optional[str] = None, model_name: Optional[str] = None):
        """
        Switch between AI providers.

        Args:
            provider: The provider to switch to ('openai', 'openrouter', or 'deepseek')
            api_key: Optional API key for the provider
            model_name: Optional model name to use

        Returns:
            str: Status message indicating the provider and model in use
        """
        try:
            self.provider = provider.lower()
            # Update the API key if provided, otherwise fall back to configuration
            if api_key:
                self.api_key = api_key
            elif self.provider == 'openai':
                self.api_key = OPENAI_API_KEY
            elif self.provider == 'openrouter':
                self.api_key = OPENROUTER_API_KEY
            elif self.provider == 'deepseek':
                self.api_key = DEEPSEEK_API_KEY
            # OpenAI is the primary provider now
            else:
                raise ValueError(
                    f"Unsupported provider: {provider}. Use 'openai', 'openrouter', or 'deepseek'.")
            # Update the model name if provided
            if model_name:
                self.model_name = model_name
            # Re-initialize the language model
            self.init_language_model()
            return f"Switched to {self.provider} provider with model {self.model_name}"
        except Exception as e:
            error_msg = f"Error switching to provider {provider}: {str(e)}"
            print(error_msg)
            # Try to fall back to a working provider
            if self.provider != 'openai':
                print("Falling back to OpenAI provider")
                return self.switch_provider('openai', OPENAI_API_KEY, model_name or DEFAULT_MODEL)
            raise ValueError(error_msg) from e
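    # Sketch: moving a live orchestrator onto OpenRouter's free model. On failure
    # the method falls back to OpenAI automatically, so the returned status should
    # always name a working provider.
    #
    #     status = orchestrator.switch_provider("openrouter")
    #     print(status)  # e.g. "Switched to openrouter provider with model ..."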
    def generate_response(
        self,
        prompt: str,
        relevant_documents: Optional[List[str]] = None,
        temperature: Optional[float] = None,
        use_cache: bool = True  # NEW: enable caching by default
    ) -> str:
        """
        Generate a text response from the language model.

        Args:
            prompt: The prompt for the model
            relevant_documents: Optional list of relevant documents to add context
            temperature: Optional override for model temperature
            use_cache: Whether to use cached responses (default: True)

        Returns:
            The generated response as a string
        """
        # Check the cache first to save money! 💰
        if use_cache:
            cache_key = cache.cache_key(
                "response",
                prompt[:200],  # First 200 chars of the prompt
                str(relevant_documents)[:100] if relevant_documents else "",
                self.model_name,
                temperature if temperature is not None else TEMPERATURE
            )
            cached_response = cache.get(cache_key)
            if cached_response:
                print("💰 Using cached response - $0.00 cost!")
                return cached_response
        # Optimize the prompt to reduce token usage and save money! 💰
        full_prompt = optimize_prompt(
            prompt, relevant_documents, max_tokens=4000)
        # Log the token count and estimated cost for monitoring
        input_token_count = count_tokens(full_prompt, self.model_name)
        estimated_input_cost = estimate_api_cost(
            input_token_count, self.model_name)
        print(
            f"💰 Token count: {input_token_count} (~${estimated_input_cost:.4f} input cost)")
        try:
            # Set up the temperature
            temp = temperature if temperature is not None else TEMPERATURE
            print("DEBUG: About to make OpenAI API call using direct implementation...")
            import time
            from src.direct_openai import generate_completion
            try:
                start_time = time.time()
                print(f"DEBUG: Using model: {self.model_name}")
                print(f"DEBUG: Prompt length: {len(full_prompt)} chars")
                # Use our direct implementation that bypasses the client library
                response_text = generate_completion(
                    prompt=full_prompt,
                    system_message="You are an expert educational AI assistant that specializes in creating personalized learning paths.",
                    model=self.model_name,
                    temperature=temp,
                    max_tokens=MAX_TOKENS,
                    timeout=120
                )
                latency_ms = (time.time() - start_time) * 1000
                print(f"DEBUG: API call completed in {latency_ms:.2f}ms")
                # Estimate the output tokens and the total cost
                output_token_count = count_tokens(
                    response_text, self.model_name) if response_text else 0
                total_cost = estimate_cost(
                    self.model_name, input_token_count, output_token_count)
                # Log to the observability platform (LangSmith + W&B)
                self.obs_manager.log_llm_call(
                    prompt=full_prompt,
                    response=response_text,
                    model=self.model_name,
                    metadata={
                        "temperature": temp,
                        "max_tokens": MAX_TOKENS,
                        "provider": self.provider,
                        "cached": False
                    },
                    latency_ms=latency_ms,
                    token_count=input_token_count + output_token_count,
                    cost=total_cost
                )
                # Cache the response for future use (save money!)
                if use_cache and response_text:
                    # Cache for 24 hours
                    cache.set(cache_key, response_text, ttl=86400)
                return response_text
            except Exception as e:
                print(f"DEBUG: API call failed: {str(e)}")
                raise
        except Exception as e:
            error_msg = f"Error generating response: {str(e)}"
            print(error_msg)
            # Try to extract more detailed error information
            try:
                import traceback
                error_traceback = traceback.format_exc()
                print(f"Error traceback:\n{error_traceback}")
                # Check whether it is an OpenAI API error
                if hasattr(e, 'response') and hasattr(e.response, 'json'):
                    error_data = e.response.json()
                    print(f"OpenAI API Error: {error_data}")
                    error_msg += f"\nAPI Error: {error_data.get('error', {}).get('message', str(e))}"
            except Exception as inner_e:
                print(f"Error while processing error: {str(inner_e)}")
            raise ValueError(error_msg) from e
    def generate_response_stream(
        self,
        prompt: str,
        relevant_documents: Optional[List[str]] = None,
        temperature: Optional[float] = None,
    ):
        """
        Generate a streaming response for real-time output.

        Why streaming:
        - Users see progress immediately
        - Perceived performance is better
        - Same cost as a regular response!
        - Better UX = happier users

        Args:
            prompt: The prompt for the model
            relevant_documents: Optional list of relevant documents to add context
            temperature: Optional override for model temperature

        Yields:
            Chunks of response text as they arrive
        """
        # Optimize the prompt to reduce costs
        full_prompt = optimize_prompt(
            prompt, relevant_documents, max_tokens=4000)
        # Log the token count
        token_count = count_tokens(full_prompt, self.model_name)
        estimated_cost = estimate_api_cost(token_count, self.model_name)
        print(
            f"💰 Streaming - Token count: {token_count} (~${estimated_cost:.4f} input cost)")
        temp = temperature if temperature is not None else TEMPERATURE
        try:
            from openai import OpenAI
            client = OpenAI(api_key=OPENAI_API_KEY)
            stream = client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": "You are an expert educational AI assistant that specializes in creating personalized learning paths."},
                    {"role": "user", "content": full_prompt}
                ],
                temperature=temp,
                max_tokens=MAX_TOKENS,
                stream=True  # Enable streaming!
            )
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content
        except Exception as e:
            print(f"Streaming error: {str(e)}")
            yield f"Error: {str(e)}"
    def generate_structured_response(
        self,
        prompt: str,
        output_schema: str,
        relevant_documents: Optional[List[str]] = None,
        temperature: Optional[float] = None,
        use_cache: bool = True  # NEW: enable caching by default
    ) -> str:
        """
        Generate a structured response that follows a specific schema.

        Args:
            prompt: The prompt for the model
            output_schema: The schema instructions for the output
            relevant_documents: Optional list of relevant documents to add context
            temperature: Optional override for model temperature
            use_cache: Whether to use cached responses (default: True)

        Returns:
            The generated response as a JSON string
        """
        # Check the cache first to save money! 💰
        if use_cache:
            cache_key = cache.cache_key(
                "structured",
                prompt[:200],  # First 200 chars of the prompt
                output_schema[:100],  # First 100 chars of the schema
                str(relevant_documents)[:100] if relevant_documents else "",
                self.model_name,
                temperature if temperature is not None else 0.2
            )
            cached_response = cache.get(cache_key)
            if cached_response:
                print("💰 Using cached structured response - $0.00 cost!")
                return cached_response
        # Determine whether this is a learning path generation
        is_learning_path = 'LearningPath' in output_schema
        # Prepare the prompt with schema instructions and emphasize required fields
        required_fields_reminder = ""
        if is_learning_path:
            required_fields_reminder = """
            IMPORTANT: Your response MUST include ALL of these required fields:
            - title: String title of the learning path
            - description: Detailed description of the learning path
            - topic: Main topic of study
            - expertise_level: Starting expertise level
            - learning_style: Preferred learning style
            - time_commitment: Weekly time commitment
            - duration_weeks: Total duration in weeks (integer)
            - goals: List of learning goals and objectives
            - milestones: List of learning milestones
            - prerequisites: List of prerequisites for this path
            - total_hours: Total estimated hours (integer)

            For each milestone, you MUST include:
            - title: Short title for the milestone
            - description: Detailed description
            - estimated_hours: Estimated hours to complete (integer)
            - resources: List of recommended learning resources
            - skills_gained: List of skills gained after completion
            """
        schema_prompt = f"""
        {prompt}

        Your response should follow this schema format:
        {output_schema}
        {required_fields_reminder}

        Please provide a valid JSON response that strictly follows this schema.
        Do not include any explanatory text outside the JSON structure.
        """
        # Optimize the prompt with context to reduce token usage 💰
        full_prompt = optimize_prompt(
            schema_prompt, relevant_documents, max_tokens=6000)
        # Log the token count and estimated cost
        token_count = count_tokens(full_prompt, self.model_name)
        estimated_cost = estimate_api_cost(token_count, self.model_name)
        print(
            f"💰 Structured response - Token count: {token_count} (~${estimated_cost:.4f} input cost)")
        # Set up the temperature - lower for structured outputs
        temp = temperature if temperature is not None else 0.2
        # Use our direct implementation that bypasses the client library
        import time
        import traceback
        response_text = None
        try:
            start_time = time.time()
            print(
                f"DEBUG: Generating structured response using provider: {self.provider}, model: {self.model_name}")
            print(f"DEBUG: Prompt length: {len(full_prompt)} chars")
            # Print the first 200 chars of the prompt for debugging
            print(f"DEBUG: Prompt preview: {full_prompt[:200]}...")
            # Print API key details for debugging (safely)
            if self.provider == 'openai':
                api_key = OPENAI_API_KEY
                if api_key:
                    print(
                        f"DEBUG: Using OpenAI API key starting with: {api_key[:5]}{'*' * 10}")
                else:
                    print("DEBUG: WARNING - No OpenAI API key found!")
            elif self.provider == 'deepseek':
                api_key = DEEPSEEK_API_KEY
                if api_key:
                    print(
                        f"DEBUG: Using DeepSeek API key starting with: {api_key[:5]}{'*' * 10}")
                else:
                    print("DEBUG: WARNING - No DeepSeek API key found!")
            # OpenAI is the primary provider now
            if self.provider == 'openai':
                from src.direct_openai import generate_completion
                print("Attempting to generate OpenAI completion...")
                response_text = generate_completion(
                    prompt=full_prompt,
                    system_message="You are an expert AI assistant that specializes in generating structured responses following specified schemas. Always include all required fields in your JSON response.",
                    model=self.model_name,
                    temperature=temp,
                    max_tokens=MAX_TOKENS,
                    timeout=300  # Increased timeout for reliability
                )
                print(
                    f"Successfully generated completion with {len(response_text) if response_text else 0} characters")
            elif self.provider == 'openrouter':
                # OpenRouter exposes an OpenAI-compatible API at a custom endpoint
                from openai import OpenAI as OpenAIClient
                print("Attempting to generate OpenRouter completion...")
                client = OpenAIClient(
                    api_key=self.api_key,
                    base_url="https://openrouter.ai/api/v1"
                )
                # Use the free model if none is specified
                model_to_use = self.model_name if self.model_name else OPENROUTER_FREE_MODEL
                try:
                    completion = client.chat.completions.create(
                        model=model_to_use,
                        messages=[
                            {"role": "system", "content": "You are an expert AI assistant that specializes in generating structured responses following specified schemas. Always include all required fields in your JSON response."},
                            {"role": "user", "content": full_prompt}
                        ],
                        temperature=temp,
                        max_tokens=MAX_TOKENS,
                        timeout=300
                    )
                    response_text = completion.choices[0].message.content
                    print(
                        f"Successfully generated OpenRouter completion with {len(response_text) if response_text else 0} characters")
                except Exception as e:
                    print(f"Error calling OpenRouter API: {e}")
                    response_text = None
            elif self.provider == 'deepseek':
                response_text = self._deepseek_completion(
                    full_prompt,
                    temp,
                    system_message="You are an expert AI assistant that specializes in generating structured responses following specified schemas. Always include all required fields in your JSON response."
                )
            else:
                raise ValueError(f"Unknown provider: {self.provider}")
            print(
                f"DEBUG: API call completed in {time.time() - start_time:.2f} seconds")
            if response_text:
                print(
                    f"DEBUG: Received response with length: {len(response_text)} chars")
                print(f"DEBUG: Response preview: {response_text[:100]}...")
            else:
                print("DEBUG: WARNING - Received empty response from API")
                if is_learning_path:
                    # Return a fallback learning path
                    return self._create_fallback_learning_path()
                else:
                    # Return a fallback generic response
                    return json.dumps({
                        "summary": "Sorry, I encountered an error retrieving information.",
                        "key_concepts": ["Error occurred while processing your request"],
                        "learning_path": ["Please try again with a different query"],
                        "resources": [],
                        "code_examples": [],
                        "advanced_topics": []
                    })
        except Exception as e:
            print(f"DEBUG: Structured response generation failed: {str(e)}")
            print(traceback.format_exc())
            if is_learning_path:
                # Return a fallback learning path
                return self._create_fallback_learning_path()
            else:
                # Return a fallback generic response
                return json.dumps({
                    "summary": f"Sorry, I encountered an error: {str(e)}",
                    "key_concepts": ["Unable to extract structured information"],
                    "learning_path": ["Please try asking in a different way"],
                    "resources": [],
                    "code_examples": [],
                    "advanced_topics": [],
                    "career_applications": []
                })
        # Extract JSON from the response
        try:
            # Look for JSON in the response (it may be enclosed in ```json blocks)
            if "```json" in response_text:
                json_start = response_text.find("```json") + 7
                json_end = response_text.find("```", json_start)
                json_str = response_text[json_start:json_end].strip()
            elif "```" in response_text:
                json_start = response_text.find("```") + 3
                json_end = response_text.find("```", json_start)
                json_str = response_text[json_start:json_end].strip()
            else:
                json_str = response_text.strip()
            # Validate the JSON
            data = json.loads(json_str)
            # If we expected a learning path but received a list or another type, fall back
            if is_learning_path and not isinstance(data, dict):
                print(
                    "DEBUG: Expected learning path dict but received a different type, returning fallback path.")
                return self._create_fallback_learning_path()
            # For learning paths, validate that all required fields are present
            if is_learning_path:
                required_fields = [
                    'title', 'description', 'topic', 'expertise_level',
                    'learning_style', 'time_commitment', 'duration_weeks',
                    'goals', 'milestones', 'prerequisites', 'total_hours'
                ]
                missing_fields = [
                    field for field in required_fields if field not in data]
                if missing_fields:
                    print(
                        f"DEBUG: Missing required fields in learning path: {missing_fields}")
                    # Fill in any missing fields with default values
                    for field in missing_fields:
                        if field == 'title':
                            data['title'] = data.get(
                                'topic', 'Learning Path') + ' Learning Path'
                        elif field == 'description':
                            data['description'] = f"A comprehensive learning path for {data.get('topic', 'the requested topic')}."
                        elif field == 'topic':
                            data['topic'] = data.get(
                                'title', 'General Learning').replace(' Learning Path', '')
                        elif field == 'expertise_level':
                            data['expertise_level'] = 'beginner'
                        elif field == 'learning_style':
                            data['learning_style'] = 'visual'
                        elif field == 'time_commitment':
                            data['time_commitment'] = 'moderate'
                        elif field == 'duration_weeks':
                            data['duration_weeks'] = 8
                        elif field == 'goals':
                            data['goals'] = [
                                f"Master {data.get('topic', 'the subject')}"]
                        elif field == 'milestones':
                            data['milestones'] = [{
                                'title': 'Getting Started',
                                'description': f"Introduction to {data.get('topic', 'the subject')}",
                                'estimated_hours': 10,
                                'resources': [{'name': 'Online Documentation', 'url': '', 'type': 'documentation'}],
                                'skills_gained': [f"Basic {data.get('topic', 'subject')} knowledge"]
                            }]
                        elif field == 'prerequisites':
                            data['prerequisites'] = ['None']
                        elif field == 'total_hours':
                            data['total_hours'] = 40
                # Also check that each milestone has the required fields
                if 'milestones' in data and isinstance(data['milestones'], list):
                    milestone_required_fields = [
                        'title', 'description', 'estimated_hours', 'resources', 'skills_gained']
                    for i, milestone in enumerate(data['milestones']):
                        milestone_missing_fields = [
                            field for field in milestone_required_fields if field not in milestone]
                        if milestone_missing_fields:
                            print(
                                f"DEBUG: Missing required fields in milestone {i+1}: {milestone_missing_fields}")
                            # Fill in missing fields with default values
                            for field in milestone_missing_fields:
                                if field == 'title':
                                    milestone['title'] = f"Milestone {i+1}"
                                elif field == 'description':
                                    milestone['description'] = "A key learning milestone in this path."
                                elif field == 'estimated_hours':
                                    milestone['estimated_hours'] = 10
                                elif field == 'resources':
                                    milestone['resources'] = [
                                        {'name': 'Online Resource', 'url': '', 'type': 'article'}]
                                elif field == 'skills_gained':
                                    milestone['skills_gained'] = [
                                        f"Skills related to {data.get('topic', 'the subject')}"]
            # Cache the successful response for future use (save money!)
            json_result = json.dumps(data)
            if use_cache:
                # Cache for 24 hours
                cache.set(cache_key, json_result, ttl=86400)
            return json_result
        except Exception as e:
            print(f"DEBUG: Error parsing initial JSON: {str(e)}")
            # First cleanup attempt - remove markdown code block wrappers
            cleaned_response = response_text.strip()
            # Remove ```json...``` or ```...``` markdown wrappers
            import re
            markdown_match = re.search(
                r'```(?:json)?\s*(.*?)\s*```', response_text, re.DOTALL)
            if markdown_match:
                cleaned_response = markdown_match.group(1).strip()
                print("DEBUG: Extracted content from markdown code block")
            # Remove common text prefixes
            for prefix in ["+", "-", "*", "#", "Response:", "JSON:", "Here's", "```", "```json"]:
                if cleaned_response.startswith(prefix):
                    cleaned_response = cleaned_response[len(prefix):].strip()
            try:
                # Try to parse the cleaned response
                data = json.loads(cleaned_response)
                print("DEBUG: Successfully parsed cleaned JSON")
                return json.dumps(data)
            except Exception as e2:
                print(f"DEBUG: Error parsing cleaned JSON: {str(e2)}")
                # Second attempt - find the main JSON object (start at the first { and match the closing })
                try:
                    first_brace = cleaned_response.find('{')
                    if first_brace != -1:
                        # Count braces to find the matching closing brace
                        brace_count = 0
                        end_pos = first_brace
                        for i in range(first_brace, len(cleaned_response)):
                            if cleaned_response[i] == '{':
                                brace_count += 1
                            elif cleaned_response[i] == '}':
                                brace_count -= 1
                                if brace_count == 0:
                                    end_pos = i + 1
                                    break
                        potential_json = cleaned_response[first_brace:end_pos]
                        print(
                            f"DEBUG: Extracted JSON from position {first_brace} to {end_pos} ({len(potential_json)} chars)")
                        data = json.loads(potential_json)
                        print("DEBUG: Successfully parsed extracted JSON")
                        return json.dumps(data)
                except Exception as e3:
                    print(f"DEBUG: Error in brace matching: {str(e3)}")
                    # Return a fallback JSON as a last resort instead of raising an exception
                    print("DEBUG: Returning fallback JSON structure due to parsing failure")
                    return json.dumps({
                        "summary": "Failed to parse the AI's response. The content might not be in the expected JSON format.",
                        "key_concepts": ["JSON parsing error"],
                        "learning_path": ["Please try a different query or check the AI provider's output directly if possible."],
                        "resources": [],
                        "code_examples": [],
                        "advanced_topics": [],
                        "error_details": "The AI's response could not be successfully parsed as JSON after multiple attempts."
                    })
                return json.dumps({
                    "summary": f"I processed your request but encountered a formatting issue. Your question was about: {response_text[:100]}...",
                    "key_concepts": ["Unable to extract structured information"],
                    "learning_path": ["Please try asking in a different way"],
                    "resources": [],
                    "code_examples": [],
                    "advanced_topics": [],
                    "career_applications": []
                })
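    # Example call (sketch): the schema argument is a free-form string that is
    # pasted into the prompt, so any JSON-shaped description works. The return
    # value is always a JSON string, falling back to a default structure on error.
    #
    #     raw = orchestrator.generate_structured_response(
    #         prompt="Summarize the Python GIL for beginners.",
    #         output_schema='{"summary": "string", "key_concepts": ["string"]}',
    #     )
    #     data = json.loads(raw)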
    def _deepseek_completion(self, prompt: str, temperature: float, system_message: str = None):
        """Call the DeepSeek API for a chat completion.

        The helper explicitly adds a **system** message reminding the model to comply
        with the schema and strictly return JSON. We have observed that, without this
        guard-rail, the DeepSeek model occasionally omits required fields, which later
        causes Pydantic validation failures. Passing a clear system prompt greatly
        increases response reliability.
        """
        import requests
        import traceback
        import json
        import time
        api_key = DEEPSEEK_API_KEY
        url = "https://api.deepseek.com/v1/chat/completions"
        system_msg = (
            system_message
            or "You are an expert AI assistant that MUST output ONLY valid JSON strictly "
            "following the user's schema instructions. Do not add any commentary, markdown "
            "code fences or explanations."
        )
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        payload_base = {
            "model": self.model_name if hasattr(self, "model_name") else "deepseek-chat",
            "temperature": temperature if temperature is not None else 0.2,
            "max_tokens": MAX_TOKENS,
        }

        def _post(messages):
            start = time.time()
            pl = {**payload_base, "messages": messages}
            print(
                f"DEBUG: DeepSeek request with {len(json.dumps(pl))} chars payload, "
                f"messages={len(messages)}"
            )
            resp = requests.post(url, headers=headers, json=pl, timeout=150)
            resp.raise_for_status()
            data = resp.json()
            content = data["choices"][0]["message"]["content"]
            print(
                f"DEBUG: DeepSeek response in {time.time()-start:.2f}s with "
                f"{len(content)} chars"
            )
            return content

        try:
            # First attempt - full prompt
            messages = [
                {"role": "system", "content": system_msg},
                {"role": "user", "content": prompt},
            ]
            response_text = _post(messages)
            # Quick JSON sanity check; if it fails we retry with a reduced prompt.
            try:
                json.loads(response_text.strip("`"))
                return response_text
            except Exception:
                print(
                    "DEBUG: DeepSeek response not valid JSON, retrying with simplified instructions...")
            # Second attempt - simplified prompt focusing on the schema only
            simple_prompt = (
                "Provide ONLY the JSON that matches the schema. Do not wrap it in anything."
            )
            messages_retry = [
                {"role": "system", "content": system_msg},
                {"role": "user", "content": prompt + "\n\n" + simple_prompt},
            ]
            return _post(messages_retry)
        except Exception as e:
            print(f"DEBUG: DeepSeek API call failed: {str(e)}")
            print(traceback.format_exc())
            raise
    def _create_fallback_learning_path(self):
        """
        Create a fallback learning path with default values when generation fails.
        """
        import datetime
        import uuid
        fallback_path = {
            "id": str(uuid.uuid4()),
            "title": "General Learning Path",
            "description": "A default learning path created when specific generation failed.",
            "topic": "General Topic",
            "expertise_level": "beginner",
            "learning_style": "visual",
            "time_commitment": "moderate",
            "duration_weeks": 8,
            "goals": ["Build foundational knowledge", "Develop practical skills"],
            "milestones": [
                {
                    "title": "Getting Started",
                    "description": "Introduction to the fundamentals.",
                    "estimated_hours": 10,
                    "resources": [
                        {"name": "Online Documentation",
                         "url": "", "type": "documentation"}
                    ],
                    "skills_gained": ["Basic knowledge"]
                },
                {
                    "title": "Core Concepts",
                    "description": "Understanding core principles and practices.",
                    "estimated_hours": 15,
                    "resources": [
                        {"name": "Online Tutorial", "url": "", "type": "tutorial"}
                    ],
                    "skills_gained": ["Fundamental concepts"]
                }
            ],
            "prerequisites": ["None"],
            "total_hours": 25,
            "created_at": datetime.datetime.now().isoformat()
        }
        return json.dumps(fallback_path)
    def analyze_difficulty(self, content: str) -> float:
        """
        Analyze the difficulty level of educational content.

        Args:
            content: The content to analyze

        Returns:
            Difficulty score between 0 (easiest) and 1 (hardest)
        """
        prompt = f"""
        Analyze the following educational content and rate its difficulty level on a scale from 0 to 1,
        where 0 is very basic (elementary level) and 1 is extremely advanced (expert/PhD level).

        Content:
        {content[:1000]}...

        Consider factors like:
        - Technical vocabulary and jargon
        - Complexity of concepts
        - Prerequisites required to understand
        - Density of information

        Return only a numeric score between 0 and 1 with up to 2 decimal places.
        """
        response = self.generate_response(prompt, temperature=0.1)
        # Extract the numeric score
        try:
            # Look for patterns like "0.75" or "Difficulty: 0.75"
            import re
            matches = re.findall(r"([0-9]\.[0-9]{1,2})", response)
            if matches:
                score = float(matches[0])
                return max(0.0, min(1.0, score))  # Clamp to [0, 1]
            # If no decimal is found, look for whole numbers
            matches = re.findall(r"^([0-9])$", response)
            if matches:
                score = float(matches[0])
                return max(0.0, min(1.0, score))  # Clamp to [0, 1]
            return 0.5  # Default to middle difficulty
        except Exception:
            return 0.5  # Default to middle difficulty
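    # Example (sketch): scores are clamped to [0, 1], and 0.5 is returned whenever
    # no parseable number is found, so callers never need to handle a missing score.
    #
    #     score = orchestrator.analyze_difficulty(lecture_notes_text)
    #     bucket = "advanced" if score > 0.7 else "accessible"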
    def generate_resource_recommendations(
        self,
        topic: str,
        learning_style: str,
        expertise_level: str,
        count: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Generate tailored resource recommendations for a topic.

        Args:
            topic: The topic to find resources for
            learning_style: Preferred learning style
            expertise_level: User's expertise level
            count: Number of resources to recommend

        Returns:
            List of resource dictionaries
        """
        prompt = f"""
        Generate {count} learning resources for someone studying {topic}.
        Their learning style is {learning_style} and their expertise level is {expertise_level}.

        IMPORTANT: All resources MUST be in English only. Do not include resources in Portuguese, Spanish, or any other language.

        For each resource, include:
        1. Title (in English)
        2. Type (video, article, book, interactive, course, documentation, podcast, project)
        3. Description (1-2 sentences in English)
        4. Difficulty level (beginner, intermediate, advanced, expert)
        5. Estimated time to complete (in minutes or hours)
        6. URL (create a realistic but fictional URL if needed)

        Provide the response as a JSON array of resource objects. All text fields must be in English.
        """
        response = self.generate_structured_response(
            prompt=prompt,
            output_schema="""
            [
                {
                    "title": "string",
                    "type": "string",
                    "description": "string",
                    "difficulty": "string",
                    "time_estimate": "string",
                    "url": "string"
                }
            ]
            """,
            temperature=0.7
        )
        try:
            resources = json.loads(response)
            return resources
        except Exception:
            # Fall back to an empty list on a parsing error
            return []
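    # Example (sketch): the result is already parsed into dictionaries, and an
    # empty list signals that structured generation or JSON parsing failed.
    #
    #     resources = orchestrator.generate_resource_recommendations(
    #         topic="graph databases",
    #         learning_style="reading",
    #         expertise_level="intermediate",
    #         count=3,
    #     )
    #     for r in resources:
    #         print(r["title"], "-", r["url"])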
    def generate_path(self, topic: str, expertise_level: str, learning_style: str, context: List[str] = None) -> str:
        """
        Generate a learning path based on user preferences and context using RAG.

        Args:
            topic: The learning topic
            expertise_level: User's expertise level
            learning_style: User's preferred learning style
            context: Optional context to consider

        Returns:
            Generated learning path
        """
        # Combine the provided context with the stored context
        full_context = self.context + (context or [])
        # Plan if planning is enabled
        if self.planning_enabled and hasattr(self, '_plan_path_generation'):
            self._plan_path_generation(
                topic, expertise_level, learning_style, full_context)
        # Generate the path with context
        prompt = f"""Generate a learning path for the following topic:
        Topic: {topic}
        Expertise Level: {expertise_level}
        Learning Style: {learning_style}

        Context:
        {' '.join(full_context)}

        Previous answers:
        {' '.join(self.memory)}

        Generate a structured learning path with milestones and resources.
        """
        # The prompt already embeds the context, so a plain completion suffices here
        path = self.generate_response(prompt)
        # Store the path in memory
        self.memory.append(
            f"Generated path for {topic} with {expertise_level} level and {learning_style} style")
        return path
    def generate_answer(self, question: str, context: Optional[List[str]] = None, temperature: Optional[float] = None) -> str:
        """
        Generate an answer to a question using RAG and agentic behavior.

        Args:
            question: The question to answer
            context: Optional context to consider
            temperature: Optional temperature for response generation

        Returns:
            Generated answer
        """
        # Combine the provided context with the stored context
        full_context = self.context + (context or [])
        # Plan if planning is enabled
        if self.planning_enabled and hasattr(self, '_plan_answer_generation'):
            self._plan_answer_generation(question, full_context)
        # Generate the answer with context
        prompt = f"""Answer the following question based on the provided context:

        Context:
        {' '.join(full_context)}

        Question: {question}"""
        # Store the question in memory
        self.memory.append(f"Question: {question}")
        # Generate and return the answer
        return self.generate_response(prompt, relevant_documents=full_context, temperature=temperature)
    def _plan_answer_generation(self, question: str, context: List[str]) -> None:
        """
        Plan the answer generation process.

        Args:
            question: The question to answer
            context: Context information
        """
        # Analyze the question to determine the best approach
        question_lower = question.lower()
        # Determine whether we need more context
        if len(context) < 2 and not any(keyword in question_lower for keyword in ["what", "how", "why", "when", "where", "who"]):
            self.context.append("Need more context for this question")
        # Determine the type of question
        if "how" in question_lower:
            self.context.append("This is a procedural question")
        elif "why" in question_lower:
            self.context.append("This is an explanatory question")
        elif "what" in question_lower:
            self.context.append("This is a definitional question")
        elif "compare" in question_lower or "difference" in question_lower:
            self.context.append("This is a comparative question")
    def _plan_path_generation(self, topic: str, expertise_level: str, learning_style: str, context: List[str]) -> None:
        """
        Plan the learning path generation process.

        Args:
            topic: The learning topic
            expertise_level: User's expertise level
            learning_style: User's preferred learning style
            context: Context information
        """
        # Determine the appropriate depth and breadth based on the expertise level
        if expertise_level == "beginner":
            self.context.append("Focus on fundamentals and basic concepts")
        elif expertise_level == "intermediate":
            self.context.append(
                "Include practical applications and case studies")
        elif expertise_level == "advanced":
            self.context.append(
                "Include advanced techniques and research papers")
        # Adjust for the learning style
        if learning_style == "visual":
            self.context.append("Prioritize video resources and diagrams")
        elif learning_style == "auditory":
            self.context.append("Prioritize podcasts and audio lectures")
        elif learning_style == "reading":
            self.context.append("Prioritize books and articles")
        elif learning_style == "kinesthetic":
            self.context.append("Prioritize hands-on projects and exercises")