wanderlust.ai / src /wanderlust_ai /agents /collaborative_agents.py
BlakeL's picture
Upload 115 files
3f9f85b verified
"""
Collaborative Agents - Agents designed for negotiation and collaboration
This module implements agents that are specifically designed to work with
the negotiation system, demonstrating collaborative AI patterns and
multi-agent cooperation.
"""
import asyncio
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
from decimal import Decimal
from ..core.agent_negotiation import (
NegotiatingAgent, NegotiationMessage, NegotiationProposal,
MessageType, NegotiationStatus, ConflictType
)
from ..core.trip_orchestration import (
AgentInterface, AgentResult, AgentStatus, OrchestrationContext
)
class CollaborativeFlightAgent(NegotiatingAgent, AgentInterface):
"""
Flight agent that collaborates with other agents through negotiation.
This agent demonstrates collaborative AI patterns by:
- Engaging in budget negotiations with other agents
- Sharing constraints and reasoning transparently
- Adapting proposals based on collaborative feedback
- Participating in consensus building
"""
def __init__(self, agent_id: str = "collaborative_flight_agent"):
NegotiatingAgent.__init__(self, agent_id)
AgentInterface.__init__(self, agent_id)
self.negotiation_history: List[NegotiationMessage] = []
self.collaborative_preferences: Dict[str, Any] = {}
self.constraint_violations: List[str] = []
async def execute(self, context: OrchestrationContext) -> AgentResult:
"""Execute flight search with collaborative negotiation capabilities."""
try:
# Extract requirements from context
origin = context.get_state("origin", "New York")
destination = context.get_state("destination", "Los Angeles")
departure_date = context.get_state("departure_date", datetime.now() + timedelta(days=7))
passengers = context.get_state("passengers", 1)
budget = context.get_state("flight_budget", 500)
# Check for collaborative negotiations
negotiation_messages = await self._get_negotiation_messages()
# Adapt budget based on negotiations
adapted_budget = await self._adapt_budget_from_negotiations(budget, negotiation_messages)
# Create collaborative proposal
proposal = await self._create_initial_proposal(context)
# Share proposal with other agents
await self._share_proposal_with_agents(proposal)
# Search for flights with collaborative constraints
flights = await self._search_flights_collaboratively(
origin, destination, departure_date, passengers, adapted_budget, context
)
if not flights:
return AgentResult(
agent_id=self.agent_id,
status=AgentStatus.FAILED,
error=Exception("No flights found matching collaborative constraints")
)
# Select best flight considering collaborative feedback
selected_flight = await self._select_flight_collaboratively(flights, context, negotiation_messages)
# Simulate booking
booking_reference = f"CF{str(uuid.uuid4())[:8].upper()}"
# Update context with collaborative results
await self._update_context_collaboratively(context, selected_flight, booking_reference)
# Calculate total cost
total_cost = float(selected_flight.price) * passengers
return AgentResult(
agent_id=self.agent_id,
status=AgentStatus.COMPLETED,
data={
"flight": selected_flight,
"booking_reference": booking_reference,
"total_cost": total_cost,
"passengers": passengers,
"collaborative_negotiations": len(negotiation_messages),
"constraint_violations": self.constraint_violations,
"collaborative_adaptations": self._get_collaborative_adaptations()
},
execution_time=1.5
)
except Exception as e:
return AgentResult(
agent_id=self.agent_id,
status=AgentStatus.FAILED,
error=e,
execution_time=1.0
)
async def _create_initial_proposal(self, context: OrchestrationContext) -> NegotiationProposal:
"""Create initial proposal for collaborative negotiation."""
# Analyze context for proposal creation
budget = context.get_state("flight_budget", 500)
origin = context.get_state("origin", "New York")
destination = context.get_state("destination", "Los Angeles")
passengers = context.get_state("passengers", 1)
# Create collaborative proposal
proposal = NegotiationProposal(
proposer_id=self.agent_id,
proposal_type="flight_budget_allocation",
details={
"flight_budget": budget,
"origin": origin,
"destination": destination,
"passengers": passengers,
"preferred_options": ["direct_flights", "morning_departures"],
"flexible_options": ["1_stop_flights", "afternoon_departures"]
},
reasoning="Direct flights preferred for time efficiency, but flexible with 1-stop for cost optimization",
constraints=["max_stops_2", "morning_departure_preferred", "major_airlines_only"],
alternatives=[
{
"budget": budget * 0.8,
"constraints": ["max_stops_1", "flexible_departure"],
"reasoning": "Accept 1-stop flights to reduce budget impact"
},
{
"budget": budget * 1.2,
"constraints": ["direct_flights_only", "morning_departure"],
"reasoning": "Premium budget for guaranteed direct flights"
}
],
trade_offs={
"time_vs_cost": "Direct flights save 2-3 hours but cost 30% more",
"flexibility_vs_guarantee": "Flexible dates can save 20% but reduce options"
},
confidence_score=0.8
)
return proposal
async def _evaluate_proposal_compatibility(self, proposal: Dict[str, Any], context: OrchestrationContext) -> Dict[str, Any]:
"""Evaluate compatibility with another agent's proposal."""
evaluation = {
"compatibility_score": 0.0,
"conflicts": [],
"synergies": [],
"recommendations": []
}
# Check budget compatibility
if "budget" in proposal:
flight_budget = context.get_state("flight_budget", 500)
if proposal["budget"] <= flight_budget:
evaluation["compatibility_score"] += 0.4
evaluation["synergies"].append("Budget allocation is compatible")
else:
evaluation["compatibility_score"] -= 0.2
evaluation["conflicts"].append("Budget exceeds flight allocation")
# Check constraint compatibility
if "constraints" in proposal:
flight_constraints = ["max_stops_2", "morning_departure_preferred"]
for constraint in proposal["constraints"]:
if constraint in flight_constraints:
evaluation["compatibility_score"] += 0.1
evaluation["synergies"].append(f"Shared constraint: {constraint}")
# Generate recommendations
if evaluation["compatibility_score"] > 0.5:
evaluation["recommendations"].append("Proposal is highly compatible")
elif evaluation["compatibility_score"] > 0.0:
evaluation["recommendations"].append("Proposal has minor conflicts but is workable")
else:
evaluation["recommendations"].append("Proposal has significant conflicts requiring negotiation")
return evaluation
async def _evaluate_consensus_options(self, options: List[Dict[str, Any]], context: OrchestrationContext) -> Dict[str, Any]:
"""Evaluate consensus options and return preferred choice."""
best_option = None
best_score = -1
for option in options:
score = await self._score_consensus_option(option, context)
if score > best_score:
best_score = score
best_option = option
return {
"preferred_option": best_option,
"confidence": best_score,
"reasoning": f"Option scored {best_score:.2f} based on flight agent priorities"
}
async def _score_consensus_option(self, option: Dict[str, Any], context: OrchestrationContext) -> float:
"""Score a consensus option based on flight agent priorities."""
score = 0.0
# Budget scoring
if "flight_budget" in option:
budget = context.get_state("flight_budget", 500)
if option["flight_budget"] >= budget * 0.8:
score += 0.4
# Time preference scoring
if "departure_time" in option:
if "morning" in str(option["departure_time"]).lower():
score += 0.3
# Stop preference scoring
if "max_stops" in option:
if option["max_stops"] <= 1:
score += 0.3
return min(1.0, score)
async def _get_negotiation_messages(self) -> List[NegotiationMessage]:
"""Get pending negotiation messages for this agent."""
if self.negotiation_broker:
return await self.negotiation_broker.get_messages(self.agent_id)
return []
async def _adapt_budget_from_negotiations(self, original_budget: float, messages: List[NegotiationMessage]) -> float:
"""Adapt budget based on collaborative negotiations."""
adapted_budget = original_budget
for message in messages:
if message.message_type == MessageType.COUNTER_PROPOSAL:
if "flight_budget" in message.content.get("counter_proposal", {}):
proposed_budget = message.content["counter_proposal"]["flight_budget"]
# Consider the proposal but don't automatically accept
adapted_budget = min(adapted_budget, proposed_budget * 1.1) # 10% buffer
return adapted_budget
async def _search_flights_collaboratively(self, origin: str, destination: str,
departure_date: datetime, passengers: int,
budget: float, context: OrchestrationContext) -> List[Any]:
"""Search for flights with collaborative constraints."""
# Get collaborative constraints from negotiations
messages = await self._get_negotiation_messages()
collaborative_constraints = self._extract_collaborative_constraints(messages)
# Create mock flights with collaborative awareness
flights = []
# Base flight options
base_price = budget / passengers
# Direct flight option (preferred)
if base_price >= 400:
flights.append(self._create_mock_flight(
origin, destination, departure_date, base_price, direct=True
))
# 1-stop flight option (collaborative compromise)
if base_price >= 300:
flights.append(self._create_mock_flight(
origin, destination, departure_date, base_price * 0.7, direct=False, stops=1
))
# Apply collaborative constraints
filtered_flights = self._apply_collaborative_constraints(flights, collaborative_constraints)
return filtered_flights
def _create_mock_flight(self, origin: str, destination: str, departure_date: datetime,
price: float, direct: bool = True, stops: int = 0) -> Any:
"""Create a mock flight object."""
duration = 5.0 if direct else 7.0
return type('Flight', (), {
'origin': origin,
'destination': destination,
'price': price,
'departure_time': departure_date.replace(hour=8 if direct else 10),
'arrival_time': departure_date.replace(hour=8 + int(duration) if direct else 10 + int(duration)),
'duration_hours': duration,
'stops': stops,
'direct': direct,
'airline': 'Mock Airlines'
})()
def _extract_collaborative_constraints(self, messages: List[NegotiationMessage]) -> Dict[str, Any]:
"""Extract collaborative constraints from negotiation messages."""
constraints = {}
for message in messages:
if message.message_type == MessageType.CONSTRAINT:
constraints.update(message.content.get("constraints", {}))
return constraints
def _apply_collaborative_constraints(self, flights: List[Any], constraints: Dict[str, Any]) -> List[Any]:
"""Apply collaborative constraints to flight options."""
filtered_flights = flights.copy()
# Apply time constraints
if "max_departure_time" in constraints:
max_time = constraints["max_departure_time"]
filtered_flights = [f for f in filtered_flights if f.departure_time.hour <= max_time]
# Apply budget constraints
if "max_flight_cost" in constraints:
max_cost = constraints["max_flight_cost"]
filtered_flights = [f for f in filtered_flights if f.price <= max_cost]
return filtered_flights
async def _select_flight_collaboratively(self, flights: List[Any], context: OrchestrationContext,
messages: List[NegotiationMessage]) -> Any:
"""Select best flight considering collaborative feedback."""
# Score flights based on collaborative preferences
best_flight = None
best_score = -1
for flight in flights:
score = await self._score_flight_collaboratively(flight, context, messages)
if score > best_score:
best_score = score
best_flight = flight
return best_flight
async def _score_flight_collaboratively(self, flight: Any, context: OrchestrationContext,
messages: List[NegotiationMessage]) -> float:
"""Score flight based on collaborative preferences."""
score = 0.0
# Base scoring
if flight.direct:
score += 0.4
else:
score += 0.2
# Budget scoring
budget = context.get_state("flight_budget", 500)
if flight.price <= budget:
score += 0.3
else:
score += 0.1
self.constraint_violations.append(f"Flight price ${flight.price} exceeds budget ${budget}")
# Collaborative feedback scoring
for message in messages:
if message.message_type == MessageType.COUNTER_PROPOSAL:
if "flight_preferences" in message.content:
preferences = message.content["flight_preferences"]
if preferences.get("prefer_direct") and flight.direct:
score += 0.1
if preferences.get("prefer_morning") and flight.departure_time.hour < 12:
score += 0.1
return score
async def _share_proposal_with_agents(self, proposal: NegotiationProposal):
"""Share proposal with other agents through negotiation broker."""
if self.negotiation_broker:
# Create negotiation message
message = NegotiationMessage(
message_type=MessageType.PROPOSAL,
sender_id=self.agent_id,
recipient_id="all",
content={
"proposal_id": proposal.proposal_id,
"proposal_type": proposal.proposal_type,
"details": proposal.details,
"reasoning": proposal.reasoning,
"constraints": proposal.constraints,
"alternatives": proposal.alternatives,
"collaborative": True
}
)
await self.negotiation_broker.send_message(message)
async def _update_context_collaboratively(self, context: OrchestrationContext,
selected_flight: Any, booking_reference: str):
"""Update context with collaborative results."""
# Update basic context
context.update_state("flight_booked", True)
context.update_state("flight_booking_reference", booking_reference)
context.update_state("selected_flight", selected_flight)
# Share collaborative constraints with other agents
collaborative_constraints = {
"arrival_time": selected_flight.arrival_time,
"departure_time": selected_flight.departure_time,
"airport_code": selected_flight.destination,
"flight_duration": selected_flight.duration_hours
}
context.update_state("collaborative_constraints", collaborative_constraints)
# Update budget remaining
remaining_budget = float(context.budget_limit) - float(selected_flight.price * context.get_state("passengers", 1))
context.update_state("remaining_budget_after_flight", remaining_budget)
def _get_collaborative_adaptations(self) -> List[str]:
"""Get list of collaborative adaptations made."""
adaptations = []
if len(self.negotiation_history) > 0:
adaptations.append("Budget adapted based on negotiations")
if self.constraint_violations:
adaptations.append("Constraints relaxed for collaboration")
return adaptations
async def rollback(self, context: OrchestrationContext) -> bool:
"""Rollback collaborative flight booking."""
try:
if context.get_state("flight_booked", False):
print(f" Cancelling collaborative flight booking")
context.update_state("flight_booked", False)
context.update_state("flight_booking_reference", None)
context.update_state("selected_flight", None)
context.update_state("collaborative_constraints", None)
context.update_state("remaining_budget_after_flight", None)
# Clear negotiation history
self.negotiation_history = []
self.constraint_violations = []
return True
return True
except Exception as e:
print(f" Collaborative flight rollback error: {e}")
return False
def get_dependencies(self) -> List[Any]:
"""Collaborative flight agent has no dependencies."""
return []
class CollaborativeHotelAgent(NegotiatingAgent, AgentInterface):
"""
Hotel agent that collaborates with other agents through negotiation.
This agent demonstrates collaborative AI patterns by:
- Negotiating budget allocations with flight and POI agents
- Adapting hotel selection based on collaborative constraints
- Participating in consensus building for optimal solutions
"""
def __init__(self, agent_id: str = "collaborative_hotel_agent"):
NegotiatingAgent.__init__(self, agent_id)
AgentInterface.__init__(self, agent_id)
self.negotiation_history: List[NegotiationMessage] = []
self.collaborative_adaptations: List[str] = []
async def execute(self, context: OrchestrationContext) -> AgentResult:
"""Execute hotel search with collaborative negotiation."""
try:
# Extract requirements
destination = context.get_state("destination", "Los Angeles")
check_in = context.get_state("departure_date", datetime.now() + timedelta(days=7))
check_out = context.get_state("return_date", datetime.now() + timedelta(days=10))
guests = context.get_state("guests", 1)
budget = context.get_state("hotel_budget", 200)
# Get collaborative constraints from flight agent
collaborative_constraints = context.get_state("collaborative_constraints", {})
# Adapt budget based on negotiations
adapted_budget = await self._adapt_budget_collaboratively(budget, context)
# Search for hotels with collaborative awareness
hotels = await self._search_hotels_collaboratively(
destination, check_in, check_out, guests, adapted_budget,
collaborative_constraints, context
)
if not hotels:
return AgentResult(
agent_id=self.agent_id,
status=AgentStatus.FAILED,
error=Exception("No hotels found matching collaborative requirements")
)
# Select hotel considering collaborative feedback
selected_hotel = await self._select_hotel_collaboratively(hotels, context)
# Simulate booking
booking_reference = f"CH{str(uuid.uuid4())[:8].upper()}"
# Update context
await self._update_context_collaboratively(context, selected_hotel, booking_reference)
# Calculate total cost
nights = 3 # Simplified
total_cost = float(selected_hotel.price_per_night) * nights * guests
return AgentResult(
agent_id=self.agent_id,
status=AgentStatus.COMPLETED,
data={
"hotel": selected_hotel,
"booking_reference": booking_reference,
"total_cost": total_cost,
"guests": guests,
"nights": nights,
"collaborative_adaptations": self.collaborative_adaptations
},
execution_time=1.3
)
except Exception as e:
return AgentResult(
agent_id=self.agent_id,
status=AgentStatus.FAILED,
error=e,
execution_time=1.0
)
async def _create_initial_proposal(self, context: OrchestrationContext) -> NegotiationProposal:
"""Create initial proposal for hotel budget allocation."""
budget = context.get_state("hotel_budget", 200)
destination = context.get_state("destination", "Los Angeles")
return NegotiationProposal(
proposer_id=self.agent_id,
proposal_type="hotel_budget_allocation",
details={
"hotel_budget": budget,
"destination": destination,
"preferred_options": ["4_star_hotels", "city_center_location"],
"flexible_options": ["3_star_hotels", "nearby_location"]
},
reasoning="4-star city center hotels preferred for convenience and quality",
constraints=["min_rating_3.5", "max_distance_5km"],
confidence_score=0.7
)
async def _evaluate_proposal_compatibility(self, proposal: Dict[str, Any], context: OrchestrationContext) -> Dict[str, Any]:
"""Evaluate compatibility with other agent proposals."""
evaluation = {
"compatibility_score": 0.0,
"conflicts": [],
"synergies": [],
"recommendations": []
}
# Check budget compatibility
if "budget" in proposal:
hotel_budget = context.get_state("hotel_budget", 200)
if proposal["budget"] >= hotel_budget * 0.8:
evaluation["compatibility_score"] += 0.5
evaluation["synergies"].append("Budget allocation supports quality hotels")
return evaluation
async def _evaluate_consensus_options(self, options: List[Dict[str, Any]], context: OrchestrationContext) -> Dict[str, Any]:
"""Evaluate consensus options for hotel selection."""
best_option = max(options, key=lambda x: x.get("hotel_budget", 0))
return {
"preferred_option": best_option,
"confidence": 0.8,
"reasoning": "Higher budget enables better hotel quality and location"
}
async def _adapt_budget_collaboratively(self, original_budget: float, context: OrchestrationContext) -> float:
"""Adapt budget based on collaborative negotiations."""
adapted_budget = original_budget
# Check for remaining budget after flight
remaining_budget = context.get_state("remaining_budget_after_flight", original_budget * 2)
if remaining_budget > 0:
# Use up to 50% of remaining budget for hotels
max_hotel_budget = remaining_budget * 0.5
adapted_budget = min(adapted_budget, max_hotel_budget)
self.collaborative_adaptations.append("Budget adapted based on flight costs")
return adapted_budget
async def _search_hotels_collaboratively(self, destination: str, check_in: datetime,
check_out: datetime, guests: int, budget: float,
constraints: Dict[str, Any], context: OrchestrationContext) -> List[Any]:
"""Search for hotels with collaborative awareness."""
hotels = []
# Create hotels based on budget and constraints
if budget >= 300:
hotels.append(self._create_mock_hotel("Luxury Hotel", 4.5, budget, "City Center"))
elif budget >= 200:
hotels.append(self._create_mock_hotel("Business Hotel", 4.0, budget, "Downtown"))
else:
hotels.append(self._create_mock_hotel("Budget Hotel", 3.5, budget, "Outskirts"))
return hotels
def _create_mock_hotel(self, name: str, rating: float, price_per_night: float, location: str) -> Any:
"""Create a mock hotel object."""
return type('Hotel', (), {
'name': name,
'rating': rating,
'price_per_night': price_per_night,
'location': location,
'amenities': ['WiFi', 'Parking', 'Gym'] if rating >= 4.0 else ['WiFi', 'Parking']
})()
async def _select_hotel_collaboratively(self, hotels: List[Any], context: OrchestrationContext) -> Any:
"""Select best hotel considering collaborative feedback."""
# Score hotels based on collaborative preferences
best_hotel = None
best_score = -1
for hotel in hotels:
score = await self._score_hotel_collaboratively(hotel, context)
if score > best_score:
best_score = score
best_hotel = hotel
return best_hotel
async def _score_hotel_collaboratively(self, hotel: Any, context: OrchestrationContext) -> float:
"""Score hotel based on collaborative preferences."""
score = 0.0
# Rating score
score += hotel.rating / 5.0 * 0.4
# Budget score
budget = context.get_state("hotel_budget", 200)
if hotel.price_per_night <= budget:
score += 0.4
else:
score += 0.2
# Location score
if "City Center" in hotel.location or "Downtown" in hotel.location:
score += 0.2
return score
async def _update_context_collaboratively(self, context: OrchestrationContext,
selected_hotel: Any, booking_reference: str):
"""Update context with collaborative results."""
context.update_state("hotel_booked", True)
context.update_state("hotel_booking_reference", booking_reference)
context.update_state("selected_hotel", selected_hotel)
context.update_state("hotel_location", selected_hotel.location)
# Share location constraints with POI agent
context.update_state("hotel_location_constraint", {
"center_location": selected_hotel.location,
"max_distance_km": 5,
"transit_access": True
})
async def rollback(self, context: OrchestrationContext) -> bool:
"""Rollback collaborative hotel booking."""
try:
if context.get_state("hotel_booked", False):
print(f" Cancelling collaborative hotel booking")
context.update_state("hotel_booked", False)
context.update_state("hotel_booking_reference", None)
context.update_state("selected_hotel", None)
context.update_state("hotel_location_constraint", None)
self.collaborative_adaptations = []
return True
return True
except Exception as e:
print(f" Collaborative hotel rollback error: {e}")
return False
def get_dependencies(self) -> List[Any]:
"""Collaborative hotel agent depends on flight agent."""
from ..core.trip_orchestration import AgentDependency, DependencyType
return [
AgentDependency(
agent_id="collaborative_flight_agent",
dependency_type=DependencyType.SEQUENTIAL,
required_status=AgentStatus.COMPLETED
)
]
class CollaborativePOIAgent(NegotiatingAgent, AgentInterface):
"""
POI agent that collaborates with other agents through negotiation.
This agent demonstrates collaborative AI patterns by:
- Adapting activity selection based on hotel location constraints
- Negotiating budget allocations for optimal experiences
- Participating in consensus building for trip optimization
"""
def __init__(self, agent_id: str = "collaborative_poi_agent"):
NegotiatingAgent.__init__(self, agent_id)
AgentInterface.__init__(self, agent_id)
self.negotiation_history: List[NegotiationMessage] = []
self.collaborative_recommendations: List[Any] = []
async def execute(self, context: OrchestrationContext) -> AgentResult:
"""Execute POI recommendations with collaborative negotiation."""
try:
# Extract requirements
destination = context.get_state("destination", "Los Angeles")
interests = context.get_state("interests", ["cultural", "food"])
budget = context.get_state("poi_budget", 100)
# Get collaborative constraints from hotel agent
hotel_constraint = context.get_state("hotel_location_constraint", {})
# Adapt budget based on negotiations
adapted_budget = await self._adapt_budget_collaboratively(budget, context)
# Generate collaborative recommendations
recommendations = await self._generate_collaborative_recommendations(
destination, interests, adapted_budget, hotel_constraint, context
)
if not recommendations:
return AgentResult(
agent_id=self.agent_id,
status=AgentStatus.FAILED,
error=Exception("No POI recommendations generated collaboratively")
)
# Select best recommendations
selected_pois = await self._select_pois_collaboratively(recommendations, context)
self.collaborative_recommendations = selected_pois
# Update context
context.update_state("activities_planned", True)
context.update_state("recommended_pois", selected_pois)
# Calculate total cost
total_cost = sum(float(poi.adult_price or 0) for poi in selected_pois if poi.adult_price)
return AgentResult(
agent_id=self.agent_id,
status=AgentStatus.COMPLETED,
data={
"pois": selected_pois,
"booking_reference": f"CPOI{str(uuid.uuid4())[:8].upper()}",
"total_cost": total_cost,
"count": len(selected_pois),
"collaborative_constraints_applied": len(hotel_constraint) > 0
},
execution_time=1.7
)
except Exception as e:
return AgentResult(
agent_id=self.agent_id,
status=AgentStatus.FAILED,
error=e,
execution_time=1.2
)
async def _create_initial_proposal(self, context: OrchestrationContext) -> NegotiationProposal:
"""Create initial proposal for POI budget allocation."""
budget = context.get_state("poi_budget", 100)
interests = context.get_state("interests", ["cultural", "food"])
return NegotiationProposal(
proposer_id=self.agent_id,
proposal_type="poi_budget_allocation",
details={
"poi_budget": budget,
"interests": interests,
"preferred_options": ["premium_experiences", "guided_tours"],
"flexible_options": ["free_activities", "self_guided"]
},
reasoning="Mix of premium and free activities for optimal value",
constraints=["max_distance_10km", "interest_alignment"],
confidence_score=0.6
)
async def _evaluate_proposal_compatibility(self, proposal: Dict[str, Any], context: OrchestrationContext) -> Dict[str, Any]:
"""Evaluate compatibility with other agent proposals."""
evaluation = {
"compatibility_score": 0.0,
"conflicts": [],
"synergies": [],
"recommendations": []
}
# Check budget compatibility
if "budget" in proposal:
poi_budget = context.get_state("poi_budget", 100)
if proposal["budget"] >= poi_budget * 0.5:
evaluation["compatibility_score"] += 0.6
evaluation["synergies"].append("Budget allocation enables diverse activities")
return evaluation
async def _evaluate_consensus_options(self, options: List[Dict[str, Any]], context: OrchestrationContext) -> Dict[str, Any]:
"""Evaluate consensus options for POI selection."""
best_option = max(options, key=lambda x: x.get("poi_budget", 0))
return {
"preferred_option": best_option,
"confidence": 0.7,
"reasoning": "Higher budget enables more diverse and premium activities"
}
async def _adapt_budget_collaboratively(self, original_budget: float, context: OrchestrationContext) -> float:
"""Adapt budget based on collaborative negotiations."""
adapted_budget = original_budget
# Check for remaining budget after flight and hotel
remaining_budget = context.get_state("remaining_budget_after_hotel", original_budget * 2)
if remaining_budget > 0:
# Use remaining budget for activities
adapted_budget = min(adapted_budget, remaining_budget)
return adapted_budget
async def _generate_collaborative_recommendations(self, destination: str, interests: List[str],
budget: float, hotel_constraint: Dict[str, Any],
context: OrchestrationContext) -> List[Any]:
"""Generate POI recommendations with collaborative awareness."""
recommendations = []
# Apply hotel location constraint
max_distance = hotel_constraint.get("max_distance_km", 10)
# Generate recommendations based on interests and constraints
if "cultural" in interests:
recommendations.append(self._create_mock_poi(
"Cultural Museum", "cultural", 25.0, f"Within {max_distance}km of hotel"
))
if "food" in interests:
recommendations.append(self._create_mock_poi(
"Food Tour", "food", 45.0, f"Within {max_distance}km of hotel"
))
if "outdoor" in interests:
recommendations.append(self._create_mock_poi(
"City Park", "outdoor", 0.0, f"Within {max_distance}km of hotel"
))
# Filter by budget
affordable_recommendations = [
poi for poi in recommendations
if poi.adult_price <= budget
]
return affordable_recommendations if affordable_recommendations else recommendations[:2]
def _create_mock_poi(self, name: str, category: str, price: float, location: str) -> Any:
"""Create a mock POI object."""
return type('POI', (), {
'name': name,
'category': category,
'adult_price': price,
'location': location,
'duration_hours': 2.0 if price > 0 else 1.0
})()
async def _select_pois_collaboratively(self, recommendations: List[Any], context: OrchestrationContext) -> List[Any]:
"""Select best POIs considering collaborative feedback."""
# Score recommendations based on collaborative preferences
scored_recommendations = []
for poi in recommendations:
score = await self._score_poi_collaboratively(poi, context)
scored_recommendations.append((poi, score))
# Sort by score and select top recommendations
scored_recommendations.sort(key=lambda x: x[1], reverse=True)
# Select top 3 recommendations
selected = [poi for poi, score in scored_recommendations[:3]]
return selected
async def _score_poi_collaboratively(self, poi: Any, context: OrchestrationContext) -> float:
"""Score POI based on collaborative preferences."""
score = 0.0
# Interest alignment score
interests = context.get_state("interests", [])
if poi.category in interests:
score += 0.4
# Budget alignment score
budget = context.get_state("poi_budget", 100)
if poi.adult_price <= budget:
score += 0.3
# Location constraint score
hotel_constraint = context.get_state("hotel_location_constraint", {})
if hotel_constraint and "Within" in poi.location:
score += 0.2
# Collaborative value score
if poi.adult_price == 0: # Free activities
score += 0.1
return score
async def rollback(self, context: OrchestrationContext) -> bool:
"""Rollback collaborative POI planning."""
try:
if self.collaborative_recommendations:
print(f" Cancelling collaborative POI planning: {len(self.collaborative_recommendations)} activities")
context.update_state("activities_planned", False)
context.update_state("recommended_pois", [])
self.collaborative_recommendations = []
return True
return True
except Exception as e:
print(f" Collaborative POI rollback error: {e}")
return False
def get_dependencies(self) -> List[Any]:
"""Collaborative POI agent depends on hotel agent."""
from ..core.trip_orchestration import AgentDependency, DependencyType
return [
AgentDependency(
agent_id="collaborative_hotel_agent",
dependency_type=DependencyType.SEQUENTIAL,
required_status=AgentStatus.COMPLETED
)
]