nothingworry's picture
feat: add admin rules ingestion UI and persistent backend storage
e4abb85
raw
history blame
2.93 kB
from fastapi import APIRouter, Header, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from backend.api.storage.rules_store import RulesStore
router = APIRouter()
rules_store = RulesStore()
class RulePayload(BaseModel):
rule: str
class BulkRulePayload(BaseModel):
rules: List[str]
def get_rules_for_tenant(tenant_id: str) -> List[str]:
return rules_store.get_rules(tenant_id)
@router.get("/rules")
async def get_redflag_rules(
x_tenant_id: str = Header(None)
):
"""
Returns all red-flag rules for this tenant.
"""
if not x_tenant_id:
raise HTTPException(status_code=400, detail="Missing tenant ID")
return {
"tenant_id": x_tenant_id,
"rules": get_rules_for_tenant(x_tenant_id)
}
@router.post("/rules")
async def add_redflag_rule(
payload: Optional[RulePayload] = None,
rule: Optional[str] = None,
x_tenant_id: str = Header(None)
):
"""
Adds a new red-flag rule to this tenant.
Accepts either JSON body {"rule": "..."} or query parameter ?rule=...
"""
if not x_tenant_id:
raise HTTPException(status_code=400, detail="Missing tenant ID")
rule_value = payload.rule if payload else rule
if not rule_value:
raise HTTPException(status_code=400, detail="Missing rule text")
rule_value = rule_value.strip()
if not rule_value:
raise HTTPException(status_code=400, detail="Rule cannot be empty")
rules_store.add_rule(x_tenant_id, rule_value)
rules = get_rules_for_tenant(x_tenant_id)
return {
"tenant_id": x_tenant_id,
"added_rule": rule_value,
"rules": rules
}
@router.post("/rules/bulk")
async def add_redflag_rules_bulk(
payload: BulkRulePayload,
x_tenant_id: str = Header(None)
):
"""
Adds multiple rules in one call.
"""
if not x_tenant_id:
raise HTTPException(status_code=400, detail="Missing tenant ID")
if not payload.rules:
raise HTTPException(status_code=400, detail="No rules provided")
cleaned = [rule.strip() for rule in payload.rules if rule.strip()]
added = rules_store.add_rules_bulk(x_tenant_id, cleaned)
rules = get_rules_for_tenant(x_tenant_id)
return {
"tenant_id": x_tenant_id,
"added_rules": added,
"rules": rules
}
@router.delete("/rules/{rule}")
async def delete_redflag_rule(
rule: str,
x_tenant_id: str = Header(None)
):
"""
Deletes a red-flag rule for this tenant.
"""
if not x_tenant_id:
raise HTTPException(status_code=400, detail="Missing tenant ID")
deleted = rules_store.delete_rule(x_tenant_id, rule)
if not deleted:
raise HTTPException(status_code=404, detail="Rule not found")
rules = get_rules_for_tenant(x_tenant_id)
return {
"tenant_id": x_tenant_id,
"deleted_rule": rule,
"rules": rules
}