Butlers / app.py
aromidvar's picture
Update app.py
72ebd59 verified
import requests
import time
import os
import gradio as gr
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Define your environment variables
ZAP_WEBHOOK_URL = os.getenv("ZAP_WEBHOOK_URL") # Your Zapier Webhook URL
PHANTOMBUSTER_API_KEY = os.getenv("PHANTOMBUSTER_API_KEY") # Your PhantomBuster API Key
# Static PhantomBuster Agent ID
PHANTOMBUSTER_AGENT_ID = "7204691416716504"
def append_to_google_sheet_via_webhook(data):
"""
Append hashtags and location to a Google Sheet via Zapier Webhook.
"""
try:
# Send data to Zapier Webhook
response = requests.post(ZAP_WEBHOOK_URL, json={"hashtag": data[0], "location": data[1]})
if response.status_code == 200:
return f"Appended {data} to Google Sheet successfully via Zapier."
else:
return f"Error sending data to Zapier Webhook: {response.text}"
except Exception as e:
return f"Exception occurred while sending data to Zapier Webhook: {str(e)}"
def trigger_phantombuster():
"""
Trigger the PhantomBuster agent.
"""
url = "https://api.phantombuster.com/api/v2/agents/launch"
headers = {
"X-Phantombuster-Key-1": PHANTOMBUSTER_API_KEY,
"Content-Type": "application/json"
}
# Add the required 'id' parameter for the agent
payload = {
"id": PHANTOMBUSTER_AGENT_ID
}
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
return f"PhantomBuster triggered successfully: {response.json()}"
else:
return f"Error triggering PhantomBuster: {response.text}"
except Exception as e:
return f"Exception occurred while triggering PhantomBuster: {str(e)}"
def handle_inputs(hashtag, location):
"""
Main function to handle user inputs and workflow.
"""
log = [] # Initialize a log to keep track of the process
log.append(f"Received hashtag: {hashtag} and location: {location}")
# Step 1: Append data to Google Sheet via Zapier Webhook
sheet_response = append_to_google_sheet_via_webhook([hashtag, location])
log.append(sheet_response)
# Step 2: Delay (e.g., 10 seconds before triggering)
log.append("Waiting for 10 seconds before triggering PhantomBuster...")
time.sleep(10)
# Step 3: Trigger the PhantomBuster agent
phantombuster_response = trigger_phantombuster()
log.append(phantombuster_response)
return "\n".join(log) # Return the log as a single string
# Gradio Interface
with gr.Blocks() as demo:
gr.Markdown("# Instagram Hashtag Search Dashboard")
hashtag_input = gr.Textbox(label="Enter Instagram Hashtag", placeholder="#example")
location_input = gr.Textbox(label="Enter Instagram Location", placeholder="Location")
trigger_btn = gr.Button("Submit")
log_output = gr.Textbox(label="Processing Log", interactive=False, lines=10)
# Set up the button click event
trigger_btn.click(handle_inputs, inputs=[hashtag_input, location_input], outputs=[log_output])
# Launch the Gradio interface
demo.launch()