# AIDetect / api_server.py
# Uploaded by Lucii1 — commit ecccf5c ("refactor code")
import asyncio
import os
import shutil
import json
import uuid
import time
import threading
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Project-local analysis pipelines (multimodal image/text and video detectors)
from miragenews import run_multimodal_to_json
from AIGVDet import run_video_to_json
UPLOAD_DIR = "temp_uploads"
MAX_WORKERS = 4

ENV = os.getenv("ENV")
cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON")

# On Hugging Face deployments ("hf"), materialize the service-account JSON
# from the environment into a file so Google auth can auto-discover it.
if ENV == "hf":
    if not cred_json:
        print("[ERROR] GOOGLE_CREDENTIALS_JSON is missing")
    else:
        try:
            json.loads(cred_json)  # Parse to ensure the JSON payload is valid
        except json.JSONDecodeError:
            print("[ERROR] GOOGLE_CREDENTIALS_JSON is not valid JSON")
        else:
            file_path = "google-credentials.json"
            with open(file_path, "w") as f:
                f.write(cred_json)
            # Point Google auth at the freshly written credentials file
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = file_path
            print("[INFO] Google credentials saved to", file_path)
else:
    # DEV mode (local)
    print("[INFO] ENV != hf → skip Google credentials setup")
# FastAPI application. Endpoints return a job id immediately; clients poll
# GET /analyze/{job_id} for the outcome (avoids request timeouts on long runs).
app = FastAPI(
    title="Multimedia Analysis API (Polling Mode)",
    description="Multimedia analysis API using a Polling mechanism to avoid Timeouts.",
    version="2.0.0",
)

# In-memory job store: job_id -> job record (schema created in _create_job).
# NOTE(review): entries are never evicted, so the store grows for the
# lifetime of the process — confirm whether a TTL/cleanup is needed.
jobs: Dict[str, Dict] = {}
# Guards all concurrent access to `jobs`.
jobs_lock = threading.Lock()
class AnalysisResult(BaseModel):
    """Combined payload stored on a job once analysis succeeds."""
    # Per-image results as produced by run_multimodal_to_json
    image_analysis_results: Optional[List[Any]] = None
    # Human-readable video verdict assembled in run_analysis_logic
    video_analysis_result: Optional[Dict[str, Any]] = None
class JobStatus(BaseModel):
    """Polling response describing a job's lifecycle state."""
    job_id: str
    # One of "queued", "running", "succeeded", "failed" (set by _create_job/_update_job)
    status: str
    message: Optional[str] = None
    result: Optional[AnalysisResult] = None
    created_at: float  # Unix timestamp (time.time()) at job creation
    updated_at: float  # Unix timestamp of the most recent status change
def _create_job() -> str:
    """Register a fresh 'queued' job record and return its identifier."""
    new_id = uuid.uuid4().hex
    timestamp = time.time()
    record = {
        "job_id": new_id,
        "status": "queued",
        "message": "Waiting for processing...",
        "result": None,
        "created_at": timestamp,
        "updated_at": timestamp,
    }
    with jobs_lock:
        jobs[new_id] = record
    return new_id
def _update_job(job_id: str, **kwargs):
    """Merge *kwargs* into an existing job record and refresh its timestamp.

    Unknown job ids are silently ignored.
    """
    with jobs_lock:
        record = jobs.get(job_id)
        if record is None:
            return
        record.update(kwargs)
        record["updated_at"] = time.time()
def _get_job(job_id: str) -> Optional[Dict]:
    """Return the job record for *job_id*, or None if it does not exist."""
    with jobs_lock:
        return jobs.get(job_id)
def _assess_video(video_data: Dict[str, Any]) -> Dict[str, str]:
    """Translate raw video confidence scores into a human-readable verdict.

    Args:
        video_data: Per-video dict from run_video_to_json; reads
            "authentic_confidence_score" and "synthetic_confidence_score"
            (both default to 0 if absent).

    Returns:
        Dict with keys "authenticity_assessment", "verification_tools_methods",
        "synthetic_type" and "other_artifacts".
    """
    avg_authentic = video_data.get("authentic_confidence_score", 0)
    avg_synthetic = video_data.get("synthetic_confidence_score", 0)
    # Three-way verdict: confidently real / leaning real but weak / fake.
    if avg_authentic > avg_synthetic and avg_authentic > 0.5:
        authenticity_assessment = "REAL (Authentic)"
        synthetic_type = "N/A"
        other_artifacts = "Our algorithms conducted a thorough analysis of the video's motion patterns, lighting consistency, and object interactions. We observed fluid, natural movements and consistent physics that align with real-world recordings. No discernible artifacts, such as pixel distortion, unnatural blurring, or shadow inconsistencies, were detected that would indicate digital manipulation or AI-driven synthesis."
    elif avg_authentic > avg_synthetic and avg_authentic <= 0.5:
        authenticity_assessment = "Potentially Synthetic"
        synthetic_type = "Potentially AI-generated"
        other_artifacts = "Our analysis has identified subtle anomalies within the video frames, particularly in areas of complex texture and inconsistent lighting across different objects. While these discrepancies are not significant enough to definitively classify the video as synthetic, they do suggest a possibility of digital alteration or partial AI generation. Further examination may be required for a conclusive determination."
    else:
        authenticity_assessment = "NOT REAL (Fake, Manipulated, or AI)"
        synthetic_type = "AI-generated"
        other_artifacts = "Our deep analysis detected multiple, significant artifacts commonly associated with synthetic or manipulated media. These include, but are not limited to, unnatural facial expressions and eye movements, inconsistent or floating shadows, logical impossibilities in object interaction, and high-frequency digital noise characteristic of generative models. These factors strongly indicate that the video is not authentic."
    return {
        "authenticity_assessment": authenticity_assessment,
        "verification_tools_methods": "Deepfake Detector",
        "synthetic_type": synthetic_type,
        "other_artifacts": other_artifacts,
    }


async def run_analysis_logic(
    image_paths: Optional[List[str]] = None,
    video_path: Optional[str] = None,
    text: str = "",
) -> Dict[str, Any]:
    """Run image and/or video analysis concurrently and merge the results.

    Args:
        image_paths: Local paths of images to analyze (optional).
        video_path: Local path of a video to analyze (optional).
        text: Accompanying text passed to the multimodal image pipeline.

    Returns:
        Dict with "image_analysis_results" and/or "video_analysis_result";
        empty sections are omitted entirely.

    Raises:
        ValueError: If neither image_paths nor video_path is provided.
    """
    if not image_paths and not video_path:
        raise ValueError("At least one of image_paths or video_path must be provided.")

    tasks = []
    if image_paths:
        # Coroutine-based multimodal pipeline runs on the event loop.
        tasks.append(asyncio.create_task(
            run_multimodal_to_json(image_paths=image_paths, text=text, output_json_path=None)
        ))
    if video_path:
        # Blocking video pipeline is off-loaded to a worker thread.
        tasks.append(asyncio.to_thread(
            run_video_to_json, video_path=video_path, output_json_path=None
        ))

    task_results = await asyncio.gather(*tasks)

    # Results arrive in submission order: images (if requested) first, then video.
    final_result: Dict[str, Any] = {}
    idx = 0
    if image_paths:
        final_result["image_analysis_results"] = task_results[idx]
        idx += 1
    if video_path:
        raw_video_result = task_results[idx]
        if raw_video_result:
            # The video pipeline keys its output by an internal video id;
            # take the first (and, presumably, only) entry — TODO confirm.
            video_data = raw_video_result[next(iter(raw_video_result))]
            final_result["video_analysis_result"] = {
                "filename": video_data.get("video_name", ""),
                "result": _assess_video(video_data),
            }

    # Drop empty sections so callers only see what was actually produced.
    if not final_result.get("image_analysis_results"):
        final_result.pop("image_analysis_results", None)
    if not final_result.get("video_analysis_result"):
        final_result.pop("video_analysis_result", None)
    return final_result
async def process_job_background(
    job_id: str,
    temp_dir: str,
    image_paths: List[str],
    video_path: str,
    text: str
):
    """Run the analysis for one job, record the outcome, and clean up.

    Marks the job "running", then "succeeded" with the result or "failed"
    with the error message. The per-job temp directory is always removed.
    """
    _update_job(job_id, status="running", message="Analyzing...")
    try:
        result_data = await run_analysis_logic(
            image_paths=image_paths or None,
            video_path=video_path or None,
            text=text,
        )
        _update_job(job_id, status="succeeded", result=result_data, message="Completed")
    except Exception as e:
        print(f"Error processing job {job_id}: {e}")
        _update_job(job_id, status="failed", message=str(e))
    finally:
        # Best-effort removal of the uploaded files; never let cleanup
        # failures mask the job outcome.
        try:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
                print(f"Deleted temp dir: {temp_dir}")
        except Exception as cleanup_error:
            print(f"Cleanup error for {job_id}: {cleanup_error}")
@app.on_event("startup")
def startup_event():
    # Ensure the shared upload root exists before any request is handled.
    os.makedirs(UPLOAD_DIR, exist_ok=True)
@app.get("/analyze/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str):
    """The client calls this API periodically to check the results"""
    job = _get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return job
@app.post("/analyze/image/", response_model=JobStatus)
async def analyze_image_endpoint(
    background_tasks: BackgroundTasks,
    images: List[UploadFile] = File(...),
    text: Optional[str] = Form(""),
):
    """Accept image uploads, queue an analysis job, and return its status.

    Files are saved under a per-job temp directory; analysis runs as a
    background task and results are retrieved via GET /analyze/{job_id}.
    """
    job_id = _create_job()
    job_dir = os.path.join(UPLOAD_DIR, job_id)
    os.makedirs(job_dir, exist_ok=True)

    saved_image_paths = []
    try:
        for img in images:
            # basename() strips client-supplied directory components,
            # preventing path traversal outside job_dir.
            safe_name = os.path.basename(img.filename or "upload")
            file_path = os.path.join(job_dir, safe_name)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(img.file, buffer)
            saved_image_paths.append(file_path)
    except Exception as e:
        _update_job(job_id, status="failed", message=f"Upload error: {e}")
        return _get_job(job_id)

    background_tasks.add_task(
        process_job_background,
        job_id, job_dir, saved_image_paths, None, text
    )
    return _get_job(job_id)
@app.post("/analyze/video/", response_model=JobStatus)
async def analyze_video_endpoint(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
):
    """Accept a video upload, queue an analysis job, and return its status.

    The file is saved under a per-job temp directory; analysis runs as a
    background task and results are retrieved via GET /analyze/{job_id}.
    """
    job_id = _create_job()
    job_dir = os.path.join(UPLOAD_DIR, job_id)
    os.makedirs(job_dir, exist_ok=True)

    # basename() strips client-supplied directory components, preventing
    # path traversal outside job_dir.
    safe_name = os.path.basename(video.filename or "upload")
    saved_video_path = os.path.join(job_dir, safe_name)
    try:
        with open(saved_video_path, "wb") as buffer:
            shutil.copyfileobj(video.file, buffer)
    except Exception as e:
        _update_job(job_id, status="failed", message=f"Upload error: {e}")
        return _get_job(job_id)

    background_tasks.add_task(
        process_job_background,
        job_id, job_dir, [], saved_video_path, ""
    )
    return _get_job(job_id)
@app.post("/analyze/multimodal/", response_model=JobStatus)
async def analyze_multimodal_endpoint(
    background_tasks: BackgroundTasks,
    images: List[UploadFile] = File(...),
    video: UploadFile = File(...),
    text: Optional[str] = Form(""),
):
    """Accept images + a video, queue a combined analysis job, return status.

    All uploads are saved under one per-job temp directory; analysis runs as
    a background task and results are retrieved via GET /analyze/{job_id}.
    """
    job_id = _create_job()
    job_dir = os.path.join(UPLOAD_DIR, job_id)
    os.makedirs(job_dir, exist_ok=True)

    saved_image_paths = []
    saved_video_path = None
    try:
        # Save images. basename() strips client-supplied directory
        # components, preventing path traversal outside job_dir.
        for img in images:
            safe_name = os.path.basename(img.filename or "upload")
            file_path = os.path.join(job_dir, safe_name)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(img.file, buffer)
            saved_image_paths.append(file_path)
        # Save video (same sanitization).
        safe_video_name = os.path.basename(video.filename or "upload")
        saved_video_path = os.path.join(job_dir, safe_video_name)
        with open(saved_video_path, "wb") as buffer:
            shutil.copyfileobj(video.file, buffer)
    except Exception as e:
        _update_job(job_id, status="failed", message=f"Upload error: {e}")
        return _get_job(job_id)

    background_tasks.add_task(
        process_job_background,
        job_id, job_dir, saved_image_paths, saved_video_path, text
    )
    return _get_job(job_id)