"""LeadGen Pro MCP server.

Generates randomized demo business leads, scores them, and exports them as
JSON/CSV or to a Google Sheet. A small Gradio UI wraps the same operations.
"""

import json
import random
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import gspread
from oauth2client.service_account import ServiceAccountCredentials

# Hard upper bound on the number of demo businesses generated per search.
MAX_SEARCH_RESULTS = 20

# Header row written to a freshly created (or empty) worksheet. The column
# order must match the rows built in ``export_to_google_sheets``.
SHEET_HEADERS = [
    'Business Name', 'Decision Maker', 'Title', 'Email', 'Website',
    'Location', 'LinkedIn URL', 'Industry', 'Date Added',
]


def _first_present(lead: Dict, keys: Tuple[str, ...], default: Any = 'N/A') -> Any:
    """Return the value of the first key in *keys* that exists in *lead*."""
    for key in keys:
        if key in lead:
            return lead[key]
    return default


class LeadGenMCPServer:
    """Find, score, and export business leads.

    Lead data is randomly generated demo data; no external search API is
    called. Only the Google Sheets export talks to a remote service.
    """

    def __init__(self):
        # No API key needed - leads are generated locally as demo data.
        # Set to the active worksheet by ``setup_google_sheet``.
        self.google_sheet = None

    def search_businesses(self, industry: str, location: str, max_results: int = 20) -> List[Dict]:
        """Generate realistic-looking demo businesses for an industry and location.

        Args:
            industry: Industry keyword. "restaurant", "retail", "tech" and
                "healthcare" (case-insensitive, surrounding whitespace
                ignored) pick a themed name suffix; anything else uses a
                generic one.
            location: Free-text location appended to each generated address.
            max_results: Number of businesses wanted; coerced to ``int`` and
                clamped to ``0..MAX_SEARCH_RESULTS``.

        Returns:
            A list of business dicts, or a single-element list
            ``[{"error": message}]`` if generation fails.
        """
        try:
            name_templates = {
                "restaurant": ["Bistro", "Cafe", "Grill", "Kitchen", "Diner", "Eatery"],
                "retail": ["Shop", "Store", "Boutique", "Market", "Emporium", "Outlet"],
                "tech": ["Solutions", "Systems", "Technologies", "Digital", "Software", "Labs"],
                "healthcare": ["Clinic", "Medical Center", "Health Services", "Wellness", "Care"],
                "default": ["Company", "Services", "Group", "Enterprise", "Associates"],
            }
            templates = name_templates.get(industry.strip().lower(), name_templates["default"])

            # UI sliders may deliver a float; range() needs an int.
            count = max(0, min(int(max_results), MAX_SEARCH_RESULTS))

            businesses = []
            for _ in range(count):
                prefix = random.choice(['Prime', 'Elite', 'Metro', 'City', 'Downtown', 'Main Street'])
                business_name = f"{prefix} {random.choice(templates)}"
                # Domain slug shared by the fake website and email address.
                slug = business_name.lower().replace(' ', '')
                street = random.choice(['Main', 'Oak', 'Maple', 'Park', 'Broadway'])
                businesses.append({
                    "name": business_name,
                    "address": f"{random.randint(100, 9999)} {street} St, {location}",
                    "phone": (
                        f"+1-{random.randint(200, 999)}"
                        f"-{random.randint(100, 999)}"
                        f"-{random.randint(1000, 9999)}"
                    ),
                    "rating": round(random.uniform(3.5, 5.0), 1),
                    "industry": industry,
                    "website": f"https://{slug}.com",
                    "email": f"contact@{slug}.com",
                    "employees": random.choice([5, 10, 25, 50, 100, 250]),
                    "founded": random.randint(2000, 2024),
                })
            return businesses
        except Exception as e:
            # API boundary: callers expect an error entry, not an exception.
            return [{"error": str(e)}]

    def score_lead(self, business: Dict) -> Dict:
        """Score a business lead on a 0-100 scale.

        Points: rating (0-30, rating clamped to 0-5), website present (20),
        phone present (15), plus a random engagement factor (0-35).

        Args:
            business: Business dict; must contain "name". "rating",
                "website" and "phone" are optional.

        Returns:
            ``{"business", "total_score", "grade", "factors"}``, or
            ``{"error": message}`` if the lead cannot be scored.
        """
        try:
            score = 0.0
            factors = {}

            # Rating score (0-30 points)
            if "rating" in business:
                rating = min(max(business["rating"], 0), 5)
                rating_score = (rating / 5.0) * 30
                score += rating_score
                factors["rating"] = round(rating_score, 1)

            # Has website (20 points)
            if business.get("website"):
                score += 20
                factors["website"] = 20

            # Has phone (15 points)
            if business.get("phone"):
                score += 15
                factors["phone"] = 15

            # Random engagement factor (0-35 points)
            engagement = random.uniform(0, 35)
            score += engagement
            factors["engagement"] = round(engagement, 1)

            # Grade the rounded value so the reported score and grade can
            # never disagree (e.g. 89.96 shown as 90.0 but graded "B").
            total_score = round(score, 1)
            return {
                "business": business["name"],
                "total_score": total_score,
                "grade": self._get_grade(total_score),
                "factors": factors,
            }
        except Exception as e:
            return {"error": str(e)}

    def _get_grade(self, score: float) -> str:
        """Convert a 0-100 score to a letter grade (A >= 90 ... F < 60)."""
        for threshold, grade in ((90, "A"), (80, "B"), (70, "C"), (60, "D")):
            if score >= threshold:
                return grade
        return "F"

    def export_leads(self, leads: List[Dict], format: str = "json") -> str:
        """Serialize leads to a string.

        Args:
            leads: Lead dicts to export.
            format: "json" or "csv". CSV export requires pandas, which is
                imported lazily so JSON export works without it.

        Returns:
            The serialized leads, or a human-readable message for an empty
            CSV export, an unsupported format, or an export failure.
        """
        try:
            if format == "json":
                return json.dumps(leads, indent=2)
            if format == "csv":
                if not leads:
                    return "No leads to export"
                import pandas as pd
                return pd.DataFrame(leads).to_csv(index=False)
            return "Unsupported format"
        except Exception as e:
            return f"Export error: {str(e)}"

    def setup_google_sheet(self, credentials_json: str, sheet_name: str) -> Tuple[Optional[Any], Optional[str]]:
        """Open (or create) the target spreadsheet and return its first worksheet.

        Args:
            credentials_json: Service-account key file contents as a JSON string.
            sheet_name: Title of the spreadsheet to open or create.

        Returns:
            ``(worksheet, None)`` on success, ``(None, error_message)`` on failure.
        """
        try:
            creds_dict = json.loads(credentials_json)

            scope = [
                'https://spreadsheets.google.com/feeds',
                'https://www.googleapis.com/auth/drive',
            ]
            creds = ServiceAccountCredentials.from_json_keyfile_dict(creds_dict, scope)
            client = gspread.authorize(creds)

            # Only a genuinely missing spreadsheet triggers creation; auth,
            # quota, or network errors propagate to the outer handler rather
            # than silently creating a new sheet.
            try:
                sheet = client.open(sheet_name).sheet1
            except gspread.exceptions.SpreadsheetNotFound:
                spreadsheet = client.create(sheet_name)
                sheet = spreadsheet.sheet1
                # NOTE(security): this makes the new sheet editable by anyone
                # who has the link. Kept because a service-account-owned
                # sheet is otherwise not reachable by the user; consider
                # sharing with a specific email address instead.
                spreadsheet.share('', perm_type='anyone', role='writer')

            # Setup headers if empty
            if not sheet.row_values(1):
                sheet.append_row(list(SHEET_HEADERS))

            self.google_sheet = sheet
            return sheet, None
        except Exception as e:
            return None, str(e)

    def export_to_google_sheets(self, leads: List[Dict], credentials_json: str, sheet_name: str) -> Dict:
        """Append leads to a Google Sheet.

        The business name is read from "business_name", "name" or "business"
        (the key used by ``score_lead`` output); the location from
        "location" or "address". Missing fields are written as "N/A".

        Args:
            leads: Lead dicts to append, one row each.
            credentials_json: Service-account key file contents as a JSON string.
            sheet_name: Title of the spreadsheet to open or create.

        Returns:
            ``{"success": True, "added_count", "sheet_url"}`` or
            ``{"success": False, "error"}``.
        """
        try:
            sheet, error = self.setup_google_sheet(credentials_json, sheet_name)
            if error:
                return {"success": False, "error": error}

            date_added = datetime.now().strftime('%Y-%m-%d %H:%M')
            rows = [
                [
                    _first_present(lead, ('business_name', 'name', 'business')),
                    lead.get('decision_maker', 'N/A'),
                    lead.get('title', 'N/A'),
                    lead.get('email', 'N/A'),
                    lead.get('website', 'N/A'),
                    _first_present(lead, ('location', 'address')),
                    lead.get('linkedin_url', 'N/A'),
                    lead.get('industry', 'N/A'),
                    date_added,
                ]
                for lead in leads
            ]

            # One batched write instead of one API call per lead keeps large
            # exports within the Sheets API per-minute write quota.
            if rows:
                sheet.append_rows(rows)

            sheet_url = f"https://docs.google.com/spreadsheets/d/{sheet.spreadsheet.id}"
            return {
                "success": True,
                "added_count": len(rows),
                "sheet_url": sheet_url,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}


def create_gradio_interface(server: LeadGenMCPServer):
    """Build the Gradio Blocks UI that drives *server*.

    The UI has two tabs: one that generates and scores leads, and one that
    exports pasted lead JSON to Google Sheets.

    Returns:
        The ``gr.Blocks`` interface (not yet launched).
    """

    def search_and_score(industry: str, location: str, max_results: int):
        """Generate leads, score them, and return them as pretty JSON, best first."""
        businesses = server.search_businesses(industry, location, max_results)
        # Pass generation errors through untouched; scoring them would
        # replace the real message with a KeyError for the missing "name".
        scored_leads = [
            b if "error" in b else server.score_lead(b)
            for b in businesses
        ]
        scored_leads.sort(key=lambda x: x.get("total_score", 0), reverse=True)
        return json.dumps(scored_leads, indent=2)

    def export_to_sheets(leads_json: str, google_creds: str, sheet_name: str):
        """Validate the form inputs and export the pasted leads to Google Sheets."""
        try:
            if not google_creds.strip():
                return "❌ Please provide Google Service Account credentials"
            if not sheet_name.strip():
                return "❌ Please provide a sheet name"
            if not leads_json.strip():
                return "❌ Please provide leads JSON"

            leads = json.loads(leads_json)
            if isinstance(leads, dict):
                # A single pasted lead object is treated as a one-item list.
                leads = [leads]
            if not isinstance(leads, list):
                return "❌ Leads JSON must be a list of lead objects"

            result = server.export_to_google_sheets(leads, google_creds, sheet_name)
            if result["success"]:
                return (
                    f"✅ Successfully exported {result['added_count']} leads!"
                    f"\n\n📊 Sheet URL: {result['sheet_url']}"
                )
            return f"❌ Error: {result['error']}"
        except Exception as e:
            return f"❌ Error: {str(e)}"

    with gr.Blocks(title="LeadGen Pro MCP Server", theme=gr.themes.Soft(primary_hue="blue")) as interface:
        gr.Markdown("# 🎯 LeadGen Pro MCP Server")
        gr.Markdown("Find, score, and export business leads with Google Sheets integration")

        with gr.Tabs():
            # Tab 1: Lead Generation
            with gr.Tab("🔍 Generate Leads"):
                with gr.Row():
                    with gr.Column():
                        industry_input = gr.Textbox(label="Industry", placeholder="e.g., restaurant, retail, tech")
                        location_input = gr.Textbox(label="Location", placeholder="e.g., New York, NY")
                        max_results_input = gr.Slider(minimum=1, maximum=20, value=10, step=1, label="Max Results")
                        search_btn = gr.Button("Search & Score Leads", variant="primary")
                    with gr.Column():
                        output = gr.Textbox(label="Scored Leads (JSON)", lines=20)

                search_btn.click(
                    fn=search_and_score,
                    inputs=[industry_input, location_input, max_results_input],
                    outputs=output,
                )

            # Tab 2: Export to Google Sheets
            with gr.Tab("📊 Export to Google Sheets"):
                gr.Markdown("### Export your leads to Google Sheets")

                leads_input = gr.Textbox(
                    label="Leads JSON (paste from above)",
                    lines=10,
                    placeholder='[{"business": "...", "total_score": 95, ...}]',
                )
                google_creds_input = gr.Textbox(
                    label="Google Service Account JSON",
                    lines=5,
                    placeholder='{"type": "service_account", "project_id": "...", ...}',
                    type="password",
                )
                sheet_name_input = gr.Textbox(
                    label="Google Sheet Name",
                    placeholder="Sales Leads 2024",
                    value="LeadGen Pro Leads",
                )
                export_btn = gr.Button("📤 Export to Google Sheets", variant="primary")
                export_output = gr.Markdown()

                export_btn.click(
                    fn=export_to_sheets,
                    inputs=[leads_input, google_creds_input, sheet_name_input],
                    outputs=export_output,
                )

                # Built from joined lines so the rendered Markdown does not
                # depend on source indentation.
                gr.Markdown("\n".join([
                    "### 📋 Setup Google Sheets API",
                    "1. Go to https://console.cloud.google.com",
                    "2. Create project → Enable Google Sheets API",
                    "3. Create Service Account → Download JSON",
                    "4. Paste JSON above",
                ]))

        gr.Markdown("---")
        gr.Markdown("### API Endpoints")
        gr.Markdown("\n".join([
            "- `search_businesses(industry, location, max_results)` - Find businesses",
            "- `score_lead(business)` - Score a single lead",
            "- `export_leads(leads, format)` - Export leads to JSON/CSV",
            "- `export_to_google_sheets(leads, credentials, sheet_name)` - Export to Google Sheets",
        ]))

    return interface


if __name__ == "__main__":
    server = LeadGenMCPServer()
    interface = create_gradio_interface(server)
    interface.launch(server_port=7860)