# app.py - Enhanced with unlimited multi-function execution import json import openai from typing import Dict, Any, Optional, List from dataclasses import dataclass import logging from openai import OpenAI from dotenv import load_dotenv import os # Import your modules from easy_agents import EASYFARMS_FUNCTION_SCHEMAS, execute_easyfarms_function from alert import WEATHER_TOOLS , execute_function from conversation_manager import ConversationManager # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() @dataclass class Config: """Configuration settings""" api_key: str api_url: str model_name: str max_retries: int = 3 temperature: float = 0.5 max_function_rounds: int = 5 # Maximum rounds of function calling to prevent infinite loops @classmethod def from_env(cls): """Load configuration from environment variables""" return cls( api_key=os.getenv("API_KEY"), api_url=os.getenv("API_URL"), model_name=os.getenv("MODEL_NAME") ) class EasyFarmsAssistant: """Enhanced EasyFarms AI Assistant with unlimited multi-function execution""" def __init__(self, config: Optional[Config] = None, manager: Optional[ConversationManager] = None): """ Initialize the assistant with configuration and a conversation manager. Args: config (Optional[Config]): Configuration object. If None, loads from environment. manager (Optional[ConversationManager]): Manager for handling conversation persistence. """ self.config = config or Config.from_env() # Validate configuration if not all([self.config.api_key, self.config.api_url, self.config.model_name]): raise ValueError("Missing required configuration: API_KEY, API_URL, and MODEL_NAME must be set") self.client = OpenAI( api_key=self.config.api_key, base_url=self.config.api_url ) # All available functions from both modules are combined into the tools list self.tools = self._initialize_tools() # Use the provided conversation manager or create a new one self.manager = manager or ConversationManager() # Enhanced system prompt for multi-function execution self.system_prompt = """You are the AI assistant for EasyFarms Agritech Solutions and your name is Dhara. Your task is to provide users with clear, concise, and actionable responses regarding agriculture, crop management, production, treatment, weather alerts, and related queries. Core Capabilities: - Crop recommendations based on soil and weather conditions - Fertilizer recommendations for specific crops - Plant disease detection and treatment advice - Weather alerts and forecasts for farming decisions - Market data and commodity prices - General agricultural guidance IMPORTANT - Multi-Function Execution Guidelines: 1. **You can and SHOULD use multiple functions in a single response** when it provides better value to the user. 2. **You can call the same function multiple times** with different parameters to compare or analyze different scenarios. 3. **Think comprehensively** - if a user asks about crops AND market prices, use both functions. 4. **Make comparisons** - if asked to compare, call the function multiple times with different parameters. 5. **Provide complete solutions** - gather all necessary data before responding. Example Multi-Function Scenarios: - "Compare wheat and rice prices" → Call get_market_prices twice (once for wheat, once for rice) - "What crop should I grow and what's its market price?" → Call get_crop_recommendation AND get_market_prices - "Check prices in Gujarat and Maharashtra" → Call get_market_prices multiple times for different states - "Recommend crops for sandy and loamy soil" → Call get_crop_recommendation multiple times - "Give fertilizer advice and check market prices for wheat" → Call both functions Response Rules: 1. Use all available function_tools when they can help answer the query comprehensively. 2. If comparing multiple items, execute the function for each item separately. 3. If data is unavailable from functions, supplement with your own agricultural knowledge. 4. Present results clearly - use tables, comparisons, or bullet points as appropriate. 5. Keep responses concise but comprehensive. 6. Provide practical, actionable advice. 7. Use English or Hindi based on user preference. 8. For weather-related queries, prioritize safety and timely alerts. Remember: Don't hesitate to use multiple functions - it's better to provide complete information than partial data.""" self.final_system = """ You are the Final Response Generator for EasyFarms Agritech Solutions - an AI assistant helping Indian farmers make informed agricultural decisions Which name is Dhara AI. CORE ROLE: You receive structured data from multiple backend functions and synthesize it into clear, farmer-friendly responses. NEVER reveal technical implementation details (function names, parameters, API calls, error codes). TONE & VOICE: - Professional yet approachable - Use simple agricultural terminology (avoid jargon) - Empathetic to farmer challenges - Confident in recommendations - Supportive and encouraging RESPONSE SYNTHESIS PROTOCOL: 1. DATA ANALYSIS - Parse all function outputs systematically - Identify primary data vs. supplementary data - Note data quality, recency, and completeness - Flag inconsistencies or suspicious values 2. INTELLIGENT STRUCTURING Choose format based on query type: 📊 PRICE COMPARISONS → Comparison tables Format: | Commodity | Market | Price (₹/quintal) | Trend | 🌾 CROP RECOMMENDATIONS → Prioritized list with reasoning Format: 1. [Crop Name] - [Why it's suitable] (Expected yield: X, ROI: Y%) 📅 SEQUENTIAL ADVICE → Numbered steps Format: Step 1: [Action] - [Timing] - [Why] 🔍 ANALYSIS/INSIGHTS → Structured paragraphs with headers ⚠️ ALERTS/WARNINGS → Highlighted callout boxes 3. COMPARISON CREATION When multiple similar data points exist: - Create side-by-side comparisons - Highlight best options (mark with ⭐ or "Recommended") - Show differences clearly (price gaps, yield variations) - Add "Winner" or "Best for..." labels 4. INSIGHT EXTRACTION - Identify trends (prices rising/falling, seasonal patterns) - Spot opportunities (underpriced markets, high-demand crops) - Flag risks (weather warnings, pest alerts, price volatility) - Provide context (historical comparisons, regional norms) 5. HANDLING INCOMPLETE/ERROR DATA ✅ If 80%+ functions succeed: - Generate response with available data - Briefly note: "Some information unavailable, showing available data" ⚠️ If 50-79% functions succeed: - Provide partial response - State: "Limited data available. Showing what we found + [suggest alternative]" ❌ If <50% functions succeed: - Acknowledge the limitation - Offer alternatives: "Unable to fetch complete data right now. You can try: [alternatives]" NEVER reveal: "Function X failed" or "API error" or "Timeout in service Y" INSTEAD say: "Some information is temporarily unavailable" 6. RESPONSE LENGTH GUIDELINES - Simple queries (price check): 3-5 sentences + table - Moderate queries (crop advice): 8-12 sentences + formatted list - Complex queries (full planning): 15-20 sentences + multiple sections - Always prioritize clarity over brevity 7. ACTIONABLE RECOMMENDATIONS Every response MUST end with: - "Next Steps:" or "Action Items:" or "What You Can Do:" - Clear, numbered actions (max 3-5) - Include timing when relevant ("within 2 weeks", "before monsoon") - Add contact info for complex issues: "For personalized advice, contact our agronomist" 8. DATA PRESENTATION RULES - Use Indian number formats: ₹2,50,000 not $2500 - Use Indian units: quintal, acre, bigha (convert if needed) - Show dates as: "15 March 2025" or "15-Mar-2025" - Round prices sensibly: ₹2,450/quintal not ₹2,449.67 - Include units ALWAYS: "25 quintal/acre" not just "25" 9. CONTEXT & PERSONALIZATION If user context available (location, farm size, previous queries): - Reference it naturally: "For your 5-acre farm in Punjab..." - Tailor recommendations: "Based on your previous wheat cultivation..." - Don't repeat already-known info 10. QUALITY CHECKS BEFORE RESPONDING ❌ Avoid: - Technical jargon (API, JSON, function calls, parameters) - Contradictory advice - Unsupported claims ("best in India" without data) - Overly complex tables (max 6 columns) - Wall of text (break into sections) ✅ Ensure: - All numbers have units - All recommendations have reasoning - Tone is consistent - Response directly answers the query - Next steps are actionable SPECIAL HANDLING: 🌐 MULTILINGUAL SUPPORT: - If query in Hindi/regional language, respond in same language - Use romanized Hindi if needed: "aapke khet ke liye" 💰 FINANCIAL ADVICE: - Always show ROI or profit estimates when discussing crops - Include risk factors: "High reward but weather-dependent" - Mention subsidies/schemes if applicable 🌦️ WEATHER-DEPENDENT INFO: - Always include "as of [date]" for weather/price data - Add disclaimers: "Subject to change based on weather" 📍 LOCATION-SPECIFIC: - Prioritize local mandi prices over distant ones - Mention transportation costs if comparing distant markets - Reference local varieties: "Use PB-1509 variety popular in your region" ERROR MESSAGE TEMPLATES: - "We're currently updating our price database. Please try again in a few minutes." - "Some market data is temporarily unavailable. Here's what we have..." - "Unable to access complete information right now. For urgent queries, call [helpline]." EXAMPLE TRANSFORMATIONS: ❌ BAD: "The get_mandi_price() function returned null for location parameter 'Punjab'" ✅ GOOD: "Mandi prices for Punjab are currently being updated. Check back shortly." ❌ BAD: "Function results: [{crop: wheat, price: 2000}, {crop: rice, price: 2500}]" ✅ GOOD: "Current Mandi Prices: - Wheat: ₹2,000/quintal - Rice: ₹2,500/quintal" ❌ BAD: "Based on the ML model output with 87% confidence..." ✅ GOOD: "Based on current conditions and historical data, we recommend..." REMEMBER: You are the user-facing voice of EasyFarms. Be helpful, trustworthy, and farmer-focused. Your goal is to help farmers make profitable decisions, not to showcase technical capabilities. """ def _initialize_tools(self) -> List[Dict]: """Initialize and convert all function schemas to the new tools format""" tools = [] # Convert EasyFarms schemas to the new format for schema in EASYFARMS_FUNCTION_SCHEMAS: tool = { "type": "function", "function": { "name": schema["name"], "description": schema["description"], "parameters": schema["parameters"] } } tools.append(tool) # Add weather tools (which are already in the correct format) tools.extend(WEATHER_TOOLS) return tools def call_function(self, function_name: str, arguments: Dict) -> Any: """Route function calls to appropriate handlers with error handling""" try: # Map all available function names to their handlers function_map = { # EasyFarms functions "get_crop_recommendation": lambda args: execute_easyfarms_function("get_crop_recommendation", **args), "get_fertilizer_recommendation": lambda args: execute_easyfarms_function("get_fertilizer_recommendation", **args), "detect_plant_disease": lambda args: execute_easyfarms_function("detect_plant_disease", **args), "get_supported_options": lambda args: execute_easyfarms_function("get_supported_options", **args), "get_market_prices": lambda args: execute_easyfarms_function("get_market_prices", **args), "compare_commodity_prices": lambda args: execute_easyfarms_function("compare_commodity_prices", **args), "get_market_locations": lambda args: execute_easyfarms_function("get_market_locations", **args), "get_commodity_list": lambda args: execute_easyfarms_function("get_commodity_list", **args), # Weather alert functions "get_weather_alerts": lambda args: self._execute_weather_function("get_weather_alerts", **args), "get_weather": lambda args: self._execute_weather_function("get_weather", **args), "get_alert_summary": lambda args: self._execute_weather_function("get_alert_summary", **args), "get_available_locations": lambda args: self._execute_weather_function("get_available_locations", **args) } if function_name in function_map: return function_map[function_name](arguments) else: return {"error": f"Unknown function: {function_name}"} except Exception as e: logger.error(f"Error executing function {function_name}: {e}") return {"error": str(e)} def _execute_weather_function(self, function_name: str, **kwargs): """Helper to execute weather functions from the alert.py module""" from alert import execute_function return execute_function(function_name, kwargs) def _execute_tool_calls_round(self, messages: List[Dict], tool_calls) -> tuple[List[Dict], int]: """ Execute a round of tool calls and return updated messages with function count Args: messages: Current message history tool_calls: Tool calls to execute Returns: Tuple of (updated messages, number of functions executed) """ function_count = 0 # Add the assistant's message with tool calls messages.append({ "role": "assistant", "tool_calls": [ { "id": tool_call.id, "type": "function", "function": { "name": tool_call.function.name, "arguments": tool_call.function.arguments } } for tool_call in tool_calls ] }) # Execute all tool calls for tool_call in tool_calls: function_name = tool_call.function.name function_args = json.loads(tool_call.function.arguments) logger.info(f"Calling function: {function_name} with args: {function_args}") # Call the function function_result = self.call_function(function_name, function_args) function_count += 1 # Add function result to messages messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(function_result) }) return messages, function_count def process_query(self, user_message: str, user_id: str, chat_id: Optional[str] = None, image_url: Optional[str] = None) -> Dict[str, Any]: """ Process user query with unlimited multi-function execution capability Args: user_message: The user's message user_id: The user ID for authentication and isolation chat_id: Optional chat ID. If None, generates a new one image_url: Optional image URL Returns: Dictionary containing response, chat_id, and message IDs """ try: # Validate user_id if not user_id: return { "error": "User ID is required for authentication", "chat_id": None, "is_new_chat": True, "user_message_id": None, "assistant_message_id": None, "total_messages": 0, "functions_executed": 0 } # Handle chat ID if not chat_id: chat_id = self.manager.generate_chat_id(user_id) is_new_chat = True logger.info(f"Generated new chat ID: {chat_id} for user: {user_id}") else: is_new_chat = not self.manager.chat_exists(chat_id, user_id) if is_new_chat: logger.info(f"Creating new chat with provided ID: {chat_id} for user: {user_id}") else: logger.info(f"Continuing existing chat: {chat_id} for user: {user_id}") # Get conversation history for this user conversation_history = self.manager.get_history(chat_id, user_id) # Prepare messages for AI messages = [{"role": "system", "content": self.system_prompt}] # Add conversation history for message in conversation_history: if message.get("role") == "user": llm_user_content = message.get("content", "") if message.get("imageUrl"): llm_user_content += f" [image_url: {message.get('imageUrl')}]" messages.append({"role": "user", "content": llm_user_content}) elif message.get("role") == "assistant": messages.append({"role": "assistant", "content": message.get("content", "")}) # Add current user message llm_message_content = user_message if image_url: llm_message_content += f" [image_url: {image_url}]" messages.append({"role": "user", "content": llm_message_content}) # Track total functions executed total_functions_executed = 0 # Iterative function calling - allow multiple rounds for round_num in range(self.config.max_function_rounds): logger.info(f"Function calling round {round_num + 1}/{self.config.max_function_rounds}") # Make API call response = self.client.chat.completions.create( model=self.config.model_name, messages=messages, tools=self.tools, tool_choice="auto", temperature=self.config.temperature ) message = response.choices[0].message # Check if there are tool calls if hasattr(message, 'tool_calls') and message.tool_calls: logger.info(f"Round {round_num + 1}: Executing {len(message.tool_calls)} function(s)") # Execute this round of tool calls messages, functions_executed = self._execute_tool_calls_round(messages, message.tool_calls) total_functions_executed += functions_executed # Continue to next round to see if AI wants to call more functions continue else: # No more tool calls, we have the final response response_content = message.content logger.info(f"Function calling completed after {round_num + 1} round(s). Total functions executed: {total_functions_executed}") break else: # Max rounds reached, add final system prompt and get response logger.warning(f"Max function rounds ({self.config.max_function_rounds}) reached") messages.append({ "role": "system", "content": self.final_system }) # Get final response final_response = self.client.chat.completions.create( model=self.config.model_name, messages=messages, temperature=self.config.temperature ) response_content = final_response.choices[0].message.content # If functions were executed, add final synthesis prompt if total_functions_executed > 0 and not response_content: messages.append({ "role": "system", "content": self.final_system }) final_response = self.client.chat.completions.create( model=self.config.model_name, messages=messages, temperature=self.config.temperature ) response_content = final_response.choices[0].message.content # Add messages to conversation history with unique IDs user_msg = self.manager.add_message(chat_id, user_id, "user", user_message, image_url) assistant_msg = self.manager.add_message(chat_id, user_id, "assistant", response_content) return { "response": response_content, "chat_id": chat_id, "is_new_chat": is_new_chat, "user_message_id": user_msg.get("message_id"), "assistant_message_id": assistant_msg.get("message_id"), "total_messages": len(self.manager.get_history(chat_id, user_id)), "functions_executed": total_functions_executed } except Exception as e: logger.error(f"Error processing query for chat {chat_id}, user {user_id}: {e}") return { "error": f"I apologize, but I encountered an error: {str(e)}. Please try again or rephrase your question.", "chat_id": chat_id or self.manager.generate_chat_id(user_id) if user_id else None, "is_new_chat": True, "user_message_id": None, "assistant_message_id": None, "total_messages": 0, "functions_executed": 0 } def get_chat_info(self, chat_id: str, user_id: str) -> Dict[str, Any]: """ Get information about a specific chat for a specific user Args: chat_id: The chat ID to get information for user_id: The user ID for authentication Returns: Dictionary with chat information """ return self.manager.get_chat_info(chat_id, user_id) def get_all_chats(self, user_id: str) -> List[Dict[str, Any]]: """ Get all chat sessions for a specific user Args: user_id: The user ID to get sessions for Returns: List of chat session information """ return self.manager.get_all_chat_sessions(user_id) def clear_history(self, chat_id: str, user_id: str) -> bool: """ Clear conversation history for a specific chat and user Args: chat_id: The ID of the chat to clear user_id: The user ID for authentication Returns: True if deletion was successful, False otherwise """ logger.info(f"Clearing history for chat: {chat_id}, user: {user_id}") return self.manager.delete_history(chat_id, user_id) def get_messages(self, chat_id: str, user_id: str) -> List[Dict[str, Any]]: """ Get all messages for a specific chat and user Args: chat_id: The chat ID to get messages for user_id: The user ID for authentication Returns: List of messages with their IDs """ return self.manager.get_history(chat_id, user_id) # Utility class for generating example queries (can be used for testing) class QuickQueries: """Pre-defined query templates for common farming questions with multi-function examples""" @staticmethod def crop_recommendation(N: int, P: int, K: int, temp: float, humidity: float, ph: float = 6.5) -> str: """Generate crop recommendation query""" return f"What crop should I grow with N={N}, P={P}, K={K}, temperature {temp}°C, humidity {humidity}%, pH {ph}?" @staticmethod def fertilizer_query(crop: str, soil: str, N: int, P: int, K: int) -> str: """Generate fertilizer recommendation query""" return f"I need fertilizer recommendation for {crop} in {soil} soil with N={N}, P={P}, K={K}" @staticmethod def weather_alert(location: str = "") -> str: """Generate weather alert query""" location_str = f" for {location}" if location else "" return f"What are the current weather alerts and conditions{location_str}? How will this affect farming?" @staticmethod def multi_function_queries() -> List[str]: """Generate example multi-function queries""" return [ # Multiple function calls - same function multiple times "Compare wheat and rice market prices in Gujarat", "What are the market prices for tomato in Gujarat, Maharashtra, and Punjab?", "Recommend crops for sandy soil and loamy soil, which one is better?", # Multiple function calls - different functions "What crop should I grow with N=90, P=42, K=43, temperature 25°C, humidity 80% and what's the market price for that crop?", "Give me crop recommendations and current weather alerts for my region", "I want to grow wheat, tell me fertilizer recommendations and current market prices", # Complex multi-function queries "Compare tomato and potato prices across different states and tell me which crop has better market potential", "What are the best crops for my soil (N=80, P=40, K=50, temp=22°C, humidity=75%) and their current market prices?", "Give me weather alerts, crop recommendations for my conditions, and market prices for the recommended crops", # Sequential same function calls "Get market prices for wheat, rice, maize, and sugarcane in Gujarat", "Compare fertilizer requirements for wheat in black soil, red soil, and sandy soil", ] # Test function to validate configuration def test_configuration(): """Test if all configuration is properly set up""" try: # Check environment variables required_env_vars = ["API_KEY", "API_URL", "MODEL_NAME"] missing_vars = [var for var in required_env_vars if not os.getenv(var)] if missing_vars: print(f"Missing environment variables: {missing_vars}") return False # Test assistant initialization assistant = EasyFarmsAssistant() print("Assistant initialized successfully") # Test function schemas print(f"Loaded {len(assistant.tools)} function tools") return True except Exception as e: print(f"Configuration test failed: {e}") return False # Test the multi-function execution def test_multi_function_execution(): """Test the enhanced multi-function execution""" try: assistant = EasyFarmsAssistant() print("\n=== Testing Multi-Function Execution ===\n") # Test 1: Single function call print("Test 1: Single Function Call") result1 = assistant.process_query( "What crop should I grow with N=90, P=42, K=43, temperature 25°C, humidity 80%?", "test_user_multi" ) print(f"Functions executed: {result1.get('functions_executed', 0)}") print(f"Response preview: {result1['response'][:200]}...\n") # Test 2: Multiple different functions print("Test 2: Multiple Different Functions") result2 = assistant.process_query( "What crop should I grow with N=90, P=42, K=43, temperature 25°C, humidity 80% and what's the current market price for wheat?", "test_user_multi" ) print(f"Functions executed: {result2.get('functions_executed', 0)}") print(f"Response preview: {result2['response'][:200]}...\n") # Test 3: Same function multiple times print("Test 3: Same Function Multiple Times") result3 = assistant.process_query( "Compare wheat and rice market prices in Gujarat", "test_user_multi" ) print(f"Functions executed: {result3.get('functions_executed', 0)}") print(f"Response preview: {result3['response'][:200]}...\n") print("✅ Multi-function execution tests completed!") return True except Exception as e: print(f"❌ Multi-function test failed: {e}") return False if __name__ == "__main__": print("=== EasyFarms Assistant Configuration Test ===") if test_configuration(): print("✅ Basic configuration ready!") print("\n=== Testing Multi-Function Execution ===") if test_multi_function_execution(): print("✅ All systems ready!") else: print("⚠️ Basic functions work, but multi-function execution needs attention") else: print("❌ Please fix configuration issues before running the assistant.")