EasyBot / server.py
NitinBot001's picture
Upload 5 files
8c56999 verified
raw
history blame
5.46 kB
# flask_app.py
from flask import Flask, request, jsonify, render_template
from app import EasyFarmsAssistant
from conversation_manager import ConversationManager # Make sure this import is present
import logging
import uuid
import os
from flask_cors import CORS
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize the Flask application
app = Flask(__name__)
CORS(app)
# --- Initialize Core Services ---
# This setup assumes your .env file is correctly configured with Supabase credentials.
try:
conv_manager = ConversationManager()
assistant = EasyFarmsAssistant(manager=conv_manager)
logger.info("EasyFarmsAssistant and ConversationManager initialized successfully.")
except Exception as e:
logger.error(f"FATAL: Could not initialize services. Error: {e}")
assistant = None
conv_manager = None
# --- Frontend Serving Route ---
@app.route('/')
def index():
"""Serves the main chat application page (index.html)."""
return render_template('index.html')
# --- API Endpoints ---
@app.route('/config', methods=['GET'])
def get_config():
"""Provides public configuration keys to the frontend."""
return jsonify({
'imgbb_api_key': os.getenv('IMGBB_API_KEY')
})
@app.route('/chat', methods=['POST'])
def chat():
"""Handles incoming user messages and returns the assistant's response."""
if not assistant:
return jsonify({"error": "Assistant is not available due to an initialization error."}), 503
# The request is multipart/form-data to handle potential image uploads
data = request.form
user_message = data.get('message')
session_id = data.get('session_id')
image_url = data.get('image_url') # The permanent URL from ImgBB
# A message must contain either text or an image
if not user_message and not image_url:
return jsonify({"error": "Cannot process an empty message."}), 400
# If no session_id is provided by the client, it's a new conversation
if not session_id or session_id == 'null' or session_id == 'undefined':
session_id = str(uuid.uuid4())
logger.info(f"No session_id provided. Creating new session: {session_id}")
# Call the assistant's core logic, now passing the image_url separately
response_content = assistant.process_query(
user_message=user_message or "", # Ensure user_message is a string
session_id=session_id,
image_url=image_url
)
return jsonify({
"response": response_content,
"session_id": session_id
})
@app.route('/history/sessions', methods=['GET'])
def get_sessions():
"""Fetches a list of all conversation sessions for the sidebar."""
if not conv_manager:
return jsonify({"error": "Conversation manager not available"}), 503
try:
# Fetch session_id and the full history to generate a title
all_conversations = conv_manager.supabase.table('conversations').select('session_id, history').execute()
sessions = []
for conv in all_conversations.data:
title = "New Chat" # Default title
# Generate a title from the first user message in the history
if conv.get('history') and len(conv['history']) > 0:
first_message_content = conv['history'][0].get('content')
if first_message_content:
# Truncate for display
title = first_message_content[:35] + '...' if len(first_message_content) > 35 else first_message_content
else: # If the first message was only an image
title = "Image Query"
sessions.append({
"session_id": conv.get('session_id'),
"title": title
})
return jsonify(sessions)
except Exception as e:
logger.error(f"Error fetching sessions from Supabase: {e}")
return jsonify({"error": "Could not fetch conversation sessions"}), 500
@app.route('/history/messages/<session_id>', methods=['GET'])
def get_messages(session_id):
"""Fetches the full, structured message history for a given session_id."""
if not conv_manager:
return jsonify({"error": "Conversation manager not available"}), 503
try:
history = conv_manager.get_history(session_id)
return jsonify(history)
except Exception as e:
logger.error(f"Error fetching messages for session {session_id}: {e}")
return jsonify({"error": "Could not fetch message history"}), 500
@app.route('/clear', methods=['POST'])
def clear_history():
"""Deletes a conversation history from the database."""
if not assistant:
return jsonify({"error": "Assistant is not available."}), 503
data = request.get_json()
session_id = data.get('session_id')
if not session_id:
return jsonify({"error": "Missing 'session_id' in request body"}), 400
if assistant.clear_history(session_id):
return jsonify({"status": "success", "message": f"History for session {session_id} was cleared."})
else:
return jsonify({"error": "Failed to clear history."}), 500
# --- Main Execution ---
if __name__ == '__main__':
# Set debug=False for production
app.run(debug=False, port=7860)