import base64
import json
import logging
import os
import time
import xml.etree.ElementTree as ET
from urllib.parse import urljoin

import gradio as gr
import requests
from bs4 import BeautifulSoup
from flask import Flask, redirect, request, session
from google.auth.transport.requests import Request
from google.oauth2.service_account import Credentials
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from groq import Groq

# Configure logging once, at import time, instead of inside request handlers.
# NOTE(review): DEBUG level may cause HTTP/OAuth libraries to log sensitive
# request details -- confirm this is acceptable outside development.
logging.basicConfig(level=logging.DEBUG)

# Flask app setup
app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY")

REDIRECT_URI = "https://huggingface.co/spaces/curiousgeorge1292/Custom_Profile_Email_Generator/oauth2callback"
GMAIL_SCOPES = ["https://www.googleapis.com/auth/gmail.compose"]


class CredentialStore:
    """In-memory mapping from a user's email address to their OAuth credential dict.

    Nothing is persisted: entries live only as long as this process does.
    """

    def __init__(self):
        self.credentials = {}

    def set_credentials(self, user_email, creds):
        """Store ``creds`` under ``user_email``, replacing any previous entry."""
        self.credentials[user_email] = creds

    def get_credentials(self, user_email):
        """Return the credentials stored for ``user_email``, or ``None`` if absent."""
        return self.credentials.get(user_email)


# Global credential store shared by the OAuth callback and the email workflow.
credential_store = CredentialStore()

# Retrieve the OAuth client configuration (a JSON string) from the environment.
client_secrets_json = os.environ.get("GMAIL_OAUTH_SECRET_JSON")
if not client_secrets_json:
    raise ValueError("Environment variable GMAIL_OAUTH_SECRET_JSON is not set or empty.")

# Parse the JSON string into a dictionary.
client_secrets = json.loads(client_secrets_json)

# Build the flow straight from the parsed dictionary. This avoids writing the
# client secret to a temporary file, and it means request handlers never depend
# on a file that is deleted again during start-up.
flow = Flow.from_client_config(
    client_secrets,
    scopes=GMAIL_SCOPES,
    redirect_uri=REDIRECT_URI,
)


def get_user_email_from_credentials(credentials):
    """Return the Gmail address that ``credentials`` belong to.

    Calls the Gmail ``users.getProfile`` endpoint for the authenticated user.

    Args:
        credentials: Authorized credentials accepted by ``googleapiclient``.

    Returns:
        The ``emailAddress`` field of the profile, or ``None`` when the lookup
        fails for any reason (the failure is logged).
    """
    try:
        gmail = build('gmail', 'v1', credentials=credentials)
        profile = gmail.users().getProfile(userId='me').execute()
        return profile.get('emailAddress')
    except Exception:
        # Best effort: the caller decides what to do with a missing address.
        logging.exception("Error getting user email")
        return None


@app.route('/oauth2callback')
def oauth2callback():
    """Finish the OAuth flow and store the user's credentials in memory.

    Exchanges the authorization response for tokens, looks up the Gmail
    address of the authorizing user and stores the credential fields in
    ``credential_store`` keyed by that address.

    Returns:
        A confirmation message, or an error message with HTTP status 500 when
        the user's email address could not be determined.
    """
    # A fresh flow per request, built from the in-memory client config (the
    # module-level ``flow`` is only used to create the authorization URL).
    callback_flow = Flow.from_client_config(
        client_secrets,
        scopes=GMAIL_SCOPES,
        redirect_uri=REDIRECT_URI,
    )
    callback_flow.fetch_token(authorization_response=request.url)

    credentials = callback_flow.credentials
    creds_dict = {
        'token': credentials.token,
        'refresh_token': credentials.refresh_token,
        'token_uri': credentials.token_uri,
        'client_id': credentials.client_id,
        'client_secret': credentials.client_secret,
        'scopes': credentials.scopes,
    }

    user_email = get_user_email_from_credentials(credentials)
    if not user_email:
        # Without an address there is no usable key for the store; storing
        # under ``None`` would make the credentials unreachable later.
        return (
            "Authorization succeeded, but your Gmail address could not be determined. Please try again.",
            500,
        )

    credential_store.set_credentials(user_email, creds_dict)
    return "Authorization successful. You can now save emails to Gmail drafts"


@app.route('/authenticate_gmail')
def authenticate_gmail():
    """Redirect the browser to Google's OAuth consent page."""
    # ``access_type='offline'`` asks Google for a refresh token as well.
    authorization_url, _state = flow.authorization_url(
        access_type='offline',
        include_granted_scopes='true',
    )
    return redirect(authorization_url)


# TODO: persist credentials per user (e.g. in the Flask session, Redis or a
# database) instead of the in-memory CredentialStore, and add a route that
# reports whether credentials are currently stored for the caller.
def save_to_gmail_drafts(credentials_info, subject, body, recipient):
    """Create a Gmail draft for the user described by ``credentials_info``.

    Args:
        credentials_info: Dict with the keys ``token``, ``token_uri``,
            ``client_id`` and ``client_secret``; ``refresh_token`` is optional.
        subject: Subject line of the draft.
        body: Plain-text body of the draft.
        recipient: Address placed in the ``To`` header; omitted when empty.

    Returns:
        A message containing the ID of the created draft.

    Raises:
        ValueError: If ``credentials_info`` is empty or ``None``.
        Exception: Errors from the Google client libraries are propagated.
    """
    # Function-scope imports: the module-level ``Credentials`` name is the
    # service-account class (used for Sheets), which cannot be constructed
    # from user OAuth tokens. User tokens need the OAuth2 credentials class.
    from email.message import EmailMessage
    from google.oauth2.credentials import Credentials as UserCredentials

    if not credentials_info:
        raise ValueError("Gmail credentials are required to save a draft.")

    user_credentials = UserCredentials(
        credentials_info['token'],
        refresh_token=credentials_info.get('refresh_token'),
        token_uri=credentials_info['token_uri'],
        client_id=credentials_info['client_id'],
        client_secret=credentials_info['client_secret'],
    )

    # Refresh the token if expired
    if not user_credentials.valid and user_credentials.expired and user_credentials.refresh_token:
        user_credentials.refresh(Request())

    gmail = build('gmail', 'v1', credentials=user_credentials)

    # Build a real MIME message so non-ASCII subjects/bodies are encoded
    # correctly and header values cannot smuggle in extra header lines.
    mime_message = EmailMessage()
    if recipient:
        mime_message["To"] = recipient
    mime_message["Subject"] = subject
    mime_message.set_content(body)
    raw_message = base64.urlsafe_b64encode(mime_message.as_bytes()).decode("ascii")

    # Save the draft
    draft = gmail.users().drafts().create(
        userId="me",
        body={'message': {'raw': raw_message}},
    ).execute()
    return f"Draft created with ID: {draft['id']}"


# Clean up the temporary client-secrets file if one was written during start-up.
if os.path.exists("temp_client_secrets.json"):
    os.remove("temp_client_secrets.json")

# Initialize Groq client
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))

# Google Sheets setup
SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
SERVICE_ACCOUNT_JSON = os.environ.get("MY_SERVICE_ACCOUNT_JSON")
SPREADSHEET_ID = os.environ.get("GOOGLE_SHEET_ID")

if not SERVICE_ACCOUNT_JSON:
    raise ValueError("MY_SERVICE_ACCOUNT_JSON environment variable is not set or empty.")

service_account_info = json.loads(SERVICE_ACCOUNT_JSON)
credentials = Credentials.from_service_account_info(service_account_info, scopes=SCOPES)
service = build('sheets', 'v4', credentials=credentials)
sheet = service.spreadsheets()

# Column order of Sheet1!A:G; also the keys of the dict returned to callers.
_USER_FIELDS = (
    "email",
    "name",
    "professional_title",
    "industry",
    "target_audience",
    "personal_background",
    "company_url",
)

# Shared request headers for all scraping calls.
_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
}


def fetch_user_details(email):
    """Fetch a user's details from the Google Sheet by email (column A).

    Args:
        email: Email address to look up; compared exactly against column A.

    Returns:
        A dict keyed by the seven profile field names when a row matches,
        ``None`` when no row matches, or an error string beginning with
        ``"Error fetching details"`` when the sheet could not be read.
    """
    try:
        result = sheet.values().get(spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G").execute()
    except Exception as e:
        return f"Error fetching details: {str(e)}"

    for row in result.get('values', []):
        # Rows may be empty or shorter than seven cells, so guard the index
        # and pad missing cells with empty strings.
        if row and row[0] == email:
            cells = list(row[:len(_USER_FIELDS)])
            cells += [""] * (len(_USER_FIELDS) - len(cells))
            return dict(zip(_USER_FIELDS, cells))
    return None


def save_user_info(email, name, professional_title, industry, target_audience, personal_background, company_url):
    """Save or update a user's details in the Google Sheet.

    Reads Sheet1!A:G, replaces the row whose first cell equals ``email`` (or
    appends a new row when none matches) and writes all rows back.

    Returns:
        ``"Details saved successfully."`` on success, otherwise a string
        beginning with ``"Error saving details"``.
    """
    record = [email, name, professional_title, industry, target_audience, personal_background, company_url]
    try:
        result = sheet.values().get(spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G").execute()
        rows = result.get('values', [])

        # Replace the existing row for this email, or append a new one.
        for index, row in enumerate(rows):
            if row and row[0] == email:  # ``row`` can be an empty list
                rows[index] = record
                break
        else:
            rows.append(record)

        # Save updated rows back to the sheet
        sheet.values().update(
            spreadsheetId=SPREADSHEET_ID,
            range="Sheet1!A:G",
            valueInputOption="RAW",
            body={"values": rows},
        ).execute()
        return "Details saved successfully."
    except Exception as e:
        return f"Error saving details: {str(e)}"


def handle_user_details(email, name=None, professional_title=None, industry=None, target_audience=None,
                        personal_background=None, company_url=None, action="verify"):
    """Verify or save a user's profile.

    Args:
        email: The user's email address (lookup key).
        name, professional_title, industry, target_audience,
        personal_background, company_url: Profile fields; used by ``"save"``.
        action: ``"verify"`` to look the user up, ``"save"`` to store details.

    Returns:
        A ``(message, details)`` tuple; ``details`` is the stored profile dict
        for a successful ``"verify"`` and an empty dict otherwise.

    Raises:
        ValueError: If ``action`` is neither ``"verify"`` nor ``"save"``.
    """
    if not email:
        return ("Please enter an email address.", {})

    if action == "verify":
        user_details = fetch_user_details(email)
        if isinstance(user_details, str):
            # fetch_user_details reports failures as a string; surface it
            # instead of treating it as a found record.
            return (user_details, {})
        if user_details:
            return (
                f"Record found for {email}. You can update details if needed.",
                user_details,  # Return existing details
            )
        return (
            f"No record found for {email}. Please enter details and save.",
            {},  # Return empty fields
        )

    if action == "save":
        result = save_user_info(email, name, professional_title, industry, target_audience,
                                personal_background, company_url)
        return result, {}

    raise ValueError(f"Unknown action: {action!r}")


# Step 1: User profile management UI
def user_profile(email, name, professional_title, industry, target_audience, personal_background, company_url):
    """Save the user's profile and return a status message.

    Returns the error string from ``save_user_info`` when saving fails,
    otherwise a confirmation message.
    """
    result = save_user_info(email, name, professional_title, industry, target_audience,
                            personal_background, company_url)
    if result.startswith("Error saving details"):
        return result
    return "Your information has been saved! Proceed to Step 2 for email generation."


# Helper function to extract content from a URL
def extract_content(url):
    """Return the text of all ``<p>`` elements at ``url``, joined by spaces.

    The result is truncated to 2000 characters. On any failure a string
    beginning with ``"Error extracting content"`` is returned instead.
    """
    try:
        response = requests.get(url, headers=_REQUEST_HEADERS, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        content = ' '.join(para.get_text() for para in soup.find_all('p'))
        return content[:2000]  # Limit content to 2000 characters to avoid overload
    except Exception as e:
        return f"Error extracting content from {url}: {str(e)}"


# Helper function to parse a sitemap and get valid URLs
def parse_sitemap(sitemap_url):
    """Return the ``<loc>`` URLs listed in the sitemap at ``sitemap_url``.

    Returns:
        A list of URL strings, or a string beginning with
        ``"Error parsing sitemap"`` when the sitemap cannot be fetched/parsed.
    """
    try:
        response = requests.get(sitemap_url, headers=_REQUEST_HEADERS, timeout=10)
        response.raise_for_status()
        root = ET.fromstring(response.content)
    except Exception as e:
        return f"Error parsing sitemap from {sitemap_url}: {str(e)}"

    urls = []
    for loc in root.findall(".//{http://www.sitemaps.org/schemas/sitemap/0.9}loc"):
        # ``loc.text`` is None for an empty element; skip those entries.
        if loc.text and loc.text.strip():
            urls.append(loc.text.strip())
    return urls


# Wrapper that hides extraction errors and pauses between requests
def safe_extract_content(url, delay=2):
    """Extract content from ``url``, returning ``None`` on failure.

    Sleeps ``delay`` seconds after a successful fetch to throttle crawling.
    """
    content = extract_content(url)
    # Prefix match: page text that merely mentions the phrase is not an error.
    if content.startswith("Error extracting content"):
        logging.warning(content)
        return None
    time.sleep(delay)  # Respect crawl-delay
    return content


def _scrape_site(base_url, max_sitemap_pages=5):
    """Return page text for ``base_url``, falling back to its sitemap pages.

    Tries ``base_url`` directly; if that yields nothing, tries up to
    ``max_sitemap_pages`` URLs from ``sitemap_index.xml``. Returns ``""`` when
    nothing could be extracted.
    """
    if not base_url:
        return ""
    content = safe_extract_content(base_url)
    if content:
        return content

    sitemap_urls = parse_sitemap(urljoin(base_url, "sitemap_index.xml"))
    if not isinstance(sitemap_urls, list):
        return ""
    # Cap the number of pages tried: each attempt costs a request plus a delay.
    for page_url in sitemap_urls[:max_sitemap_pages]:
        content = safe_extract_content(page_url)
        if content:
            return content
    return ""


# Function to fetch LinkedIn profile insights using Proxycurl API
def fetch_linkedin_insights(profile_url):
    """Return ``"<headline>. <summary>"`` for a LinkedIn profile via Proxycurl.

    Returns an empty string when ``profile_url`` is empty, and a string
    beginning with ``"Error fetching LinkedIn insights"`` on failure.
    """
    if not profile_url:
        return ""
    api_key = os.environ.get("PROXYCURL_API_KEY")
    if not api_key:
        return "Error fetching LinkedIn insights: PROXYCURL_API_KEY is not set"

    api_endpoint = "https://nubela.co/proxycurl/api/v2/linkedin"
    headers = {"Authorization": f"Bearer {api_key}"}
    params = {"url": profile_url, "fallback_to_cache": "on-error"}
    try:
        response = requests.get(api_endpoint, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        profile_data = response.json()
        return f"{profile_data.get('headline', '')}. {profile_data.get('summary', '')}"
    except Exception as e:
        return f"Error fetching LinkedIn insights: {str(e)}"


# Function to generate email using Llama
def generate_email(name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose,
                   interested_position, company_url, professional_title, personal_background, output_language):
    """Generate an email with the Groq chat API from scraped context.

    The prompt template is read from the ``JOB_APPLICATION`` or
    ``SALES_COLD_EMAIL`` environment variable, depending on ``email_purpose``.

    Returns:
        The generated email text, or a message beginning with
        ``"Invalid email purpose"`` / ``"Error generating email"`` on failure.
    """
    # Fetch insights from LinkedIn and the reference URLs. Fetch errors are
    # blanked out so error text never ends up inside the prompt.
    linkedin_insights = fetch_linkedin_insights(linkedin_url)
    if linkedin_insights.startswith("Error fetching LinkedIn insights"):
        logging.warning(linkedin_insights)
        linkedin_insights = ""

    website_content = _scrape_site(website_url)
    context_content = (safe_extract_content(context_url, delay=0) or "") if context_url else ""
    company_content = _scrape_site(company_url)

    # Construct the purpose-specific prompt
    if email_purpose == "Job Application":
        template = os.environ.get("JOB_APPLICATION")
        fields = {
            "name": name,
            "professional_title": professional_title,
            "interested_position": interested_position,
            "website_content": website_content,
            "personal_background": personal_background,
            "word_count": word_count,
            "prospect_name": prospect_name,
            "output_language": output_language,
        }
    elif email_purpose == "Sales Cold Email":
        template = os.environ.get("SALES_COLD_EMAIL")
        fields = {
            "company_content": company_content,
            "prospect_name": prospect_name,
            "linkedin_insights": linkedin_insights,
            "website_content": website_content,
            "context_content": context_content,
            "word_count": word_count,
            "output_language": output_language,
        }
    else:
        return "Invalid email purpose selected. Please choose either 'Job Application' or 'Sales Cold Email'."

    if not template:
        return f"Error generating email: no prompt template is configured for '{email_purpose}'."

    # Generate email using Llama
    try:
        prompt = template.format(**fields)
        chat_response = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="llama3-8b-8192",
        )
        return chat_response.choices[0].message.content
    except Exception as e:
        return f"Error generating email: {str(e)}"


# Prefixes of the failure messages produced by generate_email.
_GENERATION_ERROR_PREFIXES = ("Error generating email", "Invalid email purpose")


# Step 2: Email generation UI
def email_agent(credentials_info, name, email, prospect_name, linkedin_url, website_url, context_url, word_count,
                email_purpose, interested_position, company_url, professional_title, personal_background,
                output_language):
    """Generate an email and save it to the user's Gmail drafts.

    Args:
        credentials_info: Gmail credential dict, or ``None`` to look the
            credentials up in ``credential_store`` by ``email``.
        email: The user's email address; also used as the draft recipient.
        (remaining arguments are passed through to ``generate_email``)

    Returns:
        A ``(status, email_content)`` tuple of strings for the two UI outputs.
    """
    try:
        # Generate the email content
        email_content = generate_email(name, email, prospect_name, linkedin_url, website_url, context_url,
                                       word_count, email_purpose, interested_position, company_url,
                                       professional_title, personal_background, output_language)
        if not email_content:
            return ("\nFailed to generate email content.\n", "")
        if email_content.startswith(_GENERATION_ERROR_PREFIXES):
            # Never save a generator error message as if it were an email.
            return (f"\nError: {email_content}\n", "")

        # The UI passes None; fall back to credentials stored by the OAuth callback.
        if credentials_info is None:
            credentials_info = credential_store.get_credentials(email)
        if not credentials_info:
            return (
                "\nPlease click the 'Authenticate with Gmail' button above to connect your Gmail account.\n",
                email_content,
            )

        # Save to Gmail drafts
        try:
            save_to_gmail_drafts(
                credentials_info,
                subject="Generated Email Subject",
                body=email_content,
                recipient=email,  # Replace with the recipient's email
            )
        except Exception as e:
            return (f"\nError saving to Gmail drafts: {str(e)}\n", email_content)
        return ("\nEmail saved to Gmail drafts successfully.\n", email_content)
    except Exception as e:
        return (f"\nError: {str(e)}\n", "")


# Gradio Interface
def verify_user(email):
    """Look up ``email`` and return the status plus updates for the six profile fields.

    Every field is made visible; it is pre-filled when a record exists and
    empty otherwise.
    """
    message, user_details = handle_user_details(email, action="verify")

    # Ensure user_details is a dictionary
    if not isinstance(user_details, dict):
        user_details = {}

    # All profile fields except the email itself, in output order.
    field_updates = [
        gr.update(visible=True, value=user_details.get(field, ""))
        for field in _USER_FIELDS[1:]
    ]
    return (message, *field_updates)


def save_and_switch_tab(email, name, professional_title, industry, target_audience, personal_background,
                        company_url):
    """Save the profile and switch to the email tab when saving succeeded."""
    message, _ = handle_user_details(email, name, professional_title, industry, target_audience,
                                     personal_background, company_url, action="save")
    if message == "Details saved successfully.":
        return message, gr.update(selected=1)
    # Stay on the profile tab so the error in the status box is visible.
    return message, gr.update()


with gr.Blocks() as ui:
    with gr.Tabs() as tabs:
        # Tabs need explicit ids so ``selected=1`` can address the second tab.
        with gr.Tab("Step 1: Profile", id=0):
            email = gr.Textbox(label="Email", placeholder="Enter your email")
            verify_button = gr.Button("Verify")
            with gr.Row():
                status = gr.Textbox(label="Status", interactive=False)

            # Other fields are initially hidden
            name = gr.Textbox(label="Name", visible=False)
            professional_title = gr.Textbox(label="Professional Title", visible=False)
            industry = gr.Textbox(label="Industry", visible=False)
            target_audience = gr.Textbox(label="Target Audience", visible=False)
            personal_background = gr.Textbox(label="Personal Background", visible=False)
            company_url = gr.Textbox(label="Company URL", visible=False)
            save_button = gr.Button("Save & Proceed")

        with gr.Tab("Step 2: Email Generation", id=1):
            # Gmail Authentication Button (HTML + JavaScript)
            # NOTE(review): the button markup is empty in this copy of the
            # file -- confirm the intended HTML and restore it.
            auth_button_html = """
"""
            gr.HTML(auth_button_html)

            prospect_name = gr.Textbox(label="Prospect's Name")
            linkedin_url = gr.Textbox(label="LinkedIn Profile URL")
            website_url = gr.Textbox(label="Website URL")
            context_url = gr.Textbox(label="Additional Context URL (optional)")
            word_count = gr.Slider(label="Email Length (words)", minimum=150, maximum=1000, step=10, value=500)
            email_purpose = gr.Dropdown(
                label="Email Purpose",
                choices=["Job Application", "Sales Cold Email", "Select"],
                value="Select",
            )
            output_language = gr.Dropdown(
                label="Select Output Language",
                choices=["English", "Spanish", "Portuguese"],
                value="English",
            )
            interested_position = gr.Textbox(label="Interested Position (for Job Applications)")
            email_button = gr.Button("Generate Email")
            email_status = gr.HTML(label="Status")
            generated_email = gr.HTML(label="Generated Email")

            # Function to fetch user_id dynamically
            def get_user_id():
                """Return the ``user_id`` stored in the Flask session, or ``"default_user"``.

                Not wired to any event in this file.
                """
                return session.get('user_id', "default_user")

            email_button.click(
                lambda *args: email_agent(None, *args),  # Pass None as credentials_info
                inputs=[
                    name, email, prospect_name, linkedin_url, website_url, context_url,
                    word_count, email_purpose, interested_position, company_url,
                    professional_title, personal_background, output_language,
                ],
                outputs=[email_status, generated_email],
            )

    # Events
    verify_button.click(
        verify_user,
        inputs=[email],
        outputs=[
            status,
            name,
            professional_title,
            industry,
            target_audience,
            personal_background,
            company_url,
        ],
    )
    save_button.click(
        save_and_switch_tab,
        inputs=[
            email,
            name,
            professional_title,
            industry,
            target_audience,
            personal_background,
            company_url,
        ],
        outputs=[status, tabs],
    )

ui.launch(share=True)