abhisheksan commited on
Commit
8487406
·
1 Parent(s): 4e3157e

Refactor file path for loading deepfake video detection model

Browse files

Refactor forgery detection process and improve result aggregation
Refactor audio deepfake service to improve error handling and result reporting
Add route for comparing hashes

app/api/routes.py CHANGED
@@ -2,7 +2,9 @@ from fastapi import APIRouter, HTTPException, Response
2
  from pydantic import BaseModel
3
  from app.services import video_service, image_service, antispoof_service
4
  from app.services.antispoof_service import antispoof_service
 
5
  from app.services.image_service import compare_images
 
6
  import logging
7
  import os
8
 
@@ -15,6 +17,11 @@ class CompareRequest(BaseModel):
15
  url1: str
16
  url2: str
17
 
 
 
 
 
 
18
  SUPPORTED_VIDEO_FORMATS = ['mp4', 'avi', 'mov', 'flv', 'wmv']
19
  SUPPORTED_IMAGE_FORMATS = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp']
20
 
@@ -96,4 +103,18 @@ async def compare_images_route(request: CompareRequest):
96
  return {"message": "Image comparison completed", "result": result}
97
  except Exception as e:
98
  logging.error(f"Error in image comparison: {str(e)}")
99
- raise HTTPException(status_code=500, detail=f"Error in image comparison: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  from pydantic import BaseModel
3
  from app.services import video_service, image_service, antispoof_service
4
  from app.services.antispoof_service import antispoof_service
5
+ from app.services.hash_comparison_service import compare_hash_with_array
6
  from app.services.image_service import compare_images
7
+ from typing import List
8
  import logging
9
  import os
10
 
 
17
  url1: str
18
  url2: str
19
 
20
+ class CompareHashesRequest(BaseModel):
21
+ hash_to_compare: str
22
+ hash_array: List[str]
23
+ file_type: str
24
+
25
  SUPPORTED_VIDEO_FORMATS = ['mp4', 'avi', 'mov', 'flv', 'wmv']
26
  SUPPORTED_IMAGE_FORMATS = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp']
27
 
 
103
  return {"message": "Image comparison completed", "result": result}
104
  except Exception as e:
105
  logging.error(f"Error in image comparison: {str(e)}")
106
+ raise HTTPException(status_code=500, detail=f"Error in image comparison: {str(e)}")
107
+
108
+
109
+ @router.post("/compare_hashes")
110
+ async def compare_hashes_route(request: CompareHashesRequest):
111
+ try:
112
+ comparison_results = compare_hash_with_array(request.hash_to_compare, request.hash_array, request.file_type)
113
+ return {
114
+ "message": comparison_results["message"],
115
+ "results": comparison_results["results"],
116
+ "matching_hash": comparison_results["matching_hash"]
117
+ }
118
+ except Exception as e:
119
+ logging.error(f"Error in hash comparison: {str(e)}")
120
+ raise HTTPException(status_code=500, detail=f"Error in hash comparison: {str(e)}")
app/services/hash_comparison_service.py ADDED
@@ -0,0 +1,86 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import re
2
+ from typing import List, Dict, Any
3
+ import imagehash
4
+
5
+ def identify_hash_type(hash_value: str) -> str:
6
+ # Image hashes are typically 64 characters long and contain only 0 and 1
7
+ if len(hash_value) == 64 and set(hash_value).issubset({'0', '1'}):
8
+ return 'image'
9
+ # Video hashes are typically 16 characters long and contain hexadecimal values
10
+ elif len(hash_value) == 16 and re.match(r'^[0-9a-fA-F]+$', hash_value):
11
+ return 'video'
12
+ else:
13
+ return 'unknown'
14
+
15
+ def compare_hashes(hash1: str, hash2: str, hash_type: str) -> Dict[str, Any]:
16
+ if hash_type == 'image':
17
+ return compare_image_hashes(hash1, hash2)
18
+ elif hash_type == 'video':
19
+ return compare_video_hashes(hash1, hash2)
20
+ else:
21
+ return {"error": "Unknown hash type"}
22
+
23
+ def compare_image_hashes(hash1: str, hash2: str) -> Dict[str, Any]:
24
+ h1 = imagehash.hex_to_hash(hash1)
25
+ h2 = imagehash.hex_to_hash(hash2)
26
+ distance = h1 - h2
27
+ similarity = 1 - (distance / 64.0)
28
+ return {
29
+ "hash1": hash1,
30
+ "hash2": hash2,
31
+ "are_similar": similarity > 0.8, # Using 0.8 as the threshold
32
+ "similarity": similarity,
33
+ "hamming_distance": distance
34
+ }
35
+
36
+ def compare_video_hashes(hash1: str, hash2: str) -> Dict[str, Any]:
37
+ h1 = int(hash1, 16)
38
+ h2 = int(hash2, 16)
39
+ distance = bin(h1 ^ h2).count('1')
40
+ similarity = 1 - (distance / 64.0)
41
+ return {
42
+ "hash1": hash1,
43
+ "hash2": hash2,
44
+ "are_similar": similarity > 0.8, # Using 0.8 as the threshold
45
+ "similarity": similarity,
46
+ "hamming_distance": distance
47
+ }
48
+
49
+ def compare_hash_with_array(hash_to_compare: str, hash_array: List[str], file_type: str) -> Dict[str, Any]:
50
+ results = []
51
+ matching_hash = ""
52
+
53
+ if not hash_array:
54
+ return {
55
+ "results": [],
56
+ "matching_hash": "",
57
+ "message": "The provided hash array is empty."
58
+ }
59
+
60
+ for hash_value in hash_array:
61
+ hash_type = identify_hash_type(hash_value)
62
+ if hash_type == file_type:
63
+ comparison_result = compare_hashes(hash_to_compare, hash_value, file_type)
64
+ results.append(comparison_result)
65
+
66
+ if comparison_result['are_similar']:
67
+ matching_hash = hash_value
68
+ break # Stop after finding the first similar hash
69
+
70
+ if not results:
71
+ return {
72
+ "results": [],
73
+ "matching_hash": "",
74
+ "message": f"No hashes of type '{file_type}' found in the provided array."
75
+ }
76
+
77
+ if matching_hash:
78
+ message = "Hash comparison completed successfully. A matching hash was found."
79
+ else:
80
+ message = "Hash comparison completed successfully. No sufficiently similar hash was found."
81
+
82
+ return {
83
+ "results": results,
84
+ "matching_hash": matching_hash,
85
+ "message": message
86
+ }