# Spaces: Sleeping
# File size: 7,882 Bytes
# bab1185
"""
Quantum Randomness Oracle - Python Client SDK
"""
import asyncio
import json
from typing import Optional, Dict, Any
from dataclasses import dataclass
from web3 import Web3
@dataclass
class RandomnessRequest:
    """Plain record describing one randomness request.

    The first five fields are required and positional; ``fulfilled`` and
    ``randomness`` are optional and start out as "not fulfilled yet".

    Attributes:
        request_id: Identifier of the request.
        requester: Address that made the request.
        fee_paid: Fee attached to the request (presumably in wei -- TODO confirm).
        block_number: Block number associated with the request.
        timestamp: Time associated with the request.
        fulfilled: Whether a result has been delivered; defaults to ``False``.
        randomness: The delivered value, or ``None`` while unfulfilled.
    """

    # Required fields (no defaults) -- order is part of the constructor signature.
    request_id: int
    requester: str
    fee_paid: int
    block_number: int
    timestamp: float

    # Result fields, populated once the request has been answered.
    fulfilled: bool = False
    randomness: Optional[int] = None
class QuantumRandomnessClient:
    """Python client for interacting with the Quantum Randomness Oracle.

    The client wraps a web3 HTTP connection and a contract object built from
    a minimal, hard-coded ABI.  Read-only calls work without credentials;
    submitting a request requires a private key.

    Each operation has a blocking ``*_sync`` method and an ``async`` method of
    the same name without the suffix.  NOTE: the ``async`` methods call the
    blocking implementation directly, so they occupy the event loop for the
    duration of the RPC round-trip.  Transaction submission in particular is
    kept on the calling thread so that two coroutines cannot read the same
    account nonce concurrently.
    """

    # Seconds to wait between status polls in wait_for_fulfillment().
    _POLL_INTERVAL = 5

    def __init__(self, rpc_url: str, contract_address: str, private_key: Optional[str] = None):
        """Connect to ``rpc_url`` and bind to the oracle contract.

        Args:
            rpc_url: HTTP(S) endpoint of the JSON-RPC node.
            contract_address: Address of the deployed oracle contract.
            private_key: Key used to sign transactions.  When omitted the
                client is read-only and ``self.account`` is ``None``.
        """
        self.web3 = Web3(Web3.HTTPProvider(rpc_url))
        self.contract_address = contract_address
        self.private_key = private_key

        # Setup contract
        self.contract = self._setup_contract()

        self.account = self.web3.eth.account.from_key(private_key) if private_key else None

    def _setup_contract(self):
        """Build the contract instance from a minimal ABI.

        The ABI only lists the constructor and the four functions this client
        calls: ``requestRandomness``, ``getRequest``, ``isRequestFulfilled``
        and ``fee``.
        """
        contract_abi = [
            {
                "inputs": [
                    {"name": "_fee", "type": "uint256"},
                    {"name": "_oracleNode", "type": "address"}
                ],
                "stateMutability": "nonpayable",
                "type": "constructor"
            },
            {
                "inputs": [],
                "name": "requestRandomness",
                "outputs": [{"name": "requestId", "type": "uint256"}],
                "stateMutability": "payable",
                "type": "function"
            },
            {
                "inputs": [{"name": "requestId", "type": "uint256"}],
                "name": "getRequest",
                "outputs": [
                    {"name": "requester", "type": "address"},
                    {"name": "fee", "type": "uint256"},
                    {"name": "commitment", "type": "bytes32"},
                    {"name": "blockNumber", "type": "uint256"},
                    {"name": "fulfilled", "type": "bool"},
                    {"name": "randomness", "type": "uint256"},
                    {"name": "timestamp", "type": "uint256"}
                ],
                "stateMutability": "view",
                "type": "function"
            },
            {
                "inputs": [{"name": "requestId", "type": "uint256"}],
                "name": "isRequestFulfilled",
                "outputs": [{"name": "", "type": "bool"}],
                "stateMutability": "view",
                "type": "function"
            },
            {
                "inputs": [],
                "name": "fee",
                "outputs": [{"name": "", "type": "uint256"}],
                "stateMutability": "view",
                "type": "function"
            }
        ]

        # web3 rejects hex addresses that are not EIP-55 checksummed, so
        # normalise plain hex input.  Anything that is not a hex address
        # (e.g. an ENS name) is passed through untouched for web3 to handle.
        address = self.contract_address
        if Web3.is_address(address):
            address = Web3.to_checksum_address(address)

        return self.web3.eth.contract(address=address, abi=contract_abi)

    def get_request_fee(self) -> int:
        """Return the value of the contract's ``fee()`` view function."""
        return self.contract.functions.fee().call()

    def request_randomness_sync(self, value: Optional[int] = None) -> int:
        """Submit a randomness request and return its request ID.

        The ABI available to this client declares no events, so the ID cannot
        be decoded from the receipt logs.  Instead ``requestRandomness`` is
        first executed as a read-only call to obtain the ID the contract would
        return, the real transaction is sent, and the ID is then confirmed by
        checking that ``getRequest(id)`` names this account as requester.

        Args:
            value: Amount to attach to the transaction.  ``None`` (the
                default) means "use the contract's current fee"; an explicit
                value, including ``0``, is sent as given.

        Returns:
            The request ID assigned by the contract.

        Raises:
            ValueError: If the client was created without a private key.
            RuntimeError: If the transaction is mined with a failure status,
                or if the request ID cannot be confirmed afterwards (for
                example because another request was mined in between).
        """
        if not self.account:
            raise ValueError("Private key required for sending transactions")

        if value is None:
            value = self.get_request_fee()

        sender = self.account.address
        request_call = self.contract.functions.requestRandomness()

        # Dry run: ask the node what requestId the call would return right now.
        predicted_id = request_call.call({'from': sender, 'value': value})

        # Build transaction
        transaction = request_call.build_transaction({
            'from': sender,
            'value': value,
            'nonce': self.web3.eth.get_transaction_count(sender),
            'gasPrice': self.web3.eth.gas_price,
        })

        # Sign and send transaction.  Newer eth-account releases expose the
        # signed payload as ``raw_transaction``; older ones as ``rawTransaction``.
        signed_txn = self.web3.eth.account.sign_transaction(transaction, self.private_key)
        raw_tx = getattr(signed_txn, 'raw_transaction', None)
        if raw_tx is None:
            raw_tx = signed_txn.rawTransaction
        tx_hash = self.web3.eth.send_raw_transaction(raw_tx)

        # Wait for transaction receipt
        receipt = self.web3.eth.wait_for_transaction_receipt(tx_hash)
        if receipt.status != 1:
            raise RuntimeError("Transaction failed")

        # Confirm the predicted ID really belongs to this account.
        # NOTE(review): assumes getRequest().requester is the transaction
        # sender -- confirm against the contract source.
        requester = self.contract.functions.getRequest(predicted_id).call()[0]
        if str(requester).lower() != sender.lower():
            raise RuntimeError(
                f"Could not confirm request ID {predicted_id} "
                f"for transaction {tx_hash.hex()}"
            )
        return predicted_id

    async def request_randomness(self, value: Optional[int] = None) -> int:
        """Awaitable form of :meth:`request_randomness_sync` (blocks while it runs)."""
        return self.request_randomness_sync(value)

    def get_request_status_sync(self, request_id: int) -> Dict[str, Any]:
        """Fetch a request via ``getRequest`` and return it as a dict.

        Returns:
            A dict with keys ``request_id``, ``requester``, ``fee_paid``,
            ``commitment``, ``block_number``, ``fulfilled``, ``randomness``
            and ``timestamp``.  ``randomness`` is ``None`` until the request
            is fulfilled.
        """
        (
            requester,
            fee_paid,
            commitment,
            block_number,
            fulfilled,
            randomness,
            timestamp,
        ) = self.contract.functions.getRequest(request_id).call()
        return {
            'request_id': request_id,
            'requester': requester,
            'fee_paid': fee_paid,
            'commitment': commitment,
            'block_number': block_number,
            'fulfilled': fulfilled,
            # The stored value is only meaningful once the request is fulfilled.
            'randomness': randomness if fulfilled else None,
            'timestamp': timestamp,
        }

    async def get_request_status(self, request_id: int) -> Dict[str, Any]:
        """Awaitable form of :meth:`get_request_status_sync` (blocks while it runs)."""
        return self.get_request_status_sync(request_id)

    def is_request_fulfilled_sync(self, request_id: int) -> bool:
        """Return the contract's ``isRequestFulfilled(request_id)`` result."""
        return self.contract.functions.isRequestFulfilled(request_id).call()

    async def is_request_fulfilled(self, request_id: int) -> bool:
        """Awaitable form of :meth:`is_request_fulfilled_sync` (blocks while it runs)."""
        return self.is_request_fulfilled_sync(request_id)

    async def wait_for_fulfillment(self, request_id: int, timeout: int = 300) -> Dict[str, Any]:
        """Poll until the request is fulfilled or ``timeout`` seconds elapse.

        The status is always checked at least once (so ``timeout=0`` means
        "check now, do not wait") and once more when the deadline is reached,
        so a request fulfilled during the last sleep is still reported.

        Args:
            request_id: ID of the request to watch.
            timeout: Maximum number of seconds to wait.

        Returns:
            The status dict produced by :meth:`get_request_status`.

        Raises:
            TimeoutError: If the request is still unfulfilled at the deadline.
        """
        import time

        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while True:
            status = await self.get_request_status(request_id)
            if status['fulfilled']:
                return status
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            await asyncio.sleep(min(self._POLL_INTERVAL, remaining))

        raise TimeoutError(f"Request {request_id} not fulfilled within {timeout} seconds")
# Example usage
async def main():
    """Run a small demonstration of the Quantum Randomness Client.

    Builds a client from placeholder connection settings, reads the current
    request fee and prints it.  Any failure -- including the placeholder
    settings being rejected while the client is constructed -- is reported
    as a demo error instead of propagating.
    """
    print("Quantum Randomness Oracle Client Demo")
    print("=" * 40)

    try:
        # Initialize client (using a test RPC URL and contract address).
        # Done inside the try block so that invalid placeholder values are
        # reported through the same "demo error" path as a missing node.
        client = QuantumRandomnessClient(
            rpc_url="http://localhost:8545",  # Replace with actual RPC URL
            contract_address="0x0000000000000000000000000000000000000000",  # Replace with actual address
            private_key="0x0000000000000000000000000000000000000000000000000000000000000000"  # Replace with actual private key
        )

        # Get the current request fee.  ``from_wei`` is the snake_case name
        # that matches the other web3 calls used in this file.
        fee = client.get_request_fee()
        print(f"Current request fee: {client.web3.from_wei(fee, 'ether')} ETH")

        # Request randomness (this would require a real contract and funds)
        # request_id = await client.request_randomness()
        # print(f"Requested randomness with ID: {request_id}")

        # Check status of a request (replace with actual request ID)
        # status = await client.get_request_status(request_id)
        # print(f"Request status: {status}")

        print("Client initialized successfully!")
    except Exception as e:
        print(f"Demo error (expected without real contract): {e}")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())