"""E-Citizen backend API.

FastAPI application exposing authentication, complaint submission/tracking and
admin endpoints. Persistence goes through ``database.execute_query``; file
uploads go through ``storage.upload_base64_file``; short-lived values are cached
in an Upstash Redis REST endpoint when configured, otherwise in process memory.
"""
import base64
import hashlib
import html
import json
import mimetypes
import os
import secrets
import smtplib
import time
from email.message import EmailMessage
from typing import List, Optional
from urllib.parse import quote

import bcrypt
import httpx
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, EmailStr, Field, validator

from database import execute_query
from storage import upload_base64_file

load_dotenv()

app = FastAPI(title="E-Citizen Backend API")


@app.on_event("startup")
def startup_db_patch():
    """Widen the ``users.role`` enum at startup; failures are logged, not fatal."""
    try:
        execute_query("ALTER TABLE users MODIFY COLUMN role ENUM('superadmin', 'admin', 'agent', 'citizen') DEFAULT 'citizen'")
    except Exception as e:
        print(f"DB patch note: {e}")


# Comma-separated list of origins allowed by CORS; blanks are ignored.
ALLOWED_ORIGINS = [
    origin.strip()
    for origin in os.environ.get("ALLOWED_ORIGINS", "https://citizen-app-three.vercel.app").split(",")
    if origin.strip()
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT"],
    allow_headers=["Content-Type", "Authorization", "X-Admin-Token"],
)

ALLOWED_FILE_EXTENSIONS = {".pdf", ".jpg", ".jpeg", ".png"}
MAX_FILE_SIZE_BYTES = 5 * 1024 * 1024  # 5 MB

# Cache lifetimes, in seconds.
CACHE_TTL_COMPLAINT = 120
CACHE_TTL_STATS = 60
CACHE_TTL_LIST = 30
CACHE_TTL_VERIFY = 600


def _redis_headers() -> dict:
    """Return the bearer-token header for the Redis REST endpoint."""
    token = os.environ.get("UPSTASH_REDIS_REST_TOKEN", "")
    return {"Authorization": f"Bearer {token}"}


def _redis_base() -> str:
    """Return the Redis REST base URL, or ``""`` when it is not configured."""
    return os.environ.get("UPSTASH_REDIS_REST_URL", "")


# In-process fallback cache: key -> (value, absolute expiry as time.time()).
_memory_cache = {}


def cache_get(key: str) -> Optional[str]:
    """Return the cached string for *key*, or ``None`` on a miss.

    A 200 reply from Redis is authoritative (including a miss). Any other
    outcome falls back to the in-process cache, where expired entries are
    evicted on read.
    """
    base = _redis_base()
    if base:
        try:
            encoded_key = quote(key, safe="")
            resp = httpx.get(f"{base}/GET/{encoded_key}", headers=_redis_headers(), timeout=2)
            if resp.status_code == 200:
                return resp.json().get("result")
        except Exception as e:
            print(f"Redis cache_get error: {e}")

    # Fallback to memory
    if key in _memory_cache:
        val, expiry = _memory_cache[key]
        if time.time() < expiry:
            return val
        del _memory_cache[key]
    return None


def cache_set(key: str, value: str, ttl: int) -> None:
    """Store *value* under *key* for *ttl* seconds.

    Writes to Redis when configured. If the request raises or does not answer
    with HTTP 200, the value is kept in the in-process cache instead, so a
    rejected write is not silently lost.
    """
    base = _redis_base()
    if base:
        try:
            encoded_key = quote(key, safe="")
            encoded_value = quote(value, safe="")
            resp = httpx.get(
                f"{base}/SET/{encoded_key}/{encoded_value}/EX/{ttl}",
                headers=_redis_headers(),
                timeout=2,
            )
            if resp.status_code == 200:
                return
            # Same success criterion as cache_get: anything else is a failure.
            print(f"Redis cache_set error: HTTP {resp.status_code}")
        except Exception as e:
            print(f"Redis cache_set error: {e}")

    # Fallback to memory
    _memory_cache[key] = (value, time.time() + ttl)


def cache_delete(*keys: str) -> None:
    """Remove *keys* from Redis (best effort) and from the in-process cache."""
    base = _redis_base()
    if base:
        for key in keys:
            try:
                encoded_key = quote(key, safe="")
                httpx.get(f"{base}/DEL/{encoded_key}", headers=_redis_headers(), timeout=2)
            except Exception as e:
                print(f"Redis cache_delete error ({key}): {e}")

    # Fallback to memory
    for key in keys:
        _memory_cache.pop(key, None)


def hash_code(code: str) -> str:
    """Return the SHA-256 hex digest of *code* with surrounding whitespace removed."""
    return hashlib.sha256(code.strip().encode("utf-8")).hexdigest()


def store_verification_code(purpose: str, email: str, code: str) -> None:
    """Cache the hash of *code* for (*purpose*, normalized *email*) for CACHE_TTL_VERIFY seconds."""
    cache_set(f"verify:{purpose}:{email.strip().lower()}", hash_code(code), CACHE_TTL_VERIFY)


def verify_verification_code(purpose: str, email: str, code: str) -> bool:
    """Return True when *code* matches the hash stored for (*purpose*, *email*)."""
    stored_hash = cache_get(f"verify:{purpose}:{email.strip().lower()}")
    if not stored_hash:
        return False
    return secrets.compare_digest(stored_hash, hash_code(code))


def delete_verification_code(purpose: str, email: str) -> None:
    """Drop the stored verification code for (*purpose*, *email*)."""
    cache_delete(f"verify:{purpose}:{email.strip().lower()}")


def build_verification_email(full_name: str, code: str, context: str) -> str:
    """Return the verification message body with all inputs HTML-escaped."""
    safe_name = html.escape(full_name)
    safe_context = html.escape(context)
    safe_code = html.escape(code)
    # Whitespace inside the literal is part of the message and is kept verbatim.
    return f"""

Verification de votre compte E-CITIZEN

Bonjour {safe_name},

Utilisez le code suivant pour confirmer {safe_context} :

{safe_code}

Ce code expire dans 10 minutes.

"""


def send_email(to_email: str, subject: str, html: str) -> bool:
    """Send *html* to *to_email* through Gmail SMTP over SSL.

    Returns False only when GMAIL_USER / GMAIL_PASS are not configured. On a
    send failure the message is printed to the logs and True is returned, so
    callers cannot distinguish a delivered message from a logged one.

    Note: the ``html`` parameter shadows the ``html`` module inside this
    function; the name is kept for keyword-argument compatibility.
    """
    smtp_user = os.environ.get("GMAIL_USER", "")
    smtp_pass = os.environ.get("GMAIL_PASS", "")
    if not smtp_user or not smtp_pass:
        print("GMAIL_USER or GMAIL_PASS is not configured.")
        return False

    message = EmailMessage()
    message["From"] = f"E-CITIZEN <{smtp_user}>"
    message["To"] = to_email
    message["Subject"] = subject
    message.set_content("Votre client email ne supporte pas le HTML.")
    message.add_alternative(html, subtype="html")

    try:
        with smtplib.SMTP_SSL("smtp.gmail.com", 465, timeout=10) as smtp:
            smtp.login(smtp_user, smtp_pass)
            smtp.send_message(message)
        return True
    except Exception as e:
        print(f"Email send failed: {e}")
        # Print the email content to logs so the code can be retrieved manually during testing
        print(f"--- MOCK EMAIL (SMTP BLOCKED) To: {to_email} ---")
        print(html)
        print("-------------------------------------------------")
        # Return True so the frontend flow continues without a 500 error
        return True


def create_user_if_missing(full_name: str, email: str, password: str, role: str = "citizen") -> None:
    """Insert a user with a bcrypt-hashed password unless *email* already exists.

    An existing row is left untouched (its name, password and role are not
    updated).
    """
    existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True)
    if existing:
        return

    password_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
    insert_query = (
        " INSERT INTO users (full_name, email, password_hash, role) "
        "VALUES (%s, %s, %s, %s) "
    )
    execute_query(insert_query, (full_name, email, password_hash, role))


def get_admin_token() -> str:
    """Return HF_ADMIN_TOKEN; raise RuntimeError when it is unset or empty."""
    token = os.environ.get("HF_ADMIN_TOKEN")
    if not token:
        raise RuntimeError("HF_ADMIN_TOKEN environment variable is not set.")
    return token


def _tokens_match(provided: str, expected: str) -> bool:
    """Compare two tokens in constant time.

    The comparison is done on UTF-8 bytes because ``secrets.compare_digest``
    raises ``TypeError`` for ``str`` arguments containing non-ASCII characters,
    which would turn a malformed header into a 500 instead of a 401.
    """
    return secrets.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


def require_admin_token(request: Request):
    """FastAPI dependency: require ``X-Admin-Token`` to equal HF_ADMIN_TOKEN.

    Raises:
        HTTPException: 500 when HF_ADMIN_TOKEN is not configured, 401 when the
            header is missing or does not match.
    """
    expected = os.environ.get("HF_ADMIN_TOKEN", "")
    if not expected:
        raise HTTPException(status_code=500, detail="Server misconfiguration")
    token = request.headers.get("X-Admin-Token", "")
    if not _tokens_match(token, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")


def validate_attachment(f) -> None:
    """Validate an uploaded file's extension, base64 encoding and decoded size.

    Args:
        f: object exposing ``name`` and ``base64`` attributes.

    Raises:
        HTTPException: 400 when the extension is not in ALLOWED_FILE_EXTENSIONS,
            the payload is not valid base64, or the decoded size exceeds
            MAX_FILE_SIZE_BYTES.
    """
    ext = os.path.splitext(f.name)[1].lower()
    if ext not in ALLOWED_FILE_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"File type not allowed: '{ext}'. Accepted: pdf, jpg, jpeg, png."
        )
    try:
        raw = base64.b64decode(f.base64, validate=True)
    except Exception:
        raise HTTPException(status_code=400, detail=f"Invalid base64 encoding for file '{f.name}'.")
    if len(raw) > MAX_FILE_SIZE_BYTES:
        raise HTTPException(status_code=400, detail=f"File '{f.name}' exceeds the 5 MB limit.")


# Schemas
class LoginRequest(BaseModel):
    email: str
    password: str


class RegisterRequest(BaseModel):
    fullName: str = Field(..., min_length=2, max_length=200)
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=128)


class SignupCodeRequest(BaseModel):
    fullName: str = Field(..., min_length=2, max_length=200)
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=128)


class SignupVerifyRequest(BaseModel):
    fullName: str = Field(..., min_length=2, max_length=200)
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=128)
    code: str = Field(..., min_length=4, max_length=12)


class GoogleCodeRequest(BaseModel):
    fullName: str = Field(..., min_length=2, max_length=200)
    email: EmailStr


class GoogleVerifyRequest(BaseModel):
    fullName: str = Field(..., min_length=2, max_length=200)
    email: EmailStr
    role: str = Field("citizen", pattern="^(citizen|admin)$")
    code: str = Field(..., min_length=4, max_length=12)


class GoogleBypassRequest(BaseModel):
    fullName: str = Field(..., min_length=2, max_length=200)
    email: EmailStr
    role: str = Field("citizen", pattern="^(citizen|admin)$")


class LoginResponse(BaseModel):
    success: bool
    full_name: str
    email: str
    role: str


class FileAttachment(BaseModel):
    name: str = Field(..., max_length=255)
    base64: str


class ComplaintSubmitRequest(BaseModel):
    fullName: str = Field(..., min_length=2, max_length=200)
    cin: str = Field(..., min_length=4, max_length=20)
    phone: str = Field(..., min_length=8, max_length=20)
    email: EmailStr
    compType: str = Field(..., max_length=100)
    subject: str
    description: str = Field(..., min_length=10, max_length=5000)
    referenceNumber: Optional[str] = None
    pdf: Optional[FileAttachment] = None
    files: List[FileAttachment] = []

    @validator("subject")
    def validate_subject(cls, v):
        """Reject any subject slug outside the fixed set of categories."""
        allowed = {
            "health", "safety", "education", "infrastructure",
            "environment", "transport", "social", "economy",
            "urban", "water", "consumer", "admin"
        }
        if v not in allowed:
            # Sorted so the message is stable across runs (set order is not).
            raise ValueError(f"subject must be one of: {', '.join(sorted(allowed))}")
        return v


class ComplaintSubmitResponse(BaseModel):
    success: bool
    reference_number: str
    complaint_id: int


@app.get("/")
def welcom():
    """Root endpoint returning a static status payload."""
    return {"server status" : "runing"}


@app.get("/health")
def health_check():
    """Liveness probe; does not touch the database or cache."""
    return {"status": "healthy"}


@app.post("/auth/login", response_model=LoginResponse)
def login(req: LoginRequest):
    """Authenticate by email and password.

    Unknown email, wrong password and an unparseable stored hash all yield the
    same 401 so the response does not reveal which one occurred.
    """
    query = "SELECT full_name, email, password_hash, role FROM users WHERE email = %s"
    result = execute_query(query, (req.email.strip().lower(),), is_select=True)

    if not result:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password"
        )

    user = result[0]
    db_hash = user["password_hash"]

    is_valid = False
    try:
        if bcrypt.checkpw(req.password.encode("utf-8"), db_hash.encode("utf-8")):
            is_valid = True
    except Exception:
        is_valid = False

    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password"
        )

    return LoginResponse(
        success=True,
        full_name=user["full_name"],
        email=user["email"],
        role=user["role"]
    )


@app.post("/auth/register", response_model=LoginResponse)
def register(req: RegisterRequest):
    """Create a citizen account directly, without a verification code.

    Raises:
        HTTPException: 400 for an out-of-range password length, 409 when the
            email is already registered.
    """
    email = req.email.lower().strip()
    full_name = req.fullName.strip()

    # Defensive: RegisterRequest already constrains the length to 8..128.
    if len(req.password) < 8 or len(req.password) > 128:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Password must be between 8 and 128 characters"
        )

    existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="An account already exists with this email"
        )

    password_hash = bcrypt.hashpw(req.password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
    insert_query = (
        " INSERT INTO users (full_name, email, password_hash, role) "
        "VALUES (%s, %s, %s, %s) "
    )
    execute_query(insert_query, (full_name, email, password_hash, "citizen"))

    return LoginResponse(
        success=True,
        full_name=full_name,
        email=email,
        role="citizen"
    )


@app.post("/auth/register/request-code")
def request_signup_code(req: SignupCodeRequest):
    """Generate and store a 6-digit signup verification code.

    Raises:
        HTTPException: 409 when the email is already registered.
    """
    email = req.email.lower().strip()

    existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="An account already exists with this email"
        )

    code = str(secrets.randbelow(900000) + 100000)
    store_verification_code("signup", email, code)
    # NOTE(review): the code is returned to the caller, so possessing the
    # mailbox is not actually proven by this flow — confirm this is intended.
    return {"success": True, "email": email, "code": code, "message": "Verification code generated."}


@app.post("/auth/register/verify-code", response_model=LoginResponse)
def verify_signup_code(req: SignupVerifyRequest):
    """Create the citizen account once the signup code is confirmed.

    Raises:
        HTTPException: 400 for a wrong or expired code, 409 when the email was
            registered in the meantime (the code is discarded in that case).
    """
    email = req.email.lower().strip()
    full_name = req.fullName.strip()

    if not verify_verification_code("signup", email, req.code):
        raise HTTPException(status_code=400, detail="Invalid verification code.")

    existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True)
    if existing:
        delete_verification_code("signup", email)
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="An account already exists with this email"
        )

    create_user_if_missing(full_name, email, req.password, "citizen")
    delete_verification_code("signup", email)

    return LoginResponse(
        success=True,
        full_name=full_name,
        email=email,
        role="citizen"
    )


@app.post("/auth/google/request-code")
def request_google_code(req: GoogleCodeRequest):
    """Generate and store a 6-digit verification code for the Google flow."""
    email = req.email.lower().strip()

    code = str(secrets.randbelow(900000) + 100000)
    store_verification_code("google", email, code)
    # NOTE(review): as in the signup flow, the code is returned to the caller.
    return {"success": True, "email": email, "code": code, "message": "Verification code generated."}


@app.post("/auth/google/verify-code", response_model=LoginResponse)
def verify_google_code(req: GoogleVerifyRequest):
    """Confirm a Google-flow code and ensure a user row exists.

    New users get a random, unreturned password.

    Raises:
        HTTPException: 400 for a wrong or expired code.
    """
    email = req.email.lower().strip()
    full_name = req.fullName.strip()
    # NOTE(review): the role comes from the request body, so a caller can ask
    # for "admin"; and for an existing user the requested role is echoed back
    # rather than the stored one. Confirm both are intended.
    role = "admin" if req.role == "admin" else "citizen"

    if not verify_verification_code("google", email, req.code):
        raise HTTPException(status_code=400, detail="Invalid verification code.")

    create_user_if_missing(full_name, email, secrets.token_urlsafe(48), role)
    delete_verification_code("google", email)

    return LoginResponse(
        success=True,
        full_name=full_name,
        email=email,
        role=role
    )


@app.post("/auth/google/bypass", dependencies=[Depends(require_admin_token)], response_model=LoginResponse)
def google_bypass(req: GoogleBypassRequest):
    """Admin-token-protected variant of the Google flow that skips the code."""
    email = req.email.lower().strip()
    full_name = req.fullName.strip()
    role = "admin" if req.role == "admin" else "citizen"

    create_user_if_missing(full_name, email, secrets.token_urlsafe(48), role)

    return LoginResponse(
        success=True,
        full_name=full_name,
        email=email,
        role=role
    )


@app.post("/complaints/submit", response_model=ComplaintSubmitResponse)
def submit_complaint(req: ComplaintSubmitRequest, request: Request):
    """Record a complaint, its citizen and type, and upload its attachments.

    Attachments are validated before anything is written, so a rejected file
    can no longer leave a complaint row behind. Uploads themselves remain best
    effort: a failed upload is logged and the complaint is still returned.

    Raises:
        HTTPException: 400 for an invalid attachment or a duplicate reference
            number; 500 when no unique reference could be generated or a
            citizen/complaint insert fails.
    """
    # Validate every file up front, before any database write.
    if req.pdf:
        validate_attachment(req.pdf)
    for attachment in req.files:
        validate_attachment(attachment)

    reference_number = None
    if req.referenceNumber:
        check_query = "SELECT id FROM complaints WHERE reference_number = %s"
        if execute_query(check_query, (req.referenceNumber,), is_select=True):
            raise HTTPException(status_code=400, detail="Reference number already exists")
        reference_number = req.referenceNumber
    else:
        # Up to 10 attempts at an unused 4-digit suffix.
        for _ in range(10):
            rand_val = str(secrets.randbelow(9000) + 1000)
            candidate_ref = f"CIT-2026-{rand_val}"
            if not execute_query("SELECT id FROM complaints WHERE reference_number = %s", (candidate_ref,), is_select=True):
                reference_number = candidate_ref
                break

    if not reference_number:
        raise HTTPException(status_code=500, detail="Failed to generate a unique reference number")

    # Reuse an existing citizen matched by CIN or email, otherwise create one.
    citizen = execute_query("SELECT id FROM citizens WHERE cin = %s OR email = %s", (req.cin, req.email), is_select=True)
    if citizen:
        citizen_id = citizen[0]['id']
    else:
        try:
            citizen_id = execute_query(
                "INSERT INTO citizens (full_name, cin, phone, email) VALUES (%s, %s, %s, %s)",
                (req.fullName, req.cin, req.phone, req.email)
            )
        except Exception as e:
            print(f"Failed to create citizen: {e}")
            raise HTTPException(status_code=500, detail="Failed to record citizen data.")

    # Resolve the complaint type by slug, creating it on first use.
    ctype = execute_query("SELECT id FROM complaint_types WHERE slug = %s", (req.subject,), is_select=True)
    if ctype:
        complaint_type_id = ctype[0]['id']
    else:
        label = req.compType if req.compType else req.subject
        complaint_type_id = execute_query(
            "INSERT INTO complaint_types (label_fr, label_ar, slug) VALUES (%s, %s, %s)",
            (label, label, req.subject)
        )

    insert_query = (
        " INSERT INTO complaints ( reference_number, citizen_id, complaint_type_id, "
        "description, status ) VALUES (%s, %s, %s, %s, %s) "
    )
    try:
        complaint_id = execute_query(insert_query, (reference_number, citizen_id, complaint_type_id, req.description, "submitted"))
    except Exception as e:
        print(f"Database insertion failed: {e}")
        raise HTTPException(status_code=500, detail="Failed to record complaint.")

    def save_attachment(f, folder, is_pdf=False):
        """Upload one already-validated file and record it; failures are logged."""
        try:
            url = upload_base64_file(f.base64, f.name, folder=folder)
            raw = base64.b64decode(f.base64, validate=False)
            size = len(raw)
            mime = "application/pdf" if is_pdf else (mimetypes.guess_type(f.name)[0] or "application/octet-stream")
            execute_query(
                "INSERT INTO complaint_attachments (complaint_id, original_filename, storage_url, file_size_bytes, mime_type) VALUES (%s, %s, %s, %s, %s)",
                (complaint_id, f.name, url, size, mime)
            )
        except Exception as e:
            print(f"Failed to upload {f.name}: {e}")

    if req.pdf:
        save_attachment(req.pdf, "receipts", is_pdf=True)
    for f in req.files:
        save_attachment(f, "attachments")

    cache_delete(f"complaint:{reference_number}", "admin:stats")

    return ComplaintSubmitResponse(success=True, reference_number=reference_number, complaint_id=complaint_id)


@app.get("/complaints/track/{ref}")
def track_complaint(ref: str):
    """Return the public view of a complaint by reference number (cached).

    NOTE(review): this route has no authentication and generated references
    have only 9000 possible values — confirm the exposed fields are acceptable.

    Raises:
        HTTPException: 404 when no complaint has this reference.
    """
    cache_key = f"complaint:{ref}"
    cached = cache_get(cache_key)
    if cached:
        try:
            return json.loads(cached)
        except Exception:
            # Corrupt cache entry: fall through and rebuild it from the database.
            pass

    query = (
        " SELECT c.id, c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email, "
        "ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, "
        "c.submitted_at as created_at "
        "FROM complaints c "
        "JOIN citizens cit ON c.citizen_id = cit.id "
        "JOIN complaint_types ct ON c.complaint_type_id = ct.id "
        "WHERE c.reference_number = %s "
    )
    result = execute_query(query, (ref,), is_select=True)
    if not result:
        raise HTTPException(status_code=404, detail="Complaint not found")

    complaint = result[0]
    response = {
        "success": True,
        "data": {
            "reference_number": complaint['reference_number'],
            "full_name": complaint['full_name'],
            "email": complaint['email'],
            "complaint_type": complaint['complaint_type'],
            "subject": complaint['subject'],
            "description": complaint['description'],
            "status": complaint['status'],
            "created_at": complaint['created_at'].isoformat() if hasattr(complaint['created_at'], 'isoformat') else str(complaint['created_at'])
        }
    }
    cache_set(cache_key, json.dumps(response), CACHE_TTL_COMPLAINT)
    return response


class StatusUpdateRequest(BaseModel):
    status: str
    resolution_details: Optional[str] = None
    signatory: Optional[str] = None


@app.get("/admin/stats", dependencies=[Depends(require_admin_token)])
def get_admin_stats():
    """Return complaint counts per status plus a total (cached).

    ``total`` counts every row; only the statuses pre-declared in the result
    dictionary get their own entry.

    Raises:
        HTTPException: 500 when the query fails.
    """
    cache_key = "admin:stats"
    cached = cache_get(cache_key)
    if cached:
        try:
            return json.loads(cached)
        except Exception:
            # Corrupt cache entry: fall through and recompute.
            pass

    query = "SELECT status, COUNT(*) as count FROM complaints GROUP BY status"
    try:
        results = execute_query(query, (), is_select=True)
        stats = {"total": 0, "submitted": 0, "review": 0, "resolved": 0}
        for row in results:
            status_name = row["status"]
            count = row["count"]
            stats["total"] += count
            if status_name in stats:
                stats[status_name] = count
        response = {"success": True, "stats": stats}
        cache_set(cache_key, json.dumps(response), CACHE_TTL_STATS)
        return response
    except Exception as e:
        print(f"Admin stats query failed: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve statistics.")


@app.get("/admin/complaints", dependencies=[Depends(require_admin_token)])
def list_admin_complaints(
    status_filter: Optional[str] = None,
    search_query: Optional[str] = None,
    subject_filter: Optional[str] = None
):
    """List complaints, newest first, with optional filters.

    Args:
        status_filter: exact match on complaint status.
        search_query: substring match on reference number, full name or CIN.
        subject_filter: exact match on the complaint type slug.

    Raises:
        HTTPException: 500 when the query fails.
    """
    query = (
        " SELECT c.id, c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email, "
        "ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, "
        "c.submitted_at as created_at, c.resolution_details, c.signatory "
        "FROM complaints c "
        "JOIN citizens cit ON c.citizen_id = cit.id "
        "JOIN complaint_types ct ON c.complaint_type_id = ct.id "
        "WHERE 1=1 "
    )
    params = []

    if status_filter:
        query += " AND c.status = %s"
        params.append(status_filter)
    if subject_filter:
        query += " AND ct.slug = %s"
        params.append(subject_filter)
    if search_query:
        query += " AND (c.reference_number LIKE %s OR cit.full_name LIKE %s OR cit.cin LIKE %s)"
        like_str = f"%{search_query}%"
        params.extend([like_str, like_str, like_str])

    query += " ORDER BY c.submitted_at DESC"

    try:
        complaints = execute_query(query, tuple(params), is_select=True)
        # Make timestamps JSON-serializable.
        for comp in complaints:
            comp["created_at"] = comp["created_at"].isoformat() if hasattr(comp["created_at"], "isoformat") else str(comp["created_at"])
        return {"success": True, "data": complaints}
    except Exception as e:
        print(f"Admin complaints query failed: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve complaints.")


@app.get("/admin/complaints/{complaint_id}/attachments", dependencies=[Depends(require_admin_token)])
def list_complaint_attachments(complaint_id: int):
    """Return the file name and URL of every attachment of a complaint.

    Raises:
        HTTPException: 500 when the query fails.
    """
    query = "SELECT original_filename as file_name, storage_url as file_url FROM complaint_attachments WHERE complaint_id = %s"
    try:
        attachments = execute_query(query, (complaint_id,), is_select=True)
        return {"success": True, "attachments": attachments}
    except Exception as e:
        print(f"Attachments query failed: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve attachments.")


@app.put("/admin/complaints/{complaint_id}/status", dependencies=[Depends(require_admin_token)])
def update_complaint_status(complaint_id: int, req: StatusUpdateRequest):
    """Set a complaint's status, resolution details and signatory.

    ``resolution_details`` and ``signatory`` are always written, so omitting
    them stores NULL over any previous value. Related cache entries are
    invalidated after the update.

    Raises:
        HTTPException: 400 for an unknown status, 404 for an unknown complaint,
            500 when the update fails.
    """
    if req.status not in ["submitted", "review", "resolved"]:
        raise HTTPException(status_code=400, detail="Invalid status. Must be submitted, review, or resolved.")

    verify_query = "SELECT id FROM complaints WHERE id = %s"
    existing = execute_query(verify_query, (complaint_id,), is_select=True)
    if not existing:
        raise HTTPException(status_code=404, detail="Complaint not found")

    query = "UPDATE complaints SET status = %s, resolution_details = %s, signatory = %s WHERE id = %s"
    try:
        execute_query(query, (req.status, req.resolution_details, req.signatory, complaint_id))

        fetch_query = (
            " SELECT c.id, c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email, "
            "ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, "
            "c.resolution_details, c.signatory "
            "FROM complaints c "
            "JOIN citizens cit ON c.citizen_id = cit.id "
            "JOIN complaint_types ct ON c.complaint_type_id = ct.id "
            "WHERE c.id = %s "
        )
        complaint_data = execute_query(fetch_query, (complaint_id,), is_select=True)

        response_data = {"success": True, "message": "Status updated successfully"}
        if complaint_data:
            response_data["complaint"] = complaint_data[0]
            ref = complaint_data[0].get("reference_number", "")
            if ref:
                cache_delete(f"complaint:{ref}", "admin:stats")
        return response_data
    except Exception as e:
        print(f"Status update failed for complaint {complaint_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to update complaint status.")


class PDFUploadRequest(BaseModel):
    pdf: FileAttachment


@app.post("/admin/complaints/{complaint_id}/resolution-pdf", dependencies=[Depends(require_admin_token)])
def upload_resolution_pdf(complaint_id: int, req: PDFUploadRequest):
    """Upload a resolution document and attach it to a complaint.

    The file goes through the generic attachment validation and is recorded
    with the ``application/pdf`` MIME type.

    Raises:
        HTTPException: 400 for an invalid file, 500 when upload or insert fails.
    """
    validate_attachment(req.pdf)
    try:
        pdf_url = upload_base64_file(req.pdf.base64, req.pdf.name, folder="resolutions")
        raw = base64.b64decode(req.pdf.base64, validate=False)
        attach_query = "INSERT INTO complaint_attachments (complaint_id, original_filename, storage_url, file_size_bytes, mime_type) VALUES (%s, %s, %s, %s, %s)"
        execute_query(attach_query, (complaint_id, req.pdf.name, pdf_url, len(raw), "application/pdf"))
        return {"success": True, "url": pdf_url}
    except HTTPException:
        raise
    except Exception as e:
        print(f"Resolution PDF upload failed: {e}")
        raise HTTPException(status_code=500, detail="Failed to upload resolution PDF.")


@app.get("/internal/complaints/{ref}")
def internal_track_complaint(ref: str, request: Request):
    """Return a complaint's full record for callers presenting the admin token.

    The token is read from ``Authorization: Bearer <token>`` and compared with
    HF_ADMIN_TOKEN.

    Raises:
        HTTPException: 500 when HF_ADMIN_TOKEN is not configured, 401 for a
            missing or wrong token, 404 when the reference is unknown.
    """
    expected = os.environ.get("HF_ADMIN_TOKEN", "")
    if not expected:
        raise HTTPException(status_code=500, detail="Server misconfiguration")

    auth = request.headers.get("Authorization", "")
    provided = auth.removeprefix("Bearer ").strip()
    if not _tokens_match(provided, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")

    query = (
        " SELECT c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email, "
        "ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, "
        "c.resolution_details, c.signatory "
        "FROM complaints c "
        "JOIN citizens cit ON c.citizen_id = cit.id "
        "JOIN complaint_types ct ON c.complaint_type_id = ct.id "
        "WHERE c.reference_number = %s "
    )
    result = execute_query(query, (ref,), is_select=True)
    if not result:
        raise HTTPException(status_code=404, detail="Complaint not found")

    return {"success": True, "complaint": result[0]}