Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from simple_salesforce import Salesforce | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
| import sqlite3 | |
| import os | |
| import logging | |
| from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type | |
| from dotenv import load_dotenv | |
# Pull settings from a local .env file when one is present (local development).
load_dotenv()

# Route DEBUG-and-above records through the root logger; this module logs via `logger`.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

app = FastAPI()

# Serve the ./static directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Jinja2 templates are looked up in ./templates.
templates = Jinja2Templates(directory="templates")
# Initialize SQLite database
def init_db():
    """Create the ``cases`` table in ``cases.db`` if it does not exist yet.

    The table holds one row per submitted case; ``synced`` defaults to 0 and
    is set to 1 by the Salesforce sync once the row has been pushed.

    Raises:
        Exception: any error raised while opening the database or running
            the DDL is logged and then re-raised to the caller.
    """
    try:
        conn = sqlite3.connect("cases.db")
        try:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS cases (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    mobile_number TEXT,
                    email TEXT NOT NULL,
                    issue_description TEXT NOT NULL,
                    synced INTEGER DEFAULT 0
                )
            """)
            conn.commit()
        finally:
            # Close the handle even when the DDL or commit fails, so a failed
            # initialization does not leave an open connection behind.
            conn.close()
        logger.debug("SQLite database initialized successfully.")
    except Exception as e:
        logger.error(f"Error initializing SQLite database: {e}")
        raise


# Run database initialization on startup (executes at module import time).
init_db()
# Salesforce connection (credentials come from environment variables)
def get_salesforce_connection():
    """Build a Salesforce client from the ``SFDC_*`` environment variables.

    Reads ``SFDC_USERNAME``, ``SFDC_PASSWORD``, ``SFDC_SECURITY_TOKEN`` and
    ``SFDC_DOMAIN`` (which falls back to ``"login"``).

    Returns:
        The ``Salesforce`` client, or ``None`` when any of the three
        credentials is missing/empty or when creating the client raises.
        Both failure cases are logged; nothing is raised to the caller.
    """
    try:
        credentials = {
            "username": os.getenv("SFDC_USERNAME"),
            "password": os.getenv("SFDC_PASSWORD"),
            "security_token": os.getenv("SFDC_SECURITY_TOKEN"),
        }
        # An unset or empty value for any credential aborts the attempt.
        if not all(credentials.values()):
            logger.error("Missing Salesforce credentials.")
            return None

        client = Salesforce(**credentials, domain=os.getenv("SFDC_DOMAIN", "login"))
        logger.debug("Salesforce connection successful.")
        return client
    except Exception as exc:
        logger.error(f"Error connecting to Salesforce: {exc}")
        return None
# Sync function
# NOTE(review): the original header says "with retry logic", but no retry is
# applied to this function in the visible source (tenacity is imported at the
# top of the file) — confirm whether a @retry decorator was intended here.
async def sync_to_salesforce():
    """Push every unsynced row of ``cases`` to Salesforce and mark it synced.

    For each row with ``synced = 0`` a ``Case__c`` record is created, then the
    row is updated to ``synced = 1`` and committed immediately, so rows that
    were already pushed stay marked even if a later row fails.

    Returns:
        None. Returns early (after logging) when no Salesforce connection
        could be obtained.

    Raises:
        Exception: any error from SQLite or from the Salesforce call is logged
            and re-raised; rows after the failing one are left unsynced.
    """
    try:
        sf = get_salesforce_connection()
        if not sf:
            logger.error("Cannot sync: Salesforce connection failed.")
            return

        conn = sqlite3.connect("cases.db")
        try:
            cursor = conn.cursor()
            # Name the columns explicitly so the unpacking below keeps working
            # if columns are ever added to (or reordered in) the table.
            cursor.execute(
                "SELECT id, name, mobile_number, email, issue_description "
                "FROM cases WHERE synced = 0"
            )
            pending = cursor.fetchall()
            for case_id, name, mobile_number, email, issue_description in pending:
                case_data = {
                    "Name": name,
                    "Mobile_Number__c": mobile_number,
                    "Email__c": email,
                    "Issue_Description__c": issue_description,
                }
                sf.Case__c.create(case_data)
                cursor.execute("UPDATE cases SET synced = 1 WHERE id = ?", (case_id,))
                # Commit per row: a failure further down must not undo the
                # synced flag of records that already exist in Salesforce.
                conn.commit()
                logger.debug(f"Synced case ID {case_id} to Salesforce.")
        finally:
            # Release the SQLite handle even when a create/update call raises.
            conn.close()
    except Exception as e:
        logger.error(f"Error syncing to Salesforce: {str(e)}")
        raise
# Background scheduler: run the Salesforce sync on a 4-hour interval.
scheduler = AsyncIOScheduler()
scheduler.add_job(sync_to_salesforce, trigger="interval", hours=4)
# Registered as a startup handler: without the registration nothing in this
# file ever calls scheduler.start(), so the periodic sync job would never run.
@app.on_event("startup")
async def startup_event():
    """Start the background scheduler when the application starts."""
    scheduler.start()
    logger.debug("Scheduler started successfully.")
# Registered as a shutdown handler so the scheduler is stopped with the app.
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the background scheduler when the application shuts down."""
    # Shutting down a scheduler that never started raises, so only stop it
    # when it is actually running (e.g. startup may have failed earlier).
    if scheduler.running:
        scheduler.shutdown()
        logger.debug("Scheduler shut down successfully.")
# NOTE(review): no route decorator is visible for this handler in the source;
# confirm how (and under which path) it is registered with `app`.
async def index(request: Request):
    """Render the ``index.html`` template, passing the request as context."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
# NOTE(review): no route decorator is visible for this handler in the source;
# confirm how (and under which path) it is registered with `app`.
async def serve_static(filename: str):
    """Return a file from the ``static`` directory.

    Args:
        filename: path of the requested file, relative to ``static``.

    Returns:
        A ``FileResponse`` for the resolved file.

    Raises:
        HTTPException: 404 when the path does not exist, is not a regular
            file, or resolves to a location outside the ``static`` directory.
    """
    static_root = os.path.realpath("static")
    # os.path.join drops "static" entirely when `filename` is absolute, and
    # "../" segments can climb out of it; resolve first, then verify that the
    # result is still inside the static root before touching the file.
    file_path = os.path.realpath(os.path.join(static_root, filename))
    try:
        inside_root = os.path.commonpath([static_root, file_path]) == static_root
    except ValueError:
        # Raised when the two paths cannot share a prefix (e.g. different
        # drives on Windows) — by definition outside the static root.
        inside_root = False

    # isfile (not exists) so that directories are reported as missing too.
    if not inside_root or not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)
def _get_stripped(data, key):
    """Return ``data[key]`` stripped of surrounding whitespace.

    A missing key or a JSON ``null`` yields ``""``; any other non-string value
    is rejected with a 400 instead of crashing on ``.strip()``.
    """
    value = data.get(key)
    if value is None:
        return ""
    if not isinstance(value, str):
        logger.error(f"Field '{key}' is not a string.")
        raise HTTPException(status_code=400, detail=f"'{key}' must be a string")
    return value.strip()


# NOTE(review): no route decorator is visible for this handler in the source;
# confirm how (and under which path/method) it is registered with `app`.
async def submit_case(request: Request):
    """Validate a submitted case, store it in SQLite and trigger a sync.

    The JSON body is read for the keys ``name``, ``mobileNumber``, ``email``
    and ``issueDescription``. ``name``, ``email`` and ``issueDescription`` are
    required; ``email`` must contain both ``@`` and ``.``.

    Returns:
        ``{"success": True}`` once the case has been stored locally.

    Raises:
        HTTPException: 400 for a malformed body or failed validation,
            500 for any other error while processing the request.
    """
    try:
        try:
            data = await request.json()
        except ValueError:
            # json.JSONDecodeError is a ValueError: a malformed body is the
            # client's mistake, not a server error.
            logger.error("Request body is not valid JSON.")
            raise HTTPException(status_code=400, detail="Invalid JSON body")
        if not isinstance(data, dict):
            logger.error("Request body is not a JSON object.")
            raise HTTPException(status_code=400, detail="JSON body must be an object")
        logger.debug(f"Received form data: {data}")

        # Extract form data with default values and strip whitespace
        name = _get_stripped(data, "name")
        mobile_number = _get_stripped(data, "mobileNumber")
        email = _get_stripped(data, "email")
        issue_description = _get_stripped(data, "issueDescription")

        # Validate required fields with specific error messages
        if not name:
            logger.error("Missing 'name' field in form data.")
            raise HTTPException(status_code=400, detail="Name is required")
        if not email:
            logger.error("Missing 'email' field in form data.")
            raise HTTPException(status_code=400, detail="Email is required")
        if not issue_description:
            logger.error("Missing 'issueDescription' field in form data.")
            raise HTTPException(status_code=400, detail="Issue description is required")

        # Basic email validation
        if "@" not in email or "." not in email:
            logger.error("Invalid email format.")
            raise HTTPException(status_code=400, detail="Invalid email format")

        # Store in SQLite
        conn = sqlite3.connect("cases.db")
        try:
            conn.execute(
                """
                INSERT INTO cases (name, mobile_number, email, issue_description)
                VALUES (?, ?, ?, ?)
                """,
                (name, mobile_number, email, issue_description),
            )
            conn.commit()
        finally:
            # Release the handle even when the insert or commit fails.
            conn.close()
        logger.debug("Data successfully stored in SQLite.")

        # Immediately trigger sync. The case is already persisted with
        # synced = 0, so a failure here is logged rather than reported as a
        # 500: the row stays queued for the next sync run, and telling the
        # client the submission failed would invite a duplicate resubmission.
        try:
            await sync_to_salesforce()
        except Exception as sync_error:
            logger.error(f"Immediate sync failed; case remains queued: {sync_error}")

        return {"success": True}
    except HTTPException:
        # Let deliberate 4xx responses through unchanged instead of letting
        # the generic handler below rewrap them as 500s.
        raise
    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# NOTE(review): no route decorator is visible for this handler in the source.
# It returns every stored case (names, phone numbers, e-mail addresses), so
# confirm that whatever exposes it is restricted to trusted callers.
async def debug_cases():
    """Return all rows of the ``cases`` table.

    Returns:
        ``{"cases": rows}`` where ``rows`` is the ``fetchall()`` result of
        ``SELECT * FROM cases``.

    Raises:
        HTTPException: 500 (with the error text as detail) when the database
            cannot be opened or queried.
    """
    try:
        conn = sqlite3.connect("cases.db")
        try:
            cases = conn.execute("SELECT * FROM cases").fetchall()
        finally:
            # Close the handle even when the query raises.
            conn.close()
        return {"cases": cases}
    except Exception as e:
        logger.error(f"Error retrieving cases: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the module is run directly.
    import uvicorn

    # Serve the app on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)