worker-universal / shared /credits_system.py
Bc-AI's picture
Upload folder using huggingface_hub
af68acb verified
"""
Cloud Credits System for SACCP Network
Handles credit tracking, earning, and spending in the distributed network
"""
import json
import sqlite3
import time
from datetime import datetime
from typing import Optional, List, Dict, Any
from enum import Enum
from dataclasses import dataclass
from pydantic import BaseModel
class TransactionType(str, Enum):
EARNED = "earned"
SPENT = "spent"
TRANSFERRED = "transferred"
class CreditReason(str, Enum):
TASK_COMPLETION = "task_completion"
RESOURCE_CONTRIBUTION = "resource_contribution"
SERVICE_PURCHASE = "service_purchase"
REFERRAL_BONUS = "referral_bonus"
STAKING_REWARD = "staking_reward"
@dataclass
class CreditTransaction:
"""Represents a credit transaction"""
transaction_id: str
node_id: str
amount: float
transaction_type: TransactionType
reason: CreditReason
timestamp: int
service_type: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
class CreditBalance(BaseModel):
"""Model for node credit balance"""
node_id: str
balance: float
total_earned: float
total_spent: float
last_updated: int
class CreditsSystem:
"""Main system for managing cloud credits in the SACCP network"""
def __init__(self, db_path: str = "./saccp_credits.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""Initialize the credits database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create balances table
cursor.execute('''
CREATE TABLE IF NOT EXISTS balances (
node_id TEXT PRIMARY KEY,
balance REAL DEFAULT 0.0,
total_earned REAL DEFAULT 0.0,
total_spent REAL DEFAULT 0.0,
last_updated INTEGER
)
''')
# Create transactions table
cursor.execute('''
CREATE TABLE IF NOT EXISTS transactions (
transaction_id TEXT PRIMARY KEY,
node_id TEXT NOT NULL,
amount REAL NOT NULL,
transaction_type TEXT NOT NULL,
reason TEXT NOT NULL,
timestamp INTEGER NOT NULL,
service_type TEXT,
metadata TEXT -- JSON string
)
''')
conn.commit()
conn.close()
def add_credits(self, node_id: str, amount: float, reason: CreditReason,
service_type: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> bool:
"""Add credits to a node's balance"""
if amount <= 0:
return False
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
# Get current balance
cursor.execute('SELECT balance, total_earned FROM balances WHERE node_id = ?', (node_id,))
result = cursor.fetchone()
if result:
current_balance, total_earned = result
new_balance = current_balance + amount
new_total_earned = total_earned + amount
else:
new_balance = amount
new_total_earned = amount
# Insert new record if it doesn't exist
cursor.execute('''
INSERT INTO balances (node_id, balance, total_earned, total_spent, last_updated)
VALUES (?, ?, ?, ?, ?)
''', (node_id, 0.0, 0.0, 0.0, int(time.time())))
# Update balance
cursor.execute('''
UPDATE balances
SET balance = ?, total_earned = ?, last_updated = ?
WHERE node_id = ?
''', (new_balance, new_total_earned, int(time.time()), node_id))
# Record transaction
transaction_id = f"credit_{int(time.time())}_{node_id}_{hash(str(time.time()))}"
cursor.execute('''
INSERT INTO transactions
(transaction_id, node_id, amount, transaction_type, reason, timestamp, service_type, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
transaction_id,
node_id,
amount,
TransactionType.EARNED.value,
reason.value,
int(time.time()),
service_type,
json.dumps(metadata) if metadata else None
))
conn.commit()
return True
except Exception as e:
conn.rollback()
print(f"Error adding credits: {e}")
return False
finally:
conn.close()
def spend_credits(self, node_id: str, amount: float, reason: CreditReason,
service_type: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> bool:
"""Spend credits from a node's balance"""
if amount <= 0:
return False
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
# Get current balance
cursor.execute('SELECT balance FROM balances WHERE node_id = ?', (node_id,))
result = cursor.fetchone()
if not result:
return False # Node doesn't exist
current_balance = result[0]
if current_balance < amount:
return False # Insufficient credits
# Update balance
new_balance = current_balance - amount
cursor.execute('''
UPDATE balances
SET balance = ?, total_spent = total_spent + ?, last_updated = ?
WHERE node_id = ?
''', (new_balance, amount, int(time.time()), node_id))
# Record transaction
transaction_id = f"debit_{int(time.time())}_{node_id}_{hash(str(time.time()))}"
cursor.execute('''
INSERT INTO transactions
(transaction_id, node_id, amount, transaction_type, reason, timestamp, service_type, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
transaction_id,
node_id,
-amount, # Negative because it's a debit
TransactionType.SPENT.value,
reason.value,
int(time.time()),
service_type,
json.dumps(metadata) if metadata else None
))
conn.commit()
return True
except Exception as e:
conn.rollback()
print(f"Error spending credits: {e}")
return False
finally:
conn.close()
def get_balance(self, node_id: str) -> CreditBalance:
"""Get credit balance for a node"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT balance, total_earned, total_spent, last_updated
FROM balances
WHERE node_id = ?
''', (node_id,))
result = cursor.fetchone()
conn.close()
if result:
balance, total_earned, total_spent, last_updated = result
return CreditBalance(
node_id=node_id,
balance=balance,
total_earned=total_earned,
total_spent=total_spent,
last_updated=last_updated
)
else:
# Return default values if node doesn't exist
return CreditBalance(
node_id=node_id,
balance=0.0,
total_earned=0.0,
total_spent=0.0,
last_updated=int(time.time())
)
def get_transaction_history(self, node_id: str, limit: int = 50) -> List[CreditTransaction]:
"""Get transaction history for a node"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT transaction_id, amount, transaction_type, reason, timestamp, service_type, metadata
FROM transactions
WHERE node_id = ?
ORDER BY timestamp DESC
LIMIT ?
''', (node_id, limit))
rows = cursor.fetchall()
conn.close()
transactions = []
for row in rows:
transaction_id, amount, trans_type, reason, timestamp, service_type, metadata_str = row
metadata = json.loads(metadata_str) if metadata_str else None
transaction = CreditTransaction(
transaction_id=transaction_id,
node_id=node_id,
amount=amount,
transaction_type=TransactionType(trans_type),
reason=CreditReason(reason),
timestamp=timestamp,
service_type=service_type,
metadata=metadata
)
transactions.append(transaction)
return transactions
def transfer_credits(self, from_node_id: str, to_node_id: str, amount: float,
reason: CreditReason = CreditReason.TRANSFERRED) -> bool:
"""Transfer credits from one node to another"""
if amount <= 0:
return False
# First spend from sender
if not self.spend_credits(from_node_id, amount, reason):
return False
# Then add to receiver
if not self.add_credits(to_node_id, amount, reason):
# Rollback: if adding to receiver fails, refund sender
self.add_credits(from_node_id, amount, CreditReason.REFUND,
metadata={"original_transaction": "transfer_failed"})
return False
return True
def get_top_nodes_by_balance(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get top nodes by credit balance"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT node_id, balance, total_earned, total_spent
FROM balances
ORDER BY balance DESC
LIMIT ?
''', (limit,))
rows = cursor.fetchall()
conn.close()
top_nodes = []
for row in rows:
node_id, balance, total_earned, total_spent = row
top_nodes.append({
"node_id": node_id,
"balance": balance,
"total_earned": total_earned,
"total_spent": total_spent
})
return top_nodes
# Global instance of the credits system
credits_system = CreditsSystem()