Spaces:
Running
Running
Commit ·
500061c
1
Parent(s): eb9e808
Add billing schema drift support with field renaming
Browse files- Add get_schema(), apply_schema_drift(), and _field_map to BillingSystem
- Add BILLING to VALID_TARGETS for schema_drift
- Add billing-specific schema drift renames (amount, status, date_tick, items)
- Split SCHEMA_DRIFT_RENAMES into CRM and BILLING variants
sentinelops_arena/demo.py
CHANGED
|
@@ -122,7 +122,7 @@ class RandomizedAttacker:
|
|
| 122 |
),
|
| 123 |
]
|
| 124 |
|
| 125 |
-
|
| 126 |
{"old_field": "name", "new_field": "full_name"},
|
| 127 |
{"old_field": "contact_email", "new_field": "email_address"},
|
| 128 |
{"old_field": "region", "new_field": "geo_region"},
|
|
@@ -130,6 +130,13 @@ class RandomizedAttacker:
|
|
| 130 |
{"old_field": "notes", "new_field": "annotations"},
|
| 131 |
]
|
| 132 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 133 |
POLICY_DRIFT_CHANGES_BILLING = [
|
| 134 |
{"window_ticks": 4, "requires_approval": True, "max_amount": 2000},
|
| 135 |
{"window_ticks": 2, "requires_approval": True, "max_amount": 500},
|
|
@@ -159,7 +166,10 @@ class RandomizedAttacker:
|
|
| 159 |
def _build_params(self, atype: AttackType, target: TargetSystem) -> dict:
|
| 160 |
"""Build randomised attack parameters for the given attack type."""
|
| 161 |
if atype == AttackType.SCHEMA_DRIFT:
|
| 162 |
-
|
|
|
|
|
|
|
|
|
|
| 163 |
return {
|
| 164 |
"attack_type": atype.value,
|
| 165 |
"target_system": target.value,
|
|
@@ -192,7 +202,7 @@ class RandomizedAttacker:
|
|
| 192 |
|
| 193 |
# Valid target systems per attack type (not all systems support all attacks)
|
| 194 |
VALID_TARGETS = {
|
| 195 |
-
AttackType.SCHEMA_DRIFT: [TargetSystem.CRM
|
| 196 |
AttackType.POLICY_DRIFT: [TargetSystem.BILLING, TargetSystem.TICKETING],
|
| 197 |
AttackType.SOCIAL_ENGINEERING: [TargetSystem.CRM, TargetSystem.BILLING, TargetSystem.TICKETING],
|
| 198 |
AttackType.RATE_LIMIT: [TargetSystem.CRM, TargetSystem.BILLING, TargetSystem.TICKETING],
|
|
|
|
| 122 |
),
|
| 123 |
]
|
| 124 |
|
| 125 |
+
SCHEMA_DRIFT_RENAMES_CRM = [
|
| 126 |
{"old_field": "name", "new_field": "full_name"},
|
| 127 |
{"old_field": "contact_email", "new_field": "email_address"},
|
| 128 |
{"old_field": "region", "new_field": "geo_region"},
|
|
|
|
| 130 |
{"old_field": "notes", "new_field": "annotations"},
|
| 131 |
]
|
| 132 |
|
| 133 |
+
SCHEMA_DRIFT_RENAMES_BILLING = [
|
| 134 |
+
{"old_field": "amount", "new_field": "total_amount"},
|
| 135 |
+
{"old_field": "status", "new_field": "invoice_status"},
|
| 136 |
+
{"old_field": "date_tick", "new_field": "created_at_tick"},
|
| 137 |
+
{"old_field": "items", "new_field": "line_items"},
|
| 138 |
+
]
|
| 139 |
+
|
| 140 |
POLICY_DRIFT_CHANGES_BILLING = [
|
| 141 |
{"window_ticks": 4, "requires_approval": True, "max_amount": 2000},
|
| 142 |
{"window_ticks": 2, "requires_approval": True, "max_amount": 500},
|
|
|
|
| 166 |
def _build_params(self, atype: AttackType, target: TargetSystem) -> dict:
|
| 167 |
"""Build randomised attack parameters for the given attack type."""
|
| 168 |
if atype == AttackType.SCHEMA_DRIFT:
|
| 169 |
+
if target == TargetSystem.BILLING:
|
| 170 |
+
rename = self.rng.choice(self.SCHEMA_DRIFT_RENAMES_BILLING)
|
| 171 |
+
else:
|
| 172 |
+
rename = self.rng.choice(self.SCHEMA_DRIFT_RENAMES_CRM)
|
| 173 |
return {
|
| 174 |
"attack_type": atype.value,
|
| 175 |
"target_system": target.value,
|
|
|
|
| 202 |
|
| 203 |
# Valid target systems per attack type (not all systems support all attacks)
|
| 204 |
VALID_TARGETS = {
|
| 205 |
+
AttackType.SCHEMA_DRIFT: [TargetSystem.CRM, TargetSystem.BILLING],
|
| 206 |
AttackType.POLICY_DRIFT: [TargetSystem.BILLING, TargetSystem.TICKETING],
|
| 207 |
AttackType.SOCIAL_ENGINEERING: [TargetSystem.CRM, TargetSystem.BILLING, TargetSystem.TICKETING],
|
| 208 |
AttackType.RATE_LIMIT: [TargetSystem.CRM, TargetSystem.BILLING, TargetSystem.TICKETING],
|
sentinelops_arena/systems/billing.py
CHANGED
|
@@ -12,6 +12,7 @@ class BillingSystem:
|
|
| 12 |
self.refund_policy: RefundPolicy = RefundPolicy()
|
| 13 |
self._rate_limit: int = 0 # 0 means no limit
|
| 14 |
self._call_count: int = 0
|
|
|
|
| 15 |
|
| 16 |
def initialize(self, invoices: List[Invoice]):
|
| 17 |
"""Populate billing from Invoice models."""
|
|
@@ -19,6 +20,7 @@ class BillingSystem:
|
|
| 19 |
self.refund_policy = RefundPolicy()
|
| 20 |
self._rate_limit = 0
|
| 21 |
self._call_count = 0
|
|
|
|
| 22 |
|
| 23 |
def check_balance(self, customer_id: str) -> Dict:
|
| 24 |
"""Return all invoices for a customer and total balance."""
|
|
@@ -153,6 +155,20 @@ class BillingSystem:
|
|
| 153 |
"policy": self.refund_policy.model_dump(),
|
| 154 |
}
|
| 155 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 156 |
def apply_policy_drift(self, changes: Dict):
|
| 157 |
"""Modify refund policy fields."""
|
| 158 |
data = self.refund_policy.model_dump()
|
|
|
|
| 12 |
self.refund_policy: RefundPolicy = RefundPolicy()
|
| 13 |
self._rate_limit: int = 0 # 0 means no limit
|
| 14 |
self._call_count: int = 0
|
| 15 |
+
self._field_map: Dict[str, str] = {} # old_name -> new_name for drift
|
| 16 |
|
| 17 |
def initialize(self, invoices: List[Invoice]):
|
| 18 |
"""Populate billing from Invoice models."""
|
|
|
|
| 20 |
self.refund_policy = RefundPolicy()
|
| 21 |
self._rate_limit = 0
|
| 22 |
self._call_count = 0
|
| 23 |
+
self._field_map = {}
|
| 24 |
|
| 25 |
def check_balance(self, customer_id: str) -> Dict:
|
| 26 |
"""Return all invoices for a customer and total balance."""
|
|
|
|
| 155 |
"policy": self.refund_policy.model_dump(),
|
| 156 |
}
|
| 157 |
|
| 158 |
+
def get_schema(self) -> Dict:
|
| 159 |
+
"""Return current field names after any drift."""
|
| 160 |
+
fields = list(Invoice.model_fields.keys())
|
| 161 |
+
for old, new in self._field_map.items():
|
| 162 |
+
fields = [new if f == old else f for f in fields]
|
| 163 |
+
return {"system": "billing", "fields": fields}
|
| 164 |
+
|
| 165 |
+
def apply_schema_drift(self, old_field: str, new_field: str):
|
| 166 |
+
"""Rename a field across all invoice records."""
|
| 167 |
+
self._field_map[old_field] = new_field
|
| 168 |
+
for inv_id in self.invoices:
|
| 169 |
+
if old_field in self.invoices[inv_id]:
|
| 170 |
+
self.invoices[inv_id][new_field] = self.invoices[inv_id].pop(old_field)
|
| 171 |
+
|
| 172 |
def apply_policy_drift(self, changes: Dict):
|
| 173 |
"""Modify refund policy fields."""
|
| 174 |
data = self.refund_policy.model_dump()
|