devorbit's picture
Initial deployment - secrets removed
26e1c2e
"""
POST /rules/analyze — Admin: analyze a rule, persist results.
GET /results/{rule_id} — SOC Analyst: retrieve stored mappings.
Based on:
Use Case 4+5+6 (Upload, Process, Store) — §3.2.7
Use Case 1 (View techniques and scores) — §3.2.7
"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.ml.pipeline import is_ready
from app.schemas.result import MappingResult, ResultsResponse
from app.schemas.rule import AnalyzeRequest, AnalyzeResponse, TechniqueResult
from app.services.ml_service import MLService
from app.services.result_service import ResultService
from app.services.rule_service import RuleService
router = APIRouter(tags=["rules"])
def _get_rule_service(db: Session = Depends(get_db)) -> RuleService:
return RuleService(db=db, ml=MLService())
def _get_result_service(db: Session = Depends(get_db)) -> ResultService:
return ResultService(db=db)
# ---------------------------------------------------------------------------
# POST /rules/analyze
# ---------------------------------------------------------------------------
@router.post("/rules/analyze", response_model=AnalyzeResponse, status_code=201)
def analyze_rule(
body: AnalyzeRequest,
svc: RuleService = Depends(_get_rule_service),
):
"""
Runs the full ML pipeline on the submitted Wazuh rule XML and stores
the results in the database.
"""
if not is_ready():
raise HTTPException(status_code=503, detail="ML pipeline not ready.")
try:
result = svc.analyze_and_persist(body.rule_xml)
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
except RuntimeError as exc:
raise HTTPException(status_code=503, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
all_results = [TechniqueResult(**r) for r in result["results"]]
detected = [r for r in all_results if r.predicted]
return AnalyzeResponse(
rule_id=result["rule_id"],
sanitized_xml=result["sanitized_xml"],
summary=result["summary"],
text_for_embedding=result["text_for_embedding"],
embedding_dim=result["embedding_dim"],
pipeline_mode=result.get("pipeline_mode", "full"),
detected=detected,
all_results=all_results,
)
# ---------------------------------------------------------------------------
# GET /results/{rule_id}
# ---------------------------------------------------------------------------
@router.get("/results/{rule_id}", response_model=ResultsResponse)
def get_results(
rule_id: str,
svc: ResultService = Depends(_get_result_service),
):
"""
Returns all stored MITRE ATT&CK techniques for a rule ID, sorted by confidence.
Use Case 1 — §3.2.7
- mappings: ALL techniques sorted by confidence desc (for Figure 4-11 Top 5 chart)
- detected: primary + secondary (≥0.5) only (for Figure 4-12 WQL queries)
"""
data = svc.get_results_for_rule(rule_id)
if data is None:
raise HTTPException(
status_code=404,
detail=f"No mapping results found for rule_id '{rule_id}'. "
"Run POST /rules/analyze first.",
)
return ResultsResponse(
rule_id=rule_id,
mappings=[MappingResult(**m) for m in data["mappings"]],
detected=[MappingResult(**m) for m in data["detected"]],
)