Gradii commited on
Commit
8e30b6a
·
1 Parent(s): d7322bf

splited methods for different object types

Browse files
backend/app/api/routes.py CHANGED
@@ -1,5 +1,3 @@
1
- """API route handlers."""
2
-
3
  import logging
4
  from fastapi import APIRouter, HTTPException
5
 
@@ -8,8 +6,14 @@ from app.models.schemas import (
8
  AnalysisResponse,
9
  ErrorResponse,
10
  HealthResponse,
 
 
 
 
11
  )
12
  from app.services.download import download_file
 
 
13
  from app.services.detector import get_detector
14
  from app.core.config import get_settings
15
  from app.utils.exceptions import DeepfakeDetectionError
@@ -26,22 +30,18 @@ router = APIRouter()
26
  summary="Health check endpoint",
27
  )
28
  async def health_check() -> HealthResponse:
29
- """
30
- Health check endpoint to verify service is running.
31
-
32
- Returns:
33
- Service status and version information
34
- """
35
  settings = get_settings()
36
  logger.info("Health check endpoint accessed")
37
 
38
- available_models = ["mock"] # Add more as you implement them
 
39
 
40
  return HealthResponse(
41
  status="ok",
42
  service="Deepfake Detection Service",
43
  version=settings.APP_VERSION,
44
  available_models=available_models,
 
45
  )
46
 
47
 
@@ -54,75 +54,121 @@ async def health_check() -> HealthResponse:
54
  500: {"model": ErrorResponse, "description": "Internal server error"},
55
  },
56
  tags=["Analysis"],
57
- summary="Analyze file for deepfake detection",
58
  )
59
  async def analyze(request: AnalysisRequest) -> AnalysisResponse:
60
- """
61
- Analyze a file for deepfake detection.
62
 
63
- Args:
64
- request: AnalysisRequest containing file_url and optional model selection
 
 
 
 
 
 
 
65
 
66
- Returns:
67
- AnalysisResponse with detection results
68
 
69
- Raises:
70
- HTTPException: For various error conditions during processing
71
- """
72
- settings = get_settings()
73
- detector_model = request.model or settings.DEFAULT_DETECTOR_MODEL
74
-
75
- logger.info(
76
- f"Received analysis request for URL: {request.file_url} "
77
- f"using model: {detector_model}"
78
- )
79
-
80
 
81
- try:
 
 
 
82
  try:
83
  detector = get_detector(detector_model)
84
  except ValueError as e:
85
  logger.error(f"Invalid detector model: {str(e)}")
86
- raise HTTPException(
87
- status_code=400,
88
- detail=str(e),
89
- )
90
 
91
- file_bytes = await download_file(str(request.file_url))
 
 
 
 
 
92
 
93
- if not file_bytes:
94
- logger.error("File download returned empty bytes")
95
- raise HTTPException(
96
- status_code=500,
97
- detail="Failed to download and process file",
98
- )
99
 
100
- analysis_result = await detector.detect(file_bytes)
101
 
102
- logger.info(
103
- f"Analysis request completed successfully. "
104
- f"File URL: {request.file_url}, Model: {detector_model}, "
105
- f"Result: {analysis_result}"
 
 
106
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
 
108
  return AnalysisResponse(
109
  is_deepfake=analysis_result["is_deepfake"],
110
  confidence=analysis_result["confidence"],
111
  analysis_time=analysis_result["analysis_time"],
112
  model_used=detector_model,
 
113
  )
 
 
 
 
114
 
115
- except HTTPException:
116
- raise
117
- except DeepfakeDetectionError as e:
118
- logger.error(f"Detection error: {e.message}")
119
- raise HTTPException(
120
- status_code=e.status_code,
121
- detail=e.message,
122
- )
123
- except Exception as e:
124
- logger.error(f"Unexpected error during analysis: {str(e)}", exc_info=True)
125
- raise HTTPException(
126
- status_code=500,
127
- detail="An unexpected error occurred during analysis. Please try again later.",
 
 
 
 
 
 
 
 
 
 
128
  )
 
 
 
 
 
 
1
  import logging
2
  from fastapi import APIRouter, HTTPException
3
 
 
6
  AnalysisResponse,
7
  ErrorResponse,
8
  HealthResponse,
9
+ TextAnalysisRequest,
10
+ ImageAnalysisRequest,
11
+ VideoAnalysisRequest,
12
+ FileAnalysisRequest,
13
  )
14
  from app.services.download import download_file
15
+ from app.services.text_analyzer import analyze_text
16
+ from app.services.image_analyzer import analyze_image
17
  from app.services.detector import get_detector
18
  from app.core.config import get_settings
19
  from app.utils.exceptions import DeepfakeDetectionError
 
30
  summary="Health check endpoint",
31
  )
32
  async def health_check() -> HealthResponse:
 
 
 
 
 
 
33
  settings = get_settings()
34
  logger.info("Health check endpoint accessed")
35
 
36
+ available_models = ["mock"]
37
+ supported_types = ["text", "image", "video", "file"]
38
 
39
  return HealthResponse(
40
  status="ok",
41
  service="Deepfake Detection Service",
42
  version=settings.APP_VERSION,
43
  available_models=available_models,
44
+ supported_types=supported_types,
45
  )
46
 
47
 
 
54
  500: {"model": ErrorResponse, "description": "Internal server error"},
55
  },
56
  tags=["Analysis"],
57
+ summary="Analyze content for deepfake detection",
58
  )
59
  async def analyze(request: AnalysisRequest) -> AnalysisResponse:
60
+ settings = get_settings()
61
+ detector_model = None
62
 
63
+ if isinstance(request, TextAnalysisRequest):
64
+ detector_model = request.model or settings.DEFAULT_DETECTOR_MODEL
65
+ logger.info(f"Received text analysis request, length: {len(request.text)} chars, model: {detector_model}")
66
+
67
+ try:
68
+ detector = get_detector(detector_model)
69
+ except ValueError as e:
70
+ logger.error(f"Invalid detector model: {str(e)}")
71
+ raise HTTPException(status_code=400, detail=str(e))
72
 
73
+ text_bytes = request.text.encode('utf-8')
74
+ analysis_result = await detector.detect(text_bytes)
75
 
76
+ logger.info(f"Text analysis completed. Result: {analysis_result}")
77
+
78
+ return AnalysisResponse(
79
+ is_deepfake=analysis_result["is_deepfake"],
80
+ confidence=analysis_result["confidence"],
81
+ analysis_time=analysis_result["analysis_time"],
82
+ model_used=detector_model,
83
+ content_type="text",
84
+ )
 
 
85
 
86
+ elif isinstance(request, ImageAnalysisRequest):
87
+ detector_model = request.model or settings.DEFAULT_DETECTOR_MODEL
88
+ logger.info(f"Received image analysis request for URL: {request.image_url}, model: {detector_model}")
89
+
90
  try:
91
  detector = get_detector(detector_model)
92
  except ValueError as e:
93
  logger.error(f"Invalid detector model: {str(e)}")
94
+ raise HTTPException(status_code=400, detail=str(e))
 
 
 
95
 
96
+ try:
97
+ image_bytes = await download_file(str(request.image_url))
98
+ if not image_bytes:
99
+ raise HTTPException(status_code=500, detail="Failed to download image")
100
+ except DeepfakeDetectionError as e:
101
+ raise HTTPException(status_code=e.status_code, detail=e.message)
102
 
103
+ analysis_result = await detector.detect(image_bytes)
 
 
 
 
 
104
 
105
+ logger.info(f"Image analysis completed. Result: {analysis_result}")
106
 
107
+ return AnalysisResponse(
108
+ is_deepfake=analysis_result["is_deepfake"],
109
+ confidence=analysis_result["confidence"],
110
+ analysis_time=analysis_result["analysis_time"],
111
+ model_used=detector_model,
112
+ content_type="image",
113
  )
114
+
115
+ elif isinstance(request, VideoAnalysisRequest):
116
+ detector_model = request.model or settings.DEFAULT_DETECTOR_MODEL
117
+ logger.info(f"Received video analysis request for URL: {request.video_url}, model: {detector_model}")
118
+
119
+ try:
120
+ detector = get_detector(detector_model)
121
+ except ValueError as e:
122
+ logger.error(f"Invalid detector model: {str(e)}")
123
+ raise HTTPException(status_code=400, detail=str(e))
124
+
125
+ try:
126
+ video_bytes = await download_file(str(request.video_url))
127
+ if not video_bytes:
128
+ raise HTTPException(status_code=500, detail="Failed to download video")
129
+ except DeepfakeDetectionError as e:
130
+ raise HTTPException(status_code=e.status_code, detail=e.message)
131
+
132
+ analysis_result = await detector.detect(video_bytes)
133
+
134
+ logger.info(f"Video analysis completed. Result: {analysis_result}")
135
 
136
  return AnalysisResponse(
137
  is_deepfake=analysis_result["is_deepfake"],
138
  confidence=analysis_result["confidence"],
139
  analysis_time=analysis_result["analysis_time"],
140
  model_used=detector_model,
141
+ content_type="video",
142
  )
143
+
144
+ elif isinstance(request, FileAnalysisRequest):
145
+ detector_model = request.model or settings.DEFAULT_DETECTOR_MODEL
146
+ logger.info(f"Received file analysis request for URL: {request.file_url}, model: {detector_model}")
147
 
148
+ try:
149
+ detector = get_detector(detector_model)
150
+ except ValueError as e:
151
+ logger.error(f"Invalid detector model: {str(e)}")
152
+ raise HTTPException(status_code=400, detail=str(e))
153
+
154
+ try:
155
+ file_bytes = await download_file(str(request.file_url))
156
+ if not file_bytes:
157
+ raise HTTPException(status_code=500, detail="Failed to download file")
158
+ except DeepfakeDetectionError as e:
159
+ raise HTTPException(status_code=e.status_code, detail=e.message)
160
+
161
+ analysis_result = await detector.detect(file_bytes)
162
+
163
+ logger.info(f"File analysis completed. Result: {analysis_result}")
164
+
165
+ return AnalysisResponse(
166
+ is_deepfake=analysis_result["is_deepfake"],
167
+ confidence=analysis_result["confidence"],
168
+ analysis_time=analysis_result["analysis_time"],
169
+ model_used=detector_model,
170
+ content_type="file",
171
  )
172
+
173
+ else:
174
+ raise HTTPException(status_code=400, detail="Unsupported content type")
backend/app/models/schemas.py CHANGED
@@ -1,41 +1,81 @@
1
  from pydantic import BaseModel, HttpUrl, Field
2
- from typing import Optional
3
 
4
 
5
- class AnalysisRequest(BaseModel):
6
- """Request model for deepfake analysis."""
 
 
7
 
8
- file_url: HttpUrl = Field(
9
- ..., description="URL of the file to analyze for deepfake detection"
10
- )
11
- model: Optional[str] = Field(
12
- None, description="Detector model to use (e.g., 'mock', 'deepseek', 'openai')"
13
- )
 
 
 
 
 
 
 
 
14
 
15
  class Config:
16
  json_schema_extra = {
17
  "example": {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
  "file_url": "https://example.com/video.mp4",
19
  "model": "mock"
20
  }
21
  }
22
 
23
 
 
 
 
 
 
 
 
 
24
  class AnalysisResponse(BaseModel):
25
- """Response model for deepfake analysis results."""
26
-
27
- is_deepfake: bool = Field(
28
- ..., description="Whether the file is detected as a deepfake"
29
- )
30
- confidence: float = Field(
31
- ..., ge=0.0, le=1.0, description="Confidence score between 0.0 and 1.0"
32
- )
33
- analysis_time: float = Field(
34
- ..., description="Time taken for analysis in seconds"
35
- )
36
- model_used: str = Field(
37
- ..., description="The detector model that was used"
38
- )
39
 
40
  class Config:
41
  json_schema_extra = {
@@ -43,34 +83,30 @@ class AnalysisResponse(BaseModel):
43
  "is_deepfake": True,
44
  "confidence": 0.847,
45
  "analysis_time": 1.234,
46
- "model_used": "mock"
 
47
  }
48
  }
49
 
50
 
51
  class ErrorResponse(BaseModel):
52
- """Response model for errors."""
53
-
54
  error: str = Field(..., description="Error message")
55
  status_code: int = Field(..., description="HTTP status code")
56
- details: Optional[str] = Field(
57
- None, description="Additional error details"
58
- )
59
 
60
  class Config:
61
  json_schema_extra = {
62
  "example": {
63
  "error": "Invalid URL format",
64
  "status_code": 400,
65
- "details": "The provided file_url is not a valid URL"
66
  }
67
  }
68
 
69
 
70
  class HealthResponse(BaseModel):
71
- """Response model for health check."""
72
-
73
  status: str = Field(..., description="Service status")
74
  service: str = Field(..., description="Service name")
75
  version: str = Field(..., description="Service version")
76
  available_models: list = Field(..., description="Available detector models")
 
 
1
  from pydantic import BaseModel, HttpUrl, Field
2
+ from typing import Optional, Union, Literal
3
 
4
 
5
+ class TextAnalysisRequest(BaseModel):
6
+ content_type: Literal["text"]
7
+ text: str = Field(..., description="Text content to analyze for deepfake detection")
8
+ model: Optional[str] = Field(None, description="Detector model to use")
9
 
10
+ class Config:
11
+ json_schema_extra = {
12
+ "example": {
13
+ "content_type": "text",
14
+ "text": "Some text that might be AI-generated",
15
+ "model": "mock"
16
+ }
17
+ }
18
+
19
+
20
+ class ImageAnalysisRequest(BaseModel):
21
+ content_type: Literal["image"]
22
+ image_url: HttpUrl = Field(..., description="URL of the image to analyze")
23
+ model: Optional[str] = Field(None, description="Detector model to use")
24
 
25
  class Config:
26
  json_schema_extra = {
27
  "example": {
28
+ "content_type": "image",
29
+ "image_url": "https://example.com/image.jpg",
30
+ "model": "mock"
31
+ }
32
+ }
33
+
34
+
35
+ class VideoAnalysisRequest(BaseModel):
36
+ content_type: Literal["video"]
37
+ video_url: HttpUrl = Field(..., description="URL of the video to analyze")
38
+ model: Optional[str] = Field(None, description="Detector model to use")
39
+
40
+ class Config:
41
+ json_schema_extra = {
42
+ "example": {
43
+ "content_type": "video",
44
+ "video_url": "https://example.com/video.mp4",
45
+ "model": "mock"
46
+ }
47
+ }
48
+
49
+
50
+ class FileAnalysisRequest(BaseModel):
51
+ content_type: Literal["file"]
52
+ file_url: HttpUrl = Field(..., description="URL of the file to analyze")
53
+ model: Optional[str] = Field(None, description="Detector model to use")
54
+
55
+ class Config:
56
+ json_schema_extra = {
57
+ "example": {
58
+ "content_type": "file",
59
  "file_url": "https://example.com/video.mp4",
60
  "model": "mock"
61
  }
62
  }
63
 
64
 
65
+ AnalysisRequest = Union[
66
+ TextAnalysisRequest,
67
+ ImageAnalysisRequest,
68
+ VideoAnalysisRequest,
69
+ FileAnalysisRequest,
70
+ ]
71
+
72
+
73
  class AnalysisResponse(BaseModel):
74
+ is_deepfake: bool = Field(..., description="Whether the content is detected as a deepfake")
75
+ confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score between 0.0 and 1.0")
76
+ analysis_time: float = Field(..., description="Time taken for analysis in seconds")
77
+ model_used: str = Field(..., description="The detector model that was used")
78
+ content_type: str = Field(..., description="Type of content analyzed (text/image/video/file)")
 
 
 
 
 
 
 
 
 
79
 
80
  class Config:
81
  json_schema_extra = {
 
83
  "is_deepfake": True,
84
  "confidence": 0.847,
85
  "analysis_time": 1.234,
86
+ "model_used": "mock",
87
+ "content_type": "image"
88
  }
89
  }
90
 
91
 
92
  class ErrorResponse(BaseModel):
 
 
93
  error: str = Field(..., description="Error message")
94
  status_code: int = Field(..., description="HTTP status code")
95
+ details: Optional[str] = Field(None, description="Additional error details")
 
 
96
 
97
  class Config:
98
  json_schema_extra = {
99
  "example": {
100
  "error": "Invalid URL format",
101
  "status_code": 400,
102
+ "details": "The provided URL is not valid"
103
  }
104
  }
105
 
106
 
107
  class HealthResponse(BaseModel):
 
 
108
  status: str = Field(..., description="Service status")
109
  service: str = Field(..., description="Service name")
110
  version: str = Field(..., description="Service version")
111
  available_models: list = Field(..., description="Available detector models")
112
+ supported_types: list = Field(..., description="Supported content types")
backend/app/services/image_analyzer.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import time
3
+ from typing import Dict, Any
4
+
5
+ logger = logging.getLogger(__name__)
6
+
7
+
8
+ async def analyze_image(image_bytes: bytes) -> Dict[str, Any]:
9
+ start_time = time.time()
10
+
11
+ logger.info(f"Starting image analysis, size: {len(image_bytes)} bytes")
12
+
13
+ image_hash = hash(image_bytes) % 100
14
+ is_deepfake = image_hash > 50
15
+ confidence = (image_hash % 100) / 100.0
16
+
17
+ analysis_time = time.time() - start_time
18
+
19
+ result = {
20
+ "is_deepfake": is_deepfake,
21
+ "confidence": round(confidence, 3),
22
+ "analysis_time": round(analysis_time, 3),
23
+ }
24
+
25
+ logger.info(f"Image analysis completed. Result: {result}")
26
+ return result
backend/app/services/text_analyzer.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import time
3
+ from typing import Dict, Any
4
+
5
+ logger = logging.getLogger(__name__)
6
+
7
+
8
+ async def analyze_text(text: str) -> Dict[str, Any]:
9
+ start_time = time.time()
10
+
11
+ logger.info(f"Starting text analysis, length: {len(text)} chars")
12
+
13
+ text_hash = hash(text) % 100
14
+ is_deepfake = text_hash > 50
15
+ confidence = (text_hash % 100) / 100.0
16
+
17
+ analysis_time = time.time() - start_time
18
+
19
+ result = {
20
+ "is_deepfake": is_deepfake,
21
+ "confidence": round(confidence, 3),
22
+ "analysis_time": round(analysis_time, 3),
23
+ }
24
+
25
+ logger.info(f"Text analysis completed. Result: {result}")
26
+ return result