qcrypt-rng / quantum-oracle /client-sdk /python /quantum_randomness_client.py
rocRevyAreGoals15's picture
Add quantum dashboard, VRF, PQC, data protection, and HF Spaces deployment
bab1185
"""
Quantum Randomness Oracle - Python Client SDK
"""
import asyncio
import json
from typing import Optional, Dict, Any
from dataclasses import dataclass
from web3 import Web3
@dataclass
class RandomnessRequest:
"""Represents a randomness request"""
request_id: int
requester: str
fee_paid: int
block_number: int
timestamp: float
fulfilled: bool = False
randomness: Optional[int] = None
class QuantumRandomnessClient:
"""Python client for interacting with the Quantum Randomness Oracle"""
def __init__(self, rpc_url: str, contract_address: str, private_key: Optional[str] = None):
self.web3 = Web3(Web3.HTTPProvider(rpc_url))
self.contract_address = contract_address
self.private_key = private_key
# Setup contract
self.contract = self._setup_contract()
if private_key:
self.account = self.web3.eth.account.from_key(private_key)
else:
self.account = None
def _setup_contract(self):
"""Setup the smart contract instance with a minimal ABI"""
contract_abi = [
{
"inputs": [
{"name": "_fee", "type": "uint256"},
{"name": "_oracleNode", "type": "address"}
],
"stateMutability": "nonpayable",
"type": "constructor"
},
{
"inputs": [],
"name": "requestRandomness",
"outputs": [{"name": "requestId", "type": "uint256"}],
"stateMutability": "payable",
"type": "function"
},
{
"inputs": [{"name": "requestId", "type": "uint256"}],
"name": "getRequest",
"outputs": [
{"name": "requester", "type": "address"},
{"name": "fee", "type": "uint256"},
{"name": "commitment", "type": "bytes32"},
{"name": "blockNumber", "type": "uint256"},
{"name": "fulfilled", "type": "bool"},
{"name": "randomness", "type": "uint256"},
{"name": "timestamp", "type": "uint256"}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [{"name": "requestId", "type": "uint256"}],
"name": "isRequestFulfilled",
"outputs": [{"name": "", "type": "bool"}],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "fee",
"outputs": [{"name": "", "type": "uint256"}],
"stateMutability": "view",
"type": "function"
}
]
return self.web3.eth.contract(
address=self.contract_address,
abi=contract_abi
)
def get_request_fee(self) -> int:
"""Get the current fee for requesting randomness"""
return self.contract.functions.fee().call()
def request_randomness_sync(self, value: Optional[int] = None) -> int:
"""Synchronously request randomness from the oracle"""
if not self.account:
raise ValueError("Private key required for sending transactions")
fee = self.get_request_fee()
value = value or fee
# Build transaction
transaction = self.contract.functions.requestRandomness().build_transaction({
'from': self.account.address,
'value': value,
'nonce': self.web3.eth.get_transaction_count(self.account.address),
'gasPrice': self.web3.eth.gas_price
})
# Sign and send transaction
signed_txn = self.web3.eth.account.sign_transaction(transaction, self.private_key)
tx_hash = self.web3.eth.send_raw_transaction(signed_txn.rawTransaction)
# Wait for transaction receipt
receipt = self.web3.eth.wait_for_transaction_receipt(tx_hash)
if receipt.status != 1:
raise Exception("Transaction failed")
# Parse the requestId from logs
for log in receipt.logs:
# This is a simplified parsing - in real implementation,
# you'd decode the event properly
pass
# For now, return a dummy request ID (in real implementation, parse from event logs)
return 0 # This should be parsed from the event logs
async def request_randomness(self, value: Optional[int] = None) -> int:
"""Request randomness from the oracle"""
return self.request_randomness_sync(value)
def get_request_status_sync(self, request_id: int) -> Dict[str, Any]:
"""Get the status of a randomness request"""
request_data = self.contract.functions.getRequest(request_id).call()
return {
'request_id': request_id,
'requester': request_data[0],
'fee_paid': request_data[1],
'commitment': request_data[2],
'block_number': request_data[3],
'fulfilled': request_data[4],
'randomness': request_data[5] if request_data[4] else None,
'timestamp': request_data[6]
}
async def get_request_status(self, request_id: int) -> Dict[str, Any]:
"""Get the status of a randomness request"""
return self.get_request_status_sync(request_id)
def is_request_fulfilled_sync(self, request_id: int) -> bool:
"""Check if a request is fulfilled"""
return self.contract.functions.isRequestFulfilled(request_id).call()
async def is_request_fulfilled(self, request_id: int) -> bool:
"""Check if a request is fulfilled"""
return self.is_request_fulfilled_sync(request_id)
async def wait_for_fulfillment(self, request_id: int, timeout: int = 300) -> Dict[str, Any]:
"""Wait for a request to be fulfilled"""
import time
start_time = time.time()
while time.time() - start_time < timeout:
status = await self.get_request_status(request_id)
if status['fulfilled']:
return status
await asyncio.sleep(5) # Poll every 5 seconds
raise TimeoutError(f"Request {request_id} not fulfilled within {timeout} seconds")
# Example usage
async def main():
"""Example usage of the Quantum Randomness Client"""
print("Quantum Randomness Oracle Client Demo")
print("=" * 40)
# Initialize client (using a test RPC URL and contract address)
client = QuantumRandomnessClient(
rpc_url="http://localhost:8545", # Replace with actual RPC URL
contract_address="0x0000000000000000000000000000000000000000", # Replace with actual address
private_key="0x0000000000000000000000000000000000000000000000000000000000000000" # Replace with actual private key
)
try:
# Get the current request fee
fee = client.get_request_fee()
print(f"Current request fee: {client.web3.fromWei(fee, 'ether')} ETH")
# Request randomness (this would require a real contract and funds)
# request_id = await client.request_randomness()
# print(f"Requested randomness with ID: {request_id}")
# Check status of a request (replace with actual request ID)
# status = await client.get_request_status(request_id)
# print(f"Request status: {status}")
print("Client initialized successfully!")
except Exception as e:
print(f"Demo error (expected without real contract): {e}")
if __name__ == "__main__":
asyncio.run(main())