EasyBot / app.py
NitinBot001's picture
Update app.py
e15c8a7 verified
raw
history blame
22.5 kB
# app.py
import json
import openai
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
import logging
from openai import OpenAI
from dotenv import load_dotenv
import os
# Import your modules
from easy_agents import EASYFARMS_FUNCTION_SCHEMAS, execute_easyfarms_function
from alert import WEATHER_TOOLS , execute_function
from conversation_manager import ConversationManager
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
@dataclass
class Config:
"""Configuration settings"""
api_key: str
api_url: str
model_name: str
max_retries: int = 3
temperature: float = 0.5
@classmethod
def from_env(cls):
"""Load configuration from environment variables"""
return cls(
api_key=os.getenv("API_KEY"),
api_url=os.getenv("API_URL"),
model_name=os.getenv("MODEL_NAME")
)
class EasyFarmsAssistant:
"""Enhanced EasyFarms AI Assistant with weather integration and persistent sessions"""
def __init__(self, config: Optional[Config] = None, manager: Optional[ConversationManager] = None):
"""
Initialize the assistant with configuration and a conversation manager.
Args:
config (Optional[Config]): Configuration object. If None, loads from environment.
manager (Optional[ConversationManager]): Manager for handling conversation persistence.
"""
self.config = config or Config.from_env()
# Validate configuration
if not all([self.config.api_key, self.config.api_url, self.config.model_name]):
raise ValueError("Missing required configuration: API_KEY, API_URL, and MODEL_NAME must be set")
self.client = OpenAI(
api_key=self.config.api_key,
base_url=self.config.api_url
)
# All available functions from both modules are combined into the tools list
self.tools = self._initialize_tools()
# Use the provided conversation manager or create a new one
self.manager = manager or ConversationManager()
# System prompts
self.system_prompt = """You are the AI assistant for EasyForms Agritech Solutions. Your task is to provide users with clear, concise, and actionable responses regarding agriculture, crop management, production, treatment, weather alerts, and related queries.
Core Capabilities:
- Crop recommendations based on soil and weather conditions
- Fertilizer recommendations for specific crops
- Plant disease detection and treatment advice
- Weather alerts and forecasts for farming decisions
- Market data and commodity prices
- General agricultural guidance
Rules:
1. Check if any relevant function_tools or datasets are available for this query.
2. If available, use the functions to fetch information and generate the final user-facing response.
3. If the functions or data are unavailable, do **not stop**; instead, generate a general, well-reasoned response based on your own knowledge.
4. Keep the response **simple, smooth, well-pointed, and concise**.
5. Structure the response with bullet points or numbered steps where helpful.
6. Provide practical, actionable advice a user can implement immediately.
7. Use English or Hindi based on user preference.
8. If any information is uncertain, mention it clearly and suggest alternatives.
9. For weather-related queries, prioritize safety and timely alerts."""
self.final_system = """You are the final response assistant for EasyForms Agritech Solutions. Use the outputs from previous function calls to generate a **clear, concise, actionable response** for the user.
Rules:
1. Combine the function outputs and your own reasoning to answer the query.
2. Keep responses simple, smooth, well-pointed, and concise.
3. Structure response with headings or bullet points if helpful.
4. Provide practical advice that a farmer or user can implement immediately.
5. If some data is missing, clearly state it and offer alternatives.
6. Use English or Hindi based on the user preference.
7. For weather alerts, emphasize urgency and protective measures."""
def _initialize_tools(self) -> List[Dict]:
"""Initialize and convert all function schemas to the new tools format"""
tools = []
# Convert EasyFarms schemas to the new format
for schema in EASYFARMS_FUNCTION_SCHEMAS:
tool = {
"type": "function",
"function": {
"name": schema["name"],
"description": schema["description"],
"parameters": schema["parameters"]
}
}
tools.append(tool)
# Add weather tools (which are already in the correct format)
tools.extend(WEATHER_TOOLS)
return tools
def call_function(self, function_name: str, arguments: Dict) -> Any:
"""Route function calls to appropriate handlers with error handling"""
try:
# Map all available function names to their handlers
function_map = {
# EasyFarms functions
"get_crop_recommendation": lambda args: execute_easyfarms_function("get_crop_recommendation", **args),
"get_fertilizer_recommendation": lambda args: execute_easyfarms_function("get_fertilizer_recommendation", **args),
"detect_plant_disease": lambda args: execute_easyfarms_function("detect_plant_disease", **args),
"get_supported_options": lambda args: execute_easyfarms_function("get_supported_options", **args),
"get_market_prices": lambda args: execute_easyfarms_function("get_market_prices", **args),
"compare_commodity_prices": lambda args: execute_easyfarms_function("compare_commodity_prices", **args),
"get_market_locations": lambda args: execute_easyfarms_function("get_market_locations", **args),
"get_commodity_list": lambda args: execute_easyfarms_function("get_commodity_list", **args),
# Weather alert functions
"get_weather_alerts": lambda args: self._execute_weather_function("get_weather_alerts", **args),
"get_weather": lambda args: self._execute_weather_function("get_weather", **args),
"get_alert_summary": lambda args: self._execute_weather_function("get_alert_summary", **args),
"get_available_locations": lambda args: self._execute_weather_function("get_available_locations", **args)
}
if function_name in function_map:
return function_map[function_name](arguments)
else:
return {"error": f"Unknown function: {function_name}"}
except Exception as e:
logger.error(f"Error executing function {function_name}: {e}")
return {"error": str(e)}
def _execute_weather_function(self, function_name: str, **kwargs):
"""Helper to execute weather functions from the alert.py module"""
from alert import execute_function
return execute_function(function_name, kwargs)
def process_query(self, user_message: str, user_id: str, chat_id: Optional[str] = None, image_url: Optional[str] = None) -> Dict[str, Any]:
"""
Process user query with user authentication and improved conversation management
Args:
user_message: The user's message
user_id: The user ID for authentication and isolation
chat_id: Optional chat ID. If None, generates a new one
image_url: Optional image URL
Returns:
Dictionary containing response, chat_id, and message IDs
"""
try:
# Validate user_id
if not user_id:
return {
"error": "User ID is required for authentication",
"chat_id": None,
"is_new_chat": True,
"user_message_id": None,
"assistant_message_id": None,
"total_messages": 0
}
# Handle chat ID
if not chat_id:
chat_id = self.manager.generate_chat_id(user_id)
is_new_chat = True
logger.info(f"Generated new chat ID: {chat_id} for user: {user_id}")
else:
is_new_chat = not self.manager.chat_exists(chat_id, user_id)
if is_new_chat:
logger.info(f"Creating new chat with provided ID: {chat_id} for user: {user_id}")
else:
logger.info(f"Continuing existing chat: {chat_id} for user: {user_id}")
# Get conversation history for this user
conversation_history = self.manager.get_history(chat_id, user_id)
# Prepare messages for AI
messages = [{"role": "system", "content": self.system_prompt}]
# Add conversation history
for message in conversation_history:
if message.get("role") == "user":
llm_user_content = message.get("content", "")
if message.get("imageUrl"):
llm_user_content += f" [image_url: {message.get('imageUrl')}]"
messages.append({"role": "user", "content": llm_user_content})
elif message.get("role") == "assistant":
messages.append({"role": "assistant", "content": message.get("content", "")})
# Add current user message
llm_message_content = user_message
if image_url:
llm_message_content += f" [image_url: {image_url}]"
messages.append({"role": "user", "content": llm_message_content})
# Make API call
response = self.client.chat.completions.create(
model=self.config.model_name,
messages=messages,
tools=self.tools,
tool_choice="auto",
temperature=self.config.temperature
)
message = response.choices[0].message
if hasattr(message, 'tool_calls') and message.tool_calls:
# Add the assistant's message with tool calls
messages.append({
"role": "assistant",
"tool_calls": [
{
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments
}
} for tool_call in message.tool_calls
]
})
# Execute all tool calls
for tool_call in message.tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
logger.info(f"Calling function: {function_name} with args: {function_args}")
# Call the function
function_result = self.call_function(function_name, function_args)
# Add function result to messages
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(function_result)
})
# Add final system prompt for generating response
messages.append({
"role": "system",
"content": self.final_system
})
# Get final response
final_response = self.client.chat.completions.create(
model=self.config.model_name,
messages=messages,
temperature=self.config.temperature
)
response_content = final_response.choices[0].message.content
else:
response_content = message.content
# Add messages to conversation history with unique IDs
user_msg = self.manager.add_message(chat_id, user_id, "user", user_message, image_url)
assistant_msg = self.manager.add_message(chat_id, user_id, "assistant", response_content)
return {
"response": response_content,
"chat_id": chat_id,
"is_new_chat": is_new_chat,
"user_message_id": user_msg.get("message_id"),
"assistant_message_id": assistant_msg.get("message_id"),
"total_messages": len(self.manager.get_history(chat_id, user_id))
}
except Exception as e:
logger.error(f"Error processing query for chat {chat_id}, user {user_id}: {e}")
return {
"error": f"I apologize, but I encountered an error: {str(e)}. Please try again or rephrase your question.",
"chat_id": chat_id or self.manager.generate_chat_id(user_id) if user_id else None,
"is_new_chat": True,
"user_message_id": None,
"assistant_message_id": None,
"total_messages": 0
}.tool_calls
]
})
# Execute all tool calls
for tool_call in message.tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
logger.info(f"Calling function: {function_name} with args: {function_args}")
# Call the function
function_result = self.call_function(function_name, function_args)
# Add function result to messages
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(function_result)
})
# Add final system prompt for generating response
messages.append({
"role": "system",
"content": self.final_system
})
# Get final response
final_response = self.client.chat.completions.create(
model=self.config.model_name,
messages=messages,
temperature=self.config.temperature
)
response_content = final_response.choices[0].message.content
else:
response_content = message.content
# Add messages to conversation history with unique IDs
user_msg = self.manager.add_message(chat_id, "user", user_message, image_url)
assistant_msg = self.manager.add_message(chat_id, "assistant", response_content)
return {
"response": response_content,
"chat_id": chat_id,
"is_new_chat": is_new_chat,
"user_message_id": user_msg.get("message_id"),
"assistant_message_id": assistant_msg.get("message_id"),
"total_messages": len(self.manager.get_history(chat_id))
}
except Exception as e:
logger.error(f"Error processing query for chat {chat_id}: {e}")
return {
"error": f"I apologize, but I encountered an error: {str(e)}. Please try again or rephrase your question.",
"chat_id": chat_id or self.manager.generate_chat_id(),
"is_new_chat": True,
"user_message_id": None,
"assistant_message_id": None,
"total_messages": 0
}
def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""
Get information about a specific chat
Args:
chat_id: The chat ID to get information for
Returns:
Dictionary with chat information
"""
return self.manager.get_chat_info(chat_id)
def get_all_chats(self) -> List[Dict[str, Any]]:
"""
Get all chat sessions
Returns:
List of chat session information
"""
return self.manager.get_all_chat_sessions()
def clear_history(self, chat_id: str) -> bool:
"""
Clear conversation history for a specific chat
Args:
chat_id: The ID of the chat to clear
Returns:
True if deletion was successful, False otherwise
"""
logger.info(f"Clearing history for chat: {chat_id}")
return self.manager.delete_history(chat_id)
def get_messages(self, chat_id: str) -> List[Dict[str, Any]]:
"""
Get all messages for a specific chat
Args:
chat_id: The chat ID to get messages for
Returns:
List of messages with their IDs
"""
return self.manager.get_history(chat_id)
# Utility class for generating example queries (can be used for testing)
class QuickQueries:
"""Pre-defined query templates for common farming questions"""
@staticmethod
def crop_recommendation(N: int, P: int, K: int, temp: float, humidity: float, ph: float = 6.5) -> str:
"""Generate crop recommendation query"""
return f"What crop should I grow with N={N}, P={P}, K={K}, temperature {temp}°C, humidity {humidity}%, pH {ph}?"
@staticmethod
def fertilizer_query(crop: str, soil: str, N: int, P: int, K: int) -> str:
"""Generate fertilizer recommendation query"""
return f"I need fertilizer recommendation for {crop} in {soil} soil with N={N}, P={P}, K={K}"
@staticmethod
def weather_alert(location: str = "") -> str:
"""Generate weather alert query"""
location_str = f" for {location}" if location else ""
return f"What are the current weather alerts and conditions{location_str}? How will this affect farming?"
# Test function to validate configuration
def test_configuration():
"""Test if all configuration is properly set up"""
try:
# Check environment variables
required_env_vars = ["API_KEY", "API_URL", "MODEL_NAME"]
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
print(f"Missing environment variables: {missing_vars}")
return False
# Test assistant initialization
assistant = EasyFarmsAssistant()
print("Assistant initialized successfully")
# Test function schemas
print(f"Loaded {len(assistant.tools)} function tools")
return True
except Exception as e:
print(f"Configuration test failed: {e}")
return False
# Test the new conversation management
def test_conversation_management():
"""Test the enhanced conversation management"""
try:
assistant = EasyFarmsAssistant()
# Test 1: New conversation
print("\n=== Test 1: New Conversation ===")
result1 = assistant.process_query("Hello, I need help with farming")
print(f"Chat ID: {result1['chat_id']}")
print(f"Is new chat: {result1['is_new_chat']}")
print(f"User message ID: {result1['user_message_id']}")
print(f"Assistant message ID: {result1['assistant_message_id']}")
print(f"Total messages: {result1['total_messages']}")
# Test 2: Continue conversation
print("\n=== Test 2: Continue Conversation ===")
chat_id = result1['chat_id']
result2 = assistant.process_query("What crops grow well in sandy soil?", chat_id=chat_id)
print(f"Chat ID: {result2['chat_id']}")
print(f"Is new chat: {result2['is_new_chat']}")
print(f"User message ID: {result2['user_message_id']}")
print(f"Assistant message ID: {result2['assistant_message_id']}")
print(f"Total messages: {result2['total_messages']}")
# Test 3: Get messages
print(f"\n=== Test 3: Messages in chat {chat_id} ===")
messages = assistant.get_messages(chat_id)
for msg in messages:
print(f" Message {msg.get('message_id')}: {msg.get('role')} - {msg.get('content')[:50]}...")
# Test 4: User-defined chat ID
print("\n=== Test 4: User-defined Chat ID ===")
custom_chat_id = "my_farming_chat_2024"
result3 = assistant.process_query("Tell me about wheat farming", chat_id=custom_chat_id)
print(f"Custom chat ID: {result3['chat_id']}")
print(f"Is new chat: {result3['is_new_chat']}")
# Test 5: Get all chats
print("\n=== Test 5: All Chats ===")
all_chats = assistant.get_all_chats()
for chat in all_chats[:3]: # Show first 3
print(f" {chat.get('session_id')}: {chat.get('title')} ({chat.get('message_count')} messages)")
print("\n✅ Conversation management tests completed!")
return True
except Exception as e:
print(f"❌ Conversation management test failed: {e}")
return False
if __name__ == "__main__":
print("=== EasyFarms Assistant Configuration Test ===")
if test_configuration():
print("✅ Basic configuration ready!")
print("\n=== Testing Enhanced Conversation Management ===")
if test_conversation_management():
print("✅ All systems ready!")
else:
print("⚠️ Basic functions work, but conversation management needs attention")
else:
print("❌ Please fix configuration issues before running the assistant.")