| """ |
| POST /rules/analyze — Admin: analyze a rule, persist results. |
| GET /results/{rule_id} — SOC Analyst: retrieve stored mappings. |
| |
| Based on: |
| Use Case 4+5+6 (Upload, Process, Store) — §3.2.7 |
| Use Case 1 (View techniques and scores) — §3.2.7 |
| """ |
|
|
| from fastapi import APIRouter, Depends, HTTPException |
| from sqlalchemy.orm import Session |
|
|
| from app.db.session import get_db |
| from app.ml.pipeline import is_ready |
| from app.schemas.result import MappingResult, ResultsResponse |
| from app.schemas.rule import AnalyzeRequest, AnalyzeResponse, TechniqueResult |
| from app.services.ml_service import MLService |
| from app.services.result_service import ResultService |
| from app.services.rule_service import RuleService |
|
|
| router = APIRouter(tags=["rules"]) |
|
|
|
|
| def _get_rule_service(db: Session = Depends(get_db)) -> RuleService: |
| return RuleService(db=db, ml=MLService()) |
|
|
|
|
| def _get_result_service(db: Session = Depends(get_db)) -> ResultService: |
| return ResultService(db=db) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.post("/rules/analyze", response_model=AnalyzeResponse, status_code=201) |
| def analyze_rule( |
| body: AnalyzeRequest, |
| svc: RuleService = Depends(_get_rule_service), |
| ): |
| """ |
| Runs the full ML pipeline on the submitted Wazuh rule XML and stores |
| the results in the database. |
| """ |
| if not is_ready(): |
| raise HTTPException(status_code=503, detail="ML pipeline not ready.") |
|
|
| try: |
| result = svc.analyze_and_persist(body.rule_xml) |
| except ValueError as exc: |
| raise HTTPException(status_code=422, detail=str(exc)) from exc |
| except RuntimeError as exc: |
| raise HTTPException(status_code=503, detail=str(exc)) from exc |
| except Exception as exc: |
| raise HTTPException(status_code=500, detail=str(exc)) from exc |
|
|
| all_results = [TechniqueResult(**r) for r in result["results"]] |
| detected = [r for r in all_results if r.predicted] |
|
|
| return AnalyzeResponse( |
| rule_id=result["rule_id"], |
| sanitized_xml=result["sanitized_xml"], |
| summary=result["summary"], |
| text_for_embedding=result["text_for_embedding"], |
| embedding_dim=result["embedding_dim"], |
| pipeline_mode=result.get("pipeline_mode", "full"), |
| detected=detected, |
| all_results=all_results, |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.get("/results/{rule_id}", response_model=ResultsResponse) |
| def get_results( |
| rule_id: str, |
| svc: ResultService = Depends(_get_result_service), |
| ): |
| """ |
| Returns all stored MITRE ATT&CK techniques for a rule ID, sorted by confidence. |
| Use Case 1 — §3.2.7 |
| - mappings: ALL techniques sorted by confidence desc (for Figure 4-11 Top 5 chart) |
| - detected: primary + secondary (≥0.5) only (for Figure 4-12 WQL queries) |
| """ |
| data = svc.get_results_for_rule(rule_id) |
| if data is None: |
| raise HTTPException( |
| status_code=404, |
| detail=f"No mapping results found for rule_id '{rule_id}'. " |
| "Run POST /rules/analyze first.", |
| ) |
| return ResultsResponse( |
| rule_id=rule_id, |
| mappings=[MappingResult(**m) for m in data["mappings"]], |
| detected=[MappingResult(**m) for m in data["detected"]], |
| ) |
|
|