import sys import os # Add the 'models' directory to the Python path sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'models'))) # Importing necessary autogen classes and SQLite connector from autogen import AssistantAgent, GroupChat, GroupChatManager import sqlite3 import pandas as pd from farmer_advisor import FarmerAdvisor from market_Researcher import MarketResearcher from weather_Analyst import WeatherAnalyst from sustainability_Expert import SustainabilityExpert import re # For parsing market prices from the message # Custom AssistantAgent class to override generate_reply class CustomAssistantAgent(AssistantAgent): def __init__(self, name, system_message, llm_config): super().__init__(name=name, system_message=system_message, llm_config=llm_config) # Instantiate the agent classes self.farmer_advisor = FarmerAdvisor() self.market_researcher = MarketResearcher() db_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'database', 'sustainable_farming.db')) self.weather_analyst = WeatherAnalyst(db_path=db_path) self.sustainability_expert = SustainabilityExpert() # Simulated farm and market inputs self.simulated_inputs = { 'soil_ph': 6.5, # Neutral soil pH 'soil_moisture': 30.0, # Percentage 'fertilizer': 50.0, # kg/ha 'pesticide': 2.0, # kg/ha 'crop_yield': 3.0, # ton/ha 'temperature': 25.0, # Celsius (initial placeholder, updated by WeatherAnalyst) 'rainfall': 50.0, # mm (initial placeholder, updated by WeatherAnalyst) 'market_features': { 'Demand_Index': 0.5, 'Supply_Index': 0.5, 'Competitor_Price_per_ton': 1000.0, 'Economic_Indicator': 0.8, 'Weather_Impact_Score': 0.7, 'Seasonal_Factor': 'Medium', 'Consumer_Trend_Index': 0.6 } } self.sustainability_metrics = {} # To store overall sustainability scores and new metrics self.final_result = None # To store the final recommendation and chart data def generate_reply(self, messages=None, sender=None): if messages is None and sender is not None: messages = self.chat_messages.get(sender, []) # Responses for each agent if self.name == "FarmerAdvisor": response = self.farmer_advisor_response(messages) elif self.name == "MarketResearcher": response = self.market_researcher_response(messages) elif self.name == "WeatherAnalyst": response = self.weather_analyst_response(messages) elif self.name == "SustainabilityExpert": response = self.sustainability_expert_response(messages) elif self.name == "CentralCoordinator": response = self.central_coordinator_logic(messages, sender) else: response = "No response available." # Debug: Log the response print(f"{self.name} response: {response}") # Ensure the response is a non-empty string (or dict for CentralCoordinator) if response is None or (isinstance(response, str) and not response.strip()): response = f"{self.name}: No valid response generated." return response def farmer_advisor_response(self, messages): initial_message = next((msg["content"] for msg in messages if msg["name"] == "CentralCoordinator"), "") if "hectare farm with" in initial_message: parts = initial_message.split("suggest crops based on a ")[1].split(" farm with ") land_size = float(parts[0].split("-hectare")[0]) soil_type = parts[1].split(" soil and a preference for ")[0].lower() crop_preference = parts[1].split(" soil and a preference for ")[1].split(".")[0].lower() # Map soil type to soil pH (simplified mapping) soil_ph_mapping = {"sandy": 6.0, "loamy": 6.5, "clay": 7.0} self.simulated_inputs['soil_ph'] = soil_ph_mapping.get(soil_type, 6.5) # Use WeatherAnalyst's forecast for temperature and rainfall weather_forecast = self.weather_analyst.forecast( self.simulated_inputs['soil_ph'], self.simulated_inputs['soil_moisture'], self.simulated_inputs['fertilizer'], self.simulated_inputs['pesticide'] ) self.simulated_inputs['temperature'] = weather_forecast['temperature'][0] self.simulated_inputs['rainfall'] = weather_forecast['rainfall'][0] # Recommend crops recommended_crop = self.farmer_advisor.recommend( soil_ph=self.simulated_inputs['soil_ph'], soil_moisture=self.simulated_inputs['soil_moisture'], temp=self.simulated_inputs['temperature'], rainfall=self.simulated_inputs['rainfall'], fertilizer=self.simulated_inputs['fertilizer'], pesticide=self.simulated_inputs['pesticide'], crop_yield=self.simulated_inputs['crop_yield'] ) # Suggest a second crop based on crop preference crop_preference_crops = { "grains": ["wheat", "corn", "rice", "soybean"], "vegetables": ["carrots", "tomatoes"], "fruits": ["apples", "oranges"] } suggested_crops = crop_preference_crops.get(crop_preference, ["wheat", "corn"]) if recommended_crop.lower() not in [crop.lower() for crop in suggested_crops]: suggested_crops[0] = recommended_crop.lower() return f"Based on a {land_size}-hectare farm with {soil_type} soil and a preference for {crop_preference}, I suggest planting {suggested_crops[0]} and {suggested_crops[1]}." return "No farm inputs provided to suggest crops." def market_researcher_response(self, messages): farmer_response = next((msg["content"] for msg in messages if msg["name"] == "FarmerAdvisor"), "") if "suggest planting" in farmer_response: crops = farmer_response.split("suggest planting ")[1].split(" and ") crops = [crop.strip(".") for crop in crops] market_insights = [] for crop in crops: try: predicted_price = self.market_researcher.forecast(crop, self.simulated_inputs['market_features'])[0] market_insights.append(f"{crop} is expected to have a market price of ${predicted_price:.2f} per ton") except ValueError as e: market_insights.append(f"No market data available for {crop}") return ", and ".join(market_insights) + "." return "No crops suggested to provide market insights." def weather_analyst_response(self, messages): temp = self.simulated_inputs['temperature'] rainfall = self.simulated_inputs['rainfall'] return f"For the next 3 months, expect a temperature of {temp:.1f}°C and rainfall of {rainfall:.1f} mm." def sustainability_expert_response(self, messages): farmer_response = next((msg["content"] for msg in messages if msg["name"] == "FarmerAdvisor"), "") if "suggest planting" in farmer_response: crops = farmer_response.split("suggest planting ")[1].split(" and ") crops = [crop.strip(".") for crop in crops] # Compute sustainability scores for each crop sustainability_notes = [] self.sustainability_metrics = {} for crop in crops: try: scores_tuple = self.sustainability_expert.evaluate( [crop], soil_ph=self.simulated_inputs['soil_ph'], soil_moisture=self.simulated_inputs['soil_moisture'], rainfall=self.simulated_inputs['rainfall'], fertilizer=self.simulated_inputs['fertilizer'], pesticide=self.simulated_inputs['pesticide'], crop_yield=self.simulated_inputs['crop_yield'] ) scores = scores_tuple[1] # Dictionary with sustainability, carbon, water, erosion scores except Exception as e: return f"Error evaluating sustainability: {str(e)}" self.sustainability_metrics[crop] = { 'sustainability_score': scores['sustainability'], 'carbon_score': scores['carbon'], 'water_score': scores['water'], 'erosion_score': scores['erosion'] } sustainability_notes.append( f"{crop} has a predicted sustainability score of {scores['sustainability']:.2f} " f"(Carbon Footprint: {scores['carbon']:.2f}, Water: {scores['water']:.2f}, Erosion: {scores['erosion']:.2f})." ) return " ".join(sustainability_notes) return "No crops suggested to evaluate sustainability." def central_coordinator_logic(self, messages, sender): # Collect responses from all agents agent_responses = {} for message in messages: sender_name = message.get("name") content = message.get("content") if sender_name and content and sender_name != "CentralCoordinator": agent_responses[sender_name] = content # Extract crops from FarmerAdvisor crops = agent_responses.get("FarmerAdvisor", "").split("suggest planting ")[1].split(" and ") crops = [crop.strip(".") for crop in crops] # Extract market, weather, and sustainability info market_info = agent_responses.get("MarketResearcher", "") weather_info = agent_responses.get("WeatherAnalyst", "") sustainability_info = agent_responses.get("SustainabilityExpert", "") # Parse market prices from MarketResearcher's response market_predictions = {} for crop in crops: pattern = rf"{crop} is expected to have a market price of \$([\d\.]+) per ton" match = re.search(pattern, market_info) if match: market_predictions[crop] = float(match.group(1)) else: market_predictions[crop] = 0.0 # Default if price not found # Parse sustainability scores from SustainabilityExpert's response sustainability_scores = {} for crop in crops: pattern = rf"{crop} has a predicted sustainability score of ([\d\.]+) \(Carbon Footprint: ([\d\.]+), Water: ([\d\.]+), Erosion: ([\d\.]+)\)" match = re.search(pattern, sustainability_info) if match: sustainability_score = float(match.group(1)) carbon_score = float(match.group(2)) water_score = float(match.group(3)) erosion_score = float(match.group(4)) sustainability_scores[crop] = { 'sustainability_score': sustainability_score, 'carbon_score': carbon_score, 'water_score': water_score, 'erosion_score': erosion_score } else: sustainability_scores[crop] = { 'sustainability_score': 0.5, 'carbon_score': 0.0, 'water_score': 0.0, 'erosion_score': 0.0 } # Weighted scoring system weights = { "market": 0.25, # 25% "weather": 0.20, # 20% "sustainability": 0.20, # 20% "carbon": 0.15, # 15% "water": 0.10, # 10% "erosion": 0.10 # 10% } crop_scores = {} for crop in crops: # Market Score (Profitability): Based on predicted price market_score = 0.5 # Default predicted_price = market_predictions.get(crop, 0.0) market_score = min(predicted_price / 1000.0, 1.0) # Weather Score (Suitability): Based on temperature and rainfall temp = float(weather_info.split("temperature of ")[1].split("°C")[0]) rainfall = float(weather_info.split("rainfall of ")[1].split(" mm")[0]) weather_score = 1 - abs(temp - self.simulated_inputs['temperature']) / 50 - abs(rainfall - self.simulated_inputs['rainfall']) / 100 weather_score = max(0, round(weather_score, 2)) # Sustainability Scores sustainability_metrics = sustainability_scores.get(crop, { 'sustainability_score': 0.5, 'carbon_score': 0.0, 'water_score': 0.0, 'erosion_score': 0.0 }) sustainability_score = sustainability_metrics['sustainability_score'] carbon_score = sustainability_metrics['carbon_score'] water_score = sustainability_metrics['water_score'] erosion_score = sustainability_metrics['erosion_score'] # Total score total_score = ( weights["market"] * market_score + weights["weather"] * weather_score + weights["sustainability"] * sustainability_score + weights["carbon"] * carbon_score + weights["water"] * water_score + weights["erosion"] * erosion_score ) crop_scores[crop] = { 'total_score': total_score, 'market_score': market_score, 'weather_score': weather_score, 'sustainability_score': sustainability_score, 'carbon_score': carbon_score, 'water_score': water_score, 'erosion_score': erosion_score, 'predicted_temperature': temp, 'predicted_rainfall': rainfall } # Rank crops by total score and remove duplicates seen_crops = set() unique_crop_scores = [] for crop, scores in sorted(crop_scores.items(), key=lambda x: x[1]['total_score'], reverse=True): if crop not in seen_crops: seen_crops.add(crop) unique_crop_scores.append((crop, scores)) # Generate recommendation with detailed rationale recommendations = [] for crop, scores in unique_crop_scores: market_rationale = f"market score: {scores['market_score']:.2f} (${market_predictions.get(crop, 0.0):.2f}/ton)" weather_rationale = f"weather suitability: {scores['weather_score']:.2f}" sustainability_rationale = f"sustainability: {scores['sustainability_score']:.2f}" carbon_rationale = f"carbon footprint: {scores['carbon_score']:.2f}" water_rationale = f"water: {scores['water_score']:.2f}" erosion_rationale = f"erosion: {scores['erosion_score']:.2f}" rationale = (f"Plant {crop}: {market_rationale}, {weather_rationale}, {sustainability_rationale}, " f"{carbon_rationale}, {water_rationale}, {erosion_rationale} (Final Score: {scores['total_score']:.2f})") recommendations.append(rationale) # Combine into final recommendation final_recommendation = "Recommendations:\n" + "\n".join(recommendations) + f"\n\nDetails:\nMarket Insights: {market_info}\nWeather Forecast: {weather_info}\nSustainability Notes: {sustainability_info}" # Generate pie chart data for visualization in Streamlit chart_data = [] for crop, scores in unique_crop_scores: chart_data.append({ 'crop': crop, 'labels': ['Market Score', 'Weather Suitability Score', 'Sustainability Score', 'Carbon Footprint Score', 'Water Score', 'Erosion Score', 'Final Score'], 'values': [ scores['market_score'], scores['weather_score'], scores['sustainability_score'], scores['carbon_score'], scores['water_score'], scores['erosion_score'], scores['total_score'] ] }) # Store in SQLite with new columns for all scores db_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'database', 'sustainable_farming.db')) with sqlite3.connect(db_path) as conn: cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS recommendations ( id INTEGER PRIMARY KEY AUTOINCREMENT, crop TEXT, score REAL, rationale TEXT, market_score REAL, weather_score REAL, sustainability_score REAL, carbon_score REAL, water_score REAL, erosion_score REAL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) """) for crop, scores in unique_crop_scores: cursor.execute( "INSERT INTO recommendations (crop, score, rationale, market_score, weather_score, sustainability_score, carbon_score, water_score, erosion_score) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", ( crop, scores['total_score'], f"Plant {crop}: market score: {scores['market_score']:.2f}, weather suitability: {scores['weather_score']:.2f}, sustainability: {scores['sustainability_score']:.2f}", scores['market_score'], scores['weather_score'], scores['sustainability_score'], scores['carbon_score'], scores['water_score'], scores['erosion_score'] ) ) conn.commit() # Store the full result in the instance variable self.final_result = { 'recommendation': final_recommendation, 'chart_data': chart_data } # Return only the recommendation string as the chat message return final_recommendation # Define the agents using the custom class farmer_advisor = CustomAssistantAgent( name="FarmerAdvisor", system_message="I am the Farmer Advisor. I process farmer inputs to suggest suitable crops.", llm_config=False ) market_researcher = CustomAssistantAgent( name="MarketResearcher", system_message="I am the Market Researcher. I analyze market trends to suggest profitable crops.", llm_config=False ) weather_analyst = CustomAssistantAgent( name="WeatherAnalyst", system_message="I am the Weather Analyst. I predict weather conditions based on farm inputs.", llm_config=False ) sustainability_expert = CustomAssistantAgent( name="SustainabilityExpert", system_message="I am the Sustainability Expert. I evaluate crops for sustainability.", llm_config=False ) central_coordinator = CustomAssistantAgent( name="CentralCoordinator", system_message="I am the Central Coordinator. I integrate agent outputs to provide recommendations.", llm_config=False ) # Define a custom speaker selection function def custom_select_speaker(last_speaker, groupchat): agents = [farmer_advisor, market_researcher, weather_analyst, sustainability_expert, central_coordinator] if last_speaker is None: return agents[0] last_index = agents.index(last_speaker) next_index = (last_index + 1) % len(agents) return agents[next_index] # Set up the group chat for agent interactions group_chat = GroupChat( agents=[farmer_advisor, market_researcher, weather_analyst, sustainability_expert, central_coordinator], messages=[], max_round=6 # Already set to 6 to allow CentralCoordinator to respond ) group_chat.select_speaker = custom_select_speaker group_chat_manager = GroupChatManager( groupchat=group_chat, llm_config=False ) # Function to initiate the group chat with dynamic farmer inputs and return the recommendation def run_agent_collaboration(land_size, soil_type, crop_preference): initial_message = ( f"Let’s generate a farming recommendation. " f"FarmerAdvisor, please suggest crops based on a {land_size}-hectare farm with {soil_type.lower()} soil " f"and a preference for {crop_preference.lower()}. " f"MarketResearcher, provide market insights for those crops. " f"WeatherAnalyst, predict weather for the next 3 months. " f"SustainabilityExpert, evaluate the sustainability of the suggested crops." ) # Initiate the chat central_coordinator.initiate_chat( group_chat_manager, message={"content": initial_message, "role": "user"} ) # Retrieve the final result from the CentralCoordinator instance result = central_coordinator.final_result if result is None: raise ValueError("No recommendation generated by CentralCoordinator.") return result if __name__ == "__main__": result = run_agent_collaboration(land_size=8, soil_type="Loamy", crop_preference="Grains") print(result['recommendation'])