Spaces:
Sleeping
Sleeping
File size: 2,928 Bytes
c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 e4abb85 c16e1c9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
from fastapi import APIRouter, Header, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from backend.api.storage.rules_store import RulesStore
# Router that the red-flag rule endpoints defined in this module register on.
router = APIRouter()
# Single module-level store instance used by every handler in this module for
# rule reads and writes.
# NOTE(review): the storage backend and its persistence guarantees live in
# backend.api.storage.rules_store and are not visible here -- confirm there.
rules_store = RulesStore()
# Request body for adding a single rule, i.e. {"rule": "<text>"}.
class RulePayload(BaseModel):
    # Raw rule text as sent by the client; the add handler strips it before use.
    rule: str
# Request body for adding several rules at once, i.e. {"rules": ["...", "..."]}.
class BulkRulePayload(BaseModel):
    # Raw rule texts as sent by the client; the bulk handler strips each entry
    # and drops the blank ones before storing.
    rules: List[str]
def get_rules_for_tenant(tenant_id: str) -> List[str]:
    """
    Look up the red-flag rules recorded for ``tenant_id``.

    Thin wrapper around the module-level ``rules_store`` so the handlers
    share one read path; whatever ``rules_store.get_rules`` returns is
    passed back unchanged.
    """
    tenant_rules = rules_store.get_rules(tenant_id)
    return tenant_rules
@router.get("/rules")
async def get_redflag_rules(
    x_tenant_id: str = Header(None)
):
    """
    List every red-flag rule stored for the calling tenant.

    The tenant is taken from the ``x_tenant_id`` header parameter. A missing
    or empty value is rejected with HTTP 400 ("Missing tenant ID").

    Responds with ``{"tenant_id": ..., "rules": [...]}``.
    """
    if x_tenant_id:
        tenant_rules = get_rules_for_tenant(x_tenant_id)
        return {"tenant_id": x_tenant_id, "rules": tenant_rules}

    # Reached only when the header was absent or blank.
    raise HTTPException(status_code=400, detail="Missing tenant ID")
@router.post("/rules")
async def add_redflag_rule(
    payload: Optional[RulePayload] = None,
    rule: Optional[str] = None,
    x_tenant_id: str = Header(None)
):
    """
    Store one new red-flag rule for the calling tenant.

    The rule text may arrive as a JSON body {"rule": "..."} or as the query
    parameter ?rule=...; when a body is supplied it takes precedence over
    the query parameter. Surrounding whitespace is stripped before storing.

    Rejected with HTTP 400 when the tenant header is missing, when no rule
    text was supplied, or when the text is blank after stripping.

    Responds with the tenant ID, the stored rule text and the tenant's
    current rule list.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Body wins over the query parameter when both are present.
    if payload:
        raw_text = payload.rule
    else:
        raw_text = rule

    if not raw_text:
        raise HTTPException(status_code=400, detail="Missing rule text")

    cleaned_text = raw_text.strip()
    if not cleaned_text:
        # The text consisted solely of whitespace.
        raise HTTPException(status_code=400, detail="Rule cannot be empty")

    rules_store.add_rule(x_tenant_id, cleaned_text)

    # The rule list is read back after the write so the response reflects it.
    return {
        "tenant_id": x_tenant_id,
        "added_rule": cleaned_text,
        "rules": get_rules_for_tenant(x_tenant_id),
    }
@router.post("/rules/bulk")
async def add_redflag_rules_bulk(
    payload: BulkRulePayload,
    x_tenant_id: str = Header(None)
):
    """
    Adds multiple red-flag rules for this tenant in one call.

    Each submitted rule is stripped of surrounding whitespace and entries
    that are blank after stripping are dropped before the batch is handed
    to the rules store.

    Raises HTTP 400 when the tenant header is missing, or when the list is
    empty or contains nothing but blank entries.

    Returns the tenant ID, the value reported by the store's bulk add under
    "added_rules", and the tenant's current rule list.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Strip each entry exactly once, then discard the whitespace-only ones.
    stripped = (rule.strip() for rule in payload.rules)
    cleaned = [rule for rule in stripped if rule]

    # Validate AFTER cleaning: a list such as ["", "   "] is non-empty but
    # carries no usable rule, and the single-rule endpoint already rejects
    # blank text with a 400 instead of storing nothing and answering 200.
    if not cleaned:
        raise HTTPException(status_code=400, detail="No rules provided")

    added = rules_store.add_rules_bulk(x_tenant_id, cleaned)
    rules = get_rules_for_tenant(x_tenant_id)
    return {
        "tenant_id": x_tenant_id,
        "added_rules": added,
        "rules": rules
    }
# The ":path" convertor lets the rule text contain "/" characters. With the
# plain "{rule}" form a rule such as "a/b" (even when sent as "a%2Fb", which
# is decoded before routing) never reaches this handler, so it could be added
# but not deleted. URLs that matched the plain form still match unchanged.
@router.delete("/rules/{rule:path}")
async def delete_redflag_rule(
    rule: str,
    x_tenant_id: str = Header(None)
):
    """
    Deletes a red-flag rule for this tenant.

    The rule to remove is the text following "/rules/" in the URL and is
    passed to the rules store as received.

    Raises HTTP 400 when the tenant header is missing or the rule text is
    empty, and HTTP 404 when the store reports that nothing was deleted.

    Returns the tenant ID, the deleted rule text and the tenant's remaining
    rule list.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # The path convertor also matches an empty remainder ("/rules/"), which
    # cannot name a rule; reject it explicitly rather than query the store.
    if not rule:
        raise HTTPException(status_code=400, detail="Missing rule text")

    deleted = rules_store.delete_rule(x_tenant_id, rule)
    if not deleted:
        raise HTTPException(status_code=404, detail="Rule not found")

    rules = get_rules_for_tenant(x_tenant_id)
    return {
        "tenant_id": x_tenant_id,
        "deleted_rule": rule,
        "rules": rules
    }
|