Spaces:
Sleeping
Sleeping
| import json | |
| import random | |
| import re | |
| from typing import Dict, List | |
| import gradio as gr | |
| import gspread | |
| from oauth2client.service_account import ServiceAccountCredentials | |
| from datetime import datetime | |
class LeadGenMCPServer:
    """LeadGen Pro MCP Server for finding and scoring business leads.

    Business records are randomly generated demo data (no external search
    API is called). Leads can be scored, serialized to JSON/CSV, or appended
    to a Google Sheet through a service account.
    """

    # Hard upper bound on how many demo businesses one search may return.
    MAX_SEARCH_RESULTS = 20

    # Name suffixes per industry; unknown industries use "default".
    _NAME_SUFFIXES = {
        "restaurant": ["Bistro", "Cafe", "Grill", "Kitchen", "Diner", "Eatery"],
        "retail": ["Shop", "Store", "Boutique", "Market", "Emporium", "Outlet"],
        "tech": ["Solutions", "Systems", "Technologies", "Digital", "Software", "Labs"],
        "healthcare": ["Clinic", "Medical Center", "Health Services", "Wellness", "Care"],
        "default": ["Company", "Services", "Group", "Enterprise", "Associates"],
    }
    _NAME_PREFIXES = ['Prime', 'Elite', 'Metro', 'City', 'Downtown', 'Main Street']
    _STREET_NAMES = ['Main', 'Oak', 'Maple', 'Park', 'Broadway']

    # Header row written to a worksheet whose first row is empty.
    _SHEET_HEADERS = ['Business Name', 'Decision Maker', 'Title', 'Email',
                      'Website', 'Location', 'LinkedIn URL', 'Industry', 'Date Added']

    def __init__(self):
        # No API key needed - business data is generated locally.
        # Holds the most recently opened worksheet (set by setup_google_sheet).
        self.google_sheet = None

    def search_businesses(self, industry: str, location: str, max_results: int = 20) -> List[Dict]:
        """Generate demo businesses for an industry and location.

        Args:
            industry: Industry keyword; matched case-insensitively (ignoring
                surrounding whitespace) against the known name templates.
            location: Free-text location appended to each generated address.
            max_results: Requested number of businesses. Coerced to ``int``
                and clamped to ``0..MAX_SEARCH_RESULTS``.

        Returns:
            A list of business dicts, or ``[{"error": message}]`` if the
            arguments cannot be processed.
        """
        try:
            # UI sliders may deliver floats (e.g. 5.0); range() needs an int.
            count = max(0, min(int(max_results), self.MAX_SEARCH_RESULTS))
            suffixes = self._NAME_SUFFIXES.get(
                industry.strip().lower(), self._NAME_SUFFIXES["default"]
            )

            businesses = []
            for _ in range(count):
                business_name = f"{random.choice(self._NAME_PREFIXES)} {random.choice(suffixes)}"
                # Domain-style slug shared by the website and e-mail fields.
                slug = business_name.lower().replace(' ', '')
                businesses.append({
                    "name": business_name,
                    "address": f"{random.randint(100, 9999)} {random.choice(self._STREET_NAMES)} St, {location}",
                    "phone": f"+1-{random.randint(200, 999)}-{random.randint(100, 999)}-{random.randint(1000, 9999)}",
                    "rating": round(random.uniform(3.5, 5.0), 1),
                    "industry": industry,
                    "website": f"https://{slug}.com",
                    "email": f"contact@{slug}.com",
                    "employees": random.choice([5, 10, 25, 50, 100, 250]),
                    "founded": random.randint(2000, 2024),
                })
            return businesses
        except Exception as e:
            return [{"error": str(e)}]

    def score_lead(self, business: Dict) -> Dict:
        """Score a business lead on a 0-100 scale.

        Points: rating (0-30, proportional to ``rating / 5``), website
        present (20), phone present (15), plus a random "engagement"
        component (0-35).

        Args:
            business: Business dict; must contain ``"name"``.

        Returns:
            ``{"business", "total_score", "grade", "factors"}``, or
            ``{"error": message}`` if the dict cannot be scored (for example
            a missing name or a non-numeric rating).
        """
        try:
            score = 0.0
            factors = {}

            # Rating score (0-30 points)
            if "rating" in business:
                rating_score = (business["rating"] / 5.0) * 30
                score += rating_score
                factors["rating"] = round(rating_score, 1)

            # Has website (20 points)
            if business.get("website"):
                score += 20
                factors["website"] = 20

            # Has phone (15 points)
            if business.get("phone"):
                score += 15
                factors["phone"] = 15

            # Random engagement factor (0-35 points)
            engagement = random.uniform(0, 35)
            score += engagement
            factors["engagement"] = round(engagement, 1)

            # Grade the rounded value so the reported score and the letter
            # grade can never disagree (e.g. 89.96 shown as 90.0 but "B").
            total_score = round(score, 1)
            return {
                "business": business["name"],
                "total_score": total_score,
                "grade": self._get_grade(total_score),
                "factors": factors,
            }
        except Exception as e:
            return {"error": str(e)}

    def _get_grade(self, score: float) -> str:
        """Convert a numeric score to a letter grade (A >= 90 ... F < 60)."""
        if score >= 90:
            return "A"
        if score >= 80:
            return "B"
        if score >= 70:
            return "C"
        if score >= 60:
            return "D"
        return "F"

    def export_leads(self, leads: List[Dict], format: str = "json") -> str:
        """Serialize leads to a string.

        Args:
            leads: Lead dicts to export.
            format: ``"json"`` (default) or ``"csv"``.

        Returns:
            The serialized text, ``"No leads to export"`` for an empty CSV
            export, ``"Unsupported format"`` for any other format, or an
            ``"Export error: ..."`` message if serialization fails.
        """
        try:
            if format == "json":
                return json.dumps(leads, indent=2)
            if format == "csv":
                if not leads:
                    return "No leads to export"
                # Imported lazily so pandas is only required for CSV export.
                import pandas as pd
                return pd.DataFrame(leads).to_csv(index=False)
            return "Unsupported format"
        except Exception as e:
            return f"Export error: {str(e)}"

    def setup_google_sheet(self, credentials_json: str, sheet_name: str):
        """Open (or create) a Google Sheet and ensure it has a header row.

        Args:
            credentials_json: Service-account key file contents as a JSON string.
            sheet_name: Title of the spreadsheet to open or create.

        Returns:
            ``(worksheet, None)`` on success, ``(None, error_message)`` on failure.
        """
        try:
            creds_dict = json.loads(credentials_json)

            scope = ['https://spreadsheets.google.com/feeds',
                     'https://www.googleapis.com/auth/drive']
            creds = ServiceAccountCredentials.from_json_keyfile_dict(creds_dict, scope)
            client = gspread.authorize(creds)

            try:
                sheet = client.open(sheet_name).sheet1
            except gspread.exceptions.SpreadsheetNotFound:
                # Only a genuinely missing spreadsheet triggers creation;
                # auth/API failures fall through to the outer handler
                # instead of silently creating a new public sheet.
                spreadsheet = client.create(sheet_name)
                sheet = spreadsheet.sheet1
                # NOTE(review): this grants write access to anyone.
                # Confirm that is intended before using with real data.
                spreadsheet.share('', perm_type='anyone', role='writer')

            # Setup headers if the first row is empty
            if not sheet.row_values(1):
                sheet.append_row(self._SHEET_HEADERS)

            self.google_sheet = sheet
            return sheet, None
        except Exception as e:
            return None, str(e)

    @staticmethod
    def _lead_to_row(lead: Dict, timestamp: str) -> List:
        """Map one lead dict onto the sheet's column order, filling gaps with 'N/A'."""
        return [
            # Scored leads carry the name under "business"; raw ones under "name".
            lead.get('business_name', lead.get('name', lead.get('business', 'N/A'))),
            lead.get('decision_maker', 'N/A'),
            lead.get('title', 'N/A'),
            lead.get('email', 'N/A'),
            lead.get('website', 'N/A'),
            lead.get('location', lead.get('address', 'N/A')),
            lead.get('linkedin_url', 'N/A'),
            lead.get('industry', 'N/A'),
            timestamp,
        ]

    def export_to_google_sheets(self, leads: List[Dict], credentials_json: str, sheet_name: str) -> Dict:
        """Append leads to a Google Sheet.

        Args:
            leads: Lead dicts (raw businesses or scored leads).
            credentials_json: Service-account key file contents as a JSON string.
            sheet_name: Title of the spreadsheet to open or create.

        Returns:
            ``{"success": True, "added_count", "sheet_url"}`` on success,
            otherwise ``{"success": False, "error": message}``.
        """
        try:
            sheet, error = self.setup_google_sheet(credentials_json, sheet_name)
            if error:
                return {"success": False, "error": error}

            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')
            rows = [self._lead_to_row(lead, timestamp) for lead in leads]

            # One batched request instead of one API call per lead.
            if rows:
                sheet.append_rows(rows)

            sheet_url = f"https://docs.google.com/spreadsheets/d/{sheet.spreadsheet.id}"
            return {
                "success": True,
                "added_count": len(rows),
                "sheet_url": sheet_url,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
def create_gradio_interface(server: LeadGenMCPServer):
    """Create the Gradio interface for the MCP server.

    Builds a two-tab UI: one tab generates and scores leads, the other
    exports pasted lead JSON to Google Sheets.

    Args:
        server: The lead-generation server whose methods back the UI callbacks.

    Returns:
        The constructed ``gr.Blocks`` interface (not yet launched).
    """

    def search_and_score(industry: str, location: str, max_results: int):
        """Generate leads, score them, and return them as JSON, best first."""
        # Slider values may arrive as floats; the server expects an int count.
        businesses = server.search_businesses(industry, location, int(max_results))
        # Keep the business details (email, website, address, ...) alongside
        # the score so the JSON can be pasted into the export tab without
        # every column degrading to "N/A".
        scored_leads = [{**b, **server.score_lead(b)} for b in businesses]
        scored_leads.sort(key=lambda x: x.get("total_score", 0), reverse=True)
        return json.dumps(scored_leads, indent=2)

    def export_to_sheets(leads_json: str, google_creds: str, sheet_name: str):
        """Validate the form inputs, export the leads, and return a status message."""
        try:
            if not google_creds.strip():
                return "β Please provide Google Service Account credentials"

            if not sheet_name.strip():
                return "β Please provide a sheet name"

            leads = json.loads(leads_json)
            result = server.export_to_google_sheets(leads, google_creds, sheet_name)

            if result["success"]:
                return f"β Successfully exported {result['added_count']} leads!\n\nπ Sheet URL: {result['sheet_url']}"
            else:
                return f"β Error: {result['error']}"
        except Exception as e:
            # Covers malformed lead JSON and any unexpected export failure.
            return f"β Error: {str(e)}"

    with gr.Blocks(title="LeadGen Pro MCP Server", theme=gr.themes.Soft(primary_hue="blue")) as interface:
        gr.Markdown("# π― LeadGen Pro MCP Server")
        gr.Markdown("Find, score, and export business leads with Google Sheets integration")

        with gr.Tabs():
            # Tab 1: Lead Generation
            with gr.Tab("π Generate Leads"):
                with gr.Row():
                    with gr.Column():
                        industry_input = gr.Textbox(label="Industry", placeholder="e.g., restaurant, retail, tech")
                        location_input = gr.Textbox(label="Location", placeholder="e.g., New York, NY")
                        max_results_input = gr.Slider(minimum=1, maximum=20, value=10, step=1, label="Max Results")
                        search_btn = gr.Button("Search & Score Leads", variant="primary")

                    with gr.Column():
                        output = gr.Textbox(label="Scored Leads (JSON)", lines=20)

                search_btn.click(
                    fn=search_and_score,
                    inputs=[industry_input, location_input, max_results_input],
                    outputs=output
                )

            # Tab 2: Export to Google Sheets
            with gr.Tab("π Export to Google Sheets"):
                gr.Markdown("### Export your leads to Google Sheets")

                leads_input = gr.Textbox(
                    label="Leads JSON (paste from above)",
                    lines=10,
                    placeholder='[{"business": "...", "total_score": 95, ...}]'
                )

                google_creds_input = gr.Textbox(
                    label="Google Service Account JSON",
                    lines=5,
                    placeholder='{"type": "service_account", "project_id": "...", ...}',
                    type="password"
                )

                sheet_name_input = gr.Textbox(
                    label="Google Sheet Name",
                    placeholder="Sales Leads 2024",
                    value="LeadGen Pro Leads"
                )

                export_btn = gr.Button("π€ Export to Google Sheets", variant="primary")
                export_output = gr.Markdown()

                export_btn.click(
                    fn=export_to_sheets,
                    inputs=[leads_input, google_creds_input, sheet_name_input],
                    outputs=export_output
                )

                # Written as joined literals so the Markdown lines stay
                # unindented (indented lines would render as a code block).
                gr.Markdown(
                    "\n"
                    "### π Setup Google Sheets API\n"
                    "1. Go to https://console.cloud.google.com\n"
                    "2. Create project β Enable Google Sheets API\n"
                    "3. Create Service Account β Download JSON\n"
                    "4. Paste JSON above\n"
                )

        gr.Markdown("---")
        gr.Markdown("### API Endpoints")
        gr.Markdown(
            "\n"
            "- `search_businesses(industry, location, max_results)` - Find businesses\n"
            "- `score_lead(business)` - Score a single lead\n"
            "- `export_leads(leads, format)` - Export leads to JSON/CSV\n"
            "- `export_to_google_sheets(leads, credentials, sheet_name)` - Export to Google Sheets\n"
        )

    return interface
if __name__ == "__main__":
    # Script entry point: build the UI around a fresh server instance and
    # serve it on port 7860.
    app = create_gradio_interface(LeadGenMCPServer())
    app.launch(server_port=7860)