"""
MCP Core - Interaction Processor
This module provides the core functionality for processing interactions
between users and models through the Model Context Protocol.
"""
from typing import Dict, Any, Optional, List, Union
import asyncio
from .context import Context, ContextManager
from .model_adapters import ModelAdapter, ModelRegistry
from .protocol import MCPRequest, MCPResponse, InteractionType
class InteractionProcessor:
    """
    Processes interactions between users and models.

    This class handles the flow of interactions through the MCP system,
    managing context, routing to appropriate models, and formatting responses.
    """

    def __init__(
        self,
        context_manager: ContextManager,
        model_registry: ModelRegistry
    ):
        """
        Initialize a new InteractionProcessor.

        Args:
            context_manager: The ContextManager to use for context operations.
            model_registry: The ModelRegistry to use for model operations.
        """
        self.context_manager = context_manager
        self.model_registry = model_registry

    async def process_interaction(
        self,
        request: MCPRequest,
        model_name: str,
        history_limit: int = 5
    ) -> MCPResponse:
        """
        Process an interaction request.

        Args:
            request: The MCPRequest to process.
            model_name: Name of the model adapter to use.
            history_limit: Maximum number of past interactions from the
                context to include in the model input. Defaults to 5,
                matching the previously hard-coded window.

        Returns:
            An MCPResponse containing the model's response.

        Raises:
            ValueError: If the context ID is invalid or the model is not found.
        """
        # Resolve (or create) the conversation context first, preserving the
        # original order of validation errors.
        context = self._resolve_context(request)

        # Look up the model adapter.
        model_adapter = self.model_registry.get_adapter(model_name)
        if not model_adapter:
            raise ValueError(f"Model adapter '{model_name}' not found")

        # Build the model input from the request.
        # NOTE(review): request.metadata is spread last, so a metadata key
        # named "text", "interaction_type" or "format" would silently
        # override the explicit values. Preserved for backward
        # compatibility — confirm this override behavior is intended.
        model_input = {
            "text": request.content.get("text", ""),
            "interaction_type": request.interaction_type,
            "format": request.format,
            **request.metadata
        }

        # Include context state and recent history in the model input.
        model_context = {
            "state": context.state,
            "history": context.get_history(history_limit)
        }

        # Process with the model.
        model_output = await model_adapter.process(model_input, model_context)

        # Record this interaction so follow-up requests see it in history.
        context.add_interaction(request.dict(), model_output)

        # Persist any educational signals the model emitted.
        educational_metadata = self._record_educational_metadata(
            context, model_output
        )

        return MCPResponse(
            context_id=context.context_id,
            interaction_type=request.interaction_type,
            content={
                "text": model_output.get("text", ""),
                "educational_metadata": educational_metadata
            },
            format=request.format,
            metadata=request.metadata,
            model_info={
                # Fall back to the requested adapter name if the model
                # output does not identify itself.
                "model_id": model_output.get("model_id", model_name),
                "confidence": model_output.get("confidence", 1.0)
            }
        )

    def _resolve_context(self, request: MCPRequest) -> Context:
        """Return the context named by the request, or create a new one.

        Args:
            request: The MCPRequest whose context_id (if any) is resolved.

        Returns:
            The existing or newly created Context.

        Raises:
            ValueError: If request.context_id is set but unknown.
        """
        context_id = request.context_id
        if context_id:
            # Use existing context.
            context = self.context_manager.get_context(context_id)
            if not context:
                raise ValueError(f"Context with ID {context_id} not found")
            return context
        # Create new context tagged with the interaction type.
        return self.context_manager.create_context(
            metadata={"interaction_type": request.interaction_type}
        )

    @staticmethod
    def _record_educational_metadata(
        context: Context, model_output: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Copy educational metadata from the model output into context state.

        Args:
            context: The Context whose state is updated.
            model_output: The raw model output dictionary.

        Returns:
            The (possibly empty) educational metadata dictionary, for reuse
            in the response payload.
        """
        educational_metadata = model_output.get("educational_metadata", {})
        if educational_metadata:
            context.update_state({
                "educational": {
                    "last_topics": educational_metadata.get("topics", []),
                    "complexity_level": educational_metadata.get("complexity_level", ""),
                    "suggested_follow_ups": educational_metadata.get("suggested_follow_ups", [])
                }
            })
        return educational_metadata
class BatchProcessor:
    """
    Processes batches of interactions.

    Runs many interaction requests through a single InteractionProcessor,
    either concurrently or one at a time.
    """

    def __init__(self, interaction_processor: InteractionProcessor):
        """
        Initialize a new BatchProcessor.

        Args:
            interaction_processor: The InteractionProcessor to use.
        """
        self.interaction_processor = interaction_processor

    async def process_batch(
        self,
        requests: List[Dict[str, Any]],
        model_name: str,
        parallel: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Process a batch of interaction requests.

        Args:
            requests: List of request dictionaries.
            model_name: Name of the model adapter to use.
            parallel: Whether to process requests in parallel.

        Returns:
            List of response dictionaries.
        """
        processor = self.interaction_processor

        if parallel:
            # Fan all requests out at once and await them together.
            pending = (
                processor.process_interaction(MCPRequest(**raw), model_name)
                for raw in requests
            )
            completed = await asyncio.gather(*pending)
        else:
            # Await each request fully before starting the next.
            completed = []
            for raw in requests:
                result = await processor.process_interaction(
                    MCPRequest(**raw), model_name
                )
                completed.append(result)

        # Serialize each MCPResponse back to a plain dictionary.
        return [item.dict() for item in completed]