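# Spaces: Sleeping  (hosting-page status text captured with this listing; not part of the program)
# app.py - Enhanced with multi-round, multi-function execution (rounds are capped by Config.max_function_rounds)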
# Standard library
import json
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

# Third-party packages
import openai
from dotenv import load_dotenv
from openai import OpenAI

# Project modules
from easy_agents import EASYFARMS_FUNCTION_SCHEMAS, execute_easyfarms_function
from alert import WEATHER_TOOLS, execute_function
from conversation_manager import ConversationManager

# Set up INFO-level logging and the logger used throughout this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables (via python-dotenv) before any settings are read.
load_dotenv()
@dataclass
class Config:
    """Configuration settings for the assistant.

    The first three fields have no defaults; ``from_env`` fills them from the
    ``API_KEY``, ``API_URL`` and ``MODEL_NAME`` environment variables and they
    are ``None`` when the corresponding variable is unset.
    """

    api_key: Optional[str]
    api_url: Optional[str]
    model_name: Optional[str]
    # Declared here, but not read anywhere in the visible code.
    max_retries: int = 3
    temperature: float = 0.5
    # Maximum rounds of function calling to prevent infinite loops.
    max_function_rounds: int = 5

    @classmethod
    def from_env(cls) -> "Config":
        """Load configuration from environment variables.

        Returns:
            A ``Config`` whose ``api_key``, ``api_url`` and ``model_name`` come
            from ``os.getenv`` (``None`` when unset); the remaining fields keep
            their defaults.
        """
        return cls(
            api_key=os.getenv("API_KEY"),
            api_url=os.getenv("API_URL"),
            model_name=os.getenv("MODEL_NAME"),
        )
class EasyFarmsAssistant:
    """EasyFarms AI assistant with multi-round, multi-function tool execution.

    Each query is sent to an OpenAI-compatible chat-completions endpoint
    together with the combined EasyFarms and weather tool schemas. Tool calls
    requested by the model are executed and fed back, for at most
    ``config.max_function_rounds`` rounds, after which a final answer is
    requested without tools. Conversation state is delegated to the
    conversation manager.
    """

    # Tool names routed to ``execute_easyfarms_function``.
    _EASYFARMS_FUNCTIONS = frozenset({
        "get_crop_recommendation",
        "get_fertilizer_recommendation",
        "detect_plant_disease",
        "get_supported_options",
        "get_market_prices",
        "compare_commodity_prices",
        "get_market_locations",
        "get_commodity_list",
    })

    # Tool names routed to the weather/alert module.
    _WEATHER_FUNCTIONS = frozenset({
        "get_weather_alerts",
        "get_weather",
        "get_alert_summary",
        "get_available_locations",
    })

    # Returned (and stored) when the model produces no text at all, so callers
    # never receive ``None`` as the response.
    _EMPTY_RESPONSE_FALLBACK = (
        "I'm sorry, I couldn't generate a response. "
        "Please try again or rephrase your question."
    )

    def __init__(self, config: Optional[Config] = None, manager: Optional[ConversationManager] = None):
        """
        Initialize the assistant with configuration and a conversation manager.

        Args:
            config (Optional[Config]): Configuration object. If None, loads from environment.
            manager (Optional[ConversationManager]): Manager for handling conversation persistence.
                If None, a new ConversationManager is created.

        Raises:
            ValueError: If api_key, api_url or model_name is missing or empty.
        """
        self.config = config or Config.from_env()

        # Validate configuration
        if not all([self.config.api_key, self.config.api_url, self.config.model_name]):
            raise ValueError("Missing required configuration: API_KEY, API_URL, and MODEL_NAME must be set")

        self.client = OpenAI(
            api_key=self.config.api_key,
            base_url=self.config.api_url
        )

        # All available functions from both modules are combined into the tools list
        self.tools = self._initialize_tools()

        # Use the provided conversation manager or create a new one
        self.manager = manager or ConversationManager()

        # System prompt sent as the first message of every request
        self.system_prompt = """You are the AI assistant for EasyForms Agritech Solutions. Your task is to provide users with clear, concise, and actionable responses regarding agriculture, crop management, production, treatment, weather alerts, and related queries.
Core Capabilities:
- Crop recommendations based on soil and weather conditions
- Fertilizer recommendations for specific crops
- Plant disease detection and treatment advice
- Weather alerts and forecasts for farming decisions
- Market data and commodity prices
- General agricultural guidance
IMPORTANT - Multi-Function Execution Guidelines:
1. **You can and SHOULD use multiple functions in a single response** when it provides better value to the user.
2. **You can call the same function multiple times** with different parameters to compare or analyze different scenarios.
3. **Think comprehensively** - if a user asks about crops AND market prices, use both functions.
4. **Make comparisons** - if asked to compare, call the function multiple times with different parameters.
5. **Provide complete solutions** - gather all necessary data before responding.
Example Multi-Function Scenarios:
- "Compare wheat and rice prices" → Call get_market_prices twice (once for wheat, once for rice)
- "What crop should I grow and what's its market price?" → Call get_crop_recommendation AND get_market_prices
- "Check prices in Gujarat and Maharashtra" → Call get_market_prices multiple times for different states
- "Recommend crops for sandy and loamy soil" → Call get_crop_recommendation multiple times
- "Give fertilizer advice and check market prices for wheat" → Call both functions
Response Rules:
1. Use all available function_tools when they can help answer the query comprehensively.
2. If comparing multiple items, execute the function for each item separately.
3. If data is unavailable from functions, supplement with your own agricultural knowledge.
4. Present results clearly - use tables, comparisons, or bullet points as appropriate.
5. Keep responses concise but comprehensive.
6. Provide practical, actionable advice.
7. Use English or Hindi based on user preference.
8. For weather-related queries, prioritize safety and timely alerts.
Remember: Don't hesitate to use multiple functions - it's better to provide complete information than partial data."""

        # System prompt appended when a final, tool-free answer is requested
        self.final_system = """You are the final response assistant for EasyForms Agritech Solutions. You have received outputs from multiple function calls. Your job is to synthesize all this information into a **clear, comprehensive, and actionable response** for the user.
Response Synthesis Guidelines:
1. **Analyze all function outputs** - Look at every function result provided.
2. **Create comparisons** - If multiple similar functions were called, create comparison tables or clear comparisons.
3. **Highlight insights** - Point out interesting patterns, best options, or important differences.
4. **Structure intelligently**:
- Use tables for price comparisons
- Use bullet points for multiple recommendations
- Use numbered steps for sequential advice
- Use paragraphs for explanations
5. **Be comprehensive but concise** - Include all important information without being verbose.
6. **Provide actionable recommendations** - End with clear next steps or advice.
7. **Handle missing data gracefully** - If some functions returned errors, work with available data.
8. **Use appropriate formatting** - Make the response easy to scan and understand.
Example Response Structures:
- For price comparisons: Create a comparison table showing commodity, location, and prices
- For crop recommendations: List recommended crops with reasoning
- For multi-aspect queries: Use sections with clear headings
Remember: The user asked one question but you have data from multiple sources - combine them intelligently."""

    def _initialize_tools(self) -> List[Dict]:
        """Build the tools list passed to the chat-completions API.

        Each EasyFarms schema is wrapped as ``{"type": "function", "function": {...}}``;
        the weather tools are appended unchanged (they are already in that format).

        Returns:
            The combined list of tool definitions.
        """
        tools = [
            {
                "type": "function",
                "function": {
                    "name": schema["name"],
                    "description": schema["description"],
                    "parameters": schema["parameters"],
                },
            }
            for schema in EASYFARMS_FUNCTION_SCHEMAS
        ]
        tools.extend(WEATHER_TOOLS)
        return tools

    def call_function(self, function_name: str, arguments: Dict) -> Any:
        """Route a function call to the appropriate handler with error handling.

        Args:
            function_name: Name of the tool requested by the model.
            arguments: Keyword arguments for the tool.

        Returns:
            The handler's result, or ``{"error": ...}`` when the name is unknown
            or the handler raises. Exceptions are never propagated, so a failed
            tool is reported back to the model instead of aborting the query.
        """
        try:
            if function_name in self._EASYFARMS_FUNCTIONS:
                return execute_easyfarms_function(function_name, **arguments)
            if function_name in self._WEATHER_FUNCTIONS:
                return self._execute_weather_function(function_name, **arguments)
            return {"error": f"Unknown function: {function_name}"}
        except Exception as e:
            # Boundary handler: log with traceback and report the failure as data.
            logger.exception(f"Error executing function {function_name}: {e}")
            return {"error": str(e)}

    def _execute_weather_function(self, function_name: str, **kwargs):
        """Helper to execute weather functions from the alert.py module.

        The keyword arguments are forwarded to ``execute_function`` as a single dict.
        """
        return execute_function(function_name, kwargs)

    @staticmethod
    def _parse_tool_arguments(raw_arguments) -> Dict:
        """Decode a tool call's JSON argument string into a dict.

        An empty or whitespace-only string is treated as "no arguments".

        Raises:
            ValueError: If the text is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or does not decode to a JSON object.
        """
        if not raw_arguments or not raw_arguments.strip():
            return {}
        parsed = json.loads(raw_arguments)
        if not isinstance(parsed, dict):
            raise ValueError("tool arguments must be a JSON object")
        return parsed

    @staticmethod
    def _serialize_tool_result(result: Any) -> str:
        """Encode a tool result as the JSON string sent back to the model.

        ``default=str`` is a deliberate choice: values that are not natively
        JSON-serializable are stringified rather than failing the whole query.
        Natively serializable results encode exactly as plain ``json.dumps`` would.
        """
        return json.dumps(result, default=str)

    def _execute_tool_calls_round(self, messages: List[Dict], tool_calls) -> tuple[List[Dict], int]:
        """
        Execute a round of tool calls and return updated messages with function count

        Args:
            messages: Current message history (extended in place)
            tool_calls: Tool calls to execute

        Returns:
            Tuple of (updated messages, number of functions executed). A tool call
            whose arguments cannot be parsed is answered with an error result and
            is not counted as executed.
        """
        function_count = 0

        # Add the assistant's message with tool calls
        messages.append({
            "role": "assistant",
            "tool_calls": [
                {
                    "id": tool_call.id,
                    "type": "function",
                    "function": {
                        "name": tool_call.function.name,
                        "arguments": tool_call.function.arguments,
                    },
                }
                for tool_call in tool_calls
            ],
        })

        # Execute all tool calls; every call gets exactly one "tool" reply,
        # including the ones that fail.
        for tool_call in tool_calls:
            function_name = tool_call.function.name
            try:
                function_args = self._parse_tool_arguments(tool_call.function.arguments)
            except ValueError as e:
                logger.warning(f"Invalid arguments for function {function_name}: {e}")
                function_result = {"error": f"Invalid arguments for {function_name}: {e}"}
            else:
                logger.info(f"Calling function: {function_name} with args: {function_args}")
                function_result = self.call_function(function_name, function_args)
                function_count += 1

            # Add function result to messages
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": self._serialize_tool_result(function_result),
            })

        return messages, function_count

    @staticmethod
    def _error_result(error_message: str, chat_id: Optional[str]) -> Dict[str, Any]:
        """Build the dictionary returned by ``process_query`` on failure."""
        return {
            "error": error_message,
            "chat_id": chat_id,
            "is_new_chat": True,
            "user_message_id": None,
            "assistant_message_id": None,
            "total_messages": 0,
            "functions_executed": 0,
        }

    @staticmethod
    def _with_image_tag(text: str, image_url: Optional[str]) -> str:
        """Append the ``[image_url: ...]`` marker to *text* when an image URL is given."""
        if image_url:
            return f"{text} [image_url: {image_url}]"
        return text

    def _build_messages(self, conversation_history, user_message: str, image_url: Optional[str]) -> List[Dict]:
        """Assemble the request messages: system prompt, stored history, current user message.

        Only history entries with role ``user`` or ``assistant`` are forwarded.
        Missing or ``None`` content is sent as an empty string.
        """
        messages = [{"role": "system", "content": self.system_prompt}]
        for entry in conversation_history:
            role = entry.get("role")
            content = entry.get("content") or ""
            if role == "user":
                messages.append({
                    "role": "user",
                    "content": self._with_image_tag(content, entry.get("imageUrl")),
                })
            elif role == "assistant":
                messages.append({"role": "assistant", "content": content})
        messages.append({"role": "user", "content": self._with_image_tag(user_message, image_url)})
        return messages

    def _synthesize_final_response(self, messages: List[Dict]):
        """Request a final answer without tools, guided by ``self.final_system``.

        The synthesis prompt is appended only if it is not already the last
        message, so a retry does not duplicate it.

        Returns:
            The content of the model's reply (may be ``None`` or empty).
        """
        final_prompt = {"role": "system", "content": self.final_system}
        if not messages or messages[-1] != final_prompt:
            messages.append(final_prompt)
        final_response = self.client.chat.completions.create(
            model=self.config.model_name,
            messages=messages,
            temperature=self.config.temperature
        )
        return final_response.choices[0].message.content

    def _generate_chat_id_safely(self, user_id: str) -> Optional[str]:
        """Generate a chat ID for error reporting; return None if the manager fails."""
        try:
            return self.manager.generate_chat_id(user_id)
        except Exception:
            logger.exception(f"Could not generate a fallback chat ID for user: {user_id}")
            return None

    def process_query(self, user_message: str, user_id: str, chat_id: Optional[str] = None, image_url: Optional[str] = None) -> Dict[str, Any]:
        """
        Process user query with multi-round, multi-function execution capability

        Args:
            user_message: The user's message
            user_id: The user ID for authentication and isolation
            chat_id: Optional chat ID. If None, generates a new one
            image_url: Optional image URL

        Returns:
            On success, a dictionary with "response", "chat_id", "is_new_chat",
            "user_message_id", "assistant_message_id", "total_messages" and
            "functions_executed". On failure, a dictionary with "error" in place
            of "response" and the remaining keys set to placeholder values.
            Exceptions are not propagated.
        """
        # Validate user_id
        if not user_id:
            return self._error_result("User ID is required for authentication", None)

        try:
            # Handle chat ID
            if not chat_id:
                chat_id = self.manager.generate_chat_id(user_id)
                is_new_chat = True
                logger.info(f"Generated new chat ID: {chat_id} for user: {user_id}")
            else:
                is_new_chat = not self.manager.chat_exists(chat_id, user_id)
                if is_new_chat:
                    logger.info(f"Creating new chat with provided ID: {chat_id} for user: {user_id}")
                else:
                    logger.info(f"Continuing existing chat: {chat_id} for user: {user_id}")

            # Get conversation history for this user and prepare messages for AI
            conversation_history = self.manager.get_history(chat_id, user_id)
            messages = self._build_messages(conversation_history, user_message, image_url)

            # Track total functions executed
            total_functions_executed = 0
            response_content = None

            # Iterative function calling - allow multiple rounds
            for round_num in range(self.config.max_function_rounds):
                logger.info(f"Function calling round {round_num + 1}/{self.config.max_function_rounds}")

                response = self.client.chat.completions.create(
                    model=self.config.model_name,
                    messages=messages,
                    tools=self.tools,
                    tool_choice="auto",
                    temperature=self.config.temperature
                )
                message = response.choices[0].message
                tool_calls = getattr(message, "tool_calls", None)

                if not tool_calls:
                    # No more tool calls, we have the final response
                    response_content = message.content
                    logger.info(f"Function calling completed after {round_num + 1} round(s). Total functions executed: {total_functions_executed}")
                    break

                logger.info(f"Round {round_num + 1}: Executing {len(tool_calls)} function(s)")
                messages, functions_executed = self._execute_tool_calls_round(messages, tool_calls)
                total_functions_executed += functions_executed
                # Loop again to see if the model wants to call more functions
            else:
                # Max rounds reached without a plain answer: ask for one without tools
                logger.warning(f"Max function rounds ({self.config.max_function_rounds}) reached")
                response_content = self._synthesize_final_response(messages)

            # If functions were executed but no text came back, ask for a synthesis
            if total_functions_executed > 0 and not response_content:
                response_content = self._synthesize_final_response(messages)

            # Never persist or return an empty/None answer
            if not response_content:
                logger.warning(f"Model returned an empty response for chat {chat_id}, user {user_id}")
                response_content = self._EMPTY_RESPONSE_FALLBACK

            # Add messages to conversation history with unique IDs
            user_msg = self.manager.add_message(chat_id, user_id, "user", user_message, image_url)
            assistant_msg = self.manager.add_message(chat_id, user_id, "assistant", response_content)

            return {
                "response": response_content,
                "chat_id": chat_id,
                "is_new_chat": is_new_chat,
                "user_message_id": user_msg.get("message_id"),
                "assistant_message_id": assistant_msg.get("message_id"),
                "total_messages": len(self.manager.get_history(chat_id, user_id)),
                "functions_executed": total_functions_executed
            }

        except Exception as e:
            # Top-level boundary: log with traceback and return an error payload.
            logger.exception(f"Error processing query for chat {chat_id}, user {user_id}: {e}")
            return self._error_result(
                f"I apologize, but I encountered an error: {str(e)}. Please try again or rephrase your question.",
                # The manager itself may be what failed, so the fallback ID must not raise.
                chat_id or self._generate_chat_id_safely(user_id),
            )

    def get_chat_info(self, chat_id: str, user_id: str) -> Dict[str, Any]:
        """
        Get information about a specific chat for a specific user

        Args:
            chat_id: The chat ID to get information for
            user_id: The user ID for authentication

        Returns:
            Dictionary with chat information, as returned by the conversation manager
        """
        return self.manager.get_chat_info(chat_id, user_id)

    def get_all_chats(self, user_id: str) -> List[Dict[str, Any]]:
        """
        Get all chat sessions for a specific user

        Args:
            user_id: The user ID to get sessions for

        Returns:
            List of chat session information, as returned by the conversation manager
        """
        return self.manager.get_all_chat_sessions(user_id)

    def clear_history(self, chat_id: str, user_id: str) -> bool:
        """
        Clear conversation history for a specific chat and user

        Args:
            chat_id: The ID of the chat to clear
            user_id: The user ID for authentication

        Returns:
            The result of the manager's delete_history call
            (True if deletion was successful, False otherwise)
        """
        logger.info(f"Clearing history for chat: {chat_id}, user: {user_id}")
        return self.manager.delete_history(chat_id, user_id)

    def get_messages(self, chat_id: str, user_id: str) -> List[Dict[str, Any]]:
        """
        Get all messages for a specific chat and user

        Args:
            chat_id: The chat ID to get messages for
            user_id: The user ID for authentication

        Returns:
            List of messages as stored by the conversation manager
        """
        return self.manager.get_history(chat_id, user_id)
# Utility class for generating example queries (can be used for testing)
class QuickQueries:
    """Pre-defined query templates for common farming questions with multi-function examples.

    All helpers are static: they can be called on the class or on an instance.
    """

    @staticmethod
    def crop_recommendation(N: int, P: int, K: int, temp: float, humidity: float, ph: float = 6.5) -> str:
        """Generate crop recommendation query.

        Args:
            N, P, K: Nutrient values inserted into the query text.
            temp: Temperature, rendered with a °C suffix.
            humidity: Humidity, rendered with a % suffix.
            ph: pH value (defaults to 6.5).
        """
        return f"What crop should I grow with N={N}, P={P}, K={K}, temperature {temp}°C, humidity {humidity}%, pH {ph}?"

    @staticmethod
    def fertilizer_query(crop: str, soil: str, N: int, P: int, K: int) -> str:
        """Generate fertilizer recommendation query for *crop* grown in *soil* soil."""
        return f"I need fertilizer recommendation for {crop} in {soil} soil with N={N}, P={P}, K={K}"

    @staticmethod
    def weather_alert(location: str = "") -> str:
        """Generate weather alert query; the location clause is omitted when *location* is empty."""
        location_str = f" for {location}" if location else ""
        return f"What are the current weather alerts and conditions{location_str}? How will this affect farming?"

    @staticmethod
    def multi_function_queries() -> List[str]:
        """Generate example multi-function queries."""
        return [
            # Multiple function calls - same function multiple times
            "Compare wheat and rice market prices in Gujarat",
            "What are the market prices for tomato in Gujarat, Maharashtra, and Punjab?",
            "Recommend crops for sandy soil and loamy soil, which one is better?",
            # Multiple function calls - different functions
            "What crop should I grow with N=90, P=42, K=43, temperature 25°C, humidity 80% and what's the market price for that crop?",
            "Give me crop recommendations and current weather alerts for my region",
            "I want to grow wheat, tell me fertilizer recommendations and current market prices",
            # Complex multi-function queries
            "Compare tomato and potato prices across different states and tell me which crop has better market potential",
            "What are the best crops for my soil (N=80, P=40, K=50, temp=22°C, humidity=75%) and their current market prices?",
            "Give me weather alerts, crop recommendations for my conditions, and market prices for the recommended crops",
            # Sequential same function calls
            "Get market prices for wheat, rice, maize, and sugarcane in Gujarat",
            "Compare fertilizer requirements for wheat in black soil, red soil, and sandy soil",
        ]
# Configuration smoke check
def test_configuration():
    """Verify the required settings exist and that the assistant can be built.

    Prints the outcome of each step.

    Returns:
        True when API_KEY, API_URL and MODEL_NAME are all set and an
        EasyFarmsAssistant was constructed; False when a variable is missing
        or any exception occurs.
    """
    try:
        # Collect every required variable that is unset or empty
        absent = [
            name
            for name in ("API_KEY", "API_URL", "MODEL_NAME")
            if not os.getenv(name)
        ]
        if not absent:
            # Constructing the assistant also builds its tool list
            helper = EasyFarmsAssistant()
            print("Assistant initialized successfully")
            print(f"Loaded {len(helper.tools)} function tools")
            return True

        print(f"Missing environment variables: {absent}")
        return False
    except Exception as err:
        print(f"Configuration test failed: {err}")
        return False
# Test the multi-function execution
def test_multi_function_execution():
    """Run three sample queries through the assistant and print a summary of each.

    The scenarios cover a single function call, several different functions,
    and the same function called multiple times. ``process_query`` reports
    failures as a dictionary with an "error" key and no "response" key, so that
    case is checked explicitly and its message printed instead of failing on a
    missing key.

    Returns:
        True when every query produced a response; False on the first query
        that returned an error or no response, or when an exception is raised.
    """
    scenarios = [
        (
            "Test 1: Single Function Call",
            "What crop should I grow with N=90, P=42, K=43, temperature 25°C, humidity 80%?",
        ),
        (
            "Test 2: Multiple Different Functions",
            "What crop should I grow with N=90, P=42, K=43, temperature 25°C, humidity 80% and what's the current market price for wheat?",
        ),
        (
            "Test 3: Same Function Multiple Times",
            "Compare wheat and rice market prices in Gujarat",
        ),
    ]
    try:
        assistant = EasyFarmsAssistant()
        print("\n=== Testing Multi-Function Execution ===\n")
        for title, query in scenarios:
            print(title)
            result = assistant.process_query(query, "test_user_multi")

            error = result.get("error")
            if error or result.get("response") is None:
                print(f"❌ Multi-function test failed: {error or 'no response returned'}")
                return False

            print(f"Functions executed: {result.get('functions_executed', 0)}")
            print(f"Response preview: {result['response'][:200]}...\n")

        print("✅ Multi-function execution tests completed!")
        return True
    except Exception as e:
        print(f"❌ Multi-function test failed: {e}")
        return False
if __name__ == "__main__":
    # Script entry point: check the configuration first, then exercise the
    # multi-function path only if that check succeeded.
    print("=== EasyFarms Assistant Configuration Test ===")
    if not test_configuration():
        print("❌ Please fix configuration issues before running the assistant.")
    else:
        print("✅ Basic configuration ready!")
        print("\n=== Testing Multi-Function Execution ===")
        multi_ok = test_multi_function_execution()
        if multi_ok:
            print("✅ All systems ready!")
        else:
            print("⚠️ Basic functions work, but multi-function execution needs attention")