| import os |
| import requests |
| import logging |
| from bs4 import BeautifulSoup |
| import time |
| import xml.etree.ElementTree as ET |
| from urllib.parse import urljoin |
| import gradio as gr |
| from groq import Groq |
| import json |
| from googleapiclient.discovery import build |
| from google.oauth2.service_account import Credentials |
| from flask import Flask, redirect, request, session |
| from google.auth.transport.requests import Request |
| from google_auth_oauthlib.flow import Flow |
| import base64 |
|
|
| |
# Flask application object that the OAuth routes in this module attach to.
app = Flask(__name__)
# Session signing key; this is None when FLASK_SECRET_KEY is not set.
app.secret_key = os.getenv("FLASK_SECRET_KEY")

# Redirect URI and Gmail scope handed to the OAuth flows built in this module.
REDIRECT_URI = "https://huggingface.co/spaces/curiousgeorge1292/Custom_Profile_Email_Generator/oauth2callback"
GMAIL_SCOPES = ["https://www.googleapis.com/auth/gmail.compose"]
|
|
| |
class CredentialStore:
    """In-memory mapping from a user's email address to stored credentials."""

    def __init__(self):
        # Lives only in this process; nothing is written to disk.
        self.credentials = dict()

    def set_credentials(self, user_email, creds):
        """Store ``creds`` under ``user_email``, replacing any earlier entry."""
        self.credentials.update({user_email: creds})

    def get_credentials(self, user_email):
        """Return the credentials stored for ``user_email``, or ``None``."""
        try:
            return self.credentials[user_email]
        except KeyError:
            return None
|
|
| |
# Process-wide store for the OAuth credentials captured by the callback route.
credential_store = CredentialStore()

# The OAuth client configuration is supplied as a JSON string in the environment.
client_secrets_json = os.environ.get("GMAIL_OAUTH_SECRET_JSON")
if not client_secrets_json:
    raise ValueError("Environment variable GMAIL_OAUTH_SECRET_JSON is not set or empty.")

try:
    client_secrets = json.loads(client_secrets_json)
except json.JSONDecodeError as exc:
    # Surface a configuration problem with the variable name instead of a bare parse error.
    raise ValueError(
        "Environment variable GMAIL_OAUTH_SECRET_JSON does not contain valid JSON."
    ) from exc

# Keep the on-disk copy for code that loads the client config by path, but
# create it owner-readable only because it contains the OAuth client secret.
_secrets_fd = os.open(
    "temp_client_secrets.json",
    os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
    0o600,
)
with os.fdopen(_secrets_fd, "w", encoding="utf-8") as temp_file:
    json.dump(client_secrets, temp_file)

# Build the shared flow straight from the parsed config (no file round trip)
# and reuse GMAIL_SCOPES so the requested scope is defined in one place.
flow = Flow.from_client_config(
    client_secrets,
    scopes=GMAIL_SCOPES,
    redirect_uri=REDIRECT_URI,
)
|
|
def get_user_email_from_credentials(credentials):
    """Look up the Gmail address of the account behind ``credentials``.

    Returns:
        The ``emailAddress`` field of the Gmail profile response, or ``None``
        when any step of the lookup raises (the error is printed).
    """
    try:
        gmail = build('gmail', 'v1', credentials=credentials)
        profile_request = gmail.users().getProfile(userId='me')
        profile = profile_request.execute()
        return profile.get('emailAddress')
    except Exception as e:
        print(f"Error getting user email: {e}")
        return None
|
|
@app.route('/oauth2callback')
def oauth2callback():
    """Finish the Gmail OAuth flow and remember the user's credentials.

    Exchanges the authorization response in the current request URL for
    tokens, looks up the Gmail address of the authorizing account and stores
    the token details in ``credential_store`` under that address.

    Returns:
        A success message, or an error message with HTTP status 500 when the
        account's email address could not be determined.
    """
    # Build the flow from the in-memory client config: temp_client_secrets.json
    # is deleted at import time further down in this module, so re-reading it
    # per request cannot work.
    callback_flow = Flow.from_client_config(
        client_secrets,
        scopes=GMAIL_SCOPES,
        redirect_uri=REDIRECT_URI,
    )
    callback_flow.fetch_token(authorization_response=request.url)

    credentials = callback_flow.credentials
    creds_dict = {
        'token': credentials.token,
        'refresh_token': credentials.refresh_token,
        'token_uri': credentials.token_uri,
        'client_id': credentials.client_id,
        'client_secret': credentials.client_secret,
        'scopes': credentials.scopes,
    }

    user_email = get_user_email_from_credentials(credentials)
    if not user_email:
        # Without an address there is no key to store under; storing under
        # None would make the credentials unreachable by any later lookup.
        return (
            "Authorization succeeded, but the Gmail account address could not be determined. Please try again.",
            500,
        )

    credential_store.set_credentials(user_email, creds_dict)
    return "Authorization successful. You can now save emails to Gmail drafts"
|
|
# Root log level comes from LOG_LEVEL (default DEBUG, as before) so verbose
# output can be turned down without a code change.
_log_level = getattr(logging, os.environ.get("LOG_LEVEL", "DEBUG").upper(), None)
if not isinstance(_log_level, int):
    # Unknown level names fall back to the previous hard-coded default.
    _log_level = logging.DEBUG
logging.basicConfig(level=_log_level)
|
|
@app.route('/authenticate_gmail')
def authenticate_gmail():
    """Redirect the browser to the authorization URL of the shared OAuth flow.

    Returns:
        A Flask redirect response pointing at the URL produced by
        ``flow.authorization_url``.
    """
    # The returned state value is not used anywhere, so it is discarded.
    authorization_url, _state = flow.authorization_url(
        access_type='offline',
        include_granted_scopes='true',
    )
    return redirect(authorization_url)
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
|
|
def save_to_gmail_drafts(credentials_info, subject, body, recipient):
    """Create a Gmail draft addressed to ``recipient``.

    Args:
        credentials_info: Mapping with the keys ``token``, ``token_uri``,
            ``client_id`` and ``client_secret``; ``refresh_token`` and
            ``scopes`` are optional.
        subject: Subject line of the draft.
        body: Plain-text body of the draft.
        recipient: Address placed in the ``To`` header.

    Returns:
        A message containing the ID of the created draft.

    Raises:
        ValueError: If ``credentials_info`` is missing or empty, or if a
            header value contains a line break.
    """
    if not credentials_info:
        raise ValueError("Gmail credentials are missing. Please authenticate with Gmail first.")

    # Imported locally: the module-level ``Credentials`` name is the
    # service-account class, whose constructor does not take OAuth user tokens.
    from email.message import EmailMessage
    from google.oauth2.credentials import Credentials as UserCredentials

    credentials = UserCredentials(
        credentials_info['token'],
        refresh_token=credentials_info.get('refresh_token'),
        token_uri=credentials_info['token_uri'],
        client_id=credentials_info['client_id'],
        client_secret=credentials_info['client_secret'],
        scopes=credentials_info.get('scopes'),
    )

    # Refresh only when the token has expired and a refresh token is available.
    if not credentials.valid and credentials.expired and credentials.refresh_token:
        credentials.refresh(Request())

    service = build('gmail', 'v1', credentials=credentials)

    # Let the email package produce the headers so non-ASCII text is encoded
    # and line breaks in header values are rejected rather than injected.
    mime_message = EmailMessage()
    mime_message["To"] = recipient
    mime_message["Subject"] = subject
    mime_message.set_content(body)
    raw_message_bytes = base64.urlsafe_b64encode(mime_message.as_bytes()).decode("utf-8")

    message = {'message': {'raw': raw_message_bytes}}
    draft = service.users().drafts().create(userId="me", body=message).execute()
    return f"Draft created with ID: {draft['id']}"
|
|
| |
# Delete the temporary OAuth client-secrets file if it is present on disk.
if os.path.exists("temp_client_secrets.json"):
    os.remove("temp_client_secrets.json")

# Groq client used for chat completions.
client = Groq(api_key=os.getenv("GROQ_API_KEY"))

# Google Sheets access: scope, service-account JSON and target spreadsheet
# are all taken from the environment.
SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
SERVICE_ACCOUNT_JSON = os.getenv("MY_SERVICE_ACCOUNT_JSON")
SPREADSHEET_ID = os.getenv("GOOGLE_SHEET_ID")

if not SERVICE_ACCOUNT_JSON:
    raise ValueError("MY_SERVICE_ACCOUNT_JSON environment variable is not set or empty.")

service_account_info = json.loads(SERVICE_ACCOUNT_JSON)
credentials = Credentials.from_service_account_info(service_account_info, scopes=SCOPES)
service = build('sheets', 'v4', credentials=credentials)
# Handle on the spreadsheets resource used by the profile helpers below.
sheet = service.spreadsheets()
|
|
def fetch_user_details(email):
    """Fetch a user's profile from the Google Sheet by email address.

    Reads ``Sheet1!A:G`` and returns the first row whose first cell equals
    ``email``.

    Args:
        email: Address to look up in column A.

    Returns:
        A dict with the keys ``email``, ``name``, ``professional_title``,
        ``industry``, ``target_audience``, ``personal_background`` and
        ``company_url``; ``None`` when no row matches; or an error message
        string when the lookup raises.
    """
    field_names = (
        "email",
        "name",
        "professional_title",
        "industry",
        "target_audience",
        "personal_background",
        "company_url",
    )
    try:
        result = sheet.values().get(spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G").execute()
        for row in result.get('values', []):
            # Blank rows come back as empty lists; skip them instead of indexing.
            if not row or row[0] != email:
                continue
            # Rows can be shorter than seven cells when trailing cells are
            # empty, so pad with "" rather than raising IndexError.
            cells = list(row[:len(field_names)])
            cells += [""] * (len(field_names) - len(cells))
            return dict(zip(field_names, cells))
        return None
    except Exception as e:
        return f"Error fetching details: {str(e)}"
|
|
def save_user_info(email, name, professional_title, industry, target_audience, personal_background, company_url):
    """Save or update a user's profile row in the Google Sheet.

    Reads ``Sheet1!A:G``, replaces the first row whose first cell equals
    ``email`` (or appends a new row when none matches) and writes all rows
    back to the same range.

    Returns:
        ``"Details saved successfully."`` on success, otherwise an error
        message string describing the exception.
    """
    new_row = [email, name, professional_title, industry, target_audience, personal_background, company_url]
    try:
        result = sheet.values().get(spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G").execute()
        rows = result.get('values', [])

        for i, row in enumerate(rows):
            # Blank rows come back as empty lists; guard before reading row[0].
            if row and row[0] == email:
                rows[i] = new_row
                break
        else:
            # No existing record for this email: add one at the end.
            rows.append(new_row)

        body = {"values": rows}
        sheet.values().update(
            spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G",
            valueInputOption="RAW", body=body
        ).execute()
        return "Details saved successfully."
    except Exception as e:
        return f"Error saving details: {str(e)}"
|
|
|
|
|
|
def handle_user_details(email, name=None, professional_title=None, industry=None, target_audience=None, personal_background=None, company_url=None, action="verify"):
    """Verify or save a user's profile and report the outcome.

    Args:
        email: Address identifying the user.
        name, professional_title, industry, target_audience,
        personal_background, company_url: Profile fields, used when saving.
        action: ``"verify"`` to look the user up, ``"save"`` to store the
            supplied fields.

    Returns:
        A ``(message, details)`` tuple. ``details`` is the stored profile
        dict when a record is found during verification and ``{}`` in every
        other case.
    """
    if action == "verify":
        user_details = fetch_user_details(email)
        if isinstance(user_details, str):
            # fetch_user_details reports failures as a message string; pass
            # it on instead of announcing that a record was found.
            return user_details, {}
        if user_details:
            return (
                f"Record found for {email}. You can update details if needed.",
                user_details
            )
        return (
            f"No record found for {email}. Please enter details and save.",
            {}
        )

    if action == "save":
        result = save_user_info(email, name, professional_title, industry, target_audience, personal_background, company_url)
        return result, {}

    # Always return a 2-tuple so callers that unpack the result keep working.
    return f"Unknown action '{action}'. Expected 'verify' or 'save'.", {}
|
|
|
|
| |
def user_profile(email, name, professional_title, industry, target_audience, personal_background, company_url):
    """Save the profile and return a fixed confirmation message.

    The outcome string returned by ``save_user_info`` is not inspected.
    """
    profile_fields = (
        email,
        name,
        professional_title,
        industry,
        target_audience,
        personal_background,
        company_url,
    )
    save_user_info(*profile_fields)
    confirmation = "Your information has been saved! Proceed to Step 2 for email generation."
    return confirmation
|
|
| |
def extract_content(url):
    """Download ``url`` and return the text of its ``<p>`` elements.

    Returns:
        The paragraph texts joined by single spaces and cut to the first
        2000 characters, or an error message string when anything raises.
    """
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
    }
    try:
        page = requests.get(url, headers=request_headers, timeout=10)
        page.raise_for_status()
        parsed = BeautifulSoup(page.text, 'html.parser')
        paragraph_texts = [node.get_text() for node in parsed.find_all('p')]
        joined = ' '.join(paragraph_texts)
        return joined[:2000]
    except Exception as e:
        return f"Error extracting content from {url}: {str(e)}"
|
|
| |
def parse_sitemap(sitemap_url):
    """Download a sitemap and collect the text of its ``<loc>`` elements.

    Only elements in the ``http://www.sitemaps.org/schemas/sitemap/0.9``
    namespace are matched.

    Returns:
        A list with the text of every matching element, or an error message
        string when the download or XML parsing raises.
    """
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
    }
    loc_path = ".//{http://www.sitemaps.org/schemas/sitemap/0.9}loc"
    try:
        reply = requests.get(sitemap_url, headers=request_headers, timeout=10)
        reply.raise_for_status()
        document = ET.fromstring(reply.content)
        return [node.text for node in document.findall(loc_path)]
    except Exception as e:
        return f"Error parsing sitemap from {sitemap_url}: {str(e)}"
|
|
| |
def safe_extract_content(url, delay=2):
    """Extract page text, turning extraction errors into ``None``.

    Args:
        url: Page to read via ``extract_content``.
        delay: Seconds to sleep after a successful extraction.

    Returns:
        The extracted text, or ``None`` (after printing the message) when
        the result contains the extraction error marker.
    """
    extracted = extract_content(url)
    succeeded = "Error extracting content" not in extracted
    if succeeded:
        time.sleep(delay)
        return extracted
    print(extracted)
    return None
|
|
| |
def fetch_linkedin_insights(profile_url):
    """Fetch headline and summary for a LinkedIn profile via the Proxycurl API.

    The API key is read from ``PROXYCURL_API_KEY``.

    Returns:
        ``"<headline>. <summary>"`` built from the JSON response (missing
        fields become empty strings), or an error message string when the
        request raises.
    """
    token = os.environ.get("PROXYCURL_API_KEY")
    try:
        reply = requests.get(
            "https://nubela.co/proxycurl/api/v2/linkedin",
            headers={"Authorization": f"Bearer {token}"},
            params={"url": profile_url, "fallback_to_cache": "on-error"},
            timeout=10,
        )
        reply.raise_for_status()
        profile = reply.json()
        headline = profile.get('headline', '')
        summary = profile.get('summary', '')
        return f"{headline}. {summary}"
    except Exception as e:
        return f"Error fetching LinkedIn insights: {str(e)}"
|
|
| |
def _fetch_site_content(site_url):
    """Return page text for ``site_url``, falling back to its sitemap URLs."""
    content = safe_extract_content(site_url)
    if content:
        return content

    # The page itself gave nothing: try each URL listed in sitemap_index.xml
    # and keep the first one that yields content.
    sitemap_urls = parse_sitemap(urljoin(site_url, "sitemap_index.xml"))
    if isinstance(sitemap_urls, list):
        for url in sitemap_urls:
            content = safe_extract_content(url)
            if content:
                break
    return content


def generate_email(name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose, interested_position, company_url, professional_title, personal_background, output_language):
    """Generate an email with the Groq chat model from gathered context.

    The prompt template is read from the ``JOB_APPLICATION`` or
    ``SALES_COLD_EMAIL`` environment variable, depending on ``email_purpose``.

    Args:
        name, professional_title, personal_background, company_url: Sender
            profile fields.
        email: Sender's email address (not used in the prompt).
        prospect_name, linkedin_url, website_url: Details of the recipient.
        context_url: Optional extra page whose text is added to the prompt.
        word_count: Target length passed to the template.
        email_purpose: ``"Job Application"`` or ``"Sales Cold Email"``.
        interested_position: Position applied for (job applications).
        output_language: Language passed to the template.

    Returns:
        The generated email text, or a message string when the purpose is
        invalid or the chat completion raises.

    Raises:
        ValueError: If the template environment variable for the selected
            purpose is not set or empty.
    """
    template_vars = {
        "Job Application": "JOB_APPLICATION",
        "Sales Cold Email": "SALES_COLD_EMAIL",
    }
    # Reject an unusable purpose before doing any network work.
    if email_purpose not in template_vars:
        return "Invalid email purpose selected. Please choose either 'Job Application' or 'Sales Cold Email'."

    template_var = template_vars[email_purpose]
    template = os.environ.get(template_var)
    if not template:
        raise ValueError(f"{template_var} environment variable is not set or empty.")

    linkedin_insights = fetch_linkedin_insights(linkedin_url)
    website_content = _fetch_site_content(website_url)
    context_content = extract_content(context_url) if context_url else ""
    company_content = _fetch_site_content(company_url)

    if email_purpose == "Job Application":
        prompt = template.format(
            name=name,
            professional_title=professional_title,
            interested_position=interested_position,
            website_content=website_content,
            personal_background=personal_background,
            word_count=word_count,
            prospect_name=prospect_name,
            output_language=output_language
        )
    else:
        prompt = template.format(
            company_content=company_content,
            prospect_name=prospect_name,
            linkedin_insights=linkedin_insights,
            website_content=website_content,
            context_content=context_content,
            word_count=word_count,
            output_language=output_language
        )

    try:
        chat_response = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="llama3-8b-8192",
        )
        email_content = chat_response.choices[0].message.content
        return email_content
    except Exception as e:
        return f"Error generating email: {str(e)}"
|
|
| |
def email_agent(credentials_info, name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose, interested_position, company_url, professional_title, personal_background, output_language):
    """Generate an email and save it as a Gmail draft.

    Args:
        credentials_info: Gmail credential mapping, or ``None`` to look the
            credentials up in ``credential_store`` by ``email``.
        email: The user's email address; also used as the draft recipient.
        (remaining arguments): Passed through to ``generate_email``.

    Returns:
        A ``(status_html, email_content)`` tuple. ``email_content`` is empty
        when generation failed.
    """
    # Messages generate_email returns instead of an email when it fails.
    generation_errors = ("Error generating email:", "Invalid email purpose selected.")

    try:
        email_content = generate_email(name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose, interested_position, company_url, professional_title, personal_background, output_language)

        if not email_content:
            return ("<div style='color: red;'>Failed to generate email content.</div>", "")
        if email_content.startswith(generation_errors):
            # Show the failure instead of saving the message as a draft.
            return (f"<div style='color: red;'>{email_content}</div>", "")

        # Fall back to the credentials captured by the OAuth callback; this
        # lookup must happen before the save attempt to have any effect.
        if not credentials_info:
            credentials_info = credential_store.get_credentials(email)
        if not credentials_info:
            return ("<div style='color: red;'>Please click the 'Authenticate with Gmail' button above to connect your Gmail account.</div>", email_content)

        try:
            save_to_gmail_drafts(
                credentials_info,
                subject="Generated Email Subject",
                body=email_content,
                recipient=email
            )
        except Exception as e:
            return (f"<div style='color: red;'>Error saving to Gmail drafts: {str(e)}</div>", email_content)
        return ("<div style='color: green;'>Email saved to Gmail drafts successfully.</div>", email_content)

    except Exception as e:
        return (f"<div style='color: red;'>Error: {str(e)}</div>", "")
|
|
| |
def verify_user(email):
    """Look up ``email`` and build the updates for the Step 1 form.

    Returns:
        A 7-tuple: the status message followed by one ``gr.update`` per
        profile field, each making the field visible and filling it with the
        stored value (or an empty string when nothing is stored).
    """
    message, user_details = handle_user_details(email, action="verify")

    # Anything that is not a dict is treated as "no stored details".
    if not isinstance(user_details, dict):
        user_details = {}

    field_names = (
        "name",
        "professional_title",
        "industry",
        "target_audience",
        "personal_background",
        "company_url",
    )
    # An empty dict yields "" for every field, which covers the no-record case.
    field_updates = tuple(
        gr.update(visible=True, value=user_details.get(field, ""))
        for field in field_names
    )
    return (message, *field_updates)
|
|
|
|
def save_and_switch_tab(email, name, professional_title, industry, target_audience, personal_background, company_url):
    """Save the profile and move to the next tab when the save succeeded.

    Returns:
        A ``(message, tabs_update)`` tuple. ``tabs_update`` selects tab 1
        after a successful save and leaves the tabs unchanged when the save
        reported an error.
    """
    message, _ = handle_user_details(email, name, professional_title, industry, target_audience, personal_background, company_url, action="save")
    if isinstance(message, str) and message.startswith("Error"):
        # Keep the user on the profile tab so the error in the status box is seen.
        return message, gr.update()
    return message, gr.update(selected=1)
|
|
# Gradio interface: Step 1 collects the user's profile, Step 2 generates the email.
# NOTE(review): the nesting below is reconstructed from the statement order;
# confirm the Row/Tab boundaries against the deployed layout.
with gr.Blocks() as ui:
    with gr.Tabs() as tabs:
        with gr.Tab("Step 1: Profile"):
            email = gr.Textbox(label="Email", placeholder="Enter your email")
            verify_button = gr.Button("Verify")
            with gr.Row():
                status = gr.Textbox(label="Status", interactive=False)

            # Profile fields start hidden; verify_user makes them visible.
            name = gr.Textbox(label="Name", visible=False)
            professional_title = gr.Textbox(label="Professional Title", visible=False)
            industry = gr.Textbox(label="Industry", visible=False)
            target_audience = gr.Textbox(label="Target Audience", visible=False)
            personal_background = gr.Textbox(label="Personal Background", visible=False)
            company_url = gr.Textbox(label="Company URL", visible=False)
            save_button = gr.Button("Save & Proceed")

        with gr.Tab("Step 2: Email Generation"):
            # Plain HTML button that navigates to the /authenticate_gmail path.
            auth_button_html = """
            <div style="text-align: center; margin: 10px;">
            <button onclick="window.location.href='/authenticate_gmail'"
            style="padding: 10px 20px; font-size: 16px; cursor: pointer; background-color: #4285F4; color: white; border: none; border-radius: 5px;">
            Authenticate with Gmail
            </button>
            </div>
            """
            gr.HTML(auth_button_html)

            prospect_name = gr.Textbox(label="Prospect's Name")
            linkedin_url = gr.Textbox(label="LinkedIn Profile URL")
            website_url = gr.Textbox(label="Website URL")
            context_url = gr.Textbox(label="Additional Context URL (optional)")
            word_count = gr.Slider(label="Email Length (words)", minimum=150, maximum=1000, step=10, value=500)
            email_purpose = gr.Dropdown(label="Email Purpose", choices=["Job Application", "Sales Cold Email", "Select"], value="Select")
            output_language = gr.Dropdown(label="Select Output Language", choices=["English", "Spanish", "Portuguese"], value="English")
            interested_position = gr.Textbox(label="Interested Position (for Job Applications)")

            email_button = gr.Button("Generate Email")
            email_status = gr.HTML(label="Status")
            generated_email = gr.HTML(label="Generated Email")

    def get_user_id():
        """Return the ``user_id`` stored in the Flask session, or ``"default_user"``."""
        return session.get('user_id', "default_user")

    def _generate_without_credentials(*form_values):
        """Call ``email_agent`` with no explicit credentials plus the form values."""
        return email_agent(None, *form_values)

    # Inputs shared by the verify/save handlers, in the order the callbacks expect.
    _profile_fields = [
        name,
        professional_title,
        industry,
        target_audience,
        personal_background,
        company_url,
    ]

    email_button.click(
        _generate_without_credentials,
        inputs=[
            name, email, prospect_name, linkedin_url, website_url, context_url,
            word_count, email_purpose, interested_position, company_url,
            professional_title, personal_background, output_language
        ],
        outputs=[email_status, generated_email]
    )

    verify_button.click(
        verify_user,
        inputs=[email],
        outputs=[status] + _profile_fields
    )

    save_button.click(
        save_and_switch_tab,
        inputs=[email] + _profile_fields,
        outputs=[status, tabs]
    )

ui.launch(share=True)
|
|