| from fastapi import FastAPI, File, UploadFile, HTTPException, Form |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel, Field, EmailStr |
| from typing import Optional, List, Dict, Any |
| import pandas as pd |
| import io |
| import json |
| from backend import RegexClassifier |
| from email_service import send_welcome_email |
|
|
| |
| app = FastAPI( |
| title="Segmento Sense API", |
| description="AI-powered PII Detection and Data Classification Platform", |
| version="1.0.0" |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| classifier = RegexClassifier() |
|
|
| |
| MAX_FILE_SIZE = 1024 * 1024 * 1024 |
|
|
| |
|
|
| class TextAnalysisRequest(BaseModel): |
| text: str = Field(..., description="Text to analyze for PII") |
|
|
| class PatternAddRequest(BaseModel): |
| name: str = Field(..., description="Pattern name") |
| regex: str = Field(..., description="Regex pattern") |
|
|
| class DatabaseConnectionRequest(BaseModel): |
| host: str |
| port: str |
| database: str |
| user: str |
| password: str |
| table: str = Field(None, description="Table name (or collection for MongoDB)") |
|
|
| class S3ConnectionRequest(BaseModel): |
| access_key: str |
| secret_key: str |
| region: str |
| bucket: str = Field(None, description="Bucket name") |
| file_key: str = Field(None, description="File key/path") |
|
|
| class AzureConnectionRequest(BaseModel): |
| connection_string: str |
| container: str = Field(None, description="Container name") |
| blob: str = Field(None, description="Blob name") |
|
|
| class GCSConnectionRequest(BaseModel): |
| credentials: Dict[str, Any] |
| bucket: str = Field(None, description="Bucket name") |
| file_name: str = Field(None, description="File name") |
|
|
| class GoogleDriveRequest(BaseModel): |
| credentials: Dict[str, Any] |
| file_id: str = Field(None, description="Drive file ID") |
| mime_type: str = Field(None, description="File MIME type") |
|
|
| class SlackRequest(BaseModel): |
| token: str |
| channel_id: str |
|
|
| class ConfluenceRequest(BaseModel): |
| url: str |
| username: str |
| token: str |
| page_id: str |
|
|
| class PDFPageRequest(BaseModel): |
| page_number: int = 0 |
|
|
| class WelcomeEmailRequest(BaseModel): |
| name: str = Field(..., description="User's name") |
| email: EmailStr = Field(..., description="User's email address") |
|
|
| |
|
|
| def validate_file_size(file: UploadFile): |
| """Validate uploaded file size""" |
| file.file.seek(0, 2) |
| size = file.file.tell() |
| file.file.seek(0) |
| |
| if size > MAX_FILE_SIZE: |
| raise HTTPException( |
| status_code=413, |
| detail=f"File size ({size} bytes) exceeds maximum allowed size (1GB)" |
| ) |
| return size |
|
|
| def format_pii_response(df: pd.DataFrame, source_df: pd.DataFrame = None, text: str = None) -> Dict: |
| """Format PII analysis response""" |
| count_df = classifier.get_pii_counts_dataframe(df) if source_df is not None else classifier.get_pii_counts(text) |
| |
| response = { |
| "pii_counts": count_df.to_dict(orient="records") if not count_df.empty else [], |
| "total_pii_found": int(count_df["Count"].sum()) if not count_df.empty else 0 |
| } |
| |
| |
| if source_df is not None and not source_df.empty: |
| schema_df = classifier.get_data_schema(source_df) |
| response["schema"] = schema_df.to_dict(orient="records") |
| |
| |
| if text: |
| inspector_df = classifier.run_full_inspection(text) |
| if not inspector_df.empty: |
| response["inspector"] = inspector_df.to_dict(orient="records") |
| |
| return response |
|
|
| |
|
|
| @app.post("/api/upload/csv") |
| async def upload_csv(file: UploadFile = File(...), mask: bool = Form(False)): |
| """Upload and analyze CSV file""" |
| try: |
| validate_file_size(file) |
| content = await file.read() |
| df = pd.read_csv(io.BytesIO(content)) |
| |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| if mask: |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| else: |
| highlighted_df = classifier.scan_dataframe_with_html(df.head(50)) |
| response["data"] = highlighted_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/api/upload/json") |
| async def upload_json(file: UploadFile = File(...), mask: bool = Form(False)): |
| """Upload and analyze JSON file""" |
| try: |
| validate_file_size(file) |
| df = classifier.get_json_data(file.file) |
| |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| if mask: |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| else: |
| highlighted_df = classifier.scan_dataframe_with_html(df.head(50)) |
| response["data"] = highlighted_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/api/upload/parquet") |
| async def upload_parquet(file: UploadFile = File(...), mask: bool = Form(False)): |
| """Upload and analyze Parquet file""" |
| try: |
| validate_file_size(file) |
| content = await file.read() |
| df = classifier.get_parquet_data(content) |
| |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| if mask: |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| else: |
| highlighted_df = classifier.scan_dataframe_with_html(df.head(50)) |
| response["data"] = highlighted_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/api/upload/avro") |
| async def upload_avro(file: UploadFile = File(...), mask: bool = Form(False)): |
| """Upload and analyze Apache Avro file""" |
| try: |
| validate_file_size(file) |
| content = await file.read() |
| df = classifier.get_avro_data(content) |
| |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| if mask: |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| else: |
| highlighted_df = classifier.scan_dataframe_with_html(df.head(50)) |
| response["data"] = highlighted_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/api/upload/pdf") |
| async def upload_pdf(file: UploadFile = File(...), page_number: int = Form(0)): |
| """Upload and analyze PDF file (with pagination)""" |
| try: |
| validate_file_size(file) |
| content = await file.read() |
| |
| |
| total_pages = classifier.get_pdf_total_pages(content) |
| text = classifier.get_pdf_page_text(content, page_number) |
| |
| |
| response = format_pii_response(None, None, text) |
| response["total_pages"] = total_pages |
| response["current_page"] = page_number |
| |
| |
| img = classifier.get_labeled_pdf_image(content, page_number) |
| if img: |
| import base64 |
| from PIL import Image |
| |
| |
| if isinstance(img, bytes): |
| |
| img_str = base64.b64encode(img).decode() |
| elif isinstance(img, Image.Image): |
| |
| buffered = io.BytesIO() |
| img.save(buffered, format="PNG") |
| img_str = base64.b64encode(buffered.getvalue()).decode() |
| else: |
| |
| img_str = None |
| |
| if img_str: |
| response["image"] = f"data:image/png;base64,{img_str}" |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/api/upload/image") |
| async def upload_image(file: UploadFile = File(...), mask: bool = Form(False)): |
| """Upload and analyze image with OCR""" |
| try: |
| validate_file_size(file) |
| content = await file.read() |
| |
| |
| text = classifier.get_ocr_text_from_image(content) |
| |
| if not text: |
| raise HTTPException(status_code=400, detail="No text could be extracted from the image") |
| |
| df = pd.DataFrame({"Content": [text]}) |
| response = format_pii_response(df, df, text) |
| |
| if mask: |
| masked_df = classifier.mask_dataframe(df) |
| response["data"] = masked_df.to_dict(orient="records") |
| else: |
| highlighted_df = classifier.scan_dataframe_with_html(df) |
| response["data"] = highlighted_df.to_dict(orient="records") |
| |
| |
| import base64 |
| img_str = base64.b64encode(content).decode() |
| response["original_image"] = f"data:image/png;base64,{img_str}" |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| |
|
|
| @app.post("/api/analyze/text") |
| async def analyze_text(request: TextAnalysisRequest): |
| """Analyze plain text for PII""" |
| try: |
| matches = classifier.analyze_text_hybrid(request.text) |
| count_df = classifier.get_pii_counts(request.text) |
| |
| return JSONResponse(content={ |
| "matches": matches, |
| "pii_counts": count_df.to_dict(orient="records") if not count_df.empty else [], |
| "total_pii_found": len(matches) |
| }) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/api/inspect") |
| async def inspect_text(request: TextAnalysisRequest): |
| """Run full model inspection on text""" |
| try: |
| inspector_df = classifier.run_full_inspection(request.text) |
| |
| if inspector_df.empty: |
| return JSONResponse(content={ |
| "inspector": [], |
| "message": "No PII detected by any model" |
| }) |
| |
| return JSONResponse(content={ |
| "inspector": inspector_df.to_dict(orient="records") |
| }) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/api/mask") |
| async def mask_text(request: TextAnalysisRequest): |
| """Mask PII in text""" |
| try: |
| df = pd.DataFrame({"Content": [request.text]}) |
| masked_df = classifier.mask_dataframe(df) |
| |
| return JSONResponse(content={ |
| "original": request.text, |
| "masked": masked_df.iloc[0]["Content"] |
| }) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| |
|
|
| @app.get("/api/patterns") |
| async def get_patterns(): |
| """Get all regex patterns""" |
| try: |
| patterns = classifier.list_patterns() |
| return JSONResponse(content={ |
| "patterns": [{"name": k, "regex": v} for k, v in patterns.items()] |
| }) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/api/patterns") |
| async def add_pattern(request: PatternAddRequest): |
| """Add a new regex pattern""" |
| try: |
| classifier.add_pattern(request.name, request.regex) |
| return JSONResponse(content={ |
| "message": f"Pattern '{request.name}' added successfully", |
| "pattern": {"name": request.name, "regex": request.regex} |
| }) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.delete("/api/patterns/{pattern_name}") |
| async def delete_pattern(pattern_name: str): |
| """Remove a regex pattern""" |
| try: |
| classifier.remove_pattern(pattern_name) |
| return JSONResponse(content={ |
| "message": f"Pattern '{pattern_name}' removed successfully" |
| }) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| |
|
|
| @app.post("/api/connect/postgresql") |
| async def connect_postgresql(request: DatabaseConnectionRequest): |
| """Connect to PostgreSQL and scan table""" |
| try: |
| df = classifier.get_postgres_data( |
| request.host, request.port, request.database, |
| request.user, request.password, request.table |
| ) |
| |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"PostgreSQL connection failed: {str(e)}") |
|
|
| @app.post("/api/connect/mysql") |
| async def connect_mysql(request: DatabaseConnectionRequest): |
| """Connect to MySQL and scan table""" |
| try: |
| df = classifier.get_mysql_data( |
| request.host, request.port, request.database, |
| request.user, request.password, request.table |
| ) |
| |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"MySQL connection failed: {str(e)}") |
|
|
| @app.post("/api/connect/mongodb") |
| async def connect_mongodb(request: DatabaseConnectionRequest): |
| """Connect to MongoDB and scan collection""" |
| try: |
| df = classifier.get_mongodb_data( |
| request.host, request.port, request.database, |
| request.user, request.password, request.table |
| ) |
| |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"MongoDB connection failed: {str(e)}") |
|
|
| |
|
|
| @app.post("/api/cloud/s3/list-buckets") |
| async def list_s3_buckets(request: S3ConnectionRequest): |
| """List S3 buckets""" |
| try: |
| buckets = classifier.get_s3_buckets(request.access_key, request.secret_key, request.region) |
| return JSONResponse(content={"buckets": buckets}) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"S3 connection failed: {str(e)}") |
|
|
| @app.post("/api/cloud/s3/list-files") |
| async def list_s3_files(request: S3ConnectionRequest): |
| """List files in S3 bucket""" |
| try: |
| if not request.bucket: |
| raise HTTPException(status_code=400, detail="Bucket name is required") |
| |
| files = classifier.get_s3_files( |
| request.access_key, request.secret_key, request.region, request.bucket |
| ) |
| return JSONResponse(content={"files": files}) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to list S3 files: {str(e)}") |
|
|
| @app.post("/api/cloud/s3/scan") |
| async def scan_s3_file(request: S3ConnectionRequest): |
| """Download and scan S3 file""" |
| try: |
| if not request.bucket or not request.file_key: |
| raise HTTPException(status_code=400, detail="Bucket and file_key are required") |
| |
| content = classifier.download_s3_file( |
| request.access_key, request.secret_key, request.region, |
| request.bucket, request.file_key |
| ) |
| |
| |
| df = pd.read_csv(io.BytesIO(content)) |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"S3 scan failed: {str(e)}") |
|
|
| |
|
|
| @app.post("/api/cloud/azure/list-containers") |
| async def list_azure_containers(request: AzureConnectionRequest): |
| """List Azure containers""" |
| try: |
| containers = classifier.get_azure_containers(request.connection_string) |
| return JSONResponse(content={"containers": containers}) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Azure connection failed: {str(e)}") |
|
|
| @app.post("/api/cloud/azure/list-blobs") |
| async def list_azure_blobs(request: AzureConnectionRequest): |
| """List blobs in Azure container""" |
| try: |
| if not request.container: |
| raise HTTPException(status_code=400, detail="Container name is required") |
| |
| blobs = classifier.get_azure_blobs(request.connection_string, request.container) |
| return JSONResponse(content={"blobs": blobs}) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to list blobs: {str(e)}") |
|
|
| @app.post("/api/cloud/azure/scan") |
| async def scan_azure_blob(request: AzureConnectionRequest): |
| """Download and scan Azure blob""" |
| try: |
| if not request.container or not request.blob: |
| raise HTTPException(status_code=400, detail="Container and blob are required") |
| |
| content = classifier.download_azure_blob( |
| request.connection_string, request.container, request.blob |
| ) |
| |
| df = pd.read_csv(io.BytesIO(content)) |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Azure scan failed: {str(e)}") |
|
|
| |
|
|
| @app.post("/api/cloud/gcs/list-buckets") |
| async def list_gcs_buckets(request: GCSConnectionRequest): |
| """List GCS buckets""" |
| try: |
| buckets = classifier.get_gcs_buckets(request.credentials) |
| return JSONResponse(content={"buckets": buckets}) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"GCS connection failed: {str(e)}") |
|
|
| @app.post("/api/cloud/gcs/list-files") |
| async def list_gcs_files(request: GCSConnectionRequest): |
| """List files in GCS bucket""" |
| try: |
| if not request.bucket: |
| raise HTTPException(status_code=400, detail="Bucket name is required") |
| |
| files = classifier.get_gcs_files(request.credentials, request.bucket) |
| return JSONResponse(content={"files": files}) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to list GCS files: {str(e)}") |
|
|
| @app.post("/api/cloud/gcs/scan") |
| async def scan_gcs_file(request: GCSConnectionRequest): |
| """Download and scan GCS file""" |
| try: |
| if not request.bucket or not request.file_name: |
| raise HTTPException(status_code=400, detail="Bucket and file_name are required") |
| |
| content = classifier.download_gcs_file( |
| request.credentials, request.bucket, request.file_name |
| ) |
| |
| df = pd.read_csv(io.BytesIO(content)) |
| text_sample = df.head(10).to_string() |
| response = format_pii_response(df, df, text_sample) |
| |
| masked_df = classifier.mask_dataframe(df.head(50)) |
| response["data"] = masked_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"GCS scan failed: {str(e)}") |
|
|
| |
|
|
| @app.post("/api/cloud/drive/list-files") |
| async def list_drive_files(request: GoogleDriveRequest): |
| """List Google Drive files""" |
| try: |
| files = classifier.get_google_drive_files(request.credentials) |
| return JSONResponse(content={"files": files}) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Google Drive connection failed: {str(e)}") |
|
|
| @app.post("/api/cloud/drive/scan") |
| async def scan_drive_file(request: GoogleDriveRequest): |
| """Download and scan Google Drive file""" |
| try: |
| if not request.file_id or not request.mime_type: |
| raise HTTPException(status_code=400, detail="file_id and mime_type are required") |
| |
| content = classifier.download_drive_file( |
| request.file_id, request.mime_type, request.credentials |
| ) |
| |
| if isinstance(content, bytes): |
| try: |
| text = content.decode('utf-8') |
| df = pd.DataFrame({"Content": [text]}) |
| |
| response = format_pii_response(df, df, text) |
| highlighted_df = classifier.scan_dataframe_with_html(df) |
| response["data"] = highlighted_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| except: |
| raise HTTPException(status_code=400, detail="Binary file cannot be processed") |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Google Drive scan failed: {str(e)}") |
|
|
| |
|
|
| @app.post("/api/enterprise/gmail") |
| async def scan_gmail(file: UploadFile = File(...), num_emails: int = Form(10)): |
| """Scan Gmail messages""" |
| try: |
| df = classifier.get_gmail_data(file.file, num_emails) |
| |
| if df.empty: |
| raise HTTPException(status_code=400, detail="No emails fetched") |
| |
| text_sample = df.iloc[0]['Content'] |
| response = format_pii_response(df, df, text_sample) |
| |
| masked_df = classifier.mask_dataframe(df) |
| response["data"] = masked_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Gmail scan failed: {str(e)}") |
|
|
| @app.post("/api/enterprise/slack") |
| async def scan_slack(request: SlackRequest): |
| """Scan Slack messages""" |
| try: |
| df = classifier.get_slack_messages(request.token, request.channel_id) |
| |
| if df.empty: |
| raise HTTPException(status_code=400, detail="No messages found or authentication failed") |
| |
| text_sample = df.iloc[0]['Content'] |
| response = format_pii_response(df, df, text_sample) |
| |
| masked_df = classifier.mask_dataframe(df) |
| response["data"] = masked_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Slack scan failed: {str(e)}") |
|
|
| @app.post("/api/enterprise/confluence") |
| async def scan_confluence(request: ConfluenceRequest): |
| """Scan Confluence page""" |
| try: |
| df = classifier.get_confluence_page( |
| request.url, request.username, request.token, request.page_id |
| ) |
| |
| if df.empty: |
| raise HTTPException(status_code=400, detail="Failed to fetch page") |
| |
| text_sample = df.iloc[0]['Content'] |
| response = format_pii_response(df, df, text_sample) |
| |
| highlighted_df = classifier.scan_dataframe_with_html(df) |
| response["data"] = highlighted_df.to_dict(orient="records") |
| |
| return JSONResponse(content=response) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Confluence scan failed: {str(e)}") |
|
|
| |
|
|
| @app.post("/api/send-welcome") |
| async def send_welcome(request: WelcomeEmailRequest): |
| """ |
| Send a welcome email to a new user. |
| This endpoint is called by the frontend after a user submits the contact form. |
| """ |
| try: |
| |
| success = send_welcome_email(request.name, request.email) |
| |
| if success: |
| return JSONResponse(content={ |
| "success": True, |
| "message": f"Welcome email sent to {request.email}" |
| }) |
| else: |
| raise HTTPException( |
| status_code=500, |
| detail="Failed to send welcome email. SMTP configuration may be missing." |
| ) |
| |
| except Exception as e: |
| raise HTTPException( |
| status_code=500, |
| detail=f"Email sending failed: {str(e)}" |
| ) |
|
|
| |
|
|
| @app.get("/") |
| async def root(): |
| """API health check""" |
| return { |
| "message": "Segmento Sense API", |
| "status": "operational", |
| "version": "1.0.0" |
| } |
|
|
| @app.get("/health") |
| async def health_check(): |
| """Detailed health check""" |
| return { |
| "status": "healthy", |
| "classifiers": { |
| "regex": True, |
| "nltk": True, |
| "spacy": True, |
| "presidio": True, |
| "gliner": True, |
| "deberta": True |
| } |
| } |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=7860) |