Clawdbot commited on
Commit
4301a95
·
1 Parent(s): af26c53

Add enterprise JSON structured logging and Ledger Audit Trail

Browse files
Files changed (3) hide show
  1. .gitignore +1 -0
  2. hub/logger.py +76 -0
  3. hub/main.py +23 -0
.gitignore CHANGED
@@ -1 +1,2 @@
1
  \n*.db\n*.sqlite3
 
 
1
  \n*.db\n*.sqlite3
2
+ /logs
hub/logger.py ADDED
@@ -0,0 +1,76 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import logging.handlers
3
+ import json
4
+ import os
5
+ from datetime import datetime, timezone
6
+
7
+ LOG_DIR = "logs"
8
+ os.makedirs(LOG_DIR, exist_ok=True)
9
+
10
+ class JSONFormatter(logging.Formatter):
11
+ """Format logs as JSON for easy ingestion into Datadog, ELK, etc."""
12
+ def format(self, record):
13
+ log_data = {
14
+ "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
15
+ "level": record.levelname,
16
+ "logger": record.name,
17
+ "message": record.getMessage()
18
+ }
19
+
20
+ # Add any extra kwargs passed to the log call
21
+ if hasattr(record, "extra_fields"):
22
+ log_data.update(record.extra_fields)
23
+
24
+ # Add exception info if present
25
+ if record.exc_info:
26
+ log_data["exception"] = self.formatException(record.exc_info)
27
+
28
+ return json.dumps(log_data)
29
+
30
+ def setup_logger(name: str, log_file: str, level=logging.INFO, json_format=True) -> logging.Logger:
31
+ """Setup a rotating file logger."""
32
+ logger = logging.getLogger(name)
33
+ logger.setLevel(level)
34
+
35
+ # Avoid adding multiple handlers if setup is called multiple times
36
+ if logger.handlers:
37
+ return logger
38
+
39
+ # Rotate at 10MB, keep last 30 files
40
+ file_handler = logging.handlers.RotatingFileHandler(
41
+ os.path.join(LOG_DIR, log_file),
42
+ maxBytes=10*1024*1024,
43
+ backupCount=30
44
+ )
45
+
46
+ if json_format:
47
+ file_handler.setFormatter(JSONFormatter())
48
+ else:
49
+ # Standard text format for audit trails or simple logs
50
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
51
+ file_handler.setFormatter(formatter)
52
+
53
+ logger.addHandler(file_handler)
54
+
55
+ # Also log to console for development
56
+ console_handler = logging.StreamHandler()
57
+ console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
58
+ logger.addHandler(console_handler)
59
+
60
+ return logger
61
+
62
+ # --- Hub Loggers ---
63
+ hub_logger = setup_logger("mep.hub", "hub.json")
64
+
65
+ # --- Audit Logger ---
66
+ # The Audit Trail is strictly append-only text to reconstruct the ledger if needed
67
+ audit_logger = setup_logger("mep.audit", "ledger_audit.log", json_format=False)
68
+
69
+ def log_event(event: str, message: str, **kwargs):
70
+ """Helper to log JSON events with extra fields."""
71
+ hub_logger.info(message, extra={"extra_fields": {"event": event, **kwargs}})
72
+
73
+ def log_audit(action: str, node_id: str, amount: float, new_balance: float, ref_id: str = ""):
74
+ """Helper to strictly log SECONDS moving in the ledger."""
75
+ sign = "+" if amount >= 0 else ""
76
+ audit_logger.info(f"AUDIT | {action} | Node: {node_id} | Amount: {sign}{amount:.6f} | Balance: {new_balance:.6f} | Ref: {ref_id}")
hub/main.py CHANGED
@@ -3,6 +3,7 @@ from typing import Dict, List
3
  import uuid
4
  import db
5
  import auth
 
6
 
7
  from models import NodeRegistration, TaskCreate, TaskResult, NodeBalance, TaskBid
8
 
@@ -37,6 +38,10 @@ async def register_node(node: NodeRegistration):
37
  # Registration derives the Node ID from the provided Public Key PEM
38
  node_id = auth.derive_node_id(node.pubkey)
39
  balance = db.register_node(node_id, node.pubkey)
 
 
 
 
40
  return {"status": "success", "node_id": node_id, "balance": balance}
41
 
42
  @app.get("/balance/{node_id}")
@@ -67,7 +72,13 @@ async def submit_task(task: TaskCreate, authenticated_node: str = Depends(verify
67
  if task.bounty > 0:
68
  success = db.deduct_balance(task.consumer_id, task.bounty)
69
  if not success:
 
70
  raise HTTPException(status_code=400, detail="Insufficient SECONDS balance")
 
 
 
 
 
71
 
72
  task_id = str(uuid.uuid4())
73
  task_data = {
@@ -126,6 +137,8 @@ async def place_bid(bid: TaskBid, authenticated_node: str = Depends(verify_reque
126
  task["status"] = "assigned"
127
  task["provider_id"] = bid.provider_id
128
 
 
 
129
  # Return the full payload to the winner
130
  return {
131
  "status": "accepted",
@@ -152,15 +165,25 @@ async def complete_task(result: TaskResult, authenticated_node: str = Depends(ve
152
  if bounty >= 0:
153
  # Standard Compute Market: Provider earns SECONDS
154
  db.add_balance(result.provider_id, bounty)
 
 
155
  else:
156
  # Data Market: Provider PAYS to receive this payload/task
157
  cost = abs(bounty)
158
  success = db.deduct_balance(result.provider_id, cost)
159
  if not success:
 
160
  raise HTTPException(status_code=400, detail="Provider lacks SECONDS to buy this data")
161
 
 
 
 
162
  db.add_balance(task["consumer_id"], cost) # The sender earns SECONDS
 
 
163
 
 
 
164
  # Move task to completed
165
  task["status"] = "completed"
166
  task["provider_id"] = result.provider_id
 
3
  import uuid
4
  import db
5
  import auth
6
+ from logger import log_event, log_audit, hub_logger
7
 
8
  from models import NodeRegistration, TaskCreate, TaskResult, NodeBalance, TaskBid
9
 
 
38
  # Registration derives the Node ID from the provided Public Key PEM
39
  node_id = auth.derive_node_id(node.pubkey)
40
  balance = db.register_node(node_id, node.pubkey)
41
+
42
+ log_event("node_registered", f"Node {node_id} registered with starting balance {balance}", node_id=node_id, starting_balance=balance)
43
+ log_audit("REGISTER", node_id, balance, balance, "START_BONUS")
44
+
45
  return {"status": "success", "node_id": node_id, "balance": balance}
46
 
47
  @app.get("/balance/{node_id}")
 
72
  if task.bounty > 0:
73
  success = db.deduct_balance(task.consumer_id, task.bounty)
74
  if not success:
75
+ log_event("task_rejected", f"Node {task.consumer_id} lacks SECONDS to submit task", consumer_id=task.consumer_id, bounty=task.bounty)
76
  raise HTTPException(status_code=400, detail="Insufficient SECONDS balance")
77
+
78
+ new_balance = db.get_balance(task.consumer_id)
79
+ log_audit("ESCROW", task.consumer_id, -task.bounty, new_balance, task_id)
80
+
81
+ log_event("task_submitted", f"Task {task_id[:8]} broadcasted by {task.consumer_id} for {task.bounty}", consumer_id=task.consumer_id, task_id=task_id, bounty=task.bounty)
82
 
83
  task_id = str(uuid.uuid4())
84
  task_data = {
 
137
  task["status"] = "assigned"
138
  task["provider_id"] = bid.provider_id
139
 
140
+ log_event("bid_accepted", f"Task {bid.task_id[:8]} assigned to {bid.provider_id}", task_id=bid.task_id, provider_id=bid.provider_id, bounty=task["bounty"])
141
+
142
  # Return the full payload to the winner
143
  return {
144
  "status": "accepted",
 
165
  if bounty >= 0:
166
  # Standard Compute Market: Provider earns SECONDS
167
  db.add_balance(result.provider_id, bounty)
168
+ new_balance = db.get_balance(result.provider_id)
169
+ log_audit("EARN_COMPUTE", result.provider_id, bounty, new_balance, result.task_id)
170
  else:
171
  # Data Market: Provider PAYS to receive this payload/task
172
  cost = abs(bounty)
173
  success = db.deduct_balance(result.provider_id, cost)
174
  if not success:
175
+ log_event("data_purchase_failed", f"Provider {result.provider_id} lacks SECONDS to buy {result.task_id}", task_id=result.task_id, provider_id=result.provider_id, cost=cost)
176
  raise HTTPException(status_code=400, detail="Provider lacks SECONDS to buy this data")
177
 
178
+ p_balance = db.get_balance(result.provider_id)
179
+ log_audit("BUY_DATA", result.provider_id, -cost, p_balance, result.task_id)
180
+
181
  db.add_balance(task["consumer_id"], cost) # The sender earns SECONDS
182
+ c_balance = db.get_balance(task["consumer_id"])
183
+ log_audit("SELL_DATA", task["consumer_id"], cost, c_balance, result.task_id)
184
 
185
+ log_event("task_completed", f"Task {result.task_id[:8]} completed by {result.provider_id}", task_id=result.task_id, provider_id=result.provider_id, bounty=bounty)
186
+
187
  # Move task to completed
188
  task["status"] = "completed"
189
  task["provider_id"] = result.provider_id