| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import FileResponse, JSONResponse |
| from pydantic import BaseModel |
| from typing import List, Dict, Optional |
| import logging |
| import re |
| import os |
|
|
| from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer, Pattern |
| from presidio_analyzer.predefined_recognizers import SpacyRecognizer |
| from presidio_analyzer.nlp_engine import NlpEngineProvider |
| from presidio_anonymizer import AnonymizerEngine |
| from langdetect import detect, DetectorFactory |
| import uvicorn |
| from slowapi import Limiter, _rate_limit_exceeded_handler |
| from slowapi.util import get_remote_address |
| from slowapi.errors import RateLimitExceeded |
|
|
| |
# Application bootstrap: logging, deterministic language detection,
# per-IP rate limiting, and the FastAPI app instance.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# langdetect is non-deterministic by default; pinning the seed makes
# detect() return stable results for identical input text.
DetectorFactory.seed = 0


# Rate limiter keyed on the client's remote address (per-IP throttling).
limiter = Limiter(key_func=get_remote_address)
app = FastAPI(title="Redac API")
# slowapi looks for the limiter on app.state at request time.
app.state.limiter = limiter
|
|
@app.exception_handler(RateLimitExceeded)
async def custom_rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded):
    """Return a friendly JSON 429 payload when the per-IP rate limit is exceeded."""
    message = "Too many requests. Please wait 2 seconds between each analysis to avoid saturating the server."
    return JSONResponse(status_code=429, content={"detail": message})
|
|
# CORS: fully open — any origin, method, and header.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# contradictory — browsers refuse to send credentials to a wildcard origin,
# and Starlette cannot echo a specific origin here. Consider pinning the
# allowed origins for production. TODO confirm intended deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# French job titles / generic role words that the NER models tend to
# misclassify as PERSON or ORGANIZATION. Any detection whose text contains
# one of these words (case-insensitive substring match — see redact_text)
# is dropped from the results before anonymization.
PROTECTED_WORDS = [
    "Gérant", "Directeur", "Directrice", "Financière", "Architecte",
    "Ingénieur", "Sécurité", "Administrateur", "Système", "Responsable",
    "Réseau", "Consultant", "PDG", "Patient", "Infirmière",
    "Comité", "Direction", "Chantier", "Projet"
]
|
|
# Presidio NLP configuration: spaCy engine with the large English and French
# models, plus a mapping from spaCy NER labels to Presidio entity types.
configuration = {
    "nlp_engine_name": "spacy",
    "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}, {"lang_code": "fr", "model_name": "fr_core_news_lg"}],
    "ner_model_configuration": {
        # spaCy label -> Presidio entity type
        "model_to_presidio_entity_mapping": {
            "PER": "PERSON", "PERSON": "PERSON", "LOC": "LOCATION",
            "GPE": "LOCATION", "ORG": "ORGANIZATION"
        }
    }
}
|
|
# Build the spaCy-backed NLP engine from the configuration above.
provider = NlpEngineProvider(nlp_configuration=configuration)
nlp_engine = provider.create_engine()


# Registry of recognizers: start with Presidio's predefined recognizers for
# English and French.
registry = RecognizerRegistry()
registry.load_predefined_recognizers(languages=["en", "fr"])


# Explicit French spaCy NER recognizer, mapping spaCy labels (PER/LOC/GPE/ORG)
# onto Presidio entity types.
fr_spacy = SpacyRecognizer(
    supported_language="fr",
    check_label_groups=[("PERSON", ["PER"]), ("LOCATION", ["LOC", "GPE"]), ("ORGANIZATION", ["ORG"])]
)
registry.add_recognizer(fr_spacy)


# Custom regex recognizers (score=1.0 so they always pass the threshold):
# - IBAN:  two country letters, two check digits, then grouped alphanumerics.
# - SIRET: 14-digit French establishment number (3+3+3+5, optional spaces).
# - NIR:   French social-security number — sex digit, year, month, department
#          (including Corsican 2A/2B), commune, order number, 2-digit key.
registry.add_recognizer(PatternRecognizer(supported_entity="IBAN", supported_language="fr", patterns=[Pattern(name="iban", regex=r"\b[A-Z]{2}\d{2}(?:\s*[A-Z0-9]{4}){4,7}\s*[A-Z0-9]{1,4}\b", score=1.0)]))
registry.add_recognizer(PatternRecognizer(supported_entity="SIRET", supported_language="fr", patterns=[Pattern(name="siret", regex=r"\b\d{3}\s*\d{3}\s*\d{3}\s*\d{5}\b", score=1.0)]))
registry.add_recognizer(PatternRecognizer(supported_entity="NIR", supported_language="fr", patterns=[Pattern(name="nir", regex=r"\b[12]\s*\d{2}\s*\d{2}\s*(?:\d{2}|2[AB])\s*\d{3}\s*\d{3}\s*\d{2}\b", score=1.0)]))


# Analyzer discards detections scoring below 0.3; anonymizer applies the
# default replacement operators to whatever the analyzer returns.
analyzer = AnalyzerEngine(nlp_engine=nlp_engine, registry=registry, default_score_threshold=0.3)
anonymizer = AnonymizerEngine()
|
|
class RedactRequest(BaseModel):
    """Request payload for POST /api/redact."""
    # Text to analyze and anonymize.
    text: str
    # "en", "fr", or "auto" to detect the language from the text.
    language: Optional[str] = "auto"
|
|
| |
@app.get("/api/status")
async def api_status():
    """Lightweight health-check endpoint for the frontend."""
    payload = {"status": "online", "mode": "pro-visual"}
    return payload
|
|
def _resolve_language(text: str, requested: Optional[str]) -> str:
    """Return the analysis language, "en" or "fr".

    Uses langdetect when the client requested "auto"; any detection failure
    or unsupported language falls back to English.
    """
    try:
        lang = detect(text) if requested == "auto" else requested
    except Exception:
        # langdetect raises on empty/ambiguous input. The original code used
        # a bare `except:`, which also swallowed SystemExit/KeyboardInterrupt.
        return "en"
    return lang if lang in ("en", "fr") else "en"


def _is_protected(detected_text: str) -> bool:
    """True when the detected span contains a whitelisted role word.

    Case-insensitive substring match, matching the original filtering
    behavior (e.g. "Directeur" inside "Directeur Financier").
    """
    lowered = detected_text.lower()
    return any(word.lower() in lowered for word in PROTECTED_WORDS)


@app.post("/api/redact")
@limiter.limit("1/2seconds")
async def redact_text(body: RedactRequest, request: Request):
    """Analyze ``body.text`` for PII and return an anonymized version.

    Returns a dict with the original text, the redacted text, the language
    used for analysis, and per-entity metadata (type, matched text,
    confidence as a 0-100 percentage, character offsets).

    Raises:
        HTTPException: 500 on any analyzer/anonymizer failure.
    """
    try:
        target_lang = _resolve_language(body.text, body.language)

        results = analyzer.analyze(text=body.text, language=target_lang)

        # Drop detections that contain a protected (non-PII) role word.
        clean_results = [
            res for res in results
            if not _is_protected(body.text[res.start:res.end])
        ]

        anonymized = anonymizer.anonymize(text=body.text, analyzer_results=clean_results)

        entities_meta = [
            {
                "type": res.entity_type,
                "text": body.text[res.start:res.end],
                "score": round(res.score * 100),  # percentage for the UI
                "start": res.start,
                "end": res.end,
            }
            for res in clean_results
        ]

        return {
            "original_text": body.text,
            "redacted_text": anonymized.text,
            "detected_language": target_lang,
            "entities": entities_meta,
        }
    except Exception as e:
        # logger.exception records the full traceback (the original
        # logger.error(f"...") discarded it).
        logger.exception("Error while redacting")
        # NOTE(review): echoing str(e) to the client may leak internals;
        # kept for backward compatibility with existing consumers.
        raise HTTPException(status_code=500, detail=str(e))
|
|
| |
if os.path.exists("dist"):
    # Serve the built frontend (Vite-style "dist" layout) when present.
    app.mount("/assets", StaticFiles(directory="dist/assets"), name="assets")

    # Resolved once so the catch-all route can verify path containment.
    DIST_ROOT = os.path.realpath("dist")

    @app.get("/{full_path:path}")
    async def serve_frontend(full_path: str):
        """SPA catch-all: serve the requested static file from dist/ if it
        exists, otherwise fall back to index.html for client-side routing.
        """
        # Resolve the candidate and reject anything escaping the dist
        # directory — the original os.path.join allowed "../" traversal to
        # serve arbitrary files from the host.
        candidate = os.path.realpath(os.path.join("dist", full_path))
        if candidate.startswith(DIST_ROOT + os.sep) and os.path.isfile(candidate):
            return FileResponse(candidate)
        # Unknown (or unsafe) path: let the SPA router handle it.
        return FileResponse("dist/index.html")

    @app.get("/")
    async def serve_index():
        """Explicit root route serving the SPA entry point."""
        return FileResponse("dist/index.html")
|
|
# Run the development server directly (`python <thisfile>.py`); production
# deployments would typically invoke uvicorn/gunicorn externally instead.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
|
|