Spaces:
Sleeping
Sleeping
File size: 11,031 Bytes
8aeb9ae 9ead99a 8aeb9ae a759e39 5b12776 a759e39 ecccf5c a759e39 ecccf5c a759e39 5b12776 a759e39 5b12776 a759e39 8aeb9ae 9ead99a 8aeb9ae 9ead99a 8aeb9ae 9ead99a 8aeb9ae 9ead99a 8aeb9ae 9ead99a 8aeb9ae 9ead99a 8aeb9ae 9ead99a 8aeb9ae 9ead99a 8aeb9ae 9ead99a 8aeb9ae |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
import asyncio
import os
import shutil
import json
import uuid
import time
import threading
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Simulated import of your libraries
from miragenews import run_multimodal_to_json
from AIGVDet import run_video_to_json
UPLOAD_DIR = "temp_uploads"
MAX_WORKERS = 4
ENV = os.getenv("ENV")
cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON")
if ENV == "hf":
if cred_json:
try:
# Parse to ensure the JSON payload is valid
json.loads(cred_json)
file_path = "google-credentials.json"
with open(file_path, "w") as f:
f.write(cred_json)
# Reset env so Google auth can auto-detect the credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = file_path
print("[INFO] Google credentials saved to", file_path)
except json.JSONDecodeError:
print("[ERROR] GOOGLE_CREDENTIALS_JSON is not valid JSON")
else:
print("[ERROR] GOOGLE_CREDENTIALS_JSON is missing")
else:
# DEV mode (local)
print("[INFO] ENV != hf → skip Google credentials setup")
app = FastAPI(
title="Multimedia Analysis API (Polling Mode)",
description="Multimedia analysis API using a Polling mechanism to avoid Timeouts.",
version="2.0.0",
)
jobs: Dict[str, Dict] = {}
jobs_lock = threading.Lock()
class AnalysisResult(BaseModel):
image_analysis_results: Optional[List[Any]] = None
video_analysis_result: Optional[Dict[str, Any]] = None
class JobStatus(BaseModel):
job_id: str
status: str
message: Optional[str] = None
result: Optional[AnalysisResult] = None
created_at: float
updated_at: float
def _create_job() -> str:
job_id = uuid.uuid4().hex
now = time.time()
with jobs_lock:
jobs[job_id] = {
"job_id": job_id,
"status": "queued",
"message": "Waiting for processing...",
"result": None,
"created_at": now,
"updated_at": now
}
return job_id
def _update_job(job_id: str, **kwargs):
with jobs_lock:
if job_id in jobs:
jobs[job_id].update(kwargs)
jobs[job_id]["updated_at"] = time.time()
def _get_job(job_id: str) -> Optional[Dict]:
with jobs_lock:
return jobs.get(job_id)
async def run_analysis_logic(
image_paths: Optional[List[str]] = None,
video_path: Optional[str] = None,
text: str = "",
) -> Dict[str, Any]:
if not image_paths and not video_path:
raise ValueError("At least one of image_paths or video_path must be provided.")
tasks = []
if image_paths:
image_task = asyncio.create_task(
run_multimodal_to_json(image_paths=image_paths, text=text, output_json_path=None)
)
tasks.append(image_task)
if video_path:
video_task = asyncio.to_thread(
run_video_to_json, video_path=video_path, output_json_path=None
)
tasks.append(video_task)
task_results = await asyncio.gather(*tasks)
final_result = {"image_analysis_results": [], "video_analysis_result": {}}
image_analysis_results = []
video_result_index = -1
current_idx = 0
if image_paths:
image_analysis_results = task_results[current_idx]
current_idx += 1
if video_path:
video_result_index = current_idx
final_result["image_analysis_results"] = image_analysis_results
if video_result_index != -1:
raw_video_result = task_results[video_result_index]
if raw_video_result:
video_id_key = list(raw_video_result.keys())[0]
video_data = raw_video_result[video_id_key]
avg_authentic = video_data.get("authentic_confidence_score", 0)
avg_synthetic = video_data.get("synthetic_confidence_score", 0)
if avg_authentic > avg_synthetic and avg_authentic > 0.5:
authenticity_assessment = "REAL (Authentic)"
verification_tools = "Deepfake Detector"
synthetic_type = "N/A"
other_artifacts = "Our algorithms conducted a thorough analysis of the video's motion patterns, lighting consistency, and object interactions. We observed fluid, natural movements and consistent physics that align with real-world recordings. No discernible artifacts, such as pixel distortion, unnatural blurring, or shadow inconsistencies, were detected that would indicate digital manipulation or AI-driven synthesis."
elif avg_authentic > avg_synthetic and avg_authentic <= 0.5:
authenticity_assessment = "Potentially Synthetic"
verification_tools = "Deepfake Detector"
synthetic_type = "Potentially AI-generated"
other_artifacts = "Our analysis has identified subtle anomalies within the video frames, particularly in areas of complex texture and inconsistent lighting across different objects. While these discrepancies are not significant enough to definitively classify the video as synthetic, they do suggest a possibility of digital alteration or partial AI generation. Further examination may be required for a conclusive determination."
else:
authenticity_assessment = "NOT REAL (Fake, Manipulated, or AI)"
verification_tools = "Deepfake Detector"
synthetic_type = "AI-generated"
other_artifacts = "Our deep analysis detected multiple, significant artifacts commonly associated with synthetic or manipulated media. These include, but are not limited to, unnatural facial expressions and eye movements, inconsistent or floating shadows, logical impossibilities in object interaction, and high-frequency digital noise characteristic of generative models. These factors strongly indicate that the video is not authentic."
final_result["video_analysis_result"] = {
"filename": video_data.get("video_name", ""),
"result": {
"authenticity_assessment": authenticity_assessment,
"verification_tools_methods": verification_tools,
"synthetic_type": synthetic_type,
"other_artifacts": other_artifacts,
},
}
if not final_result.get("image_analysis_results"):
final_result.pop("image_analysis_results", None)
if not final_result.get("video_analysis_result"):
final_result.pop("video_analysis_result", None)
return final_result
async def process_job_background(
job_id: str,
temp_dir: str,
image_paths: List[str],
video_path: str,
text: str
):
"""Background function to perform analysis"""
_update_job(job_id, status="running", message="Analyzing...")
try:
result_data = await run_analysis_logic(
image_paths=image_paths if image_paths else None,
video_path=video_path if video_path else None,
text=text
)
_update_job(job_id, status="succeeded", result=result_data, message="Completed")
except Exception as e:
print(f"Error processing job {job_id}: {e}")
_update_job(job_id, status="failed", message=str(e))
finally:
try:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
print(f"Deleted temp dir: {temp_dir}")
except Exception as cleanup_error:
print(f"Cleanup error for {job_id}: {cleanup_error}")
@app.on_event("startup")
def startup_event():
os.makedirs(UPLOAD_DIR, exist_ok=True)
@app.get("/analyze/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str):
"""The client calls this API periodically to check the results"""
job = _get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return job
@app.post("/analyze/image/", response_model=JobStatus)
async def analyze_image_endpoint(
background_tasks: BackgroundTasks,
images: List[UploadFile] = File(...),
text: Optional[str] = Form(""),
):
job_id = _create_job()
job_dir = os.path.join(UPLOAD_DIR, job_id)
os.makedirs(job_dir, exist_ok=True)
saved_image_paths = []
try:
for img in images:
file_path = os.path.join(job_dir, img.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(img.file, buffer)
saved_image_paths.append(file_path)
except Exception as e:
_update_job(job_id, status="failed", message=f"Upload error: {e}")
return _get_job(job_id)
background_tasks.add_task(
process_job_background,
job_id, job_dir, saved_image_paths, None, text
)
return _get_job(job_id)
@app.post("/analyze/video/", response_model=JobStatus)
async def analyze_video_endpoint(
background_tasks: BackgroundTasks,
video: UploadFile = File(...),
):
job_id = _create_job()
job_dir = os.path.join(UPLOAD_DIR, job_id)
os.makedirs(job_dir, exist_ok=True)
saved_video_path = os.path.join(job_dir, video.filename)
try:
with open(saved_video_path, "wb") as buffer:
shutil.copyfileobj(video.file, buffer)
except Exception as e:
_update_job(job_id, status="failed", message=f"Upload error: {e}")
return _get_job(job_id)
background_tasks.add_task(
process_job_background,
job_id, job_dir, [], saved_video_path, ""
)
return _get_job(job_id)
@app.post("/analyze/multimodal/", response_model=JobStatus)
async def analyze_multimodal_endpoint(
background_tasks: BackgroundTasks,
images: List[UploadFile] = File(...),
video: UploadFile = File(...),
text: Optional[str] = Form(""),
):
job_id = _create_job()
job_dir = os.path.join(UPLOAD_DIR, job_id)
os.makedirs(job_dir, exist_ok=True)
saved_image_paths = []
saved_video_path = None
try:
# Save Images
for img in images:
file_path = os.path.join(job_dir, img.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(img.file, buffer)
saved_image_paths.append(file_path)
# Save Video
saved_video_path = os.path.join(job_dir, video.filename)
with open(saved_video_path, "wb") as buffer:
shutil.copyfileobj(video.file, buffer)
except Exception as e:
_update_job(job_id, status="failed", message=f"Upload error: {e}")
return _get_job(job_id)
background_tasks.add_task(
process_job_background,
job_id, job_dir, saved_image_paths, saved_video_path, text
)
return _get_job(job_id) |