vvillarreal-cfee commited on
Commit
1383685
·
verified ·
1 Parent(s): c2463c7

feat(agent): Agregamos soporte para VirusTotal IP report.

Browse files
Files changed (2) hide show
  1. src/custom_tools.py +41 -14
  2. src/incident_crew.py +4 -3
src/custom_tools.py CHANGED
@@ -1,4 +1,5 @@
1
  import requests
 
2
  from crewai.tools import BaseTool
3
  from typing import Type, List
4
  from pydantic import BaseModel, Field
@@ -42,22 +43,48 @@ class IPInfoGeoLookup(BaseTool):
42
 
43
  return "\n".join(results)
44
 
45
- # --- 2. VirusTotal Scanner Tool ---
46
- class VirusTotalToolInput(BaseModel):
47
- """Input schema for VirusTotal Scanner Tool."""
48
- resources: List[str] = Field(
49
  ...,
50
- description="A list of resources to scan. Valid resources include file hash (MD5, SHA256), a URL, a domain, or an IP address."
51
  )
52
 
53
- class VirusTotalScanner(BaseTool):
54
  # See https://blog.virustotal.com/2024/08/VT-S1-EffectiveResearch.html
55
  # See https://docs.virustotal.com/reference/ip-info
56
- name: str = "VirusTotal Scanner"
57
- description: str = "Analyzes files, URLs, domains, or IP addresses for malware and other security threats using multiple antivirus engines and reputation services. Provides a detailed security analysis report."
58
- args_schema: Type[BaseModel] = VirusTotalToolInput
59
-
60
- def _run(self, resources: List[str]) -> str:
61
- # Placeholder logic for the actual VirusTotal API call
62
- print(f"DEBUG: Running VirusTotal Scan for {resources}")
63
- return f"Simulation: Resource {resources} analyzed. 0/90 engines flagged it as malicious. Reputation: Clean."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import requests
2
+ import os
3
  from crewai.tools import BaseTool
4
  from typing import Type, List
5
  from pydantic import BaseModel, Field
 
43
 
44
  return "\n".join(results)
45
 
46
+ # --- 2. VirusTotal IP Report ---
47
+ class VirusTotalIPReportInput(BaseModel):
48
+ """Input schema for VirusTotal IP Report Tool."""
49
+ ip_address: str = Field(
50
  ...,
51
+ description="The IP address to scan. Valid resources include IPv4 or IPv6 address."
52
  )
53
 
54
+ class VirusTotalIPReport(BaseTool):
55
  # See https://blog.virustotal.com/2024/08/VT-S1-EffectiveResearch.html
56
  # See https://docs.virustotal.com/reference/ip-info
57
+ name: str = "VirusTotal IP Report"
58
+ description: str = "Analyzes an IP address for malware and other security threats using multiple antivirus engines and reputation services. Provides a detailed security analysis report."
59
+ args_schema: Type[BaseModel] = VirusTotalIPReportInput
60
+
61
+ def _run(self, ip_address: str) -> str:
62
+ api_key = os.getenv("VT_APIKEY")
63
+ report = []
64
+
65
+ if not api_key:
66
+ return "Tool error: The VT_APIKEY environment variable is not set."
67
+
68
+ try:
69
+ url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip_address}"
70
+ headers = {"x-apikey": api_key}
71
+ response = requests.get(url, headers=headers)
72
+ response.raise_for_status()
73
+ data = response.json()
74
+
75
+ # Formateamos la salida para que sea legible y útil para el agente
76
+ result_str = (
77
+ f"IP: {data.get('data', {}).get('id')}",
78
+ f"Country: {data.get('data', {}).get('attributes', {}).get('country')}",
79
+ f"ASN Owner: {data.get('data', {}).get('attributes', {}).get('as_owner')}",
80
+ f"Reputation: {data.get('data', {}).get('attributes', {}).get('reputation')}",
81
+ f"Last Analysis Date: {data.get('data', {}).get('attributes', {}).get('last_analysis_date')}",
82
+ f"Malicious Detections: {data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}).get('malicious')}",
83
+ f"Harmless Detections: {data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}).get('harmless')}",
84
+ f"Suspicious Detections: {data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}).get('suspicious')}"
85
+ )
86
+
87
+ return "\n".join(result_str)
88
+
89
+ except requests.exceptions.RequestException as e:
90
+ return f"Error al consultar la IP {ip_address}: {e}"
src/incident_crew.py CHANGED
@@ -1,5 +1,5 @@
1
  from crewai import Agent, Task, Crew, Process, LLM
2
- from custom_tools import IPInfoGeoLookup
3
 
4
  class IncidentReporterCrew:
5
  def __init__(self, api_key):
@@ -7,7 +7,8 @@ class IncidentReporterCrew:
7
 
8
  self.llm = LLM(
9
  #model="llama-3.3-70b-versatile",
10
- model="openai/gpt-oss-120b:free",
 
11
  temperature=0.7,
12
  provider="openai",
13
  )
@@ -18,7 +19,7 @@ class IncidentReporterCrew:
18
  goal="Rapidly identify, extract, and analyze key Indicators of Compromise (IOCs) from initial security alerts to provide structured and actionable intelligence.",
19
  backstory="Highly specialized in threat intelligence, capable of parsing raw incident data, identifying patterns, and performing virtual lookups on security intelligence databases (like VirusTotal, IPInfo, etc.). Your output must be a concise, structured summary.",
20
  verbose=True,
21
- tools=[IPInfoGeoLookup()],
22
  allow_delegation=False,
23
  llm=self.llm,
24
  )
 
1
  from crewai import Agent, Task, Crew, Process, LLM
2
+ from custom_tools import IPInfoGeoLookup, VirusTotalIPReport
3
 
4
  class IncidentReporterCrew:
5
  def __init__(self, api_key):
 
7
 
8
  self.llm = LLM(
9
  #model="llama-3.3-70b-versatile",
10
+ #model="openai/gpt-oss-120b:free",
11
+ model="google/gemini-2.5-flash",
12
  temperature=0.7,
13
  provider="openai",
14
  )
 
19
  goal="Rapidly identify, extract, and analyze key Indicators of Compromise (IOCs) from initial security alerts to provide structured and actionable intelligence.",
20
  backstory="Highly specialized in threat intelligence, capable of parsing raw incident data, identifying patterns, and performing virtual lookups on security intelligence databases (like VirusTotal, IPInfo, etc.). Your output must be a concise, structured summary.",
21
  verbose=True,
22
+ tools=[IPInfoGeoLookup(), VirusTotalIPReport()],
23
  allow_delegation=False,
24
  llm=self.llm,
25
  )