# NOTE(review): "Spaces: Build error" -- stray page-status text that preceded the code; it is not part of the module.
import asyncio
import hashlib
import json
import logging
import mimetypes
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import google.generativeai as genai
import httpx
import PIL.Image
import requests
from dotenv import load_dotenv
from openai import AsyncOpenAI
from vidavox.core import (
    BaseResultFormatter,
    SearchResult,
)

from app.services.search_engine import google_search
from app.utils.load_env import ACCESS_TOKEN, WHATSAPP_API_URL, GEMINI_API, MEDIA_UPLOAD_URL
from app.utils.system_prompt import system_prompt
# from app.search.rag_pipeline import extract_keywords_async

# Load environment variables
load_dotenv()
# Get base url from ngrok
def get_ngrok_url(timeout: float = 5.0) -> str:
    """Return the public URL of a local ngrok tunnel, preferring HTTPS.

    Queries ``http://localhost:4040/api/tunnels`` and returns the first
    non-empty ``public_url``: HTTPS tunnels are considered first, then any
    other tunnel in the order they are listed.

    Args:
        timeout: Seconds to wait for the tunnel listing. This function is
            called at import time, so an unbounded wait would hang the
            whole module import.

    Returns:
        The tunnel's public URL, or ``"http://localhost:8005"`` when the
        listing cannot be fetched or contains no usable URL.
    """
    try:
        response = requests.get("http://localhost:4040/api/tunnels", timeout=timeout)
        response.raise_for_status()  # Raise an error for bad status codes.
        tunnels = response.json().get("tunnels", [])
        https_tunnels = [t for t in tunnels if t.get("proto") == "https"]
        # HTTPS first, then every tunnel as a fallback; skip entries without a URL
        # so the function never returns None.
        for tunnel in https_tunnels + list(tunnels):
            public_url = tunnel.get("public_url")
            if public_url:
                return public_url
    except Exception as e:
        # print() rather than the module logger: this runs at import time,
        # before logging is configured further down the file.
        print("Error fetching ngrok URL:", e)
    # Fallback in case ngrok isn't running.
    return "http://localhost:8005"
# Resolve the public base URL once, at import time; send_reply builds the
# image links it sends from this value.
base_url = get_ngrok_url()
print("Base URL:", base_url)
# Get image link from image paths
def get_image_links(image_paths: List[str], base_url: str) -> List[str]:
    """Turn stored image path strings into absolute URLs under ``base_url``.

    Each entry may be a single relative path (``"images/a.png"``) or a
    bracketed, comma-separated list of paths
    (``"[images/a.png, images/b.png]"``), optionally with quoted items as in
    the ``str()`` of a Python list.

    Args:
        image_paths: The path strings to expand. A bare string is treated as
            a one-element list; ``None`` and non-string entries are skipped.
        base_url: URL prefix; a trailing slash is ignored.

    Returns:
        One ``"{base_url}/{path}"`` link per path found, in input order.
    """
    if isinstance(image_paths, str):
        # Iterating a bare string would yield single characters.
        image_paths = [image_paths]

    root = base_url.rstrip("/")
    links: List[str] = []
    for entry in image_paths or []:
        if not isinstance(entry, str):
            continue
        # Remove the surrounding brackets and any extra whitespace
        cleaned = entry.strip().strip("[]")
        for raw_part in cleaned.split(","):
            # Drop whitespace and the quotes left over from a stringified list.
            part = raw_part.strip().strip("'\"").strip()
            if not part:
                continue
            # "images/<name>" and any other relative path map the same way:
            # the path is appended to the base URL with a single slash.
            links.append(f"{root}/{part.lstrip('/')}")
    return links
# Function specifications offered to Gemini. Only "google_search" is declared;
# "query" is mandatory and "num_results" is an optional string defaulting to "3".
_GOOGLE_SEARCH_PROPERTIES = {
    "query": {
        "type": "string",
        "description": "The search query to perform",
    },
    "num_results": {
        "type": "string",
        "description": "Number of search results to retrieve (1-10)",
        "default": "3",
    },
}

function_declarations = [
    {
        "name": "google_search",
        "description": "Perform a Google search and retrieve search results",
        "parameters": {
            "type": "object",
            "properties": _GOOGLE_SEARCH_PROPERTIES,
            "required": ["query"],
        },
    },
]
class CustomResultFormatter(BaseResultFormatter):
    """Formats a search result into the dict shape used to build the LLM context."""

    def format(self, result: SearchResult) -> Dict[str, Any]:
        """Return ``doc_id``, ``page_content``, ``image`` and ``relevance`` for ``result``.

        ``image`` is read from ``result.meta_data['images_path']``. When that
        key is missing, or the metadata cannot be subscripted, it falls back
        to ``"nan"`` -- the value process_llm_response already discards as
        "no image" -- so one result without images does not abort retrieval.
        """
        try:
            image = result.meta_data['images_path']
        except (KeyError, TypeError):
            image = "nan"
        return {
            "doc_id": result.doc_id,
            "page_content": result.text,
            "image": image,
            "relevance": result.score,
        }
# Hand the Gemini API key to the SDK.
genai.configure(api_key=GEMINI_API)
# client = AsyncOpenAI(api_key = OPENAI_API)

# Logging setup: INFO level, with timestamp, logger name and level on each record.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# Warn (without failing) when either WhatsApp setting is missing.
if not (WHATSAPP_API_URL and ACCESS_TOKEN):
    logger.warning("Environment variables for WHATSAPP_API_URL or ACCESS_TOKEN are not set!")
# Path for the cache file
CACHE_FILE = 'upload_cache.json'


def _load_upload_cache(path):
    """Read the upload cache from ``path``; return an empty dict when unusable.

    A missing file is the normal first-run case. An unreadable, corrupt or
    non-dict file is logged and ignored so it cannot break module import.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError subclass.
        logging.getLogger(__name__).warning(
            "Ignoring unreadable upload cache %s: %s", path, exc
        )
        return {}
    if not isinstance(data, dict):
        logging.getLogger(__name__).warning(
            "Ignoring upload cache %s: expected a JSON object", path
        )
        return {}
    return data


# Maps file content hash -> uploaded media ID.
upload_cache = _load_upload_cache(CACHE_FILE)


def save_cache():
    """Persist ``upload_cache`` to ``CACHE_FILE``.

    The data is written to a temporary sibling file and then moved into
    place, so an interrupted write cannot leave a truncated cache behind.
    """
    tmp_path = f"{CACHE_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(upload_cache, f)
    os.replace(tmp_path, CACHE_FILE)
def compute_file_hash(file_path, block_size=65536):
    """Return the hex SHA-256 digest of the file's bytes, read ``block_size`` at a time."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        while True:
            chunk = stream.read(block_size)
            if chunk == b'':
                break
            digest.update(chunk)
    return digest.hexdigest()
# Helper function to upload an image
async def upload_image(file_path, timeout: float = 60.0):
    """Upload a media file to ``MEDIA_UPLOAD_URL`` and return its media ID.

    Files are identified by the SHA-256 of their content; if that hash is
    already in ``upload_cache`` the cached ID is returned without uploading.
    A successful upload is recorded in the cache and saved to disk.

    The hashing and the HTTP upload are blocking, so both run in the default
    executor to keep the event loop responsive.

    Args:
        file_path: Path of the file to upload.
        timeout: Seconds allowed for the upload request.

    Returns:
        The ``id`` field of the upload response.

    Raises:
        FileNotFoundError: ``file_path`` does not exist.
        ValueError: The MIME type cannot be guessed from the file name.
        RuntimeError: The server did not answer 200, or answered without an ``id``.
    """
    logger.info(f"Uploading image: {file_path}")
    # Ensure the file exists
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    loop = asyncio.get_running_loop()

    # Compute a hash for the file to check for previous uploads
    file_hash = await loop.run_in_executor(None, compute_file_hash, file_path)
    if file_hash in upload_cache:
        logger.info(f"File {file_path} already uploaded. Returning cached media ID.")
        return upload_cache[file_hash]

    # Get the MIME type of the file
    mime_type, _ = mimetypes.guess_type(file_path)
    if not mime_type:
        raise ValueError(f"Could not determine the MIME type for file: {file_path}")

    def _post_media():
        # Keep the file open only for the duration of the request.
        with open(file_path, 'rb') as media_file:
            return requests.post(
                MEDIA_UPLOAD_URL,
                headers={'Authorization': f'Bearer {ACCESS_TOKEN}'},
                files={'file': (os.path.basename(file_path), media_file, mime_type)},
                data={'messaging_product': 'whatsapp'},
                timeout=timeout,
            )

    response = await loop.run_in_executor(None, _post_media)
    if response.status_code != 200:
        logger.error(f"Upload failed: {response.text}")
        raise RuntimeError(f'Failed to upload media: {response.status_code}, {response.text}')

    logger.info(f"Upload successful: {response.text}")
    media_id = response.json().get('id')
    if not media_id:
        raise RuntimeError(f'Failed to upload media: response has no id, {response.text}')
    # Cache the result so future calls can use the same media ID
    upload_cache[file_hash] = media_id
    save_cache()
    return media_id
def _response_error_detail(response: Any) -> Any:
    """Return the decoded JSON body of ``response``, or its raw text if it is not JSON."""
    try:
        return response.json()
    except ValueError:
        return response.text


# Helper function to send a reply
async def send_reply(to: str, body: str, whatsapp_token: str, whatsapp_url: str, image: Any = None) -> Dict[str, Any]:
    """Send a text message, then optionally one image message per image link.

    Args:
        to: Recipient identifier placed in the ``to`` field of each payload.
        body: Text of the message.
        whatsapp_token: Bearer token for the ``Authorization`` header.
        whatsapp_url: Endpoint every message is POSTed to.
        image: Image path string(s) as accepted by ``get_image_links``; a
            falsy value sends no images.

    Returns:
        ``{"text": <decoded text response>}`` plus ``"images"``, a list of
        decoded image responses, when ``image`` was given.

    Raises:
        Exception: A request was answered with a status other than 200. The
            text is sent first, so an image failure is raised after the text
            has already been delivered.
    """
    headers = {
        "Authorization": f"Bearer {whatsapp_token}",
        "Content-Type": "application/json"
    }
    text_data = {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "text",
        "text": {
            "body": body
        }
    }
    responses: Dict[str, Any] = {}  # To store the responses
    async with httpx.AsyncClient() as client:
        text_response = await client.post(whatsapp_url, json=text_data, headers=headers)
        if text_response.status_code != 200:
            error_detail = _response_error_detail(text_response)
            logger.error(f"Failed to send text reply: {error_detail}")
            raise Exception(f"Failed to send text reply with status code {text_response.status_code}: {error_detail}")
        responses["text"] = text_response.json()

        if image:
            # A bare string would otherwise be iterated character by character.
            image_entries = [image] if isinstance(image, str) else image
            image_responses: List[Dict[str, Any]] = []
            for link in get_image_links(image_entries, base_url):
                image_payload = {
                    "messaging_product": "whatsapp",
                    "recipient_type": "individual",
                    "to": to,
                    "type": "image",
                    # Reference the media by link only: no empty "id" or
                    # empty "caption" is sent alongside it.
                    "image": {"link": link},
                }
                img_response = await client.post(whatsapp_url, json=image_payload, headers=headers)
                if img_response.status_code != 200:
                    error_detail = _response_error_detail(img_response)
                    logger.error(f"Failed to send image: {error_detail}")
                    raise Exception(f"Failed to send image with status code {img_response.status_code}: {error_detail}")
                image_responses.append(img_response.json())
            responses["images"] = image_responses
    return responses
# Helper function to generate a reply based on message content
async def generate_reply(sender: str, content: str, timestamp: int) -> str:
    """Pick a canned reply from the message text and the time it was received.

    Keyword matches ("hello", then "test") win; otherwise the greeting
    depends on whether the received hour is before noon. Any error while
    working this out is logged and answered with an apology.
    """
    try:
        # Assuming timestamp is in milliseconds
        received_time = datetime.fromtimestamp(int(timestamp) / 1000)
        keyword_replies = (
            ("hello", f"Hi {sender}, how can I assist you today?"),
            ("test", f"Hi {sender}, this is a reply to your test message."),
        )
        text = content.lower()
        for keyword, reply in keyword_replies:
            if keyword in text:
                return reply
        if received_time.hour < 12:
            return f"Good morning, {sender}! How can I help you?"
        return f"Hello {sender}, I hope you're having a great day!"
    except Exception as e:
        logger.error(f"Error generating reply: {str(e)}", exc_info=True)
        return f"Sorry {sender}, I couldn't process your message. Please try again."
async def process_message_with_llm(
    sender_id: str,
    content: str,
    history: List[Dict[str, str]],
    rag_system: Any,
    whatsapp_token: str,
    whatsapp_url: str,
    agentMemory: Any = None,
    memory: Any = None,
    image_file_path: Optional[str] = None,
    doc_path: Optional[str] = None,
    video_file_path: Optional[str] = None,
) -> str:
    """Generate a reply with Gemini and send it to the sender over WhatsApp.

    A single attempt is made; there is no retry. The generator's result may
    be either ``(reply, image_paths)`` or a bare reply string, and both are
    accepted.

    Returns:
        The generated reply text, or a generic apology if generating or
        sending the reply raised.
    """
    try:
        logger.info(f"Processing message for sender: {sender_id}")
        result = await generate_response_from_gemini(
            sender=sender_id,
            content=content,
            history=history,
            rag_system=rag_system,
            image_file_path=image_file_path,
            doc_path=doc_path,
            video_file_path=video_file_path,
            agentMemory=agentMemory,
            memory=memory,
        )
        # Unpacking a bare string would either fail or split it into characters.
        if isinstance(result, tuple):
            generated_reply, image_path = result
        else:
            generated_reply, image_path = result, []
        logger.info(f"Generated reply: {generated_reply}, extracted image path: {image_path}")
        await send_reply(sender_id, generated_reply, whatsapp_token, whatsapp_url, image_path)
        return generated_reply
    except Exception as e:
        logger.error(f"Error in process_message_with_llm: {str(e)}", exc_info=True)
        return "Sorry, I couldn't generate a response at this time."
| import markdown | |
| from bs4 import BeautifulSoup | |
def format_response_text(response_text: str) -> str:
    """
    Render markdown-formatted text as plain text for display in WhatsApp.

    The markdown is converted to HTML, and the HTML's text nodes are then
    joined with newlines so bullet points and paragraphs land on separate lines.
    """
    rendered_html = markdown.markdown(response_text)
    return BeautifulSoup(rendered_html, "html.parser").get_text(separator="\n")
| import re | |
| import json | |
def process_llm_response(llm_output):
    """Split raw LLM output into ``(response_text, image_paths)``.

    The output is expected to be a JSON object, optionally wrapped in a
    markdown code fence (with or without a ``json`` tag), of the form
    ``{"response": ..., "references": [{"image": ...}, ...]}``.

    Returns:
        * JSON with a ``"response"`` key: that text plus every usable
          ``image`` value from ``references``. Empty values, the ``"nan"``
          placeholder and entries that are not objects are skipped; an
          ``image`` given as a list contributes each of its strings.
        * Valid JSON without that key: the original output and ``[]``.
        * Not JSON: the output passed through ``format_response_text`` and ``[]``.
        * Not a string: ``str(llm_output)`` and ``[]``.
    """
    # If not a string, return something sensible.
    if not isinstance(llm_output, str):
        return str(llm_output), []

    # Attempt to extract JSON from a markdown code fence.
    fenced = re.search(
        r"```(?:json)?\s*(\{.*\})\s*```", llm_output, re.DOTALL | re.IGNORECASE
    )
    json_str = fenced.group(1) if fenced else llm_output.strip()

    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError:
        # Fallback: if JSON parsing fails, assume it's plain text.
        return format_response_text(llm_output), []

    if not (isinstance(parsed, dict) and "response" in parsed):
        # Fallback if the JSON doesn't have expected structure.
        return llm_output, []

    response_text = parsed.get("response", "")
    if not isinstance(response_text, str):
        # The reply is sent as a text body, so it must be a string.
        response_text = "" if response_text is None else str(response_text)

    image_paths = []
    references = parsed.get("references", [])
    if isinstance(references, list):
        for ref in references:
            if not isinstance(ref, dict):
                continue
            image = ref.get("image")
            candidates = image if isinstance(image, list) else [image]
            for candidate in candidates:
                if not isinstance(candidate, str):
                    continue
                normalized = candidate.strip().lower()
                if normalized and normalized != "nan":
                    image_paths.append(candidate)
    return response_text, image_paths
| # def process_llm_response(llm_output): | |
| # # If it's a string, attempt to extract JSON from markdown code fences. | |
| # if isinstance(llm_output, str): | |
| # # Try to capture JSON content if it's wrapped in ```json ... ``` | |
| # pattern = r"```json\s*(\{.*\})\s*```" | |
| # match = re.search(pattern, llm_output, re.DOTALL) | |
| # if match: | |
| # json_str = match.group(1) | |
| # else: | |
| # json_str = llm_output.strip() | |
| # try: | |
| # parsed = json.loads(json_str) | |
| # # Check if parsed output has the expected keys. | |
| # if isinstance(parsed, dict) and "response" in parsed: | |
| # response_text = parsed.get("response", "") | |
| # references = parsed.get("references", []) | |
| # if isinstance(references, list): | |
| # image_paths = [ref.get("image") for ref in references | |
| # if ref.get("image") and ref.get("image") != "nan"] | |
| # else: | |
| # image_paths = [] | |
| # return response_text, image_paths | |
| # else: | |
| # # Fallback: parsed JSON does not have the expected structure. | |
| # return llm_output, [] | |
| # except json.JSONDecodeError: | |
| # # Fallback: if JSON parsing fails, assume it's plain text. | |
| # return llm_output, [] | |
| # # If not a string, ensure we return something sensible. | |
| # return str(llm_output), [] | |
async def generate_response_from_gemini(
    sender: str,
    content: str,
    history: List[Dict[str, str]],
    rag_system: Any = None,
    agentMemory: Any = None,
    memory: Any = None,
    image_file_path: Optional[str] = None,
    doc_path: Optional[str] = None,
    video_file_path: Optional[str] = None,
) -> Tuple[str, List[str]]:
    """Ask Gemini for a reply and return ``(reply_text, image_paths)``.

    When ``content`` and ``rag_system`` are both given, documents retrieved
    for ``content`` are added to the chat as an extra user message. They are
    added to a copy of ``history``; the caller's list is left untouched.

    Exactly one message is then sent, chosen in this order: the image at
    ``image_file_path``, the uploaded file at ``doc_path``, the uploaded file
    at ``video_file_path``, otherwise ``content`` itself. Only the text path
    is parsed with ``process_llm_response``; media replies are returned as-is
    with an empty image list.

    ``agentMemory`` and ``memory`` are accepted but not used.

    Returns:
        ``(reply_text, image_paths)`` on every path. Any exception is logged
        and answered with a generic apology and an empty image list.
    """
    try:
        logger.info(f"Generating response for sender: {sender}")
        # Initialize the model
        model = genai.GenerativeModel("gemini-1.5-flash", system_instruction=system_prompt)

        # Work on a copy so retrieved context never accumulates in the caller's history.
        chat_history = list(history or [])

        if content and rag_system:
            # Implement RAG: Retrieve relevant documents
            retrieved_docs = rag_system.retrieve(
                query_text=content, result_formatter=CustomResultFormatter()
            )
            logger.debug(f"retrieved docs: {retrieved_docs}")
            if retrieved_docs:
                logger.info(f"Retrieved {len(retrieved_docs)} documents for context.")
                # Format the retrieved documents as a context string
                context = "\n\n".join(
                    f"Source:{doc['doc_id']}\nContent: {doc['page_content']}\nImage: {doc['image']}"
                    for doc in retrieved_docs
                )
                chat_history.append({"role": "user", "parts": f"Relevant documents:\n{context}"})

        # Start the chat once, with the final history.
        chat = model.start_chat(history=chat_history)

        # Process image
        if image_file_path:
            logger.info(f"Processing image at {image_file_path}")
            # Context manager closes the underlying image file after sending.
            with PIL.Image.open(image_file_path) as image_data:
                response = await chat.send_message_async(image_data)
            return response.text, []

        # Process document
        if doc_path:
            logger.info(f"Processing document at {doc_path}")
            doc_data = genai.upload_file(doc_path)
            response = await chat.send_message_async(doc_data)
            return response.text, []

        # Process video (if supported)
        if video_file_path:
            logger.info(f"Processing video at {video_file_path}")
            video_data = genai.upload_file(video_file_path)
            response = await chat.send_message_async(video_data)
            return response.text, []

        # Send the user's message
        response = await chat.send_message_async(content)
        logger.debug(f"text: {response.text}")
        response_text, image_paths = process_llm_response(response.text)
        return response_text, image_paths
    except Exception:
        logger.error("Error in generate_response_from_gemini:", exc_info=True)
        return "Sorry, I couldn't generate a response at this time.", []
async def handle_function_call(chat):
    """
    Handle function calls from the Gemini API.

    The last entry of ``chat.history`` is sent to the model. While the first
    part of the reply is a function call, the function is run, its result is
    sent back as a function response, and the next reply is inspected.
    ``google_search`` is the only function dispatched; any other name is
    answered with an error payload so the model can recover.

    Args:
        chat (ChatSession): The current chat session. Its history must not
            be empty.

    Returns:
        The first response whose leading part is not a function call, or the
        latest response once the round limit is reached.
    """
    max_rounds = 5  # Guard against a model that keeps requesting function calls.

    # NOTE(review): re-sending the last history entry is kept from the
    # original implementation -- confirm this is the intended first message.
    response = await chat.send_message_async(chat.history[-1])

    for _ in range(max_rounds):
        parts = response.candidates[0].content.parts
        function_call = parts[0].function_call if parts else None
        if not function_call:
            # No more function calls, return the final response
            return response

        function_name = function_call.name
        # The call arguments are a mapping, not a JSON string.
        function_args = dict(function_call.args or {})

        # Dispatch to the appropriate function
        if function_name == "google_search":
            result = await google_search(
                query=function_args['query'],
                num_results=function_args.get('num_results', '3')
            )
            payload = result if isinstance(result, dict) else {"result": result}
        else:
            logger.warning(f"Model requested unknown function: {function_name}")
            payload = {"error": f"Unknown function: {function_name}"}

        # Send the function result back to continue the conversation
        response = await chat.send_message_async(
            genai.protos.Content(
                parts=[
                    genai.protos.Part(
                        function_response=genai.protos.FunctionResponse(
                            name=function_name,
                            response=payload,
                        )
                    )
                ]
            )
        )

    logger.warning(f"Stopped after {max_rounds} function-call rounds.")
    return response
| # Process message with retry logic | |
| # async def process_message_with_retry( | |
| # sender_id: str, | |
| # content: str, | |
| # history: List[str], | |
| # timestamp: Optional[int] = None, | |
| # media: Optional[Dict[str, Any]] = None, | |
| # image_file_path: Optional[str] = None, | |
| # doc_path: Optional[str] = None, | |
| # ) -> Dict[str, Any]: | |
| # """Process message with retry logic""" | |
| # retries = 1 | |
| # delay = 0.1 # Initial delay in seconds | |
| # # for attempt in range(retries): | |
| # try: | |
| # logger.info(f"Sending message to the Gemini model...") | |
| # generated_reply = await generate_response_from_gemini(sender = sender_id, content=content, history = history, timestamp = timestamp, image_file_path = image_file_path, media=media, doc_path = doc_path) | |
| # logger.info(f"Reply generated: {generated_reply}") | |
| # response = await send_reply(sender_id, generated_reply) | |
| # return generated_reply | |
| # return {"status": "success", "reply": generated_reply, "response": response} | |
| # except Exception as e: | |
| # logger.error(f"Error generating reply: {str(e)}", exc_info=True) | |
| # return {"status": "error", "reply": "Sorry, I couldn't generate a response at this time."} | |
| # logger.error(f"Attempt {attempt + 1} failed: {str(e)}", exc_info=True) | |
| # if attempt < retries - 1: | |
| # await asyncio.sleep(delay) | |
| # delay *= 2 # Exponential backoff | |
| # else: | |
| # raise Exception(f"All {retries} attempts failed.") from e | |
| # Example usage | |
| # asyncio.run(process_message_with_retry("1234567890", "hello", 1700424056000)) | |
| # async def generate_response_from_gemini(sender: str, content: str, timestamp: str, history: List[Dict[str, str]], media: Optional[Dict[str, Any]] = None, image_file_path: Optional[str] = None, doc_path: Optional[str] = None) -> str: | |
| # try: | |
| # print(f"Sender: {sender}") | |
| # print(f"Content: {content}") | |
| # print(f"Timestamp: {timestamp}") | |
| # print(f"History: {history}") | |
| # print(f"Media: {media}") | |
| # # Initialize the model | |
| # model = genai.GenerativeModel("gemini-1.5-pro-002") | |
| # # Define the chat history | |
| # chat = model.start_chat( | |
| # history=history | |
| # ) | |
| # logger.info(f"file_path: {image_file_path}") | |
| # if image_file_path: # Should be bytes or a file-like object | |
| # prompt = "Describe the following image:" | |
| # image_data = PIL.Image.open(image_file_path) | |
| # print("Sending image to the Gemini model...") | |
| # response = await chat.send_message_async(image_data) | |
| # print(f"Model response: {response.text}") | |
| # return response.text | |
| # if doc_path: | |
| # doc_data = genai.upload_file(doc_path) | |
| # print("Sending document to the Gemini model...") | |
| # response = await chat.send_message_async(doc_data) | |
| # print(f"Model response: {response.text}") | |
| # return response.text | |
| # # Send the user's message | |
| # print("Sending message to the Gemini model...") | |
| # response = await chat.send_message_async(content) | |
| # print(f"Model response: {response.text}") | |
| # return response.text | |
| # except Exception as e: | |
| # print("Error generating reply from Gemini:", e) | |
| # return "Sorry, I couldn't generate a response at this time." | |
| # async def generate_response_from_chatgpt(sender: str, content: str, timestamp: str, history: str) -> str: | |
| # """ | |
| # Generate a reply using OpenAI's ChatGPT API. | |
| # """ | |
| # try: | |
| # # # Initialize chat history if not provided | |
| # # chat_history = chat_history or [] | |
| # # # Append the current user message to the chat history | |
| # # chat_history.append({"role": "user", "content": f"From {sender} at {timestamp}: {content}"}) | |
| # messages = [ | |
| # {"role": "system", "content": "You're an investor, a serial founder, and you've sold many startups. You understand nothing but business."}, | |
| # {"role": "system", "content": f"Message History: {history}"}, | |
| # {"role": "user", "content": f"From {sender} at {timestamp}: {content}"} | |
| # ] | |
| # print(f"Messages: {messages}") | |
| # response = await client.chat.completions.create( | |
| # model="gpt-3.5-turbo", | |
| # messages=messages, | |
| # max_tokens=200, | |
| # temperature=0.5 | |
| # ) | |
| # chatgpt_response = response.choices[0].message.content.strip() | |
| # # Append the assistant's response to the chat history | |
| # # chat_history.append({"role": "assistant", "content": chatgpt_response}) | |
| # return chatgpt_response | |
| # except Exception as e: | |
| # print("Error generating reply:", e) | |
| # return "Sorry, I couldn't generate a response at this time." | |
| # async def generate_response_from_chatgpt( | |
| # sender: str, | |
| # content: str, | |
| # timestamp: str, | |
| # image: Optional[bytes] = None, | |
| # file: Optional[bytes] = None, | |
| # file_name: Optional[str] = None, | |
| # chat_history: Optional[List[Dict[str, str]]] = None, | |
| # ) -> Dict[str, Any]: | |
| # """ | |
| # Generate a reply using OpenAI's GPT-4 API, including support for images, files, and maintaining chat history. | |
| # """ | |
| # try: | |
| # # Initialize chat history if not provided | |
| # chat_history = chat_history or [] | |
| # # Append the current user message to the chat history | |
| # chat_history.append({"role": "user", "content": f"From {sender} at {timestamp}: {content}"}) | |
| # # Prepare files for the request | |
| # files = [] | |
| # if image: | |
| # files.append({"name": "image.png", "type": "image/png", "content": image}) | |
| # if file: | |
| # files.append({"name": file_name or "file.txt", "type": "application/octet-stream", "content": file}) | |
| # logger.debug(f"Chat History Before Response: {chat_history}") | |
| # # Send the request to the GPT-4 API | |
| # response = await client.chat.completions.create( | |
| # model="gpt-4-vision", # Ensure this is the correct model for multimodal support | |
| # messages=chat_history, | |
| # files=files if files else None, # Include files if present | |
| # max_tokens=200, | |
| # temperature=0.5, | |
| # ) | |
| # # Parse the assistant's response | |
| # chatgpt_response = response.choices[0].message.content.strip() | |
| # # Append the assistant's response to the chat history | |
| # chat_history.append({"role": "assistant", "content": chatgpt_response}) | |
| # logger.debug(f"Chat History After Response: {chat_history}") | |
| # # Return both the assistant's response and the updated chat history | |
| # return {"response": chatgpt_response, "chat_history": chat_history} | |
| # except Exception as e: | |
| # logger.error("Error generating reply", exc_info=True) | |
| # return {"response": "Sorry, I couldn't generate a response at this time.", "chat_history": chat_history} | |