# tillu-daemon/app/chains/context_assembler.py (uploaded by tillu-AI, commit ad9e941, verified)
"""
Context Assembler (Runs Before Every Chain)
The universal pre-step. No chain executes without this running first.
ASSEMBLY TARGET: < 120ms total
"""
from typing import Any, Dict, List, Optional
from datetime import datetime, time
import asyncio
from app.utils.logging import get_logger
from app.utils.database import db
from app.utils.cache import cache
# Module-level logger registered under the name "context_assembler";
# ContextAssembler writes its log output through it.
logger = get_logger("context_assembler")
class ContextAssembler:
    """
    Assembles 7 tiers of context for every chain execution.

    All tiers are launched concurrently by :meth:`assemble`; a tier that
    fails is logged and contributes an empty dict instead of aborting the
    whole assembly.

    Total target time: < 120ms
    """

    # Score columns read from each ``emotion_log`` row.
    _EMOTIONS = ("joy", "sadness", "anger", "fear", "surprise", "disgust", "neutral")

    # Active-hours window used when the profile does not carry one.
    _DEFAULT_ACTIVE_START = "08:00:00"
    _DEFAULT_ACTIVE_END = "22:00:00"

    # String layouts accepted for active-hours values, tried in order.
    _TIME_FORMATS = ("%H:%M:%S", "%H:%M")

    @classmethod
    async def assemble(
        cls,
        user_id: str,
        input_text: str,
        session_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Assemble all context tiers.

        TIER 1: Identity         (< 5ms, Redis cache)
        TIER 2: Temporal         (< 2ms)
        TIER 3: Emotional State  (< 10ms, Redis cache)
        TIER 4: Immediate Memory (< 20ms, Supabase)
        TIER 5: Semantic Memory  (< 50ms, pgvector)
        TIER 6: Situational      (< 15ms, Supabase + Redis)
        TIER 7: World State      (< 10ms, Redis cache)

        Args:
            user_id: User whose context is being assembled.
            input_text: Current input; used as the semantic-memory query.
            session_id: Optional session used to scope immediate memory.

        Returns:
            Dict with the keys ``identity``, ``temporal``, ``emotional``,
            ``immediate_memory``, ``semantic_memory``, ``situational`` and
            ``world_state``. A tier that raised maps to ``{}``.
        """
        # Local import: the module-level name ``time`` is ``datetime.time``.
        from time import perf_counter

        # Monotonic clock, so the measurement is immune to wall-clock changes.
        started = perf_counter()

        # Dict order is the scheduling order, and gather() returns results
        # in that same order.
        tiers = {
            "identity": cls._tier_1_identity(user_id),
            "temporal": cls._tier_2_temporal(user_id),
            "emotional": cls._tier_3_emotional(user_id),
            "immediate_memory": cls._tier_4_immediate_memory(user_id, session_id),
            "semantic_memory": cls._tier_5_semantic_memory(user_id, input_text),
            "situational": cls._tier_6_situational(user_id),
            "world_state": cls._tier_7_world_state(user_id),
        }
        outcomes = await asyncio.gather(*tiers.values(), return_exceptions=True)

        context: Dict[str, Any] = {}
        for name, outcome in zip(tiers, outcomes):
            # BaseException (not Exception) so a CancelledError raised inside
            # a tier is treated as a failure instead of leaking into the
            # context as a value.
            if isinstance(outcome, BaseException):
                logger.warning(f"Context tier '{name}' failed: {outcome!r}")
                context[name] = {}
            else:
                context[name] = outcome

        elapsed_ms = (perf_counter() - started) * 1000
        logger.info(f"Context assembled in {elapsed_ms:.1f}ms")
        return context

    @classmethod
    async def _tier_1_identity(cls, user_id: str) -> Dict[str, Any]:
        """
        TIER 1 — Identity (< 5ms, Redis cache)
        → user_profile loaded (cached 1hr)
        → personality_params read from the profile
        → active_hours classification

        Returns:
            ``{"error": "Profile not found"}`` when neither the cache nor the
            database has a profile; otherwise a dict with ``user_profile``,
            ``source`` ("cache" or "database"), ``personality_params`` and
            ``is_active_hours``.
        """
        cache_key = f"user_profile:{user_id}"

        # Try cache first
        profile = await cache.get(cache_key)
        source = "cache"

        if not profile:
            # Fetch from database
            profile = await db.fetch_one("user_profile", {"user_id": user_id})
            if not profile:
                return {"error": "Profile not found"}
            # Cache for 1 hour
            await cache.set(cache_key, profile, ttl=3600)
            source = "database"

        return {
            "user_profile": profile,
            "source": source,
            "personality_params": profile.get("personality_params", {}),
            "is_active_hours": cls._check_active_hours(profile),
        }

    @classmethod
    def _coerce_time(cls, value: Any, default: str) -> Any:
        """
        Turn an active-hours value into something comparable with ``time``.

        ``None`` and ``""`` fall back to *default*. Strings are parsed as
        "HH:MM:SS" or "HH:MM". Any other value is returned untouched
        (presumably already a ``datetime.time`` - verify against the
        profile schema).

        Raises:
            ValueError: If a string matches none of the accepted layouts.
        """
        if value is None or value == "":
            value = default
        if not isinstance(value, str):
            return value
        for layout in cls._TIME_FORMATS:
            try:
                return datetime.strptime(value, layout).time()
            except ValueError:
                continue
        raise ValueError(f"Unrecognised time value: {value!r}")

    @classmethod
    def _check_active_hours(cls, profile: Dict, now: Optional[datetime] = None) -> bool:
        """
        Check if the current time is within the user's active hours.

        Args:
            profile: Profile dict; ``active_hours_start`` / ``active_hours_end``
                default to 08:00:00 / 22:00:00 when missing or ``None``.
            now: Moment to test; defaults to ``datetime.now()``.

        Returns:
            True when the time of day falls inside the window (both ends
            inclusive). A window whose start is later than its end is
            treated as crossing midnight (e.g. 22:00 to 06:00).
        """
        moment = datetime.now() if now is None else now
        current_time = moment.time()

        start = cls._coerce_time(profile.get("active_hours_start"), cls._DEFAULT_ACTIVE_START)
        end = cls._coerce_time(profile.get("active_hours_end"), cls._DEFAULT_ACTIVE_END)

        if start <= end:
            return start <= current_time <= end
        # Overnight window: active late in the evening OR early in the morning.
        return current_time >= start or current_time <= end

    @staticmethod
    def _classify_period(hour: int) -> str:
        """Map an hour (0-23) to "morning", "afternoon", "evening" or "night"."""
        if 5 <= hour < 12:
            return "morning"
        if 12 <= hour < 17:
            return "afternoon"
        if 17 <= hour < 22:
            return "evening"
        return "night"

    @classmethod
    async def _tier_2_temporal(cls, user_id: str, now: Optional[datetime] = None) -> Dict[str, Any]:
        """
        TIER 2 — Temporal (< 2ms)
        → current datetime (server-local, naive unless *now* is supplied aware)
        → time-of-day period and weekend flag
        → DND window check (placeholder: always False)

        Args:
            user_id: Currently unused; kept for a uniform tier signature.
            now: Moment to describe; defaults to ``datetime.now()``.
        """
        moment = datetime.now() if now is None else now
        day_of_week = moment.weekday()

        return {
            "current_time": moment.isoformat(),
            "hour": moment.hour,
            "day_of_week": day_of_week,
            "period": cls._classify_period(moment.hour),
            # weekday() is 0 for Monday, so 5 and 6 are Saturday and Sunday.
            "is_weekend": day_of_week >= 5,
            "is_dnd": False  # Would check against profile
        }

    @classmethod
    def _summarize_emotions(cls, logs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Reduce emotion-log rows (newest first, non-empty) to a summary dict.

        Each emotion is averaged over the rows that carry a non-null score
        for it. The dominant emotion is the one with the highest average,
        or "neutral" when no emotion has a positive average.
        """
        averages = {}
        for emotion in cls._EMOTIONS:
            # Skip null scores so one empty column cannot break the sum.
            scores = [row[emotion] for row in logs if row.get(emotion) is not None]
            averages[emotion] = sum(scores) / len(scores) if scores else 0

        dominant = max(averages, key=averages.get)
        if averages[dominant] <= 0:
            # No signal at all: do not report the first key by accident.
            dominant = "neutral"

        latest = logs[0]
        return {
            "dominant_emotion": dominant,
            "stress_level": latest.get("stress_level", "low"),
            "7_day_average": averages,
            "today_trajectory": "stable",  # Simplified
            "latest_emotion": latest
        }

    @classmethod
    async def _tier_3_emotional(cls, user_id: str) -> Dict[str, Any]:
        """
        TIER 3 — Emotional State (< 10ms, Redis cache)
        → emotion averages (cached 1hr)
        → today's trajectory (placeholder: always "stable")
        → current stress estimate (taken from the newest log row)

        NOTE: the averages are published under the key ``7_day_average``,
        but the query takes the 20 most recent rows without a date filter.
        """
        cache_key = f"emotional_state:{user_id}"

        # Try cache
        cached = await cache.get(cache_key)
        if cached:
            return cached

        # Query recent emotion logs
        logs = await db.fetch_many(
            "emotion_log",
            filters={"user_id": user_id},
            order_by="recorded_at",
            ascending=False,
            limit=20
        )

        if not logs:
            # Not cached, so the first real log shows up immediately.
            return {
                "dominant_emotion": "neutral",
                "stress_level": "low",
                "7_day_average": {},
                "today_trajectory": "stable",
                "latest_emotion": None
            }

        result = cls._summarize_emotions(logs)

        # Cache for 1 hour
        await cache.set(cache_key, result, ttl=3600)
        return result

    @classmethod
    async def _tier_4_immediate_memory(cls, user_id: str, session_id: Optional[str]) -> Dict[str, Any]:
        """
        TIER 4 — Immediate Memory (< 20ms, Supabase)
        → last 20 interactions, expanded into user/assistant turns
          (so up to 40 turns), oldest first

        Args:
            user_id: Owner of the interactions.
            session_id: When given, restricts the query to that session.
        """
        filters = {"user_id": user_id}
        if session_id:
            filters["session_id"] = session_id

        interactions = await db.fetch_many(
            "interactions",
            filters=filters,
            order_by="created_at",
            ascending=False,
            limit=20
        )

        # Format as conversation turns
        turns = []
        # Rows arrive newest first; reverse to chronological order.
        # ``or []`` is defensive in case the helper yields None.
        for interaction in reversed(interactions or []):
            created_at = interaction.get("created_at")
            turns.append({
                "role": "user",
                "content": interaction.get("input_text", ""),
                "timestamp": created_at
            })
            reply = interaction.get("response_text")
            if reply:
                turns.append({
                    "role": "assistant",
                    "content": reply,
                    "timestamp": created_at,
                    "personality_mode": interaction.get("personality_mode")
                })

        return {
            "recent_turns": turns,
            "turn_count": len(turns),
            "session_id": session_id
        }

    @classmethod
    async def _tier_5_semantic_memory(cls, user_id: str, input_text: str) -> Dict[str, Any]:
        """
        TIER 5 — Semantic Memory (< 50ms, pgvector)
        → search every semantic source with the input as query
          (up to 8 results requested per source)
        → keep top 5 knowledge items
        → keep top 3 news articles
        → keep top 3 research sessions
        """
        # Imported here rather than at module level (presumably to avoid a
        # circular import - confirm before moving it).
        from app.memory.semantic_search import semantic_search

        # Search all semantic memory sources
        results = await semantic_search.search_all(
            user_id=user_id,
            query=input_text,
            max_results_per_source=8
        )

        return {
            "knowledge_items": results.get("knowledge", [])[:5],
            "news_articles": results.get("news", [])[:3],
            "research_sessions": results.get("research", [])[:3],
            "source": "semantic_similarity",
            "total_results": results.get("total_results", 0),
            "embedding_used": True
        }

    @staticmethod
    def _parse_due_date(due: Any) -> datetime:
        """
        Convert a stored ``due_date`` value to a ``datetime``.

        Accepts ``datetime`` objects, ``date`` objects (taken as midnight)
        and ISO-8601 strings, including a trailing "Z".

        Raises:
            ValueError: If a string is not valid ISO-8601.
            TypeError: If the value is of an unsupported type.
        """
        from datetime import date

        if isinstance(due, datetime):
            return due
        if isinstance(due, date):
            return datetime(due.year, due.month, due.day)
        if isinstance(due, str):
            # Older fromisoformat() rejects "Z", so spell the offset out.
            return datetime.fromisoformat(due.replace("Z", "+00:00"))
        raise TypeError(f"Unsupported due_date type: {type(due).__name__}")

    @classmethod
    def _categorize_tasks(cls, tasks: List[Dict[str, Any]], now: Optional[datetime] = None) -> tuple:
        """
        Split tasks into ``(today_tasks, approaching_deadlines)``.

        A task is in ``today_tasks`` when it is overdue or due within the
        next 24 hours, and in ``approaching_deadlines`` when it is due
        within the following three days. Tasks without a due date, or with
        one that cannot be parsed, appear in neither list.

        Args:
            tasks: Task rows carrying an optional ``due_date``.
            now: Reference moment; defaults to the current time.
        """
        today_tasks = []
        approaching_deadlines = []

        for task in tasks:
            due = task.get("due_date")
            if not due:
                continue
            try:
                due_date = cls._parse_due_date(due)
            except (TypeError, ValueError) as exc:
                # One bad row must not discard the whole tier.
                logger.warning(f"Skipping task with invalid due_date {due!r}: {exc}")
                continue

            # Aware and naive datetimes cannot be subtracted, so build a
            # reference "now" with the same awareness as the due date.
            if now is None:
                reference = datetime.now(due_date.tzinfo)
            elif (now.tzinfo is None) == (due_date.tzinfo is None):
                reference = now
            elif due_date.tzinfo is None:
                reference = now.astimezone().replace(tzinfo=None)
            else:
                reference = now.astimezone()

            days_until = (due_date - reference).days
            if days_until <= 0:
                today_tasks.append(task)
            elif days_until <= 3:
                approaching_deadlines.append(task)

        return today_tasks, approaching_deadlines

    @classmethod
    async def _tier_6_situational(cls, user_id: str) -> Dict[str, Any]:
        """
        TIER 6 — Situational (< 15ms, Supabase + Redis)
        → today's tasks + approaching deadlines
        → count of active tasks/goals
        → pending proactive events queued
        """
        # The two queries are independent, so run them concurrently.
        tasks, events = await asyncio.gather(
            # Get active tasks
            db.fetch_many(
                "tasks_goals",
                filters={"user_id": user_id, "status": "active"},
                order_by="due_date",
                ascending=True,
                limit=10
            ),
            # Get pending events
            db.fetch_many(
                "event_queue",
                filters={"user_id": user_id, "status": "pending"},
                limit=10
            ),
        )
        tasks = tasks or []
        events = events or []

        # Categorize tasks
        today_tasks, approaching_deadlines = cls._categorize_tasks(tasks)

        return {
            "today_tasks": today_tasks,
            "approaching_deadlines": approaching_deadlines,
            "active_goals_count": len(tasks),
            "pending_events": events,
            "event_count": len(events)
        }

    @classmethod
    async def _tier_7_world_state(cls, user_id: str) -> Dict[str, Any]:
        """
        TIER 7 — World State (< 10ms, Redis cache)
        → breaking news last 2 hours (cached)
        → financial movements >2% (cached)
        → pre-computed morning context if applicable

        Currently a placeholder: on a cache miss an empty world state is
        built and cached for 10 minutes.
        """
        cache_key = f"world_state:{user_id}"

        # Try cache
        cached = await cache.get(cache_key)
        if cached:
            return cached

        # Placeholder - would fetch from cached world data
        world_state = {
            "breaking_news": [],
            "financial_alerts": [],
            "morning_context": None,
            "last_updated": datetime.now().isoformat(),
            "note": "World state from daemon cache"
        }

        # Cache for 10 minutes
        await cache.set(cache_key, world_state, ttl=600)
        return world_state