"""FastAPI service that stores submitted cases in SQLite and syncs them to Salesforce."""

import logging
import os
import sqlite3
from contextlib import closing

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from simple_salesforce import Salesforce
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

# Load environment variables from .env file (for local development)
load_dotenv()

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Single source of truth for the on-disk locations used below.
DB_PATH = "cases.db"
STATIC_DIR = "static"

app = FastAPI()

# Mount static files
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

# Set up templates
templates = Jinja2Templates(directory="templates")


def init_db():
    """Create the ``cases`` table in the SQLite database if it is missing.

    Raises:
        Exception: any error raised while creating the table is logged and
            re-raised so that startup fails loudly.
    """
    try:
        # closing() guarantees the connection is released even if execute() fails.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS cases (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    mobile_number TEXT,
                    email TEXT NOT NULL,
                    issue_description TEXT NOT NULL,
                    synced INTEGER DEFAULT 0
                )
                """
            )
            conn.commit()
        logger.debug("SQLite database initialized successfully.")
    except Exception as e:
        logger.error(f"Error initializing SQLite database: {e}")
        raise


# Run database initialization on startup
init_db()


def get_salesforce_connection():
    """Open a Salesforce session using credentials from environment variables.

    Reads ``SFDC_USERNAME``, ``SFDC_PASSWORD``, ``SFDC_SECURITY_TOKEN`` and
    ``SFDC_DOMAIN`` (default ``"login"``).

    Returns:
        A ``Salesforce`` client, or ``None`` when credentials are missing or
        the connection attempt raises (the error is logged, not propagated).
    """
    try:
        username = os.getenv("SFDC_USERNAME")
        password = os.getenv("SFDC_PASSWORD")
        security_token = os.getenv("SFDC_SECURITY_TOKEN")
        domain = os.getenv("SFDC_DOMAIN", "login")

        if not username or not password or not security_token:
            logger.error("Missing Salesforce credentials.")
            return None

        sf = Salesforce(
            username=username,
            password=password,
            security_token=security_token,
            domain=domain,
        )
        logger.debug("Salesforce connection successful.")
        return sf
    except Exception as e:
        logger.error(f"Error connecting to Salesforce: {e}")
        return None


# Sync function with retry logic
@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(2),
    retry=retry_if_exception_type(Exception),
    before_sleep=lambda retry_state: logger.debug(
        f"Retrying Salesforce sync, attempt {retry_state.attempt_number}"
    ),
)
async def sync_to_salesforce():
    """Push every unsynced case to Salesforce and mark it as synced.

    Returns early (without raising, hence without retrying) when no Salesforce
    connection can be obtained. Each case is committed individually, so a
    failure part-way through leaves already-synced rows marked as such and a
    retry only re-sends the remaining ones.

    Raises:
        Exception: any error during the sync is logged and re-raised so the
            ``@retry`` decorator can attempt the sync again.
    """
    try:
        sf = get_salesforce_connection()
        if not sf:
            logger.error("Cannot sync: Salesforce connection failed.")
            return

        # closing() releases the connection even when a create/update raises,
        # which matters because this function is retried on failure.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            cursor = conn.cursor()
            # Explicit column list: unpacking must not depend on table layout.
            cursor.execute(
                "SELECT id, name, mobile_number, email, issue_description "
                "FROM cases WHERE synced = 0"
            )
            pending = cursor.fetchall()

            for case_id, name, mobile_number, email, issue_description in pending:
                case_data = {
                    "Name": name,
                    "Mobile_Number__c": mobile_number,
                    "Email__c": email,
                    "Issue_Description__c": issue_description,
                }
                sf.Case__c.create(case_data)
                cursor.execute(
                    "UPDATE cases SET synced = 1 WHERE id = ?", (case_id,)
                )
                conn.commit()
                logger.debug(f"Synced case ID {case_id} to Salesforce.")
    except Exception as e:
        logger.error(f"Error syncing to Salesforce: {str(e)}")
        raise


# Set up scheduler
scheduler = AsyncIOScheduler()
scheduler.add_job(sync_to_salesforce, "interval", hours=4)


@app.on_event("startup")
async def startup_event():
    """Start the background scheduler that runs the periodic sync job."""
    scheduler.start()
    logger.debug("Scheduler started successfully.")


@app.on_event("shutdown")
async def shutdown_event():
    """Stop the background scheduler."""
    scheduler.shutdown()
    logger.debug("Scheduler shut down successfully.")


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the ``index.html`` template."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/static/{filename}")
async def serve_static(filename: str):
    """Return a single file from the static directory.

    Raises:
        HTTPException: 404 when the name does not resolve to a regular file
            located inside the static directory.
    """
    # NOTE(review): the StaticFiles mount on "/static" is registered before
    # this route and likely answers these URLs first - confirm whether this
    # handler is still needed.
    static_root = os.path.realpath(STATIC_DIR)
    file_path = os.path.realpath(os.path.join(static_root, filename))

    # Refuse anything that resolves outside the static directory (e.g. "..")
    # and anything that is not a regular file (directories would make
    # FileResponse fail later).
    inside_root = file_path.startswith(static_root + os.sep)
    if not inside_root or not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)


def _clean_text(data, key):
    """Return ``data[key]`` stripped of surrounding whitespace.

    Missing keys and JSON ``null`` yield ``""``; any other non-string value is
    rejected with a 400 instead of crashing on ``.strip()``.
    """
    value = data.get(key)
    if value is None:
        return ""
    if not isinstance(value, str):
        logger.error(f"Field '{key}' is not a string.")
        raise HTTPException(status_code=400, detail=f"'{key}' must be a string")
    return value.strip()


@app.post("/submit-case")
async def submit_case(request: Request):
    """Validate a submitted case, store it in SQLite and trigger a sync.

    Expects a JSON object with ``name``, ``email``, ``issueDescription`` and an
    optional ``mobileNumber``.

    Returns:
        ``{"success": True}`` once the case has been stored locally.

    Raises:
        HTTPException: 400 for malformed or invalid input, 500 when the case
            cannot be stored.
    """
    try:
        data = await request.json()
    except ValueError as err:  # json.JSONDecodeError / UnicodeDecodeError
        logger.error("Request body is not valid JSON.")
        raise HTTPException(status_code=400, detail="Invalid JSON body") from err

    if not isinstance(data, dict):
        logger.error("Request body is not a JSON object.")
        raise HTTPException(status_code=400, detail="JSON object expected")

    logger.debug(f"Received form data: {data}")

    # Extract form data with default values and strip whitespace
    name = _clean_text(data, "name")
    mobile_number = _clean_text(data, "mobileNumber")
    email = _clean_text(data, "email")
    issue_description = _clean_text(data, "issueDescription")

    # Validate required fields with specific error messages. These raises sit
    # outside any broad ``except`` so they reach the client as 400, not 500.
    if not name:
        logger.error("Missing 'name' field in form data.")
        raise HTTPException(status_code=400, detail="Name is required")
    if not email:
        logger.error("Missing 'email' field in form data.")
        raise HTTPException(status_code=400, detail="Email is required")
    if not issue_description:
        logger.error("Missing 'issueDescription' field in form data.")
        raise HTTPException(
            status_code=400, detail="Issue description is required"
        )

    # Basic email validation
    if "@" not in email or "." not in email:
        logger.error("Invalid email format.")
        raise HTTPException(status_code=400, detail="Invalid email format")

    # Store in SQLite
    try:
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.execute(
                """
                INSERT INTO cases (name, mobile_number, email, issue_description)
                VALUES (?, ?, ?, ?)
                """,
                (name, mobile_number, email, issue_description),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    logger.debug("Data successfully stored in SQLite.")

    # Immediately trigger sync. The case is already persisted, so a failed
    # sync must not be reported to the client as a failed submission (that
    # would invite duplicate submissions); the scheduled job retries later.
    try:
        await sync_to_salesforce()
    except Exception as e:
        logger.error(f"Immediate Salesforce sync failed; will retry later: {e}")

    return {"success": True}


@app.get("/debug-cases")
async def debug_cases():
    """Return every row of the ``cases`` table.

    Raises:
        HTTPException: 500 when the rows cannot be read.
    """
    # NOTE(review): this returns all stored case rows (names, emails, phone
    # numbers) with no access check visible in this file - confirm it is not
    # exposed in production.
    try:
        with closing(sqlite3.connect(DB_PATH)) as conn:
            cases = conn.execute("SELECT * FROM cases").fetchall()
        return {"cases": cases}
    except Exception as e:
        logger.error(f"Error retrieving cases: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)