Spaces:
Sleeping
Sleeping
| from flask import Flask,request,make_response | |
| import os | |
| import logging | |
| from dotenv import load_dotenv | |
| from heyoo import WhatsApp | |
| import random | |
| import shutil | |
| from tempfile import NamedTemporaryFile | |
| import assemblyai as aai | |
| from pandasai.llm import GoogleGemini | |
| from pandasai import SmartDataframe | |
| from pandasai import Agent | |
| from pandasai.responses.response_parser import ResponseParser | |
| from langchain_experimental.agents import create_pandas_dataframe_agent | |
| import pandas as pd | |
| from langchain_google_genai import GoogleGenerativeAI | |
| from langchain_google_genai.chat_models import ChatGoogleGenerativeAI | |
| import requests | |
| import base64 | |
| from pandasai.helpers import path | |
| import uuid | |
| import pandasai as pai | |
# Read variables from a local .env file into the process environment before
# any of the os.environ lookups further down run.
load_dotenv()

# Disabled helper, kept for reference only. It used to sit inside a bare
# string literal, which has no effect at runtime.
#
# def modify_file_path(file_path):
#     guid = uuid.uuid4()
#     print(guid)
#     guid = str(uuid.uuid4())
#     new_filename = f"{guid}temp_chart.png"
#     new_file_path = os.path.join("/home/runner/Flasktestrairo/plots/", new_filename)
#     # Copy the file
#     shutil.copy2(file_path, new_file_path)
#     return new_file_path
# Build a per-process chart location: one random UUID is generated at import
# time and used as the final path component under "/plots/".
guid = uuid.uuid4()
new_filename = str(guid)
user_defined_path = os.path.join("/plots/", new_filename)
class FlaskResponse(ResponseParser):
    """PandasAI response parser that returns plain values for the webhook.

    Dataframes are rendered to HTML, plots are returned as whatever the
    result carries under ``"value"``, and every other result is converted to
    its string form.
    """

    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        """Return ``result["value"]`` rendered through its ``to_html()`` method."""
        return result["value"].to_html()

    def format_plot(self, result):
        """Return the plot reference stored in ``result["value"]`` unchanged."""
        # No ValueError fallback here: a subscript lookup on a missing key
        # raises KeyError, and repeating the same lookup inside a handler
        # could not recover from a failure anyway.
        img_path = result["value"]
        print("response_class_path:", img_path)
        return img_path

    def format_other(self, result):
        """Return ``result["value"]`` converted to ``str``."""
        return str(result["value"])
# WhatsApp client used by the webhook to read updates and send replies.
messenger = WhatsApp(
    os.environ["whatsapp_token"],
    phone_number_id=os.environ["phone_number_id"],
)

# AssemblyAI transcriber used for incoming audio messages.
aai.settings.api_key = os.environ["aai_key"]
transcriber = aai.Transcriber()

g_api_key = os.environ["google_api_key"]

app = Flask(__name__)

# Value compared against the "hub.verify_token" query argument in hook().
# NOTE(review): this token is hard-coded in source; consider moving it to the
# environment alongside the other credentials.
VERIFY_TOKEN = "30cca545-3838-48b2-80a7-9e43b1ae8ce4"

# Alternative data source kept for reference: pd.read_excel("craig.xlsx")
df = pd.read_csv("small_business_data2.csv")

llm = ChatGoogleGenerativeAI(
    model="gemini-pro",
    google_api_key=g_api_key,
    temperature=0.2,
)
def generateResponse(prompt):
    """Ask a freshly built PandasAI ``Agent`` over ``df`` to answer *prompt*.

    A new ``GoogleGemini`` LLM and a new ``Agent`` are created on every call.
    The agent is configured with ``FlaskResponse`` as its response parser,
    caching disabled and chart saving turned off.

    Returns:
        Whatever ``Agent.chat`` returns for the prompt.
    """
    agent_config = {
        "llm": GoogleGemini(api_key=g_api_key),
        "save_charts_path": user_defined_path,
        "save_charts": False,
        "response_parser": FlaskResponse,
        "enable_cache": False,
        "conversational": True,
    }
    return Agent(df, config=agent_config).chat(prompt)
# Imgur upload settings: the endpoint and the Client-ID authorization header
# sent with each image upload.
# NOTE(review): the client id and secret are hard-coded in source; img_secret
# is not referenced anywhere in the visible code.
img_ID = "344744a88ad1098"
img_secret = "3c542a40c215327045d7155bddfd8b8bc84aebbf"
url = "https://api.imgur.com/3/image"
headers = {"Authorization": "Client-ID " + img_ID}
def respond(query_str:str):
    """Return a fixed placeholder reply; *query_str* is ignored."""
    return "hello, I don't have a brain yet"
def _upload_to_imgur(image_path):
    """Upload the file at *image_path* to Imgur and return the link from the reply."""
    with open(image_path, "rb") as image_file:
        encoded_image = base64.b64encode(image_file.read())
    # A timeout keeps a stalled upload from blocking the webhook indefinitely.
    upload = requests.post(
        url, headers=headers, data={"image": encoded_image}, timeout=30
    )
    return upload.json()["data"]["link"]


def _send_answer(answer, mobile):
    """Send *answer* to *mobile* as an image if it is an existing file path, else as text."""
    if os.path.isfile(answer):
        print("My image path:", answer)
        image_link = _upload_to_imgur(answer)
        print(image_link)
        messenger.send_image(image=image_link, recipient_id=mobile)
    else:
        messenger.send_message(message=answer, recipient_id=mobile)


# NOTE(review): no ``@app.route(...)`` decorator is visible on ``hook`` in this
# file, so Flask presumably never dispatches to it -- confirm how (and on which
# path) this view is registered.
def hook():
    """Handle WhatsApp webhook requests.

    GET requests run the verification handshake: when ``hub.verify_token``
    matches ``VERIFY_TOKEN`` the ``hub.challenge`` value is echoed back as
    plain text, otherwise the request is rejected.

    Any other request is read as a JSON update. For "messages" updates with a
    sender, text messages get either a canned greeting or an answer from
    ``generateResponse``; audio messages are downloaded, transcribed and then
    answered the same way. Answers that name an existing file are uploaded to
    Imgur and sent as an image, everything else is sent as text.

    Returns:
        The challenge response, ``("Invalid verification token", 403)`` on a
        failed handshake, or ``"ok"`` after processing an update.
    """
    if request.method == "GET":
        if request.args.get("hub.verify_token") == VERIFY_TOKEN:
            logging.info("Verified webhook")
            response = make_response(request.args.get("hub.challenge"), 200)
            response.mimetype = "text/plain"
            return response
        logging.error("Webhook Verification failed")
        # Reject with 403 rather than a 200 that looks like success.
        return "Invalid verification token", 403

    # get message update..
    data = request.get_json()
    if messenger.changed_field(data) != "messages":
        return "ok"

    mobile = messenger.get_mobile(data)
    if not mobile:
        return "ok"

    message_type = messenger.get_message_type(data)
    if message_type == "text":
        message = messenger.get_message(data)
        # Handle greetings
        if message.lower() in ("hi", "hello", "help", "how are you"):
            greeting = "Hi there! My name is BuzyHelper. How can I help you today?"
            messenger.send_message(message=greeting, recipient_id=mobile)
        else:
            answer = str(generateResponse(message))
            print("Response:", answer)
            logging.info("\nAnswer: %s\n", answer)
            _send_answer(answer, mobile)
    elif message_type == "audio":
        audio = messenger.get_audio(data)
        audio_id, mime_type = audio["id"], audio["mime_type"]
        audio_url = messenger.query_media_url(audio_id)
        audio_filename = messenger.download_media(audio_url, mime_type)
        transcript = transcriber.transcribe(audio_filename)
        print(audio_filename)
        print(transcript.text)
        logging.info("\nAudio: %s\n", audio)
        spoken_text = transcript.text
        if not spoken_text:
            # Nothing was transcribed, so there is no prompt to hand to the agent.
            logging.error("Transcription produced no text for %s", audio_filename)
            messenger.send_message(
                message="Sorry, I could not understand that audio message.",
                recipient_id=mobile,
            )
        else:
            # Same delivery path as text: the file check runs first, so chart
            # answers are sent as images instead of as a raw file path.
            _send_answer(str(generateResponse(spoken_text)), mobile)
    else:
        messenger.send_message(
            message="Please send me text or audio messages", recipient_id=mobile
        )
    return "ok"
if __name__ == "__main__":
    # The interactive debugger lets anyone who can reach the server execute
    # code, so it must not be on by default while listening on 0.0.0.0.
    # Opt in explicitly with FLASK_DEBUG=1 (or "true") for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)