Spaces:
Sleeping
Sleeping
| """ | |
| Chain 04: Analysis Chain | |
| Type: LLMChain + Pydantic output parser | |
| Trigger: pattern_query | data_analysis | structured_request | |
| Model: Cerebras Llama 3.3 70B | |
| Output: Structured JSON stored to Supabase + summary | |
| """ | |
| from typing import Any, Dict, Optional | |
| import time | |
| import json | |
| from pydantic import BaseModel, Field | |
| from langchain.output_parsers import PydanticOutputParser | |
| from langchain.prompts import ChatPromptTemplate | |
| from langchain_cerebras import ChatCerebras | |
| from langchain_groq import ChatGroq | |
| from app.config import settings | |
| from app.chains.base import BaseChain, ChainType | |
| from app.utils.logging import get_logger | |
| from app.utils.database import db | |
| logger = get_logger("analysis_chain") | |
| class AnalysisOutput(BaseModel): | |
| """Structured output for analysis""" | |
| summary: str = Field(description="Executive summary of the analysis") | |
| key_findings: list = Field(description="List of key findings") | |
| patterns_identified: list = Field(description="Patterns or trends found") | |
| recommendations: list = Field(description="Recommendations based on analysis") | |
| confidence_score: float = Field(description="Confidence in analysis (0-1)") | |
| class AnalysisChain(BaseChain): | |
| """ | |
| Analysis chain for structured data analysis and pattern recognition | |
| Produces JSON output stored to Supabase | |
| """ | |
| chain_type = ChainType.ANALYSIS | |
| description = "Structured analysis with JSON output" | |
| def __init__(self): | |
| super().__init__() | |
| self.llm = None | |
| self.parser = PydanticOutputParser(pydantic_object=AnalysisOutput) | |
| self._initialize() | |
| def _initialize(self): | |
| """Initialize LLM""" | |
| if settings.cerebras_api_key: | |
| try: | |
| self.llm = ChatCerebras( | |
| api_key=settings.cerebras_api_key, | |
| model_name="llama-3.3-70b", | |
| temperature=0.3 # Lower for structured output | |
| ) | |
| except Exception as e: | |
| logger.error(f"Failed to initialize Cerebras: {e}") | |
| # Fallback to Groq | |
| if not self.llm and settings.groq_api_key: | |
| self.llm = ChatGroq( | |
| api_key=settings.groq_api_key, | |
| model_name="llama-3.1-70b-versatile", | |
| temperature=0.3 | |
| ) | |
| async def execute( | |
| self, | |
| input_data: Dict[str, Any], | |
| context: Optional[Dict[str, Any]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Execute analysis chain | |
| Args: | |
| input_data: Contains 'text' (analysis query) | |
| context: Full context with data | |
| Returns: | |
| Structured analysis results | |
| """ | |
| start_time = time.time() | |
| if not self.llm: | |
| return { | |
| "response": { | |
| "type": "text", | |
| "content": "Analysis service unavailable. I'll provide a qualitative assessment.", | |
| "structured_data": {} | |
| }, | |
| "personality_mode": "neutral", | |
| "chain": self.chain_type.value, | |
| "model": "fallback", | |
| "latency_ms": int((time.time() - start_time) * 1000), | |
| "tokens_used": 0, | |
| "sources": [] | |
| } | |
| query = input_data.get("text", "") | |
| user_id = context.get("identity", {}).get("user_profile", {}).get("user_id", "unknown") | |
| try: | |
| # Get relevant data from context | |
| memory_data = context.get("semantic_memory", {}) | |
| tasks_data = context.get("situational", {}) | |
| # Build prompt with format instructions | |
| format_instructions = self.parser.get_format_instructions() | |
| prompt = ChatPromptTemplate.from_messages([ | |
| ("system", f"""You are an analytical AI. Analyze the user's query and provide structured output. | |
| {format_instructions} | |
| Use the provided context data to inform your analysis. Be objective and data-driven."""), | |
| ("human", f"""Analyze the following: | |
| Query: {query} | |
| Context Data: | |
| - Knowledge items: {len(memory_data.get('knowledge_items', []))} relevant items | |
| - Tasks: {tasks_data.get('active_goals_count', 0)} active goals | |
| - Pending events: {tasks_data.get('event_count', 0)} | |
| Provide a thorough analysis following the format above.""") | |
| ]) | |
| # Generate analysis | |
| messages = prompt.format_messages(query=query) | |
| response = await self.llm.ainvoke(messages) | |
| # Parse output | |
| try: | |
| parsed = self.parser.parse(response.content) | |
| structured_output = parsed.dict() | |
| except Exception as e: | |
| logger.error(f"Parse error: {e}") | |
| # Fallback: extract manually | |
| structured_output = { | |
| "summary": response.content[:500], | |
| "key_findings": [], | |
| "patterns_identified": [], | |
| "recommendations": [], | |
| "confidence_score": 0.7 | |
| } | |
| # Store analysis to knowledge base | |
| analysis_record = { | |
| "user_id": user_id, | |
| "content": json.dumps(structured_output, indent=2), | |
| "content_type": "analysis", | |
| "category": "data_analysis", | |
| "source_type": "analysis_chain", | |
| "confidence_score": structured_output.get("confidence_score", 0.7), | |
| "quality_score": 0.85 | |
| } | |
| try: | |
| await db.insert("knowledge_base", analysis_record) | |
| except Exception as e: | |
| logger.error(f"Failed to store analysis: {e}") | |
| elapsed_ms = int((time.time() - start_time) * 1000) | |
| return { | |
| "response": { | |
| "type": "analysis", | |
| "content": structured_output.get("summary", "Analysis complete."), | |
| "structured_data": structured_output | |
| }, | |
| "personality_mode": "direct", # Analysis is factual | |
| "chain": self.chain_type.value, | |
| "model": "cerebras-70b" if settings.cerebras_api_key else "groq-70b", | |
| "latency_ms": elapsed_ms, | |
| "tokens_used": 0, | |
| "sources": [] | |
| } | |
| except Exception as e: | |
| logger.error(f"Analysis chain error: {e}") | |
| return { | |
| "response": { | |
| "type": "text", | |
| "content": f"I encountered an error during analysis: {str(e)}", | |
| "structured_data": {} | |
| }, | |
| "personality_mode": "neutral", | |
| "chain": self.chain_type.value, | |
| "model": "error", | |
| "latency_ms": int((time.time() - start_time) * 1000), | |
| "tokens_used": 0, | |
| "sources": [] | |
| } | |