broskiiii committed on
Commit
4daf9cd
·
1 Parent(s): a3c1b43

Integrate VirusTotal for URL and file scanning. Add file analysis route.

Browse files
app/config.py CHANGED
@@ -5,6 +5,7 @@ load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "..", ".env"))
5
 
6
  GEMINI_API_KEY: str = os.environ["GEMINI_API_KEY"]
7
  HUGGING_FACE_TOKEN: str = os.environ["HUGGING_FACE_TOKEN"]
 
8
 
9
  HF_IMAGE_MODEL = "dima806/deepfake_vs_real_image_detection"
10
  HF_AUDIO_MODEL = "mo-thecreator/Deepfake-audio-detection"
 
5
 
6
  GEMINI_API_KEY: str = os.environ["GEMINI_API_KEY"]
7
  HUGGING_FACE_TOKEN: str = os.environ["HUGGING_FACE_TOKEN"]
8
+ VIRUS_TOTAL_API_KEY: str = os.environ.get("VIRUS_TOTAL", "")
9
 
10
  HF_IMAGE_MODEL = "dima806/deepfake_vs_real_image_detection"
11
  HF_AUDIO_MODEL = "mo-thecreator/Deepfake-audio-detection"
app/main.py CHANGED
@@ -8,7 +8,7 @@ from fastapi.staticfiles import StaticFiles
8
  from fastapi.responses import FileResponse
9
  import os
10
 
11
- from app.routers import text, image, video, audio
12
 
13
  app = FastAPI(
14
  title="Anti-Phishing AI Backend",
@@ -27,6 +27,7 @@ app.include_router(text.router, prefix="/analyze", tags=["Text"])
27
  app.include_router(image.router, prefix="/analyze", tags=["Image"])
28
  app.include_router(video.router, prefix="/analyze", tags=["Video"])
29
  app.include_router(audio.router, prefix="/analyze", tags=["Audio"])
 
30
 
31
  FRONTEND_DIR = os.path.join(os.path.dirname(__file__), "..", "frontend")
32
 
 
8
  from fastapi.responses import FileResponse
9
  import os
10
 
11
+ from app.routers import text, image, video, audio, file
12
 
13
  app = FastAPI(
14
  title="Anti-Phishing AI Backend",
 
27
  app.include_router(image.router, prefix="/analyze", tags=["Image"])
28
  app.include_router(video.router, prefix="/analyze", tags=["Video"])
29
  app.include_router(audio.router, prefix="/analyze", tags=["Audio"])
30
+ app.include_router(file.router, prefix="/analyze", tags=["File"])
31
 
32
  FRONTEND_DIR = os.path.join(os.path.dirname(__file__), "..", "frontend")
33
 
app/routers/file.py ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException, UploadFile, File
2
+ from app.models import AnalysisResult
3
+ from app.tools.virustotal_tools import scan_file_virustotal
4
+
5
+ router = APIRouter()
6
+
7
+ def _risk_level(score: float) -> str:
8
+ if score < 0.3:
9
+ return "LOW"
10
+ elif score < 0.6:
11
+ return "MEDIUM"
12
+ elif score < 0.85:
13
+ return "HIGH"
14
+ return "CRITICAL"
15
+
16
@router.post("/file", response_model=AnalysisResult)
async def analyze_file(file: UploadFile = File(...)):
    """Scan an uploaded file with VirusTotal and summarise the outcome.

    The risk score is derived only from the number of engines that flagged
    the file as malicious: 0 detections -> 0.0, 5 or more -> 1.0.

    Args:
        file: The multipart upload to analyse.

    Returns:
        An AnalysisResult holding the score, its risk level, the threat
        labels, a human-readable explanation and the raw VirusTotal result.

    Raises:
        HTTPException: 500 if reading the upload or building the result fails.
    """
    # Local import keeps this handler self-contained; fastapi is already a
    # dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    try:
        content = await file.read()
        # scan_file_virustotal performs blocking HTTP requests and sleeps, so
        # it must run in the worker pool rather than on the event loop.
        vt_result = await run_in_threadpool(
            scan_file_virustotal, content, file.filename
        )

        malicious = vt_result.get("malicious", 0)
        suspicious = vt_result.get("suspicious", 0)
        risk_score = min(malicious / 5.0, 1.0)

        threat_types = []
        if malicious > 0:
            threat_types.append("malicious_file")
        if suspicious > 0:
            threat_types.append("suspicious_file")

        if "error" in vt_result:
            # The scan did not happen (missing API key, network or API
            # failure). Say so instead of reporting "0 malicious detections",
            # which would read as a clean verdict.
            explanation = (
                f"VirusTotal scan could not be completed for {file.filename}: "
                f"{vt_result['error']}"
            )
        elif "status" in vt_result:
            # Non-final states (e.g. analysis still queued) carry their own text.
            explanation = vt_result.get(
                "message",
                f"VirusTotal status for {file.filename}: {vt_result['status']}",
            )
        else:
            explanation = (
                f"VirusTotal analysis for {file.filename}: "
                f"{malicious} malicious detections."
            )

        return AnalysisResult(
            risk_score=risk_score,
            risk_level=_risk_level(risk_score),
            threat_types=threat_types,
            explanation=explanation,
            tool_outputs={"virustotal_file": vt_result},
        )
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        await file.close()
app/tools/text_tools.py CHANGED
@@ -17,8 +17,10 @@ def extract_urls(text: str) -> list[str]:
17
  return _SUSPICIOUS_TLD.findall(text)
18
 
19
 
20
- def score_url(url: str) -> dict:
21
  from urllib.parse import urlparse
 
 
22
  parsed = urlparse(url)
23
  domain = parsed.netloc.lower()
24
  flags = []
@@ -39,12 +41,22 @@ def score_url(url: str) -> dict:
39
  flags.append("ip_address_url")
40
  is_suspicious = True
41
 
42
- return {"url": url, "suspicious": is_suspicious, "flags": flags}
 
 
 
 
 
 
 
 
 
43
 
44
 
45
  def analyze_urls_in_text(text: str) -> dict:
46
  urls = extract_urls(text)
47
- scored = [score_url(u) for u in urls]
 
48
  suspicious_count = sum(1 for s in scored if s["suspicious"])
49
  return {
50
  "urls_found": len(urls),
 
17
  return _SUSPICIOUS_TLD.findall(text)
18
 
19
 
20
+ def score_url(url: str, use_vt: bool = True) -> dict:
21
  from urllib.parse import urlparse
22
+ from app.tools.virustotal_tools import scan_url_virustotal
23
+
24
  parsed = urlparse(url)
25
  domain = parsed.netloc.lower()
26
  flags = []
 
41
  flags.append("ip_address_url")
42
  is_suspicious = True
43
 
44
+ vt_result = {}
45
+ if use_vt:
46
+ vt_data = scan_url_virustotal(url)
47
+ if "malicious" in vt_data:
48
+ vt_result = vt_data
49
+ if vt_data["malicious"] > 0:
50
+ flags.append(f"virustotal_malicious:{vt_data['malicious']}")
51
+ is_suspicious = True
52
+
53
+ return {"url": url, "suspicious": is_suspicious, "flags": flags, "virustotal": vt_result}
54
 
55
 
56
  def analyze_urls_in_text(text: str) -> dict:
57
  urls = extract_urls(text)
58
+ # Only use VT for top 3 URLs to avoid long wait times and rate limits
59
+ scored = [score_url(u, use_vt=(i < 3)) for i, u in enumerate(urls)]
60
  suspicious_count = sum(1 for s in scored if s["suspicious"])
61
  return {
62
  "urls_found": len(urls),
app/tools/virustotal_tools.py ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ import base64
3
+ import hashlib
4
+ import time
5
+ from app.config import VIRUS_TOTAL_API_KEY
6
+
7
def get_url_id(url: str) -> str:
    """Build the VirusTotal v3 URL identifier: URL-safe base64 without padding."""
    encoded = base64.urlsafe_b64encode(url.encode()).decode()
    # "=" only ever appears as trailing padding in base64 output.
    return encoded.rstrip("=")
10
+
11
def scan_url_virustotal(url: str, timeout: float = 10.0) -> dict:
    """Fetch VirusTotal's existing report for a URL.

    Args:
        url: The URL to look up.
        timeout: Seconds to wait for VirusTotal before giving up.

    Returns:
        On success, a dict with the "malicious", "suspicious", "harmless" and
        "undetected" engine counts plus a "link" to the report page.
        If VirusTotal has no report, a dict with "status" and "message".
        On any failure (missing API key, HTTP error, network problem), a dict
        with an "error" key; this function never raises.
    """
    if not VIRUS_TOTAL_API_KEY:
        return {"error": "VirusTotal API key not configured"}

    url_id = get_url_id(url)
    endpoint = f"https://www.virustotal.com/api/v3/urls/{url_id}"
    headers = {"x-apikey": VIRUS_TOTAL_API_KEY}

    try:
        # Without a timeout, requests waits indefinitely on a stalled connection.
        response = requests.get(endpoint, headers=headers, timeout=timeout)

        if response.status_code == 404:
            return {
                "status": "not_found",
                "message": "URL not previously scanned by VirusTotal",
            }
        if response.status_code != 200:
            return {
                "error": f"VirusTotal API returned {response.status_code}",
                "details": response.text,
            }

        # "or {}" also covers keys that are present but null in the JSON.
        payload = response.json().get("data") or {}
        attributes = payload.get("attributes") or {}
        stats = attributes.get("last_analysis_stats") or {}
        return {
            "malicious": stats.get("malicious", 0),
            "suspicious": stats.get("suspicious", 0),
            "harmless": stats.get("harmless", 0),
            "undetected": stats.get("undetected", 0),
            "link": f"https://www.virustotal.com/gui/url/{url_id}",
        }
    except Exception as e:
        # Deliberately broad: callers treat the lookup as best-effort and
        # expect an error dict rather than an exception.
        return {"error": str(e)}
39
+
40
def scan_file_virustotal(
    file_content: bytes,
    filename: str,
    timeout: float = 30.0,
    poll_delay: float = 2.0,
) -> dict:
    """Scan a file with the VirusTotal v3 API.

    The file's SHA-256 is looked up first; the content is uploaded only when
    VirusTotal has no report for that hash.

    Args:
        file_content: Raw bytes of the file.
        filename: Name sent with the upload. When empty or None, the SHA-256
            digest is used instead.
        timeout: Seconds to wait on each HTTP request.
        poll_delay: Seconds to wait after an upload before re-checking the
            report by hash.

    Returns:
        The parsed report dict when one is available.
        A dict with "status": "queued" when the upload succeeded but the
        report is not ready yet.
        A dict with an "error" key on any failure; this function never raises.
    """
    if not VIRUS_TOTAL_API_KEY:
        return {"error": "VirusTotal API key not configured"}

    sha256_hash = hashlib.sha256(file_content).hexdigest()
    headers = {"x-apikey": VIRUS_TOTAL_API_KEY}
    endpoint = f"https://www.virustotal.com/api/v3/files/{sha256_hash}"
    # Direct uploads are limited to 32 MB; larger files need a one-off upload URL.
    direct_upload_limit = 32 * 1024 * 1024

    try:
        # 1. Check whether a report already exists for this hash.
        response = requests.get(endpoint, headers=headers, timeout=timeout)
        if response.status_code == 200:
            return _parse_vt_file_response(response.json())
        if response.status_code != 404:
            return {
                "error": f"VT API error: {response.status_code}",
                "details": response.text,
            }

        # 2. Unknown file: upload it.
        upload_url = "https://www.virustotal.com/api/v3/files"
        if len(file_content) > direct_upload_limit:
            url_response = requests.get(
                f"{upload_url}/upload_url", headers=headers, timeout=timeout
            )
            if url_response.status_code != 200:
                return {
                    "error": f"Upload failed: {url_response.status_code}",
                    "details": url_response.text,
                }
            large_upload_url = url_response.json().get("data")
            if not large_upload_url:
                return {
                    "error": "Upload failed: no upload URL returned",
                    "details": url_response.text,
                }
            upload_url = large_upload_url

        # A None filename would turn the multipart part into a plain form
        # field instead of a file part, so fall back to the digest.
        files = {"file": (filename or sha256_hash, file_content)}
        upload_response = requests.post(
            upload_url, headers=headers, files=files, timeout=timeout
        )
        if upload_response.status_code != 200:
            return {
                "error": f"Upload failed: {upload_response.status_code}",
                "details": upload_response.text,
            }

        analysis_id = (upload_response.json().get("data") or {}).get("id")

        # 3. Give VirusTotal a moment, then try the hash lookup once more.
        time.sleep(poll_delay)
        response = requests.get(endpoint, headers=headers, timeout=timeout)
        if response.status_code == 200:
            return _parse_vt_file_response(response.json())
        return {
            "status": "queued",
            "analysis_id": analysis_id,
            "message": "File uploaded, analysis in progress.",
        }
    except Exception as e:
        # Deliberately broad: callers expect an error dict, not an exception.
        return {"error": str(e)}
82
+
83
+ def _parse_vt_file_response(data: dict) -> dict:
84
+ attributes = data.get("data", {}).get("attributes", {})
85
+ stats = attributes.get("last_analysis_stats", {})
86
+ return {
87
+ "malicious": stats.get("malicious", 0),
88
+ "suspicious": stats.get("suspicious", 0),
89
+ "harmless": stats.get("harmless", 0),
90
+ "undetected": stats.get("undetected", 0),
91
+ "file_type": attributes.get("type_description", "unknown"),
92
+ "size": attributes.get("size", 0),
93
+ "sha256": attributes.get("sha256", ""),
94
+ "link": f"https://www.virustotal.com/gui/file/{attributes.get('sha256')}"
95
+ }
tests/test_file_vt.py ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import sys
3
+ import hashlib
4
+
5
+ # Add the project root to sys.path
6
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
7
+
8
+ from app.tools.virustotal_tools import scan_file_virustotal
9
+
10
def test_file_scan_hash():
    """Look up the EICAR antivirus test string on VirusTotal and print the report."""
    import json

    # EICAR test file content (standard for testing antivirus)
    eicar_content = b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*'
    digest = hashlib.sha256(eicar_content).hexdigest()
    print(f"Testing VT for EICAR file hash: {digest}")

    report = scan_file_virustotal(eicar_content, "eicar.com")
    print("VT Result (EICAR):")
    print(json.dumps(report, indent=2))
18
+
19
def test_file_scan_upload():
    """Send never-before-seen content to VirusTotal and print the result."""
    import json
    import time

    # Timestamped content, so the hash is unique and the upload path is taken.
    unique_content = f"This is a unique test file created at {time.time()}".encode()
    digest = hashlib.sha256(unique_content).hexdigest()
    print(f"\nTesting VT for unique file upload (hash: {digest})")

    report = scan_file_virustotal(unique_content, "test.txt")
    print("VT Result (Unique Upload):")
    print(json.dumps(report, indent=2))
28
+
29
+ if __name__ == "__main__":
30
+ test_file_scan_hash()
31
+ test_file_scan_upload()
tests/test_vt.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import sys
3
+
4
+ # Add the project root to sys.path
5
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
6
+
7
+ from app.tools.virustotal_tools import scan_url_virustotal
8
+ from app.tools.text_tools import analyze_urls_in_text
9
+
10
def test_vt():
    """Query VirusTotal for a fixed test URL and print whatever comes back."""
    target = "http://malware.testing.google.test/testing/malware/"
    print(f"Testing VT for URL: {target}")
    print("VT Result:", scan_url_virustotal(target))
15
+
16
def test_text_analysis():
    """Run URL analysis over a sentence containing a test link and print the result."""
    import json

    message = "Please check this link: http://malware.testing.google.test/testing/malware/"
    print(f"\nTesting text analysis for: {message}")
    analysis = analyze_urls_in_text(message)
    print("Analysis Result:", json.dumps(analysis, indent=2))
22
+
23
+ if __name__ == "__main__":
24
+ test_vt()
25
+ test_text_analysis()