# NOTE: scraped file-viewer chrome (Space status "Sleeping", "File size: 3,432 Bytes",
# commit-hash and line-number gutters) was fused onto this header; it is not part of the program.
import gradio as gr
import requests
import os
import time
import threading
import queue
import json
import re
def decode_unicode_escape(text):
    r"""Replace literal ``\uXXXX`` escape sequences in *text* with real characters.

    A high surrogate escape (``\uD800``-``\uDBFF``) immediately followed by a
    low surrogate escape (``\uDC00``-``\uDFFF``) is combined into the single
    supplementary-plane character the pair encodes, so emoji and other
    non-BMP characters come out as one valid code point instead of two lone
    surrogates (which cannot be encoded as UTF-8). Any other ``\uXXXX``
    sequence, including an unpaired surrogate, is decoded on its own.

    Args:
        text: String that may contain backslash-u escapes as literal text.

    Returns:
        The string with every matched escape replaced; text without escapes
        is returned unchanged.
    """
    escape_pattern = (
        r'\\u([dD][89abAB][0-9a-fA-F]{2})\\u([dD][c-fC-F][0-9a-fA-F]{2})'
        r'|\\u([0-9a-fA-F]{4})'
    )

    def _to_char(match):
        high, low, single = match.groups()
        if single is not None:
            return chr(int(single, 16))
        # Standard UTF-16 surrogate-pair arithmetic.
        code_point = (
            0x10000
            + ((int(high, 16) - 0xD800) << 10)
            + (int(low, 16) - 0xDC00)
        )
        return chr(code_point)

    return re.sub(escape_pattern, _to_char, text)
def _extract_email_content(result):
    """Turn a decoded prediction payload into a ``(status, message)`` tuple.

    Returns ``("success", email_text)`` when ``result["response"]`` is a
    non-empty list, otherwise ``("error", description)`` with the payload
    dumped as JSON for debugging.
    """
    responses = result.get('response') if isinstance(result, dict) else None
    if not isinstance(responses, list) or not responses:
        return ("error", f"Unexpected response structure: {json.dumps(result, indent=2)}")

    full_response = responses[0]
    # Extract the email content after [/INST]
    email_content = full_response.split('[/INST]')[-1].strip()
    # Keep only what follows the last "Subject:" marker, if any.
    email_content = email_content.split('Subject:')[-1].strip()
    # Decode Unicode escape sequences
    return ("success", decode_unicode_escape(email_content))


def generate_email(company, recipient_name, product, progress=gr.Progress()):
    """Request a generated email from the prediction endpoint and return it.

    The HTTP call runs in a background thread so the progress callback can be
    updated while waiting.

    Args:
        company: Value sent as ``company_name`` in the request payload.
        recipient_name: Value sent as ``recipient_name``.
        product: Value sent as ``product_name``.
        progress: Callable invoked as ``progress(fraction, message)``.

    Returns:
        The generated email text on success, otherwise a human-readable
        error string (missing API key, HTTP error, timeout, unexpected
        payload, or any other exception raised during the request).
    """
    api_key = os.environ.get("BASETEN_API_KEY")
    if not api_key:
        return "Error: API_KEY not found in environment variables"

    url = "https://model-4w56xy7q.api.baseten.co/development/predict"
    headers = {
        "Authorization": f"Api-Key {api_key}",
        "Content-Type": "application/json",
    }
    data = {
        "company_name": company,
        "recipient_name": recipient_name,
        "product_name": product,
    }
    timeout_seconds = 300  # 5 minutes, used for both the request and the wait loop
    poll_seconds = 5  # progress refresh interval

    progress(0.1, "Initiating email generation...")

    def make_request(result_queue):
        # Always posts exactly one (status, message) tuple to result_queue.
        try:
            response = requests.post(url, headers=headers, json=data, timeout=timeout_seconds)
            if response.status_code != 200:
                result_queue.put(("error", f"Error: {response.status_code} - {response.text}"))
                return
            result_queue.put(_extract_email_content(response.json()))
        except requests.Timeout:
            result_queue.put(("error", "Error: Request timed out. Please try again later."))
        except Exception as e:
            result_queue.put(("error", f"Error: An unexpected error occurred - {str(e)}"))

    result_queue = queue.Queue()
    # Daemon thread: a request abandoned after the deadline must not keep the
    # process alive at shutdown.
    thread = threading.Thread(target=make_request, args=(result_queue,), daemon=True)
    thread.start()

    # Monotonic clock so system clock adjustments cannot distort the deadline.
    start_time = time.monotonic()
    while thread.is_alive():
        elapsed_time = time.monotonic() - start_time
        if elapsed_time > timeout_seconds:
            return "Error: Process took too long. Please try again later."
        # Progress climbs from 0.1 and is capped at 0.9 until the result arrives.
        progress(
            min(0.1 + (elapsed_time / timeout_seconds) * 0.8, 0.9),
            f"Generating email... (Elapsed time: {elapsed_time:.0f} seconds)",
        )
        # Wait on the thread instead of sleeping so a finished request is
        # picked up immediately rather than after the full poll interval.
        thread.join(timeout=poll_seconds)

    try:
        status, result = result_queue.get_nowait()
    except queue.Empty:
        # Worker ended without posting a result; do not block forever.
        return "Error: An unexpected error occurred - no result was produced"

    if status == "success":
        progress(1.0, "Email generated successfully!")
    return result
# Create the Gradio interface: three pre-filled text inputs are passed to
# generate_email and its return value is shown in one output textbox.
iface = gr.Interface(
    fn=generate_email,
    inputs=[
        gr.Textbox(label="Company Name", value="100xengineers"),
        gr.Textbox(label="Recipient Name", value="Siddhant Goswami"),
        gr.Textbox(label="Product", value="Artificial Intelligence Cohort 3"),
    ],
    outputs=gr.Textbox(label="Generated Email"),
    title="AI Email Generator",
    description="Generate personalized emails using AI (This process may take several minutes)",
    allow_flagging="never",
)

# Launch the interface with queuing enabled.
iface.queue().launch()