Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| from datetime import datetime | |
| from typing import List, Dict, Any, Optional, Union | |
| from pydantic import BaseModel, Field, EmailStr, validator | |
| from fastapi import FastAPI, HTTPException, Query, Depends, Request | |
| from fastapi.responses import JSONResponse, Response | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.openapi.utils import get_openapi | |
| import httpx | |
| from dotenv import load_dotenv | |
| # LangChain and OpenAI imports | |
| try: | |
| from langchain_openai import ChatOpenAI | |
| from langchain.prompts import ChatPromptTemplate | |
| LANGCHAIN_AVAILABLE = True | |
| except ImportError: | |
| LANGCHAIN_AVAILABLE = False | |
| print("Warning: LangChain not available. Install with: pip install langchain langchain-openai") | |
# Load variables from a .env file before any os.getenv call below;
# override=True asks python-dotenv to let .env values replace variables
# that are already set in the process environment.
| load_dotenv(override=True) | |
| # Configuration | |
# Smartlead API key. The "your-api-key-here" default is a placeholder that
# call_smartlead_api and export_data_from_campaign compare against to
# reject requests when no real key is configured.
| SMARTLEAD_API_KEY = os.getenv("SMARTLEAD_API_KEY", "your-api-key-here") | |
# Root URL that _get_smartlead_url prefixes to every endpoint path.
| SMARTLEAD_BASE_URL = "https://server.smartlead.ai/api/v1" | |
| # Initialize FastAPI app | |
# The single FastAPI application object for this module; interactive docs
# are served at /docs (Swagger UI) and /redoc.
# NOTE(review): no `@app.<method>(...)` route decorators are visible on the
# handler functions in this view of the file — confirm the handlers are
# actually registered on `app`.
| app = FastAPI( | |
| title="Smartlead API - Complete Integration", | |
| version="2.0.0", | |
| description="Comprehensive FastAPI wrapper for Smartlead email automation platform", | |
| docs_url="/docs", | |
| redoc_url="/redoc" | |
| ) | |
| # Add CORS middleware | |
# Fully permissive CORS: any origin, method and header is accepted.
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# The FastAPI CORS documentation says wildcards must not be used together
# with credentials — either list the trusted origins explicitly or set
# allow_credentials=False if browser credentials are not needed. Confirm
# which one the deployed front-end requires.
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # ============================================================================ | |
| # DATA MODELS | |
| # ============================================================================ | |
class CreateCampaignRequest(BaseModel):
    # Request body for creating a campaign: a required name plus an
    # optional client association.
    name: str = Field(..., description="Campaign name")
    client_id: Optional[int] = Field(None, description="Client ID (leave null if no client)")
class CampaignScheduleRequest(BaseModel):
    # Request body for a campaign's sending schedule. Every field is
    # required; hours and the start time are carried as plain strings and
    # are not parsed or range-checked by this model.
    timezone: str = Field(..., description="Timezone for the campaign schedule (e.g., 'America/Los_Angeles')")
    days_of_the_week: List[int] = Field(..., description="Days of the week for scheduling [0=Sunday, 1=Monday, 2=Tuesday, 3=Wednesday, 4=Thursday, 5=Friday, 6=Saturday]")
    start_hour: str = Field(..., description="Start hour for sending emails in HH:MM format (e.g., '09:00')")
    end_hour: str = Field(..., description="End hour for sending emails in HH:MM format (e.g., '18:00')")
    min_time_btw_emails: int = Field(..., description="Minimum time in minutes between sending emails")
    max_new_leads_per_day: int = Field(..., description="Maximum number of new leads to process per day")
    schedule_start_time: str = Field(..., description="Schedule start time in ISO 8601 format (e.g., '2023-04-25T07:29:25.978Z')")
class CampaignSettingsRequest(BaseModel):
    # Request body for a campaign's general settings. The allowed string
    # values are documented in the descriptions only; the model itself
    # constrains just follow_up_percentage (0..100).
    track_settings: List[str] = Field(..., description="Tracking settings array (allowed values: DONT_TRACK_EMAIL_OPEN, DONT_TRACK_LINK_CLICK, DONT_TRACK_REPLY_TO_AN_EMAIL)")
    stop_lead_settings: str = Field(..., description="Settings for stopping leads (allowed values: CLICK_ON_A_LINK, OPEN_AN_EMAIL)")
    unsubscribe_text: str = Field(..., description="Text for the unsubscribe link")
    send_as_plain_text: bool = Field(..., description="Whether emails should be sent as plain text")
    follow_up_percentage: int = Field(ge=0, le=100, description="Follow-up percentage (max 100, min 0)")
    client_id: Optional[int] = Field(None, description="Client ID (leave as null if not needed)")
    enable_ai_esp_matching: bool = Field(False, description="Enable AI ESP matching (by default is false)")
class LeadInput(BaseModel):
    # A single lead as submitted by API clients. Only `email` is required;
    # every other attribute is optional.
    first_name: Optional[str] = Field(None, description="Lead's first name")
    last_name: Optional[str] = Field(None, description="Lead's last name")
    email: str = Field(..., description="Lead's email address")
    phone_number: Optional[Union[str, int]] = Field(None, description="Lead's phone number (can be string or integer)")
    company_name: Optional[str] = Field(None, description="Lead's company name")
    website: Optional[str] = Field(None, description="Lead's website")
    location: Optional[str] = Field(None, description="Lead's location")
    custom_fields: Optional[Dict[str, str]] = Field(None, description="Custom fields as key-value pairs (max 20 fields)")
    linkedin_profile: Optional[str] = Field(None, description="Lead's LinkedIn profile URL")
    company_url: Optional[str] = Field(None, description="Company website URL")

    # The two methods below take (cls, v) like pydantic validators but had
    # no registration, so pydantic never invoked them. The decorators make
    # the documented checks actually run.
    @validator("custom_fields")
    def validate_custom_fields(cls, v):
        # Enforce the 20-entry cap stated in the field description.
        if v is not None and len(v) > 20:
            raise ValueError('Custom fields cannot exceed 20 fields')
        return v

    @validator("phone_number")
    def validate_phone_number(cls, v):
        # Normalize integer phone numbers to strings; None passes through.
        if v is not None:
            return str(v)
        return v
class LeadSettings(BaseModel):
    # Processing flags sent alongside a lead upload. add_leads_to_campaign
    # falls back to these defaults when the caller supplies no settings.
    ignore_global_block_list: bool = Field(True, description="Ignore leads if they are in the global block list")
    ignore_unsubscribe_list: bool = Field(True, description="Ignore leads if they are in the unsubscribe list")
    ignore_duplicate_leads_in_other_campaign: bool = Field(False, description="Allow leads to be added even if they are duplicates in other campaigns")
class AddLeadsRequest(BaseModel):
    # Request body for a lead upload: at most 100 leads per call, with
    # optional processing settings.
    lead_list: List[LeadInput] = Field(..., max_items=100, description="List of leads to add (maximum 100 leads)")
    settings: Optional[LeadSettings] = Field(None, description="Settings for lead processing")
class AddLeadsResponse(BaseModel):
    # Shape of the result of a lead upload. unsubscribed_leads is typed Any
    # because, per its description, it may be an int or an empty list.
    ok: bool = Field(..., description="Indicates if the operation was successful")
    upload_count: int = Field(..., description="Number of leads successfully uploaded")
    total_leads: int = Field(..., description="Total number of leads attempted to upload")
    already_added_to_campaign: int = Field(..., description="Number of leads already present in the campaign")
    duplicate_count: int = Field(..., description="Number of duplicate emails found")
    invalid_email_count: int = Field(..., description="Number of leads with invalid email format")
    unsubscribed_leads: Any = Field(..., description="Number of leads that had previously unsubscribed (can be int or empty list)")
class SeqDelayDetails(BaseModel):
    # Wrapper object holding the per-step delay of a campaign sequence.
    delay_in_days: int = Field(..., description="Delay in days before sending this sequence")
class SeqVariant(BaseModel):
    # One A/B variant of a sequence step. `id` is supplied only when
    # updating an existing variant.
    subject: str = Field(..., description="Email subject line")
    email_body: str = Field(..., description="Email body content (HTML format)")
    variant_label: str = Field(..., description="Variant label (A, B, C, etc.)")
    id: Optional[int] = Field(None, description="Variant ID (only for updating, not for creating)")
class CampaignSequence(BaseModel):
    # One step of a campaign's email sequence. A step carries either its
    # own subject/body or a list of variants; `id` is supplied only when
    # updating an existing step.
    id: Optional[int] = Field(None, description="Sequence ID (only for updating, not for creating)")
    seq_number: int = Field(..., description="Sequence number (1, 2, 3, etc.)")
    seq_delay_details: SeqDelayDetails = Field(..., description="Delay details for this sequence")
    seq_variants: Optional[List[SeqVariant]] = Field(None, description="Email variants for A/B testing")
    subject: Optional[str] = Field("", description="Subject line (blank for follow-up in same thread)")
    email_body: Optional[str] = Field(None, description="Email body content (HTML format)")
class SaveSequencesRequest(BaseModel):
    # Request body for saving a campaign's sequence steps.
    sequences: List[CampaignSequence] = Field(..., description="List of campaign sequences")
class GenerateSequencesRequest(BaseModel):
    # Request body for LLM-based sequence generation; the job description
    # is the only input.
    job_description: str = Field(..., description="Job description to generate sequences for")
class Campaign(BaseModel):
    # Campaign record. Several fields use Union types so that more than
    # one value shape is accepted for them.
    id: int
    user_id: int
    created_at: datetime
    updated_at: datetime
    status: str
    name: str
    # Accepted either as a single string or as a list.
    track_settings: Union[str, List[Any]]
    # Accepted either as a string or as a dict.
    scheduler_cron_value: Optional[Union[str, Dict[str, Any]]] = None
    min_time_btwn_emails: int
    max_leads_per_day: int
    stop_lead_settings: str
    unsubscribe_text: Optional[str] = None
    client_id: Optional[int] = None
    enable_ai_esp_matching: bool
    send_as_plain_text: bool
    # Accepted either as a string or as an int.
    follow_up_percentage: Optional[Union[str, int]] = None
class CampaignListResponse(BaseModel):
    # Envelope with the same three keys list_campaigns returns: the
    # campaigns, their count, and a source label.
    campaigns: List[Campaign]
    total: int
    source: str
class Lead(BaseModel):
    # Lead record; `id` and a syntactically valid `email` are required,
    # the remaining attributes are optional.
    id: int
    email: EmailStr
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    company: Optional[str] = None
    position: Optional[str] = None
    phone_number: Optional[str] = None
    linkedin_url: Optional[str] = None
    status: Optional[str] = None
class WarmupDetails(BaseModel):
    # Warmup state nested inside EmailAccount. warmup_reputation accepts
    # either a string or an int.
    status: str
    total_sent_count: int
    total_spam_count: int
    warmup_reputation: Union[str, int]
    warmup_key_id: Optional[str] = None
    warmup_created_at: Optional[datetime] = None
    reply_rate: Optional[int] = None
    blocked_reason: Optional[str] = None
class EmailAccount(BaseModel):
    # Email (sender) account record, grouped below by concern.

    # Identity and timestamps
    id: int
    created_at: datetime
    updated_at: datetime
    user_id: int
    from_name: str
    from_email: str
    username: str
    password: Optional[str] = None

    # SMTP connection
    smtp_host: Optional[str] = None
    smtp_port: Optional[int] = None
    smtp_port_type: Optional[str] = None
    message_per_day: int
    different_reply_to_address: Optional[str] = None

    # IMAP connection
    is_different_imap_account: bool
    imap_username: Optional[str] = None
    imap_password: Optional[str] = None
    imap_host: Optional[str] = None
    imap_port: Optional[int] = None
    imap_port_type: Optional[str] = None

    # Presentation and tracking
    signature: Optional[str] = None
    custom_tracking_domain: Optional[str] = None
    bcc_email: Optional[str] = None

    # Connection health
    is_smtp_success: bool
    is_imap_success: bool
    smtp_failure_error: Optional[str] = None
    imap_failure_error: Optional[str] = None

    # Usage and associations
    type: str
    daily_sent_count: int
    client_id: Optional[int] = None
    campaign_count: Optional[int] = None
    warmup_details: Optional[WarmupDetails] = None
class WarmupSettingsRequest(BaseModel):
    # Request body for warmup configuration. Only warmup_enabled is
    # required; set_warmup drops unset (None) fields before forwarding.
    warmup_enabled: bool
    total_warmup_per_day: Optional[int] = None
    daily_rampup: Optional[int] = None
    reply_rate_percentage: Optional[int] = None
    warmup_key_id: Optional[str] = Field(None, description="String value if passed will update the custom warmup-key identifier")
class LeadCategoryUpdateRequest(BaseModel):
    # Request body for assigning a category to a lead, optionally pausing
    # the lead in the same call.
    category_id: int = Field(..., description="Category ID to assign to the lead")
    pause_lead: bool = Field(False, description="Whether to pause the lead after category update")
class CampaignStatusUpdateRequest(BaseModel):
    # Request body for a campaign status change. The status is a free-form
    # string here; the allowed values are listed in the description only.
    status: str = Field(..., description="New campaign status (PAUSED, STOPPED, START)")
class ResumeLeadRequest(BaseModel):
    # Request body for resuming a paused lead, with an optional delay.
    resume_lead_with_delay_days: Optional[int] = Field(None, description="Delay in days before resuming (defaults to 0)")
class DomainBlockListRequest(BaseModel):
    # Request body for adding entries to the block list, optionally scoped
    # to one client.
    domain_block_list: List[str] = Field(..., description="List of domains/emails to block")
    client_id: Optional[int] = Field(None, description="Client ID if blocking is client-specific")
class WebhookRequest(BaseModel):
    # Request body for creating or updating a campaign webhook; a null
    # `id` means "create new".
    id: Optional[int] = Field(None, description="Webhook ID (null for creating new)")
    name: str = Field(..., description="Webhook name")
    webhook_url: str = Field(..., description="Webhook URL")
    event_types: List[str] = Field(..., description="List of event types to listen for")
    categories: Optional[List[str]] = Field(None, description="List of categories to filter by")
class WebhookDeleteRequest(BaseModel):
    # Request body identifying the webhook to delete.
    id: int = Field(..., description="Webhook ID to delete")
class ClientRequest(BaseModel):
    # Request body for registering a client. The password travels as a
    # plain string field in this payload.
    name: str = Field(..., description="Client name")
    email: str = Field(..., description="Client email")
    permission: List[str] = Field(..., description="List of permissions")
    logo: Optional[str] = Field(None, description="Client logo text")
    logo_url: Optional[str] = Field(None, description="Client logo URL")
    password: str = Field(..., description="Client password")
class MessageHistoryRequest(BaseModel):
    # Request body for replying inside an existing email thread: the reply
    # text plus identifiers and a copy of the message being answered.
    email_stats_id: str = Field(..., description="Email stats ID for the specific email")
    email_body: str = Field(..., description="Reply message email body")
    reply_message_id: str = Field(..., description="Message ID to reply to")
    reply_email_time: str = Field(..., description="Time of the message being replied to")
    reply_email_body: str = Field(..., description="Body of the message being replied to")
    cc: Optional[str] = Field(None, description="CC recipients")
    bcc: Optional[str] = Field(None, description="BCC recipients")
    add_signature: bool = Field(True, description="Whether to add signature")
| # ============================================================================ | |
| # HELPER FUNCTIONS | |
| # ============================================================================ | |
def _get_smartlead_url(endpoint: str) -> str:
    """Join SMARTLEAD_BASE_URL and an endpoint path with exactly one slash between them."""
    # Leading slashes are stripped so "/campaigns" and "campaigns" resolve
    # to the same URL.
    relative_path = endpoint.lstrip('/')
    return SMARTLEAD_BASE_URL + "/" + relative_path
async def call_smartlead_api(method: str, endpoint: str, data: Any = None, params: Dict[str, Any] = None) -> Any:
    """Send one request to the Smartlead API and return its decoded JSON body.

    Args:
        method: HTTP verb passed straight to httpx.
        endpoint: Path relative to SMARTLEAD_BASE_URL.
        data: Optional JSON-serializable body; omitted from the request
            when None.
        params: Optional query parameters. The API key is added to a copy,
            so the caller's dict is never modified.

    Returns:
        The parsed JSON response, or None when a successful response has
        an empty body.

    Raises:
        HTTPException: 400 if no API key is configured; the upstream status
            code for any response >= 400; 408 on timeout; 503 on other
            transport errors; 502 if a successful response is not JSON.
    """
    if SMARTLEAD_API_KEY == "your-api-key-here":
        raise HTTPException(status_code=400, detail="Smartlead API key not configured")

    # Work on a copy: mutating the caller's dict would leak the API key
    # into objects the caller may reuse or log.
    query = dict(params) if params else {}
    query['api_key'] = SMARTLEAD_API_KEY

    url = _get_smartlead_url(endpoint)
    request_kwargs = {"params": query}
    if data is not None:
        request_kwargs["json"] = data

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.request(method, url, **request_kwargs)
    except httpx.TimeoutException as exc:
        # TimeoutException is a RequestError subclass, so it must be
        # handled first.
        raise HTTPException(status_code=408, detail="Request to Smartlead API timed out") from exc
    except httpx.RequestError as exc:
        raise HTTPException(status_code=503, detail=f"Failed to connect to Smartlead API: {str(exc)}") from exc

    if resp.status_code >= 400:
        # Prefer the structured message; fall back to the raw text when
        # the body is not JSON or is JSON of an unexpected shape (a list
        # or string has no .get()).
        detail = resp.text
        try:
            error_data = resp.json()
        except ValueError:
            error_data = None
        if isinstance(error_data, dict):
            detail = error_data.get('message', error_data.get('error', 'Unknown error'))
        raise HTTPException(status_code=resp.status_code, detail=detail)

    # A successful response with no body (seen on some DELETE/POST style
    # APIs) would make resp.json() raise; treat it as "no content".
    if not resp.content:
        return None
    try:
        return resp.json()
    except ValueError as exc:
        raise HTTPException(status_code=502, detail="Smartlead API returned a non-JSON response") from exc
| # ============================================================================ | |
| # CAMPAIGN ENDPOINTS | |
| # ============================================================================ | |
async def create_campaign(campaign: CreateCampaignRequest):
    """Create a new campaign in Smartlead"""
    # Serialize the validated model and POST it to the create endpoint.
    payload = campaign.dict()
    return await call_smartlead_api("POST", "campaigns/create", data=payload)
async def list_campaigns():
    """Fetch all campaigns from Smartlead API"""
    # Wrap the upstream result in an envelope carrying its length and a
    # fixed source label.
    fetched = await call_smartlead_api("GET", "campaigns")
    envelope = {
        "campaigns": fetched,
        "total": len(fetched),
        "source": "smartlead",
    }
    return envelope
async def get_campaign(campaign_id: int):
    """Get Campaign By Id"""
    # Plain pass-through GET for a single campaign.
    path = f"campaigns/{campaign_id}"
    return await call_smartlead_api("GET", path)
async def update_campaign_settings(campaign_id: int, settings: CampaignSettingsRequest):
    """Update Campaign General Settings"""
    # Forward the full settings model as the JSON body.
    path = f"campaigns/{campaign_id}/settings"
    payload = settings.dict()
    return await call_smartlead_api("POST", path, data=payload)
async def schedule_campaign(campaign_id: int, schedule: CampaignScheduleRequest):
    """Update Campaign Schedule"""
    # Forward the full schedule model as the JSON body.
    path = f"campaigns/{campaign_id}/schedule"
    payload = schedule.dict()
    return await call_smartlead_api("POST", path, data=payload)
async def delete_campaign(campaign_id: int):
    """Delete Campaign"""
    # Plain pass-through DELETE for a single campaign.
    path = f"campaigns/{campaign_id}"
    return await call_smartlead_api("DELETE", path)
async def patch_campaign_status(campaign_id: int, request: CampaignStatusUpdateRequest):
    """Patch campaign status"""
    # Despite the name, the upstream call is a POST to the status path.
    path = f"campaigns/{campaign_id}/status"
    payload = request.dict()
    return await call_smartlead_api("POST", path, data=payload)
async def campaign_analytics(campaign_id: int):
    """Fetch analytics for a campaign"""
    # Plain pass-through GET for the campaign's analytics.
    path = f"campaigns/{campaign_id}/analytics"
    return await call_smartlead_api("GET", path)
async def fetch_campaign_statistics_by_campaign_id(
    campaign_id: int,
    offset: int = 0,
    limit: int = 100,
    email_sequence_number: Optional[int] = None,
    email_status: Optional[str] = None
):
    """Fetch Campaign Statistics By Campaign Id"""
    # offset/limit are always sent; the two filters are added only when
    # the caller supplied them.
    params = {"offset": offset, "limit": limit}
    # Compare against None rather than truthiness so an explicitly passed
    # 0 is forwarded instead of being silently dropped.
    if email_sequence_number is not None:
        params["email_sequence_number"] = email_sequence_number
    # An empty status string carries no filter, so truthiness is intended.
    if email_status:
        params["email_status"] = email_status
    return await call_smartlead_api("GET", f"campaigns/{campaign_id}/statistics", params=params)
async def fetch_campaign_statistics_by_date_range(
    campaign_id: int,
    start_date: str,
    end_date: str
):
    """Fetch Campaign Statistics By Campaign Id And Date Range"""
    # Both dates are forwarded verbatim as query parameters; no format
    # check happens here.
    date_window = {"start_date": start_date, "end_date": end_date}
    path = f"campaigns/{campaign_id}/analytics-by-date"
    return await call_smartlead_api("GET", path, params=date_window)
| # ============================================================================ | |
| # LEAD MANAGEMENT ENDPOINTS | |
| # ============================================================================ | |
async def get_campaign_leads(campaign_id: int, offset: int = 0, limit: int = 100):
    """List all leads by campaign id"""
    # Paging values are passed through as query parameters.
    paging = {"offset": offset, "limit": limit}
    path = f"campaigns/{campaign_id}/leads"
    return await call_smartlead_api("GET", path, params=paging)
def _drop_empty_values(mapping: Dict[str, Any]) -> Dict[str, Any]:
    # Return a copy of the mapping without None or empty-string values.
    return {k: v for k, v in mapping.items() if v is not None and v != ""}


async def _welcome_closing_for_lead(lead_cleaned: Dict[str, Any]) -> Dict[str, Any]:
    # Ask the LLM helper for welcome/closing text, falling back to the
    # template helper on any failure (including a malformed LLM result).
    try:
        personalized_messages = await generate_welcome_closing_messages(lead_cleaned)
        return {
            "welcome_message": personalized_messages.get("welcome_message"),
            "closing_message": personalized_messages.get("closing_message"),
        }
    except Exception as e:
        print(f"Error generating personalized messages for lead {lead_cleaned.get('email', 'unknown')}: {str(e)}")
        # Continue with template messages if LLM fails
        return generate_template_welcome_closing_messages(lead_cleaned)


async def _prepare_lead(lead: Dict[str, Any]) -> Dict[str, Any]:
    # Build the outgoing payload for one lead: strip empty values, attach
    # the generated messages as custom fields, then prune empty fields.
    lead_cleaned = _drop_empty_values(lead)
    messages = await _welcome_closing_for_lead(lead_cleaned)

    custom_fields = lead_cleaned.setdefault("custom_fields", {})
    if messages.get("welcome_message"):
        custom_fields["Welcome_Message"] = messages["welcome_message"]
    if messages.get("closing_message"):
        custom_fields["Closing_Message"] = messages["closing_message"]

    # Clean up custom_fields - remove None values and empty strings, and
    # drop the key entirely when nothing is left.
    custom_fields_cleaned = _drop_empty_values(custom_fields)
    if custom_fields_cleaned:
        lead_cleaned["custom_fields"] = custom_fields_cleaned
    else:
        lead_cleaned.pop("custom_fields", None)
    return lead_cleaned


async def add_leads_to_campaign(campaign_id: int, request: AddLeadsRequest):
    """Add leads to a campaign by ID with personalized welcome and closing messages"""
    request_data = request.dict()

    # Leads are processed one at a time, in order; each gets its own
    # generated Welcome_Message / Closing_Message custom fields.
    prepared_leads = []
    for lead in request_data.get("lead_list", []):
        prepared = await _prepare_lead(lead)
        # A lead that ended up with no data at all is not worth sending.
        if prepared:
            prepared_leads.append(prepared)
    request_data["lead_list"] = prepared_leads

    if not request_data["lead_list"]:
        raise HTTPException(status_code=400, detail="No valid leads to add.")

    # Fall back to the default processing flags when none were supplied.
    if "settings" not in request_data or request_data["settings"] is None:
        request_data["settings"] = LeadSettings().dict()

    return await call_smartlead_api("POST", f"campaigns/{campaign_id}/leads", data=request_data)
async def add_bulk_leads(campaign_id: int, leads: List[LeadInput]):
    """Add multiple leads to a Smartlead campaign with personalized messages (legacy endpoint)"""
    # Wrap the bare list in the newer request model (no explicit settings)
    # and delegate to add_leads_to_campaign.
    wrapped = AddLeadsRequest(lead_list=leads)
    return await add_leads_to_campaign(campaign_id, wrapped)
async def resume_lead_by_campaign_id(campaign_id: int, lead_id: int, request: ResumeLeadRequest):
    """Resume Lead By Campaign ID"""
    # The optional delay is forwarded as-is, including a null value.
    path = f"campaigns/{campaign_id}/leads/{lead_id}/resume"
    payload = request.dict()
    return await call_smartlead_api("POST", path, data=payload)
async def pause_lead_by_campaign_id(campaign_id: int, lead_id: int):
    """Pause Lead By Campaign ID"""
    # Body-less POST to the lead's pause path.
    path = f"campaigns/{campaign_id}/leads/{lead_id}/pause"
    return await call_smartlead_api("POST", path)
async def delete_lead_by_campaign_id(campaign_id: int, lead_id: int):
    """Delete Lead By Campaign ID"""
    # Plain pass-through DELETE for one lead within one campaign.
    path = f"campaigns/{campaign_id}/leads/{lead_id}"
    return await call_smartlead_api("DELETE", path)
async def unsubscribe_lead_from_campaign(campaign_id: int, lead_id: int):
    """Unsubscribe/Pause Lead From Campaign"""
    # Body-less POST scoped to a single campaign.
    path = f"campaigns/{campaign_id}/leads/{lead_id}/unsubscribe"
    return await call_smartlead_api("POST", path)
async def unsubscribe_lead_from_all_campaigns(lead_id: int):
    """Unsubscribe Lead From All Campaigns"""
    # Body-less POST on the lead itself rather than on a campaign.
    path = f"leads/{lead_id}/unsubscribe"
    return await call_smartlead_api("POST", path)
async def update_lead(lead_id: int, lead_data: Dict[str, Any]):
    """Update lead using the Lead ID"""
    # lead_data is an unvalidated dict and is forwarded unchanged.
    path = f"leads/{lead_id}"
    return await call_smartlead_api("POST", path, data=lead_data)
async def update_lead_category_by_campaign(campaign_id: int, lead_id: int, request: LeadCategoryUpdateRequest):
    """Update a lead's category based on their campaign"""
    # Forward the category id and pause flag as the JSON body.
    path = f"campaigns/{campaign_id}/leads/{lead_id}/category"
    payload = request.dict()
    return await call_smartlead_api("POST", path, data=payload)
async def add_domain_to_global_block_list(request: DomainBlockListRequest):
    """Add Lead/Domain to Global Block List"""
    # Forward the block-list entries (and optional client id) as the body.
    payload = request.dict()
    return await call_smartlead_api("POST", "leads/add-domain-block-list", data=payload)
async def fetch_lead_categories():
    """Fetch lead categories"""
    # Parameterless pass-through GET.
    categories = await call_smartlead_api("GET", "leads/fetch-categories")
    return categories
async def fetch_lead_by_email_address(email: str):
    """Fetch lead by email address"""
    # The address is sent as the `email` query parameter.
    lookup = {"email": email}
    return await call_smartlead_api("GET", "leads", params=lookup)
async def campaigns_for_lead(lead_id: int):
    """Fetch all campaigns that a lead belongs to"""
    # Plain pass-through GET on the lead's campaigns path.
    path = f"leads/{lead_id}/campaigns"
    return await call_smartlead_api("GET", path)
async def check_lead_in_campaign(campaign_id: int, email: str):
    """Check if a lead exists in a campaign using efficient indexed lookups"""
    # Two upstream calls: resolve the lead by email, then list that lead's
    # campaigns and look for campaign_id among them.
    try:
        lead_response = await call_smartlead_api("GET", "leads", params={"email": email})
        if not lead_response or "id" not in lead_response:
            return {"exists": False, "message": "Lead not found"}

        found_lead_id = lead_response["id"]
        campaigns_response = await call_smartlead_api("GET", f"leads/{found_lead_id}/campaigns")
        if not campaigns_response:
            return {"exists": False, "message": "No campaigns found for lead"}

        for entry in campaigns_response:
            if entry.get("id") == campaign_id:
                return {"exists": True, "message": "Lead found in campaign"}
        return {"exists": False, "message": "Lead not found in campaign"}
    except HTTPException as e:
        # An upstream 404 means "no such lead"; anything else propagates.
        if e.status_code != 404:
            raise e
        return {"exists": False, "message": "Lead not found"}
    except Exception as e:
        # Unexpected response shapes and similar failures surface as 500.
        raise HTTPException(status_code=500, detail=f"Error checking lead in campaign: {str(e)}")
async def export_data_from_campaign(campaign_id: int):
    """Export data from a campaign as CSV"""
    # This handler talks to httpx directly instead of going through
    # call_smartlead_api because the upstream body is returned as CSV
    # text, not parsed as JSON.
    if SMARTLEAD_API_KEY == "your-api-key-here":
        raise HTTPException(status_code=400, detail="Smartlead API key not configured")

    url = _get_smartlead_url(f"campaigns/{campaign_id}/leads-export")
    params = {"api_key": SMARTLEAD_API_KEY}

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(url, params=params)
    except httpx.TimeoutException as exc:
        # TimeoutException is a RequestError subclass, so it goes first.
        raise HTTPException(status_code=408, detail="Request to Smartlead API timed out") from exc
    except httpx.RequestError as exc:
        raise HTTPException(status_code=503, detail=f"Failed to connect to Smartlead API: {str(exc)}") from exc

    if resp.status_code >= 400:
        # Use the structured message when the error body is a JSON object;
        # otherwise (non-JSON, or a JSON list/string with no .get()) fall
        # back to the raw text instead of crashing.
        detail = resp.text
        try:
            error_data = resp.json()
        except ValueError:
            error_data = None
        if isinstance(error_data, dict):
            detail = error_data.get('message', error_data.get('error', 'Unknown error'))
        raise HTTPException(status_code=resp.status_code, detail=detail)

    # Hand the upstream text back as a downloadable CSV attachment.
    return Response(
        content=resp.text,
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=campaign_{campaign_id}_leads.csv"}
    )
| # ============================================================================ | |
| # SEQUENCE ENDPOINTS | |
| # ============================================================================ | |
async def get_campaign_sequences(campaign_id: int):
    """Fetch email sequences for a campaign"""
    # Plain pass-through GET on the campaign's sequences path.
    path = f"campaigns/{campaign_id}/sequences"
    return await call_smartlead_api("GET", path)
async def save_campaign_sequences(campaign_id: int, request: SaveSequencesRequest):
    """Save Campaign Sequence"""
    # Serialize every sequence step and POST them in one request.
    path = f"campaigns/{campaign_id}/sequences"
    payload = request.dict()
    return await call_smartlead_api("POST", path, data=payload)
async def generate_campaign_sequences(campaign_id: int, request: GenerateSequencesRequest):
    """Generate Campaign Sequences using LLM"""
    # Step 1: have the LLM helper produce sequence steps from the job
    # description.
    generated = await generate_sequences_with_llm(request.job_description)

    # Step 2: validate them through the save model and push them upstream.
    to_save = SaveSequencesRequest(sequences=generated)
    upstream_result = await call_smartlead_api(
        "POST",
        f"campaigns/{campaign_id}/sequences",
        data=to_save.dict(),
    )

    # Step 3: report both what was generated and what the save returned.
    response = {
        "ok": True,
        "message": "Sequences generated and saved successfully",
        "generated_sequences": list(generated),
        "save_result": upstream_result,
    }
    return response
| # ============================================================================ | |
| # WEBHOOK ENDPOINTS | |
| # ============================================================================ | |
async def fetch_webhooks_by_campaign_id(campaign_id: int):
    """Fetch Webhooks By Campaign ID"""
    # Plain pass-through GET on the campaign's webhooks path.
    path = f"campaigns/{campaign_id}/webhooks"
    return await call_smartlead_api("GET", path)
async def add_update_campaign_webhook(campaign_id: int, request: WebhookRequest):
    """Add / Update Campaign Webhook"""
    # The same POST serves create and update; the payload's `id` field
    # (null or set) travels with it.
    path = f"campaigns/{campaign_id}/webhooks"
    payload = request.dict()
    return await call_smartlead_api("POST", path, data=payload)
async def delete_campaign_webhook(campaign_id: int, request: WebhookDeleteRequest):
    """Delete Campaign Webhook"""
    # The webhook id is sent in the JSON body of the DELETE request, not
    # in the path.
    path = f"campaigns/{campaign_id}/webhooks"
    payload = request.dict()
    return await call_smartlead_api("DELETE", path, data=payload)
| # ============================================================================ | |
| # CLIENT MANAGEMENT ENDPOINTS | |
| # ============================================================================ | |
async def add_client_to_system(request: ClientRequest):
    """Add Client To System (Whitelabel or not)"""
    # Forward the full client model, password included, as the JSON body.
    payload = request.dict()
    return await call_smartlead_api("POST", "client/save", data=payload)
async def fetch_all_clients():
    """Fetch all clients"""
    # Parameterless pass-through GET.
    clients = await call_smartlead_api("GET", "client")
    return clients
| # ============================================================================ | |
| # MESSAGE HISTORY AND REPLY ENDPOINTS | |
| # ============================================================================ | |
async def fetch_lead_message_history_based_on_campaign(campaign_id: int, lead_id: int):
    """Fetch Lead Message History Based On Campaign"""
    # Plain pass-through GET for one lead's thread in one campaign.
    path = f"campaigns/{campaign_id}/leads/{lead_id}/message-history"
    return await call_smartlead_api("GET", path)
async def reply_to_lead_from_master_inbox(campaign_id: int, request: MessageHistoryRequest):
    """Reply To Lead From Master Inbox via API"""
    # Forward the reply and the quoted-message details as the JSON body.
    path = f"campaigns/{campaign_id}/reply-email-thread"
    payload = request.dict()
    return await call_smartlead_api("POST", path, data=payload)
| # ============================================================================ | |
| # EMAIL ACCOUNT ENDPOINTS | |
| # ============================================================================ | |
async def list_email_accounts(offset: int = 0, limit: int = 100):
    """List all email accounts with optional pagination"""
    # Paging values are passed through as query parameters.
    paging = {"offset": offset, "limit": limit}
    return await call_smartlead_api("GET", "email-accounts", params=paging)
async def save_email_account(account: Dict[str, Any]):
    """Create an Email Account"""
    # `account` is an unvalidated dict and is forwarded unchanged.
    saved = await call_smartlead_api("POST", "email-accounts/save", data=account)
    return saved
async def reconnect_failed_email_accounts(body: Optional[Dict] = None):
    """Reconnect failed email accounts"""
    # `body` is accepted for interface compatibility but is not read: the
    # upstream call is always made with an empty JSON object. The default
    # is None rather than a shared mutable `{}`.
    return await call_smartlead_api("POST", "email-accounts/reconnect-failed-email-accounts", data={})
async def get_email_account(account_id: int):
    """Fetch Email Account By ID"""
    # Plain pass-through GET for a single email account.
    path = f"email-accounts/{account_id}"
    return await call_smartlead_api("GET", path)
async def update_email_account(account_id: int, payload: Dict[str, Any]):
    """Update Email Account"""
    # `payload` is an unvalidated dict and is forwarded unchanged.
    path = f"email-accounts/{account_id}"
    return await call_smartlead_api("POST", path, data=payload)
async def set_warmup(account_id: int, payload: WarmupSettingsRequest):
    """Add/Update Warmup To Email Account"""
    # exclude_none keeps unset optional fields out of the request body, so
    # only the values the caller provided are sent.
    path = f"email-accounts/{account_id}/warmup"
    provided_settings = payload.dict(exclude_none=True)
    return await call_smartlead_api("POST", path, data=provided_settings)
async def get_warmup_stats(account_id: int):
    """Fetch Warmup Stats By Email Account ID"""
    # Plain pass-through GET on the account's warmup-stats path.
    path = f"email-accounts/{account_id}/warmup-stats"
    return await call_smartlead_api("GET", path)
async def list_campaign_email_accounts(campaign_id: int):
    """List all email accounts per campaign"""
    # Plain pass-through GET on the campaign's email-accounts path.
    path = f"campaigns/{campaign_id}/email-accounts"
    return await call_smartlead_api("GET", path)
async def add_campaign_email_accounts(campaign_id: int, payload: Dict[str, Any]):
    """Add Email Account To A Campaign"""
    # `payload` is an unvalidated dict and is forwarded unchanged.
    path = f"campaigns/{campaign_id}/email-accounts"
    return await call_smartlead_api("POST", path, data=payload)
async def remove_campaign_email_accounts(campaign_id: int, payload: Dict[str, Any]):
    """Remove Email Account From A Campaign"""
    # Same path as the add call, but DELETE; the dict travels in the
    # request body.
    path = f"campaigns/{campaign_id}/email-accounts"
    return await call_smartlead_api("DELETE", path, data=payload)
| # ============================================================================ | |
| # UTILITY ENDPOINTS | |
| # ============================================================================ | |
async def health_check():
    """Health check endpoint to verify API connectivity"""
    # Connectivity is probed by listing campaigns; any failure at all is
    # reported as an "unhealthy" payload rather than raised.
    try:
        campaigns = await call_smartlead_api("GET", "campaigns")
    except Exception as e:
        return {
            "status": "unhealthy",
            "message": f"Smartlead API connection failed: {str(e)}",
            "timestamp": datetime.now().isoformat()
        }

    # A non-list result still counts as healthy, just with a zero count.
    campaigns_count = len(campaigns) if isinstance(campaigns, list) else 0
    return {
        "status": "healthy",
        "message": "Smartlead API is accessible",
        "campaigns_count": campaigns_count,
        "timestamp": datetime.now().isoformat()
    }
async def api_info():
    """Describe this service: name, version, upstream base URL and feature areas.

    The payload is static except for ``timestamp``, which is taken from the
    local clock on every call.
    """
    endpoint_groups = [
        "Campaign Management",
        "Lead Management",
        "Sequence Management",
        "Webhook Management",
        "Client Management",
        "Message History & Reply",
        "Analytics",
        "Email Account Management"
    ]
    return {
        "name": "Smartlead API - Complete Integration",
        "version": "2.0.0",
        "description": "Comprehensive FastAPI wrapper for Smartlead email automation platform",
        "base_url": SMARTLEAD_BASE_URL,
        "available_endpoints": endpoint_groups,
        "documentation": "Based on Smartlead API documentation",
        "timestamp": datetime.now().isoformat()
    }
| # ============================================================================ | |
| # AI SEQUENCE GENERATION FUNCTIONS | |
| # ============================================================================ | |
async def generate_welcome_closing_messages(lead_data: Dict[str, Any]) -> Dict[str, str]:
    """Generate personalized welcome and closing messages using LLM based on candidate details.

    Falls back to ``generate_template_welcome_closing_messages`` when LangChain
    is not installed, ``OPENAI_API_KEY`` is unset, or the LLM call fails.

    Args:
        lead_data: Lead record. The keys read here are ``first_name``,
            ``company_name``, ``location``, ``linkedin_profile`` and
            ``custom_fields["Title"]``; all are optional.

    Returns:
        A dict with the keys ``welcome_message`` and ``closing_message``.
    """

    # Schema handed to the model for structured (function-calling) output.
    class structure(BaseModel):
        welcome_message: str = Field(description="Welcome message for the candidate")
        closing_message: str = Field(description="Closing message for the candidate")

    if not LANGCHAIN_AVAILABLE:
        return generate_template_welcome_closing_messages(lead_data)
    try:
        openai_api_key = os.getenv("OPENAI_API_KEY")
        if not openai_api_key:
            print("Warning: OPENAI_API_KEY not set. Using template messages.")
            return generate_template_welcome_closing_messages(lead_data)
        llm = ChatOpenAI(
            model="gpt-4o-mini",
            temperature=0.7,
            openai_api_key=openai_api_key
        )
        str_llm = llm.with_structured_output(structure, method="function_calling")
        # Extract relevant information from lead data
        first_name = lead_data.get("first_name", "")
        company_name = lead_data.get("company_name", "")
        location = lead_data.get("location", "")
        # ``custom_fields`` may be present but null; treat that like "no fields"
        # so a null value does not abort the LLM path.
        title = (lead_data.get("custom_fields") or {}).get("Title", "")
        linkedin_profile = lead_data.get("linkedin_profile", "")
        # Create a summary of the candidate's background
        candidate_info = f"""
Name: {first_name}
Company: {company_name}
Location: {location}
Title: {title}
LinkedIn: {linkedin_profile}
"""
        system_prompt = """You are an expert recruiter who creates personalized welcome and closing messages for email campaigns.
Based on the candidate's information, generate:
1. A personalized welcome message (2-3 sentences) starting with "Hi first_name then change the line using <br> tag and continue with a friendly introduction about their background/company/role.
2. A personalized closing message (1-2 sentences)
Requirements:
- Professional but friendly tone
- Reference their specific background/company/role when possible
- Keep messages concise and engaging
- Make them feel valued and understood
IMPORTANT: Respond with ONLY valid JSON. No additional text."""
        prompt_template = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "Generate personalized messages for this candidate: {candidate_info}")
        ])
        messages = prompt_template.format_messages(candidate_info=candidate_info)
        response = await str_llm.ainvoke(messages)
        try:
            return {
                "welcome_message": response.welcome_message,
                "closing_message": response.closing_message
            }
        except Exception as parse_error:
            print(f"JSON parsing failed for welcome/closing messages: {parse_error}")
            return generate_template_welcome_closing_messages(lead_data)
    except Exception as e:
        print(f"Error generating welcome/closing messages with LLM: {str(e)}")
        return generate_template_welcome_closing_messages(lead_data)
def generate_template_welcome_closing_messages(lead_data: Dict[str, Any]) -> Dict[str, str]:
    """Generate template-based welcome and closing messages as fallback.

    Only ``first_name`` and ``company_name`` are read from ``lead_data``; both
    are optional. The previous unused ``custom_fields["Title"]`` lookup was
    removed because it raised ``AttributeError`` when ``custom_fields`` was
    ``None``, which broke this last-resort fallback.

    Args:
        lead_data: Lead record (any mapping with ``.get``).

    Returns:
        A dict with the keys ``welcome_message`` and ``closing_message``.
    """
    first_name = lead_data.get("first_name", "")
    company_name = lead_data.get("company_name", "")

    # Personalized welcome message: greet by name when known, and mention the
    # company when known.
    greeting = f"Hi {first_name}" if first_name else "Hi there"
    if company_name:
        impressed_by = f"your work at {company_name}"
    else:
        impressed_by = "your background"
    welcome_message = f"{greeting}, I came across your profile and was impressed by {impressed_by}."

    # Personalized closing message
    if first_name:
        closing_message = f"Looking forward to connecting with you, {first_name}!"
    else:
        closing_message = "Looking forward to connecting with you!"

    return {
        "welcome_message": welcome_message,
        "closing_message": closing_message
    }
async def generate_sequences_with_llm(job_description: str) -> List[CampaignSequence]:
    """Generate email sequences using LangChain and OpenAI based on job description.

    Falls back to ``generate_template_sequences`` when LangChain is not
    installed, ``OPENAI_API_KEY`` is unset, or the LLM call / response handling
    raises.

    Args:
        job_description: Free-text job description passed to the model.

    Returns:
        Sequences numbered 1-3 with delays of 1, 3 and 5 days. Any step the
        model response does not provide is taken from the template fallback.
    """

    # Schemas handed to the model for structured (function-calling) output.
    class email_seq(BaseModel):
        subject: str = Field(description="Subject line for the email")
        body: str = Field(description="Body of the email")

    class structure(BaseModel):
        introduction: email_seq = Field(description="Email sequence for sequence 1 asking for consent and interest in the role")
        email_sequence_2: email_seq = Field(description="Email sequence for sequence 2 following up on updates and next steps")
        email_sequence_3: email_seq = Field(description="Email sequence for sequence 3 Another variant on following up on updates and next steps")

    def _body_text(body):
        # Defensive: stringify a dict body rather than passing it on as-is.
        return str(body) if isinstance(body, dict) else body

    if not LANGCHAIN_AVAILABLE:
        return await generate_template_sequences(job_description)
    try:
        openai_api_key = os.getenv("OPENAI_API_KEY")
        if not openai_api_key:
            print("Warning: OPENAI_API_KEY not set. Using template sequences.")
            return await generate_template_sequences(job_description)
        llm = ChatOpenAI(
            model="gpt-4o-mini",
            temperature=0.7,
            openai_api_key=openai_api_key
        )
        str_llm = llm.with_structured_output(structure, method="function_calling")
        system_prompt = """You are an expert email sequence template generator for recruitment campaigns.
Generate ONLY the subject lines and email body content for 3 professional email sequences.
Write the email on behalf of Ali Taghikhani, CEO SRN.
Use the following placeholders in your templates, EXACTLY as written:
- `{{first_name}}`
- `{{company}}`
- `{{title}}`
- `{{Welcome_Message}}`
- `{{Closing_Message}}`
Email Sequence Structure:
1. INTRODUCTION (Day 1): Start the email with `{{Welcome_Message}}`. Ask for consent and interest in the role. End the email with `{{Closing_Message}}` followed by the sender's name and title.
2. OUTREACH (Day 3): Provide detailed job information. This is a follow-up, so it should not have a subject line.
3. FOLLOW-UP (Day 5): Another follow-up on updates and next steps. Also no subject line.
Requirements:
- The first sequence must start with `{{Welcome_Message}}` and end with `{{Closing_Message}}`.
- Always end the email with Best regards
- The second and third sequences are follow-ups and should not have a subject line.
- All emails should be HTML formatted with proper `<br>` tags.
- Professional but friendly tone.
- Include clear call-to-actions.
- Focus on building consent and trust.
IMPORTANT: Respond with ONLY valid JSON. No additional text."""
        # The system prompt is injected as a *variable value*, not as the
        # template text itself. Used as template text, the f-string formatter
        # would collapse every ``{{name}}`` to ``{name}``, so the model would be
        # told to emit single-brace placeholders instead of the double-brace
        # ones written above.
        prompt_template = ChatPromptTemplate.from_messages([
            ("system", "{system_prompt}"),
            ("human", "Generate email content for this job description: {job_description}")
        ])
        messages = prompt_template.format_messages(
            system_prompt=system_prompt,
            job_description=job_description
        )
        response = await str_llm.ainvoke(messages)
        try:
            sequences = []
            # Sequence 1: Introduction (single variant "A")
            intro = getattr(response, "introduction", None)
            if intro:
                sequences.append(CampaignSequence(
                    seq_number=1,
                    seq_delay_details=SeqDelayDetails(delay_in_days=1),
                    seq_variants=[
                        SeqVariant(
                            subject=intro.subject,
                            email_body=_body_text(intro.body),
                            variant_label="A"
                        )
                    ]
                ))
            # Sequence 2: Outreach (follow-up, so no subject)
            outreach = getattr(response, "email_sequence_2", None)
            if outreach:
                sequences.append(CampaignSequence(
                    seq_number=2,
                    seq_delay_details=SeqDelayDetails(delay_in_days=3),
                    subject="",
                    email_body=_body_text(outreach.body)
                ))
            # Sequence 3: Follow-up (no subject)
            follow_up = getattr(response, "email_sequence_3", None)
            if follow_up:
                sequences.append(CampaignSequence(
                    seq_number=3,
                    seq_delay_details=SeqDelayDetails(delay_in_days=5),
                    subject="",
                    email_body=_body_text(follow_up.body)
                ))
            # Fill any step the model left out from the template fallback.
            # Missing steps are matched by seq_number (not by how many steps
            # exist) so a missing earlier step never yields a duplicate number.
            if len(sequences) < 3:
                present = {seq.seq_number for seq in sequences}
                fallback = await generate_template_sequences(job_description)
                sequences.extend(seq for seq in fallback if seq.seq_number not in present)
                sequences.sort(key=lambda seq: seq.seq_number)
            return sequences
        except Exception as parse_error:
            print(f"JSON parsing failed: {parse_error}")
            return await generate_template_sequences(job_description)
    except Exception as e:
        print(f"Error generating sequences with LLM: {str(e)}")
        return await generate_template_sequences(job_description)
def create_sequences_from_content(content: dict, job_description: str) -> List[CampaignSequence]:
    """Create CampaignSequence objects from parsed LLM content.

    Steps found in ``content`` are used as-is; every missing step is replaced
    by a built-in template. Missing steps are matched by sequence number, so
    the result always holds exactly steps 1, 2 and 3 in that order. Filling by
    count, as before, could emit a duplicate number when an earlier step was
    absent.

    Args:
        content: Mapping that may contain ``sequence1_variant_a``,
            ``sequence1_variant_b``, ``sequence2`` and ``sequence3``; each value
            is read with ``.get("subject")`` / ``.get("body")``.
        job_description: Text interpolated into default subjects and templates.

    Returns:
        Three sequences with delays of 1, 3 and 5 days.
    """
    by_number = {}

    # Sequence 1: Introduction with A/B variants. It is taken from ``content``
    # only when BOTH variants are present (unchanged rule).
    if "sequence1_variant_a" in content and "sequence1_variant_b" in content:
        var_a = content["sequence1_variant_a"]
        var_b = content["sequence1_variant_b"]
        by_number[1] = CampaignSequence(
            seq_number=1,
            seq_delay_details=SeqDelayDetails(delay_in_days=1),
            seq_variants=[
                SeqVariant(
                    subject=var_a.get("subject", f"Quick question about {job_description}"),
                    email_body=var_a.get("body", ""),
                    variant_label="A"
                ),
                SeqVariant(
                    subject=var_b.get("subject", f"Interested in {job_description} opportunities?"),
                    email_body=var_b.get("body", ""),
                    variant_label="B"
                )
            ]
        )
    # Sequence 2: Outreach
    if "sequence2" in content:
        by_number[2] = CampaignSequence(
            seq_number=2,
            seq_delay_details=SeqDelayDetails(delay_in_days=3),
            subject="",
            email_body=content["sequence2"].get("body", "")
        )
    # Sequence 3: Follow-up
    if "sequence3" in content:
        by_number[3] = CampaignSequence(
            seq_number=3,
            seq_delay_details=SeqDelayDetails(delay_in_days=5),
            subject="",
            email_body=content["sequence3"].get("body", "")
        )

    # Fill with templates if needed
    if 1 not in by_number:
        by_number[1] = CampaignSequence(
            seq_number=1,
            seq_delay_details=SeqDelayDetails(delay_in_days=1),
            seq_variants=[
                SeqVariant(
                    subject=f"Quick question about {job_description}",
                    email_body=f"""<p>Hi there,<br><br>
I came across your profile and noticed your experience in {job_description}.
I'm reaching out because we have some exciting opportunities that might be a great fit for your background.<br><br>
Before I share more details, I wanted to ask: Are you currently open to exploring new opportunities in this space?<br><br>
Would you be interested in hearing more about the roles we have available?<br><br>
Best regards,<br>
[Your Name]</p>""",
                    variant_label="A"
                ),
                SeqVariant(
                    subject=f"Interested in {job_description} opportunities?",
                    email_body=f"""<p>Hello,<br><br>
I hope this message finds you well. I'm a recruiter specializing in {job_description} positions.<br><br>
I'd love to connect and share some opportunities that align with your expertise.
Are you currently open to exploring new roles in this space?<br><br>
If so, I can send you specific details about the positions we have available.<br><br>
Thanks,<br>
[Your Name]</p>""",
                    variant_label="B"
                )
            ]
        )
    if 2 not in by_number:
        by_number[2] = CampaignSequence(
            seq_number=2,
            seq_delay_details=SeqDelayDetails(delay_in_days=3),
            subject="",
            email_body=f"""<p>Hi,<br><br>
Thanks for your interest! Here are more details about the {job_description} opportunities:<br><br>
<strong>Role Details:</strong><br>
• [Specific responsibilities]<br>
• [Required skills and experience]<br>
• [Team and company information]<br><br>
<strong>Benefits:</strong><br>
• [Compensation and benefits]<br>
• [Growth opportunities]<br>
• [Work environment]<br><br>
Would you be interested in a quick call to discuss this role in more detail?<br><br>
Best regards,<br>
[Your Name]</p>"""
        )
    if 3 not in by_number:
        by_number[3] = CampaignSequence(
            seq_number=3,
            seq_delay_details=SeqDelayDetails(delay_in_days=5),
            subject="",
            email_body=f"""<p>Hi,<br><br>
Just wanted to follow up on the {job_description} opportunity I shared.<br><br>
Have you had a chance to review the information? I'd love to hear your thoughts and answer any questions.<br><br>
If you're interested, I can help schedule next steps. If not, no worries at all!<br><br>
Thanks for your time!<br>
[Your Name]</p>"""
        )

    return [by_number[number] for number in (1, 2, 3)]
async def generate_template_sequences(job_description: str) -> List[CampaignSequence]:
    """Build the static three-step fallback sequence for a job description.

    Step 1 (day 1) carries two subject/body variants, "A" and "B". Steps 2
    (day 3) and 3 (day 5) are follow-ups with an empty subject.
    ``job_description`` is interpolated verbatim into subjects and bodies.
    """
    # Step 1: consent-seeking introduction, two A/B variants.
    variant_a = SeqVariant(
        subject=f"Quick question about {job_description}",
        email_body=f"""<p>Hi there,<br><br>
I came across your profile and noticed your experience in {job_description}.
I'm reaching out because we have some exciting opportunities that might be a great fit for your background.<br><br>
Before I share more details, I wanted to ask: Are you currently open to exploring new opportunities in this space?<br><br>
Would you be interested in hearing more about the roles we have available?<br><br>
Best regards,<br>
[Your Name]</p>""",
        variant_label="A"
    )
    variant_b = SeqVariant(
        subject=f"Interested in {job_description} opportunities?",
        email_body=f"""<p>Hello,<br><br>
I hope this message finds you well. I'm a recruiter specializing in {job_description} positions.<br><br>
I'd love to connect and share some opportunities that align with your expertise.
Are you currently open to exploring new roles in this space?<br><br>
If so, I can send you specific details about the positions we have available.<br><br>
Thanks,<br>
[Your Name]</p>""",
        variant_label="B"
    )
    introduction = CampaignSequence(
        seq_number=1,
        seq_delay_details=SeqDelayDetails(delay_in_days=1),
        seq_variants=[variant_a, variant_b]
    )

    # Step 2: role details.
    outreach = CampaignSequence(
        seq_number=2,
        seq_delay_details=SeqDelayDetails(delay_in_days=3),
        subject="",
        email_body=f"""<p>Hi,<br><br>
Thanks for your interest! Here are more details about the {job_description} opportunities:<br><br>
<strong>Role Details:</strong><br>
• [Specific responsibilities]<br>
• [Required skills and experience]<br>
• [Team and company information]<br><br>
<strong>Benefits:</strong><br>
• [Compensation and benefits]<br>
• [Growth opportunities]<br>
• [Work environment]<br><br>
Would you be interested in a quick call to discuss this role in more detail?<br><br>
Best regards,<br>
[Your Name]</p>"""
    )

    # Step 3: final nudge.
    follow_up = CampaignSequence(
        seq_number=3,
        seq_delay_details=SeqDelayDetails(delay_in_days=5),
        subject="",
        email_body=f"""<p>Hi,<br><br>
Just wanted to follow up on the {job_description} opportunity I shared.<br><br>
Have you had a chance to review the information? I'd love to hear your thoughts and answer any questions.<br><br>
If you're interested, I can help schedule next steps. If not, no worries at all!<br><br>
Thanks for your time!<br>
[Your Name]</p>"""
    )

    return [introduction, outreach, follow_up]
| # ============================================================================ | |
| # RATE LIMITING MIDDLEWARE | |
| # ============================================================================ | |
class RateLimiter:
    """Sliding-window request counter.

    Allows at most ``max_requests`` calls to ``is_allowed`` within any
    ``window_seconds`` interval. State is kept per instance in ``requests``,
    a list of timestamps of the calls that were allowed.
    """

    def __init__(self, max_requests: int = 10, window_seconds: int = 2):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Monotonic timestamps of allowed calls still inside the window.
        self.requests = []

    def is_allowed(self) -> bool:
        """Record the current call and return True if it fits in the window.

        Rejected calls are not recorded, so they do not extend the window.
        """
        # A monotonic clock keeps the window correct even if the system wall
        # clock is adjusted (NTP step, manual change) while the server runs.
        now = time.monotonic()
        # Remove old requests outside the window
        self.requests = [stamp for stamp in self.requests if now - stamp < self.window_seconds]
        if len(self.requests) >= self.max_requests:
            return False
        self.requests.append(now)
        return True


# Global rate limiter instance
rate_limiter = RateLimiter(max_requests=10, window_seconds=2)
async def rate_limit_middleware(request: Request, call_next):
    """Rate limiting middleware to respect Smartlead's API limits.

    Consults the module-level ``rate_limiter``. Allowed requests are passed to
    ``call_next``; rejected ones get a 429 JSON response. The advertised retry
    delay is read from the limiter's ``window_seconds`` (previously a
    hard-coded 2) so it cannot drift from the configured window, and it is also
    sent as a standard ``Retry-After`` header.

    NOTE(review): no middleware registration is visible next to this function
    in this view — confirm it is attached to ``app``.
    """
    if rate_limiter.is_allowed():
        return await call_next(request)

    retry_after = rate_limiter.window_seconds
    return JSONResponse(
        status_code=429,
        content={
            "error": "Rate limit exceeded",
            "message": "Too many requests. Please wait before making another request.",
            "retry_after": retry_after
        },
        headers={"Retry-After": str(retry_after)}
    )
| # ============================================================================ | |
| # ERROR HANDLING | |
| # ============================================================================ | |
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an ``HTTPException`` as a uniform JSON error body.

    The response keeps the exception's status code and carries ``error``,
    ``message`` (the exception detail), ``status_code`` and a local-clock
    ``timestamp``. ``request`` is not used.
    """
    error_body = {
        "error": True,
        "message": exc.detail,
        "status_code": exc.status_code,
        "timestamp": datetime.now().isoformat()
    }
    return JSONResponse(status_code=exc.status_code, content=error_body)
async def general_exception_handler(request: Request, exc: Exception):
    """Render any other exception as a generic 500 JSON error body.

    The exception text is exposed in ``detail`` only when the ``DEBUG``
    environment variable equals "true" (case-insensitive); otherwise a fixed
    placeholder is returned. ``request`` is not used.
    """
    debug_enabled = os.getenv("DEBUG", "false").lower() == "true"
    if debug_enabled:
        detail = str(exc)
    else:
        detail = "An unexpected error occurred"
    error_body = {
        "error": True,
        "message": "Internal server error",
        "detail": detail,
        "timestamp": datetime.now().isoformat()
    }
    return JSONResponse(status_code=500, content=error_body)
| # ============================================================================ | |
| # CUSTOM OPENAPI SCHEMA | |
| # ============================================================================ | |
def custom_openapi():
    """Build the OpenAPI schema once, cache it on ``app`` and return it.

    The first call generates the schema from ``app.routes`` with a long
    Markdown description, attaches the tag list and stores the result in
    ``app.openapi_schema``; later calls return that cached object.
    """
    if not app.openapi_schema:
        schema = get_openapi(
            title="Smartlead API - Complete Integration",
            version="2.0.0",
            description="""
# Smartlead API - Complete Integration
A comprehensive FastAPI wrapper for the Smartlead email automation platform.
## Features
- **Campaign Management**: Create, update, and manage email campaigns
- **Lead Management**: Add, update, and manage leads across campaigns with AI-powered personalization
- **Sequence Management**: Create and manage email sequences with AI generation
- **Webhook Management**: Set up webhooks for real-time notifications
- **Analytics**: Get detailed campaign analytics and statistics
- **Email Account Management**: Manage email accounts and warmup
- **Client Management**: Handle client accounts and permissions
## AI-Powered Personalization
When adding leads to campaigns, the API automatically generates personalized welcome and closing messages using LLM (Language Model) based on candidate details. These messages are added to the custom_fields as:
- `Welcome_Message`: Personalized greeting based on candidate's background
- `Closing_Message`: Personalized closing statement
## Lead Schema
The lead schema supports the following structure:
```json
{
"lead_list": [
{
"first_name": "Cristiano",
"last_name": "Ronaldo",
"email": "cristiano@mufc.com",
"phone_number": "0239392029",
"company_name": "Manchester United",
"website": "mufc.com",
"location": "London",
"custom_fields": {
"Title": "Regional Manager",
"First_Line": "Loved your recent post about remote work on Linkedin"
},
"linkedin_profile": "http://www.linkedin.com/in/cristianoronaldo",
"company_url": "mufc.com"
}
],
"settings": {
"ignore_global_block_list": true,
"ignore_unsubscribe_list": true,
"ignore_duplicate_leads_in_other_campaign": false
}
}
```
## Authentication
All requests require a Smartlead API key passed as a query parameter: `?api_key=YOUR_API_KEY`
## Rate Limits
- 10 requests per 2 seconds (enforced automatically)
## Base URL
- Smartlead API: `https://server.smartlead.ai/api/v1`
""",
            routes=app.routes,
        )
        # Add custom tags
        tag_descriptions = (
            ("Campaigns", "Campaign management operations"),
            ("Leads", "Lead management operations"),
            ("Sequences", "Email sequence management"),
            ("Webhooks", "Webhook management"),
            ("Clients", "Client account management"),
            ("Messages", "Message history and reply operations"),
            ("Analytics", "Campaign analytics and statistics"),
            ("Email Accounts", "Email account management"),
            ("Utilities", "Utility endpoints"),
        )
        schema["tags"] = [
            {"name": tag_name, "description": tag_text}
            for tag_name, tag_text in tag_descriptions
        ]
        app.openapi_schema = schema
    return app.openapi_schema


# Route schema generation through the cached builder above.
app.openapi = custom_openapi
| # ============================================================================ | |
| # MAIN APPLICATION ENTRY POINT | |
| # ============================================================================ | |
if __name__ == "__main__":
    import uvicorn

    # Startup banner. The leading glyphs on three of these lines were
    # mis-encoded replacement characters; they are restored to real emoji, and
    # the f-prefix is dropped from strings that interpolate nothing.
    print("🚀 Starting Smartlead API - Complete Integration")
    print("📚 API Documentation: http://localhost:8000/docs")
    print("📖 ReDoc Documentation: http://localhost:8000/redoc")
    print(f"🔗 Smartlead Base URL: {SMARTLEAD_BASE_URL}")
    print("⚡ Rate Limit: 10 requests per 2 seconds")

    # The app is given as an import string because ``reload=True`` needs one.
    # NOTE(review): "updated-final" must match this file's module name — confirm.
    uvicorn.run(
        "updated-final:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )