Spaces:
Sleeping
Sleeping
| from crewai import Agent, Task, Crew, Process, LLM | |
| from custom_tools import IPInfoGeoLookup | |
| class IncidentReporterCrew: | |
| def __init__(self, api_key): | |
| self.api_key = api_key | |
| self.llm = LLM( | |
| #model="llama-3.3-70b-versatile", | |
| model="openai/gpt-oss-120b", | |
| temperature=0.7, | |
| provider="openai", | |
| ) | |
| def threat_hunter_agent(self) -> Agent: | |
| return Agent( | |
| role="Security Threat Hunter", | |
| goal="Rapidly identify, extract, and analyze key Indicators of Compromise (IOCs) from initial security alerts to provide structured and actionable intelligence.", | |
| backstory="Highly specialized in threat intelligence, capable of parsing raw incident data, identifying patterns, and performing virtual lookups on security intelligence databases (like VirusTotal, IPInfo, etc.). Your output must be a concise, structured summary.", | |
| verbose=True, | |
| tools=[IPInfoGeoLookup()], | |
| allow_delegation=False, | |
| llm=self.llm, | |
| ) | |
| def reporter_agent(self) -> Agent: | |
| return Agent( | |
| role="Incident Response Document Specialist", | |
| goal="Generate a formal, professional, and comprehensive Incident Response Report based on the initial alert and the technical intelligence provided by the Threat Hunter.", | |
| backstory="An expert in cybersecurity documentation, focusing on clarity, structure, and adherence to industry-standard reporting formats. You transform raw data into polished, readable documents suitable for management and technical teams.", | |
| verbose=True, | |
| llm=self.llm, | |
| ) | |
| def task_ioc_extraction(self) -> Task: | |
| return Task( | |
| description=""" | |
| Input: Initial raw incident alert details | |
| {impacto_detalle} | |
| Process: | |
| 1. Extract up to 10 observable Indicators of Compromise (IOCs) including IPs, domains, file hashes (SHA256, MD5), and URLs from the input. | |
| 2. For each extracted IOC, try querying external intelligence services (like VirusTotal, IPInfo, etc.). | |
| 3. Synthesize the findings into a clear, structured intelligence summary. | |
| Output Requirements: The output MUST be a JSON-like or clearly delimited text block, detailing each IOC, its type, and a summary of the associated risk/reputation found (e.g., "Malicious/Known C2," "Clean," "High Reputation," "Related to Phishing Campaign X"). This summary is the ONLY content that should be passed to the next agent. | |
| """, | |
| expected_output="A structured summary of technical findings including IPs, hashes, and domains, and their corresponding threat intelligence status, ready for the final report.", | |
| agent=self.threat_hunter_agent(), | |
| ) | |
| def task_report_drafting(self) -> Task: | |
| return Task( | |
| description=""" | |
| Input: | |
| 1. The original user input: | |
| - **Tipo de Alerta**: {tipo_alerta} | |
| - **Sistema Afectado**: {sistema_afectado} | |
| - **Fecha y Hora**: {fecha_hora} | |
| - **Detalles del Incidente**: {impacto_detalle} | |
| - **Acciones Tomadas (Mitigación Inmediata)**: {acciones_tomadas} | |
| 2. The structured technical intelligence summary provided by the Threat Hunter in context. | |
| Process: Draft a complete, professional Incident Response Report. | |
| Output Requirements: The report MUST be formatted in Markdown and include the following mandatory sections: | |
| 1. Executive Summary (A brief, high-level overview). | |
| 2. Initial Incident Details (Source, Time, Initial Observation). | |
| 3. Technical Analysis & Indicators of Compromise (IOCs) (A detailed section incorporating ALL findings from the Threat Hunter agent information context). | |
| 4. Impact Assessment (Potential or confirmed impact on systems/data). | |
| 5. Mitigation or remediation steps taken (if any). | |
| The entire final output must be this Markdown report. | |
| """, | |
| expected_output="A complete Incident Response Report in Spanish language. Formatted as markdown without '```'", | |
| agent=self.reporter_agent(), | |
| markdown=True, | |
| #context=[self.task_ioc_extraction()] # Esta tarea espera el resultado de la anterior | |
| ) | |
| def crew(self) -> Crew: | |
| return Crew( | |
| agents=[self.threat_hunter_agent(), self.reporter_agent()], | |
| tasks=[self.task_ioc_extraction(), self.task_report_drafting()], | |
| process=Process.sequential, # Ejecución secuencial: Threat Hunter -> Reporter | |
| verbose=True | |
| ) |