# Source: masterllm/services/schemas.py
# Commit b1206db — "Fix: Add missing generate_message_id function to schemas.py"
# services/schemas.py
"""
MongoDB Schemas for MasterLLM V3 Architecture
Defines Pydantic models for all MongoDB collections:
- sessions: Session metadata
- messages: Message metadata with S3 references
- pipelines: Pipeline execution metadata
- workflows: Saved workflows
- files: File metadata with S3 references
"""
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field
# ========================
# ENUMS
# ========================
class SessionState(str, Enum):
    """Lifecycle states of a chat session.

    NOTE(review): EXECUTING and IN_PROGRESS both describe a running pipeline —
    confirm which one callers actually set, or consolidate.
    """
    INITIAL = "initial"
    PIPELINE_PROPOSED = "pipeline_proposed"
    EXECUTING = "executing"
    IN_PROGRESS = "in_progress"  # Pipeline is running
    COMPLETED = "completed"  # Pipeline completed successfully
    FAILED = "failed"  # Pipeline failed
class PipelineStatus(str, Enum):
    """Pipeline execution states (stored on PipelineSchema.status)."""
    PROPOSED = "proposed"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
class ComponentStatus(str, Enum):
    """Component execution states.

    NOTE(review): RUNNING/SUCCESS are labeled as aliases but are *distinct*
    enum members with distinct string values — code comparing statuses must
    handle both spellings (e.g. EXECUTING and RUNNING).
    """
    PENDING = "pending"
    EXECUTING = "executing"
    RUNNING = "running"  # Alias for executing
    COMPLETED = "completed"
    SUCCESS = "success"  # Alias for completed
    FAILED = "failed"
class MessageRole(str, Enum):
    """Chat message roles (standard LLM conversation roles)."""
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
# ========================
# COMPONENT MODELS
# ========================
class ComponentMetadata(BaseModel):
    """Component execution metadata within a pipeline.

    Embedded in PipelineSchema.components; holds per-component status,
    timing, output, and error details.
    """
    component_id: str  # Unique identifier for component (see generate_component_id)
    component_name: str
    order: int  # Position of the component within the pipeline
    status: ComponentStatus = ComponentStatus.PENDING
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    component_output: Optional[Any] = None  # Output from component execution
    hasError: bool = False  # Error flag (camelCase kept for stored-document compatibility)
    error: Optional[Dict[str, Any]] = None  # Error details {error_code, message, details}
    success_message: Optional[str] = None  # Component-wise success message
    metadata: Optional[Dict[str, Any]] = None  # Duration, tokens, etc.
class SessionStats(BaseModel):
    """Aggregate session statistics (embedded in SessionSchema.stats)."""
    message_count: int = 0  # Total messages exchanged in the session
    pipeline_run_count: int = 0  # Number of pipeline executions
    total_tokens_used: int = 0  # Cumulative LLM token usage
# ========================
# COLLECTION SCHEMAS
# ========================
class SessionSchema(BaseModel):
    """MongoDB schema for the `sessions` collection (session metadata only)."""
    session_id: str  # Primary key (unique index)
    user_id: Optional[str] = None
    created_at: datetime
    last_activity: datetime  # Indexed descending for recency queries
    state: SessionState = SessionState.INITIAL
    chat_name: Optional[str] = None  # Auto-generated display name
    chat_name_generated_at: Optional[datetime] = None
    chat_name_model: Optional[str] = None  # Model used to generate chat_name
    current_file_id: Optional[str] = None  # Reference to files collection
    proposed_pipeline_id: Optional[str] = None  # Reference to pipelines collection
    stats: SessionStats = Field(default_factory=SessionStats)
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # NOTE(review): json_encoders is deprecated in Pydantic v2 (the file
        # uses v2's model_dump elsewhere) and assumes naive-UTC datetimes —
        # an aware value would serialize as "...+00:00Z". Confirm callers.
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
class MessageSchema(BaseModel):
    """MongoDB schema for the `messages` collection — METADATA ONLY.

    Full message content lives in S3 (see MessageS3Content); Mongo keeps a
    short preview plus the S3 pointer.
    """
    message_id: str  # Primary key (unique index)
    session_id: str  # Parent session; compound-indexed with timestamp
    role: MessageRole
    timestamp: datetime
    s3_key: str  # Path to full message content in S3
    s3_bucket: str
    content_preview: str  # First 200 chars for quick display
    has_file: bool = False
    file_id: Optional[str] = None  # Reference to files collection
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # NOTE(review): deprecated in Pydantic v2; assumes naive-UTC datetimes.
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
class PipelineSchema(BaseModel):
    """MongoDB schema for the `pipelines` collection — METADATA ONLY.

    The full pipeline definition and the combined component outputs live in
    S3; Mongo stores per-component status stubs and S3 pointers.
    """
    execution_id: str  # Primary key (unique index)
    session_id: str  # Parent session; compound-indexed with created_at
    pipeline_name: str
    created_at: datetime
    created_from: str  # "request" or "edit"
    created_by_message: str  # User's message that triggered pipeline
    status: PipelineStatus
    pipeline_definition_s3_key: str  # Full pipeline definition in S3
    components: List[ComponentMetadata]  # Component metadata only
    final_output_s3_key: Optional[str] = None  # Contains ALL component outputs
    output_id: Optional[str] = None  # ID for downloading output (see generate_output_id)
    final_output_presigned_url: Optional[str] = None  # Cached presigned download URL
    final_output_presigned_expires_at: Optional[datetime] = None
    executed_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    executor: Optional[str] = None  # "bedrock", "crewai", "langchain"
    result: Optional[str] = None  # Renamed from result_preview - output if completed
    hasError: bool = False  # Error flag (camelCase kept for stored-document compatibility)
    error: Optional[Dict[str, Any]] = None  # Error details
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # NOTE(review): deprecated in Pydantic v2; assumes naive-UTC datetimes.
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
class WorkflowSchema(BaseModel):
    """MongoDB schema for the `workflows` collection — METADATA ONLY.

    A workflow is a user-saved pipeline; the definition itself lives in S3.
    """
    workflow_id: str  # Primary key (unique index)
    session_id: str  # Session the workflow was saved from
    saved_at: datetime
    saved_by_user_message: str  # Message when user confirmed save
    pipeline_definition_s3_key: str  # Full workflow/pipeline definition in S3
    pipeline_name: str
    pipeline_preview: str  # "Extract text → Summarize → Translate"
    user_confirmed: bool = True  # User explicitly confirmed save
    tags: List[str] = Field(default_factory=list)
    source_pipeline_id: Optional[str] = None  # Pipeline ID this workflow came from
    pipeline_status: Optional[str] = None  # Status when saved: "proposed", "completed"
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # NOTE(review): deprecated in Pydantic v2; assumes naive-UTC datetimes.
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
class FileSchema(BaseModel):
    """MongoDB schema for the `files` collection — METADATA ONLY.

    File bytes live in S3; Mongo stores descriptive metadata plus a cached
    presigned URL.
    """
    file_id: str  # Primary key (unique index)
    session_id: str  # Session the file was uploaded in
    uploaded_at: datetime
    file_name: str
    file_size: int  # Size in bytes — presumably; confirm against uploader
    mime_type: str
    s3_bucket: str
    s3_key: str
    presigned_url: str  # Cached, 7-day expiry
    presigned_expires_at: datetime
    metadata: Dict[str, Any] = Field(default_factory=dict)

    class Config:
        # NOTE(review): deprecated in Pydantic v2; assumes naive-UTC datetimes.
        json_encoders = {
            datetime: lambda v: v.isoformat() + "Z" if v else None
        }
# ========================
# S3 CONTENT SCHEMAS
# ========================
class MessageS3Content(BaseModel):
    """Schema for the full message content document stored in S3.

    Mirrors MessageSchema but carries the complete `content`; timestamps are
    stored as ISO strings here (already serialized for JSON).
    """
    message_id: str
    role: str  # Plain string here, not MessageRole — values should still match the enum
    content: str  # Full message text (Mongo keeps only a 200-char preview)
    timestamp: str  # ISO-8601 string
    file_data: Dict[str, Any] = Field(default_factory=dict)
class ComponentOutputS3(BaseModel):
    """Schema for an individual component output inside final_output.json.

    Parallels ComponentMetadata, with statuses as plain strings (the S3
    document is already-serialized JSON).
    """
    component_id: str
    name: str  # NOTE(review): ComponentMetadata calls this component_name — confirm mapping
    order: int
    status: str
    component_output: Optional[Any] = None
    hasError: bool = False  # Error flag (camelCase kept for stored-document compatibility)
    error: Optional[Dict[str, Any]] = None
    success_message: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    result: Optional[Dict[str, Any]] = None
class FinalOutputS3(BaseModel):
    """Schema for final_output.json in S3 (aggregate of all component outputs)."""
    workflow_name: str
    workflow_status: str  # "completed" or "failed"
    last_node_output: Optional[str] = None  # Output from last successful node ONLY
    components: List[ComponentOutputS3]
    execution_time: float  # Total execution time — presumably seconds; confirm writer
    timestamp: str  # ISO-8601 string
class PipelineDefinitionS3(BaseModel):
    """Schema for pipeline definition stored in S3.

    The S3 JSON stores provenance under the keys "_generator" and "_model".
    Pydantic treats leading-underscore class attributes as *private attributes*,
    so the previous declarations (`_generator: str`, `_model: str`) were
    silently excluded from validation and serialization — those keys were
    dropped on load and never written on dump. They are exposed here as
    ordinary fields with aliases instead.
    """
    # Allow construction by field name (generator=...) as well as alias (_generator=...).
    model_config = {"populate_by_name": True}

    pipeline_name: str
    pipeline_steps: List[Dict[str, Any]]  # Component configurations
    target_lang: Optional[str] = None
    reason: str  # Why this pipeline was proposed
    metadata: Dict[str, Any] = Field(default_factory=dict)
    # Defaults keep construction backward-compatible: the old private attrs
    # could never be supplied through __init__ at all.
    generator: str = Field(default="", alias="_generator")  # "bedrock" or "gemini"
    model: str = Field(default="", alias="_model")  # Model that generated the pipeline
    created_at: str  # ISO-8601 string
# ========================
# UTILITY FUNCTIONS
# ========================
def generate_component_id(pipeline_id: str, index: int) -> str:
    """Build a component identifier of the form '<pipeline_id>_<index>'."""
    return "_".join((pipeline_id, str(index)))
def generate_message_id() -> str:
    """Return a new unique message ID (stringified MongoDB ObjectId)."""
    # Deferred import: bson is only required when an ID is actually generated.
    from bson import ObjectId
    oid = ObjectId()
    return str(oid)
def generate_output_id() -> str:
    """Return a unique download ID of the form 'output_<12 hex chars>'."""
    token = uuid.uuid4().hex[:12]
    return "output_" + token
def datetime_to_iso(dt: Optional[datetime]) -> Optional[str]:
    """Convert a datetime to an ISO-8601 UTC string with a 'Z' suffix.

    Args:
        dt: Naive (assumed UTC) or timezone-aware datetime, or None.

    Returns:
        ISO string like "2024-01-02T03:04:05Z", or None if dt is None.
    """
    if dt is None:
        return None
    if dt.tzinfo is not None:
        # Previously an aware datetime produced "...+00:00Z" — an offset AND a
        # 'Z', which is not valid ISO-8601. Normalize to UTC and drop the
        # offset so the trailing 'Z' is the only timezone designator.
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt.isoformat() + "Z"
def iso_to_datetime(iso_str: Optional[str]) -> Optional[datetime]:
    """Convert an ISO-8601 string (optionally 'Z'-suffixed) to a datetime.

    Args:
        iso_str: ISO string such as "2024-01-02T03:04:05Z", or None/empty.

    Returns:
        Parsed datetime, or None for None/empty input.
    """
    if not iso_str:
        return None
    # str.rstrip('Z') strips a *character set* (any run of trailing 'Z's),
    # not a single suffix — remove exactly one terminator instead.
    if iso_str.endswith('Z'):
        iso_str = iso_str[:-1]
    return datetime.fromisoformat(iso_str)
def schema_to_dict(schema: BaseModel) -> Dict[str, Any]:
    """Serialize a Pydantic model to a plain dict.

    Uses mode='json' so datetimes, enums, etc. are rendered as JSON-safe
    primitives rather than Python objects.
    """
    dumped = schema.model_dump(mode='json')
    return dumped
# ========================
# INDEX DEFINITIONS
# ========================
# Define indexes for MongoDB collections
# Index specifications per collection, consumed by create_indexes().
# Each entry: {"keys": [(field, direction), ...], "unique": bool (optional)}.
COLLECTION_INDEXES = {
    "sessions": [
        {"keys": [("session_id", 1)], "unique": True},
        {"keys": [("last_activity", -1)]},
        {"keys": [("user_id", 1)]},
    ],
    "messages": [
        {"keys": [("message_id", 1)], "unique": True},
        {"keys": [("session_id", 1), ("timestamp", 1)]},  # Chronological retrieval
    ],
    "pipelines": [
        {"keys": [("execution_id", 1)], "unique": True},
        {"keys": [("session_id", 1), ("created_at", -1)]},
        {"keys": [("status", 1)]},
    ],
    "workflows": [
        {"keys": [("workflow_id", 1)], "unique": True},
        {"keys": [("saved_at", -1)]},
        {"keys": [("session_id", 1)]},
    ],
    "files": [
        {"keys": [("file_id", 1)], "unique": True},
        {"keys": [("session_id", 1)]},
    ],
}


def create_indexes(db):
    """
    Create every index declared in COLLECTION_INDEXES on the database.

    Args:
        db: pymongo database instance
    """
    for name, specs in COLLECTION_INDEXES.items():
        collection = db[name]
        for spec in specs:
            # "unique" is optional in the spec and defaults to a non-unique index.
            collection.create_index(spec["keys"], unique=spec.get("unique", False))
    print("✅ All MongoDB indexes created successfully")