Spaces:
Build error
Build error
| import os | |
| import httpx | |
| from dotenv import load_dotenv | |
| from typing import Dict, Any, Optional, List | |
| from datetime import datetime | |
| import logging | |
| import asyncio | |
| from openai import AsyncOpenAI | |
| import google.generativeai as genai | |
| import PIL.Image | |
| from typing import List, Dict, Any, Optional | |
| from app.utils.load_env import ACCESS_TOKEN, WHATSAPP_API_URL, GEMNI_API, OPENAI_API | |
| from app.utils.system_prompt import system_prompt | |
# Load environment variables
load_dotenv()

# Configure the module-level Gemini SDK and the shared async OpenAI client.
genai.configure(api_key=GEMNI_API)
client = AsyncOpenAI(api_key=OPENAI_API)

# Configure logging: INFO and above, prefixed with time, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Validate environment variables: only warn (do not fail) when either
# WhatsApp setting is missing or empty.
if not (WHATSAPP_API_URL and ACCESS_TOKEN):
    logger.warning("Environment variables for WHATSAPP_API_URL or ACCESS_TOKEN are not set!")
| # Helper function to send a reply | |
async def send_reply(to: str, body: str) -> Dict[str, Any]:
    """Send a plain-text WhatsApp message and return the API's JSON response.

    Args:
        to: Recipient identifier placed in the payload's ``"to"`` field.
        body: Text of the message to send.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        Exception: If the API answers with any status code other than 200.
    """
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Content-Type": "application/json",
    }
    payload = {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "text",
        "text": {
            "body": body,
        },
    }
    # Named `http_client` so it does not shadow the module-level OpenAI `client`.
    async with httpx.AsyncClient() as http_client:
        response = await http_client.post(WHATSAPP_API_URL, json=payload, headers=headers)
        if response.status_code != 200:
            # An error body is not guaranteed to be JSON (e.g. an HTML error
            # page); fall back to the raw text so the status code is still
            # reported instead of being masked by a decode error.
            try:
                error_detail = response.json()
            except ValueError:
                error_detail = response.text
            logger.error(f"Failed to send reply: {error_detail}")
            raise Exception(f"Failed to send reply with status code {response.status_code}: {error_detail}")
        return response.json()
| # Helper function to generate a reply based on message content | |
async def generate_reply(sender: str, content: str, timestamp: int) -> str:
    """Build a canned reply from the message text and, as a fallback, its time.

    Keyword matches on ``content`` ("hello", then "test", case-insensitive)
    take priority. Only when neither matches is ``timestamp`` parsed to choose
    between a morning greeting and a generic one.

    Args:
        sender: Name or identifier interpolated into the reply.
        content: Text of the incoming message.
        timestamp: Message time; anything ``int()`` accepts.
            Assumed to be in milliseconds since the epoch.

    Returns:
        The reply text, or an apology message if anything raised.
    """
    try:
        text = content.lower()
        if "hello" in text:
            return f"Hi {sender}, how can I assist you today?"
        if "test" in text:
            return f"Hi {sender}, this is a reply to your test message."

        # Parse the timestamp only here: the keyword replies above do not need
        # it, so a missing or malformed timestamp must not break them.
        # NOTE: fromtimestamp() yields naive local time, so "morning" follows
        # the server's timezone.
        received_time = datetime.fromtimestamp(int(timestamp) / 1000)  # Assuming timestamp is in milliseconds
        if received_time.hour < 12:
            return f"Good morning, {sender}! How can I help you?"
        return f"Hello {sender}, I hope you're having a great day!"
    except Exception as e:
        logger.error(f"Error generating reply: {str(e)}", exc_info=True)
        return f"Sorry {sender}, I couldn't process your message. Please try again."
async def process_message_with_llm(
    sender_id: str,
    content: str,
    history: List[Dict[str, str]],
    image_file_path: Optional[str] = None,
    doc_path: Optional[str] = None,
    video_file_path: Optional[str] = None,
) -> str:
    """Generate a Gemini reply for a message and send it back to the sender.

    A single attempt is made; no retry is performed.

    Args:
        sender_id: Identifier of the sender; also used as the reply recipient.
        content: Text of the incoming message.
        history: Prior chat turns, forwarded unchanged to the Gemini helper.
        image_file_path: Optional path of an attached image.
        doc_path: Optional path of an attached document.
        video_file_path: Optional path of an attached video.

    Returns:
        The generated reply, or a generic apology if generating or sending
        the reply raised an exception.
    """
    try:
        logger.info(f"Processing message for sender: {sender_id}")
        generated_reply = await generate_response_from_gemini(
            sender=sender_id,
            content=content,
            history=history,
            image_file_path=image_file_path,
            doc_path=doc_path,
            video_file_path=video_file_path,
        )
        logger.info(f"Generated reply: {generated_reply}")
        # The send result is not needed; a failure surfaces as an exception.
        await send_reply(sender_id, generated_reply)
        return generated_reply
    except Exception as e:
        logger.error(f"Error in process_message_with_llm: {str(e)}", exc_info=True)
        return "Sorry, I couldn't generate a response at this time."
async def generate_response_from_gemini(
    sender: str,
    content: str,
    history: List[Dict[str, str]],
    image_file_path: Optional[str] = None,
    doc_path: Optional[str] = None,
    video_file_path: Optional[str] = None,
) -> str:
    """Ask Gemini for a reply to a text, image, document or video message.

    Exactly one input is sent per call, chosen in this order: image, document,
    video, then the plain text ``content``. When a media path is given,
    ``content`` is not sent.

    Args:
        sender: Identifier of the sender; used only for logging.
        content: Text of the incoming message.
        history: Prior chat turns passed to ``start_chat``.
        image_file_path: Optional path of an image to open with PIL.
        doc_path: Optional path of a document to upload.
        video_file_path: Optional path of a video to upload.

    Returns:
        The model's reply text, or a generic apology if anything raised.
    """
    try:
        logger.info(f"Generating response for sender: {sender}")

        # Initialize the model and start a chat seeded with the history.
        model = genai.GenerativeModel("gemini-1.5-pro-002", system_instruction=system_prompt)
        chat = model.start_chat(history=history)

        # Process image
        if image_file_path:
            logger.info(f"Processing image at {image_file_path}")
            # Context manager guarantees the file handle is released, even if
            # sending fails.
            with PIL.Image.open(image_file_path) as image_data:
                response = await chat.send_message_async(image_data)
            return response.text

        # Process document, then video: both go through the same upload path.
        for label, media_path in (("document", doc_path), ("video", video_file_path)):
            if not media_path:
                continue
            logger.info(f"Processing {label} at {media_path}")
            # upload_file is a blocking call; run it in a worker thread so the
            # event loop stays responsive during the upload.
            # NOTE(review): uploaded videos may need server-side processing
            # before they can be used in a prompt - confirm against the
            # Gemini File API docs whether a state check is required here.
            uploaded = await asyncio.to_thread(genai.upload_file, media_path)
            response = await chat.send_message_async(uploaded)
            return response.text

        # No media attached: send the user's text message.
        response = await chat.send_message_async(content)
        return response.text
    except Exception:
        logger.error("Error in generate_response_from_gemini:", exc_info=True)
        return "Sorry, I couldn't generate a response at this time."
| # Process message with retry logic | |
| # async def process_message_with_retry( | |
| # sender_id: str, | |
| # content: str, | |
| # history: List[str], | |
| # timestamp: Optional[int] = None, | |
| # media: Optional[Dict[str, Any]] = None, | |
| # image_file_path: Optional[str] = None, | |
| # doc_path: Optional[str] = None, | |
| # ) -> Dict[str, Any]: | |
| # """Process message with retry logic""" | |
| # retries = 1 | |
| # delay = 0.1 # Initial delay in seconds | |
| # # for attempt in range(retries): | |
| # try: | |
| # logger.info(f"Sending message to the Gemini model...") | |
| # generated_reply = await generate_response_from_gemini(sender = sender_id, content=content, history = history, timestamp = timestamp, image_file_path = image_file_path, media=media, doc_path = doc_path) | |
| # logger.info(f"Reply generated: {generated_reply}") | |
| # response = await send_reply(sender_id, generated_reply) | |
| # return generated_reply | |
| # return {"status": "success", "reply": generated_reply, "response": response} | |
| # except Exception as e: | |
| # logger.error(f"Error generating reply: {str(e)}", exc_info=True) | |
| # return {"status": "error", "reply": "Sorry, I couldn't generate a response at this time."} | |
| # logger.error(f"Attempt {attempt + 1} failed: {str(e)}", exc_info=True) | |
| # if attempt < retries - 1: | |
| # await asyncio.sleep(delay) | |
| # delay *= 2 # Exponential backoff | |
| # else: | |
| # raise Exception(f"All {retries} attempts failed.") from e | |
| # Example usage | |
| # asyncio.run(process_message_with_retry("1234567890", "hello", 1700424056000)) | |
| # async def generate_response_from_gemini(sender: str, content: str, timestamp: str, history: List[Dict[str, str]], media: Optional[Dict[str, Any]] = None, image_file_path: Optional[str] = None, doc_path: Optional[str] = None) -> str: | |
| # try: | |
| # print(f"Sender: {sender}") | |
| # print(f"Content: {content}") | |
| # print(f"Timestamp: {timestamp}") | |
| # print(f"History: {history}") | |
| # print(f"Media: {media}") | |
| # # Initialize the model | |
| # model = genai.GenerativeModel("gemini-1.5-pro-002") | |
| # # Define the chat history | |
| # chat = model.start_chat( | |
| # history=history | |
| # ) | |
| # logger.info(f"file_path: {image_file_path}") | |
| # if image_file_path: # Should be bytes or a file-like object | |
| # prompt = "Describe the following image:" | |
| # image_data = PIL.Image.open(image_file_path) | |
| # print("Sending image to the Gemini model...") | |
| # response = await chat.send_message_async(image_data) | |
| # print(f"Model response: {response.text}") | |
| # return response.text | |
| # if doc_path: | |
| # doc_data = genai.upload_file(doc_path) | |
| # print("Sending document to the Gemini model...") | |
| # response = await chat.send_message_async(doc_data) | |
| # print(f"Model response: {response.text}") | |
| # return response.text | |
| # # Send the user's message | |
| # print("Sending message to the Gemini model...") | |
| # response = await chat.send_message_async(content) | |
| # print(f"Model response: {response.text}") | |
| # return response.text | |
| # except Exception as e: | |
| # print("Error generating reply from Gemini:", e) | |
| # return "Sorry, I couldn't generate a response at this time." | |
async def generate_response_from_chatgpt(sender: str, content: str, timestamp: str, history: str) -> str:
    """Generate a reply using OpenAI's ChatGPT API.

    Args:
        sender: Identifier of the sender, embedded in the user message.
        content: Text of the incoming message.
        timestamp: Message time, embedded verbatim in the user message.
        history: Prior conversation, embedded verbatim in a system message.

    Returns:
        The model's reply with surrounding whitespace stripped, or a generic
        apology if the request or response handling raised.
    """
    try:
        messages = [
            {"role": "system", "content": "You're an investor, a serial founder, and you've sold many startups. You understand nothing but business."},
            {"role": "system", "content": f"Message History: {history}"},
            {"role": "user", "content": f"From {sender} at {timestamp}: {content}"},
        ]
        # Logged at DEBUG through the module logger (not print): the payload
        # contains the full user conversation and should not go to stdout.
        logger.debug(f"Messages: {messages}")

        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=200,
            temperature=0.5,
        )
        # A response without text content raises here and is handled below.
        return response.choices[0].message.content.strip()
    except Exception as e:
        logger.error(f"Error generating reply: {str(e)}", exc_info=True)
        return "Sorry, I couldn't generate a response at this time."
| # async def generate_response_from_chatgpt( | |
| # sender: str, | |
| # content: str, | |
| # timestamp: str, | |
| # image: Optional[bytes] = None, | |
| # file: Optional[bytes] = None, | |
| # file_name: Optional[str] = None, | |
| # chat_history: Optional[List[Dict[str, str]]] = None, | |
| # ) -> Dict[str, Any]: | |
| # """ | |
| # Generate a reply using OpenAI's GPT-4 API, including support for images, files, and maintaining chat history. | |
| # """ | |
| # try: | |
| # # Initialize chat history if not provided | |
| # chat_history = chat_history or [] | |
| # # Append the current user message to the chat history | |
| # chat_history.append({"role": "user", "content": f"From {sender} at {timestamp}: {content}"}) | |
| # # Prepare files for the request | |
| # files = [] | |
| # if image: | |
| # files.append({"name": "image.png", "type": "image/png", "content": image}) | |
| # if file: | |
| # files.append({"name": file_name or "file.txt", "type": "application/octet-stream", "content": file}) | |
| # logger.debug(f"Chat History Before Response: {chat_history}") | |
| # # Send the request to the GPT-4 API | |
| # response = await client.chat.completions.create( | |
| # model="gpt-4-vision", # Ensure this is the correct model for multimodal support | |
| # messages=chat_history, | |
| # files=files if files else None, # Include files if present | |
| # max_tokens=200, | |
| # temperature=0.5, | |
| # ) | |
| # # Parse the assistant's response | |
| # chatgpt_response = response.choices[0].message.content.strip() | |
| # # Append the assistant's response to the chat history | |
| # chat_history.append({"role": "assistant", "content": chatgpt_response}) | |
| # logger.debug(f"Chat History After Response: {chat_history}") | |
| # # Return both the assistant's response and the updated chat history | |
| # return {"response": chatgpt_response, "chat_history": chat_history} | |
| # except Exception as e: | |
| # logger.error("Error generating reply", exc_info=True) | |
| # return {"response": "Sorry, I couldn't generate a response at this time.", "chat_history": chat_history} | |