chatbot3 / app.py
jithenderchoudary's picture
Create app.py
8a21dba verified
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from simple_salesforce import Salesforce
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import sqlite3
import os
import logging
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from dotenv import load_dotenv
# Load environment variables from a .env file (for local development).
# This runs before any os.getenv() lookup performed later in this module.
load_dotenv()
# Set up logging: root logger at DEBUG level, plus a logger named after this module.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = FastAPI()
# Mount static files: the relative "static" directory is served under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Set up templates: Jinja2 templates are looked up in the relative "templates" directory.
templates = Jinja2Templates(directory="templates")
# Initialize SQLite database
def init_db():
    """Create the ``cases`` table in ``cases.db`` if it does not already exist.

    The table holds one row per submitted case; ``synced`` defaults to 0 and
    is the flag the Salesforce sync uses to find rows that still need pushing.

    Raises:
        Exception: Any error raised while opening the database or creating
            the table is logged and then re-raised unchanged.
    """
    try:
        conn = sqlite3.connect("cases.db")
        try:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS cases (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    mobile_number TEXT,
                    email TEXT NOT NULL,
                    issue_description TEXT NOT NULL,
                    synced INTEGER DEFAULT 0
                )
            """)
            conn.commit()
        finally:
            # Close the handle even when the DDL or commit fails, so a failed
            # initialization does not leak an open connection.
            conn.close()
        logger.debug("SQLite database initialized successfully.")
    except Exception as e:
        logger.error(f"Error initializing SQLite database: {e}")
        raise
# Run database initialization at import time (this is a module-level call),
# so the cases table exists before any request handler or scheduled job queries it.
init_db()
# Salesforce connection (using environment variables)
def get_salesforce_connection():
    """Build a Salesforce client from environment variables.

    Reads ``SFDC_USERNAME``, ``SFDC_PASSWORD`` and ``SFDC_SECURITY_TOKEN``
    (all required) plus ``SFDC_DOMAIN`` (defaults to ``"login"``).

    Returns:
        A ``Salesforce`` instance, or ``None`` when a required credential is
        missing or the connection attempt raises. Failures are logged, never
        raised.
    """
    try:
        credentials = {
            "username": os.getenv("SFDC_USERNAME"),
            "password": os.getenv("SFDC_PASSWORD"),
            "security_token": os.getenv("SFDC_SECURITY_TOKEN"),
        }
        login_domain = os.getenv("SFDC_DOMAIN", "login")

        # An unset or empty value for any required credential aborts early.
        if not all(credentials.values()):
            logger.error("Missing Salesforce credentials.")
            return None

        client = Salesforce(domain=login_domain, **credentials)
        logger.debug("Salesforce connection successful.")
        return client
    except Exception as err:
        logger.error(f"Error connecting to Salesforce: {err}")
        return None
# Sync function with retry logic
@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(2),
    retry=retry_if_exception_type(Exception),
    before_sleep=lambda retry_state: logger.debug(f"Retrying Salesforce sync, attempt {retry_state.attempt_number}")
)
async def sync_to_salesforce():
    """Push every unsynced row of ``cases`` to Salesforce as a ``Case__c`` record.

    Each row is created in Salesforce, then flagged ``synced = 1`` and
    committed individually, so rows already pushed stay flagged if a later
    row fails. When no Salesforce connection is available the function logs
    an error and returns without raising.

    The ``@retry`` decorator re-runs the whole function on any exception, up
    to 3 attempts with a fixed 2-second wait between them.

    Raises:
        Exception: Any error during the sync is logged and re-raised so the
            retry decorator can act on it.
    """
    try:
        sf = get_salesforce_connection()
        if not sf:
            logger.error("Cannot sync: Salesforce connection failed.")
            return
        conn = sqlite3.connect("cases.db")
        try:
            cursor = conn.cursor()
            # Name the columns explicitly so the unpacking below does not
            # depend on the table's column order or column count.
            cursor.execute(
                "SELECT id, name, mobile_number, email, issue_description "
                "FROM cases WHERE synced = 0"
            )
            cases = cursor.fetchall()
            for case_id, name, mobile_number, email, issue_description in cases:
                case_data = {
                    "Name": name,
                    "Mobile_Number__c": mobile_number,
                    "Email__c": email,
                    "Issue_Description__c": issue_description
                }
                sf.Case__c.create(case_data)
                cursor.execute("UPDATE cases SET synced = 1 WHERE id = ?", (case_id,))
                # Commit per row so a failure on a later row does not undo
                # the synced flag of rows already created in Salesforce.
                conn.commit()
                logger.debug(f"Synced case ID {case_id} to Salesforce.")
        finally:
            # Always release the connection; without this every failed
            # attempt (and each retry) would leak an open handle.
            conn.close()
    except Exception as e:
        logger.error(f"Error syncing to Salesforce: {str(e)}")
        raise
# Set up scheduler: register sync_to_salesforce as an interval job that runs every 4 hours.
# The job is only registered here; the scheduler is started in the app's startup handler.
scheduler = AsyncIOScheduler()
scheduler.add_job(sync_to_salesforce, "interval", hours=4)
@app.on_event("startup")
async def startup_event():
    """Start the background scheduler when the application starts up."""
    scheduler.start()
    logger.debug("Scheduler started successfully.")
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the background scheduler when the application shuts down."""
    scheduler.shutdown()
    logger.debug("Scheduler shut down successfully.")
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the ``index.html`` template for the site root."""
    # The template context carries only the incoming request object.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/static/{filename}")
async def serve_static(filename: str):
    """Return a single file from the ``static`` directory.

    Args:
        filename: Name of the file to serve, taken from the URL path.

    Returns:
        A ``FileResponse`` for ``static/<filename>``.

    Raises:
        HTTPException: 404 when ``filename`` is not a bare file name or does
            not refer to a regular file inside ``static``.
    """
    # NOTE(review): a StaticFiles mount on the same "/static" prefix is
    # registered earlier in this module and may handle these requests first;
    # confirm whether this route is actually reachable.
    file_path = os.path.join("static", filename)
    # Reject anything carrying a directory component, and require a regular
    # file: os.path.exists() also accepts directories (e.g. ".."), which
    # would make FileResponse fail later instead of returning a clean 404.
    is_plain_name = os.path.basename(filename) == filename
    if not is_plain_name or not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)
def _form_text(data, key):
    """Return ``data[key]`` as a stripped string; missing or null values become ``""``."""
    value = data.get(key)
    return "" if value is None else str(value).strip()


@app.post("/submit-case")
async def submit_case(request: Request):
    """Validate a submitted case, store it in SQLite and trigger a Salesforce sync.

    Expects a JSON object with the keys ``name``, ``mobileNumber``, ``email``
    and ``issueDescription``. ``mobileNumber`` is optional; the others are
    required.

    Returns:
        ``{"success": True}`` once the case has been stored locally.

    Raises:
        HTTPException: 400 when the body is not a JSON object, a required
            field is missing/empty, or the email fails the basic format
            check; 500 for any other failure while processing the request.
    """
    try:
        data = await request.json()
        logger.debug(f"Received form data: {data}")
        if not isinstance(data, dict):
            logger.error("Form data is not a JSON object.")
            raise HTTPException(status_code=400, detail="Request body must be a JSON object")
        # Extract form data with default values and strip whitespace.
        # Null or non-string values are normalized instead of crashing on .strip().
        name = _form_text(data, "name")
        mobile_number = _form_text(data, "mobileNumber")
        email = _form_text(data, "email")
        issue_description = _form_text(data, "issueDescription")
        # Validate required fields with specific error messages
        if not name:
            logger.error("Missing 'name' field in form data.")
            raise HTTPException(status_code=400, detail="Name is required")
        if not email:
            logger.error("Missing 'email' field in form data.")
            raise HTTPException(status_code=400, detail="Email is required")
        if not issue_description:
            logger.error("Missing 'issueDescription' field in form data.")
            raise HTTPException(status_code=400, detail="Issue description is required")
        # Basic email validation
        if "@" not in email or "." not in email:
            logger.error("Invalid email format.")
            raise HTTPException(status_code=400, detail="Invalid email format")
        # Store in SQLite
        conn = sqlite3.connect("cases.db")
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO cases (name, mobile_number, email, issue_description)
                VALUES (?, ?, ?, ?)
                """,
                (name, mobile_number, email, issue_description)
            )
            conn.commit()
        finally:
            # Release the connection even if the insert or commit fails.
            conn.close()
        logger.debug("Data successfully stored in SQLite.")
        # Immediately trigger sync. The case is already stored with synced = 0,
        # so a sync failure must not fail the request: the row stays queued
        # for the scheduled sync job, and a 500 here would invite a duplicate
        # resubmission of a case that was in fact accepted.
        try:
            await sync_to_salesforce()
        except Exception as sync_error:
            logger.error(f"Immediate Salesforce sync failed; case remains queued: {sync_error}")
        return {"success": True}
    except HTTPException:
        # Validation errors already carry the right status code; without this
        # clause the generic handler below would turn every 400 into a 500.
        raise
    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/debug-cases")
async def debug_cases():
    """Return every row of the ``cases`` table.

    NOTE(review): this route has no access control in this module and returns
    all stored names, phone numbers and emails; confirm it is not exposed
    in production.

    Returns:
        ``{"cases": rows}`` where ``rows`` is the list of row tuples.

    Raises:
        HTTPException: 500 when the database cannot be opened or queried.
    """
    try:
        conn = sqlite3.connect("cases.db")
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM cases")
            cases = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()
        return {"cases": cases}
    except Exception as e:
        logger.error(f"Error retrieving cases: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when this module is run as a script.
    import uvicorn

    # Listen on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)