generador-post-mortem / src /incident_crew.py
vvillarreal-cfee's picture
feat(app): Ahora utilizamos Groq
f9ccc70 verified
from crewai import Agent, Task, Crew, Process, LLM
from custom_tools import IPInfoGeoLookup
class IncidentReporterCrew:
def __init__(self, api_key):
self.api_key = api_key
self.llm = LLM(
#model="llama-3.3-70b-versatile",
model="openai/gpt-oss-120b",
temperature=0.7,
provider="openai",
)
def threat_hunter_agent(self) -> Agent:
return Agent(
role="Security Threat Hunter",
goal="Rapidly identify, extract, and analyze key Indicators of Compromise (IOCs) from initial security alerts to provide structured and actionable intelligence.",
backstory="Highly specialized in threat intelligence, capable of parsing raw incident data, identifying patterns, and performing virtual lookups on security intelligence databases (like VirusTotal, IPInfo, etc.). Your output must be a concise, structured summary.",
verbose=True,
tools=[IPInfoGeoLookup()],
allow_delegation=False,
llm=self.llm,
)
def reporter_agent(self) -> Agent:
return Agent(
role="Incident Response Document Specialist",
goal="Generate a formal, professional, and comprehensive Incident Response Report based on the initial alert and the technical intelligence provided by the Threat Hunter.",
backstory="An expert in cybersecurity documentation, focusing on clarity, structure, and adherence to industry-standard reporting formats. You transform raw data into polished, readable documents suitable for management and technical teams.",
verbose=True,
llm=self.llm,
)
def task_ioc_extraction(self) -> Task:
return Task(
description="""
Input: Initial raw incident alert details
{impacto_detalle}
Process:
1. Extract up to 10 observable Indicators of Compromise (IOCs) including IPs, domains, file hashes (SHA256, MD5), and URLs from the input.
2. For each extracted IOC, try querying external intelligence services (like VirusTotal, IPInfo, etc.).
3. Synthesize the findings into a clear, structured intelligence summary.
Output Requirements: The output MUST be a JSON-like or clearly delimited text block, detailing each IOC, its type, and a summary of the associated risk/reputation found (e.g., "Malicious/Known C2," "Clean," "High Reputation," "Related to Phishing Campaign X"). This summary is the ONLY content that should be passed to the next agent.
""",
expected_output="A structured summary of technical findings including IPs, hashes, and domains, and their corresponding threat intelligence status, ready for the final report.",
agent=self.threat_hunter_agent(),
)
def task_report_drafting(self) -> Task:
return Task(
description="""
Input:
1. The original user input:
- **Tipo de Alerta**: {tipo_alerta}
- **Sistema Afectado**: {sistema_afectado}
- **Fecha y Hora**: {fecha_hora}
- **Detalles del Incidente**: {impacto_detalle}
- **Acciones Tomadas (Mitigación Inmediata)**: {acciones_tomadas}
2. The structured technical intelligence summary provided by the Threat Hunter in context.
Process: Draft a complete, professional Incident Response Report.
Output Requirements: The report MUST be formatted in Markdown and include the following mandatory sections:
1. Executive Summary (A brief, high-level overview).
2. Initial Incident Details (Source, Time, Initial Observation).
3. Technical Analysis & Indicators of Compromise (IOCs) (A detailed section incorporating ALL findings from the Threat Hunter agent information context).
4. Impact Assessment (Potential or confirmed impact on systems/data).
5. Mitigation or remediation steps taken (if any).
The entire final output must be this Markdown report.
""",
expected_output="A complete Incident Response Report in Spanish language. Formatted as markdown without '```'",
agent=self.reporter_agent(),
markdown=True,
#context=[self.task_ioc_extraction()] # Esta tarea espera el resultado de la anterior
)
def crew(self) -> Crew:
return Crew(
agents=[self.threat_hunter_agent(), self.reporter_agent()],
tasks=[self.task_ioc_extraction(), self.task_report_drafting()],
process=Process.sequential, # Ejecución secuencial: Threat Hunter -> Reporter
verbose=True
)