File size: 11,031 Bytes
8aeb9ae
 
 
 
 
 
 
 
 
 
 
 
 
 
9ead99a
8aeb9ae
 
 
 
 
 
a759e39
5b12776
a759e39
 
 
 
ecccf5c
a759e39
 
 
 
 
 
ecccf5c
a759e39
 
 
 
 
5b12776
a759e39
 
5b12776
a759e39
 
 
 
 
 
8aeb9ae
 
9ead99a
8aeb9ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ead99a
8aeb9ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ead99a
8aeb9ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ead99a
 
8aeb9ae
 
 
 
 
 
 
 
9ead99a
8aeb9ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ead99a
8aeb9ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ead99a
8aeb9ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ead99a
8aeb9ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ead99a
8aeb9ae
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
import asyncio
import os
import shutil
import json
import uuid
import time
import threading
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor

from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# Simulated import of your libraries
from miragenews import run_multimodal_to_json
from AIGVDet import run_video_to_json

# Scratch directory for per-job uploads (one subdirectory per job id).
UPLOAD_DIR = "temp_uploads"
MAX_WORKERS = 4

# Deployment mode switch; "hf" (Hugging Face) requires writing the Google
# service-account credentials to disk so the client libraries can find them.
ENV = os.getenv("ENV")
cred_json = os.getenv("GOOGLE_CREDENTIALS_JSON")

if ENV == "hf":
    if not cred_json:
        print("[ERROR] GOOGLE_CREDENTIALS_JSON is missing")
    else:
        try:
            # Validate the payload before persisting anything.
            json.loads(cred_json)
        except json.JSONDecodeError:
            print("[ERROR] GOOGLE_CREDENTIALS_JSON is not valid JSON")
        else:
            file_path = "google-credentials.json"
            with open(file_path, "w") as f:
                f.write(cred_json)

            # Point Google auth auto-detection at the credentials file.
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = file_path

            print("[INFO] Google credentials saved to", file_path)
else:
    # DEV mode (local)
    print("[INFO] ENV != hf → skip Google credentials setup")


app = FastAPI(
    title="Multimedia Analysis API (Polling Mode)",
    description="Multimedia analysis API using a Polling mechanism to avoid Timeouts.",
    version="2.0.0",
)

# In-memory job store: job_id -> job record dict (schema built in _create_job).
# NOTE(review): process-local only — jobs vanish on restart, and finished jobs
# are never evicted so this grows unboundedly; confirm acceptable for deployment.
jobs: Dict[str, Dict] = {}
# Guards every read/write of `jobs`, since background tasks mutate it concurrently.
jobs_lock = threading.Lock()

class AnalysisResult(BaseModel):
    """Combined output of the image and/or video analysis pipelines."""

    # Per-image entries from run_multimodal_to_json (exact element shape is
    # defined by that library — TODO confirm).
    image_analysis_results: Optional[List[Any]] = None
    # Summary dict assembled in run_analysis_logic (filename + result fields).
    video_analysis_result: Optional[Dict[str, Any]] = None

class JobStatus(BaseModel):
    """Polling payload describing one analysis job's lifecycle."""

    job_id: str
    # One of "queued", "running", "succeeded", "failed"
    # (set in _create_job and process_job_background).
    status: str  
    message: Optional[str] = None
    result: Optional[AnalysisResult] = None
    created_at: float  # epoch seconds (time.time())
    updated_at: float  # epoch seconds, refreshed on every _update_job call

def _create_job() -> str:
    """Register a fresh job record in the shared store and return its id."""
    new_id = uuid.uuid4().hex
    timestamp = time.time()
    record = {
        "job_id": new_id,
        "status": "queued",
        "message": "Waiting for processing...",
        "result": None,
        "created_at": timestamp,
        "updated_at": timestamp,
    }
    # Only the dict insertion needs the lock; the record is built outside it.
    with jobs_lock:
        jobs[new_id] = record
    return new_id

def _update_job(job_id: str, **kwargs):
    """Merge kwargs into an existing job record and bump its updated_at.

    Silently ignores unknown job ids.
    """
    with jobs_lock:
        record = jobs.get(job_id)
        if record is None:
            return
        record.update(kwargs)
        record["updated_at"] = time.time()

def _get_job(job_id: str) -> Optional[Dict]:
    """Thread-safe lookup of a job record; returns None for unknown ids."""
    with jobs_lock:
        record = jobs.get(job_id)
    return record

async def run_analysis_logic(
    image_paths: Optional[List[str]] = None,
    video_path: Optional[str] = None,
    text: str = "",
) -> Dict[str, Any]:
    """Run the image and/or video analysis pipelines concurrently.

    Args:
        image_paths: Local paths of images to analyze (optional).
        video_path: Local path of a video to analyze (optional).
        text: Accompanying text passed to the multimodal image pipeline.

    Returns:
        A dict with up to two keys — "image_analysis_results" and
        "video_analysis_result" — each present only when the corresponding
        input was given and produced a non-empty result.

    Raises:
        ValueError: If neither image_paths nor video_path is provided.
    """
    if not image_paths and not video_path:
        raise ValueError("At least one of image_paths or video_path must be provided.")

    tasks = []

    if image_paths:
        # Async pipeline: schedule it directly on the running event loop.
        tasks.append(asyncio.create_task(
            run_multimodal_to_json(image_paths=image_paths, text=text, output_json_path=None)
        ))

    if video_path:
        # Blocking pipeline: offload to a worker thread so the loop stays free.
        tasks.append(asyncio.to_thread(
            run_video_to_json, video_path=video_path, output_json_path=None
        ))

    # gather() preserves submission order: the image result (if requested)
    # is first, the video result (if requested) is last — so no manual index
    # bookkeeping is needed.
    task_results = await asyncio.gather(*tasks)

    image_analysis_results = task_results[0] if image_paths else []
    raw_video_result = task_results[-1] if video_path else None

    final_result: Dict[str, Any] = {}

    # Include only keys that actually carry data (equivalent to the previous
    # set-then-pop-if-empty behavior).
    if image_analysis_results:
        final_result["image_analysis_results"] = image_analysis_results

    if raw_video_result:
        # run_video_to_json returns {video_id: {...}}; grab the first entry
        # without materializing a key list.
        video_data = raw_video_result[next(iter(raw_video_result))]

        avg_authentic = video_data.get("authentic_confidence_score", 0)
        avg_synthetic = video_data.get("synthetic_confidence_score", 0)

        # Identical in every branch, so hoisted out of the conditional.
        verification_tools = "Deepfake Detector"

        if avg_authentic > avg_synthetic and avg_authentic > 0.5:
            authenticity_assessment = "REAL (Authentic)"
            synthetic_type = "N/A"
            other_artifacts = "Our algorithms conducted a thorough analysis of the video's motion patterns, lighting consistency, and object interactions. We observed fluid, natural movements and consistent physics that align with real-world recordings. No discernible artifacts, such as pixel distortion, unnatural blurring, or shadow inconsistencies, were detected that would indicate digital manipulation or AI-driven synthesis."
        elif avg_authentic > avg_synthetic and avg_authentic <= 0.5:
            # Authentic wins, but with low confidence — inconclusive.
            authenticity_assessment = "Potentially Synthetic"
            synthetic_type = "Potentially AI-generated"
            other_artifacts = "Our analysis has identified subtle anomalies within the video frames, particularly in areas of complex texture and inconsistent lighting across different objects. While these discrepancies are not significant enough to definitively classify the video as synthetic, they do suggest a possibility of digital alteration or partial AI generation. Further examination may be required for a conclusive determination."
        else:
            authenticity_assessment = "NOT REAL (Fake, Manipulated, or AI)"
            synthetic_type = "AI-generated"
            other_artifacts = "Our deep analysis detected multiple, significant artifacts commonly associated with synthetic or manipulated media. These include, but are not limited to, unnatural facial expressions and eye movements, inconsistent or floating shadows, logical impossibilities in object interaction, and high-frequency digital noise characteristic of generative models. These factors strongly indicate that the video is not authentic."

        final_result["video_analysis_result"] = {
            "filename": video_data.get("video_name", ""),
            "result": {
                "authenticity_assessment": authenticity_assessment,
                "verification_tools_methods": verification_tools,
                "synthetic_type": synthetic_type,
                "other_artifacts": other_artifacts,
            },
        }

    return final_result

async def process_job_background(
    job_id: str,
    temp_dir: str,
    image_paths: List[str],
    video_path: str,
    text: str
):
    """Run the analysis for one job, record the outcome, then clean up."""
    _update_job(job_id, status="running", message="Analyzing...")

    try:
        outcome = await run_analysis_logic(
            image_paths=image_paths or None,
            video_path=video_path or None,
            text=text,
        )
        _update_job(job_id, status="succeeded", result=outcome, message="Completed")
    except Exception as e:
        print(f"Error processing job {job_id}: {e}")
        _update_job(job_id, status="failed", message=str(e))
    finally:
        # Always remove the per-job upload directory, even on failure.
        try:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
                print(f"Deleted temp dir: {temp_dir}")
        except Exception as cleanup_error:
            print(f"Cleanup error for {job_id}: {cleanup_error}")


@app.on_event("startup")
def startup_event():
    """Ensure the upload scratch directory exists before serving requests."""
    # NOTE(review): on_event is deprecated in newer FastAPI versions —
    # consider migrating to a lifespan handler.
    os.makedirs(UPLOAD_DIR, exist_ok=True)

@app.get("/analyze/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str):
    """Polling endpoint: return the current state of a submitted job."""
    record = _get_job(job_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return record

@app.post("/analyze/image/", response_model=JobStatus)
async def analyze_image_endpoint(
    background_tasks: BackgroundTasks,
    images: List[UploadFile] = File(...),
    text: Optional[str] = Form(""),
):
    """Accept image uploads, queue background analysis, and return the job.

    The client polls GET /analyze/{job_id} for the result.
    """
    job_id = _create_job()

    job_dir = os.path.join(UPLOAD_DIR, job_id)
    os.makedirs(job_dir, exist_ok=True)

    saved_image_paths = []
    try:
        for idx, img in enumerate(images):
            # SECURITY: use only the basename of the client-supplied filename
            # so a crafted name like "../../etc/x" cannot escape job_dir
            # (path traversal). Fall back to a synthetic name if empty.
            safe_name = os.path.basename(img.filename or "") or f"image_{idx}"
            file_path = os.path.join(job_dir, safe_name)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(img.file, buffer)
            saved_image_paths.append(file_path)
    except Exception as e:
        _update_job(job_id, status="failed", message=f"Upload error: {e}")
        return _get_job(job_id)

    background_tasks.add_task(
        process_job_background, 
        job_id, job_dir, saved_image_paths, None, text
    )

    return _get_job(job_id)

@app.post("/analyze/video/", response_model=JobStatus)
async def analyze_video_endpoint(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
):
    """Accept a video upload, queue background analysis, and return the job.

    The client polls GET /analyze/{job_id} for the result.
    """
    job_id = _create_job()
    job_dir = os.path.join(UPLOAD_DIR, job_id)
    os.makedirs(job_dir, exist_ok=True)

    # SECURITY: use only the basename of the client-supplied filename so a
    # crafted name like "../../etc/x" cannot escape job_dir (path traversal).
    safe_name = os.path.basename(video.filename or "") or "video_upload"
    saved_video_path = os.path.join(job_dir, safe_name)
    try:
        with open(saved_video_path, "wb") as buffer:
            shutil.copyfileobj(video.file, buffer)
    except Exception as e:
        _update_job(job_id, status="failed", message=f"Upload error: {e}")
        return _get_job(job_id)

    background_tasks.add_task(
        process_job_background, 
        job_id, job_dir, [], saved_video_path, ""
    )

    return _get_job(job_id)

@app.post("/analyze/multimodal/", response_model=JobStatus)
async def analyze_multimodal_endpoint(
    background_tasks: BackgroundTasks,
    images: List[UploadFile] = File(...),
    video: UploadFile = File(...),
    text: Optional[str] = Form(""),
):
    """Accept image + video uploads, queue a combined analysis job.

    The client polls GET /analyze/{job_id} for the result.
    """
    job_id = _create_job()
    job_dir = os.path.join(UPLOAD_DIR, job_id)
    os.makedirs(job_dir, exist_ok=True)

    saved_image_paths = []
    saved_video_path = None

    try:
        # Save Images
        for idx, img in enumerate(images):
            # SECURITY: basename-only so a crafted filename (e.g. "../../x")
            # cannot escape job_dir (path traversal).
            safe_name = os.path.basename(img.filename or "") or f"image_{idx}"
            file_path = os.path.join(job_dir, safe_name)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(img.file, buffer)
            saved_image_paths.append(file_path)

        # Save Video (same sanitization as the images above)
        video_name = os.path.basename(video.filename or "") or "video_upload"
        saved_video_path = os.path.join(job_dir, video_name)
        with open(saved_video_path, "wb") as buffer:
            shutil.copyfileobj(video.file, buffer)

    except Exception as e:
        _update_job(job_id, status="failed", message=f"Upload error: {e}")
        return _get_job(job_id)

    background_tasks.add_task(
        process_job_background, 
        job_id, job_dir, saved_image_paths, saved_video_path, text
    )

    return _get_job(job_id)