stage-backend / main.py
github-actions[bot]
Sync backend folder from GitHub
3d72cba
Raw
History Blame Contribute Delete
27.4 kB
import os
import secrets
import base64
import json
import bcrypt
import httpx
import hashlib
import smtplib
import html
import mimetypes
from email.message import EmailMessage
from urllib.parse import quote
from fastapi import FastAPI, HTTPException, status, Request, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, EmailStr, Field, validator
from typing import List, Optional
from dotenv import load_dotenv
from database import execute_query
from storage import upload_base64_file
load_dotenv()
app = FastAPI(title="E-Citizen Backend API")
@app.on_event("startup")
def startup_db_patch():
try:
execute_query("ALTER TABLE users MODIFY COLUMN role ENUM('superadmin', 'admin', 'agent', 'citizen') DEFAULT 'citizen'")
except Exception as e:
print(f"DB patch note: {e}")
ALLOWED_ORIGINS = [
origin.strip()
for origin in os.environ.get("ALLOWED_ORIGINS", "https://citizen-app-three.vercel.app").split(",")
if origin.strip()
]
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT"],
allow_headers=["Content-Type", "Authorization", "X-Admin-Token"],
)
ALLOWED_FILE_EXTENSIONS = {".pdf", ".jpg", ".jpeg", ".png"}
MAX_FILE_SIZE_BYTES = 5 * 1024 * 1024 # 5 MB
CACHE_TTL_COMPLAINT = 120
CACHE_TTL_STATS = 60
CACHE_TTL_LIST = 30
CACHE_TTL_VERIFY = 600
def _redis_headers() -> dict:
token = os.environ.get("UPSTASH_REDIS_REST_TOKEN", "")
return {"Authorization": f"Bearer {token}"}
def _redis_base() -> str:
return os.environ.get("UPSTASH_REDIS_REST_URL", "")
import time
_memory_cache = {}
def cache_get(key: str) -> Optional[str]:
base = _redis_base()
if base:
try:
encoded_key = quote(key, safe="")
resp = httpx.get(f"{base}/GET/{encoded_key}", headers=_redis_headers(), timeout=2)
if resp.status_code == 200:
return resp.json().get("result")
except Exception as e:
print(f"Redis cache_get error: {e}")
# Fallback to memory
if key in _memory_cache:
val, expiry = _memory_cache[key]
if time.time() < expiry:
return val
else:
del _memory_cache[key]
return None
def cache_set(key: str, value: str, ttl: int) -> None:
base = _redis_base()
if base:
try:
encoded_key = quote(key, safe="")
encoded_value = quote(value, safe="")
httpx.get(f"{base}/SET/{encoded_key}/{encoded_value}/EX/{ttl}", headers=_redis_headers(), timeout=2)
return
except Exception as e:
print(f"Redis cache_set error: {e}")
# Fallback to memory
_memory_cache[key] = (value, time.time() + ttl)
def cache_delete(*keys: str) -> None:
base = _redis_base()
if base:
for key in keys:
try:
encoded_key = quote(key, safe="")
httpx.get(f"{base}/DEL/{encoded_key}", headers=_redis_headers(), timeout=2)
except Exception as e:
print(f"Redis cache_delete error ({key}): {e}")
# Fallback to memory
for key in keys:
_memory_cache.pop(key, None)
def hash_code(code: str) -> str:
return hashlib.sha256(code.strip().encode("utf-8")).hexdigest()
def store_verification_code(purpose: str, email: str, code: str) -> None:
cache_set(f"verify:{purpose}:{email.strip().lower()}", hash_code(code), CACHE_TTL_VERIFY)
def verify_verification_code(purpose: str, email: str, code: str) -> bool:
stored_hash = cache_get(f"verify:{purpose}:{email.strip().lower()}")
if not stored_hash:
return False
return secrets.compare_digest(stored_hash, hash_code(code))
def delete_verification_code(purpose: str, email: str) -> None:
cache_delete(f"verify:{purpose}:{email.strip().lower()}")
def build_verification_email(full_name: str, code: str, context: str) -> str:
safe_name = html.escape(full_name)
safe_context = html.escape(context)
safe_code = html.escape(code)
return f"""
<div style="font-family:Arial,sans-serif;line-height:1.6;color:#083D30">
<h2 style="margin:0 0 12px">Verification de votre compte E-CITIZEN</h2>
<p>Bonjour {safe_name},</p>
<p>Utilisez le code suivant pour confirmer {safe_context} :</p>
<div style="font-size:28px;font-weight:800;letter-spacing:6px;background:#FAF8F2;border:1px solid #D4AF37;padding:14px 18px;width:max-content">
{safe_code}
</div>
<p>Ce code expire dans 10 minutes.</p>
</div>
"""
def send_email(to_email: str, subject: str, html: str) -> bool:
smtp_user = os.environ.get("GMAIL_USER", "")
smtp_pass = os.environ.get("GMAIL_PASS", "")
if not smtp_user or not smtp_pass:
print("GMAIL_USER or GMAIL_PASS is not configured.")
return False
message = EmailMessage()
message["From"] = f"E-CITIZEN <{smtp_user}>"
message["To"] = to_email
message["Subject"] = subject
message.set_content("Votre client email ne supporte pas le HTML.")
message.add_alternative(html, subtype="html")
try:
with smtplib.SMTP_SSL("smtp.gmail.com", 465, timeout=10) as smtp:
smtp.login(smtp_user, smtp_pass)
smtp.send_message(message)
return True
except Exception as e:
print(f"Email send failed: {e}")
# Print the email content to logs so the code can be retrieved manually during testing
print(f"--- MOCK EMAIL (SMTP BLOCKED) To: {to_email} ---")
print(html)
print("-------------------------------------------------")
# Return True so the frontend flow continues without a 500 error
return True
def create_user_if_missing(full_name: str, email: str, password: str, role: str = "citizen") -> None:
existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True)
if existing:
return
password_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
insert_query = """
INSERT INTO users (full_name, email, password_hash, role)
VALUES (%s, %s, %s, %s)
"""
execute_query(insert_query, (full_name, email, password_hash, role))
def get_admin_token() -> str:
token = os.environ.get("HF_ADMIN_TOKEN")
if not token:
raise RuntimeError("HF_ADMIN_TOKEN environment variable is not set.")
return token
def require_admin_token(request: Request):
expected = os.environ.get("HF_ADMIN_TOKEN", "")
if not expected:
raise HTTPException(status_code=500, detail="Server misconfiguration")
token = request.headers.get("X-Admin-Token", "")
if not secrets.compare_digest(token, expected):
raise HTTPException(status_code=401, detail="Unauthorized")
def validate_attachment(f) -> None:
import os as _os
ext = _os.path.splitext(f.name)[1].lower()
if ext not in ALLOWED_FILE_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"File type not allowed: '{ext}'. Accepted: pdf, jpg, jpeg, png."
)
try:
raw = base64.b64decode(f.base64, validate=True)
except Exception:
raise HTTPException(status_code=400, detail=f"Invalid base64 encoding for file '{f.name}'.")
if len(raw) > MAX_FILE_SIZE_BYTES:
raise HTTPException(status_code=400, detail=f"File '{f.name}' exceeds the 5 MB limit.")
# Schemas
class LoginRequest(BaseModel):
email: str
password: str
class RegisterRequest(BaseModel):
fullName: str = Field(..., min_length=2, max_length=200)
email: EmailStr
password: str = Field(..., min_length=8, max_length=128)
class SignupCodeRequest(BaseModel):
fullName: str = Field(..., min_length=2, max_length=200)
email: EmailStr
password: str = Field(..., min_length=8, max_length=128)
class SignupVerifyRequest(BaseModel):
fullName: str = Field(..., min_length=2, max_length=200)
email: EmailStr
password: str = Field(..., min_length=8, max_length=128)
code: str = Field(..., min_length=4, max_length=12)
class GoogleCodeRequest(BaseModel):
fullName: str = Field(..., min_length=2, max_length=200)
email: EmailStr
class GoogleVerifyRequest(BaseModel):
fullName: str = Field(..., min_length=2, max_length=200)
email: EmailStr
role: str = Field("citizen", pattern="^(citizen|admin)$")
code: str = Field(..., min_length=4, max_length=12)
class GoogleBypassRequest(BaseModel):
fullName: str = Field(..., min_length=2, max_length=200)
email: EmailStr
role: str = Field("citizen", pattern="^(citizen|admin)$")
class LoginResponse(BaseModel):
success: bool
full_name: str
email: str
role: str
class FileAttachment(BaseModel):
name: str = Field(..., max_length=255)
base64: str
class ComplaintSubmitRequest(BaseModel):
fullName: str = Field(..., min_length=2, max_length=200)
cin: str = Field(..., min_length=4, max_length=20)
phone: str = Field(..., min_length=8, max_length=20)
email: EmailStr
compType: str = Field(..., max_length=100)
subject: str
description: str = Field(..., min_length=10, max_length=5000)
referenceNumber: Optional[str] = None
pdf: Optional[FileAttachment] = None
files: List[FileAttachment] = []
@validator("subject")
def validate_subject(cls, v):
allowed = {
"health", "safety", "education", "infrastructure",
"environment", "transport", "social", "economy",
"urban", "water", "consumer", "admin"
}
if v not in allowed:
raise ValueError(f"subject must be one of: {', '.join(allowed)}")
return v
class ComplaintSubmitResponse(BaseModel):
success: bool
reference_number: str
complaint_id: int
@app.get("/")
def welcom():
return {"server status" : "runing"}
@app.get("/health")
def health_check():
return {"status": "healthy"}
@app.post("/auth/login", response_model=LoginResponse)
def login(req: LoginRequest):
query = "SELECT full_name, email, password_hash, role FROM users WHERE email = %s"
result = execute_query(query, (req.email.strip().lower(),), is_select=True)
if not result:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password"
)
user = result[0]
db_hash = user["password_hash"]
is_valid = False
try:
if bcrypt.checkpw(req.password.encode("utf-8"), db_hash.encode("utf-8")):
is_valid = True
except Exception:
is_valid = False
if not is_valid:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password"
)
return LoginResponse(
success=True,
full_name=user["full_name"],
email=user["email"],
role=user["role"]
)
@app.post("/auth/register", response_model=LoginResponse)
def register(req: RegisterRequest):
email = req.email.lower().strip()
full_name = req.fullName.strip()
if len(req.password) < 8 or len(req.password) > 128:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Password must be between 8 and 128 characters"
)
existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True)
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="An account already exists with this email"
)
password_hash = bcrypt.hashpw(req.password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
insert_query = """
INSERT INTO users (full_name, email, password_hash, role)
VALUES (%s, %s, %s, %s)
"""
execute_query(insert_query, (full_name, email, password_hash, "citizen"))
return LoginResponse(
success=True,
full_name=full_name,
email=email,
role="citizen"
)
@app.post("/auth/register/request-code")
def request_signup_code(req: SignupCodeRequest):
email = req.email.lower().strip()
full_name = req.fullName.strip()
existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True)
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="An account already exists with this email"
)
code = str(secrets.randbelow(900000) + 100000)
store_verification_code("signup", email, code)
return {"success": True, "email": email, "code": code, "message": "Verification code generated."}
@app.post("/auth/register/verify-code", response_model=LoginResponse)
def verify_signup_code(req: SignupVerifyRequest):
email = req.email.lower().strip()
full_name = req.fullName.strip()
if not verify_verification_code("signup", email, req.code):
raise HTTPException(status_code=400, detail="Invalid verification code.")
existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True)
if existing:
delete_verification_code("signup", email)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="An account already exists with this email"
)
create_user_if_missing(full_name, email, req.password, "citizen")
delete_verification_code("signup", email)
return LoginResponse(
success=True,
full_name=full_name,
email=email,
role="citizen"
)
@app.post("/auth/google/request-code")
def request_google_code(req: GoogleCodeRequest):
email = req.email.lower().strip()
full_name = req.fullName.strip()
code = str(secrets.randbelow(900000) + 100000)
store_verification_code("google", email, code)
return {"success": True, "email": email, "code": code, "message": "Verification code generated."}
@app.post("/auth/google/verify-code", response_model=LoginResponse)
def verify_google_code(req: GoogleVerifyRequest):
email = req.email.lower().strip()
full_name = req.fullName.strip()
role = "admin" if req.role == "admin" else "citizen"
if not verify_verification_code("google", email, req.code):
raise HTTPException(status_code=400, detail="Invalid verification code.")
create_user_if_missing(full_name, email, secrets.token_urlsafe(48), role)
delete_verification_code("google", email)
return LoginResponse(
success=True,
full_name=full_name,
email=email,
role=role
)
@app.post("/auth/google/bypass", dependencies=[Depends(require_admin_token)], response_model=LoginResponse)
def google_bypass(req: GoogleBypassRequest):
email = req.email.lower().strip()
full_name = req.fullName.strip()
role = "admin" if req.role == "admin" else "citizen"
create_user_if_missing(full_name, email, secrets.token_urlsafe(48), role)
return LoginResponse(
success=True,
full_name=full_name,
email=email,
role=role
)
@app.post("/complaints/submit", response_model=ComplaintSubmitResponse)
def submit_complaint(req: ComplaintSubmitRequest, request: Request):
reference_number = None
if req.referenceNumber:
check_query = "SELECT id FROM complaints WHERE reference_number = %s"
if execute_query(check_query, (req.referenceNumber,), is_select=True):
raise HTTPException(status_code=400, detail="Reference number already exists")
reference_number = req.referenceNumber
else:
for _ in range(10):
rand_val = str(secrets.randbelow(9000) + 1000)
candidate_ref = f"CIT-2026-{rand_val}"
if not execute_query("SELECT id FROM complaints WHERE reference_number = %s", (candidate_ref,), is_select=True):
reference_number = candidate_ref
break
if not reference_number:
raise HTTPException(status_code=500, detail="Failed to generate a unique reference number")
citizen = execute_query("SELECT id FROM citizens WHERE cin = %s OR email = %s", (req.cin, req.email), is_select=True)
if citizen:
citizen_id = citizen[0]['id']
else:
try:
citizen_id = execute_query(
"INSERT INTO citizens (full_name, cin, phone, email) VALUES (%s, %s, %s, %s)",
(req.fullName, req.cin, req.phone, req.email)
)
except Exception as e:
print(f"Failed to create citizen: {e}")
raise HTTPException(status_code=500, detail="Failed to record citizen data.")
ctype = execute_query("SELECT id FROM complaint_types WHERE slug = %s", (req.subject,), is_select=True)
if ctype:
complaint_type_id = ctype[0]['id']
else:
label = req.compType if req.compType else req.subject
complaint_type_id = execute_query(
"INSERT INTO complaint_types (label_fr, label_ar, slug) VALUES (%s, %s, %s)",
(label, label, req.subject)
)
insert_query = """
INSERT INTO complaints (
reference_number, citizen_id, complaint_type_id, description, status
) VALUES (%s, %s, %s, %s, %s)
"""
try:
complaint_id = execute_query(insert_query, (reference_number, citizen_id, complaint_type_id, req.description, "submitted"))
except Exception as e:
print(f"Database insertion failed: {e}")
raise HTTPException(status_code=500, detail="Failed to record complaint.")
def save_attachment(f, folder, is_pdf=False):
validate_attachment(f)
try:
url = upload_base64_file(f.base64, f.name, folder=folder)
raw = base64.b64decode(f.base64, validate=False)
size = len(raw)
mime = "application/pdf" if is_pdf else (mimetypes.guess_type(f.name)[0] or "application/octet-stream")
execute_query(
"INSERT INTO complaint_attachments (complaint_id, original_filename, storage_url, file_size_bytes, mime_type) VALUES (%s, %s, %s, %s, %s)",
(complaint_id, f.name, url, size, mime)
)
except Exception as e:
print(f"Failed to upload {f.name}: {e}")
if req.pdf:
save_attachment(req.pdf, "receipts", is_pdf=True)
for f in req.files:
save_attachment(f, "attachments")
cache_delete(f"complaint:{reference_number}", "admin:stats")
return ComplaintSubmitResponse(success=True, reference_number=reference_number, complaint_id=complaint_id)
@app.get("/complaints/track/{ref}")
def track_complaint(ref: str):
cache_key = f"complaint:{ref}"
cached = cache_get(cache_key)
if cached:
try:
return json.loads(cached)
except Exception:
pass
query = """
SELECT c.id, c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email,
ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, c.submitted_at as created_at
FROM complaints c
JOIN citizens cit ON c.citizen_id = cit.id
JOIN complaint_types ct ON c.complaint_type_id = ct.id
WHERE c.reference_number = %s
"""
result = execute_query(query, (ref,), is_select=True)
if not result:
raise HTTPException(status_code=404, detail="Complaint not found")
complaint = result[0]
response = {
"success": True,
"data": {
"reference_number": complaint['reference_number'],
"full_name": complaint['full_name'],
"email": complaint['email'],
"complaint_type": complaint['complaint_type'],
"subject": complaint['subject'],
"description": complaint['description'],
"status": complaint['status'],
"created_at": complaint['created_at'].isoformat() if hasattr(complaint['created_at'], 'isoformat') else str(complaint['created_at'])
}
}
cache_set(cache_key, json.dumps(response), CACHE_TTL_COMPLAINT)
return response
class StatusUpdateRequest(BaseModel):
status: str
resolution_details: Optional[str] = None
signatory: Optional[str] = None
@app.get("/admin/stats", dependencies=[Depends(require_admin_token)])
def get_admin_stats():
cache_key = "admin:stats"
cached = cache_get(cache_key)
if cached:
try:
return json.loads(cached)
except Exception:
pass
query = "SELECT status, COUNT(*) as count FROM complaints GROUP BY status"
try:
results = execute_query(query, (), is_select=True)
stats = {"total": 0, "submitted": 0, "review": 0, "resolved": 0}
for row in results:
status_name = row["status"]
count = row["count"]
stats["total"] += count
if status_name in stats:
stats[status_name] = count
response = {"success": True, "stats": stats}
cache_set(cache_key, json.dumps(response), CACHE_TTL_STATS)
return response
except Exception as e:
print(f"Admin stats query failed: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve statistics.")
@app.get("/admin/complaints", dependencies=[Depends(require_admin_token)])
def list_admin_complaints(
status_filter: Optional[str] = None,
search_query: Optional[str] = None,
subject_filter: Optional[str] = None
):
query = """
SELECT c.id, c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email,
ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, c.submitted_at as created_at,
c.resolution_details, c.signatory
FROM complaints c
JOIN citizens cit ON c.citizen_id = cit.id
JOIN complaint_types ct ON c.complaint_type_id = ct.id
WHERE 1=1
"""
params = []
if status_filter:
query += " AND c.status = %s"
params.append(status_filter)
if subject_filter:
query += " AND ct.slug = %s"
params.append(subject_filter)
if search_query:
query += " AND (c.reference_number LIKE %s OR cit.full_name LIKE %s OR cit.cin LIKE %s)"
like_str = f"%{search_query}%"
params.extend([like_str, like_str, like_str])
query += " ORDER BY c.submitted_at DESC"
try:
complaints = execute_query(query, tuple(params), is_select=True)
for comp in complaints:
comp["created_at"] = comp["created_at"].isoformat() if hasattr(comp["created_at"], "isoformat") else str(comp["created_at"])
return {"success": True, "data": complaints}
except Exception as e:
print(f"Admin complaints query failed: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve complaints.")
@app.get("/admin/complaints/{complaint_id}/attachments", dependencies=[Depends(require_admin_token)])
def list_complaint_attachments(complaint_id: int):
query = "SELECT original_filename as file_name, storage_url as file_url FROM complaint_attachments WHERE complaint_id = %s"
try:
attachments = execute_query(query, (complaint_id,), is_select=True)
return {"success": True, "attachments": attachments}
except Exception as e:
print(f"Attachments query failed: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve attachments.")
@app.put("/admin/complaints/{complaint_id}/status", dependencies=[Depends(require_admin_token)])
def update_complaint_status(complaint_id: int, req: StatusUpdateRequest):
if req.status not in ["submitted", "review", "resolved"]:
raise HTTPException(status_code=400, detail="Invalid status. Must be submitted, review, or resolved.")
verify_query = "SELECT id FROM complaints WHERE id = %s"
existing = execute_query(verify_query, (complaint_id,), is_select=True)
if not existing:
raise HTTPException(status_code=404, detail="Complaint not found")
query = "UPDATE complaints SET status = %s, resolution_details = %s, signatory = %s WHERE id = %s"
try:
execute_query(query, (req.status, req.resolution_details, req.signatory, complaint_id))
fetch_query = """
SELECT c.id, c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email,
ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status,
c.resolution_details, c.signatory
FROM complaints c
JOIN citizens cit ON c.citizen_id = cit.id
JOIN complaint_types ct ON c.complaint_type_id = ct.id
WHERE c.id = %s
"""
complaint_data = execute_query(fetch_query, (complaint_id,), is_select=True)
response_data = {"success": True, "message": "Status updated successfully"}
if complaint_data:
response_data["complaint"] = complaint_data[0]
ref = complaint_data[0].get("reference_number", "")
if ref:
cache_delete(f"complaint:{ref}", "admin:stats")
return response_data
except Exception as e:
print(f"Status update failed for complaint {complaint_id}: {e}")
raise HTTPException(status_code=500, detail="Failed to update complaint status.")
class PDFUploadRequest(BaseModel):
pdf: FileAttachment
@app.post("/admin/complaints/{complaint_id}/resolution-pdf", dependencies=[Depends(require_admin_token)])
def upload_resolution_pdf(complaint_id: int, req: PDFUploadRequest):
validate_attachment(req.pdf)
try:
pdf_url = upload_base64_file(req.pdf.base64, req.pdf.name, folder="resolutions")
raw = base64.b64decode(req.pdf.base64, validate=False)
attach_query = "INSERT INTO complaint_attachments (complaint_id, original_filename, storage_url, file_size_bytes, mime_type) VALUES (%s, %s, %s, %s, %s)"
execute_query(attach_query, (complaint_id, req.pdf.name, pdf_url, len(raw), "application/pdf"))
return {"success": True, "url": pdf_url}
except HTTPException:
raise
except Exception as e:
print(f"Resolution PDF upload failed: {e}")
raise HTTPException(status_code=500, detail="Failed to upload resolution PDF.")
@app.get("/internal/complaints/{ref}")
def internal_track_complaint(ref: str, request: Request):
expected = os.environ.get("HF_ADMIN_TOKEN", "")
if not expected:
raise HTTPException(status_code=500, detail="Server misconfiguration")
auth = request.headers.get("Authorization", "")
provided = auth.removeprefix("Bearer ").strip()
if not secrets.compare_digest(provided, expected):
raise HTTPException(status_code=401, detail="Unauthorized")
query = """
SELECT c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email,
ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status,
c.resolution_details, c.signatory
FROM complaints c
JOIN citizens cit ON c.citizen_id = cit.id
JOIN complaint_types ct ON c.complaint_type_id = ct.id
WHERE c.reference_number = %s
"""
result = execute_query(query, (ref,), is_select=True)
if not result:
raise HTTPException(status_code=404, detail="Complaint not found")
return {"success": True, "complaint": result[0]}