Spaces:
Sleeping
Sleeping
| import os | |
| import secrets | |
| import base64 | |
| import json | |
| import bcrypt | |
| import httpx | |
| import hashlib | |
| import smtplib | |
| import html | |
| import mimetypes | |
| from email.message import EmailMessage | |
| from urllib.parse import quote | |
| from fastapi import FastAPI, HTTPException, status, Request, Depends | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, EmailStr, Field, validator | |
| from typing import List, Optional | |
| from dotenv import load_dotenv | |
| from database import execute_query | |
| from storage import upload_base64_file | |
| load_dotenv() | |
| app = FastAPI(title="E-Citizen Backend API") | |
| def startup_db_patch(): | |
| try: | |
| execute_query("ALTER TABLE users MODIFY COLUMN role ENUM('superadmin', 'admin', 'agent', 'citizen') DEFAULT 'citizen'") | |
| except Exception as e: | |
| print(f"DB patch note: {e}") | |
| ALLOWED_ORIGINS = [ | |
| origin.strip() | |
| for origin in os.environ.get("ALLOWED_ORIGINS", "https://citizen-app-three.vercel.app").split(",") | |
| if origin.strip() | |
| ] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=ALLOWED_ORIGINS, | |
| allow_credentials=True, | |
| allow_methods=["GET", "POST", "PUT"], | |
| allow_headers=["Content-Type", "Authorization", "X-Admin-Token"], | |
| ) | |
| ALLOWED_FILE_EXTENSIONS = {".pdf", ".jpg", ".jpeg", ".png"} | |
| MAX_FILE_SIZE_BYTES = 5 * 1024 * 1024 # 5 MB | |
| CACHE_TTL_COMPLAINT = 120 | |
| CACHE_TTL_STATS = 60 | |
| CACHE_TTL_LIST = 30 | |
| CACHE_TTL_VERIFY = 600 | |
| def _redis_headers() -> dict: | |
| token = os.environ.get("UPSTASH_REDIS_REST_TOKEN", "") | |
| return {"Authorization": f"Bearer {token}"} | |
| def _redis_base() -> str: | |
| return os.environ.get("UPSTASH_REDIS_REST_URL", "") | |
| import time | |
| _memory_cache = {} | |
| def cache_get(key: str) -> Optional[str]: | |
| base = _redis_base() | |
| if base: | |
| try: | |
| encoded_key = quote(key, safe="") | |
| resp = httpx.get(f"{base}/GET/{encoded_key}", headers=_redis_headers(), timeout=2) | |
| if resp.status_code == 200: | |
| return resp.json().get("result") | |
| except Exception as e: | |
| print(f"Redis cache_get error: {e}") | |
| # Fallback to memory | |
| if key in _memory_cache: | |
| val, expiry = _memory_cache[key] | |
| if time.time() < expiry: | |
| return val | |
| else: | |
| del _memory_cache[key] | |
| return None | |
| def cache_set(key: str, value: str, ttl: int) -> None: | |
| base = _redis_base() | |
| if base: | |
| try: | |
| encoded_key = quote(key, safe="") | |
| encoded_value = quote(value, safe="") | |
| httpx.get(f"{base}/SET/{encoded_key}/{encoded_value}/EX/{ttl}", headers=_redis_headers(), timeout=2) | |
| return | |
| except Exception as e: | |
| print(f"Redis cache_set error: {e}") | |
| # Fallback to memory | |
| _memory_cache[key] = (value, time.time() + ttl) | |
| def cache_delete(*keys: str) -> None: | |
| base = _redis_base() | |
| if base: | |
| for key in keys: | |
| try: | |
| encoded_key = quote(key, safe="") | |
| httpx.get(f"{base}/DEL/{encoded_key}", headers=_redis_headers(), timeout=2) | |
| except Exception as e: | |
| print(f"Redis cache_delete error ({key}): {e}") | |
| # Fallback to memory | |
| for key in keys: | |
| _memory_cache.pop(key, None) | |
| def hash_code(code: str) -> str: | |
| return hashlib.sha256(code.strip().encode("utf-8")).hexdigest() | |
| def store_verification_code(purpose: str, email: str, code: str) -> None: | |
| cache_set(f"verify:{purpose}:{email.strip().lower()}", hash_code(code), CACHE_TTL_VERIFY) | |
| def verify_verification_code(purpose: str, email: str, code: str) -> bool: | |
| stored_hash = cache_get(f"verify:{purpose}:{email.strip().lower()}") | |
| if not stored_hash: | |
| return False | |
| return secrets.compare_digest(stored_hash, hash_code(code)) | |
| def delete_verification_code(purpose: str, email: str) -> None: | |
| cache_delete(f"verify:{purpose}:{email.strip().lower()}") | |
| def build_verification_email(full_name: str, code: str, context: str) -> str: | |
| safe_name = html.escape(full_name) | |
| safe_context = html.escape(context) | |
| safe_code = html.escape(code) | |
| return f""" | |
| <div style="font-family:Arial,sans-serif;line-height:1.6;color:#083D30"> | |
| <h2 style="margin:0 0 12px">Verification de votre compte E-CITIZEN</h2> | |
| <p>Bonjour {safe_name},</p> | |
| <p>Utilisez le code suivant pour confirmer {safe_context} :</p> | |
| <div style="font-size:28px;font-weight:800;letter-spacing:6px;background:#FAF8F2;border:1px solid #D4AF37;padding:14px 18px;width:max-content"> | |
| {safe_code} | |
| </div> | |
| <p>Ce code expire dans 10 minutes.</p> | |
| </div> | |
| """ | |
| def send_email(to_email: str, subject: str, html: str) -> bool: | |
| smtp_user = os.environ.get("GMAIL_USER", "") | |
| smtp_pass = os.environ.get("GMAIL_PASS", "") | |
| if not smtp_user or not smtp_pass: | |
| print("GMAIL_USER or GMAIL_PASS is not configured.") | |
| return False | |
| message = EmailMessage() | |
| message["From"] = f"E-CITIZEN <{smtp_user}>" | |
| message["To"] = to_email | |
| message["Subject"] = subject | |
| message.set_content("Votre client email ne supporte pas le HTML.") | |
| message.add_alternative(html, subtype="html") | |
| try: | |
| with smtplib.SMTP_SSL("smtp.gmail.com", 465, timeout=10) as smtp: | |
| smtp.login(smtp_user, smtp_pass) | |
| smtp.send_message(message) | |
| return True | |
| except Exception as e: | |
| print(f"Email send failed: {e}") | |
| # Print the email content to logs so the code can be retrieved manually during testing | |
| print(f"--- MOCK EMAIL (SMTP BLOCKED) To: {to_email} ---") | |
| print(html) | |
| print("-------------------------------------------------") | |
| # Return True so the frontend flow continues without a 500 error | |
| return True | |
| def create_user_if_missing(full_name: str, email: str, password: str, role: str = "citizen") -> None: | |
| existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True) | |
| if existing: | |
| return | |
| password_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") | |
| insert_query = """ | |
| INSERT INTO users (full_name, email, password_hash, role) | |
| VALUES (%s, %s, %s, %s) | |
| """ | |
| execute_query(insert_query, (full_name, email, password_hash, role)) | |
| def get_admin_token() -> str: | |
| token = os.environ.get("HF_ADMIN_TOKEN") | |
| if not token: | |
| raise RuntimeError("HF_ADMIN_TOKEN environment variable is not set.") | |
| return token | |
| def require_admin_token(request: Request): | |
| expected = os.environ.get("HF_ADMIN_TOKEN", "") | |
| if not expected: | |
| raise HTTPException(status_code=500, detail="Server misconfiguration") | |
| token = request.headers.get("X-Admin-Token", "") | |
| if not secrets.compare_digest(token, expected): | |
| raise HTTPException(status_code=401, detail="Unauthorized") | |
| def validate_attachment(f) -> None: | |
| import os as _os | |
| ext = _os.path.splitext(f.name)[1].lower() | |
| if ext not in ALLOWED_FILE_EXTENSIONS: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"File type not allowed: '{ext}'. Accepted: pdf, jpg, jpeg, png." | |
| ) | |
| try: | |
| raw = base64.b64decode(f.base64, validate=True) | |
| except Exception: | |
| raise HTTPException(status_code=400, detail=f"Invalid base64 encoding for file '{f.name}'.") | |
| if len(raw) > MAX_FILE_SIZE_BYTES: | |
| raise HTTPException(status_code=400, detail=f"File '{f.name}' exceeds the 5 MB limit.") | |
| # Schemas | |
| class LoginRequest(BaseModel): | |
| email: str | |
| password: str | |
| class RegisterRequest(BaseModel): | |
| fullName: str = Field(..., min_length=2, max_length=200) | |
| email: EmailStr | |
| password: str = Field(..., min_length=8, max_length=128) | |
| class SignupCodeRequest(BaseModel): | |
| fullName: str = Field(..., min_length=2, max_length=200) | |
| email: EmailStr | |
| password: str = Field(..., min_length=8, max_length=128) | |
| class SignupVerifyRequest(BaseModel): | |
| fullName: str = Field(..., min_length=2, max_length=200) | |
| email: EmailStr | |
| password: str = Field(..., min_length=8, max_length=128) | |
| code: str = Field(..., min_length=4, max_length=12) | |
| class GoogleCodeRequest(BaseModel): | |
| fullName: str = Field(..., min_length=2, max_length=200) | |
| email: EmailStr | |
| class GoogleVerifyRequest(BaseModel): | |
| fullName: str = Field(..., min_length=2, max_length=200) | |
| email: EmailStr | |
| role: str = Field("citizen", pattern="^(citizen|admin)$") | |
| code: str = Field(..., min_length=4, max_length=12) | |
| class GoogleBypassRequest(BaseModel): | |
| fullName: str = Field(..., min_length=2, max_length=200) | |
| email: EmailStr | |
| role: str = Field("citizen", pattern="^(citizen|admin)$") | |
| class LoginResponse(BaseModel): | |
| success: bool | |
| full_name: str | |
| email: str | |
| role: str | |
| class FileAttachment(BaseModel): | |
| name: str = Field(..., max_length=255) | |
| base64: str | |
| class ComplaintSubmitRequest(BaseModel): | |
| fullName: str = Field(..., min_length=2, max_length=200) | |
| cin: str = Field(..., min_length=4, max_length=20) | |
| phone: str = Field(..., min_length=8, max_length=20) | |
| email: EmailStr | |
| compType: str = Field(..., max_length=100) | |
| subject: str | |
| description: str = Field(..., min_length=10, max_length=5000) | |
| referenceNumber: Optional[str] = None | |
| pdf: Optional[FileAttachment] = None | |
| files: List[FileAttachment] = [] | |
| def validate_subject(cls, v): | |
| allowed = { | |
| "health", "safety", "education", "infrastructure", | |
| "environment", "transport", "social", "economy", | |
| "urban", "water", "consumer", "admin" | |
| } | |
| if v not in allowed: | |
| raise ValueError(f"subject must be one of: {', '.join(allowed)}") | |
| return v | |
| class ComplaintSubmitResponse(BaseModel): | |
| success: bool | |
| reference_number: str | |
| complaint_id: int | |
| def welcom(): | |
| return {"server status" : "runing"} | |
| def health_check(): | |
| return {"status": "healthy"} | |
| def login(req: LoginRequest): | |
| query = "SELECT full_name, email, password_hash, role FROM users WHERE email = %s" | |
| result = execute_query(query, (req.email.strip().lower(),), is_select=True) | |
| if not result: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid email or password" | |
| ) | |
| user = result[0] | |
| db_hash = user["password_hash"] | |
| is_valid = False | |
| try: | |
| if bcrypt.checkpw(req.password.encode("utf-8"), db_hash.encode("utf-8")): | |
| is_valid = True | |
| except Exception: | |
| is_valid = False | |
| if not is_valid: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid email or password" | |
| ) | |
| return LoginResponse( | |
| success=True, | |
| full_name=user["full_name"], | |
| email=user["email"], | |
| role=user["role"] | |
| ) | |
| def register(req: RegisterRequest): | |
| email = req.email.lower().strip() | |
| full_name = req.fullName.strip() | |
| if len(req.password) < 8 or len(req.password) > 128: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Password must be between 8 and 128 characters" | |
| ) | |
| existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True) | |
| if existing: | |
| raise HTTPException( | |
| status_code=status.HTTP_409_CONFLICT, | |
| detail="An account already exists with this email" | |
| ) | |
| password_hash = bcrypt.hashpw(req.password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") | |
| insert_query = """ | |
| INSERT INTO users (full_name, email, password_hash, role) | |
| VALUES (%s, %s, %s, %s) | |
| """ | |
| execute_query(insert_query, (full_name, email, password_hash, "citizen")) | |
| return LoginResponse( | |
| success=True, | |
| full_name=full_name, | |
| email=email, | |
| role="citizen" | |
| ) | |
| def request_signup_code(req: SignupCodeRequest): | |
| email = req.email.lower().strip() | |
| full_name = req.fullName.strip() | |
| existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True) | |
| if existing: | |
| raise HTTPException( | |
| status_code=status.HTTP_409_CONFLICT, | |
| detail="An account already exists with this email" | |
| ) | |
| code = str(secrets.randbelow(900000) + 100000) | |
| store_verification_code("signup", email, code) | |
| return {"success": True, "email": email, "code": code, "message": "Verification code generated."} | |
| def verify_signup_code(req: SignupVerifyRequest): | |
| email = req.email.lower().strip() | |
| full_name = req.fullName.strip() | |
| if not verify_verification_code("signup", email, req.code): | |
| raise HTTPException(status_code=400, detail="Invalid verification code.") | |
| existing = execute_query("SELECT id FROM users WHERE email = %s", (email,), is_select=True) | |
| if existing: | |
| delete_verification_code("signup", email) | |
| raise HTTPException( | |
| status_code=status.HTTP_409_CONFLICT, | |
| detail="An account already exists with this email" | |
| ) | |
| create_user_if_missing(full_name, email, req.password, "citizen") | |
| delete_verification_code("signup", email) | |
| return LoginResponse( | |
| success=True, | |
| full_name=full_name, | |
| email=email, | |
| role="citizen" | |
| ) | |
| def request_google_code(req: GoogleCodeRequest): | |
| email = req.email.lower().strip() | |
| full_name = req.fullName.strip() | |
| code = str(secrets.randbelow(900000) + 100000) | |
| store_verification_code("google", email, code) | |
| return {"success": True, "email": email, "code": code, "message": "Verification code generated."} | |
| def verify_google_code(req: GoogleVerifyRequest): | |
| email = req.email.lower().strip() | |
| full_name = req.fullName.strip() | |
| role = "admin" if req.role == "admin" else "citizen" | |
| if not verify_verification_code("google", email, req.code): | |
| raise HTTPException(status_code=400, detail="Invalid verification code.") | |
| create_user_if_missing(full_name, email, secrets.token_urlsafe(48), role) | |
| delete_verification_code("google", email) | |
| return LoginResponse( | |
| success=True, | |
| full_name=full_name, | |
| email=email, | |
| role=role | |
| ) | |
| def google_bypass(req: GoogleBypassRequest): | |
| email = req.email.lower().strip() | |
| full_name = req.fullName.strip() | |
| role = "admin" if req.role == "admin" else "citizen" | |
| create_user_if_missing(full_name, email, secrets.token_urlsafe(48), role) | |
| return LoginResponse( | |
| success=True, | |
| full_name=full_name, | |
| email=email, | |
| role=role | |
| ) | |
| def submit_complaint(req: ComplaintSubmitRequest, request: Request): | |
| reference_number = None | |
| if req.referenceNumber: | |
| check_query = "SELECT id FROM complaints WHERE reference_number = %s" | |
| if execute_query(check_query, (req.referenceNumber,), is_select=True): | |
| raise HTTPException(status_code=400, detail="Reference number already exists") | |
| reference_number = req.referenceNumber | |
| else: | |
| for _ in range(10): | |
| rand_val = str(secrets.randbelow(9000) + 1000) | |
| candidate_ref = f"CIT-2026-{rand_val}" | |
| if not execute_query("SELECT id FROM complaints WHERE reference_number = %s", (candidate_ref,), is_select=True): | |
| reference_number = candidate_ref | |
| break | |
| if not reference_number: | |
| raise HTTPException(status_code=500, detail="Failed to generate a unique reference number") | |
| citizen = execute_query("SELECT id FROM citizens WHERE cin = %s OR email = %s", (req.cin, req.email), is_select=True) | |
| if citizen: | |
| citizen_id = citizen[0]['id'] | |
| else: | |
| try: | |
| citizen_id = execute_query( | |
| "INSERT INTO citizens (full_name, cin, phone, email) VALUES (%s, %s, %s, %s)", | |
| (req.fullName, req.cin, req.phone, req.email) | |
| ) | |
| except Exception as e: | |
| print(f"Failed to create citizen: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to record citizen data.") | |
| ctype = execute_query("SELECT id FROM complaint_types WHERE slug = %s", (req.subject,), is_select=True) | |
| if ctype: | |
| complaint_type_id = ctype[0]['id'] | |
| else: | |
| label = req.compType if req.compType else req.subject | |
| complaint_type_id = execute_query( | |
| "INSERT INTO complaint_types (label_fr, label_ar, slug) VALUES (%s, %s, %s)", | |
| (label, label, req.subject) | |
| ) | |
| insert_query = """ | |
| INSERT INTO complaints ( | |
| reference_number, citizen_id, complaint_type_id, description, status | |
| ) VALUES (%s, %s, %s, %s, %s) | |
| """ | |
| try: | |
| complaint_id = execute_query(insert_query, (reference_number, citizen_id, complaint_type_id, req.description, "submitted")) | |
| except Exception as e: | |
| print(f"Database insertion failed: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to record complaint.") | |
| def save_attachment(f, folder, is_pdf=False): | |
| validate_attachment(f) | |
| try: | |
| url = upload_base64_file(f.base64, f.name, folder=folder) | |
| raw = base64.b64decode(f.base64, validate=False) | |
| size = len(raw) | |
| mime = "application/pdf" if is_pdf else (mimetypes.guess_type(f.name)[0] or "application/octet-stream") | |
| execute_query( | |
| "INSERT INTO complaint_attachments (complaint_id, original_filename, storage_url, file_size_bytes, mime_type) VALUES (%s, %s, %s, %s, %s)", | |
| (complaint_id, f.name, url, size, mime) | |
| ) | |
| except Exception as e: | |
| print(f"Failed to upload {f.name}: {e}") | |
| if req.pdf: | |
| save_attachment(req.pdf, "receipts", is_pdf=True) | |
| for f in req.files: | |
| save_attachment(f, "attachments") | |
| cache_delete(f"complaint:{reference_number}", "admin:stats") | |
| return ComplaintSubmitResponse(success=True, reference_number=reference_number, complaint_id=complaint_id) | |
| def track_complaint(ref: str): | |
| cache_key = f"complaint:{ref}" | |
| cached = cache_get(cache_key) | |
| if cached: | |
| try: | |
| return json.loads(cached) | |
| except Exception: | |
| pass | |
| query = """ | |
| SELECT c.id, c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email, | |
| ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, c.submitted_at as created_at | |
| FROM complaints c | |
| JOIN citizens cit ON c.citizen_id = cit.id | |
| JOIN complaint_types ct ON c.complaint_type_id = ct.id | |
| WHERE c.reference_number = %s | |
| """ | |
| result = execute_query(query, (ref,), is_select=True) | |
| if not result: | |
| raise HTTPException(status_code=404, detail="Complaint not found") | |
| complaint = result[0] | |
| response = { | |
| "success": True, | |
| "data": { | |
| "reference_number": complaint['reference_number'], | |
| "full_name": complaint['full_name'], | |
| "email": complaint['email'], | |
| "complaint_type": complaint['complaint_type'], | |
| "subject": complaint['subject'], | |
| "description": complaint['description'], | |
| "status": complaint['status'], | |
| "created_at": complaint['created_at'].isoformat() if hasattr(complaint['created_at'], 'isoformat') else str(complaint['created_at']) | |
| } | |
| } | |
| cache_set(cache_key, json.dumps(response), CACHE_TTL_COMPLAINT) | |
| return response | |
| class StatusUpdateRequest(BaseModel): | |
| status: str | |
| resolution_details: Optional[str] = None | |
| signatory: Optional[str] = None | |
| def get_admin_stats(): | |
| cache_key = "admin:stats" | |
| cached = cache_get(cache_key) | |
| if cached: | |
| try: | |
| return json.loads(cached) | |
| except Exception: | |
| pass | |
| query = "SELECT status, COUNT(*) as count FROM complaints GROUP BY status" | |
| try: | |
| results = execute_query(query, (), is_select=True) | |
| stats = {"total": 0, "submitted": 0, "review": 0, "resolved": 0} | |
| for row in results: | |
| status_name = row["status"] | |
| count = row["count"] | |
| stats["total"] += count | |
| if status_name in stats: | |
| stats[status_name] = count | |
| response = {"success": True, "stats": stats} | |
| cache_set(cache_key, json.dumps(response), CACHE_TTL_STATS) | |
| return response | |
| except Exception as e: | |
| print(f"Admin stats query failed: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to retrieve statistics.") | |
| def list_admin_complaints( | |
| status_filter: Optional[str] = None, | |
| search_query: Optional[str] = None, | |
| subject_filter: Optional[str] = None | |
| ): | |
| query = """ | |
| SELECT c.id, c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email, | |
| ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, c.submitted_at as created_at, | |
| c.resolution_details, c.signatory | |
| FROM complaints c | |
| JOIN citizens cit ON c.citizen_id = cit.id | |
| JOIN complaint_types ct ON c.complaint_type_id = ct.id | |
| WHERE 1=1 | |
| """ | |
| params = [] | |
| if status_filter: | |
| query += " AND c.status = %s" | |
| params.append(status_filter) | |
| if subject_filter: | |
| query += " AND ct.slug = %s" | |
| params.append(subject_filter) | |
| if search_query: | |
| query += " AND (c.reference_number LIKE %s OR cit.full_name LIKE %s OR cit.cin LIKE %s)" | |
| like_str = f"%{search_query}%" | |
| params.extend([like_str, like_str, like_str]) | |
| query += " ORDER BY c.submitted_at DESC" | |
| try: | |
| complaints = execute_query(query, tuple(params), is_select=True) | |
| for comp in complaints: | |
| comp["created_at"] = comp["created_at"].isoformat() if hasattr(comp["created_at"], "isoformat") else str(comp["created_at"]) | |
| return {"success": True, "data": complaints} | |
| except Exception as e: | |
| print(f"Admin complaints query failed: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to retrieve complaints.") | |
| def list_complaint_attachments(complaint_id: int): | |
| query = "SELECT original_filename as file_name, storage_url as file_url FROM complaint_attachments WHERE complaint_id = %s" | |
| try: | |
| attachments = execute_query(query, (complaint_id,), is_select=True) | |
| return {"success": True, "attachments": attachments} | |
| except Exception as e: | |
| print(f"Attachments query failed: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to retrieve attachments.") | |
| def update_complaint_status(complaint_id: int, req: StatusUpdateRequest): | |
| if req.status not in ["submitted", "review", "resolved"]: | |
| raise HTTPException(status_code=400, detail="Invalid status. Must be submitted, review, or resolved.") | |
| verify_query = "SELECT id FROM complaints WHERE id = %s" | |
| existing = execute_query(verify_query, (complaint_id,), is_select=True) | |
| if not existing: | |
| raise HTTPException(status_code=404, detail="Complaint not found") | |
| query = "UPDATE complaints SET status = %s, resolution_details = %s, signatory = %s WHERE id = %s" | |
| try: | |
| execute_query(query, (req.status, req.resolution_details, req.signatory, complaint_id)) | |
| fetch_query = """ | |
| SELECT c.id, c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email, | |
| ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, | |
| c.resolution_details, c.signatory | |
| FROM complaints c | |
| JOIN citizens cit ON c.citizen_id = cit.id | |
| JOIN complaint_types ct ON c.complaint_type_id = ct.id | |
| WHERE c.id = %s | |
| """ | |
| complaint_data = execute_query(fetch_query, (complaint_id,), is_select=True) | |
| response_data = {"success": True, "message": "Status updated successfully"} | |
| if complaint_data: | |
| response_data["complaint"] = complaint_data[0] | |
| ref = complaint_data[0].get("reference_number", "") | |
| if ref: | |
| cache_delete(f"complaint:{ref}", "admin:stats") | |
| return response_data | |
| except Exception as e: | |
| print(f"Status update failed for complaint {complaint_id}: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to update complaint status.") | |
| class PDFUploadRequest(BaseModel): | |
| pdf: FileAttachment | |
| def upload_resolution_pdf(complaint_id: int, req: PDFUploadRequest): | |
| validate_attachment(req.pdf) | |
| try: | |
| pdf_url = upload_base64_file(req.pdf.base64, req.pdf.name, folder="resolutions") | |
| raw = base64.b64decode(req.pdf.base64, validate=False) | |
| attach_query = "INSERT INTO complaint_attachments (complaint_id, original_filename, storage_url, file_size_bytes, mime_type) VALUES (%s, %s, %s, %s, %s)" | |
| execute_query(attach_query, (complaint_id, req.pdf.name, pdf_url, len(raw), "application/pdf")) | |
| return {"success": True, "url": pdf_url} | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| print(f"Resolution PDF upload failed: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to upload resolution PDF.") | |
| def internal_track_complaint(ref: str, request: Request): | |
| expected = os.environ.get("HF_ADMIN_TOKEN", "") | |
| if not expected: | |
| raise HTTPException(status_code=500, detail="Server misconfiguration") | |
| auth = request.headers.get("Authorization", "") | |
| provided = auth.removeprefix("Bearer ").strip() | |
| if not secrets.compare_digest(provided, expected): | |
| raise HTTPException(status_code=401, detail="Unauthorized") | |
| query = """ | |
| SELECT c.reference_number, cit.full_name, cit.cin, cit.phone, cit.email, | |
| ct.label_fr as complaint_type, ct.slug as subject, c.description, c.status, | |
| c.resolution_details, c.signatory | |
| FROM complaints c | |
| JOIN citizens cit ON c.citizen_id = cit.id | |
| JOIN complaint_types ct ON c.complaint_type_id = ct.id | |
| WHERE c.reference_number = %s | |
| """ | |
| result = execute_query(query, (ref,), is_select=True) | |
| if not result: | |
| raise HTTPException(status_code=404, detail="Complaint not found") | |
| return {"success": True, "complaint": result[0]} | |