Spaces:
Sleeping
Sleeping
| """ | |
| MCP Core - Interaction Processor | |
| This module provides the core functionality for processing interactions | |
| between users and models through the Model Context Protocol. | |
| """ | |
import asyncio
from typing import Any, Dict, List, Optional, Union

from .context import Context, ContextManager
from .model_adapters import ModelAdapter, ModelRegistry
from .protocol import InteractionType, MCPRequest, MCPResponse
class InteractionProcessor:
    """
    Processes interactions between users and models.

    This class handles the flow of interactions through the MCP system,
    managing context, routing to appropriate models, and formatting responses.
    """

    # Default number of past interactions passed to the model as history.
    DEFAULT_HISTORY_LIMIT = 5

    def __init__(
        self,
        context_manager: ContextManager,
        model_registry: ModelRegistry
    ):
        """
        Initialize a new InteractionProcessor.

        Args:
            context_manager: The ContextManager to use for context operations.
            model_registry: The ModelRegistry to use for model operations.
        """
        self.context_manager = context_manager
        self.model_registry = model_registry

    async def process_interaction(
        self,
        request: MCPRequest,
        model_name: str,
        history_limit: int = DEFAULT_HISTORY_LIMIT
    ) -> MCPResponse:
        """
        Process an interaction request.

        Args:
            request: The MCPRequest to process.
            model_name: Name of the model adapter to use.
            history_limit: Maximum number of past interactions from the
                context to expose to the model (default: 5). Added as a
                backward-compatible generalization of the previously
                hard-coded window.

        Returns:
            An MCPResponse containing the model's response.

        Raises:
            ValueError: If the context ID is invalid or the model is not found.
        """
        # Resolve the conversation context: reuse the caller-supplied one,
        # or create a fresh context tagged with the interaction type.
        context_id = request.context_id
        if context_id:
            context = self.context_manager.get_context(context_id)
            if not context:
                raise ValueError(f"Context with ID {context_id} not found")
        else:
            context = self.context_manager.create_context(
                metadata={"interaction_type": request.interaction_type}
            )
            context_id = context.context_id

        # Look up the model adapter; fail fast if it is unknown.
        model_adapter = self.model_registry.get_adapter(model_name)
        if not model_adapter:
            raise ValueError(f"Model adapter '{model_name}' not found")

        # Flatten the request into the adapter's input dict. Request
        # metadata is merged in last, so metadata keys can override the
        # text/interaction_type/format entries — TODO confirm this
        # precedence is intended.
        model_input = {
            "text": request.content.get("text", ""),
            "interaction_type": request.interaction_type,
            "format": request.format,
            **request.metadata
        }

        # Expose the persistent context state plus a bounded slice of the
        # interaction history to the model.
        model_context = {
            "state": context.state,
            "history": context.get_history(history_limit)
        }

        model_output = await model_adapter.process(model_input, model_context)

        # Record this request/response pair in the context history.
        context.add_interaction(request.dict(), model_output)

        # If the model reported educational metadata, fold it into the
        # context state so follow-up interactions can build on it.
        educational_metadata = model_output.get("educational_metadata", {})
        if educational_metadata:
            context.update_state({
                "educational": {
                    "last_topics": educational_metadata.get("topics", []),
                    "complexity_level": educational_metadata.get("complexity_level", ""),
                    "suggested_follow_ups": educational_metadata.get("suggested_follow_ups", [])
                }
            })

        # Package the model output in the MCP response envelope, echoing
        # back the request's format and metadata.
        return MCPResponse(
            context_id=context_id,
            interaction_type=request.interaction_type,
            content={
                "text": model_output.get("text", ""),
                "educational_metadata": educational_metadata
            },
            format=request.format,
            metadata=request.metadata,
            model_info={
                "model_id": model_output.get("model_id", model_name),
                "confidence": model_output.get("confidence", 1.0)
            }
        )
class BatchProcessor:
    """
    Processes batches of interactions.

    Thin fan-out layer over an InteractionProcessor: runs many interaction
    requests through it, either concurrently or one after another.
    """

    def __init__(self, interaction_processor: InteractionProcessor):
        """
        Initialize a new BatchProcessor.

        Args:
            interaction_processor: The InteractionProcessor to use.
        """
        self.interaction_processor = interaction_processor

    async def process_batch(
        self,
        requests: List[Dict[str, Any]],
        model_name: str,
        parallel: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Process a batch of interaction requests.

        Args:
            requests: List of request dictionaries.
            model_name: Name of the model adapter to use.
            parallel: Whether to process requests in parallel.

        Returns:
            List of response dictionaries.
        """
        processor = self.interaction_processor.process_interaction

        if parallel:
            # Launch every request at once and wait for all of them.
            pending = [
                processor(MCPRequest(**payload), model_name)
                for payload in requests
            ]
            results = await asyncio.gather(*pending)
        else:
            # One at a time, preserving submission order.
            results = [
                await processor(MCPRequest(**payload), model_name)
                for payload in requests
            ]

        # Serialize the MCPResponse objects back into plain dictionaries.
        return [result.dict() for result in results]