curiousgeorge1292's picture
Update app.py
cdf97c3 verified
import os
import requests
import logging
from bs4 import BeautifulSoup
import time
import xml.etree.ElementTree as ET
from urllib.parse import urljoin
import gradio as gr
from groq import Groq
import json
from googleapiclient.discovery import build
from google.oauth2.service_account import Credentials
from flask import Flask, redirect, request, session
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import Flow
import base64
# Flask app setup
# Flask application object; the OAuth routes below are registered on it.
app = Flask(__name__)
# Session-signing key taken from the environment (os.environ.get yields None
# when FLASK_SECRET_KEY is not set).
app.secret_key = os.environ.get("FLASK_SECRET_KEY")
# Redirect URI handed to the Google OAuth flow; it points at /oauth2callback.
REDIRECT_URI = "https://huggingface.co/spaces/curiousgeorge1292/Custom_Profile_Email_Generator/oauth2callback"
# OAuth scope requested from Google (the Gmail "compose" scope).
GMAIL_SCOPES = ["https://www.googleapis.com/auth/gmail.compose"]
# Create a simple credential storage mechanism
class CredentialStore:
    """In-memory lookup table from a user's email address to their credentials."""

    def __init__(self):
        # Plain dict keyed by email address; nothing is persisted.
        self.credentials = {}

    def set_credentials(self, user_email, creds):
        """Store *creds* under *user_email*, replacing any previous entry."""
        self.credentials.update({user_email: creds})

    def get_credentials(self, user_email):
        """Return the credentials stored for *user_email*, or None if there are none."""
        try:
            return self.credentials[user_email]
        except KeyError:
            return None


# Create a global instance
credential_store = CredentialStore()
# Retrieve the OAuth client configuration (a JSON string) from the environment
client_secrets_json = os.environ.get("GMAIL_OAUTH_SECRET_JSON")
if not client_secrets_json:
    raise ValueError("Environment variable GMAIL_OAUTH_SECRET_JSON is not set or empty.")
# Parse JSON string into a dictionary; report malformed JSON with a message
# that names the offending variable instead of a bare decoder error.
try:
    client_secrets = json.loads(client_secrets_json)
except json.JSONDecodeError as exc:
    raise ValueError(
        "Environment variable GMAIL_OAUTH_SECRET_JSON does not contain valid JSON."
    ) from exc
# Initialize the Flow directly from the parsed dictionary. This avoids writing
# the client secret to a file on disk just to read it straight back.
flow = Flow.from_client_config(
    client_secrets,
    scopes=GMAIL_SCOPES,
    redirect_uri=REDIRECT_URI
)
def get_user_email_from_credentials(credentials):
    """Return the Gmail address that owns *credentials*, or None on any failure.

    Builds a Gmail v1 client, requests the profile of user 'me' and reads its
    'emailAddress' field. Any exception is printed and swallowed.
    """
    try:
        gmail_client = build('gmail', 'v1', credentials=credentials)
        profile_request = gmail_client.users().getProfile(userId='me')
        profile = profile_request.execute()
        return profile.get('emailAddress')
    except Exception as e:
        print(f"Error getting user email: {e}")
    return None
@app.route('/oauth2callback')
def oauth2callback():
    """Finish the Google OAuth flow and remember the user's Gmail credentials.

    Exchanges the authorization response in the request URL for tokens, looks
    up the Gmail address the tokens belong to, and stores a plain-dict copy of
    the credentials in ``credential_store`` under that address.

    Returns:
        A confirmation string on success, or an ``(error message, status)``
        tuple when the token exchange or the address lookup fails.
    """
    # Build the flow from the in-memory client configuration. The on-disk
    # 'temp_client_secrets.json' is removed during module start-up, so it can
    # no longer be opened by the time a request arrives.
    callback_flow = Flow.from_client_config(
        client_secrets,
        scopes=GMAIL_SCOPES,
        redirect_uri=REDIRECT_URI
    )
    try:
        callback_flow.fetch_token(authorization_response=request.url)
    except Exception:
        logging.exception("OAuth token exchange failed")
        return "Authorization failed. Please try authenticating again.", 400

    credentials = callback_flow.credentials
    creds_dict = {
        'token': credentials.token,
        'refresh_token': credentials.refresh_token,
        'token_uri': credentials.token_uri,
        'client_id': credentials.client_id,
        'client_secret': credentials.client_secret,
        'scopes': credentials.scopes
    }
    user_email = get_user_email_from_credentials(credentials)
    if not user_email:
        # Without an address there is no usable key; storing under None would
        # make the credentials unreachable and let later users overwrite them.
        return (
            "Authorization succeeded, but your Gmail address could not be determined. Please try again.",
            500,
        )
    credential_store.set_credentials(user_email, creds_dict)
    return "Authorization successful. You can now save emails to Gmail drafts"


# Configure logging once at import time rather than inside a request handler.
logging.basicConfig(level=logging.DEBUG)
@app.route('/authenticate_gmail')
def authenticate_gmail():
    """Redirect the browser to the Google consent page produced by the module-level flow."""
    # authorization_url() returns a (url, state) pair; only the URL is used.
    consent_url, _state = flow.authorization_url(
        include_granted_scopes='true',
        access_type='offline',
    )
    return redirect(consent_url)
# Gmail OAuth2 logic here
#credentials = get_credentials_from_oauth()
#if 'user_id' not in flask.session:
#flask.session['user_id'] = str(uuid.uuid4()) # Generate a unique ID if not already present
#user_id = flask.session.get('user_id') # Get user-specific ID
#redis_store.set(user_id, credentials) # Save credentials to Redis (or a database)
# After successful authentication:
#session['credentials'] = credentials
#return "Authentication successful!"
#@app.route('/check_credentials')
#def check_credentials():
#if 'credentials' in session:
#return "Credentials are stored in session."
#else:
#return "No credentials found. Please authorize first."
def save_to_gmail_drafts(credentials_info, subject, body, recipient):
    """Create a Gmail draft on behalf of the user described by *credentials_info*.

    Args:
        credentials_info: dict holding 'token', 'token_uri', 'client_id' and
            'client_secret', plus an optional 'refresh_token' (the shape that
            oauth2callback stores).
        subject: subject line of the draft.
        body: plain-text body of the draft.
        recipient: address placed in the To header.

    Returns:
        The string "Draft created with ID: <id>".

    Raises:
        ValueError: if *credentials_info* is empty, or a header value is
            rejected by the email library (for example, it contains a newline).
        KeyError: if a required key is missing from *credentials_info*.
        Exception: whatever the Gmail client raises when the API call fails.
    """
    # Local imports: the module-level ``Credentials`` name is the
    # service-account class, whose constructor does not accept a user access
    # token / refresh token. User OAuth tokens need google.oauth2.credentials.
    from email.message import EmailMessage
    from google.oauth2.credentials import Credentials as UserCredentials

    if not credentials_info:
        raise ValueError("No Gmail credentials available; authenticate with Gmail first.")

    # Initialize the credentials
    credentials = UserCredentials(
        credentials_info['token'],
        refresh_token=credentials_info.get('refresh_token'),
        token_uri=credentials_info['token_uri'],
        client_id=credentials_info['client_id'],
        client_secret=credentials_info['client_secret']
    )
    # Refresh the token if expired
    if not credentials.valid and credentials.expired and credentials.refresh_token:
        credentials.refresh(Request())
    service = build('gmail', 'v1', credentials=credentials)

    # Build a proper MIME message instead of interpolating into a string, so
    # header values are validated and non-ASCII text is encoded correctly.
    mime_message = EmailMessage()
    mime_message["To"] = recipient
    mime_message["Subject"] = subject
    mime_message.set_content(body or "")
    raw_message = base64.urlsafe_b64encode(mime_message.as_bytes()).decode("ascii")

    # Save the draft
    draft = service.users().drafts().create(
        userId="me", body={'message': {'raw': raw_message}}
    ).execute()
    return f"Draft created with ID: {draft['id']}"
# Clean up the temporary file
# Remove the on-disk copy of the OAuth client secrets if one is present.
_temp_secrets_path = "temp_client_secrets.json"
if os.path.exists(_temp_secrets_path):
    os.remove(_temp_secrets_path)
# Initialize Groq client (API key comes from the GROQ_API_KEY environment variable)
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))

# Google Sheets setup
SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
SERVICE_ACCOUNT_JSON = os.environ.get("MY_SERVICE_ACCOUNT_JSON")
SPREADSHEET_ID = os.environ.get("GOOGLE_SHEET_ID")

# The service-account key is mandatory: stop at import time when it is absent.
if not SERVICE_ACCOUNT_JSON:
    raise ValueError("MY_SERVICE_ACCOUNT_JSON environment variable is not set or empty.")

# Decode the key, turn it into credentials limited to SCOPES, and keep a
# handle on the spreadsheets resource that the helpers below use.
service_account_info = json.loads(SERVICE_ACCOUNT_JSON)
credentials = Credentials.from_service_account_info(
    service_account_info,
    scopes=SCOPES,
)
service = build('sheets', 'v4', credentials=credentials)
sheet = service.spreadsheets()
def fetch_user_details(email):
    """
    Fetch user details from Google Sheet based on email.

    Reads columns A:G of Sheet1 and returns the first row whose column A
    equals *email*.

    Returns:
        A dict with the keys "email", "name", "professional_title",
        "industry", "target_audience", "personal_background" and
        "company_url" when a row matches; None when no row matches; or a
        string starting with "Error fetching details:" when the lookup fails.
    """
    field_names = (
        "email",
        "name",
        "professional_title",
        "industry",
        "target_audience",
        "personal_background",
        "company_url",
    )
    try:
        result = sheet.values().get(spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G").execute()
        for row in result.get('values', []):
            # Blank rows come back as empty lists; skip them instead of
            # failing on row[0].
            if not row or row[0] != email:  # Email is in the first column (A)
                continue
            # The Sheets API drops trailing empty cells, so a row may be
            # shorter than seven columns; pad it so every key gets a value.
            cells = list(row[:len(field_names)])
            cells.extend([""] * (len(field_names) - len(cells)))
            return dict(zip(field_names, cells))
        return None
    except Exception as e:
        return f"Error fetching details: {str(e)}"
def save_user_info(email, name, professional_title, industry, target_audience, personal_background, company_url):
    """
    Save or update user details in Google Sheet.

    Reads columns A:G of Sheet1, replaces the row whose column A equals
    *email* (or appends a new row when there is none) and writes the rows
    back to the same range.

    Returns:
        "Details saved successfully." on success, otherwise a string starting
        with "Error saving details:".
    """
    # An empty key would create rows that can never be looked up again.
    if not email or not str(email).strip():
        return "Error saving details: email is required."

    new_row = [email, name, professional_title, industry, target_audience, personal_background, company_url]
    try:
        result = sheet.values().get(spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G").execute()
        rows = result.get('values', [])
        # Check if email exists and update the row. Blank rows are returned as
        # empty lists, so guard before indexing column A.
        for i, row in enumerate(rows):
            if row and row[0] == email:
                rows[i] = new_row
                break
        else:
            # Append a new row if email doesn't exist
            rows.append(new_row)
        # Save updated rows back to the sheet
        body = {"values": rows}
        sheet.values().update(
            spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G",
            valueInputOption="RAW", body=body
        ).execute()
        return "Details saved successfully."
    except Exception as e:
        return f"Error saving details: {str(e)}"
def handle_user_details(email, name=None, professional_title=None, industry=None, target_audience=None, personal_background=None, company_url=None, action="verify"):
    """Verify or save a user's profile and report the outcome.

    Args:
        email: key used to look up or store the profile.
        name, professional_title, industry, target_audience,
        personal_background, company_url: profile fields, used only when
            saving.
        action: "verify" to look the profile up, "save" to store it.

    Returns:
        A ``(message, details)`` tuple. ``details`` is the stored profile dict
        when a record is found and an empty dict in every other case.
    """
    if action == "verify":
        # Fetch details based on email
        user_details = fetch_user_details(email)
        if isinstance(user_details, dict) and user_details:
            return (
                f"Record found for {email}. You can update details if needed.",
                user_details  # Return existing details
            )
        if isinstance(user_details, str):
            # The lookup reports failures as a message string; pass it on
            # instead of presenting it as a found record.
            return (user_details, {})
        return (
            f"No record found for {email}. Please enter details and save.",
            {}  # Return empty fields
        )
    if action == "save":
        # Save or update details
        result = save_user_info(email, name, professional_title, industry, target_audience, personal_background, company_url)
        return result, {}
    # Always hand back a two-element tuple so callers can unpack the result.
    return (f"Unknown action '{action}'. Expected 'verify' or 'save'.", {})
# Step 1: User profile management UI
def user_profile(email, name, professional_title, industry, target_audience, personal_background, company_url):
    """Save the profile and return a status message for the UI.

    Returns the error text produced by ``save_user_info`` when the save
    failed, otherwise the "saved" confirmation.
    """
    outcome = save_user_info(email, name, professional_title, industry, target_audience, personal_background, company_url)
    # save_user_info signals failure with a message starting with "Error";
    # do not claim success in that case.
    if isinstance(outcome, str) and outcome.startswith("Error"):
        return outcome
    return "Your information has been saved! Proceed to Step 2 for email generation."
# Helper function to extract content from a URL
def extract_content(url):
    """Download *url* and return the text of its <p> elements.

    The paragraph texts are joined with single spaces and cut to the first
    2000 characters. On any exception a string starting with
    "Error extracting content from" is returned instead.
    """
    try:
        page = requests.get(
            url,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
            },
            timeout=10,
        )
        page.raise_for_status()
        document = BeautifulSoup(page.text, 'html.parser')
        paragraph_texts = (node.get_text() for node in document.find_all('p'))
        joined = ' '.join(paragraph_texts)
        # Limit content to 2000 characters to avoid overload
        return joined[:2000]
    except Exception as e:
        return f"Error extracting content from {url}: {str(e)}"
# Helper function to parse a sitemap and get valid URLs
def parse_sitemap(sitemap_url):
    """Download a sitemap and return the text of every <loc> element.

    Only elements in the sitemaps.org 0.9 namespace are matched. On any
    exception a string starting with "Error parsing sitemap from" is returned
    instead of a list.
    """
    loc_path = ".//{http://www.sitemaps.org/schemas/sitemap/0.9}loc"
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
    }
    try:
        reply = requests.get(sitemap_url, headers=request_headers, timeout=10)
        reply.raise_for_status()
        tree = ET.fromstring(reply.content)
        return [node.text for node in tree.findall(loc_path)]
    except Exception as e:
        return f"Error parsing sitemap from {sitemap_url}: {str(e)}"
# Wrapper that logs extraction failures (returning None) and pauses after each successful fetch
def safe_extract_content(url, delay=2):
    """Extract page text, turning extraction errors into None.

    When the extracted text contains "Error extracting content" it is printed
    and None is returned. Otherwise the function sleeps for *delay* seconds
    and returns the text.
    """
    text = extract_content(url)
    failed = "Error extracting content" in text
    if not failed:
        time.sleep(delay)  # Respect crawl-delay
        return text
    print(text)  # Log the error
    return None
# Function to fetch LinkedIn profile insights using Proxycurl API
def fetch_linkedin_insights(profile_url):
    """Query the Proxycurl LinkedIn endpoint and return "<headline>. <summary>".

    The API key is read from the PROXYCURL_API_KEY environment variable. On
    any exception a string starting with "Error fetching LinkedIn insights:"
    is returned instead.
    """
    token = os.environ.get("PROXYCURL_API_KEY")
    try:
        reply = requests.get(
            "https://nubela.co/proxycurl/api/v2/linkedin",
            headers={"Authorization": f"Bearer {token}"},
            params={"url": profile_url, "fallback_to_cache": "on-error"},
            timeout=10,
        )
        reply.raise_for_status()
        profile = reply.json()
        headline = profile.get('headline', '')
        summary = profile.get('summary', '')
        return f"{headline}. {summary}"
    except Exception as e:
        return f"Error fetching LinkedIn insights: {str(e)}"
# Function to generate email using Llama
def _scrape_site_content(site_url):
    """Return text scraped from *site_url*, or from the first usable page in its sitemap; "" if none."""
    if not site_url:
        return ""
    content = safe_extract_content(site_url)
    if content:
        return content
    # If direct scraping fails, fall back to the sitemap
    sitemap_urls = parse_sitemap(urljoin(site_url, "sitemap_index.xml"))
    if isinstance(sitemap_urls, list):
        for page_url in sitemap_urls:
            content = safe_extract_content(page_url)
            if content:
                return content
    return ""


def generate_email(name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose, interested_position, company_url, professional_title, personal_background, output_language):
    """Build a purpose-specific prompt and ask the Groq chat model to write the email.

    The prompt template is read from the JOB_APPLICATION or SALES_COLD_EMAIL
    environment variable, depending on *email_purpose*, and filled in with the
    arguments plus content gathered from the given URLs. Only the sources that
    the selected template receives are fetched.

    Returns:
        The generated email text, or a message string when the purpose is
        invalid ("Invalid email purpose selected...") or generation fails
        ("Error generating email: ...").
    """
    template_env_vars = {
        "Job Application": "JOB_APPLICATION",
        "Sales Cold Email": "SALES_COLD_EMAIL",
    }
    # Validate the purpose before doing any network work.
    if email_purpose not in template_env_vars:
        return "Invalid email purpose selected. Please choose either 'Job Application' or 'Sales Cold Email'."

    env_var = template_env_vars[email_purpose]
    template = os.environ.get(env_var)
    if not template:
        return f"Error generating email: the {env_var} prompt template is not configured."

    website_content = _scrape_site_content(website_url)

    # Construct the purpose-specific prompt
    if email_purpose == "Job Application":
        prompt_fields = {
            "name": name,
            "professional_title": professional_title,
            "interested_position": interested_position,
            "website_content": website_content,
            "personal_background": personal_background,
            "word_count": word_count,
            "prospect_name": prospect_name,
            "output_language": output_language,
        }
    else:
        # Fetch insights from LinkedIn and reference URLs. The helpers report
        # failures as message strings; log those and keep them out of the prompt.
        linkedin_insights = fetch_linkedin_insights(linkedin_url) if linkedin_url else ""
        if linkedin_insights.startswith("Error fetching LinkedIn insights"):
            print(linkedin_insights)
            linkedin_insights = ""
        context_content = extract_content(context_url) if context_url else ""
        if context_content.startswith("Error extracting content"):
            print(context_content)
            context_content = ""
        prompt_fields = {
            # Fetch details from the company website
            "company_content": _scrape_site_content(company_url),
            "prospect_name": prospect_name,
            "linkedin_insights": linkedin_insights,
            "website_content": website_content,
            "context_content": context_content,
            "word_count": word_count,
            "output_language": output_language,
        }

    try:
        prompt = template.format(**prompt_fields)
    except (KeyError, IndexError, ValueError) as e:
        # The template references a placeholder that is not supplied, or has
        # unbalanced braces.
        return f"Error generating email: invalid {env_var} prompt template ({e})"

    # Generate email using Llama
    try:
        chat_response = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="llama3-8b-8192",
        )
        return chat_response.choices[0].message.content
    except Exception as e:
        return f"Error generating email: {str(e)}"
# Step 2: Email generation UI
def email_agent(credentials_info, name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose, interested_position, company_url, professional_title, personal_background, output_language):
    """Generate an email and try to save it as a Gmail draft.

    Args:
        credentials_info: Gmail credential dict, or None to look the
            credentials up in ``credential_store`` by *email*.
        email: the user's email address; also used as the draft recipient.
        Remaining arguments are forwarded unchanged to ``generate_email``.

    Returns:
        A ``(status_html, email_content)`` tuple. ``email_content`` is ""
        when no email could be generated.
    """
    try:
        logging.basicConfig(level=logging.DEBUG)
        # Generate the email content
        email_content = generate_email(name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose, interested_position, company_url, professional_title, personal_background, output_language)
        if not email_content:
            return ("<div style='color: red;'>Failed to generate email content.</div>", "")
        # generate_email reports problems as message strings; show them as an
        # error instead of saving them to Gmail as if they were an email.
        if email_content.startswith(("Error generating email:", "Invalid email purpose selected.")):
            return (f"<div style='color: red;'>{email_content}</div>", "")

        # Resolve the credentials before attempting to save; previously this
        # lookup sat after every return statement and never ran.
        if not credentials_info:
            credentials_info = credential_store.get_credentials(email)
        if not credentials_info:
            return ("<div style='color: red;'>Please click the 'Authenticate with Gmail' button above to connect your Gmail account.</div>", email_content)

        # Save to Gmail drafts
        try:
            save_to_gmail_drafts(
                credentials_info,
                subject="Generated Email Subject",
                body=email_content,
                recipient=email  # Replace with the recipient's email
            )
            return ("<div style='color: green;'>Email saved to Gmail drafts successfully.</div>", email_content)
        except Exception as e:
            return (f"<div style='color: red;'>Error saving to Gmail drafts: {str(e)}</div>", email_content)
    except Exception as e:
        return (f"<div style='color: red;'>Error: {str(e)}</div>", "")
# Gradio Interface
def verify_user(email):
    """Look up *email* and return the status message plus six field updates.

    Every profile field is made visible. Its value is the stored one when a
    record exists and "" otherwise.
    """
    message, user_details = handle_user_details(email, action="verify")
    # Ensure user_details is a dictionary
    if not isinstance(user_details, dict):
        user_details = {}
    field_keys = (
        "name",
        "professional_title",
        "industry",
        "target_audience",
        "personal_background",
        "company_url",
    )
    # An empty dict (new user) makes every lookup fall back to "", which is
    # exactly the "show empty fields" case.
    field_updates = tuple(
        gr.update(visible=True, value=user_details.get(key, ""))
        for key in field_keys
    )
    return (message, *field_updates)
def save_and_switch_tab(email, name, professional_title, industry, target_audience, personal_background, company_url):
    """Save the profile and return the status message with a tab-selection update."""
    # Only the message is needed; the second element of the result is ignored.
    message, _details = handle_user_details(
        email,
        name,
        professional_title,
        industry,
        target_audience,
        personal_background,
        company_url,
        action="save",
    )
    tab_switch = gr.update(selected=1)
    return message, tab_switch
with gr.Blocks() as ui:
    with gr.Tabs() as tabs:
        # ---- Step 1: look up / edit the user's own profile ----
        with gr.Tab("Step 1: Profile"):
            email = gr.Textbox(label="Email", placeholder="Enter your email")
            verify_button = gr.Button("Verify")
            with gr.Row():
                status = gr.Textbox(label="Status", interactive=False)
            # Other fields are initially hidden
            #with gr.Row() as details_section:
            name = gr.Textbox(label="Name", visible=False)
            professional_title = gr.Textbox(label="Professional Title", visible=False)
            industry = gr.Textbox(label="Industry", visible=False)
            target_audience = gr.Textbox(label="Target Audience", visible=False)
            personal_background = gr.Textbox(label="Personal Background", visible=False)
            company_url = gr.Textbox(label="Company URL", visible=False)
            save_button = gr.Button("Save & Proceed")

        # ---- Step 2: describe the prospect and generate the email ----
        with gr.Tab("Step 2: Email Generation"):
            # Gmail Authentication Button (HTML + JavaScript)
            auth_button_html = """
<div style="text-align: center; margin: 10px;">
<button onclick="window.location.href='/authenticate_gmail'"
style="padding: 10px 20px; font-size: 16px; cursor: pointer; background-color: #4285F4; color: white; border: none; border-radius: 5px;">
Authenticate with Gmail
</button>
</div>
"""
            gr.HTML(auth_button_html)
            prospect_name = gr.Textbox(label="Prospect's Name")
            linkedin_url = gr.Textbox(label="LinkedIn Profile URL")
            website_url = gr.Textbox(label="Website URL")
            context_url = gr.Textbox(label="Additional Context URL (optional)")
            word_count = gr.Slider(label="Email Length (words)", minimum=150, maximum=1000, step=10, value=500)
            email_purpose = gr.Dropdown(label="Email Purpose", choices=["Job Application", "Sales Cold Email", "Select"], value="Select")
            output_language = gr.Dropdown(label="Select Output Language", choices=["English", "Spanish", "Portuguese"], value="English")
            interested_position = gr.Textbox(label="Interested Position (for Job Applications)")
            email_button = gr.Button("Generate Email")
            email_status = gr.HTML(label="Status")
            generated_email = gr.HTML(label="Generated Email")

    # Function to fetch user_id dynamically
    def get_user_id():
        return session.get('user_id', "default_user")

    # Components shared by the verify and save handlers, in the order those
    # handlers return / accept them.
    _profile_fields = [
        name,
        professional_title,
        industry,
        target_audience,
        personal_background,
        company_url,
    ]

    # Events
    email_button.click(
        lambda *args: email_agent(None, *args),  # Pass None as credentials_info
        inputs=[
            name, email, prospect_name, linkedin_url, website_url, context_url,
            word_count, email_purpose, interested_position, company_url,
            professional_title, personal_background, output_language
        ],
        outputs=[email_status, generated_email]
    )
    verify_button.click(
        verify_user,
        inputs=[email],
        outputs=[status, *_profile_fields]
    )
    save_button.click(
        save_and_switch_tab,
        inputs=[email, *_profile_fields],
        outputs=[status, tabs]
    )

ui.launch(share=True)