Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import os | |
| import time | |
| import threading | |
| import queue | |
| import json | |
| import re | |
def decode_unicode_escape(text):
    """Replace literal ``\\uXXXX`` escape sequences in *text* with characters.

    A high surrogate escape (``\\uD800``-``\\uDBFF``) immediately followed by
    a low surrogate escape (``\\uDC00``-``\\uDFFF``) is decoded as the single
    supplementary-plane character the pair represents (for example an emoji),
    rather than as two lone surrogates that cannot be encoded as UTF-8.

    Args:
        text: String that may contain backslash-u escapes with four hex digits.

    Returns:
        The string with every well-formed escape decoded. Malformed sequences
        (fewer than four hex digits) are left untouched; an unpaired surrogate
        escape is decoded to the lone surrogate code point.
    """
    # The surrogate-pair alternative is listed first so it wins over the
    # generic single-escape alternative at the same position.
    pattern = (
        r'\\u([dD][89abAB][0-9a-fA-F]{2})\\u([dD][c-fC-F][0-9a-fA-F]{2})'
        r'|\\u([0-9a-fA-F]{4})'
    )

    def _replace(match):
        high, low, single = match.groups()
        if single is not None:
            return chr(int(single, 16))
        # Standard UTF-16 surrogate-pair arithmetic.
        code_point = (
            0x10000
            + ((int(high, 16) - 0xD800) << 10)
            + (int(low, 16) - 0xDC00)
        )
        return chr(code_point)

    return re.sub(pattern, _replace, text)
def generate_email(company, recipient_name, product, progress=gr.Progress()):
    """Generate an email by calling the hosted model endpoint.

    The HTTP request runs on a background thread so that this function can
    keep reporting progress while the response is pending.

    Args:
        company: Sent as ``company_name`` in the request payload.
        recipient_name: Sent as ``recipient_name`` in the request payload.
        product: Sent as ``product_name`` in the request payload.
        progress: Progress callback (defaults to ``gr.Progress()``); called
            with a completion fraction and a status message.

    Returns:
        The extracted email text on success; otherwise a human-readable
        message describing the failure.
    """
    api_key = os.environ.get("BASETEN_API_KEY")
    if not api_key:
        # Name the variable that is actually read so the message is actionable.
        return "Error: BASETEN_API_KEY not found in environment variables"

    url = "https://model-4w56xy7q.api.baseten.co/development/predict"
    headers = {
        "Authorization": f"Api-Key {api_key}",
        "Content-Type": "application/json",
    }
    data = {
        "company_name": company,
        "recipient_name": recipient_name,
        "product_name": product,
    }
    timeout_seconds = 300  # 5 minutes; shared by the HTTP call and the wait loop
    poll_interval = 5  # seconds between progress updates

    progress(0.1, "Initiating email generation...")

    def make_request(result_queue):
        """Perform the request and queue a ``(status, message)`` pair."""
        try:
            response = requests.post(
                url, headers=headers, json=data, timeout=timeout_seconds
            )
            if response.status_code != 200:
                result_queue.put(
                    ("error", f"Error: {response.status_code} - {response.text}")
                )
                return

            result = response.json()
            # Guard against a non-dict payload before looking up the key.
            candidates = result.get('response') if isinstance(result, dict) else None
            if not (isinstance(candidates, list) and candidates):
                result_queue.put(
                    ("error", f"Unexpected response structure: {json.dumps(result, indent=2)}")
                )
                return

            full_response = candidates[0]
            # Keep only the text after the last [/INST] marker, then only
            # the text after the last 'Subject:' marker.
            email_content = full_response.split('[/INST]')[-1].strip()
            email_content = email_content.split('Subject:')[-1].strip()
            # Decode Unicode escape sequences
            email_content = decode_unicode_escape(email_content)
            result_queue.put(("success", email_content))
        except requests.Timeout:
            result_queue.put(("error", "Error: Request timed out. Please try again later."))
        except Exception as e:
            result_queue.put(("error", f"Error: An unexpected error occurred - {str(e)}"))

    result_queue = queue.Queue()
    # Daemon thread: a worker abandoned after the timeout below must not keep
    # the process alive.
    thread = threading.Thread(target=make_request, args=(result_queue,), daemon=True)
    thread.start()

    # Monotonic clock so system clock adjustments cannot distort the timeout.
    start_time = time.monotonic()
    while thread.is_alive():
        elapsed_time = time.monotonic() - start_time
        if elapsed_time > timeout_seconds:
            return "Error: Process took too long. Please try again later."
        progress(
            min(0.1 + (elapsed_time / timeout_seconds) * 0.8, 0.9),
            f"Generating email... (Elapsed time: {elapsed_time:.0f} seconds)",
        )
        # join() with a timeout returns as soon as the worker finishes, so the
        # result is not held back for the rest of a fixed sleep interval.
        thread.join(timeout=poll_interval)

    try:
        # The worker has exited; never block here if it produced nothing.
        status, result = result_queue.get_nowait()
    except queue.Empty:
        return "Error: An unexpected error occurred - no result was produced"

    if status == "success":
        progress(1.0, "Email generated successfully!")
    return result
# Build the Gradio UI: three pre-filled text inputs feeding generate_email,
# with the generated email shown in a single output textbox.
iface = gr.Interface(
    fn=generate_email,
    inputs=[
        gr.Textbox(label=field_label, value=field_default)
        for field_label, field_default in (
            ("Company Name", "100xengineers"),
            ("Recipient Name", "Siddhant Goswami"),
            ("Product", "Artificial Intelligence Cohort 3"),
        )
    ],
    outputs=gr.Textbox(label="Generated Email"),
    title="AI Email Generator",
    description="Generate personalized emails using AI (This process may take several minutes)",
    allow_flagging="never",
)

# Enable request queuing, then start serving the app.
queued_iface = iface.queue()
queued_iface.launch()