# Spaces: Sleeping
# (Status banner captured from the Hugging Face Spaces page; not part of the program.)
"""
AI Lead Generation & Outreach Agent
Optimized for Hugging Face Spaces Deployment
"""
import csv
import json
import os
import random
import re
import sqlite3
import time
from datetime import datetime
from typing import List, Dict, Optional

import pandas as pd
import requests
import streamlit as st
from bs4 import BeautifulSoup
# ======================== Configuration ========================
class Config:
    """Static application settings shared by the whole app."""

    # Hugging Face API token, read from the HF_TOKEN environment variable
    # (configure it as a secret in the Space settings). Whitespace is
    # stripped because a stray trailing newline in a pasted secret would
    # produce an invalid ``Authorization`` header.
    HF_API_TOKEN: str = os.getenv("HF_TOKEN", "").strip()

    # Display name -> model repository id used to build the Inference API URL.
    MODELS: Dict[str, str] = {
        "Mistral-7B": "mistralai/Mistral-7B-Instruct-v0.1",
        "Falcon-7B": "tiiuae/falcon-7b-instruct",
        "GPT-2": "gpt2",
        "FLAN-T5": "google/flan-t5-base",
    }

    # SQLite database file, relative to the working directory.
    DB_PATH: str = "leads.db"

    # Rate-limiting knobs. NOTE(review): presumably seconds; neither value is
    # referenced anywhere in the visible code.
    SCRAPE_DELAY: int = 2
    HF_API_DELAY: int = 1
# ======================== Database Setup ========================
def init_database(db_path: Optional[str] = None) -> sqlite3.Connection:
    """Open the SQLite database and make sure the app's tables exist.

    A new connection is opened on every call (there is no pooling); the
    caller owns it and is responsible for closing it.

    Args:
        db_path: Database file to open. Defaults to ``Config.DB_PATH``.

    Returns:
        An open connection created with ``check_same_thread=False`` so it can
        be used from whichever thread Streamlit runs the script on.

    Raises:
        sqlite3.Error: If the database cannot be opened or the tables cannot
            be created. The connection is closed before re-raising.
    """
    target = Config.DB_PATH if db_path is None else db_path
    conn = sqlite3.connect(target, check_same_thread=False)
    try:
        # ``with conn`` commits on success and rolls back on error.
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS leads (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT,
                    title TEXT,
                    company TEXT,
                    email TEXT UNIQUE,
                    industry TEXT,
                    website TEXT,
                    scraped_date TIMESTAMP,
                    status TEXT DEFAULT 'new'
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS generated_emails (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    lead_id INTEGER,
                    subject TEXT,
                    body TEXT,
                    generated_date TIMESTAMP,
                    sent_status TEXT DEFAULT 'draft',
                    FOREIGN KEY (lead_id) REFERENCES leads (id)
                )
            ''')
    except sqlite3.Error:
        # Do not leak the handle when schema creation fails.
        conn.close()
        raise
    return conn
# ======================== Lead Generation ========================
class LeadGenerator:
    """Generate sample leads for demonstration."""

    # Sample data templates.
    _COMPANIES: Dict[str, List[str]] = {
        "Tech": ["TechCorp", "Digital Solutions", "CloudBase", "AI Innovations", "DataFlow Systems"],
        "Marketing": ["Growth Agency", "Brand builders", "Digital Marketing Pro", "Creative Studios", "AdTech Solutions"],
        "Finance": ["FinTech Plus", "Investment Partners", "Capital Growth", "Wealth Advisors", "Banking Solutions"],
        "Healthcare": ["HealthTech", "MedCare Solutions", "Wellness Corp", "BioTech Innovations", "Healthcare Plus"],
        "E-commerce": ["ShopFlow", "E-tail Masters", "Commerce Cloud", "Online Retail Pro", "Marketplace Leaders"],
    }
    _FIRST_NAMES: List[str] = ["John", "Sarah", "Michael", "Emma", "David", "Lisa", "Robert", "Jennifer", "James", "Maria"]
    _LAST_NAMES: List[str] = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Wilson", "Martinez"]
    _TITLES: List[str] = ["CEO", "Marketing Director", "VP Sales", "CTO", "COO", "Head of Growth", "Director", "Founder"]

    @staticmethod
    def generate_sample_leads(industry: str, count: int = 5) -> List[Dict[str, object]]:
        """Generate sample leads based on industry.

        Declared as a static method so it works both as
        ``LeadGenerator.generate_sample_leads(...)`` and on an instance; the
        original definition had no ``self`` and raised ``TypeError`` when
        called on an instance.

        Args:
            industry: Key into the sample company table. Unknown industries
                fall back to the "Tech" companies but keep the given
                industry label on each lead.
            count: Number of leads requested. At most one lead per sample
                company is produced, so the result is capped at the number of
                companies for the industry; zero or negative yields ``[]``.

        Returns:
            A list of dicts with keys ``name``, ``title``, ``company``,
            ``email``, ``industry``, ``website`` and ``scraped_date`` (a
            ``datetime``).
        """
        companies = LeadGenerator._COMPANIES
        company_list = companies.get(industry, companies["Tech"])
        first_names = LeadGenerator._FIRST_NAMES
        last_names = LeadGenerator._LAST_NAMES
        titles = LeadGenerator._TITLES

        leads: List[Dict[str, object]] = []
        for i, company in enumerate(company_list[:max(count, 0)]):
            first = first_names[i % len(first_names)]
            last = last_names[i % len(last_names)]
            # Domain is the company name lower-cased with spaces removed.
            domain = company.lower().replace(' ', '')
            leads.append({
                'name': f"{first} {last}",
                'title': titles[i % len(titles)],
                'company': company,
                'email': f"{first.lower()}.{last.lower()}@{domain}.com",
                'industry': industry,
                'website': f"https://www.{domain}.com",
                'scraped_date': datetime.now(),
            })
        return leads
class WebScraper:
    """Simple web scraping utilities."""

    # The TLD class is ``[A-Za-z]``; the previous ``[A-Z|a-z]`` also accepted
    # a literal ``|`` inside the top-level domain.
    _EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')

    _REQUEST_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    @staticmethod
    def extract_emails_from_text(text: str) -> List[str]:
        """Extract the distinct email addresses found in ``text``.

        Returns:
            The unique addresses, in no particular order.
        """
        return list(set(WebScraper._EMAIL_RE.findall(text or "")))

    @staticmethod
    def scrape_website_info(url: str) -> Dict[str, object]:
        """Fetch ``url`` and return its page title and meta description.

        Declared as a static method so the instance call used by the app
        works; the original definition had no ``self``.

        Returns:
            ``{'title', 'description', 'success': True}`` on success, or
            ``{'success': False, 'error': <message>}`` if anything fails.
        """
        try:
            response = requests.get(url, headers=WebScraper._REQUEST_HEADERS, timeout=5)
            soup = BeautifulSoup(response.content, 'html.parser')

            title_tag = soup.find('title')
            title = (title_tag.get_text(strip=True) if title_tag else "") or "N/A"

            description = ""
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                description = meta_desc.get('content', '')

            return {
                'title': title,
                'description': description,
                'success': True,
            }
        except Exception as e:
            # Deliberate best-effort boundary: every failure (bad URL,
            # network error, parse error) is reported to the caller as data.
            return {'success': False, 'error': str(e)}
# ======================== AI Email Generation ========================
class EmailGenerator:
    """Draft outreach emails through the Hugging Face Inference API.

    Whenever the API call fails or the model output is unusable, a built-in
    template email is returned instead, so callers always receive a dict with
    ``subject`` and ``body`` keys.
    """

    MAX_SUBJECT_LENGTH = 100
    MAX_BODY_LENGTH = 1000
    # Shorter model output is treated as unusable.
    MIN_BODY_LENGTH = 50
    DEFAULT_SIGNATURE = "\n\nBest regards,\n[Your Name]"

    # "Subject:" must start a line and carry a colon, so the word "subject"
    # in the middle of a sentence is not mistaken for the subject line.
    _SUBJECT_RE = re.compile(r'^[ \t]*Subject[ \t]*:\s*(.+)$', re.IGNORECASE | re.MULTILINE)
    # Leading "Body:" / "Email:" labels. A greeting such as "Dear" is NOT a
    # label and must be kept.
    _LABEL_RE = re.compile(r'^(?:(?:Body|Email)[ \t]*:\s*)+', re.IGNORECASE)
    # Word boundary keeps "History ..." from counting as "hi".
    _GREETING_RE = re.compile(r'^(?:hi|hello|hey|dear)\b', re.IGNORECASE)
    _SIGNOFF_RE = re.compile(
        r'(?:(?:best|kind|warm)\s+)?(?:regards|wishes)\b'
        r'|(?:best|sincerely|thanks|thank you|cheers)\b',
        re.IGNORECASE,
    )

    def __init__(self, api_token, model_name):
        """Store credentials and derive the model's Inference API endpoint.

        Args:
            api_token: Hugging Face API token sent as a bearer token.
            model_name: Model repository id, e.g. ``"gpt2"``.
        """
        self.api_token = api_token
        self.model_name = model_name
        self.api_url = f"https://api-inference.huggingface.co/models/{model_name}"
        self.headers = {"Authorization": f"Bearer {api_token}"}

    def generate_email(self, lead_data, product_info, style="professional"):
        """Generate a personalized email for one lead using the HF API.

        Args:
            lead_data: Dict with at least ``name``, ``title`` and ``company``;
                ``industry`` is optional.
            product_info: Free-text description of the product or service.
            style: Tone requested from instruction-following models.

        Returns:
            ``{'subject': str, 'body': str}``. A template email is returned
            when the request fails, the API answers with a non-200 status, or
            the generated text is too short.
        """
        # GPT-2 only continues text, so it gets a prompt that ends with the
        # beginning of the email itself (the "primer").
        if "gpt2" in self.model_name.lower():
            prompt = self._create_simple_prompt(lead_data, product_info)
            primer = self._simple_prompt_primer(lead_data)
        else:
            prompt = self._create_detailed_prompt(lead_data, product_info, style)
            primer = ""

        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 250,
                "temperature": 0.7,
                "top_p": 0.95,
                "do_sample": True,
            },
        }

        try:
            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=payload,
                timeout=30,
            )
            if response.status_code != 200:
                # Tell the user instead of silently swapping in a template.
                st.warning(
                    f"Hugging Face API returned HTTP {response.status_code}; "
                    "using a template email instead."
                )
                return self._create_fallback_email(lead_data, product_info)
            generated_text = self._extract_generated_text(response.json())
        except (requests.RequestException, ValueError) as e:
            st.error(f"API Error: {str(e)}")
            return self._create_fallback_email(lead_data, product_info)

        # Text-generation models may echo the prompt in front of the
        # completion; parsing the echo would pick up the prompt's own
        # "Subject: [Create subject line]" placeholder as the subject.
        completion = self._strip_prompt_echo(generated_text, prompt)
        return self._parse_email_response(primer + completion, lead_data, product_info)

    @staticmethod
    def _extract_generated_text(result):
        """Return ``generated_text`` from a list- or dict-shaped API payload."""
        if isinstance(result, list):
            result = result[0] if result else {}
        if isinstance(result, dict):
            return str(result.get('generated_text') or '')
        return ''

    @staticmethod
    def _strip_prompt_echo(generated_text, prompt):
        """Drop ``prompt`` from the start of ``generated_text`` if present."""
        if prompt and generated_text.startswith(prompt):
            return generated_text[len(prompt):]
        return generated_text

    @staticmethod
    def _simple_prompt_primer(lead_data):
        """Start of the email that the GPT-2 prompt asks the model to continue."""
        return (
            f"Subject: Helping {lead_data['company']} grow\n"
            f"Dear {lead_data['name']},"
        )

    def _create_simple_prompt(self, lead_data, product_info):
        """Simple prompt for GPT-2."""
        return (
            f"Write a business email to {lead_data['name']} at "
            f"{lead_data['company']} about {product_info}.\n"
            + self._simple_prompt_primer(lead_data)
        )

    def _create_detailed_prompt(self, lead_data, product_info, style):
        """Detailed prompt for instruction-following models."""
        return "\n".join([
            f"Generate a {style} cold outreach email with these details:",
            f"Recipient: {lead_data['name']}, {lead_data['title']} at {lead_data['company']}",
            f"Industry: {lead_data.get('industry', 'Business')}",
            f"Product/Service: {product_info}",
            "Create a personalized email that:",
            "1. Has an attention-grabbing subject line",
            "2. Shows understanding of their industry",
            "3. Clearly states the value proposition",
            "4. Includes a specific call-to-action",
            "5. Keeps it under 150 words",
            "Format:",
            "Subject: [Create subject line]",
            "Body: [Create email body]",
            "Email:",
        ])

    @classmethod
    def _has_signoff(cls, body):
        """Return True if one of the last lines looks like a sign-off.

        Only short lines near the end are considered, so a word such as
        "best" in the middle of the pitch does not count as a signature.
        """
        for line in body.strip().splitlines()[-4:]:
            candidate = line.strip()
            if len(candidate) <= 40 and cls._SIGNOFF_RE.match(candidate):
                return True
        return False

    def _parse_email_response(self, text, lead_data, product_info):
        """Parse AI response to extract subject and body.

        Returns:
            ``{'subject', 'body'}`` with the subject limited to
            ``MAX_SUBJECT_LENGTH`` and the body to ``MAX_BODY_LENGTH``
            characters, or the fallback template when the body is shorter
            than ``MIN_BODY_LENGTH``.
        """
        text = text or ""
        default_subject = f"Opportunity for {lead_data['company']}"

        subject_match = self._SUBJECT_RE.search(text)
        if subject_match:
            subject = subject_match.group(1).strip() or default_subject
            # Everything after the subject line is the body.
            body = text[subject_match.end():]
        else:
            subject = default_subject
            body = text

        body = self._LABEL_RE.sub('', body.strip()).strip()

        # Ensure we have content.
        if len(body) < self.MIN_BODY_LENGTH:
            return self._create_fallback_email(lead_data, product_info)

        # Add greeting if missing.
        if not self._GREETING_RE.match(body):
            body = f"Dear {lead_data['name']},\n\n{body}"

        # Truncate first, then sign, so the signature is never cut off.
        body = body[:self.MAX_BODY_LENGTH].rstrip()
        if not self._has_signoff(body):
            room = self.MAX_BODY_LENGTH - len(self.DEFAULT_SIGNATURE)
            body = body[:room].rstrip() + self.DEFAULT_SIGNATURE

        return {
            'subject': subject[:self.MAX_SUBJECT_LENGTH],
            'body': body,
        }

    def _create_fallback_email(self, lead_data, product_info):
        """Fallback template when AI generation fails.

        One of two fixed templates is picked at random.
        """
        name = lead_data['name']
        company = lead_data['company']
        industry = lead_data.get('industry', 'industry')
        templates = [
            {
                'subject': f"Quick question for {company}",
                'body': "\n".join([
                    f"Dear {name},",
                    f"I hope this message finds you well. I noticed that {company} is a leader in the {industry}, and I wanted to reach out with a brief introduction.",
                    f"{product_info}",
                    "Companies similar to yours have seen significant improvements in efficiency and growth using our solution.",
                    f"Would you be open to a brief 15-minute call next week to discuss how this could benefit {company}?",
                    "Best regards,",
                    "[Your Name]",
                ]),
            },
            {
                'subject': f"Helping {company} achieve better results",
                'body': "\n".join([
                    f"Hi {name},",
                    f"As {lead_data['title']} at {company}, you're likely focused on driving growth and efficiency.",
                    f"{product_info}",
                    f"I'd love to show you how we've helped similar companies in the {industry} achieve remarkable results.",
                    "Are you available for a quick call this week?",
                    "Best regards,",
                    "[Your Name]",
                ]),
            },
        ]
        return random.choice(templates)
# ======================== Streamlit App ========================
# NOTE(review): the emoji in the UI strings below look mis-decoded (UTF-8 read
# as ISO-8859-7). They are kept exactly as found; restore them from the
# original file rather than guessing.

# st.session_state keys used by the email tab.
_DRAFT_STATE_KEY = "current_draft"
_REGENERATE_FLAG_KEY = "regenerate_requested"

_PAGE_CSS = "\n".join([
    "<style>",
    ".main { padding-top: 2rem; }",
    ".stButton>button { width: 100%; }",
    ".success-box { padding: 1rem; background-color: #d4edda; border-radius: 5px; margin: 1rem 0; }",
    "</style>",
])

_FOOTER_HTML = "\n".join([
    "<div style='text-align: center; color: #666;'>",
    "Built with β€οΈ using Streamlit & Hugging Face |",
    "<a href='https://huggingface.co/spaces' target='_blank'>Deploy your own</a>",
    "</div>",
])


def _rerun() -> None:
    """Restart the script run with whichever rerun API this Streamlit offers."""
    # ``st.rerun`` replaced ``st.experimental_rerun`` in newer releases.
    rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
    rerun()


def _db_timestamp(moment):
    """Format a datetime as text in the form sqlite3's default adapter used.

    Passing ``datetime`` objects straight to sqlite3 relies on the default
    adapter, which is deprecated since Python 3.12.
    """
    if isinstance(moment, datetime):
        return moment.isoformat(sep=" ")
    return moment


def _render_sidebar():
    """Draw the sidebar; return (api_token, model_key, product_text, style)."""
    with st.sidebar:
        st.header("βοΈ Configuration")

        st.subheader("π€ Hugging Face Setup")
        # The secret from the environment is never pre-filled into the widget:
        # a widget value is sent to every visitor's browser.
        entered_token = st.text_input(
            "HF API Token",
            type="password",
            value="",
            help="Get your free token at huggingface.co"
        )
        api_token = entered_token.strip() or Config.HF_API_TOKEN
        if not api_token:
            st.warning("β οΈ Please enter your Hugging Face API token")
            st.markdown("[Get your free token here](https://huggingface.co/settings/tokens)")
        elif not entered_token.strip():
            st.caption("Using the token configured in the environment (HF_TOKEN).")

        selected_model = st.selectbox(
            "AI Model",
            options=list(Config.MODELS.keys()),
            help="Choose the AI model for email generation"
        )

        st.subheader("π Your Product/Service")
        product_description = st.text_area(
            "Description",
            value="We provide AI-powered automation solutions that help businesses streamline their operations, reduce costs by 40%, and increase productivity.",
            height=100
        )

        email_style = st.radio(
            "Email Style",
            ["Professional", "Casual", "Creative"],
            help="Choose the tone for generated emails"
        )
    return api_token, selected_model, product_description, email_style


def _render_leads_tab(conn) -> None:
    """Tab 1: generate sample leads into the database and scrape a website."""
    st.header("Lead Generation")
    col1, col2 = st.columns(2)

    with col1:
        st.subheader("π― Quick Lead Generation")
        industry = st.selectbox(
            "Select Industry",
            ["Tech", "Marketing", "Finance", "Healthcare", "E-commerce"]
        )
        num_leads = st.slider("Number of Leads", 1, 10, 5)

        if st.button("Generate Sample Leads", type="primary"):
            with st.spinner("Generating leads..."):
                # Called on the class: the method takes no ``self``.
                leads = LeadGenerator.generate_sample_leads(industry, num_leads)

                cursor = conn.cursor()
                saved = 0
                for lead in leads:
                    try:
                        cursor.execute('''
                            INSERT INTO leads (name, title, company, email, industry, website, scraped_date, status)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                        ''', (
                            lead['name'], lead['title'], lead['company'],
                            lead['email'], lead['industry'], lead['website'],
                            _db_timestamp(lead['scraped_date']), 'new'
                        ))
                    except sqlite3.IntegrityError:
                        # The email column is UNIQUE: skip leads already stored.
                        continue
                    saved += 1
                conn.commit()

            st.success(f"β Generated {saved} new leads!")
            if leads:
                df = pd.DataFrame(leads)
                st.dataframe(df[['name', 'title', 'company', 'email']])

    with col2:
        st.subheader("π Website Scraper")
        website_url = st.text_input("Website URL", "https://example.com")

        if st.button("Scrape Website Info"):
            if website_url:
                with st.spinner("Scraping website..."):
                    info = WebScraper.scrape_website_info(website_url)
                if info['success']:
                    st.success("β Website scraped successfully!")
                    st.write(f"**Title:** {info.get('title', 'N/A')}")
                    st.write(f"**Description:** {info.get('description', 'N/A')}")
                else:
                    st.error(f"Failed to scrape: {info.get('error', 'Unknown error')}")


def _render_emails_tab(conn, api_token, selected_model, product_description, email_style) -> None:
    """Tab 2: generate, edit and save an outreach email for a selected lead.

    Streamlit reruns the whole script on every widget interaction and a button
    is only ``True`` during the run right after its click. The generated draft
    is therefore kept in ``st.session_state`` so the editor and its action
    buttons survive later interactions.
    """
    st.header("Email Generation")

    if not api_token:
        st.warning("β οΈ Please configure your Hugging Face API token in the sidebar")
        return

    cursor = conn.cursor()
    cursor.execute(
        "SELECT id, name, title, company, email, industry "
        "FROM leads WHERE status = 'new' ORDER BY scraped_date DESC"
    )
    leads = cursor.fetchall()
    if not leads:
        st.info("No leads available. Generate some leads first!")
        return

    lead_options = [f"{row[1]} - {row[3]} ({row[4]})" for row in leads]
    selected_index = st.selectbox(
        "Select Lead",
        range(len(lead_options)),
        format_func=lambda x: lead_options[x]
    )
    lead_id, name, title, company, email_address, industry = leads[selected_index]

    col1, col2 = st.columns(2)
    with col1:
        st.write(f"**Name:** {name}")
        st.write(f"**Title:** {title}")
    with col2:
        st.write(f"**Company:** {company}")
        st.write(f"**Email:** {email_address}")
    st.markdown("---")

    generate_clicked = st.button("π€ Generate Personalized Email", type="primary")
    # Set by the "Regenerate" button during the previous run.
    regenerate_requested = st.session_state.pop(_REGENERATE_FLAG_KEY, False)

    if generate_clicked or regenerate_requested:
        with st.spinner("Generating email with AI..."):
            lead_data = {
                'name': name,
                'title': title,
                'company': company,
                'email': email_address,
                'industry': industry
            }
            generator = EmailGenerator(api_token, Config.MODELS[selected_model])
            email = generator.generate_email(
                lead_data,
                product_description,
                email_style.lower()
            )
            # Every generated email is recorded as a draft right away; the
            # "Save Draft" button later stores the user's edits on that row.
            cursor.execute('''
                INSERT INTO generated_emails (lead_id, subject, body, generated_date, sent_status)
                VALUES (?, ?, ?, ?, ?)
            ''', (lead_id, email['subject'], email['body'], _db_timestamp(datetime.now()), 'draft'))
            conn.commit()
            st.session_state[_DRAFT_STATE_KEY] = {
                'draft_id': cursor.lastrowid,
                'lead_id': lead_id,
                'subject': email['subject'],
                'body': email['body'],
            }
        st.success("β Email generated successfully!")

    draft = st.session_state.get(_DRAFT_STATE_KEY)
    if not draft or draft['lead_id'] != lead_id:
        # No draft yet, or the stored draft belongs to another lead.
        return

    # Keys include the draft id so a regenerated email gets fresh widgets
    # instead of the previously edited text.
    subject = st.text_input(
        "Subject Line",
        value=draft['subject'],
        key=f"draft_subject_{draft['draft_id']}"
    )
    body = st.text_area(
        "Email Body",
        value=draft['body'],
        height=300,
        key=f"draft_body_{draft['draft_id']}"
    )

    col1, col2, col3 = st.columns(3)
    with col1:
        if st.button("πΎ Save Draft"):
            cursor.execute(
                "UPDATE generated_emails SET subject = ?, body = ? WHERE id = ?",
                (subject, body, draft['draft_id'])
            )
            conn.commit()
            draft['subject'] = subject
            draft['body'] = body
            st.success("Draft saved!")
    with col2:
        if st.button("π Regenerate"):
            st.session_state[_REGENERATE_FLAG_KEY] = True
            _rerun()
    with col3:
        if st.button("π§ Copy to Clipboard"):
            st.info("Email copied! (Feature requires JavaScript)")


def _render_database_tab(conn) -> None:
    """Tab 3: list stored leads, export them as CSV, or clear the database."""
    st.header("Lead Database")

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM leads ORDER BY scraped_date DESC")
    all_leads = cursor.fetchall()
    if not all_leads:
        st.info("No leads in database. Start by generating some leads!")
        return

    df = pd.DataFrame(
        all_leads,
        columns=['ID', 'Name', 'Title', 'Company', 'Email', 'Industry', 'Website', 'Date', 'Status']
    )

    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Total Leads", len(all_leads))
    with col2:
        st.metric("New Leads", sum(1 for row in all_leads if row[8] == 'new'))
    with col3:
        st.metric("Industries", len({row[5] for row in all_leads if row[5]}))

    st.dataframe(df[['Name', 'Title', 'Company', 'Email', 'Industry', 'Status']])

    # Not named ``csv``: that would shadow the imported module.
    csv_data = df.to_csv(index=False)
    st.download_button(
        label="π₯ Download CSV",
        data=csv_data,
        file_name=f"leads_{datetime.now().strftime('%Y%m%d')}.csv",
        mime="text/csv"
    )

    if st.button("ποΈ Clear All Leads", type="secondary"):
        # Remove dependent rows before the leads they reference.
        cursor.execute("DELETE FROM generated_emails")
        cursor.execute("DELETE FROM leads")
        conn.commit()
        st.session_state.pop(_DRAFT_STATE_KEY, None)
        _rerun()


def _render_analytics_tab(conn) -> None:
    """Tab 4: headline counts, leads per industry and most recent leads."""
    st.header("Campaign Analytics")
    cursor = conn.cursor()

    cursor.execute("SELECT COUNT(*) FROM leads")
    total_leads = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(*) FROM generated_emails")
    total_emails = cursor.fetchone()[0]
    cursor.execute("SELECT industry, COUNT(*) FROM leads GROUP BY industry")
    industry_data = cursor.fetchall()

    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Total Leads", total_leads)
    with col2:
        st.metric("Emails Generated", total_emails)
    with col3:
        avg_rate = (total_emails / total_leads * 100) if total_leads > 0 else 0
        st.metric("Generation Rate", f"{avg_rate:.1f}%")
    with col4:
        st.metric("Industries", len(industry_data))

    if industry_data:
        st.subheader("π Leads by Industry")
        industry_df = pd.DataFrame(industry_data, columns=['Industry', 'Count'])
        st.bar_chart(industry_df.set_index('Industry'))

    st.subheader("π Recent Activity")
    cursor.execute("""
        SELECT name, company, scraped_date
        FROM leads
        ORDER BY scraped_date DESC
        LIMIT 10
    """)
    recent = cursor.fetchall()
    if recent:
        recent_df = pd.DataFrame(recent, columns=['Name', 'Company', 'Date'])
        st.dataframe(recent_df)


def main() -> None:
    """Render the Streamlit page: sidebar settings plus four feature tabs."""
    st.set_page_config(
        page_title="AI Lead Gen Agent",
        page_icon="π",
        layout="wide"
    )
    st.markdown(_PAGE_CSS, unsafe_allow_html=True)

    st.title("π AI Lead Generation & Outreach Agent")
    st.markdown("Generate leads and create personalized outreach emails using AI")
    st.markdown("---")

    # The script is re-executed on every interaction, so the connection opened
    # here is closed again when the run ends (including when a rerun is
    # triggered) instead of piling up open handles.
    conn = init_database()
    try:
        api_token, selected_model, product_description, email_style = _render_sidebar()

        tab1, tab2, tab3, tab4 = st.tabs(["π Generate Leads", "βοΈ Create Emails", "π View Database", "π Analytics"])
        with tab1:
            _render_leads_tab(conn)
        with tab2:
            _render_emails_tab(conn, api_token, selected_model, product_description, email_style)
        with tab3:
            _render_database_tab(conn)
        with tab4:
            _render_analytics_tab(conn)

        st.markdown("---")
        st.markdown(_FOOTER_HTML, unsafe_allow_html=True)
    finally:
        conn.close()


if __name__ == "__main__":
    main()