Spaces:
Sleeping
Sleeping
| # app.py | |
| import json | |
| import openai | |
| from typing import Dict, Any, Optional, List | |
| from dataclasses import dataclass | |
| import logging | |
| from openai import OpenAI | |
| from dotenv import load_dotenv | |
| import os | |
| # Import your modules | |
| from easy_agents import EASYFARMS_FUNCTION_SCHEMAS, execute_easyfarms_function | |
| from alert import WEATHER_TOOLS , execute_function | |
| from conversation_manager import ConversationManager | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Load environment variables | |
| load_dotenv() | |
@dataclass
class Config:
    """Configuration settings loaded from environment variables.

    Attributes:
        api_key: API key for the OpenAI-compatible endpoint (env API_KEY).
        api_url: Base URL of the endpoint (env API_URL).
        model_name: Model identifier to use (env MODEL_NAME).
        max_retries: Maximum retry attempts for API calls.
        temperature: Sampling temperature passed to the model.
    """
    api_key: str
    api_url: str
    model_name: str
    max_retries: int = 3
    temperature: float = 0.5

    # @classmethod was missing in the original, so Config.from_env() raised a
    # missing-argument error; @dataclass was also missing, so cls(...) with
    # keyword arguments raised TypeError. Both decorators are required.
    @classmethod
    def from_env(cls) -> "Config":
        """Load configuration from environment variables.

        Returns:
            Config: New instance; fields are None when a variable is unset
            (EasyFarmsAssistant validates them at construction time).
        """
        return cls(
            api_key=os.getenv("API_KEY"),
            api_url=os.getenv("API_URL"),
            model_name=os.getenv("MODEL_NAME"),
        )
class EasyFarmsAssistant:
    """Enhanced EasyFarms AI Assistant with weather integration and persistent sessions."""

    # Function names routed to easy_agents.execute_easyfarms_function.
    _EASYFARMS_FUNCTIONS = frozenset({
        "get_crop_recommendation",
        "get_fertilizer_recommendation",
        "detect_plant_disease",
        "get_supported_options",
        "get_market_prices",
        "compare_commodity_prices",
        "get_market_locations",
        "get_commodity_list",
    })
    # Function names routed to alert.execute_function.
    _WEATHER_FUNCTIONS = frozenset({
        "get_weather_alerts",
        "get_weather",
        "get_alert_summary",
        "get_available_locations",
    })

    def __init__(self, config: Optional[Config] = None, manager: Optional[ConversationManager] = None):
        """
        Initialize the assistant with configuration and a conversation manager.

        Args:
            config (Optional[Config]): Configuration object. If None, loads from environment.
            manager (Optional[ConversationManager]): Manager for handling conversation persistence.

        Raises:
            ValueError: If API_KEY, API_URL or MODEL_NAME is not configured.
        """
        self.config = config or Config.from_env()
        # Validate configuration before creating the client.
        if not all([self.config.api_key, self.config.api_url, self.config.model_name]):
            raise ValueError("Missing required configuration: API_KEY, API_URL, and MODEL_NAME must be set")
        self.client = OpenAI(
            api_key=self.config.api_key,
            base_url=self.config.api_url
        )
        # All available functions from both modules are combined into the tools list.
        self.tools = self._initialize_tools()
        # Use the provided conversation manager or create a new one.
        self.manager = manager or ConversationManager()
        # System prompt for the first (tool-selecting) model call.
        self.system_prompt = """You are the AI assistant for EasyForms Agritech Solutions. Your task is to provide users with clear, concise, and actionable responses regarding agriculture, crop management, production, treatment, weather alerts, and related queries.
Core Capabilities:
- Crop recommendations based on soil and weather conditions
- Fertilizer recommendations for specific crops
- Plant disease detection and treatment advice
- Weather alerts and forecasts for farming decisions
- Market data and commodity prices
- General agricultural guidance
Rules:
1. Check if any relevant function_tools or datasets are available for this query.
2. If available, use the functions to fetch information and generate the final user-facing response.
3. If the functions or data are unavailable, do **not stop**; instead, generate a general, well-reasoned response based on your own knowledge.
4. Keep the response **simple, smooth, well-pointed, and concise**.
5. Structure the response with bullet points or numbered steps where helpful.
6. Provide practical, actionable advice a user can implement immediately.
7. Use English or Hindi based on user preference.
8. If any information is uncertain, mention it clearly and suggest alternatives.
9. For weather-related queries, prioritize safety and timely alerts."""
        # System prompt for the second model call that summarizes tool outputs.
        self.final_system = """You are the final response assistant for EasyForms Agritech Solutions. Use the outputs from previous function calls to generate a **clear, concise, actionable response** for the user.
Rules:
1. Combine the function outputs and your own reasoning to answer the query.
2. Keep responses simple, smooth, well-pointed, and concise.
3. Structure response with headings or bullet points if helpful.
4. Provide practical advice that a farmer or user can implement immediately.
5. If some data is missing, clearly state it and offer alternatives.
6. Use English or Hindi based on the user preference.
7. For weather alerts, emphasize urgency and protective measures."""

    def _initialize_tools(self) -> List[Dict]:
        """Initialize and convert all function schemas to the new tools format.

        Returns:
            List[Dict]: Tool definitions in the chat-completions ``tools`` format.
        """
        # Wrap each EasyFarms schema in the {"type": "function", ...} envelope.
        tools = [
            {
                "type": "function",
                "function": {
                    "name": schema["name"],
                    "description": schema["description"],
                    "parameters": schema["parameters"],
                },
            }
            for schema in EASYFARMS_FUNCTION_SCHEMAS
        ]
        # Weather tools are already in the correct format.
        tools.extend(WEATHER_TOOLS)
        return tools

    def call_function(self, function_name: str, arguments: Dict) -> Any:
        """Route function calls to appropriate handlers with error handling.

        Args:
            function_name: Name of the tool requested by the model.
            arguments: Decoded JSON arguments for the tool.

        Returns:
            The handler's result, or an ``{"error": ...}`` dict on failure
            or unknown function name.
        """
        try:
            # Set-based dispatch replaces the original table of identical
            # lambdas; behavior (including the unknown-name error) is the same.
            if function_name in self._EASYFARMS_FUNCTIONS:
                return execute_easyfarms_function(function_name, **arguments)
            if function_name in self._WEATHER_FUNCTIONS:
                return self._execute_weather_function(function_name, **arguments)
            return {"error": f"Unknown function: {function_name}"}
        except Exception as e:
            logger.error(f"Error executing function {function_name}: {e}")
            return {"error": str(e)}

    def _execute_weather_function(self, function_name: str, **kwargs):
        """Helper to execute weather functions from the alert.py module."""
        # execute_function is already imported at module level (from alert);
        # the original redundantly re-imported it here on every call.
        return execute_function(function_name, kwargs)

    def process_query(self, user_message: str, session_id: str, image_url: Optional[str] = None) -> str:
        """
        Process a user query, reformatting stored history for the LLM API call.

        Args:
            user_message: The user's current message.
            session_id: Key under which conversation history is persisted.
            image_url: Optional image URL appended to the user content.

        Returns:
            str: The assistant's final response text, or an apology string on error.
        """
        try:
            # MEMORY STEP 1: Fetch the complete past conversation using the session_id.
            conversation_history = self.manager.get_history(session_id)
            # Prepare the list that will be sent to the AI.
            messages = [{"role": "system", "content": self.system_prompt}]
            # MEMORY STEP 2: Replay every past turn so the model has context.
            for message in conversation_history:
                if message.get("role") == "user":
                    llm_user_content = message.get("content", "")
                    # Stored image references are folded into the text content.
                    if message.get("imageUrl"):
                        llm_user_content += f" [image_url: {message.get('imageUrl')}]"
                    messages.append({"role": "user", "content": llm_user_content})
                elif message.get("role") == "assistant":
                    messages.append({"role": "assistant", "content": message.get("content", "")})
            # MEMORY STEP 3: Add the user's CURRENT message to the end of the history.
            llm_message_content = user_message
            if image_url:
                llm_message_content += f" [image_url: {image_url}]"
            messages.append({"role": "user", "content": llm_message_content})
            # MEMORY STEP 4: First model call — may request tool invocations.
            response = self.client.chat.completions.create(
                model=self.config.model_name,
                messages=messages,
                tools=self.tools,
                tool_choice="auto",
                temperature=self.config.temperature
            )
            message = response.choices[0].message
            if hasattr(message, 'tool_calls') and message.tool_calls:
                # Echo the assistant's tool-call message back into the history.
                # "content" is explicitly None per the chat-completions schema
                # (nullable when tool_calls is present).
                messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [
                        {
                            "id": tool_call.id,
                            "type": "function",
                            "function": {
                                "name": tool_call.function.name,
                                "arguments": tool_call.function.arguments
                            }
                        } for tool_call in message.tool_calls
                    ]
                })
                # Execute every requested tool and append its result.
                for tool_call in message.tool_calls:
                    function_name = tool_call.function.name
                    function_args = json.loads(tool_call.function.arguments)
                    logger.info(f"Calling function: {function_name} with args: {function_args}")
                    function_result = self.call_function(function_name, function_args)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": json.dumps(function_result)
                    })
                # Final system prompt steers the summary of the tool outputs.
                messages.append({
                    "role": "system",
                    "content": self.final_system
                })
                # Second model call produces the user-facing answer.
                final_response = self.client.chat.completions.create(
                    model=self.config.model_name,
                    messages=messages,
                    temperature=self.config.temperature
                )
                response_content = final_response.choices[0].message.content
            else:
                # No tools requested — use the direct answer.
                response_content = message.content
            # Persist the new turns so the next message sees them.
            user_turn_for_storage = {"role": "user", "content": user_message}
            if image_url:
                user_turn_for_storage["imageUrl"] = image_url
            updated_history = conversation_history + [
                user_turn_for_storage,
                {"role": "assistant", "content": response_content}
            ]
            self.manager.save_history(session_id, updated_history)
            return response_content
        except Exception as e:
            logger.error(f"Error processing query for session {session_id}: {e}")
            return f"I apologize, but I encountered an error: {str(e)}. Please try again or rephrase your question."

    def clear_history(self, session_id: str) -> bool:
        """
        Clear conversation history for a specific session from the database.

        Args:
            session_id: The ID of the session to clear.

        Returns:
            True if deletion was successful, False otherwise.
        """
        logger.info(f"Clearing history for session: {session_id}")
        return self.manager.delete_history(session_id)
| # Utility class for generating example queries (can be used for testing) | |
class QuickQueries:
    """Pre-defined query templates for common farming questions.

    All methods are static string builders; the originals were missing both
    ``self`` and ``@staticmethod``, so calling them on an instance raised
    TypeError. ``@staticmethod`` keeps class-level calls working and fixes
    instance-level calls.
    """

    @staticmethod
    def crop_recommendation(N: int, P: int, K: int, temp: float, humidity: float, ph: float = 6.5) -> str:
        """Generate crop recommendation query."""
        # Original source had mojibake "Β°C"; restored to the degree sign.
        return f"What crop should I grow with N={N}, P={P}, K={K}, temperature {temp}°C, humidity {humidity}%, pH {ph}?"

    @staticmethod
    def fertilizer_query(crop: str, soil: str, N: int, P: int, K: int) -> str:
        """Generate fertilizer recommendation query."""
        return f"I need fertilizer recommendation for {crop} in {soil} soil with N={N}, P={P}, K={K}"

    @staticmethod
    def weather_alert(location: str = "") -> str:
        """Generate weather alert query; location is optional."""
        location_str = f" for {location}" if location else ""
        return f"What are the current weather alerts and conditions{location_str}? How will this affect farming?"
| # Test function to validate configuration | |
def test_configuration():
    """Test if all configuration is properly set up.

    Verifies the required environment variables exist and that an
    EasyFarmsAssistant can be constructed from them.

    Returns:
        bool: True when everything initialized cleanly, False otherwise.
    """
    try:
        # Check environment variables first and fail fast with a clear message.
        required_env_vars = ["API_KEY", "API_URL", "MODEL_NAME"]
        missing_vars = [var for var in required_env_vars if not os.getenv(var)]
        if missing_vars:
            # NOTE(review): status glyphs were mojibake ("β") in the original
            # source; restored to ❌/✅ based on pass/fail context.
            print(f"❌ Missing environment variables: {missing_vars}")
            return False
        # Test assistant initialization (raises if configuration is invalid).
        assistant = EasyFarmsAssistant()
        print("✅ Assistant initialized successfully")
        # Report how many tool schemas were loaded from both modules.
        print(f"✅ Loaded {len(assistant.tools)} function tools")
        return True
    except Exception as e:
        print(f"❌ Configuration test failed: {e}")
        return False
if __name__ == "__main__":
    # Script entry point: run the configuration smoke test and report status.
    print("=== EasyFarms Assistant Configuration Test ===")
    if test_configuration():
        # NOTE(review): status glyphs were mojibake ("β") in the original
        # source; restored to ✅/❌ based on pass/fail context.
        print("✅ All systems ready!")
    else:
        print("❌ Please fix configuration issues before running the assistant.")