Spaces:
Sleeping
Sleeping
"""
Collaborative Agents - Agents designed for negotiation and collaboration.

This module implements agents that are specifically designed to work with
the negotiation system, demonstrating collaborative AI patterns and
multi-agent cooperation.
"""
| import asyncio | |
| import uuid | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any, Union | |
| from decimal import Decimal | |
| from ..core.agent_negotiation import ( | |
| NegotiatingAgent, NegotiationMessage, NegotiationProposal, | |
| MessageType, NegotiationStatus, ConflictType | |
| ) | |
| from ..core.trip_orchestration import ( | |
| AgentInterface, AgentResult, AgentStatus, OrchestrationContext | |
| ) | |
class CollaborativeFlightAgent(NegotiatingAgent, AgentInterface):
    """
    Flight agent that collaborates with other agents through negotiation.

    This agent demonstrates collaborative AI patterns by:
    - Engaging in budget negotiations with other agents
    - Sharing constraints and reasoning transparently
    - Adapting proposals based on collaborative feedback
    - Participating in consensus building
    """

    def __init__(self, agent_id: str = "collaborative_flight_agent"):
        """Initialise both base classes and the agent's bookkeeping lists."""
        NegotiatingAgent.__init__(self, agent_id)
        AgentInterface.__init__(self, agent_id)
        self.negotiation_history: List[NegotiationMessage] = []
        self.collaborative_preferences: Dict[str, Any] = {}
        # Human-readable notes about budget violations of the booked flight.
        self.constraint_violations: List[str] = []

    async def execute(self, context: OrchestrationContext) -> AgentResult:
        """Execute flight search with collaborative negotiation capabilities.

        Reads the trip requirements from ``context``, adapts the budget to any
        pending counter-proposals, shares an initial proposal, searches the
        (mock) flights, selects one and writes the booking into ``context``.

        Returns:
            An ``AgentResult`` with status COMPLETED and the booking data, or
            FAILED when no flight matches or any step raises.
        """
        try:
            # Extract requirements from context
            origin = context.get_state("origin", "New York")
            destination = context.get_state("destination", "Los Angeles")
            departure_date = context.get_state(
                "departure_date", datetime.now() + timedelta(days=7)
            )
            passengers = context.get_state("passengers", 1)
            budget = context.get_state("flight_budget", 500)

            # Check for collaborative negotiations
            negotiation_messages = await self._get_negotiation_messages()

            # Adapt budget based on negotiations
            adapted_budget = await self._adapt_budget_from_negotiations(
                budget, negotiation_messages
            )

            # Create collaborative proposal and share it with other agents
            proposal = await self._create_initial_proposal(context)
            await self._share_proposal_with_agents(proposal)

            # Search for flights with collaborative constraints
            flights = await self._search_flights_collaboratively(
                origin, destination, departure_date, passengers, adapted_budget, context
            )
            if not flights:
                return AgentResult(
                    agent_id=self.agent_id,
                    status=AgentStatus.FAILED,
                    error=Exception("No flights found matching collaborative constraints"),
                )

            # Select best flight considering collaborative feedback
            selected_flight = await self._select_flight_collaboratively(
                flights, context, negotiation_messages
            )

            # Simulate booking
            booking_reference = f"CF{str(uuid.uuid4())[:8].upper()}"

            # Update context with collaborative results
            await self._update_context_collaboratively(
                context, selected_flight, booking_reference
            )

            # Calculate total cost
            total_cost = float(selected_flight.price) * passengers

            return AgentResult(
                agent_id=self.agent_id,
                status=AgentStatus.COMPLETED,
                data={
                    "flight": selected_flight,
                    "booking_reference": booking_reference,
                    "total_cost": total_cost,
                    "passengers": passengers,
                    "collaborative_negotiations": len(negotiation_messages),
                    "constraint_violations": self.constraint_violations,
                    "collaborative_adaptations": self._get_collaborative_adaptations(),
                },
                # Fixed placeholder value; nothing is actually timed here.
                execution_time=1.5,
            )
        except Exception as e:
            # Top-level boundary: every failure is reported through the result.
            return AgentResult(
                agent_id=self.agent_id,
                status=AgentStatus.FAILED,
                error=e,
                execution_time=1.0,
            )

    async def _create_initial_proposal(self, context: OrchestrationContext) -> NegotiationProposal:
        """Create initial proposal for collaborative negotiation.

        The proposal carries the current flight budget and route read from
        ``context`` plus two alternatives at 80% and 120% of that budget.
        """
        budget = context.get_state("flight_budget", 500)
        origin = context.get_state("origin", "New York")
        destination = context.get_state("destination", "Los Angeles")
        passengers = context.get_state("passengers", 1)

        return NegotiationProposal(
            proposer_id=self.agent_id,
            proposal_type="flight_budget_allocation",
            details={
                "flight_budget": budget,
                "origin": origin,
                "destination": destination,
                "passengers": passengers,
                "preferred_options": ["direct_flights", "morning_departures"],
                "flexible_options": ["1_stop_flights", "afternoon_departures"],
            },
            reasoning="Direct flights preferred for time efficiency, but flexible with 1-stop for cost optimization",
            constraints=["max_stops_2", "morning_departure_preferred", "major_airlines_only"],
            alternatives=[
                {
                    "budget": budget * 0.8,
                    "constraints": ["max_stops_1", "flexible_departure"],
                    "reasoning": "Accept 1-stop flights to reduce budget impact",
                },
                {
                    "budget": budget * 1.2,
                    "constraints": ["direct_flights_only", "morning_departure"],
                    "reasoning": "Premium budget for guaranteed direct flights",
                },
            ],
            trade_offs={
                "time_vs_cost": "Direct flights save 2-3 hours but cost 30% more",
                "flexibility_vs_guarantee": "Flexible dates can save 20% but reduce options",
            },
            confidence_score=0.8,
        )

    async def _evaluate_proposal_compatibility(self, proposal: Dict[str, Any], context: OrchestrationContext) -> Dict[str, Any]:
        """Evaluate compatibility with another agent's proposal.

        Returns a dict with ``compatibility_score``, ``conflicts``,
        ``synergies`` and ``recommendations``. A budget at or below the flight
        budget adds 0.4 (otherwise subtracts 0.2); each constraint shared with
        this agent adds 0.1.
        """
        evaluation = {
            "compatibility_score": 0.0,
            "conflicts": [],
            "synergies": [],
            "recommendations": [],
        }

        # Check budget compatibility
        if "budget" in proposal:
            flight_budget = context.get_state("flight_budget", 500)
            if proposal["budget"] <= flight_budget:
                evaluation["compatibility_score"] += 0.4
                evaluation["synergies"].append("Budget allocation is compatible")
            else:
                evaluation["compatibility_score"] -= 0.2
                evaluation["conflicts"].append("Budget exceeds flight allocation")

        # Check constraint compatibility
        if "constraints" in proposal:
            flight_constraints = ["max_stops_2", "morning_departure_preferred"]
            for constraint in proposal["constraints"]:
                if constraint in flight_constraints:
                    evaluation["compatibility_score"] += 0.1
                    evaluation["synergies"].append(f"Shared constraint: {constraint}")

        # Generate recommendations
        if evaluation["compatibility_score"] > 0.5:
            evaluation["recommendations"].append("Proposal is highly compatible")
        elif evaluation["compatibility_score"] > 0.0:
            evaluation["recommendations"].append("Proposal has minor conflicts but is workable")
        else:
            evaluation["recommendations"].append("Proposal has significant conflicts requiring negotiation")

        return evaluation

    async def _evaluate_consensus_options(self, options: List[Dict[str, Any]], context: OrchestrationContext) -> Dict[str, Any]:
        """Evaluate consensus options and return preferred choice.

        Returns a dict with ``preferred_option``, ``confidence`` and
        ``reasoning``. With no options the preferred option is ``None`` and
        the confidence is 0.0 instead of the loop's -1 sentinel.
        """
        if not options:
            return {
                "preferred_option": None,
                "confidence": 0.0,
                "reasoning": "No consensus options were provided",
            }

        best_option = None
        best_score = -1
        for option in options:
            score = await self._score_consensus_option(option, context)
            if score > best_score:
                best_score = score
                best_option = option

        return {
            "preferred_option": best_option,
            "confidence": best_score,
            "reasoning": f"Option scored {best_score:.2f} based on flight agent priorities",
        }

    async def _score_consensus_option(self, option: Dict[str, Any], context: OrchestrationContext) -> float:
        """Score a consensus option based on flight agent priorities.

        Adds 0.4 for a flight budget of at least 80% of the current one, 0.3
        for a morning departure and 0.3 for at most one stop; capped at 1.0.
        """
        score = 0.0

        # Budget scoring
        if "flight_budget" in option:
            budget = context.get_state("flight_budget", 500)
            if option["flight_budget"] >= budget * 0.8:
                score += 0.4

        # Time preference scoring
        if "departure_time" in option and "morning" in str(option["departure_time"]).lower():
            score += 0.3

        # Stop preference scoring
        if "max_stops" in option and option["max_stops"] <= 1:
            score += 0.3

        return min(1.0, score)

    async def _get_negotiation_messages(self) -> List[NegotiationMessage]:
        """Get pending negotiation messages for this agent.

        Returns an empty list when no negotiation broker is attached.
        """
        # negotiation_broker is not set in this class; presumably provided by
        # NegotiatingAgent - verify against the base class.
        if self.negotiation_broker:
            return await self.negotiation_broker.get_messages(self.agent_id)
        return []

    async def _adapt_budget_from_negotiations(self, original_budget: float, messages: List[NegotiationMessage]) -> float:
        """Adapt budget based on collaborative negotiations.

        Each counter-proposal carrying a ``flight_budget`` can only lower the
        budget: the result is the minimum of the current value and 110% of
        the proposed one.
        """
        adapted_budget = original_budget
        for message in messages:
            if message.message_type != MessageType.COUNTER_PROPOSAL:
                continue
            counter_proposal = message.content.get("counter_proposal", {})
            if "flight_budget" in counter_proposal:
                proposed_budget = counter_proposal["flight_budget"]
                # Consider the proposal but don't automatically accept
                adapted_budget = min(adapted_budget, proposed_budget * 1.1)  # 10% buffer
        return adapted_budget

    async def _search_flights_collaboratively(self, origin: str, destination: str,
                                              departure_date: datetime, passengers: int,
                                              budget: float, context: OrchestrationContext) -> List[Any]:
        """Search for flights with collaborative constraints.

        Builds mock flights from the per-passenger budget (a direct flight at
        400 or more, a 1-stop flight at 300 or more) and filters them through
        the constraints found in the negotiation messages.

        Raises:
            ValueError: if ``passengers`` is not positive (the per-passenger
                price would otherwise divide by zero or go negative).
        """
        if passengers <= 0:
            raise ValueError("passengers must be a positive number")

        # Get collaborative constraints from negotiations
        # NOTE(review): this fetches the messages a second time; if the broker
        # drains its queue on read, no constraints will be seen here - confirm.
        messages = await self._get_negotiation_messages()
        collaborative_constraints = self._extract_collaborative_constraints(messages)

        # Create mock flights with collaborative awareness
        flights = []
        base_price = budget / passengers

        # Direct flight option (preferred)
        if base_price >= 400:
            flights.append(self._create_mock_flight(
                origin, destination, departure_date, base_price, direct=True
            ))

        # 1-stop flight option (collaborative compromise)
        if base_price >= 300:
            flights.append(self._create_mock_flight(
                origin, destination, departure_date, base_price * 0.7, direct=False, stops=1
            ))

        # Apply collaborative constraints
        return self._apply_collaborative_constraints(flights, collaborative_constraints)

    def _create_mock_flight(self, origin: str, destination: str, departure_date: datetime,
                            price: float, direct: bool = True, stops: int = 0) -> Any:
        """Create a mock flight object.

        Direct flights depart at 08:00 and last 5 hours; others depart at
        10:00 and last 7 hours. Times are set on ``departure_date``'s day.
        """
        duration = 5.0 if direct else 7.0
        departure_hour = 8 if direct else 10
        return type('Flight', (), {
            'origin': origin,
            'destination': destination,
            'price': price,
            'departure_time': departure_date.replace(hour=departure_hour),
            'arrival_time': departure_date.replace(hour=departure_hour + int(duration)),
            'duration_hours': duration,
            'stops': stops,
            'direct': direct,
            'airline': 'Mock Airlines',
        })()

    def _extract_collaborative_constraints(self, messages: List[NegotiationMessage]) -> Dict[str, Any]:
        """Extract collaborative constraints from negotiation messages.

        Merges the ``constraints`` dict of every CONSTRAINT message; later
        messages override earlier ones on key clashes.
        """
        constraints: Dict[str, Any] = {}
        for message in messages:
            if message.message_type == MessageType.CONSTRAINT:
                constraints.update(message.content.get("constraints", {}))
        return constraints

    def _apply_collaborative_constraints(self, flights: List[Any], constraints: Dict[str, Any]) -> List[Any]:
        """Apply collaborative constraints to flight options.

        Honours ``max_departure_time`` (compared with the departure hour) and
        ``max_flight_cost`` (compared with the flight price). Returns a new
        list; ``flights`` is not modified.
        """
        filtered_flights = list(flights)

        # Apply time constraints
        if "max_departure_time" in constraints:
            max_time = constraints["max_departure_time"]
            filtered_flights = [f for f in filtered_flights if f.departure_time.hour <= max_time]

        # Apply budget constraints
        if "max_flight_cost" in constraints:
            max_cost = constraints["max_flight_cost"]
            filtered_flights = [f for f in filtered_flights if f.price <= max_cost]

        return filtered_flights

    async def _select_flight_collaboratively(self, flights: List[Any], context: OrchestrationContext,
                                             messages: List[NegotiationMessage]) -> Any:
        """Select best flight considering collaborative feedback.

        Returns the highest-scoring flight (the first one on ties), or
        ``None`` when ``flights`` is empty. A budget violation is recorded in
        ``self.constraint_violations`` only for the flight actually selected.
        """
        best_flight = None
        best_score = -1
        for flight in flights:
            score = await self._score_flight_collaboratively(flight, context, messages)
            if score > best_score:
                best_score = score
                best_flight = flight

        if best_flight is not None:
            budget = context.get_state("flight_budget", 500)
            if best_flight.price > budget:
                # Only the chosen flight can "relax" a constraint; rejected
                # candidates must not show up as violations in the result.
                self.constraint_violations.append(
                    f"Flight price ${best_flight.price} exceeds budget ${budget}"
                )

        return best_flight

    async def _score_flight_collaboratively(self, flight: Any, context: OrchestrationContext,
                                            messages: List[NegotiationMessage]) -> float:
        """Score flight based on collaborative preferences.

        Pure scoring with no side effects: 0.4 for direct (0.2 otherwise),
        0.3 when within the flight budget (0.1 otherwise), plus 0.1 for each
        matching direct/morning preference in a counter-proposal.
        """
        # Base scoring
        score = 0.4 if flight.direct else 0.2

        # Budget scoring
        budget = context.get_state("flight_budget", 500)
        score += 0.3 if flight.price <= budget else 0.1

        # Collaborative feedback scoring
        for message in messages:
            if message.message_type != MessageType.COUNTER_PROPOSAL:
                continue
            if "flight_preferences" not in message.content:
                continue
            preferences = message.content["flight_preferences"]
            if preferences.get("prefer_direct") and flight.direct:
                score += 0.1
            if preferences.get("prefer_morning") and flight.departure_time.hour < 12:
                score += 0.1

        return score

    async def _share_proposal_with_agents(self, proposal: NegotiationProposal):
        """Share proposal with other agents through negotiation broker.

        Does nothing when no broker is attached.
        """
        if not self.negotiation_broker:
            return

        # Create negotiation message addressed to every agent
        message = NegotiationMessage(
            message_type=MessageType.PROPOSAL,
            sender_id=self.agent_id,
            recipient_id="all",
            content={
                "proposal_id": proposal.proposal_id,
                "proposal_type": proposal.proposal_type,
                "details": proposal.details,
                "reasoning": proposal.reasoning,
                "constraints": proposal.constraints,
                "alternatives": proposal.alternatives,
                "collaborative": True,
            },
        )
        await self.negotiation_broker.send_message(message)

    async def _update_context_collaboratively(self, context: OrchestrationContext,
                                              selected_flight: Any, booking_reference: str):
        """Update context with collaborative results.

        Writes the booking, the flight-derived constraints for other agents
        and the budget remaining after the flight into ``context``.
        """
        # Update basic context
        context.update_state("flight_booked", True)
        context.update_state("flight_booking_reference", booking_reference)
        context.update_state("selected_flight", selected_flight)

        # Share collaborative constraints with other agents
        collaborative_constraints = {
            "arrival_time": selected_flight.arrival_time,
            "departure_time": selected_flight.departure_time,
            "airport_code": selected_flight.destination,
            "flight_duration": selected_flight.duration_hours,
        }
        context.update_state("collaborative_constraints", collaborative_constraints)

        # Update budget remaining
        flight_cost = float(selected_flight.price * context.get_state("passengers", 1))
        remaining_budget = float(context.budget_limit) - flight_cost
        context.update_state("remaining_budget_after_flight", remaining_budget)

    def _get_collaborative_adaptations(self) -> List[str]:
        """Get list of collaborative adaptations made."""
        adaptations = []
        if self.negotiation_history:
            adaptations.append("Budget adapted based on negotiations")
        if self.constraint_violations:
            adaptations.append("Constraints relaxed for collaboration")
        return adaptations

    async def rollback(self, context: OrchestrationContext) -> bool:
        """Rollback collaborative flight booking.

        Clears the flight-related context keys and this agent's history when
        a flight was booked. Returns ``False`` only if the rollback raises.
        """
        try:
            if context.get_state("flight_booked", False):
                print(" Cancelling collaborative flight booking")
                context.update_state("flight_booked", False)
                context.update_state("flight_booking_reference", None)
                context.update_state("selected_flight", None)
                context.update_state("collaborative_constraints", None)
                context.update_state("remaining_budget_after_flight", None)

                # Clear negotiation history
                self.negotiation_history = []
                self.constraint_violations = []
            return True
        except Exception as e:
            print(f" Collaborative flight rollback error: {e}")
            return False

    def get_dependencies(self) -> List[Any]:
        """Collaborative flight agent has no dependencies."""
        return []
class CollaborativeHotelAgent(NegotiatingAgent, AgentInterface):
    """
    Hotel agent that collaborates with other agents through negotiation.

    This agent demonstrates collaborative AI patterns by:
    - Negotiating budget allocations with flight and POI agents
    - Adapting hotel selection based on collaborative constraints
    - Participating in consensus building for optimal solutions
    """

    # Stay length used when it cannot be derived from the context dates.
    _DEFAULT_NIGHTS = 3

    def __init__(self, agent_id: str = "collaborative_hotel_agent"):
        """Initialise both base classes and the agent's bookkeeping lists."""
        NegotiatingAgent.__init__(self, agent_id)
        AgentInterface.__init__(self, agent_id)
        self.negotiation_history: List[NegotiationMessage] = []
        self.collaborative_adaptations: List[str] = []

    async def execute(self, context: OrchestrationContext) -> AgentResult:
        """Execute hotel search with collaborative negotiation.

        Reads the stay requirements from ``context``, adapts the budget to
        what is left after the flight, picks a (mock) hotel and writes the
        booking into ``context``.

        Returns:
            An ``AgentResult`` with status COMPLETED and the booking data, or
            FAILED when no hotel is found or any step raises.
        """
        try:
            # Extract requirements
            destination = context.get_state("destination", "Los Angeles")
            check_in = context.get_state("departure_date", datetime.now() + timedelta(days=7))
            check_out = context.get_state("return_date", datetime.now() + timedelta(days=10))
            guests = context.get_state("guests", 1)
            budget = context.get_state("hotel_budget", 200)

            # Get collaborative constraints from flight agent. The flight
            # rollback stores None under this key, hence the "or {}".
            collaborative_constraints = context.get_state("collaborative_constraints", {}) or {}

            # Adapt budget based on negotiations
            adapted_budget = await self._adapt_budget_collaboratively(budget, context)

            # Search for hotels with collaborative awareness
            hotels = await self._search_hotels_collaboratively(
                destination, check_in, check_out, guests, adapted_budget,
                collaborative_constraints, context
            )
            if not hotels:
                return AgentResult(
                    agent_id=self.agent_id,
                    status=AgentStatus.FAILED,
                    error=Exception("No hotels found matching collaborative requirements"),
                )

            # Select hotel considering collaborative feedback
            selected_hotel = await self._select_hotel_collaboratively(hotels, context)

            # Simulate booking
            booking_reference = f"CH{str(uuid.uuid4())[:8].upper()}"

            # Update context
            await self._update_context_collaboratively(context, selected_hotel, booking_reference)

            # Calculate total cost from the actual stay length
            nights = self._count_nights(check_in, check_out)
            total_cost = float(selected_hotel.price_per_night) * nights * guests

            return AgentResult(
                agent_id=self.agent_id,
                status=AgentStatus.COMPLETED,
                data={
                    "hotel": selected_hotel,
                    "booking_reference": booking_reference,
                    "total_cost": total_cost,
                    "guests": guests,
                    "nights": nights,
                    "collaborative_adaptations": self.collaborative_adaptations,
                },
                # Fixed placeholder value; nothing is actually timed here.
                execution_time=1.3,
            )
        except Exception as e:
            # Top-level boundary: every failure is reported through the result.
            return AgentResult(
                agent_id=self.agent_id,
                status=AgentStatus.FAILED,
                error=e,
                execution_time=1.0,
            )

    def _count_nights(self, check_in: Any, check_out: Any) -> int:
        """Return the number of nights between two dates (at least 1).

        Falls back to ``_DEFAULT_NIGHTS`` when the values cannot be
        subtracted as dates (for example when the context holds strings).
        """
        def _calendar_day(value: Any) -> Any:
            # Compare calendar days so the time of day does not drop a night.
            return value.date() if isinstance(value, datetime) else value

        try:
            nights = (_calendar_day(check_out) - _calendar_day(check_in)).days
        except (TypeError, AttributeError):
            return self._DEFAULT_NIGHTS
        return max(1, nights)

    async def _create_initial_proposal(self, context: OrchestrationContext) -> NegotiationProposal:
        """Create initial proposal for hotel budget allocation."""
        budget = context.get_state("hotel_budget", 200)
        destination = context.get_state("destination", "Los Angeles")

        return NegotiationProposal(
            proposer_id=self.agent_id,
            proposal_type="hotel_budget_allocation",
            details={
                "hotel_budget": budget,
                "destination": destination,
                "preferred_options": ["4_star_hotels", "city_center_location"],
                "flexible_options": ["3_star_hotels", "nearby_location"],
            },
            reasoning="4-star city center hotels preferred for convenience and quality",
            constraints=["min_rating_3.5", "max_distance_5km"],
            confidence_score=0.7,
        )

    async def _evaluate_proposal_compatibility(self, proposal: Dict[str, Any], context: OrchestrationContext) -> Dict[str, Any]:
        """Evaluate compatibility with other agent proposals.

        Adds 0.5 to the score when the proposal's ``budget`` is at least 80%
        of the current hotel budget. ``conflicts`` and ``recommendations``
        are always returned empty by this implementation.
        """
        evaluation = {
            "compatibility_score": 0.0,
            "conflicts": [],
            "synergies": [],
            "recommendations": [],
        }

        # Check budget compatibility
        if "budget" in proposal:
            hotel_budget = context.get_state("hotel_budget", 200)
            if proposal["budget"] >= hotel_budget * 0.8:
                evaluation["compatibility_score"] += 0.5
                evaluation["synergies"].append("Budget allocation supports quality hotels")

        return evaluation

    async def _evaluate_consensus_options(self, options: List[Dict[str, Any]], context: OrchestrationContext) -> Dict[str, Any]:
        """Evaluate consensus options for hotel selection.

        Prefers the option with the highest ``hotel_budget`` (missing values
        count as 0). With no options the preferred option is ``None`` and the
        confidence is 0.0 rather than raising from ``max()``.
        """
        if not options:
            return {
                "preferred_option": None,
                "confidence": 0.0,
                "reasoning": "No consensus options were provided",
            }

        best_option = max(options, key=lambda x: x.get("hotel_budget", 0))
        return {
            "preferred_option": best_option,
            "confidence": 0.8,
            "reasoning": "Higher budget enables better hotel quality and location",
        }

    async def _adapt_budget_collaboratively(self, original_budget: float, context: OrchestrationContext) -> float:
        """Adapt budget based on collaborative negotiations.

        Caps the hotel budget at 50% of the budget remaining after the
        flight. The budget is returned unchanged when no remaining budget is
        recorded (missing or ``None``, as after a flight rollback) or when it
        is not positive. An adaptation note is recorded only when the budget
        is actually lowered.
        """
        remaining_budget = context.get_state("remaining_budget_after_flight", None)
        if remaining_budget is None:
            return original_budget

        # NOTE(review): a non-positive remaining budget leaves the hotel
        # budget untouched, as in the original logic - confirm this is wanted.
        if remaining_budget <= 0:
            return original_budget

        # Use up to 50% of remaining budget for hotels
        max_hotel_budget = remaining_budget * 0.5
        adapted_budget = min(original_budget, max_hotel_budget)
        if adapted_budget < original_budget:
            self.collaborative_adaptations.append("Budget adapted based on flight costs")
        return adapted_budget

    async def _search_hotels_collaboratively(self, destination: str, check_in: datetime,
                                             check_out: datetime, guests: int, budget: float,
                                             constraints: Dict[str, Any], context: OrchestrationContext) -> List[Any]:
        """Search for hotels with collaborative awareness.

        Returns a single mock hotel whose tier depends only on ``budget``
        (300 or more: luxury, 200 or more: business, otherwise budget); the
        other arguments are currently unused.
        """
        if budget >= 300:
            hotel = self._create_mock_hotel("Luxury Hotel", 4.5, budget, "City Center")
        elif budget >= 200:
            hotel = self._create_mock_hotel("Business Hotel", 4.0, budget, "Downtown")
        else:
            hotel = self._create_mock_hotel("Budget Hotel", 3.5, budget, "Outskirts")
        return [hotel]

    def _create_mock_hotel(self, name: str, rating: float, price_per_night: float, location: str) -> Any:
        """Create a mock hotel object.

        Hotels rated 4.0 or higher also get a gym in their amenities.
        """
        amenities = ['WiFi', 'Parking', 'Gym'] if rating >= 4.0 else ['WiFi', 'Parking']
        return type('Hotel', (), {
            'name': name,
            'rating': rating,
            'price_per_night': price_per_night,
            'location': location,
            'amenities': amenities,
        })()

    async def _select_hotel_collaboratively(self, hotels: List[Any], context: OrchestrationContext) -> Any:
        """Select best hotel considering collaborative feedback.

        Returns the highest-scoring hotel (the first one on ties), or
        ``None`` when ``hotels`` is empty.
        """
        best_hotel = None
        best_score = -1
        for hotel in hotels:
            score = await self._score_hotel_collaboratively(hotel, context)
            if score > best_score:
                best_score = score
                best_hotel = hotel
        return best_hotel

    async def _score_hotel_collaboratively(self, hotel: Any, context: OrchestrationContext) -> float:
        """Score hotel based on collaborative preferences.

        Up to 0.4 for the rating (scaled from 5 stars), 0.4 when within the
        hotel budget (0.2 otherwise) and 0.2 for a central location.
        """
        # Rating score
        score = 0.0
        score += hotel.rating / 5.0 * 0.4

        # Budget score
        budget = context.get_state("hotel_budget", 200)
        score += 0.4 if hotel.price_per_night <= budget else 0.2

        # Location score
        if "City Center" in hotel.location or "Downtown" in hotel.location:
            score += 0.2

        return score

    async def _update_context_collaboratively(self, context: OrchestrationContext,
                                              selected_hotel: Any, booking_reference: str):
        """Update context with collaborative results.

        Writes the booking and the hotel location, plus a location
        constraint intended for the POI agent.
        """
        context.update_state("hotel_booked", True)
        context.update_state("hotel_booking_reference", booking_reference)
        context.update_state("selected_hotel", selected_hotel)
        context.update_state("hotel_location", selected_hotel.location)

        # Share location constraints with POI agent
        context.update_state("hotel_location_constraint", {
            "center_location": selected_hotel.location,
            "max_distance_km": 5,
            "transit_access": True,
        })

    async def rollback(self, context: OrchestrationContext) -> bool:
        """Rollback collaborative hotel booking.

        Clears every context key written by ``_update_context_collaboratively``
        when a hotel was booked. Returns ``False`` only if the rollback raises.
        """
        try:
            if context.get_state("hotel_booked", False):
                print(" Cancelling collaborative hotel booking")
                context.update_state("hotel_booked", False)
                context.update_state("hotel_booking_reference", None)
                context.update_state("selected_hotel", None)
                # hotel_location is written on booking, so it is cleared too.
                context.update_state("hotel_location", None)
                context.update_state("hotel_location_constraint", None)
                self.collaborative_adaptations = []
            return True
        except Exception as e:
            print(f" Collaborative hotel rollback error: {e}")
            return False

    def get_dependencies(self) -> List[Any]:
        """Collaborative hotel agent depends on flight agent."""
        # Imported here, as in the original, rather than at module level.
        from ..core.trip_orchestration import AgentDependency, DependencyType
        return [
            AgentDependency(
                agent_id="collaborative_flight_agent",
                dependency_type=DependencyType.SEQUENTIAL,
                required_status=AgentStatus.COMPLETED,
            )
        ]
class CollaborativePOIAgent(NegotiatingAgent, AgentInterface):
    """
    POI agent that collaborates with other agents through negotiation.

    This agent demonstrates collaborative AI patterns by:
    - Adapting activity selection based on hotel location constraints
    - Negotiating budget allocations for optimal experiences
    - Participating in consensus building for trip optimization
    """

    def __init__(self, agent_id: str = "collaborative_poi_agent"):
        """Initialise both base classes and the agent's bookkeeping lists."""
        NegotiatingAgent.__init__(self, agent_id)
        AgentInterface.__init__(self, agent_id)
        self.negotiation_history: List[NegotiationMessage] = []
        # POIs selected by the last successful execute(); used by rollback().
        self.collaborative_recommendations: List[Any] = []

    async def execute(self, context: OrchestrationContext) -> AgentResult:
        """Execute POI recommendations with collaborative negotiation.

        Reads interests and budget from ``context``, generates (mock)
        recommendations near the hotel, keeps the top-scoring ones and
        writes them into ``context``.

        Returns:
            An ``AgentResult`` with status COMPLETED and the selected POIs,
            or FAILED when nothing is generated or any step raises.
        """
        try:
            # Extract requirements
            destination = context.get_state("destination", "Los Angeles")
            interests = context.get_state("interests", ["cultural", "food"])
            budget = context.get_state("poi_budget", 100)

            # Get collaborative constraints from hotel agent. The hotel
            # rollback stores None under this key, hence the "or {}".
            hotel_constraint = context.get_state("hotel_location_constraint", {}) or {}

            # Adapt budget based on negotiations
            adapted_budget = await self._adapt_budget_collaboratively(budget, context)

            # Generate collaborative recommendations
            recommendations = await self._generate_collaborative_recommendations(
                destination, interests, adapted_budget, hotel_constraint, context
            )
            if not recommendations:
                return AgentResult(
                    agent_id=self.agent_id,
                    status=AgentStatus.FAILED,
                    error=Exception("No POI recommendations generated collaboratively"),
                )

            # Select best recommendations
            selected_pois = await self._select_pois_collaboratively(recommendations, context)
            self.collaborative_recommendations = selected_pois

            # Update context
            context.update_state("activities_planned", True)
            context.update_state("recommended_pois", selected_pois)

            # Calculate total cost; free or unpriced POIs contribute nothing
            total_cost = sum(float(poi.adult_price) for poi in selected_pois if poi.adult_price)

            return AgentResult(
                agent_id=self.agent_id,
                status=AgentStatus.COMPLETED,
                data={
                    "pois": selected_pois,
                    "booking_reference": f"CPOI{str(uuid.uuid4())[:8].upper()}",
                    "total_cost": total_cost,
                    "count": len(selected_pois),
                    "collaborative_constraints_applied": len(hotel_constraint) > 0,
                },
                # Fixed placeholder value; nothing is actually timed here.
                execution_time=1.7,
            )
        except Exception as e:
            # Top-level boundary: every failure is reported through the result.
            return AgentResult(
                agent_id=self.agent_id,
                status=AgentStatus.FAILED,
                error=e,
                execution_time=1.2,
            )

    async def _create_initial_proposal(self, context: OrchestrationContext) -> NegotiationProposal:
        """Create initial proposal for POI budget allocation."""
        budget = context.get_state("poi_budget", 100)
        interests = context.get_state("interests", ["cultural", "food"])

        return NegotiationProposal(
            proposer_id=self.agent_id,
            proposal_type="poi_budget_allocation",
            details={
                "poi_budget": budget,
                "interests": interests,
                "preferred_options": ["premium_experiences", "guided_tours"],
                "flexible_options": ["free_activities", "self_guided"],
            },
            reasoning="Mix of premium and free activities for optimal value",
            constraints=["max_distance_10km", "interest_alignment"],
            confidence_score=0.6,
        )

    async def _evaluate_proposal_compatibility(self, proposal: Dict[str, Any], context: OrchestrationContext) -> Dict[str, Any]:
        """Evaluate compatibility with other agent proposals.

        Adds 0.6 to the score when the proposal's ``budget`` is at least 50%
        of the current POI budget. ``conflicts`` and ``recommendations`` are
        always returned empty by this implementation.
        """
        evaluation = {
            "compatibility_score": 0.0,
            "conflicts": [],
            "synergies": [],
            "recommendations": [],
        }

        # Check budget compatibility
        if "budget" in proposal:
            poi_budget = context.get_state("poi_budget", 100)
            if proposal["budget"] >= poi_budget * 0.5:
                evaluation["compatibility_score"] += 0.6
                evaluation["synergies"].append("Budget allocation enables diverse activities")

        return evaluation

    async def _evaluate_consensus_options(self, options: List[Dict[str, Any]], context: OrchestrationContext) -> Dict[str, Any]:
        """Evaluate consensus options for POI selection.

        Prefers the option with the highest ``poi_budget`` (missing values
        count as 0). With no options the preferred option is ``None`` and the
        confidence is 0.0 rather than raising from ``max()``.
        """
        if not options:
            return {
                "preferred_option": None,
                "confidence": 0.0,
                "reasoning": "No consensus options were provided",
            }

        best_option = max(options, key=lambda x: x.get("poi_budget", 0))
        return {
            "preferred_option": best_option,
            "confidence": 0.7,
            "reasoning": "Higher budget enables more diverse and premium activities",
        }

    async def _adapt_budget_collaboratively(self, original_budget: float, context: OrchestrationContext) -> float:
        """Adapt budget based on collaborative negotiations.

        Caps the POI budget at the budget remaining after the hotel. The
        budget is returned unchanged when no remaining budget is recorded
        (missing or ``None``) or when it is not positive.
        """
        # NOTE(review): nothing in this module writes this key; presumably it
        # is set elsewhere in the orchestration - verify.
        remaining_budget = context.get_state("remaining_budget_after_hotel", None)
        if remaining_budget is None or remaining_budget <= 0:
            return original_budget

        # Use remaining budget for activities
        return min(original_budget, remaining_budget)

    async def _generate_collaborative_recommendations(self, destination: str, interests: List[str],
                                                      budget: float, hotel_constraint: Dict[str, Any],
                                                      context: OrchestrationContext) -> List[Any]:
        """Generate POI recommendations with collaborative awareness.

        Creates one mock POI per recognised interest (cultural, food,
        outdoor) and keeps those priced within ``budget``. When none is
        affordable, the first two candidates are returned instead.
        """
        # Tolerate a None constraint (left behind by the hotel rollback).
        hotel_constraint = hotel_constraint or {}

        # Apply hotel location constraint
        max_distance = hotel_constraint.get("max_distance_km", 10)
        location = f"Within {max_distance}km of hotel"

        # Generate recommendations based on interests and constraints
        recommendations = []
        if "cultural" in interests:
            recommendations.append(self._create_mock_poi("Cultural Museum", "cultural", 25.0, location))
        if "food" in interests:
            recommendations.append(self._create_mock_poi("Food Tour", "food", 45.0, location))
        if "outdoor" in interests:
            recommendations.append(self._create_mock_poi("City Park", "outdoor", 0.0, location))

        # Filter by budget
        affordable_recommendations = [
            poi for poi in recommendations
            if poi.adult_price <= budget
        ]
        return affordable_recommendations if affordable_recommendations else recommendations[:2]

    def _create_mock_poi(self, name: str, category: str, price: float, location: str) -> Any:
        """Create a mock POI object.

        Paid POIs last 2 hours, free ones 1 hour.
        """
        return type('POI', (), {
            'name': name,
            'category': category,
            'adult_price': price,
            'location': location,
            'duration_hours': 2.0 if price > 0 else 1.0,
        })()

    async def _select_pois_collaboratively(self, recommendations: List[Any], context: OrchestrationContext) -> List[Any]:
        """Select best POIs considering collaborative feedback.

        Returns at most three POIs, highest score first; ties keep their
        original order because the sort is stable.
        """
        # Score recommendations based on collaborative preferences
        scored_recommendations = []
        for poi in recommendations:
            score = await self._score_poi_collaboratively(poi, context)
            scored_recommendations.append((poi, score))

        # Sort by score and select top 3 recommendations
        scored_recommendations.sort(key=lambda x: x[1], reverse=True)
        return [poi for poi, _ in scored_recommendations[:3]]

    async def _score_poi_collaboratively(self, poi: Any, context: OrchestrationContext) -> float:
        """Score POI based on collaborative preferences.

        Adds 0.4 for a category among the trip interests, 0.3 when within
        the POI budget, 0.2 when a hotel constraint exists and the POI
        location mentions "Within", and 0.1 for free activities.
        """
        score = 0.0

        # Interest alignment score; same default as execute() so that the
        # POIs generated from the default interests also score as aligned.
        interests = context.get_state("interests", ["cultural", "food"])
        if poi.category in interests:
            score += 0.4

        # Budget alignment score
        budget = context.get_state("poi_budget", 100)
        if poi.adult_price <= budget:
            score += 0.3

        # Location constraint score
        hotel_constraint = context.get_state("hotel_location_constraint", {})
        if hotel_constraint and "Within" in poi.location:
            score += 0.2

        # Collaborative value score
        if poi.adult_price == 0:  # Free activities
            score += 0.1

        return score

    async def rollback(self, context: OrchestrationContext) -> bool:
        """Rollback collaborative POI planning.

        Clears the planned activities from ``context`` when this agent has
        recommendations recorded. Returns ``False`` only if the rollback
        raises.
        """
        try:
            if self.collaborative_recommendations:
                print(f" Cancelling collaborative POI planning: {len(self.collaborative_recommendations)} activities")
                context.update_state("activities_planned", False)
                context.update_state("recommended_pois", [])
                self.collaborative_recommendations = []
            return True
        except Exception as e:
            print(f" Collaborative POI rollback error: {e}")
            return False

    def get_dependencies(self) -> List[Any]:
        """Collaborative POI agent depends on hotel agent."""
        # Imported here, as in the original, rather than at module level.
        from ..core.trip_orchestration import AgentDependency, DependencyType
        return [
            AgentDependency(
                agent_id="collaborative_hotel_agent",
                dependency_type=DependencyType.SEQUENTIAL,
                required_status=AgentStatus.COMPLETED,
            )
        ]