""" MCP Core - Interaction Processor This module provides the core functionality for processing interactions between users and models through the Model Context Protocol. """ from typing import Dict, Any, Optional, List, Union import asyncio from .context import Context, ContextManager from .model_adapters import ModelAdapter, ModelRegistry from .protocol import MCPRequest, MCPResponse, InteractionType class InteractionProcessor: """ Processes interactions between users and models. This class handles the flow of interactions through the MCP system, managing context, routing to appropriate models, and formatting responses. """ def __init__( self, context_manager: ContextManager, model_registry: ModelRegistry ): """ Initialize a new InteractionProcessor. Args: context_manager: The ContextManager to use for context operations. model_registry: The ModelRegistry to use for model operations. """ self.context_manager = context_manager self.model_registry = model_registry async def process_interaction( self, request: MCPRequest, model_name: str ) -> MCPResponse: """ Process an interaction request. Args: request: The MCPRequest to process. model_name: Name of the model adapter to use. Returns: An MCPResponse containing the model's response. Raises: ValueError: If the context ID is invalid or the model is not found. """ # Get or create context context_id = request.context_id context = None if context_id: # Use existing context context = self.context_manager.get_context(context_id) if not context: raise ValueError(f"Context with ID {context_id} not found") else: # Create new context context = self.context_manager.create_context( metadata={"interaction_type": request.interaction_type} ) context_id = context.context_id # Get model adapter model_adapter = self.model_registry.get_adapter(model_name) if not model_adapter: raise ValueError(f"Model adapter '{model_name}' not found") # Process the request with the model model_input = { "text": request.content.get("text", ""), "interaction_type": request.interaction_type, "format": request.format, **request.metadata } # Include context state in model input model_context = { "state": context.state, "history": context.get_history(5) # Last 5 interactions } # Process with model model_output = await model_adapter.process(model_input, model_context) # Update context with this interaction context.add_interaction(request.dict(), model_output) # Extract educational metadata if available educational_metadata = model_output.get("educational_metadata", {}) if educational_metadata: # Update context state with educational information context.update_state({ "educational": { "last_topics": educational_metadata.get("topics", []), "complexity_level": educational_metadata.get("complexity_level", ""), "suggested_follow_ups": educational_metadata.get("suggested_follow_ups", []) } }) # Create response response = MCPResponse( context_id=context_id, interaction_type=request.interaction_type, content={ "text": model_output.get("text", ""), "educational_metadata": educational_metadata }, format=request.format, metadata=request.metadata, model_info={ "model_id": model_output.get("model_id", model_name), "confidence": model_output.get("confidence", 1.0) } ) return response class BatchProcessor: """ Processes batches of interactions. This class provides functionality for processing multiple interactions in parallel or sequence. """ def __init__(self, interaction_processor: InteractionProcessor): """ Initialize a new BatchProcessor. Args: interaction_processor: The InteractionProcessor to use. """ self.interaction_processor = interaction_processor async def process_batch( self, requests: List[Dict[str, Any]], model_name: str, parallel: bool = True ) -> List[Dict[str, Any]]: """ Process a batch of interaction requests. Args: requests: List of request dictionaries. model_name: Name of the model adapter to use. parallel: Whether to process requests in parallel. Returns: List of response dictionaries. """ if parallel: # Process requests in parallel tasks = [ self.interaction_processor.process_interaction( MCPRequest(**request), model_name ) for request in requests ] responses = await asyncio.gather(*tasks) else: # Process requests sequentially responses = [] for request in requests: response = await self.interaction_processor.process_interaction( MCPRequest(**request), model_name ) responses.append(response) # Convert responses to dictionaries return [response.dict() for response in responses]