# app.py import json import openai from typing import Dict, Any, Optional, List from dataclasses import dataclass import logging from openai import OpenAI from dotenv import load_dotenv import os # Import your modules from easy_agents import EASYFARMS_FUNCTION_SCHEMAS, execute_easyfarms_function from alert import WEATHER_TOOLS , execute_function from conversation_manager import ConversationManager # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() @dataclass class Config: """Configuration settings""" api_key: str api_url: str model_name: str max_retries: int = 3 temperature: float = 0.5 @classmethod def from_env(cls): """Load configuration from environment variables""" return cls( api_key=os.getenv("API_KEY"), api_url=os.getenv("API_URL"), model_name=os.getenv("MODEL_NAME") ) class EasyFarmsAssistant: """Enhanced EasyFarms AI Assistant with weather integration and persistent sessions""" def __init__(self, config: Optional[Config] = None, manager: Optional[ConversationManager] = None): """ Initialize the assistant with configuration and a conversation manager. Args: config (Optional[Config]): Configuration object. If None, loads from environment. manager (Optional[ConversationManager]): Manager for handling conversation persistence. """ self.config = config or Config.from_env() # Validate configuration if not all([self.config.api_key, self.config.api_url, self.config.model_name]): raise ValueError("Missing required configuration: API_KEY, API_URL, and MODEL_NAME must be set") self.client = OpenAI( api_key=self.config.api_key, base_url=self.config.api_url ) # All available functions from both modules are combined into the tools list self.tools = self._initialize_tools() # Use the provided conversation manager or create a new one self.manager = manager or ConversationManager() # System prompts self.system_prompt = """You are the AI assistant for EasyForms Agritech Solutions. Your task is to provide users with clear, concise, and actionable responses regarding agriculture, crop management, production, treatment, weather alerts, and related queries. Core Capabilities: - Crop recommendations based on soil and weather conditions - Fertilizer recommendations for specific crops - Plant disease detection and treatment advice - Weather alerts and forecasts for farming decisions - Market data and commodity prices - General agricultural guidance Rules: 1. Check if any relevant function_tools or datasets are available for this query. 2. If available, use the functions to fetch information and generate the final user-facing response. 3. If the functions or data are unavailable, do **not stop**; instead, generate a general, well-reasoned response based on your own knowledge. 4. Keep the response **simple, smooth, well-pointed, and concise**. 5. Structure the response with bullet points or numbered steps where helpful. 6. Provide practical, actionable advice a user can implement immediately. 7. Use English or Hindi based on user preference. 8. If any information is uncertain, mention it clearly and suggest alternatives. 9. For weather-related queries, prioritize safety and timely alerts.""" self.final_system = """You are the final response assistant for EasyForms Agritech Solutions. Use the outputs from previous function calls to generate a **clear, concise, actionable response** for the user. Rules: 1. Combine the function outputs and your own reasoning to answer the query. 2. Keep responses simple, smooth, well-pointed, and concise. 3. Structure response with headings or bullet points if helpful. 4. Provide practical advice that a farmer or user can implement immediately. 5. If some data is missing, clearly state it and offer alternatives. 6. Use English or Hindi based on the user preference. 7. For weather alerts, emphasize urgency and protective measures.""" def _initialize_tools(self) -> List[Dict]: """Initialize and convert all function schemas to the new tools format""" tools = [] # Convert EasyFarms schemas to the new format for schema in EASYFARMS_FUNCTION_SCHEMAS: tool = { "type": "function", "function": { "name": schema["name"], "description": schema["description"], "parameters": schema["parameters"] } } tools.append(tool) # Add weather tools (which are already in the correct format) tools.extend(WEATHER_TOOLS) return tools def call_function(self, function_name: str, arguments: Dict) -> Any: """Route function calls to appropriate handlers with error handling""" try: # Map all available function names to their handlers function_map = { # EasyFarms functions "get_crop_recommendation": lambda args: execute_easyfarms_function("get_crop_recommendation", **args), "get_fertilizer_recommendation": lambda args: execute_easyfarms_function("get_fertilizer_recommendation", **args), "detect_plant_disease": lambda args: execute_easyfarms_function("detect_plant_disease", **args), "get_supported_options": lambda args: execute_easyfarms_function("get_supported_options", **args), "get_market_prices": lambda args: execute_easyfarms_function("get_market_prices", **args), "compare_commodity_prices": lambda args: execute_easyfarms_function("compare_commodity_prices", **args), "get_market_locations": lambda args: execute_easyfarms_function("get_market_locations", **args), "get_commodity_list": lambda args: execute_easyfarms_function("get_commodity_list", **args), # Weather alert functions "get_weather_alerts": lambda args: self._execute_weather_function("get_weather_alerts", **args), "get_weather": lambda args: self._execute_weather_function("get_weather", **args), "get_alert_summary": lambda args: self._execute_weather_function("get_alert_summary", **args), "get_available_locations": lambda args: self._execute_weather_function("get_available_locations", **args) } if function_name in function_map: return function_map[function_name](arguments) else: return {"error": f"Unknown function: {function_name}"} except Exception as e: logger.error(f"Error executing function {function_name}: {e}") return {"error": str(e)} def _execute_weather_function(self, function_name: str, **kwargs): """Helper to execute weather functions from the alert.py module""" from alert import execute_function return execute_function(function_name, kwargs) def process_query(self, user_message: str, session_id: str, image_url: Optional[str] = None) -> str: """ Process user query, correctly reformatting history for the LLM API call. """ try: # MEMORY STEP 1: Fetch the complete past conversation using the session_id. conversation_history = self.manager.get_history(session_id) # Prepare the list that will be sent to the AI messages = [{"role": "system", "content": self.system_prompt}] # MEMORY STEP 2: Loop through the history and add every past message. # This builds the AI's memory of what was said before. for message in conversation_history: if message.get("role") == "user": llm_user_content = message.get("content", "") if message.get("imageUrl"): llm_user_content += f" [image_url: {message.get('imageUrl')}]" messages.append({"role": "user", "content": llm_user_content}) elif message.get("role") == "assistant": messages.append({"role": "assistant", "content": message.get("content", "")}) # MEMORY STEP 3: Add the user's CURRENT message to the end of the history. llm_message_content = user_message if image_url: llm_message_content += f" [image_url: {image_url}]" messages.append({"role": "user", "content": llm_message_content}) # MEMORY STEP 4: Send the entire 'messages' list to the AI. response = self.client.chat.completions.create( model=self.config.model_name, messages=messages, tools=self.tools, tool_choice="auto", temperature=self.config.temperature ) message = response.choices[0].message if hasattr(message, 'tool_calls') and message.tool_calls: # Add the assistant's message with tool calls messages.append({ "role": "assistant", "tool_calls": [ { "id": tool_call.id, "type": "function", "function": { "name": tool_call.function.name, "arguments": tool_call.function.arguments } } for tool_call in message.tool_calls ] }) # Execute all tool calls for tool_call in message.tool_calls: function_name = tool_call.function.name function_args = json.loads(tool_call.function.arguments) logger.info(f"Calling function: {function_name} with args: {function_args}") # Call the function function_result = self.call_function(function_name, function_args) # Add function result to messages messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(function_result) }) # Add final system prompt for generating response messages.append({ "role": "system", "content": self.final_system }) # Get final response final_response = self.client.chat.completions.create( model=self.config.model_name, messages=messages, temperature=self.config.temperature ) response_content = final_response.choices[0].message.content else: response_content = message.content # After getting the response, save the new turns back to the database for the next message. user_turn_for_storage = {"role": "user", "content": user_message} if image_url: user_turn_for_storage["imageUrl"] = image_url updated_history = conversation_history + [ user_turn_for_storage, {"role": "assistant", "content": response_content} ] self.manager.save_history(session_id, updated_history) return response_content except Exception as e: logger.error(f"Error processing query for session {session_id}: {e}") return f"I apologize, but I encountered an error: {str(e)}. Please try again or rephrase your question." def clear_history(self, session_id: str) -> bool: """ Clear conversation history for a specific session from the database. Args: session_id: The ID of the session to clear. Returns: True if deletion was successful, False otherwise. """ logger.info(f"Clearing history for session: {session_id}") return self.manager.delete_history(session_id) # Utility class for generating example queries (can be used for testing) class QuickQueries: """Pre-defined query templates for common farming questions""" @staticmethod def crop_recommendation(N: int, P: int, K: int, temp: float, humidity: float, ph: float = 6.5) -> str: """Generate crop recommendation query""" return f"What crop should I grow with N={N}, P={P}, K={K}, temperature {temp}°C, humidity {humidity}%, pH {ph}?" @staticmethod def fertilizer_query(crop: str, soil: str, N: int, P: int, K: int) -> str: """Generate fertilizer recommendation query""" return f"I need fertilizer recommendation for {crop} in {soil} soil with N={N}, P={P}, K={K}" @staticmethod def weather_alert(location: str = "") -> str: """Generate weather alert query""" location_str = f" for {location}" if location else "" return f"What are the current weather alerts and conditions{location_str}? How will this affect farming?" # Test function to validate configuration def test_configuration(): """Test if all configuration is properly set up""" try: # Check environment variables required_env_vars = ["API_KEY", "API_URL", "MODEL_NAME"] missing_vars = [var for var in required_env_vars if not os.getenv(var)] if missing_vars: print(f"❌ Missing environment variables: {missing_vars}") return False # Test assistant initialization assistant = EasyFarmsAssistant() print("✅ Assistant initialized successfully") # Test function schemas print(f"✅ Loaded {len(assistant.tools)} function tools") return True except Exception as e: print(f"❌ Configuration test failed: {e}") return False if __name__ == "__main__": print("=== EasyFarms Assistant Configuration Test ===") if test_configuration(): print("✅ All systems ready!") else: print("❌ Please fix configuration issues before running the assistant.")