Spaces:
Running
Running
| """ | |
| Context Assembler (Runs Before Every Chain) | |
| The universal pre-step. No chain executes without this running first. | |
| ASSEMBLY TARGET: < 120ms total | |
| """ | |
| from typing import Any, Dict, List, Optional | |
| from datetime import datetime, time | |
| import asyncio | |
| from app.utils.logging import get_logger | |
| from app.utils.database import db | |
| from app.utils.cache import cache | |
| logger = get_logger("context_assembler") | |
| class ContextAssembler: | |
| """ | |
| Assembles 7 tiers of context for every chain execution. | |
| Total target time: < 120ms | |
| """ | |
async def assemble(
    cls,
    user_id: str,
    input_text: str,
    session_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Assemble all context tiers concurrently.

    TIER 1: Identity (< 5ms, Redis cache)
    TIER 2: Temporal (< 2ms)
    TIER 3: Emotional State (< 10ms, Redis cache)
    TIER 4: Immediate Memory (< 20ms, Supabase)
    TIER 5: Semantic Memory (< 50ms, pgvector)
    TIER 6: Situational (< 15ms, Supabase + Redis)
    TIER 7: World State (< 10ms, Redis cache)

    Args:
        user_id: Forwarded to every tier.
        input_text: Forwarded to the semantic-memory tier as the query.
        session_id: Optional; forwarded to the immediate-memory tier.

    Returns:
        Dict with the keys "identity", "temporal", "emotional",
        "immediate_memory", "semantic_memory", "situational" and
        "world_state". A tier that fails contributes an empty dict, so
        this method itself does not raise on a tier failure.

    NOTE(review): the first parameter is named ``cls`` but no
    ``@classmethod`` decorator is visible here - confirm how callers
    invoke this method.
    """
    # Local import: the module-level name ``time`` is bound to
    # ``datetime.time``, and a monotonic clock is the right tool for
    # measuring a duration (the wall clock can jump).
    from time import perf_counter

    started = perf_counter()

    # Insertion order of this dict defines both the gather order and the
    # key order of the returned context.
    tiers = {
        "identity": cls._tier_1_identity(user_id),
        "temporal": cls._tier_2_temporal(user_id),
        "emotional": cls._tier_3_emotional(user_id),
        "immediate_memory": cls._tier_4_immediate_memory(user_id, session_id),
        "semantic_memory": cls._tier_5_semantic_memory(user_id, input_text),
        "situational": cls._tier_6_situational(user_id),
        "world_state": cls._tier_7_world_state(user_id),
    }

    # return_exceptions=True keeps one failing tier from cancelling the rest.
    results = await asyncio.gather(*tiers.values(), return_exceptions=True)

    context: Dict[str, Any] = {}
    for name, result in zip(tiers, results):
        # BaseException also covers a CancelledError handed back by gather.
        if isinstance(result, BaseException):
            # Degrade to an empty tier, but leave a trace of what went wrong.
            logger.warning(f"Context tier '{name}' failed: {result!r}")
            context[name] = {}
        else:
            context[name] = result

    elapsed_ms = (perf_counter() - started) * 1000
    logger.info(f"Context assembled in {elapsed_ms:.1f}ms")
    return context
async def _tier_1_identity(cls, user_id: str) -> Dict[str, Any]:
    """
    TIER 1 - Identity (< 5ms, Redis cache)
    - user_profile loaded (cached 1hr)
    - personality_params read from the profile
    - active_hours classification

    The cache is consulted first under ``user_profile:<user_id>``. On a
    miss the ``user_profile`` table is queried and a found profile is
    written back to the cache with a one-hour TTL.

    Returns:
        Dict with "user_profile", "source" ("cache" or "database"),
        "personality_params" and "is_active_hours"; or
        ``{"error": "Profile not found"}`` when neither lookup yields a
        profile.
    """
    key = f"user_profile:{user_id}"

    record = await cache.get(key)
    origin = "cache"

    if not record:
        # Cache miss: fall back to the database.
        record = await db.fetch_one("user_profile", {"user_id": user_id})
        if not record:
            return {"error": "Profile not found"}
        await cache.set(key, record, ttl=3600)
        origin = "database"

    return {
        "user_profile": record,
        "source": origin,
        "personality_params": record.get("personality_params", {}),
        "is_active_hours": cls._check_active_hours(record),
    }
| def _check_active_hours(cls, profile: Dict) -> bool: | |
| """Check if current time is within user's active hours""" | |
| now = datetime.now() | |
| current_time = now.time() | |
| start = profile.get("active_hours_start", "08:00:00") | |
| end = profile.get("active_hours_end", "22:00:00") | |
| # Convert string times to time objects | |
| if isinstance(start, str): | |
| start = datetime.strptime(start, "%H:%M:%S").time() | |
| if isinstance(end, str): | |
| end = datetime.strptime(end, "%H:%M:%S").time() | |
| return start <= current_time <= end | |
| async def _tier_2_temporal(cls, user_id: str) -> Dict[str, Any]: | |
| """ | |
| TIER 2 β Temporal (< 2ms) | |
| β current datetime + timezone | |
| β behavioral pattern for this hour/day | |
| β DND window check | |
| """ | |
| now = datetime.now() | |
| hour = now.hour | |
| day_of_week = now.weekday() | |
| # Classify time period | |
| if 5 <= hour < 12: | |
| period = "morning" | |
| elif 12 <= hour < 17: | |
| period = "afternoon" | |
| elif 17 <= hour < 22: | |
| period = "evening" | |
| else: | |
| period = "night" | |
| # Weekend check | |
| is_weekend = day_of_week >= 5 | |
| return { | |
| "current_time": now.isoformat(), | |
| "hour": hour, | |
| "day_of_week": day_of_week, | |
| "period": period, | |
| "is_weekend": is_weekend, | |
| "is_dnd": False # Would check against profile | |
| } | |
async def _tier_3_emotional(cls, user_id: str) -> Dict[str, Any]:
    """
    TIER 3 - Emotional State (< 10ms, Redis cache)
    - per-emotion average over recent logs (cached 1hr)
    - today's trajectory (simplified: always "stable")
    - current stress estimate (taken from the newest log)

    The cache is consulted first under ``emotional_state:<user_id>``. On a
    miss, the newest 20 ``emotion_log`` rows are averaged per emotion and
    the result is cached for one hour.

    NOTE: the "7_day_average" key is kept for compatibility, but the
    window is the latest 20 rows - no date filter is applied.

    Returns:
        Dict with "dominant_emotion", "stress_level", "7_day_average" and
        "today_trajectory", plus "latest_emotion" when logs exist. With no
        logs a neutral default is returned and nothing is cached.
    """
    cache_key = f"emotional_state:{user_id}"

    # Try cache
    cached = await cache.get(cache_key)
    if cached:
        return cached

    # Query recent emotion logs, newest first
    logs = await db.fetch_many(
        "emotion_log",
        filters={"user_id": user_id},
        order_by="recorded_at",
        ascending=False,
        limit=20
    )

    if not logs:
        return {
            "dominant_emotion": "neutral",
            "stress_level": "low",
            "7_day_average": {},
            "today_trajectory": "stable"
        }

    # Calculate averages
    emotions = ("joy", "sadness", "anger", "fear", "surprise", "disgust", "neutral")
    averages = {}
    for emotion in emotions:
        # Skip rows where the value is absent or null: a None would make
        # sum() raise and fail the whole tier.
        values = [log[emotion] for log in logs if log.get(emotion) is not None]
        averages[emotion] = sum(values) / len(values) if values else 0

    # Find dominant (ties resolve to the first emotion in the tuple above)
    dominant = max(averages, key=averages.get)

    # logs is non-empty past the early return, so logs[0] is the newest row.
    latest = logs[0]

    result = {
        "dominant_emotion": dominant,
        "stress_level": latest.get("stress_level", "low"),
        "7_day_average": averages,
        "today_trajectory": "stable",  # Simplified
        "latest_emotion": latest
    }

    # Cache for 1 hour
    await cache.set(cache_key, result, ttl=3600)
    return result
async def _tier_4_immediate_memory(cls, user_id: str, session_id: Optional[str]) -> Dict[str, Any]:
    """
    TIER 4 - Immediate Memory (< 20ms, Supabase)
    - the last 20 stored interactions, expanded into conversation turns

    Rows are read newest-first from ``interactions`` (restricted to
    ``session_id`` when one is given) and replayed oldest-first. Every row
    yields a "user" turn; an "assistant" turn follows only when the row
    has a non-empty ``response_text``.

    Returns:
        Dict with "recent_turns", "turn_count" (number of turns, not rows)
        and the "session_id" that was passed in.
    """
    query = {"user_id": user_id}
    if session_id:
        query["session_id"] = session_id

    rows = await db.fetch_many(
        "interactions",
        filters=query,
        order_by="created_at",
        ascending=False,
        limit=20
    )

    conversation: List[Dict[str, Any]] = []
    for row in reversed(rows):  # Oldest first
        stamp = row.get("created_at")
        conversation.append({
            "role": "user",
            "content": row.get("input_text", ""),
            "timestamp": stamp
        })

        reply = row.get("response_text")
        if not reply:
            continue
        conversation.append({
            "role": "assistant",
            "content": reply,
            "timestamp": stamp,
            "personality_mode": row.get("personality_mode")
        })

    return {
        "recent_turns": conversation,
        "turn_count": len(conversation),
        "session_id": session_id
    }
async def _tier_5_semantic_memory(cls, user_id: str, input_text: str) -> Dict[str, Any]:
    """
    TIER 5 - Semantic Memory (< 50ms, pgvector)
    - one ``semantic_search.search_all`` call, up to 8 results per source
    - top 5 "knowledge" items
    - top 3 "news" articles
    - top 3 "research" sessions

    Returns:
        Dict with "knowledge_items", "news_articles", "research_sessions",
        "source", "total_results" (as reported by the search, 0 if
        absent) and "embedding_used".
    """
    # Imported inside the function, as in the original; presumably to
    # avoid an import cycle or defer loading - TODO confirm.
    from app.memory.semantic_search import semantic_search

    hits = await semantic_search.search_all(
        user_id=user_id,
        query=input_text,
        max_results_per_source=8
    )

    knowledge = hits.get("knowledge", [])
    news = hits.get("news", [])
    research = hits.get("research", [])

    return {
        "knowledge_items": knowledge[:5],
        "news_articles": news[:3],
        "research_sessions": research[:3],
        "source": "semantic_similarity",
        "total_results": hits.get("total_results", 0),
        "embedding_used": True
    }
async def _tier_6_situational(cls, user_id: str) -> Dict[str, Any]:
    """
    TIER 6 - Situational (< 15ms, Supabase + Redis)
    - today's tasks + approaching deadlines
    - active goals count
    - pending proactive events queued

    Loads up to 10 active ``tasks_goals`` rows (earliest due date first)
    and up to 10 pending ``event_queue`` rows, then buckets the tasks by
    time remaining until ``due_date``.

    Returns:
        Dict with "today_tasks" (overdue or due within 24 hours),
        "approaching_deadlines" (due in 1-3 whole days),
        "active_goals_count" (number of task rows fetched),
        "pending_events" and "event_count". Tasks without a due date are
        counted but not bucketed.
    """
    # Get active tasks
    tasks = await db.fetch_many(
        "tasks_goals",
        filters={"user_id": user_id, "status": "active"},
        order_by="due_date",
        ascending=True,
        limit=10
    )

    # Get pending events
    events = await db.fetch_many(
        "event_queue",
        filters={"user_id": user_id, "status": "pending"},
        limit=10
    )

    # Categorize tasks
    today_tasks = []
    approaching_deadlines = []

    for task in tasks:
        due = task.get("due_date")
        if not due:
            continue

        if isinstance(due, str):
            # fromisoformat() rejects a trailing "Z" on older Pythons.
            due_date = datetime.fromisoformat(due.replace("Z", "+00:00"))
        else:
            due_date = due

        # Take "now" in the due date's own zone. An aware due date (e.g. a
        # "...Z" timestamp) minus a naive now raises TypeError, which used
        # to fail the whole tier. tzinfo is None for naive values, in
        # which case this is plain local now, as before.
        now = datetime.now(due_date.tzinfo)

        # timedelta.days floors, so overdue tasks come out negative.
        days_until = (due_date - now).days
        if days_until <= 0:
            today_tasks.append(task)
        elif days_until <= 3:
            approaching_deadlines.append(task)

    return {
        "today_tasks": today_tasks,
        "approaching_deadlines": approaching_deadlines,
        "active_goals_count": len(tasks),
        "pending_events": events,
        "event_count": len(events)
    }
async def _tier_7_world_state(cls, user_id: str) -> Dict[str, Any]:
    """
    TIER 7 - World State (< 10ms, Redis cache)
    - breaking news (placeholder: empty list)
    - financial alerts (placeholder: empty list)
    - pre-computed morning context (placeholder: None)

    Returns the value cached under ``world_state:<user_id>`` when there is
    one. Otherwise it builds an empty placeholder snapshot stamped with
    the current time, caches it for 10 minutes and returns it.
    """
    key = f"world_state:{user_id}"

    snapshot = await cache.get(key)
    if not snapshot:
        # Placeholder - would fetch from cached world data
        snapshot = {
            "breaking_news": [],
            "financial_alerts": [],
            "morning_context": None,
            "last_updated": datetime.now().isoformat(),
            "note": "World state from daemon cache"
        }
        # Cache for 10 minutes
        await cache.set(key, snapshot, ttl=600)

    return snapshot