# PureVersation / src/trust_agent.py
# GitHub Actions
# 🚀 Automated sync from GitHub
# eae31d8
import os
import requests
import json
import threading
import pandas as pd
import csv
import time
import re
import asyncio
from threading import Lock
from web3 import Web3
try:
from web3.middleware import ExtraDataToPOAMiddleware as geth_poa_middleware
except ImportError:
from web3.middleware import geth_poa_middleware
# Hardcoded ABI removed! Loaded dynamically via config.py / dynamic_compiler.py
class AgentTrust:
def __init__(self, config):
    """Initialise agent state, restore the pending queue and connect to PureChain.

    Args:
        config: Settings object. ``PRIVATE_KEY``, ``PURECHAIN_RPC_URL`` and
            ``PURECHAIN_CONTRACT_ADDRESS`` are read as attributes; ``HF_TOKEN``
            and ``PURECHAIN_ID`` are read through ``getattr`` with defaults.

    Side effects:
        * Tries to download ``pending_tx_queue.json`` from the Hugging Face
          dataset ``toecm/PureChain_Dataset`` and copies it over
          ``self.queue_file``; any failure is reported and ignored.
        * When a private key and RPC URL are configured, connects to the
          node and, if no usable contract address is configured, deploys a
          new contract and stores its address/ABI on ``config`` and in
          ``os.environ``.
        * Starts a daemon thread running ``_retry_queue_loop``.
    """
    self.config = config
    self.lock = Lock()
    self.active_tasks = 0
    self.hf_manager_ref = None
    self.last_latency = 0.0
    self.queue_file = "/app/pending_tx_queue.json"
    self.HISTORY_FILE = "/app/minted_history.csv"

    # 1. Hugging Face Queue Recovery
    try:
        from huggingface_hub import hf_hub_download
        print("☁️ Checking Hugging Face for existing pending transaction queue...")
        downloaded_path = hf_hub_download(
            repo_id="toecm/PureChain_Dataset",
            repo_type="dataset",
            filename="pending_tx_queue.json",
            token=getattr(self.config, 'HF_TOKEN', None)
        )
        with open(downloaded_path, "r") as src, open(self.queue_file, "w") as dst:
            dst.write(src.read())
        print("✅ Successfully recovered pending queue from Hugging Face!")
    except Exception as e:
        print("ℹ️ No existing queue found on Hugging Face. Starting fresh.")
        # Surface the real cause: this branch also covers auth, network and
        # local write failures, not only a missing remote file.
        print(f"   reason: {type(e).__name__}: {e}")

    # 2. 🟢 WEB3 & WALLET INITIALIZATION
    self.w3 = None
    self.account = None
    if self.config.PRIVATE_KEY and self.config.PURECHAIN_RPC_URL:
        try:
            self.w3 = Web3(Web3.HTTPProvider(self.config.PURECHAIN_RPC_URL))
            # Inject POA middleware (Required for Geth/Private Chains)
            self.w3.middleware_onion.inject(geth_poa_middleware, layer=0)
            if self.w3.is_connected():
                self.account = self.w3.eth.account.from_key(self.config.PRIVATE_KEY)
                print(f"🔗 Web3 Connected! Wallet: {self.account.address}")

                # 🟢 DYNAMIC AUTO-DEPLOY LOGIC
                if not self.config.PURECHAIN_CONTRACT_ADDRESS or self.config.PURECHAIN_CONTRACT_ADDRESS == "YOUR_CONTRACT_ADDRESS_HERE":
                    print("⚠️ No valid contract address found in config/env. Auto-deploying a new instance...")
                    try:
                        from src.dynamic_compiler import get_contract_interface
                        abi, bytecode = get_contract_interface()
                        Contract = self.w3.eth.contract(abi=abi, bytecode=bytecode)
                        tx = Contract.constructor().build_transaction({
                            'from': self.account.address,
                            'nonce': self.w3.eth.get_transaction_count(self.account.address),
                            'gas': 3000000,
                            'gasPrice': self.w3.eth.gas_price,
                            'chainId': getattr(self.config, 'PURECHAIN_ID', 900520900520)
                        })
                        signed_tx = self.w3.eth.account.sign_transaction(tx, self.config.PRIVATE_KEY)
                        # web3 v7 / eth-account >= 0.13 expose ``raw_transaction``;
                        # older releases only have ``rawTransaction``.
                        raw_tx = getattr(signed_tx, "raw_transaction", None)
                        if raw_tx is None:
                            raw_tx = signed_tx.rawTransaction
                        tx_hash = self.w3.eth.send_raw_transaction(raw_tx)
                        print("⏳ Deploying contract... waiting for receipt.")
                        tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
                        if getattr(tx_receipt, "status", 1) != 1:
                            # A reverted deployment must not be recorded as the live contract.
                            raise RuntimeError("deployment transaction reverted")
                        new_address = tx_receipt.contractAddress
                        print(f"🎉 CONTRACT DEPLOYED DYNAMICALLY AT: {new_address}")
                        # Set it in memory for this session
                        self.config.PURECHAIN_CONTRACT_ADDRESS = new_address
                        os.environ["PURECHAIN_CONTRACT_ADDRESS"] = new_address
                        # Also inject into config object dynamically
                        self.config.CONTRACT_ABI = abi
                    except Exception as de:
                        print(f"❌ Auto-deploy failed: {de}")
            else:
                print(f"⚠️ Failed to connect to node at {self.config.PURECHAIN_RPC_URL}")
        except Exception as e:
            print(f"❌ Web3 Init Error: {e}")

    print("🛡️ Agent 4 (Trust) Online: Async Saving & Retry Queue Enabled.")
    # 🟢 START BACKGROUND RETRY LOOP
    threading.Thread(target=self._retry_queue_loop, daemon=True).start()
# --- THE DEAD LETTER QUEUE LOGIC ---
def add_to_queue(self, data_dict):
    """Append one payload to the JSON pending-transaction queue on disk.

    The whole read-modify-write runs under ``self.lock``. If the existing
    file cannot be read, is not valid JSON, or does not contain a JSON list,
    the queue is restarted with only ``data_dict``.

    Args:
        data_dict: JSON-serialisable payload to enqueue.
    """
    with self.lock:
        queue = []
        if os.path.exists(self.queue_file):
            try:
                with open(self.queue_file, "r") as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as err:
                # ValueError covers json.JSONDecodeError and decoding errors.
                print(f"⚠️ Pending queue unreadable, starting a new one: {err}")
            else:
                # Anything other than a list cannot be appended to.
                if isinstance(loaded, list):
                    queue = loaded
        queue.append(data_dict)
        with open(self.queue_file, "w") as f:
            json.dump(queue, f, indent=2)
def _retry_queue_loop(self):
    """Background loop that re-submits queued payloads once the node is reachable.

    Every 60 seconds the queue file is read under ``self.lock``; if it is
    non-empty and ``self.w3`` reports a connection, each entry is passed to
    ``stamp_on_chain``. Entries for which that call returns a falsy value
    stay queued. Never returns; intended to run in a daemon thread.
    """
    def _load_queue():
        # Unreadable, malformed or non-list content is treated as an empty queue.
        try:
            with open(self.queue_file, "r") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            return []
        return loaded if isinstance(loaded, list) else []

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        while True:
            time.sleep(60)  # Check the queue every 60 seconds
            if not os.path.exists(self.queue_file):
                continue
            with self.lock:
                queue = _load_queue()
            if not queue:
                continue
            # Check if node is back online
            if not self.w3 or not self.w3.is_connected():
                continue

            total_queued = len(queue)
            print(f"🔄 Node Online! Attempting to mint {total_queued} queued transactions...")
            remaining_queue = []
            success_count = 0
            # Minting runs without the lock so add_to_queue() is not blocked
            # for the duration of the on-chain confirmations.
            for data in queue:
                if self.stamp_on_chain(data):
                    success_count += 1
                else:
                    remaining_queue.append(data)

            # --- NOTIFICATION LOGIC ---
            if success_count > 0:
                print(f"\n🔔 [TRUST AGENT ALERT] Successfully cleared {success_count}/{total_queued} backlog transactions.")
                if not remaining_queue:
                    print("✅ ALL PENDING TRANSACTIONS MINTED. Queue is now empty.\n")
                else:
                    print(f"⚠️ {len(remaining_queue)} transactions failed again and remain in queue.\n")

            # Write back what is left. add_to_queue() only appends, so anything
            # beyond the snapshot length arrived while we were minting and must
            # be preserved instead of being overwritten.
            with self.lock:
                appended_meanwhile = _load_queue()[total_queued:]
                with open(self.queue_file, "w") as f:
                    json.dump(remaining_queue + appended_meanwhile, f, indent=2)
    finally:
        loop.close()
def get_unique_origins(self):
    """Return the sorted distinct ``Data_Origin`` values from the minted-history CSV.

    The file location is ``self.HISTORY_FILE`` (falling back to
    ``/app/minted_history.csv``), the same lookup ``get_filtered_history``
    uses, so both readers always look at the same file.

    Returns:
        A sorted list of non-null values, or ``[]`` when the file or the
        column is missing or any error occurs while reading.
    """
    try:
        history_file = getattr(self, "HISTORY_FILE", "/app/minted_history.csv")
        if not os.path.exists(history_file):
            return []
        df = pd.read_csv(history_file)
        if "Data_Origin" not in df.columns:
            return []
        return sorted(df["Data_Origin"].dropna().unique().tolist())
    except Exception as e:
        print(f"Error fetching unique origins: {e}")
        return []
# --- IPFS HELPERS ---
def upload_file_to_pinata(self, filepath):
    """Pin a local file to IPFS through Pinata's ``pinFileToIPFS`` endpoint.

    Args:
        filepath: Path of the file to upload.

    Returns:
        The ``IpfsHash`` value from the JSON response, or ``None`` when the
        file cannot be opened, the request fails or times out, the server
        answers with an HTTP error, or the response has no ``IpfsHash``.
    """
    url = "https://api.pinata.cloud/pinning/pinFileToIPFS"
    headers = {"Authorization": f"Bearer {self.config.PINATA_JWT}"}
    try:
        with open(filepath, 'rb') as f:
            # Without a timeout a stalled connection would block the caller forever.
            response = requests.post(url, headers=headers, files={'file': f}, timeout=120)
        response.raise_for_status()
        return response.json().get('IpfsHash')
    except Exception as e:
        print(f"⚠️ Pinata file upload failed: {e}")
        return None
def log_to_ipfs(self, data):
    """Pin a JSON document to IPFS through Pinata's ``pinJSONToIPFS`` endpoint.

    Args:
        data: JSON-serialisable object sent as the request body.

    Returns:
        ``"Local-Log-Only"`` when no ``PINATA_JWT`` is configured, the
        ``IpfsHash`` from the response on success, ``"Error"`` when the
        response lacks that key, and ``"IPFS_Fail"`` when the request or
        response decoding raises.
    """
    if not self.config.PINATA_JWT:
        return "Local-Log-Only"
    headers = {"Authorization": f"Bearer {self.config.PINATA_JWT}"}
    try:
        # Bounded wait so a stalled connection cannot hang the caller.
        res = requests.post(
            "https://api.pinata.cloud/pinning/pinJSONToIPFS",
            headers=headers,
            json=data,
            timeout=60,
        )
        return res.json().get("IpfsHash", "Error")
    except Exception as e:
        print(f"⚠️ Pinata JSON upload failed: {e}")
        return "IPFS_Fail"
# --- CORE BLOCKCHAIN LOGIC ---
def stamp_on_chain(self, payload):
    """Hash a payload and record it on PureChain via the contract's ``proposeEntry``.

    Connection settings come from the environment variables
    ``PURECHAIN_RPC_URL``, ``PRIVATE_KEY`` and ``PURECHAIN_CONTRACT_ADDRESS``;
    the ABI comes from ``self.config.CONTRACT_ABI``.

    Args:
        payload: Dict that must be JSON-serialisable. The keys ``original``,
            ``dialect`` and ``user`` are read (each with a fallback).

    Returns:
        ``True`` only when the transaction receipt reports ``status == 1``.
        ``False`` when configuration is missing, the node is unreachable, the
        transaction reverts, or any exception is raised along the way.
    """
    # hashlib is not imported at module level; os/json/Web3 and the PoA
    # middleware already are, so they are not re-imported here.
    import hashlib

    print("\n" + "="*40)
    print("⛓️ INITIATING PURECHAIN MINTING SEQUENCE")
    try:
        # 1. Load Environment Variables
        rpc_url = os.environ.get("PURECHAIN_RPC_URL")
        private_key = os.environ.get("PRIVATE_KEY")
        contract_address = os.environ.get("PURECHAIN_CONTRACT_ADDRESS")
        if not all([rpc_url, private_key, contract_address]):
            print("⚠️ Purechain skipped: Missing RPC_URL, PRIVATE_KEY, or PURECHAIN_CONTRACT_ADDRESS.")
            return False

        # 2. Establish Network Connection
        w3 = Web3(Web3.HTTPProvider(rpc_url))
        w3.middleware_onion.inject(geth_poa_middleware, layer=0)  # PoA compatibility
        if not w3.is_connected():
            print("🔴 Purechain connection failed. Network may be offline.")
            return False
        account = w3.eth.account.from_key(private_key)
        wallet_address = account.address
        print(f"✅ Authenticated as Operator: {wallet_address}")

        # 3. Cryptographic Hashing of the Data
        # The SHA-256 of the canonical (sorted-key) JSON is passed in the
        # contract's CID argument as the data proof.
        payload_str = json.dumps(payload, sort_keys=True)
        data_hash = hashlib.sha256(payload_str.encode('utf-8')).hexdigest()
        print(f"🔒 Payload SHA-256 Proof: {data_hash}")

        # 4. Load the Smart Contract using the configured ABI
        contract = w3.eth.contract(address=contract_address, abi=self.config.CONTRACT_ABI)

        # 5. Extract values for the proposeEntry parameters
        phrase = payload.get("original", "Unknown Utterance")
        dialect = payload.get("dialect", "Unknown Dialect")
        license_type = "CC-BY-4.0 (Open Research)"

        # Use the payload's ``user`` only if it looks like a 20-byte hex
        # address; otherwise fall back to the signing wallet.
        raw_operator = str(payload.get("user", wallet_address)).strip().lower()
        final_operator_id = None
        if raw_operator.startswith("0x") and len(raw_operator) == 42:
            try:
                final_operator_id = w3.to_checksum_address(raw_operator)
            except Exception:
                final_operator_id = None
        if final_operator_id is None:
            final_operator_id = w3.to_checksum_address(wallet_address)

        # 6. Build the Transaction
        nonce = w3.eth.get_transaction_count(wallet_address)
        tx = contract.functions.proposeEntry(
            phrase,
            dialect,
            data_hash,
            license_type,
            final_operator_id
        ).build_transaction({
            'chainId': w3.eth.chain_id,
            'gas': 2000000,
            'gasPrice': 0,
            'nonce': nonce,
        })

        # 7. Sign and Broadcast
        signed_tx = w3.eth.account.sign_transaction(tx, private_key)
        # web3 v7 / eth-account >= 0.13 expose ``raw_transaction``; older
        # releases only have ``rawTransaction``.
        raw_tx = getattr(signed_tx, "raw_transaction", None)
        if raw_tx is None:
            raw_tx = signed_tx.rawTransaction
        tx_hash = w3.eth.send_raw_transaction(raw_tx)
        hex_hash = w3.to_hex(tx_hash)
        print(f"🚀 Transaction Broadcasted! TX Hash: {hex_hash}")

        # 8. Wait for Confirmation
        receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=120)
        if receipt.status != 1:
            print("⚠️ Transaction reverted by the EVM. (Check if wallet is registered)")
            return False

        print(f"💎 SUCCESS: Entry '{phrase[:15]}...' irreversibly minted in Block {receipt.blockNumber}")
        # OPTIONAL: record the receipt for the history tab. A failure here
        # must not turn a confirmed mint into ``False``: callers such as the
        # retry loop would then submit the same payload again.
        if hasattr(self, '_log_transaction_history'):
            try:
                self._log_transaction_history(payload, hex_hash, receipt.blockNumber)
            except Exception as log_err:
                print(f"⚠️ Minted, but history logging failed: {log_err}")
        return True
    except Exception as e:
        import traceback
        print(f"\n❌ SMART CONTRACT ERROR ❌\n{e}")
        traceback.print_exc()
        return False
    finally:
        print("="*40 + "\n")
def log_mint_to_history(self, utterance, dialect, tx_hash, block_num, cid):
    """Append one minted-transaction row to the history CSV and sync it to Hugging Face.

    The row is written to ``self.HISTORY_FILE`` (falling back to
    ``/app/minted_history.csv``), the same file ``get_filtered_history``
    reads. ``Data_Origin`` is always ``"Lab UI"`` and ``Speed`` is taken from
    ``self.last_latency``.

    Args:
        utterance: Text stored in the ``Utterance`` column.
        dialect: Text stored in the ``Dialect`` column.
        tx_hash: Value stored in the ``TX Hash`` column.
        block_num: Value stored in the ``Block`` column.
        cid: Value stored in the ``IPFS CID`` column.

    The upload only happens when the ``HF_TOKEN`` environment variable is
    set; upload errors are printed and otherwise ignored.
    """
    history_file = getattr(self, "HISTORY_FILE", "/app/minted_history.csv")
    new_entry = {
        "Timestamp": pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S'),
        "Utterance": utterance, "Dialect": dialect, "Data_Origin": "Lab UI",  # Match UI columns
        "Block": block_num, "Speed": f"{self.last_latency}s", "TX Hash": tx_hash, "IPFS CID": cid
    }
    df = pd.DataFrame([new_entry])
    with self.lock:
        # 1. Save Locally. A missing *or empty* file still needs the header row.
        needs_header = (not os.path.exists(history_file)) or os.path.getsize(history_file) == 0
        df.to_csv(history_file, mode='a', header=needs_header, index=False)

        # 2. ☁️ Push the History File to Hugging Face
        try:
            hf_token = os.environ.get("HF_TOKEN")
            repo_id = getattr(self.config, "HF_REPO_ID", "toecm/IEDID")
            if hf_token:
                from huggingface_hub import HfApi
                api = HfApi(token=hf_token)
                print("☁️ Syncing minted_history.csv to Hugging Face...")
                api.upload_file(
                    path_or_fileobj=history_file,
                    path_in_repo="minted_history.csv",
                    repo_id=repo_id,
                    repo_type="dataset",
                    commit_message="📜 Blockchain TX History Updated"
                )
        except Exception as e:
            print(f"⚠️ Failed to sync history to HF: {e}")
def get_filtered_history(self, start_date=None, end_date=None):
    """Load the minted-history CSV, optionally restricted to a date window, newest row first.

    Args:
        start_date: Optional lower bound; ignored when falsy or blank.
        end_date: Optional upper bound; its time of day is set to 23:59:59
            so the whole day is included. Ignored when falsy or blank.

    Returns:
        A DataFrame in reversed file order with a fresh index. Columns the
        UI expects but the file lacks are added (``"Unknown"`` for
        ``Data_Origin``, ``""`` otherwise). An empty frame with the expected
        columns is returned when the file is missing or reading fails.
    """
    ui_columns = ["Timestamp", "Utterance", "Dialect", "Data_Origin", "Block", "Speed", "TX Hash", "IPFS CID"]
    csv_path = getattr(self, 'HISTORY_FILE', "/app/minted_history.csv")

    # Nothing recorded yet: hand the UI an empty table with the right headers.
    if not os.path.exists(csv_path):
        return pd.DataFrame(columns=ui_columns)

    try:
        frame = pd.read_csv(csv_path)

        # Older files may predate some columns; backfill them.
        absent = [name for name in ui_columns if name not in frame.columns]
        for name in absent:
            frame[name] = "Unknown" if name == "Data_Origin" else ""

        if frame.empty:
            return frame

        # Unparseable timestamps become NaT instead of raising.
        frame["Timestamp_Parsed"] = pd.to_datetime(frame["Timestamp"], errors='coerce')

        if start_date and str(start_date).strip():
            try:
                lower_bound = pd.to_datetime(start_date)
                frame = frame[frame["Timestamp_Parsed"] >= lower_bound]
            except Exception as e:
                print(f"Start date filter error: {e}")

        if end_date and str(end_date).strip():
            try:
                upper_bound = pd.to_datetime(end_date).replace(hour=23, minute=59, second=59)
                frame = frame[frame["Timestamp_Parsed"] <= upper_bound]
            except Exception as e:
                print(f"End date filter error: {e}")

        # Drop the helper column, then flip so the latest entry is on top.
        without_helper = frame.drop(columns=["Timestamp_Parsed"])
        newest_first = without_helper.iloc[::-1]
        return newest_first.reset_index(drop=True)
    except Exception as e:
        import traceback
        print(f"❌ Error reading history: {e}")
        traceback.print_exc()
        return pd.DataFrame(columns=ui_columns)
# --- SINGLE ENTRY SAVING ---
def check_if_exists(self, utterance, dialect, brain_agent, clarification="", tone=""):
    """Report whether an identical entry is already present in ``brain_agent.df``.

    A row matches when its ``Utterance`` and ``Clarification`` equal the
    arguments after trimming and lower-casing, and its ``Dialect`` equals
    the argument after trimming (dialect comparison is case-sensitive).
    Missing (NaN) cells are compared as empty strings, so a stored row with
    a blank clarification matches ``clarification=""``.

    Args:
        utterance: Text to look for.
        dialect: Dialect label to look for.
        brain_agent: Object exposing a pandas DataFrame as ``.df`` with
            ``Utterance``, ``Dialect`` and ``Clarification`` columns.
        clarification: Clarification text to look for.
        tone: Accepted for interface compatibility; not used in matching.

    Returns:
        ``True`` if at least one row matches, otherwise ``False``.
    """
    df = brain_agent.df
    if df.empty:
        return False

    clean_text = str(utterance).strip().lower()
    clean_dialect = str(dialect).strip()
    clean_clar = str(clarification).strip().lower()

    def _cell_text(column):
        # NaN would otherwise stringify to "nan" and never equal "".
        values = df[column]
        return values.astype(object).where(values.notna(), "").astype(str).str.strip()

    matches = (
        (_cell_text("Utterance").str.lower() == clean_text)
        & (_cell_text("Dialect") == clean_dialect)
        & (_cell_text("Clarification").str.lower() == clean_clar)
    )
    return bool(matches.any())
def process_feedback(self, action, original_text, dialect, clarification, tone, context, brain_agent, audio_path=None, pragmatics=""):
    """Hand a piece of user feedback to a background thread and return immediately.

    The worker thread attaches an acoustic profile (from
    ``self.acoustic_agent`` when that attribute is set, otherwise ``"{}"``),
    calls ``stamp_on_chain`` with the assembled payload and, for the actions
    ``"Suggest Update"``, ``"Accept"`` and ``"Force Overwrite"``, writes the
    entry with ``update_dataset_csv`` and refreshes ``brain_agent`` if given.
    ``self.active_tasks`` is incremented while the worker runs. Errors in the
    worker are printed, not raised.

    Returns:
        The fixed acknowledgement string ``"✅ Request Queued!"``.
    """
    feedback_data = dict(
        original=original_text,
        dialect=dialect,
        clarification=clarification,
        tone=tone,
        linguistic_context=context,
        pragmatics=pragmatics,
        action=action,
        timestamp=pd.Timestamp.now().isoformat(),
    )
    persisting_actions = ("Suggest Update", "Accept", "Force Overwrite")

    def _worker():
        # Each worker thread gets its own event loop, closed on exit.
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        self.active_tasks += 1
        try:
            # 🟢 Generate Acoustic Profile dynamically before saving
            profiler = getattr(self, "acoustic_agent", None)
            if profiler:
                acoustic_profile = self.acoustic_agent.generate_phonetic_profile(original_text, dialect)
            else:
                acoustic_profile = "{}"
            feedback_data["acoustic_profile"] = acoustic_profile

            self.stamp_on_chain(feedback_data)

            if action in persisting_actions:
                # Whole-word, case-folded pattern for the utterance.
                syntax = r"\b" + re.escape(original_text.lower()) + r"\b"
                self.update_dataset_csv(
                    dialect, original_text, clarification, tone, context,
                    syntax, audio_path, pragmatics,
                    acoustic_profile=acoustic_profile,
                )
                if brain_agent:
                    brain_agent.refresh_knowledge_base()
        except Exception as e:
            print(f"❌ Background Task Error: {e}")
        finally:
            self.active_tasks -= 1
            event_loop.close()

    worker_thread = threading.Thread(target=_worker, daemon=True)
    worker_thread.start()
    return "✅ Request Queued!"
def update_dataset_csv(self, dialect, utterance, clarification, tone, context, syntax, audio_path=None, pragmatics="", sourceTag="Web", clar_source="User", userKey="", acoustic_profile="{}"):
    """Append one entry to the per-dialect dataset CSV and sync it to Hugging Face.

    The dialect is trimmed and title-cased, and ``" English"`` is appended
    unless it already ends in ``English`` or ``Dialect``; the file is
    ``<DATASET_DIR>/<dialect>.csv``. The CSV is rewritten in full with every
    field quoted.

    If an existing file cannot be parsed it is renamed to
    ``<file>.unreadable-<unix time>`` before the new file is written, so its
    contents are not silently destroyed.

    Args:
        dialect, utterance, clarification, tone, context, syntax, pragmatics,
        acoustic_profile: Values for the matching CSV columns.
        audio_path: Optional path to a recording. When it is a string naming
            an existing file, ``audio/<basename>`` is stored in ``file_name``
            and the file is uploaded under that name.
        sourceTag: Stored in ``Data_Origin``.
        clar_source: Stored in ``Clarification_Source``.
        userKey: Stored in ``User``.

    Returns:
        ``True``. Upload failures are printed and, when ``hf_manager_ref`` is
        set, retried through ``hf_manager_ref.push_update``.
    """
    # NOTE(review): ``dialect`` ends up in a file name; confirm callers never
    # pass path separators from untrusted input.
    clean_dialect = dialect.strip().title()
    if not clean_dialect.endswith(("English", "Dialect")):
        clean_dialect += " English"
    filepath = os.path.join(self.config.DATASET_DIR, f"{clean_dialect}.csv")

    # 🟢 COLUMN FIX: fixed schema so older files get missing columns backfilled
    expected_cols = ["Utterance", "Dialect", "Clarification", "Tone_Category", "Linguistic_Context", "Syntax_Pattern", "Pragmatic_Analysis", "Acoustic_Profile", "file_name", "audio_file_name", "Data_Origin", "Clarification_Source", "User"]

    with self.lock:
        df = pd.DataFrame(columns=expected_cols)
        if os.path.exists(filepath):
            try:
                df = pd.read_csv(filepath, encoding='utf-8-sig', on_bad_lines='skip')
            except pd.errors.EmptyDataError:
                # A zero-byte file holds nothing worth preserving.
                pass
            except Exception as read_err:
                # Keep the unreadable original aside instead of overwriting it.
                quarantine = f"{filepath}.unreadable-{int(time.time())}"
                os.replace(filepath, quarantine)
                print(f"⚠️ Could not read {filepath} ({read_err}); original kept as {quarantine}")

        for col in expected_cols:
            if col not in df.columns:
                df[col] = ""

        # 🟢 AUDIO FIX: Map to the standard 'file_name' column
        final_audio = ""
        if isinstance(audio_path, str) and os.path.exists(audio_path):
            final_audio = f"audio/{os.path.basename(audio_path)}"

        row_data = {
            "Utterance": utterance, "Dialect": clean_dialect, "Clarification": clarification,
            "Tone_Category": tone, "Linguistic_Context": context, "Pragmatic_Analysis": pragmatics,
            "Syntax_Pattern": syntax,
            "Acoustic_Profile": acoustic_profile,
            "file_name": final_audio,
            "audio_file_name": "",
            "Data_Origin": sourceTag,
            "Clarification_Source": clar_source,
            "User": userKey
        }
        new_row = pd.DataFrame([{k: row_data.get(k, "") for k in expected_cols}])
        if df.empty:
            # Same columns/order a concat would give, without concatenating an
            # empty frame (deprecated in recent pandas).
            final_df = new_row.reindex(columns=list(df.columns))
        else:
            final_df = pd.concat([df, new_row], ignore_index=True)
        final_df.to_csv(filepath, index=False, quoting=csv.QUOTE_ALL)

        # ==========================================
        # ☁️ 4. HUGGING FACE CLOUD SYNC
        # ==========================================
        try:
            hf_token = os.environ.get("HF_TOKEN")
            repo_id = getattr(self.config, "HF_REPO_ID", "toecm/IEDID")
            if hf_token:
                from huggingface_hub import HfApi
                api = HfApi(token=hf_token)

                # 🎵 Upload the Audio File (only when a valid repo path was derived)
                if final_audio:
                    print(f"☁️ Syncing Audio to HF: {final_audio}")
                    api.upload_file(
                        path_or_fileobj=audio_path,
                        path_in_repo=final_audio,
                        repo_id=repo_id,
                        repo_type="dataset",
                        commit_message=f"🎙️ Added new audio sample for {clean_dialect}"
                    )

                # 📝 Upload the CSV file
                if os.path.exists(filepath):
                    csv_filename = os.path.basename(filepath)
                    print(f"☁️ Syncing CSV to HF: {csv_filename}")
                    api.upload_file(
                        path_or_fileobj=filepath,
                        path_in_repo=csv_filename,
                        repo_id=repo_id,
                        repo_type="dataset",
                        commit_message=f"📝 Auto-update {clean_dialect} sociolinguistic data"
                    )
        except Exception as e:
            print(f"⚠️ Hugging Face Sync Error: {e}")
            # Optional Fallback: Use the old manager if the direct API fails
            if getattr(self, "hf_manager_ref", None):
                self.hf_manager_ref.push_update(filepath, f"Update: {utterance}")
    return True
# ==========================================
# PURECHAIN HISTORY TAB
# ==========================================
def _log_transaction_history(self, payload, tx_hash, block_num):
    """Record a confirmed transaction in ``<DATASET_DIR>/minted_history.csv``.

    The row stores the current local time, ``payload["original"]`` and
    ``payload["dialect"]`` (empty string when absent), the fixed origin
    ``"Lab Approved"``, the block number and the transaction hash. When
    ``config.HF_TOKEN`` is set the file is also uploaded to the Hugging Face
    dataset repo.

    Every failure, including a missing ``huggingface_hub`` package or a
    missing config attribute, is printed and swallowed so that logging can
    never make the caller treat a confirmed transaction as failed.

    NOTE(review): the history readers in this class use ``HISTORY_FILE``
    (``/app/minted_history.csv``); confirm ``DATASET_DIR`` resolves to the
    same location.
    """
    from datetime import datetime

    try:
        history_file = os.path.join(self.config.DATASET_DIR, "minted_history.csv")
        new_row = {
            "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "Utterance": payload.get("original", ""),
            "Dialect": payload.get("dialect", ""),
            "Data_Origin": "Lab Approved",
            "Block": block_num,
            "TX Hash": tx_hash
        }
        entry = pd.DataFrame([new_row])
        if os.path.exists(history_file):
            df = pd.concat([pd.read_csv(history_file), entry], ignore_index=True)
        else:
            df = entry
        df.to_csv(history_file, index=False)

        # Upload history back to Hugging Face so it survives restarts.
        hf_token = getattr(self.config, "HF_TOKEN", None)
        if hf_token:
            # Imported lazily: the local log must work without the package.
            from huggingface_hub import HfApi
            api = HfApi(token=hf_token)
            repo_id = getattr(self.config, 'HF_REPO_ID', "toecm/IEDID")
            api.upload_file(
                path_or_fileobj=history_file,
                path_in_repo="minted_history.csv",
                repo_id=repo_id,
                repo_type="dataset",
                commit_message="⛓️ Logged PureChain transaction history"
            )
    except Exception as e:
        print(f"History log error: {e}")
# ==========================================
# SYSTEM BACKUP & DEPLOY
# ==========================================
def create_system_backup(self, filename, note):
    """Pin a file to IPFS and record the backup on PureChain via ``createBackup``.

    The file is looked up in ``config.DATASET_DIR`` first, then in
    ``config.PROFILES_DIR``.

    Args:
        filename: Name of the file to back up. The text after the last dot,
            upper-cased, is passed to the contract as the file type.
        note: Free-text description passed to the contract.

    Returns:
        A human-readable status string: success with CID and transaction
        hash, or a message starting with ``❌`` when the node is offline, the
        file is missing, pinning fails, the transaction reverts, or any
        exception occurs.
    """
    if not self.w3 or not self.account:
        return "❌ Node offline. Cannot create immutable backup."

    # NOTE(review): ``filename`` is joined to the directories unmodified;
    # confirm callers cannot pass ``..`` segments from untrusted input.
    filepath = os.path.join(self.config.DATASET_DIR, filename)
    if not os.path.exists(filepath):
        filepath = os.path.join(self.config.PROFILES_DIR, filename)
    if not os.path.exists(filepath):
        return f"❌ File '{filename}' not found in datasets or profiles."

    try:
        # 1. Upload to IPFS
        cid = self.upload_file_to_pinata(filepath)
        if not cid:
            return "❌ Failed to pin file to IPFS."

        # 2. Mint Backup to PureChain
        contract = self.w3.eth.contract(address=self.config.PURECHAIN_CONTRACT_ADDRESS, abi=self.config.CONTRACT_ABI)
        nonce = self.w3.eth.get_transaction_count(self.account.address, 'pending')
        file_ext = filename.split(".")[-1].upper()
        tx = contract.functions.createBackup(
            filename, file_ext, cid, note
        ).build_transaction({
            'from': self.account.address,
            'nonce': nonce,
            'gas': 2000000,
            'gasPrice': 0,
            # Same default chain id the constructor uses when the config lacks one.
            'chainId': getattr(self.config, 'PURECHAIN_ID', 900520900520)
        })
        signed_tx = self.w3.eth.account.sign_transaction(tx, private_key=self.config.PRIVATE_KEY)
        # web3 v7 / eth-account >= 0.13 expose ``raw_transaction``; older
        # releases only have ``rawTransaction``.
        raw_tx = getattr(signed_tx, "raw_transaction", None)
        if raw_tx is None:
            raw_tx = signed_tx.rawTransaction
        tx_hash = self.w3.eth.send_raw_transaction(raw_tx)
        receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
        if getattr(receipt, "status", 1) != 1:
            # A mined-but-reverted transaction recorded nothing on chain.
            return f"❌ Blockchain Backup Error: transaction {self.w3.to_hex(tx_hash)} reverted."
        return f"✅ Immutable Backup Created!\nCID: {cid}\nTX: {self.w3.to_hex(tx_hash)}"
    except Exception as e:
        return f"❌ Blockchain Backup Error: {e}"
def force_deploy_contract(self, bytecode):
    """Deploy raw contract bytecode to PureChain with a gas price of zero.

    Args:
        bytecode: Hex string of the creation bytecode; must start with ``0x``.

    Returns:
        A human-readable status string: success with the new contract
        address, or a message starting with ``❌`` when the node is offline,
        the bytecode is not a ``0x``-prefixed string, the transaction
        reverts, or any exception occurs.
    """
    if not self.w3 or not self.account:
        return "❌ Node offline. Cannot deploy."
    # Non-string input is rejected with the same message instead of raising.
    if not isinstance(bytecode, str) or not bytecode.startswith("0x"):
        return "❌ Invalid bytecode. Must start with '0x'."

    try:
        nonce = self.w3.eth.get_transaction_count(self.account.address, 'pending')
        # Raw deployment transaction (no ``to`` field)
        tx = {
            'from': self.account.address,
            'data': bytecode,
            'nonce': nonce,
            'gas': 4000000,  # Higher gas limit for deployments
            'gasPrice': 0,
            # Same default chain id the constructor uses when the config lacks one.
            'chainId': getattr(self.config, 'PURECHAIN_ID', 900520900520)
        }
        signed_tx = self.w3.eth.account.sign_transaction(tx, private_key=self.config.PRIVATE_KEY)
        # web3 v7 / eth-account >= 0.13 expose ``raw_transaction``; older
        # releases only have ``rawTransaction``.
        raw_tx = getattr(signed_tx, "raw_transaction", None)
        if raw_tx is None:
            raw_tx = signed_tx.rawTransaction
        tx_hash = self.w3.eth.send_raw_transaction(raw_tx)
        receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
        if getattr(receipt, "status", 1) != 1:
            # A reverted deployment leaves no usable contract behind.
            return "❌ Deployment Failed: transaction reverted."
        return f"✅ Contract Deployed Successfully!\nAddress: {receipt.contractAddress}"
    except Exception as e:
        return f"❌ Deployment Failed: {e}"
def rebuild_history_from_chain(self):
"""Ultimate Failsafe: Reconstructs the CSV directly from the PureChain Ledger."""
print("🌐 [TRUST AGENT] Rebuilding history from PureChain...")
if not self.w3 or not self.w3.is_connected():
print("❌ Cannot rebuild: Node offline.")
return False
try:
contract = self.w3.eth.contract(
address=self.config.PURECHAIN_CONTRACT_ADDRESS,
abi=self.config.CONTRACT_ABI
)
# Make sure your except block aligns perfectly with your try block!
except Exception as e:
print(f"⚠️ Web3 Contract Error: {e}")
contract = None