import asyncio
import csv
import hashlib
import json
import os
import re
import threading
import time
import traceback
from threading import Lock

import pandas as pd
import requests
from web3 import Web3

try:
    from web3.middleware import ExtraDataToPOAMiddleware as geth_poa_middleware
except ImportError:
    from web3.middleware import geth_poa_middleware

# Hardcoded ABI removed! Loaded dynamically via config.py / dynamic_compiler.py

# Chain id used whenever the config object does not define PURECHAIN_ID.
DEFAULT_PURECHAIN_ID = 900520900520
# Placeholder value that means "no contract address configured".
CONTRACT_ADDRESS_PLACEHOLDER = "YOUR_CONTRACT_ADDRESS_HERE"
# Upper bound (seconds) for Pinata HTTP calls so a stalled request cannot hang a worker.
HTTP_TIMEOUT_SECONDS = 60


class AgentTrust:
    """Trust agent: mints entries on PureChain, pins data to IPFS and keeps CSV logs.

    The instance owns a JSON retry queue (``queue_file``), a CSV transaction
    history (``HISTORY_FILE``) and a daemon thread that periodically re-submits
    queued payloads through ``stamp_on_chain``.
    """

    def __init__(self, config):
        """Recover the pending queue, connect to the node and start the retry thread.

        Args:
            config: settings object. This method reads ``PRIVATE_KEY``,
                ``PURECHAIN_RPC_URL`` and ``PURECHAIN_CONTRACT_ADDRESS`` and,
                optionally, ``HF_TOKEN`` and ``PURECHAIN_ID``.
        """
        self.config = config
        self.lock = Lock()  # guards the queue file, the history CSV and dataset CSVs
        self._counter_lock = Lock()  # guards active_tasks only
        self.active_tasks = 0
        self.hf_manager_ref = None
        self.last_latency = 0.0
        self.queue_file = "/app/pending_tx_queue.json"
        self.HISTORY_FILE = "/app/minted_history.csv"

        # 1. Hugging Face Queue Recovery
        self._recover_queue_from_hub()

        # 2. 🟢 WEB3 & WALLET INITIALIZATION
        self.w3 = None
        self.account = None
        self._init_web3()

        print("🛡️ Agent 4 (Trust) Online: Async Saving & Retry Queue Enabled.")

        # 🟢 START BACKGROUND RETRY LOOP
        threading.Thread(target=self._retry_queue_loop, daemon=True).start()

    def _recover_queue_from_hub(self):
        """Best-effort download of the pending queue from the Hugging Face dataset repo."""
        try:
            from huggingface_hub import hf_hub_download
            print("☁️ Checking Hugging Face for existing pending transaction queue...")
            downloaded_path = hf_hub_download(
                repo_id="toecm/PureChain_Dataset",
                repo_type="dataset",
                filename="pending_tx_queue.json",
                token=getattr(self.config, 'HF_TOKEN', None),
            )
            with open(downloaded_path, "r") as src, open(self.queue_file, "w") as dst:
                dst.write(src.read())
            print("✅ Successfully recovered pending queue from Hugging Face!")
        except Exception:
            # Any failure (missing package, missing file, no network) means "start fresh".
            print("ℹ️ No existing queue found on Hugging Face. Starting fresh.")

    def _init_web3(self):
        """Connect to the RPC node, load the wallet and auto-deploy a contract if none is set."""
        if not (self.config.PRIVATE_KEY and self.config.PURECHAIN_RPC_URL):
            return
        try:
            self.w3 = Web3(Web3.HTTPProvider(self.config.PURECHAIN_RPC_URL))
            # Inject POA middleware (Required for Geth/Private Chains)
            self.w3.middleware_onion.inject(geth_poa_middleware, layer=0)

            if not self.w3.is_connected():
                print(f"⚠️ Failed to connect to node at {self.config.PURECHAIN_RPC_URL}")
                return

            self.account = self.w3.eth.account.from_key(self.config.PRIVATE_KEY)
            print(f"🔗 Web3 Connected! Wallet: {self.account.address}")

            # 🟢 DYNAMIC AUTO-DEPLOY LOGIC
            configured = self.config.PURECHAIN_CONTRACT_ADDRESS
            if not configured or configured == CONTRACT_ADDRESS_PLACEHOLDER:
                self._auto_deploy_contract()
        except Exception as e:
            print(f"❌ Web3 Init Error: {e}")

    def _auto_deploy_contract(self):
        """Compile and deploy a fresh contract, then publish its address to config and env."""
        print("⚠️ No valid contract address found in config/env. Auto-deploying a new instance...")
        try:
            from src.dynamic_compiler import get_contract_interface
            abi, bytecode = get_contract_interface()
            Contract = self.w3.eth.contract(abi=abi, bytecode=bytecode)
            tx = Contract.constructor().build_transaction({
                'from': self.account.address,
                'nonce': self.w3.eth.get_transaction_count(self.account.address),
                'gas': 3000000,
                'gasPrice': 0,
                'chainId': getattr(self.config, 'PURECHAIN_ID', DEFAULT_PURECHAIN_ID),
            })
            signed_tx = self.w3.eth.account.sign_transaction(tx, self.config.PRIVATE_KEY)
            tx_hash = self.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
            print("⏳ Deploying contract... waiting for receipt.")
            tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)

            new_address = tx_receipt.contractAddress
            # A reverted deployment still yields a receipt; do not record a dead address.
            if tx_receipt.status != 1 or not new_address:
                raise RuntimeError("deployment transaction reverted")
            print(f"🎉 CONTRACT DEPLOYED DYNAMICALLY AT: {new_address}")

            # Set it in memory for this session
            self.config.PURECHAIN_CONTRACT_ADDRESS = new_address
            os.environ["PURECHAIN_CONTRACT_ADDRESS"] = new_address
            # Also inject into config object dynamically
            self.config.CONTRACT_ABI = abi
        except Exception as de:
            print(f"❌ Auto-deploy failed: {de}")

    # --- THE DEAD LETTER QUEUE LOGIC ---
    def _read_queue_unlocked(self):
        """Return the queue file's contents as a list; caller must hold ``self.lock``.

        A missing, unreadable or malformed file yields an empty list.
        """
        if not os.path.exists(self.queue_file):
            return []
        try:
            with open(self.queue_file, "r") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            return []
        return loaded if isinstance(loaded, list) else []

    def _write_queue_unlocked(self, queue):
        """Atomically replace the queue file; caller must hold ``self.lock``.

        The payload is serialised before any file is touched so that a
        non-serialisable entry cannot truncate the existing queue.
        """
        serialised = json.dumps(queue, indent=2)
        tmp_path = self.queue_file + ".tmp"
        with open(tmp_path, "w") as f:
            f.write(serialised)
        os.replace(tmp_path, self.queue_file)

    def add_to_queue(self, data_dict):
        """Append ``data_dict`` to the persistent retry queue.

        Raises:
            TypeError: if ``data_dict`` is not JSON-serialisable.
            OSError: if the queue file cannot be written.
        """
        with self.lock:
            queue = self._read_queue_unlocked()
            queue.append(data_dict)
            self._write_queue_unlocked(queue)

    def _retry_queue_once(self):
        """Try to mint every queued payload once and persist whatever still fails."""
        if not os.path.exists(self.queue_file):
            return

        with self.lock:
            queue = self._read_queue_unlocked()
        if not queue:
            return

        # Check if node is back online
        if not self.w3 or not self.w3.is_connected():
            return

        total_queued = len(queue)
        print(f"🔄 Node Online! Attempting to mint {total_queued} queued transactions...")

        remaining_queue = []
        success_count = 0
        for data in queue:
            if self.stamp_on_chain(data):
                success_count += 1
            else:
                remaining_queue.append(data)

        # --- NOTIFICATION LOGIC ---
        if success_count > 0:
            print(f"\n🔔 [TRUST AGENT ALERT] Successfully cleared {success_count}/{total_queued} backlog transactions.")
            if not remaining_queue:
                print("✅ ALL PENDING TRANSACTIONS MINTED. Queue is now empty.\n")
            else:
                print(f"⚠️ {len(remaining_queue)} transactions failed again and remain in queue.\n")

        # Minting ran without the lock, so add_to_queue may have appended entries
        # after our snapshot. Those sit past index total_queued; keep them.
        with self.lock:
            current = self._read_queue_unlocked()
            appended_meanwhile = current[total_queued:]
            self._write_queue_unlocked(remaining_queue + appended_meanwhile)

    def _retry_queue_loop(self):
        """Daemon loop: every 60 seconds, retry the queued transactions."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            while True:
                time.sleep(60)  # Check the queue every 60 seconds
                try:
                    self._retry_queue_once()
                except Exception as e:
                    # One bad pass must not kill the retry thread for good.
                    print(f"⚠️ Retry queue pass failed: {e}")
        finally:
            loop.close()

    def get_unique_origins(self):
        """Return the sorted unique ``Data_Origin`` values in the history CSV (or ``[]``)."""
        try:
            history_file = getattr(self, 'HISTORY_FILE', "/app/minted_history.csv")
            if not os.path.exists(history_file):
                return []

            df = pd.read_csv(history_file)
            if "Data_Origin" in df.columns:
                return sorted(df["Data_Origin"].dropna().unique().tolist())
            return []
        except Exception as e:
            print(f"Error fetching unique origins: {e}")
            return []

    # --- IPFS HELPERS ---
    def upload_file_to_pinata(self, filepath):
        """Pin the file at ``filepath`` via Pinata.

        Returns:
            The ``IpfsHash`` field of the JSON response, or ``None`` when the
            request fails or the response carries no such field.
        """
        url = "https://api.pinata.cloud/pinning/pinFileToIPFS"
        headers = {"Authorization": f"Bearer {self.config.PINATA_JWT}"}
        try:
            with open(filepath, 'rb') as f:
                response = requests.post(
                    url, headers=headers, files={'file': f}, timeout=HTTP_TIMEOUT_SECONDS
                )
            return response.json().get('IpfsHash')
        except Exception as e:
            print(f"⚠️ Pinata file upload failed: {e}")
            return None

    def log_to_ipfs(self, data):
        """Pin a JSON document via Pinata.

        Returns:
            ``"Local-Log-Only"`` when no JWT is configured, the ``IpfsHash`` on
            success, ``"Error"`` when the response lacks a hash, and
            ``"IPFS_Fail"`` when the request itself fails.
        """
        if not self.config.PINATA_JWT:
            return "Local-Log-Only"
        headers = {"Authorization": f"Bearer {self.config.PINATA_JWT}"}
        try:
            res = requests.post(
                "https://api.pinata.cloud/pinning/pinJSONToIPFS",
                headers=headers,
                json=data,
                timeout=HTTP_TIMEOUT_SECONDS,
            )
            return res.json().get("IpfsHash", "Error")
        except Exception:
            return "IPFS_Fail"

    # --- CORE BLOCKCHAIN LOGIC ---
    def stamp_on_chain(self, payload):
        """
        Hashes the validated sociolinguistic data and mints it to the Purechain ledger
        using the PureVersation contract's proposeEntry function.

        Args:
            payload: dict; the keys ``original``, ``dialect`` and ``user`` are read.

        Returns:
            ``True`` when the receipt reports status 1, ``False`` on missing
            settings, connection failure, revert or any exception.
        """
        print("\n" + "="*40)
        print("⛓️ INITIATING PURECHAIN MINTING SEQUENCE")

        try:
            # 1. Load settings: environment first, then the config object, so the
            #    credentials __init__ connected with also work here.
            rpc_url = os.environ.get("PURECHAIN_RPC_URL") or getattr(self.config, "PURECHAIN_RPC_URL", None)
            private_key = os.environ.get("PRIVATE_KEY") or getattr(self.config, "PRIVATE_KEY", None)
            contract_address = (
                os.environ.get("PURECHAIN_CONTRACT_ADDRESS")
                or getattr(self.config, "PURECHAIN_CONTRACT_ADDRESS", None)
            )
            if contract_address == CONTRACT_ADDRESS_PLACEHOLDER:
                contract_address = None

            if not all([rpc_url, private_key, contract_address]):
                print("⚠️ Purechain skipped: Missing RPC_URL, PRIVATE_KEY, or PURECHAIN_CONTRACT_ADDRESS.")
                return False

            # 2. Establish Network Connection
            w3 = Web3(Web3.HTTPProvider(rpc_url))
            w3.middleware_onion.inject(geth_poa_middleware, layer=0)  # PoA compatibility

            if not w3.is_connected():
                print("🔴 Purechain connection failed. Network may be offline.")
                return False

            account = w3.eth.account.from_key(private_key)
            wallet_address = account.address
            print(f"✅ Authenticated as Operator: {wallet_address}")

            # 3. Cryptographic Hashing of the Data
            # The SHA-256 of the canonical JSON is passed as the contract's CID argument.
            payload_str = json.dumps(payload, sort_keys=True)
            data_hash = hashlib.sha256(payload_str.encode('utf-8')).hexdigest()
            print(f"🔒 Payload SHA-256 Proof: {data_hash}")

            # 4. Load the Smart Contract (web3 rejects non-checksummed addresses)
            contract = w3.eth.contract(
                address=w3.to_checksum_address(contract_address),
                abi=self.config.CONTRACT_ABI,
            )

            # 5. Extract values for the proposeEntry parameters
            phrase = payload.get("original", "Unknown Utterance")
            phrase = "Unknown Utterance" if phrase is None else str(phrase)
            dialect = payload.get("dialect", "Unknown Dialect")
            license_type = "CC-BY-4.0 (Open Research)"  # Academic open data license

            # Use the caller-supplied operator only if it is a well-formed address;
            # otherwise fall back to the signing wallet.
            final_operator_id = w3.to_checksum_address(wallet_address)
            raw_operator = str(payload.get("user", wallet_address)).strip().lower()
            if raw_operator.startswith("0x") and len(raw_operator) == 42:
                try:
                    final_operator_id = w3.to_checksum_address(raw_operator)
                except Exception:
                    pass  # keep the wallet fallback assigned above

            # 6. Build the Transaction
            # 'pending' counts not-yet-mined transactions too, so mints fired from
            # concurrent background threads do not reuse the same nonce.
            nonce = w3.eth.get_transaction_count(wallet_address, 'pending')

            tx = contract.functions.proposeEntry(
                phrase,
                dialect,
                data_hash,
                license_type,
                final_operator_id
            ).build_transaction({
                'chainId': w3.eth.chain_id,
                'gas': 2000000,
                'gasPrice': 0,
                'nonce': nonce,
            })

            # 7. Sign and Broadcast
            signed_tx = w3.eth.account.sign_transaction(tx, private_key)
            tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
            hex_hash = w3.to_hex(tx_hash)
            print(f"🚀 Transaction Broadcasted! TX Hash: {hex_hash}")

            # 8. Wait for Confirmation
            receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=120)

            if receipt.status != 1:
                print("⚠️ Transaction reverted by the EVM. (Check if wallet is registered)")
                return False

            print(f"💎 SUCCESS: Entry '{phrase[:15]}...' irreversibly minted in Block {receipt.blockNumber}")

            # The mint is final at this point. A logging failure must not turn the
            # result into False, or the retry loop would mint the entry again.
            try:
                self._log_transaction_history(payload, hex_hash, receipt.blockNumber)
            except Exception as log_error:
                print(f"History log error: {log_error}")
            return True

        except Exception as e:
            print(f"\n❌ SMART CONTRACT ERROR ❌\n{e}")
            traceback.print_exc()
            return False
        finally:
            print("="*40 + "\n")

    def _append_history_row(self, history_file, new_row):
        """Append one row to the history CSV under ``self.lock``, aligning by column name.

        The file is re-read and rewritten rather than appended headerless, so
        rows with different column sets cannot end up under the wrong headers.
        """
        with self.lock:
            frame = pd.DataFrame([new_row])
            if os.path.exists(history_file):
                try:
                    existing = pd.read_csv(history_file)
                except pd.errors.EmptyDataError:
                    existing = pd.DataFrame()
                if not existing.empty:
                    frame = pd.concat([existing, frame], ignore_index=True)
            frame.to_csv(history_file, index=False)

    def _sync_history_to_hub(self, history_file, commit_message, announce=None):
        """Upload the history CSV to the Hugging Face dataset repo if a token is available."""
        hf_token = os.environ.get("HF_TOKEN") or getattr(self.config, "HF_TOKEN", None)
        if not hf_token:
            return
        from huggingface_hub import HfApi
        repo_id = getattr(self.config, "HF_REPO_ID", "toecm/IEDID")
        if announce:
            print(announce)
        HfApi(token=hf_token).upload_file(
            path_or_fileobj=history_file,
            path_in_repo="minted_history.csv",
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=commit_message,
        )

    def log_mint_to_history(self, utterance, dialect, tx_hash, block_num, cid):
        """Record a mint in the history CSV and push the file to Hugging Face.

        Raises:
            OSError: if the local CSV cannot be written. Upload errors are only logged.
        """
        history_file = getattr(self, 'HISTORY_FILE', "/app/minted_history.csv")
        new_entry = {
            "Timestamp": pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S'),
            "Utterance": utterance,
            "Dialect": dialect,
            "Data_Origin": "Lab UI",  # Match UI columns
            "Block": block_num,
            "Speed": f"{self.last_latency}s",
            "TX Hash": tx_hash,
            "IPFS CID": cid
        }

        # 1. Save Locally
        self._append_history_row(history_file, new_entry)

        # 2. ☁️ Push the History File to Hugging Face (outside the lock: network I/O)
        try:
            self._sync_history_to_hub(
                history_file,
                "📜 Blockchain TX History Updated",
                announce="☁️ Syncing minted_history.csv to Hugging Face...",
            )
        except Exception as e:
            print(f"⚠️ Failed to sync history to HF: {e}")

    def get_filtered_history(self, start_date=None, end_date=None):
        """Safely reads and filters the PureChain minted history for the Gradio UI.

        Args:
            start_date: optional lower bound, parsed with ``pd.to_datetime``.
            end_date: optional upper bound; its time is set to 23:59:59.

        Returns:
            A DataFrame, newest row first. An empty frame with the expected
            columns is returned when the file is missing or unreadable.
        """
        history_file = getattr(self, 'HISTORY_FILE', "/app/minted_history.csv")

        # The columns your Gradio UI expects
        expected_cols = ["Timestamp", "Utterance", "Dialect", "Data_Origin", "Block", "Speed", "TX Hash", "IPFS CID"]

        # 1. Safe File Check
        if not os.path.exists(history_file):
            return pd.DataFrame(columns=expected_cols)

        try:
            df = pd.read_csv(history_file)

            # 2. Add missing columns safely (in case of old CSV formats)
            for col in expected_cols:
                if col not in df.columns:
                    df[col] = "Unknown" if col == "Data_Origin" else ""

            if df.empty:
                return df

            # 3. Date parsing: unparseable timestamps become NaT instead of raising
            df["Timestamp_Parsed"] = pd.to_datetime(df["Timestamp"], errors='coerce')

            # Apply Start Date
            if start_date and str(start_date).strip():
                try:
                    start_dt = pd.to_datetime(start_date)
                    df = df[df["Timestamp_Parsed"] >= start_dt]
                except Exception as e:
                    print(f"Start date filter error: {e}")

            # Apply End Date (Add 23:59:59 to include the whole end day)
            if end_date and str(end_date).strip():
                try:
                    end_dt = pd.to_datetime(end_date).replace(hour=23, minute=59, second=59)
                    df = df[df["Timestamp_Parsed"] <= end_dt]
                except Exception as e:
                    print(f"End date filter error: {e}")

            # 4. Clean up and reverse order (Newest first!)
            df = df.drop(columns=["Timestamp_Parsed"])
            return df.iloc[::-1].reset_index(drop=True)

        except Exception as e:
            print(f"❌ Error reading history: {e}")
            traceback.print_exc()
            return pd.DataFrame(columns=expected_cols)

    # --- SINGLE ENTRY SAVING ---
    def check_if_exists(self, utterance, dialect, brain_agent, clarification="", tone=""):
        """Return True if ``brain_agent.df`` already holds this utterance/dialect/clarification.

        Utterance and clarification are compared case-insensitively; dialect is
        compared after stripping only. ``tone`` is accepted but not used.
        """
        df = getattr(brain_agent, "df", None)
        if df is None or df.empty:
            return False
        required = ("Utterance", "Dialect", "Clarification")
        if any(col not in df.columns for col in required):
            return False

        clean_text = str(utterance).strip().lower()
        clean_dialect = str(dialect).strip()
        clean_clar = str(clarification).strip().lower()

        same_text = df["Utterance"].astype(str).str.strip().str.lower() == clean_text
        same_dialect = df["Dialect"].astype(str).str.strip() == clean_dialect
        same_clar = df["Clarification"].astype(str).str.strip().str.lower() == clean_clar
        return bool((same_text & same_dialect & same_clar).any())

    def process_feedback(self, action, original_text, dialect, clarification, tone, context, brain_agent, audio_path=None, pragmatics=""):
        """Mint the feedback and, for accepting actions, update the dataset in a daemon thread.

        Returns:
            The string ``"✅ Request Queued!"`` immediately; the work itself runs
            in the background and its outcome is only printed.
        """
        timestamp = pd.Timestamp.now().isoformat()
        feedback_data = {
            "original": original_text,
            "dialect": dialect,
            "clarification": clarification,
            "tone": tone,
            "linguistic_context": context,
            "pragmatics": pragmatics,
            "action": action,
            "timestamp": timestamp
        }

        def _background_save_task():
            loop = asyncio.new_event_loop()  # Create new loop
            asyncio.set_event_loop(loop)     # Set as active loop
            # Several of these threads can run at once; `+=` on an attribute is
            # not atomic, so the counter gets its own lock.
            with self._counter_lock:
                self.active_tasks += 1
            try:
                # 🟢 Generate Acoustic Profile dynamically before saving
                acoustic_profile = "{}"
                if getattr(self, "acoustic_agent", None):
                    acoustic_profile = self.acoustic_agent.generate_phonetic_profile(original_text, dialect)

                feedback_data["acoustic_profile"] = acoustic_profile

                # NOTE(review): a failed mint is not pushed to add_to_queue here;
                # confirm whether a caller outside this module handles that.
                self.stamp_on_chain(feedback_data)

                if action in ["Suggest Update", "Accept", "Force Overwrite"]:
                    syntax = r"\b" + re.escape(original_text.lower()) + r"\b"
                    self.update_dataset_csv(dialect, original_text, clarification, tone, context, syntax, audio_path, pragmatics, acoustic_profile=acoustic_profile)
                    if brain_agent:
                        brain_agent.refresh_knowledge_base()
            except Exception as e:
                print(f"❌ Background Task Error: {e}")
            finally:
                with self._counter_lock:
                    self.active_tasks -= 1
                loop.close()  # Cleanly close the loop

        threading.Thread(target=_background_save_task, daemon=True).start()
        return "✅ Request Queued!"

    def update_dataset_csv(self, dialect, utterance, clarification, tone, context, syntax, audio_path=None, pragmatics="", sourceTag="Web", clar_source="User", userKey="", acoustic_profile="{}"):
        """Append one row to ``<DATASET_DIR>/<Dialect>.csv`` and sync it to Hugging Face.

        The dialect is title-cased and suffixed with " English" unless it already
        ends with "English" or "Dialect". If ``audio_path`` exists on disk, it is
        recorded as ``audio/<basename>`` and uploaded along with the CSV.

        Returns:
            ``True`` (sync failures are logged, not raised).
        """
        clean_dialect = str(dialect or "").strip().title()
        if not clean_dialect.endswith(("English", "Dialect")):
            clean_dialect += " English"

        # The dialect comes from user feedback; keep only its last path component
        # so it cannot point outside DATASET_DIR.
        safe_stem = os.path.basename(clean_dialect.replace("\\", "/"))
        os.makedirs(self.config.DATASET_DIR, exist_ok=True)
        filepath = os.path.join(self.config.DATASET_DIR, f"{safe_stem}.csv")

        # 🟢 COLUMN FIX: Hardcode both columns to prevent schema crashes
        expected_cols = ["Utterance", "Dialect", "Clarification", "Tone_Category", "Linguistic_Context",
                         "Syntax_Pattern", "Pragmatic_Analysis", "Acoustic_Profile", "file_name", "audio_file_name",
                         "Data_Origin", "Clarification_Source", "User"]

        # 🟢 AUDIO FIX: Map to the standard 'file_name' column
        has_audio = isinstance(audio_path, str) and os.path.exists(audio_path)
        final_audio = f"audio/{os.path.basename(audio_path)}" if has_audio else ""

        row_data = {
            "Utterance": utterance,
            "Dialect": clean_dialect,
            "Clarification": clarification,
            "Tone_Category": tone,
            "Linguistic_Context": context,
            "Pragmatic_Analysis": pragmatics,
            "Syntax_Pattern": syntax,
            "Acoustic_Profile": acoustic_profile,
            "file_name": final_audio,          # ⬅️ Valid Audio Path goes here
            "audio_file_name": "",             # ⬅️ Blank to prevent confusion
            "Data_Origin": sourceTag,
            "Clarification_Source": clar_source,
            "User": userKey
        }

        with self.lock:
            if not os.path.exists(filepath):
                df = pd.DataFrame(columns=expected_cols)
            else:
                try:
                    df = pd.read_csv(filepath, encoding='utf-8-sig', on_bad_lines='skip')
                except Exception:
                    # Unreadable file: start over from an empty frame.
                    df = pd.DataFrame(columns=expected_cols)

            for col in expected_cols:
                if col not in df.columns:
                    df[col] = ""

            new_row = pd.DataFrame([{k: row_data.get(k, "") for k in expected_cols}])
            final_df = pd.concat([df, new_row], ignore_index=True)
            final_df.to_csv(filepath, index=False, quoting=csv.QUOTE_ALL)

        # ==========================================
        # ☁️ 4. HUGGING FACE CLOUD SYNC
        # ==========================================
        try:
            hf_token = os.environ.get("HF_TOKEN")
            # Fallback to the hardcoded ID if the config is missing it
            repo_id = getattr(self.config, "HF_REPO_ID", "toecm/IEDID")

            if hf_token:
                from huggingface_hub import HfApi
                api = HfApi(token=hf_token)

                # 🎵 Upload the Audio File
                if has_audio:
                    print(f"☁️ Syncing Audio to HF: {final_audio}")
                    api.upload_file(
                        path_or_fileobj=audio_path,
                        path_in_repo=final_audio,
                        repo_id=repo_id,
                        repo_type="dataset",
                        commit_message=f"🎙️ Added new audio sample for {clean_dialect}"
                    )

                # 📝 Upload the CSV file
                if os.path.exists(filepath):
                    csv_filename = os.path.basename(filepath)
                    print(f"☁️ Syncing CSV to HF: {csv_filename}")
                    api.upload_file(
                        path_or_fileobj=filepath,
                        path_in_repo=csv_filename,
                        repo_id=repo_id,
                        repo_type="dataset",
                        commit_message=f"📝 Auto-update {clean_dialect} sociolinguistic data"
                    )
        except Exception as e:
            print(f"⚠️ Hugging Face Sync Error: {e}")
            # Optional Fallback: Use the old manager if the direct API fails
            if getattr(self, "hf_manager_ref", None):
                self.hf_manager_ref.push_update(filepath, f"Update: {utterance}")

        return True

    # ==========================================
    # PURECHAIN HISTORY TAB
    # ==========================================
    def _log_transaction_history(self, payload, tx_hash, block_num):
        """Append a minted transaction to the history CSV and upload it; errors are logged."""
        from datetime import datetime

        history_file = getattr(self, 'HISTORY_FILE', "/app/minted_history.csv")
        account = getattr(self, "account", None)

        new_row = {
            "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "Utterance": payload.get("original", ""),
            "Dialect": payload.get("dialect", ""),
            "Data_Origin": payload.get("Data_Origin", "Unknown Origin"),
            "Data_Approval": account.address if account else "Lab Admin",
            "Block": block_num,
            "TX Hash": tx_hash
        }

        try:
            self._append_history_row(history_file, new_row)
            # 🟢 Upload history back to Hugging Face so it survives restarts
            self._sync_history_to_hub(history_file, "⛓️ Logged PureChain transaction history")
        except Exception as e:
            print(f"History log error: {e}")

    # ==========================================
    # SYSTEM BACKUP & DEPLOY
    # ==========================================
    def create_system_backup(self, filename, note):
        """Uploads a file to IPFS and logs it permanently to PureChain.

        ``filename`` is looked up in ``DATASET_DIR`` first, then ``PROFILES_DIR``.

        Returns:
            A human-readable status string; failures are reported in the string
            rather than raised.
        """
        if not self.w3 or not self.account:
            return "❌ Node offline. Cannot create immutable backup."

        # Search for the file in the working directories
        filepath = os.path.join(self.config.DATASET_DIR, filename)
        if not os.path.exists(filepath):
            filepath = os.path.join(self.config.PROFILES_DIR, filename)
            if not os.path.exists(filepath):
                return f"❌ File '{filename}' not found in datasets or profiles."

        try:
            # 1. Upload to IPFS
            cid = self.upload_file_to_pinata(filepath)
            if not cid:
                return "❌ Failed to pin file to IPFS."

            # 2. Mint Backup to PureChain
            contract = self.w3.eth.contract(address=self.config.PURECHAIN_CONTRACT_ADDRESS, abi=self.config.CONTRACT_ABI)
            nonce = self.w3.eth.get_transaction_count(self.account.address, 'pending')

            # createBackup(string _key, string _type, string _cid, string _desc)
            file_ext = filename.split(".")[-1].upper()

            tx = contract.functions.createBackup(
                filename, file_ext, cid, note
            ).build_transaction({
                'from': self.account.address,
                'nonce': nonce,
                'gas': 2000000,
                'gasPrice': 0,  # ZERO GAS!
                'chainId': getattr(self.config, 'PURECHAIN_ID', DEFAULT_PURECHAIN_ID)
            })

            signed_tx = self.w3.eth.account.sign_transaction(tx, private_key=self.config.PRIVATE_KEY)
            tx_hash = self.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
            receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)

            # A mined receipt is not proof of success: status 0 means the call reverted.
            if receipt.status != 1:
                return f"❌ Blockchain Backup Error: transaction reverted (TX: {self.w3.to_hex(tx_hash)})"

            return f"✅ Immutable Backup Created!\nCID: {cid}\nTX: {self.w3.to_hex(tx_hash)}"
        except Exception as e:
            return f"❌ Blockchain Backup Error: {e}"

    def force_deploy_contract(self, bytecode):
        """Forces a raw bytecode deployment to PureChain with Zero Gas.

        Returns:
            A status string with the new contract address, or an error message.
        """
        if not self.w3 or not self.account:
            return "❌ Node offline. Cannot deploy."

        if not bytecode or not bytecode.startswith("0x"):
            return "❌ Invalid bytecode. Must start with '0x'."

        try:
            nonce = self.w3.eth.get_transaction_count(self.account.address, 'pending')

            # Raw deployment transaction
            tx = {
                'from': self.account.address,
                'data': bytecode,
                'nonce': nonce,
                'gas': 4000000,  # Higher gas limit for deployments
                'gasPrice': 0,   # ZERO GAS
                'chainId': getattr(self.config, 'PURECHAIN_ID', DEFAULT_PURECHAIN_ID)
            }

            signed_tx = self.w3.eth.account.sign_transaction(tx, self.config.PRIVATE_KEY)
            tx_hash = self.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
            receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)

            # Status 0 means the constructor reverted and no contract exists.
            if receipt.status != 1:
                return "❌ Deployment Failed: transaction reverted"

            return f"✅ Contract Deployed Successfully!\nAddress: {receipt.contractAddress}"
        except Exception as e:
            return f"❌ Deployment Failed: {e}"

    def rebuild_history_from_chain(self):
        """Ultimate Failsafe: Reconstructs the CSV directly from the PureChain Ledger."""
        print("🌐 [TRUST AGENT] Rebuilding history from PureChain...")
        if not self.w3 or not self.w3.is_connected():
            print("❌ Cannot rebuild: Node offline.")
            return False

        try:
            contract = self.w3.eth.contract(
                address=self.config.PURECHAIN_CONTRACT_ADDRESS,
                abi=self.config.CONTRACT_ABI
            )
        # Make sure your except block aligns perfectly with your try block!
        except Exception as e:
            print(f"⚠️ Web3 Contract Error: {e}")
            contract = None