# MultiAgenticAI/agents/agent_setup.py
# Listing metadata: uploaded by Chaitanya895 ("Upload 11 files"), commit e2e2eec (verified), 22.1 kB.
import sys
import os
# Add the 'models' directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'models')))
# Importing necessary autogen classes and SQLite connector
from autogen import AssistantAgent, GroupChat, GroupChatManager
import sqlite3
import pandas as pd
from farmer_advisor import FarmerAdvisor
from market_Researcher import MarketResearcher
from weather_Analyst import WeatherAnalyst
from sustainability_Expert import SustainabilityExpert
import re # For parsing market prices from the message
# Custom AssistantAgent class to override generate_reply
class CustomAssistantAgent(AssistantAgent):
    """Rule-based agent that answers from local models instead of an LLM.

    The reply is chosen by ``self.name``: "FarmerAdvisor", "MarketResearcher",
    "WeatherAnalyst", "SustainabilityExpert" or "CentralCoordinator". The
    agents talk to each other only through the text of their chat messages,
    so the sentence formats produced here are also parsed here and must stay
    in sync.

    NOTE(review): every instance builds all four model objects and owns a
    private ``simulated_inputs`` dict. The temperature/rainfall that
    ``farmer_advisor_response`` stores are therefore only visible to the
    instance that ran it; an instance that only runs
    ``weather_analyst_response`` reports the initial placeholders.
    """

    # Phrase that introduces the crop list in the FarmerAdvisor reply.
    _CROP_MARKER = "suggest planting "

    # Shape of the request sentence addressed to the FarmerAdvisor.
    _FARM_REQUEST = re.compile(
        r"suggest crops based on a (?P<size>.+?)-hectare farm with "
        r"(?P<soil>.+?) soil and a preference for (?P<pref>[^.]*)"
    )

    # Candidate crops per preference; tuples so the table is never mutated.
    _PREFERENCE_CROPS = {
        "grains": ("wheat", "corn", "rice", "soybean"),
        "vegetables": ("carrots", "tomatoes"),
        "fruits": ("apples", "oranges"),
    }

    # Simplified soil type -> soil pH mapping.
    _SOIL_PH = {"sandy": 6.0, "loamy": 6.5, "clay": 7.0}

    # Metrics used when a crop's sustainability sentence cannot be parsed.
    _DEFAULT_METRICS = {
        'sustainability_score': 0.5,
        'carbon_score': 0.0,
        'water_score': 0.0,
        'erosion_score': 0.0,
    }

    def __init__(self, name, system_message, llm_config):
        """Create the agent, its model helpers and its default farm inputs."""
        super().__init__(name=name, system_message=system_message, llm_config=llm_config)
        # Instantiate the agent classes
        self.farmer_advisor = FarmerAdvisor()
        self.market_researcher = MarketResearcher()
        self.weather_analyst = WeatherAnalyst(db_path=self._database_path())
        self.sustainability_expert = SustainabilityExpert()
        # Simulated farm and market inputs
        self.simulated_inputs = {
            'soil_ph': 6.5,  # Neutral soil pH
            'soil_moisture': 30.0,  # Percentage
            'fertilizer': 50.0,  # kg/ha
            'pesticide': 2.0,  # kg/ha
            'crop_yield': 3.0,  # ton/ha
            'temperature': 25.0,  # Celsius (placeholder until a forecast is stored)
            'rainfall': 50.0,  # mm (placeholder until a forecast is stored)
            'market_features': {
                'Demand_Index': 0.5,
                'Supply_Index': 0.5,
                'Competitor_Price_per_ton': 1000.0,
                'Economic_Indicator': 0.8,
                'Weather_Impact_Score': 0.7,
                'Seasonal_Factor': 'Medium',
                'Consumer_Trend_Index': 0.6,
            },
        }
        self.sustainability_metrics = {}  # Per-crop scores from the last sustainability evaluation
        self.final_result = None  # Final recommendation and chart data (CentralCoordinator only)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _database_path():
        """Return the absolute path of the project's SQLite database file."""
        return os.path.abspath(
            os.path.join(os.path.dirname(__file__), '..', 'database', 'sustainable_farming.db')
        )

    @staticmethod
    def _latest_content(messages, speaker):
        """Return the most recent non-empty text posted by *speaker*, or ''.

        The newest message is used so that a history containing earlier
        exchanges does not feed stale input into the current reply.
        """
        for msg in reversed(messages):
            content = msg.get("content")
            if msg.get("name") == speaker and isinstance(content, str) and content:
                return content
        return ""

    @classmethod
    def _extract_crops(cls, farmer_response):
        """Return the crop names listed after "suggest planting " (may be empty)."""
        if not isinstance(farmer_response, str):
            return []
        _, marker, tail = farmer_response.partition(cls._CROP_MARKER)
        if not marker:
            return []
        names = (part.strip().strip(".") for part in tail.split(" and "))
        return [name for name in names if name]

    def _find_farm_request(self, messages):
        """Return ``(land_size, soil_type, crop_preference)`` or ``None``.

        Scans the CentralCoordinator messages from newest to oldest and
        returns the first one that contains a well-formed farm request.
        """
        for msg in reversed(messages):
            content = msg.get("content")
            if msg.get("name") != "CentralCoordinator" or not isinstance(content, str):
                continue
            match = self._FARM_REQUEST.search(content)
            if match is None:
                continue
            try:
                land_size = float(match.group("size"))
            except ValueError:
                continue  # Size is not numeric; look at an older request instead.
            soil_type = match.group("soil").lower()
            crop_preference = match.group("pref").strip().lower()
            return land_size, soil_type, crop_preference
        return None

    # ------------------------------------------------------------------
    # Reply dispatch
    # ------------------------------------------------------------------
    def generate_reply(self, messages=None, sender=None, **kwargs):
        """Return this agent's scripted reply.

        Args:
            messages: Chat history as a list of message dicts. When ``None``
                the history stored for *sender* is used, or an empty list if
                no sender is given either.
            sender: The agent that requested the reply.
            **kwargs: Accepted and ignored, so callers that pass extra
                keyword arguments do not break this override.

        Returns:
            A non-empty reply string.
        """
        if messages is None:
            messages = self.chat_messages.get(sender, []) if sender is not None else []

        # Responses for each agent
        if self.name == "CentralCoordinator":
            response = self.central_coordinator_logic(messages, sender)
        else:
            handlers = {
                "FarmerAdvisor": self.farmer_advisor_response,
                "MarketResearcher": self.market_researcher_response,
                "WeatherAnalyst": self.weather_analyst_response,
                "SustainabilityExpert": self.sustainability_expert_response,
            }
            handler = handlers.get(self.name)
            response = handler(messages) if handler is not None else "No response available."

        # Debug: Log the response
        print(f"{self.name} response: {response}")
        # Ensure the response is a non-empty string
        if response is None or (isinstance(response, str) and not response.strip()):
            response = f"{self.name}: No valid response generated."
        return response

    # ------------------------------------------------------------------
    # Specialist replies
    # ------------------------------------------------------------------
    def farmer_advisor_response(self, messages):
        """Suggest two crops for the farm described by the CentralCoordinator.

        Side effects: updates ``soil_ph``, ``temperature`` and ``rainfall``
        in this instance's ``simulated_inputs``.
        """
        request = self._find_farm_request(messages)
        if request is None:
            return "No farm inputs provided to suggest crops."
        land_size, soil_type, crop_preference = request

        inputs = self.simulated_inputs
        # Map soil type to soil pH (simplified mapping)
        inputs['soil_ph'] = self._SOIL_PH.get(soil_type, 6.5)

        # Use WeatherAnalyst's forecast for temperature and rainfall
        weather_forecast = self.weather_analyst.forecast(
            inputs['soil_ph'],
            inputs['soil_moisture'],
            inputs['fertilizer'],
            inputs['pesticide'],
        )
        inputs['temperature'] = weather_forecast['temperature'][0]
        inputs['rainfall'] = weather_forecast['rainfall'][0]

        # Recommend crops
        recommended_crop = self.farmer_advisor.recommend(
            soil_ph=inputs['soil_ph'],
            soil_moisture=inputs['soil_moisture'],
            temp=inputs['temperature'],
            rainfall=inputs['rainfall'],
            fertilizer=inputs['fertilizer'],
            pesticide=inputs['pesticide'],
            crop_yield=inputs['crop_yield'],
        )

        # Start from the preference list; the model's pick takes the first
        # slot when it is not already one of the candidates.
        suggested_crops = list(self._PREFERENCE_CROPS.get(crop_preference, ("wheat", "corn")))
        model_pick = recommended_crop.lower()
        if model_pick not in suggested_crops:
            suggested_crops[0] = model_pick
        return (
            f"Based on a {land_size}-hectare farm with {soil_type} soil and a preference for "
            f"{crop_preference}, I suggest planting {suggested_crops[0]} and {suggested_crops[1]}."
        )

    def market_researcher_response(self, messages):
        """Report a predicted price per ton for each crop the FarmerAdvisor named."""
        crops = self._extract_crops(self._latest_content(messages, "FarmerAdvisor"))
        if not crops:
            return "No crops suggested to provide market insights."
        market_insights = []
        for crop in crops:
            try:
                predicted_price = self.market_researcher.forecast(
                    crop, self.simulated_inputs['market_features']
                )[0]
            except ValueError:
                market_insights.append(f"No market data available for {crop}")
            else:
                market_insights.append(
                    f"{crop} is expected to have a market price of ${predicted_price:.2f} per ton"
                )
        return ", and ".join(market_insights) + "."

    def weather_analyst_response(self, messages):
        """Report the temperature and rainfall held in ``simulated_inputs``.

        NOTE(review): no forecast is computed here; the values are whatever
        this instance currently stores (see the class docstring).
        """
        temp = self.simulated_inputs['temperature']
        rainfall = self.simulated_inputs['rainfall']
        return f"For the next 3 months, expect a temperature of {temp:.1f}°C and rainfall of {rainfall:.1f} mm."

    def sustainability_expert_response(self, messages):
        """Evaluate each suggested crop and describe its sustainability scores.

        Side effects: replaces ``self.sustainability_metrics`` with the
        per-crop scores computed during this call.
        """
        crops = self._extract_crops(self._latest_content(messages, "FarmerAdvisor"))
        if not crops:
            return "No crops suggested to evaluate sustainability."

        inputs = self.simulated_inputs
        sustainability_notes = []
        self.sustainability_metrics = {}
        for crop in crops:
            try:
                scores_tuple = self.sustainability_expert.evaluate(
                    [crop],
                    soil_ph=inputs['soil_ph'],
                    soil_moisture=inputs['soil_moisture'],
                    rainfall=inputs['rainfall'],
                    fertilizer=inputs['fertilizer'],
                    pesticide=inputs['pesticide'],
                    crop_yield=inputs['crop_yield'],
                )
                # Second element: dict with sustainability, carbon, water, erosion scores
                scores = scores_tuple[1]
            except Exception as e:
                # Reply boundary: report the failure in the chat instead of aborting it.
                return f"Error evaluating sustainability: {str(e)}"
            self.sustainability_metrics[crop] = {
                'sustainability_score': scores['sustainability'],
                'carbon_score': scores['carbon'],
                'water_score': scores['water'],
                'erosion_score': scores['erosion'],
            }
            sustainability_notes.append(
                f"{crop} has a predicted sustainability score of {scores['sustainability']:.2f} "
                f"(Carbon Footprint: {scores['carbon']:.2f}, Water: {scores['water']:.2f}, Erosion: {scores['erosion']:.2f})."
            )
        return " ".join(sustainability_notes)

    # ------------------------------------------------------------------
    # Coordinator helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_market_prices(crops, market_info):
        """Return ``{crop: price}`` parsed from the market text (0.0 if absent)."""
        prices = {}
        for crop in crops:
            # Escape the crop name so it is matched literally.
            pattern = rf"{re.escape(crop)} is expected to have a market price of \$([\d\.]+) per ton"
            match = re.search(pattern, market_info)
            try:
                prices[crop] = float(match.group(1)) if match else 0.0
            except ValueError:
                prices[crop] = 0.0  # e.g. a stray "." matched the character class
        return prices

    @classmethod
    def _parse_sustainability_scores(cls, crops, sustainability_info):
        """Return ``{crop: metrics}`` parsed from the sustainability text.

        Crops whose sentence is missing get a copy of ``_DEFAULT_METRICS``.
        Negative values are accepted so a real score is not replaced by the
        default merely because of its sign.
        """
        number = r"(-?\d+(?:\.\d+)?)"
        parsed = {}
        for crop in crops:
            pattern = (
                rf"{re.escape(crop)} has a predicted sustainability score of {number} "
                rf"\(Carbon Footprint: {number}, Water: {number}, Erosion: {number}\)"
            )
            match = re.search(pattern, sustainability_info)
            if match is None:
                parsed[crop] = dict(cls._DEFAULT_METRICS)
                continue
            overall, carbon, water, erosion = (float(value) for value in match.groups())
            parsed[crop] = {
                'sustainability_score': overall,
                'carbon_score': carbon,
                'water_score': water,
                'erosion_score': erosion,
            }
        return parsed

    def _parse_weather(self, weather_info):
        """Return ``(temperature, rainfall)`` parsed from the weather text.

        Falls back to this instance's stored values when the text is missing
        or malformed, instead of raising.
        """
        try:
            temp = float(weather_info.split("temperature of ")[1].split("°C")[0])
            rainfall = float(weather_info.split("rainfall of ")[1].split(" mm")[0])
        except (AttributeError, IndexError, ValueError):
            return self.simulated_inputs['temperature'], self.simulated_inputs['rainfall']
        return temp, rainfall

    def _store_recommendations(self, ranked):
        """Insert one row per ranked crop into the ``recommendations`` table."""
        conn = sqlite3.connect(self._database_path())
        try:
            # ``with conn`` commits or rolls back the transaction but does not
            # close the connection, hence the explicit close below.
            with conn:
                cursor = conn.cursor()
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS recommendations (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        crop TEXT,
                        score REAL,
                        rationale TEXT,
                        market_score REAL,
                        weather_score REAL,
                        sustainability_score REAL,
                        carbon_score REAL,
                        water_score REAL,
                        erosion_score REAL,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                for crop, scores in ranked:
                    cursor.execute(
                        "INSERT INTO recommendations (crop, score, rationale, market_score, weather_score, sustainability_score, carbon_score, water_score, erosion_score) "
                        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                        (
                            crop,
                            scores['total_score'],
                            f"Plant {crop}: market score: {scores['market_score']:.2f}, weather suitability: {scores['weather_score']:.2f}, sustainability: {scores['sustainability_score']:.2f}",
                            scores['market_score'],
                            scores['weather_score'],
                            scores['sustainability_score'],
                            scores['carbon_score'],
                            scores['water_score'],
                            scores['erosion_score'],
                        ),
                    )
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Coordinator
    # ------------------------------------------------------------------
    def central_coordinator_logic(self, messages, sender):
        """Combine the specialists' replies into a ranked recommendation.

        Uses the latest message from each non-coordinator agent, scores each
        suggested crop with fixed weights, writes the ranking to SQLite and
        stores ``{'recommendation', 'chart_data'}`` in ``self.final_result``.

        Args:
            messages: Chat history as a list of message dicts.
            sender: Unused; kept for interface compatibility.

        Returns:
            The recommendation text, or a short notice when no crops were
            suggested (``final_result`` is left untouched in that case).
        """
        # Collect responses from all agents; later messages overwrite earlier ones.
        agent_responses = {}
        for message in messages:
            speaker = message.get("name")
            content = message.get("content")
            if speaker and content and speaker != "CentralCoordinator":
                agent_responses[speaker] = content

        # Extract crops from FarmerAdvisor, dropping duplicates but keeping order.
        crops = list(dict.fromkeys(self._extract_crops(agent_responses.get("FarmerAdvisor", ""))))
        if not crops:
            return "No crops suggested to build a recommendation."

        # Extract market, weather, and sustainability info
        market_info = agent_responses.get("MarketResearcher", "")
        weather_info = agent_responses.get("WeatherAnalyst", "")
        sustainability_info = agent_responses.get("SustainabilityExpert", "")

        market_predictions = self._parse_market_prices(crops, market_info)
        sustainability_scores = self._parse_sustainability_scores(crops, sustainability_info)

        # Weather suitability is the same for every crop, so compute it once.
        temp, rainfall = self._parse_weather(weather_info)
        weather_score = (
            1
            - abs(temp - self.simulated_inputs['temperature']) / 50
            - abs(rainfall - self.simulated_inputs['rainfall']) / 100
        )
        weather_score = max(0, round(weather_score, 2))

        # Weighted scoring system
        weights = {
            "market": 0.25,  # 25%
            "weather": 0.20,  # 20%
            "sustainability": 0.20,  # 20%
            "carbon": 0.15,  # 15%
            "water": 0.10,  # 10%
            "erosion": 0.10,  # 10%
        }
        crop_scores = {}
        for crop in crops:
            # Market score (profitability): predicted price, capped at 1.0 for 1000 per ton.
            market_score = min(market_predictions[crop] / 1000.0, 1.0)
            metrics = sustainability_scores[crop]
            total_score = (
                weights["market"] * market_score
                + weights["weather"] * weather_score
                + weights["sustainability"] * metrics['sustainability_score']
                + weights["carbon"] * metrics['carbon_score']
                + weights["water"] * metrics['water_score']
                + weights["erosion"] * metrics['erosion_score']
            )
            crop_scores[crop] = {
                'total_score': total_score,
                'market_score': market_score,
                'weather_score': weather_score,
                'sustainability_score': metrics['sustainability_score'],
                'carbon_score': metrics['carbon_score'],
                'water_score': metrics['water_score'],
                'erosion_score': metrics['erosion_score'],
                'predicted_temperature': temp,
                'predicted_rainfall': rainfall,
            }

        # Rank crops by total score (dict keys are already unique).
        ranked = sorted(crop_scores.items(), key=lambda item: item[1]['total_score'], reverse=True)

        # Generate recommendation with detailed rationale
        recommendations = [
            (
                f"Plant {crop}: market score: {scores['market_score']:.2f} "
                f"(${market_predictions.get(crop, 0.0):.2f}/ton), "
                f"weather suitability: {scores['weather_score']:.2f}, "
                f"sustainability: {scores['sustainability_score']:.2f}, "
                f"carbon footprint: {scores['carbon_score']:.2f}, "
                f"water: {scores['water_score']:.2f}, "
                f"erosion: {scores['erosion_score']:.2f} "
                f"(Final Score: {scores['total_score']:.2f})"
            )
            for crop, scores in ranked
        ]

        # Combine into final recommendation
        final_recommendation = (
            "Recommendations:\n"
            + "\n".join(recommendations)
            + f"\n\nDetails:\nMarket Insights: {market_info}\nWeather Forecast: {weather_info}\nSustainability Notes: {sustainability_info}"
        )

        # Generate chart data for visualization
        chart_labels = ['Market Score', 'Weather Suitability Score', 'Sustainability Score',
                        'Carbon Footprint Score', 'Water Score', 'Erosion Score', 'Final Score']
        chart_data = [
            {
                'crop': crop,
                'labels': list(chart_labels),
                'values': [
                    scores['market_score'],
                    scores['weather_score'],
                    scores['sustainability_score'],
                    scores['carbon_score'],
                    scores['water_score'],
                    scores['erosion_score'],
                    scores['total_score'],
                ],
            }
            for crop, scores in ranked
        ]

        # Store in SQLite with columns for all scores
        self._store_recommendations(ranked)

        # Store the full result in the instance variable
        self.final_result = {
            'recommendation': final_recommendation,
            'chart_data': chart_data,
        }
        # Return only the recommendation string as the chat message
        return final_recommendation
# Define the agents using the custom class
# One scripted agent per role, created in speaking order. llm_config=False
# because replies come from CustomAssistantAgent.generate_reply, not an LLM.
(
    farmer_advisor,
    market_researcher,
    weather_analyst,
    sustainability_expert,
    central_coordinator,
) = (
    CustomAssistantAgent(name=role_name, system_message=role_blurb, llm_config=False)
    for role_name, role_blurb in (
        ("FarmerAdvisor",
         "I am the Farmer Advisor. I process farmer inputs to suggest suitable crops."),
        ("MarketResearcher",
         "I am the Market Researcher. I analyze market trends to suggest profitable crops."),
        ("WeatherAnalyst",
         "I am the Weather Analyst. I predict weather conditions based on farm inputs."),
        ("SustainabilityExpert",
         "I am the Sustainability Expert. I evaluate crops for sustainability."),
        ("CentralCoordinator",
         "I am the Central Coordinator. I integrate agent outputs to provide recommendations."),
    )
)
# Define a custom speaker selection function
def custom_select_speaker(last_speaker, groupchat):
    """Return the next speaker in a fixed round-robin order.

    Order: FarmerAdvisor -> MarketResearcher -> WeatherAnalyst ->
    SustainabilityExpert -> CentralCoordinator -> FarmerAdvisor ...

    Args:
        last_speaker: The agent that spoke last, or ``None`` at the start.
        groupchat: Unused; kept so the function matches the selector call
            signature.

    Returns:
        The agent after *last_speaker*. The first agent is returned when
        *last_speaker* is ``None`` or is not one of the five agents, instead
        of raising ``ValueError``.
    """
    agents = [farmer_advisor, market_researcher, weather_analyst, sustainability_expert, central_coordinator]
    try:
        position = agents.index(last_speaker)
    except ValueError:
        # Covers both None and any agent outside the rotation.
        return agents[0]
    return agents[(position + 1) % len(agents)]
# Set up the group chat for agent interactions
group_chat = GroupChat(
    max_round=6,  # Six rounds so the CentralCoordinator gets to respond
    messages=[],
    agents=[farmer_advisor, market_researcher, weather_analyst, sustainability_expert, central_coordinator],
)
# Replace the built-in speaker selection with the fixed rotation above.
group_chat.select_speaker = custom_select_speaker
group_chat_manager = GroupChatManager(llm_config=False, groupchat=group_chat)
# Function to initiate the group chat with dynamic farmer inputs and return the recommendation
def run_agent_collaboration(land_size, soil_type, crop_preference):
    """Run one round of agent collaboration and return its recommendation.

    Args:
        land_size: Farm size in hectares; interpolated into the request text.
        soil_type: Soil type name (lower-cased before use), e.g. "Loamy".
        crop_preference: Crop category (lower-cased before use), e.g. "Grains".

    Returns:
        The dict stored by the CentralCoordinator, with keys
        ``'recommendation'`` and ``'chart_data'``.

    Raises:
        ValueError: If this run did not produce a recommendation.
    """
    initial_message = (
        f"Let’s generate a farming recommendation. "
        f"FarmerAdvisor, please suggest crops based on a {land_size}-hectare farm with {soil_type.lower()} soil "
        f"and a preference for {crop_preference.lower()}. "
        f"MarketResearcher, provide market insights for those crops. "
        f"WeatherAnalyst, predict weather for the next 3 months. "
        f"SustainabilityExpert, evaluate the sustainability of the suggested crops."
    )
    # Start from a clean slate so a failed run cannot return the previous
    # run's result and old messages cannot be mistaken for current input.
    central_coordinator.final_result = None
    group_chat.messages.clear()
    for agent in group_chat.agents:
        agent.clear_history()
    # Initiate the chat
    central_coordinator.initiate_chat(
        group_chat_manager,
        message={"content": initial_message, "role": "user"}
    )
    # Retrieve the final result from the CentralCoordinator instance
    result = central_coordinator.final_result
    if result is None:
        raise ValueError("No recommendation generated by CentralCoordinator.")
    return result
if __name__ == "__main__":
    # Demo run: 8-hectare loamy farm with a preference for grains.
    demo_inputs = {"land_size": 8, "soil_type": "Loamy", "crop_preference": "Grains"}
    result = run_agent_collaboration(**demo_inputs)
    print(result['recommendation'])