File size: 2,807 Bytes
b8d9e47
 
 
 
9fceb39
b8d9e47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9fceb39
b8d9e47
 
 
 
9fceb39
b8d9e47
 
 
 
9fceb39
b8d9e47
 
 
 
 
 
9fceb39
b8d9e47
 
9fceb39
b8d9e47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import requests
from crewai.tools import BaseTool
from typing import Type, List
from pydantic import BaseModel, Field
import streamlit as st

# --- 1. IPInfo Geo Lookup Tool ---
class IPInfoToolInput(BaseModel):
    """Input schema for IPInfo Geo Lookup Tool."""
    target: List[str] = Field(
        ...,
        description="A list of IP addresses (e.g., ['8.8.8.8', '1.1.1.1']) to query."
    )

class IPInfoGeoLookup(BaseTool):
    # See https://ipinfo.io/developers/lite-api
    name: str = "IPInfo Geo Lookup"
    description: str = "Looks up geolocation, Internet Service Provider (ISP), and network details for an IP address. Useful for determining the geographic location of a digital asset."
    args_schema: Type[BaseModel] = IPInfoToolInput

    def _run(self, target: List[str]) -> str:
        api_key = st.session_state['ipinfo_api_key']
        if not api_key:
            return "Error: La variable de entorno IPINFO_APIKEY no está configurada."

        results = []

        for ip_address in target:
            try:
                url = f"https://api.ipinfo.io/lite/{ip_address}?token={api_key}"
                response = requests.get(url)
                response.raise_for_status()
                data = response.json()
                # Formateamos la salida para que sea legible y útil para el agente
                result_str = (
                    f"IP: {data.get('ip')}, Country: {data.get('country')}, ASN: {data.get('as_name')}"
                )
                results.append(result_str)

            except requests.exceptions.RequestException as e:
                results.append(f"Error al consultar la IP {ip_address}: {e}")
                
        return "\n".join(results)

# --- 2. VirusTotal Scanner Tool ---
class VirusTotalToolInput(BaseModel):
    """Input schema for VirusTotal Scanner Tool."""
    resources: List[str] = Field(
        ..., 
        description="A list of resources to scan. Valid resources include file hash (MD5, SHA256), a URL, a domain, or an IP address."
    )

class VirusTotalScanner(BaseTool):
    # See https://blog.virustotal.com/2024/08/VT-S1-EffectiveResearch.html
    # See https://docs.virustotal.com/reference/ip-info
    name: str = "VirusTotal Scanner"
    description: str = "Analyzes files, URLs, domains, or IP addresses for malware and other security threats using multiple antivirus engines and reputation services. Provides a detailed security analysis report."
    args_schema: Type[BaseModel] = VirusTotalToolInput

    def _run(self, resources: List[str]) -> str:
        # Placeholder logic for the actual VirusTotal API call
        print(f"DEBUG: Running VirusTotal Scan for {resources}")
        return f"Simulation: Resource {resources} analyzed. 0/90 engines flagged it as malicious. Reputation: Clean."