import os
import requests
import logging
from bs4 import BeautifulSoup
import time
import xml.etree.ElementTree as ET
from urllib.parse import urljoin
import gradio as gr
from groq import Groq
import json
from googleapiclient.discovery import build
from google.oauth2.service_account import Credentials
from flask import Flask, redirect, request, session
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import Flow
import base64

# Configure logging once for the whole module.  The original called
# logging.basicConfig() three times in scattered places; only the first
# call ever has an effect, so a single call here is equivalent and clearer.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Flask app setup
app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY")

REDIRECT_URI = "https://huggingface.co/spaces/curiousgeorge1292/Custom_Profile_Email_Generator/oauth2callback"
GMAIL_SCOPES = ["https://www.googleapis.com/auth/gmail.compose"]


class CredentialStore:
    """In-memory store mapping a user's email address to their OAuth credential dict.

    NOTE(review): state lives only in this process and is lost on restart;
    fine for a single-worker demo Space, but a shared store (DB/Redis) is
    needed for anything multi-worker.
    """

    def __init__(self):
        # user_email (str) -> credentials dict (see oauth2callback for shape)
        self.credentials = {}

    def set_credentials(self, user_email, creds):
        """Store (or overwrite) the credential dict for *user_email*."""
        self.credentials[user_email] = creds

    def get_credentials(self, user_email):
        """Return the stored credential dict, or None if the user never authorized."""
        return self.credentials.get(user_email)


# Global store shared by the OAuth callback and the draft-saving code.
credential_store = CredentialStore()

# Retrieve the OAuth client configuration (JSON string) from the environment.
client_secrets_json = os.environ.get("GMAIL_OAUTH_SECRET_JSON")
if not client_secrets_json:
    raise ValueError("Environment variable GMAIL_OAUTH_SECRET_JSON is not set or empty.")

# Parse JSON string into a dictionary.
client_secrets = json.loads(client_secrets_json)

# Write it to a temporary file as well.  The config dict is what we actually
# use (via Flow.from_client_config below), but the file is kept for backward
# compatibility with the startup cleanup code elsewhere in this module.
with open("temp_client_secrets.json", "w") as temp_file:
    json.dump(client_secrets, temp_file)

# Initialize the OAuth flow directly from the in-memory config.
# BUG FIX: the original loaded 'temp_client_secrets.json' from disk, but that
# file is deleted at import time further down the module, so any flow built
# later from the file path would fail.  from_client_config avoids the disk
# dependency entirely.
flow = Flow.from_client_config(
    client_secrets,
    scopes=GMAIL_SCOPES,
    redirect_uri=REDIRECT_URI,
)


def get_user_email_from_credentials(credentials):
    """Return the authenticated user's Gmail address, or None on failure.

    Uses the Gmail profile endpoint; any API/auth error is logged and
    swallowed so the caller can continue without an email key.
    """
    try:
        service = build('gmail', 'v1', credentials=credentials)
        user_info = service.users().getProfile(userId='me').execute()
        return user_info.get('emailAddress')
    except Exception as e:
        logger.error("Error getting user email: %s", e)
        return None


@app.route('/oauth2callback')
def oauth2callback():
    """OAuth2 redirect target: exchange the auth code for tokens and store them."""
    # Rebuild a fresh flow per request (state isolation between users).
    # BUG FIX: the original rebuilt it from 'temp_client_secrets.json', which
    # has already been deleted at import time — use the in-memory config.
    cb_flow = Flow.from_client_config(
        client_secrets,
        scopes=GMAIL_SCOPES,
        redirect_uri=REDIRECT_URI,
    )
    cb_flow.fetch_token(authorization_response=request.url)

    credentials = cb_flow.credentials
    # Serialize only the fields needed to reconstruct the credentials later.
    creds_dict = {
        'token': credentials.token,
        'refresh_token': credentials.refresh_token,
        'token_uri': credentials.token_uri,
        'client_id': credentials.client_id,
        'client_secret': credentials.client_secret,
        'scopes': credentials.scopes,
    }

    # Key the stored credentials by the user's Gmail address.
    user_email = get_user_email_from_credentials(credentials)
    credential_store.set_credentials(user_email, creds_dict)

    return "Authorization successful. You can now save emails to Gmail drafts"


@app.route('/authenticate_gmail')
def authenticate_gmail():
    """Kick off the Gmail OAuth consent flow by redirecting to Google."""
    authorization_url, state = flow.authorization_url(
        access_type='offline',          # request a refresh token
        include_granted_scopes='true',  # incremental authorization
    )
    return redirect(authorization_url)
def save_to_gmail_drafts(credentials_info, subject, body, recipient): # Initialize the credentials credentials = Credentials( credentials_info['token'], refresh_token=credentials_info.get('refresh_token'), token_uri=credentials_info['token_uri'], client_id=credentials_info['client_id'], client_secret=credentials_info['client_secret'] ) # Refresh the token if expired if not credentials.valid and credentials.expired and credentials.refresh_token: credentials.refresh(Request()) service = build('gmail', 'v1', credentials=credentials) # Create the email message raw_message = f"To: {recipient}\nSubject: {subject}\n\n{body}" raw_message_bytes = base64.urlsafe_b64encode(raw_message.encode("utf-8")).decode("utf-8") # Save the draft message = {'message': {'raw': raw_message_bytes}} draft = service.users().drafts().create(userId="me", body=message).execute() return f"Draft created with ID: {draft['id']}" # Clean up the temporary file if os.path.exists("temp_client_secrets.json"): os.remove("temp_client_secrets.json") # Initialize Groq client client = Groq(api_key=os.environ.get("GROQ_API_KEY")) # Google Sheets setup SCOPES = ['https://www.googleapis.com/auth/spreadsheets'] SERVICE_ACCOUNT_JSON = os.environ.get("MY_SERVICE_ACCOUNT_JSON") SPREADSHEET_ID = os.environ.get("GOOGLE_SHEET_ID") if not SERVICE_ACCOUNT_JSON: raise ValueError("MY_SERVICE_ACCOUNT_JSON environment variable is not set or empty.") service_account_info = json.loads(SERVICE_ACCOUNT_JSON) credentials = Credentials.from_service_account_info(service_account_info, scopes=SCOPES) service = build('sheets', 'v4', credentials=credentials) sheet = service.spreadsheets() def fetch_user_details(email): """ Fetch user details from Google Sheet based on email. 
""" try: result = sheet.values().get(spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G").execute() rows = result.get('values', []) for row in rows: if row[0] == email: # Assuming email is in the first column (A) return { "email": row[0], "name": row[1], "professional_title": row[2], "industry": row[3], "target_audience": row[4], "personal_background": row[5], "company_url": row[6] } return None except Exception as e: return f"Error fetching details: {str(e)}" def save_user_info(email, name, professional_title, industry, target_audience, personal_background, company_url): """ Save or update user details in Google Sheet. """ try: result = sheet.values().get(spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G").execute() rows = result.get('values', []) updated = False # Check if email exists and update the row for i, row in enumerate(rows): if row[0] == email: rows[i] = [email, name, professional_title, industry, target_audience, personal_background, company_url] updated = True break if not updated: # Append a new row if email doesn't exist rows.append([email, name, professional_title, industry, target_audience, personal_background, company_url]) # Save updated rows back to the sheet body = {"values": rows} sheet.values().update( spreadsheetId=SPREADSHEET_ID, range="Sheet1!A:G", valueInputOption="RAW", body=body ).execute() return "Details saved successfully." except Exception as e: return f"Error saving details: {str(e)}" def handle_user_details(email, name=None, professional_title=None, industry=None, target_audience=None, personal_background=None, company_url=None, action="verify"): if action == "verify": # Fetch details based on email user_details = fetch_user_details(email) if user_details: return ( f"Record found for {email}. You can update details if needed.", user_details # Return existing details ) else: return ( f"No record found for {email}. 
Please enter details and save.", {} # Return empty fields ) elif action == "save": # Save or update details result = save_user_info(email, name, professional_title, industry, target_audience, personal_background, company_url) return result, {} # Step 1: User profile management UI def user_profile(email, name, professional_title, industry, target_audience, personal_background, company_url): save_user_info(email, name, professional_title, industry, target_audience, personal_background, company_url) return "Your information has been saved! Proceed to Step 2 for email generation." # Helper function to extract content from a URL def extract_content(url): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36' } try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') paragraphs = soup.find_all('p') content = ' '.join([para.get_text() for para in paragraphs]) return content[:2000] # Limit content to 2000 characters to avoid overload except Exception as e: return f"Error extracting content from {url}: {str(e)}" # Helper function to parse a sitemap and get valid URLs def parse_sitemap(sitemap_url): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36' } urls = [] try: response = requests.get(sitemap_url, headers=headers, timeout=10) response.raise_for_status() root = ET.fromstring(response.content) for loc in root.findall(".//{http://www.sitemaps.org/schemas/sitemap/0.9}loc"): urls.append(loc.text) except Exception as e: return f"Error parsing sitemap from {sitemap_url}: {str(e)}" return urls # Wrapper to handle retries and delay def safe_extract_content(url, delay=2): content = extract_content(url) if "Error extracting content" in content: print(content) # Log the error return None time.sleep(delay) # Respect crawl-delay 
return content # Function to fetch LinkedIn profile insights using Proxycurl API def fetch_linkedin_insights(profile_url): api_key = os.environ.get("PROXYCURL_API_KEY") api_endpoint = "https://nubela.co/proxycurl/api/v2/linkedin" headers = {"Authorization": f"Bearer {api_key}"} params = {"url": profile_url, "fallback_to_cache": "on-error"} try: response = requests.get(api_endpoint, headers=headers, params=params, timeout=10) response.raise_for_status() profile_data = response.json() insights = f"{profile_data.get('headline', '')}. {profile_data.get('summary', '')}" return insights except Exception as e: return f"Error fetching LinkedIn insights: {str(e)}" # Function to generate email using Llama def generate_email(name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose, interested_position, company_url, professional_title, personal_background, output_language): # Fetch insights from LinkedIn and reference URLs linkedin_insights = fetch_linkedin_insights(linkedin_url) website_sitemap_url = urljoin(website_url, "sitemap_index.xml") website_content = safe_extract_content(website_url) if not website_content: # If direct scraping fails, fall back to the sitemap website_urls = parse_sitemap(website_sitemap_url) if isinstance(website_urls, list): for url in urls: website_content = safe_extract_content(url) if website_content: break context_content = extract_content(context_url) if context_url else "" # Fetch details from the company website company_sitemap_url = urljoin(company_url, "sitemap_index.xml") company_content = safe_extract_content(company_url) if not company_content: # If direct scraping fails, fall back to the sitemap company_urls = parse_sitemap(company_sitemap_url) if isinstance(company_urls, list): for url in urls: company_content = safe_extract_content(url) if company_content: break job_application = os.environ.get("JOB_APPLICATION") sales_cold_email = os.environ.get("SALES_COLD_EMAIL") # Construct the purpose-specific 
prompt if email_purpose == "Job Application": prompt = job_application.format( name=name, professional_title=professional_title, interested_position=interested_position, website_content=website_content, personal_background=personal_background, word_count=word_count, prospect_name=prospect_name, output_language=output_language ) elif email_purpose == "Sales Cold Email": prompt = sales_cold_email.format( company_content=company_content, prospect_name=prospect_name, linkedin_insights=linkedin_insights, website_content=website_content, context_content=context_content, word_count=word_count, output_language=output_language ) else: return "Invalid email purpose selected. Please choose either 'Job Application' or 'Sales Cold Email'." # Generate email using Llama try: chat_response = client.chat.completions.create( messages=[{"role": "user", "content": prompt}], model="llama3-8b-8192", ) email_content = chat_response.choices[0].message.content return email_content except Exception as e: return f"Error generating email: {str(e)}" # Step 2: Email generation UI def email_agent(credentials_info, name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose, interested_position, company_url, professional_title, personal_background, output_language): try: # Generate the email content email_content = generate_email(name, email, prospect_name, linkedin_url, website_url, context_url, word_count, email_purpose, interested_position, company_url, professional_title, personal_background, output_language) if not email_content: return ("