Block-Analyzer / app.py
Dhanyakumar's picture
Upload 4 files
4a8bd50 verified
"""
Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
This module provides a complete implementation of a blockchain wallet analysis tool
with a Gradio web interface. It includes wallet analysis, NFT tracking, and
interactive chat capabilities using the Groq API.
Author: Mane
Date: March 2025
"""
from __future__ import annotations
import os
import re
import json
import time
import logging
import asyncio
from typing import List, Dict, Tuple, Any, Optional, TypeVar, cast
from datetime import datetime
from decimal import Decimal
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
import aiohttp
import gradio as gr
from tenacity import retry, stop_after_attempt, wait_exponential
import groq
# Type variables
T = TypeVar('T')
WalletData = Dict[str, Any]
ChatHistory = List[Tuple[str, str]]
# API Keys - Define as constants in the script
GROQ_API_KEY = "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"
ETHERSCAN_API_KEY = "V562U3P76KKEUJ2DV9Q3U2EPDK89T5CKNH"
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('blockchain_analyzer.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class ConfigError(Exception):
"""Raised when there's an error in configuration."""
pass
class APIError(Exception):
"""Raised when there's an error in API calls."""
pass
class ValidationError(Exception):
"""Raised when there's an error in input validation."""
pass
@dataclass
class Config:
"""Application configuration settings."""
SYSTEM_PROMPT: str = """
You are MANE 👑 (Learning & Observing Smart Systems Digital Output Generator),
an adorable blockchain-sniffing puppy!
Your personality:
- Friendly and enthusiastic
- Explain findings in fun, simple ways
Instructions:
- You have access to detailed wallet data in your context
- Use this data to provide specific answers about holdings
- Reference exact numbers and collections when discussing NFTs
- Compare wallets if multiple are available
"""
ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}"
RATE_LIMIT_DELAY: float = 0.2 # 5 requests per second max for free tier
MAX_RETRIES: int = 3
GROQ_MODEL: str = "llama3-70b-8192" # Groq's high-performance model
MAX_TOKENS: int = 4000
TEMPERATURE: float = 0.7
HISTORY_LIMIT: int = 5
@classmethod
def load(cls, config_path: str | Path) -> Config:
"""Load configuration from a JSON file."""
try:
with open(config_path) as f:
config_data = json.load(f)
return cls(**config_data)
except Exception as e:
logger.error(f"Error loading config: {e}")
return cls()
class WalletAnalyzer:
"""Analyzes Ethereum wallet contents using Etherscan API."""
def __init__(self):
"""Initialize the analyzer with API key."""
self.api_key = ETHERSCAN_API_KEY
self.base_url = Config.ETHERSCAN_BASE_URL
self.last_request_time = 0
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self) -> WalletAnalyzer:
"""Create aiohttp session on context manager enter."""
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Close aiohttp session on context manager exit."""
if self.session:
await self.session.close()
self.session = None
@retry(
stop=stop_after_attempt(Config.MAX_RETRIES),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
"""Fetch data from Etherscan API with retry logic."""
if not self.session:
raise APIError("No active session. Use context manager.")
await self._rate_limit()
params["apikey"] = self.api_key
try:
async with self.session.get(self.base_url, params=params) as response:
if response.status != 200:
raise APIError(f"API request failed: {response.status}")
data = await response.json()
if data["status"] == "0":
error_msg = data.get('message', 'Unknown error')
if "Max rate limit reached" in error_msg:
raise APIError("Rate limit exceeded")
raise APIError(f"API error: {error_msg}")
return data
except aiohttp.ClientError as e:
raise APIError(f"Network error: {str(e)}")
except Exception as e:
raise APIError(f"Unexpected error: {str(e)}")
async def _rate_limit(self) -> None:
"""Implement rate limiting for Etherscan API."""
current_time = time.time()
time_passed = current_time - self.last_request_time
if time_passed < Config.RATE_LIMIT_DELAY:
await asyncio.sleep(Config.RATE_LIMIT_DELAY - time_passed)
self.last_request_time = time.time()
@staticmethod
def _validate_address(address: str) -> bool:
"""Validate Ethereum address format."""
return bool(re.match(Config.ETHEREUM_ADDRESS_REGEX, address))
async def get_portfolio_data(self, address: str) -> WalletData:
"""Get complete portfolio including ETH, tokens, and NFTs."""
if not self._validate_address(address):
raise ValidationError(f"Invalid Ethereum address: {address}")
logger.info(f"Fetching portfolio data for {address}")
# Get ETH balance
eth_balance = await self._get_eth_balance(address)
# Get token data
token_holdings = await self._get_token_holdings(address)
# Get NFT data
nft_collections = await self._get_nft_holdings(address)
return {
"address": address,
"last_updated": datetime.now().isoformat(),
"eth_balance": float(eth_balance),
"tokens": token_holdings,
"nft_collections": nft_collections
}
async def _get_eth_balance(self, address: str) -> Decimal:
"""Get ETH balance for address."""
params = {
"module": "account",
"action": "balance",
"address": address,
"tag": "latest"
}
data = await self._fetch_data(params)
return Decimal(data["result"]) / Decimal("1000000000000000000")
async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
"""Get token holdings for address."""
params = {
"module": "account",
"action": "tokentx",
"address": address,
"sort": "desc"
}
data = await self._fetch_data(params)
token_holdings: Dict[str, Dict[str, Any]] = {}
for tx in data.get("result", []):
contract = tx["contractAddress"]
if contract not in token_holdings:
token_holdings[contract] = {
"name": tx["tokenName"],
"symbol": tx["tokenSymbol"],
"decimals": int(tx["tokenDecimal"]),
"balance": Decimal(0)
}
amount = Decimal(tx["value"]) / Decimal(10 ** int(tx["tokenDecimal"]))
if tx["to"].lower() == address.lower():
token_holdings[contract]["balance"] += amount
elif tx["from"].lower() == address.lower():
token_holdings[contract]["balance"] -= amount
return [
{
"name": data["name"],
"symbol": data["symbol"],
"balance": float(data["balance"])
}
for data in token_holdings.values()
if data["balance"] > 0
]
async def _get_nft_holdings(self, address: str) -> Dict[str, Dict[str, Any]]:
"""Get NFT holdings for address."""
params = {
"module": "account",
"action": "tokennfttx",
"address": address,
"sort": "desc"
}
data = await self._fetch_data(params)
nft_holdings: Dict[str, Dict[str, Any]] = {}
collections: Dict[str, List[str]] = {}
for tx in data.get("result", []):
collection_name = tx.get("tokenName", "Unknown Collection")
token_id = tx["tokenID"]
key = f"{tx['contractAddress']}_{token_id}"
if tx["to"].lower() == address.lower():
nft_holdings[key] = {
"collection": collection_name,
"token_id": token_id,
"contract": tx["contractAddress"],
"acquired_time": tx["timeStamp"]
}
if collection_name not in collections:
collections[collection_name] = []
collections[collection_name].append(token_id)
elif tx["from"].lower() == address.lower():
nft_holdings.pop(key, None)
if collection_name in collections and token_id in collections[collection_name]:
collections[collection_name].remove(token_id)
return {
name: {
"count": len(tokens),
"token_ids": tokens
}
for name, tokens in collections.items()
if tokens # Only include collections with tokens
}
class ChatInterface:
"""Handles chat interaction using Groq API."""
def __init__(self):
"""Initialize chat interface with Groq client."""
self.groq_client = groq.Client(api_key=GROQ_API_KEY)
self.context: Dict[str, Any] = {}
def _format_context_message(self) -> str:
"""Format wallet data as context message."""
if not self.context:
return ""
context_msg = ["Current Wallet Data:\n"]
for addr, data in self.context.items():
context_msg.extend([
f"Wallet {addr[:8]}...{addr[-6:]}:",
f"- ETH Balance: {data['eth_balance']:.4f} ETH",
f"- Tokens: {len(data['tokens'])}"
])
if data['tokens']:
context_msg.append(" Token Holdings:")
for token in data['tokens']:
context_msg.append(
f" * {token['name']} ({token['symbol']}): {token['balance']}"
)
if data['nft_collections']:
context_msg.append(" NFT Collections:")
for name, info in data['nft_collections'].items():
context_msg.append(f" * {name}: {info['count']} NFTs")
if info['count'] <= 5:
context_msg.append(
f" Token IDs: {', '.join(map(str, info['token_ids']))}"
)
return "\n".join(context_msg)
async def process_message(
self,
message: str,
history: Optional[ChatHistory] = None
) -> Tuple[ChatHistory, Dict[str, Any], str]:
"""Process user message and generate response."""
if not message.strip():
return history or [], self.context, ""
history = history or []
# Check for Ethereum address
match = re.search(Config.ETHEREUM_ADDRESS_REGEX, message)
if match:
try:
address = match.group(0)
async with WalletAnalyzer() as analyzer:
wallet_data = await analyzer.get_portfolio_data(address)
self.context[address] = wallet_data
summary = [
f"📊 Portfolio Summary for {address[:8]}...{address[-6:]}",
f"💎 ETH Balance: {wallet_data['eth_balance']:.4f} ETH",
f"🪙 Tokens: {len(wallet_data['tokens'])} different tokens"
]
total_nfts = sum(
coll['count'] for coll in wallet_data['nft_collections'].values())
summary.append(
f"🎨 NFTs: {total_nfts} NFTs in {len(wallet_data['nft_collections'])} collections"
)
bot_message = "\n".join(summary)
history.append((message, bot_message))
return history, self.context, ""
except Exception as e:
logger.error(f"Error analyzing wallet: {e}")
error_message = f"Error analyzing wallet: {str(e)}"
history.append((message, error_message))
return history, self.context, ""
# Generate response using Groq
try:
# Format context message
context_msg = self._format_context_message()
# Convert history to Groq format
chat_history = []
for user_msg, assistant_msg in history[-Config.HISTORY_LIMIT:]:
chat_history.extend([
{"role": "user", "content": user_msg},
{"role": "assistant", "content": assistant_msg}
])
# Generate response using Groq
response = self.groq_client.chat.completions.create(
model=Config.GROQ_MODEL,
messages=[
{"role": "system", "content": Config.SYSTEM_PROMPT},
{"role": "system", "content": context_msg},
*chat_history,
{"role": "user", "content": message}
],
temperature=Config.TEMPERATURE,
max_tokens=Config.MAX_TOKENS
)
bot_message = response.choices[0].message.content
history.append((message, bot_message))
return history, self.context, ""
except Exception as e:
logger.error(f"Error generating response: {e}")
error_message = f"Error generating response: {str(e)}"
history.append((message, error_message))
return history, self.context, ""
def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
"""Clear the wallet context and chat history."""
self.context = {}
return {}, []
class GradioInterface:
"""Handles Gradio web interface setup and interactions."""
def __init__(self):
"""Initialize Gradio interface."""
self.chat_interface = ChatInterface()
self.demo = self._create_interface()
def _create_interface(self) -> gr.Blocks:
"""Create and configure Gradio interface."""
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# MANE👑: Blockchain Wallet Analyzer
Welcome to 👑 MANE WAR - Your friendly blockchain analysis companion!
- Input an Ethereum wallet address to analyze
- Chat about your wallet contents with context!
""")
# Main Interface
with gr.Row():
# Chat Area (Left Side)
with gr.Column(scale=2):
chatbot = gr.Chatbot(
label="Chat History",
height=500,
value=[]
)
with gr.Row():
msg_input = gr.Textbox(
label="Message",
placeholder="Enter wallet address or ask about holdings...",
show_label=True
)
send_btn = gr.Button("Send", variant="primary")
# Context Sidebar (Right Side)
with gr.Column(scale=1):
wallet_context = gr.JSON(
label="Active Wallet Context",
show_label=True,
value={}
)
clear_btn = gr.Button("Clear Context", variant="secondary")
async def handle_message(
message: str,
chat_history: List[Tuple[str, str]],
context: Dict[str, Any]
) -> Tuple[List[Tuple[str, str]], Dict[str, Any]]:
"""Handle incoming messages."""
try:
history, new_context, _ = await self.chat_interface.process_message(
message,
chat_history
)
return history, new_context
except Exception as e:
logger.error(f"Error handling message: {e}")
if chat_history is None:
chat_history = []
chat_history.append((message, f"Error: {str(e)}"))
return chat_history, context
# Connect Event Handlers
clear_btn.click(
fn=self.chat_interface.clear_context,
inputs=[],
outputs=[wallet_context, chatbot]
)
# Message Handling
msg_input.submit(
fn=handle_message,
inputs=[msg_input, chatbot, wallet_context],
outputs=[chatbot, wallet_context]
).then(
lambda: gr.update(value=""),
None,
[msg_input]
)
send_btn.click(
fn=handle_message,
inputs=[msg_input, chatbot, wallet_context],
outputs=[chatbot, wallet_context]
).then(
lambda: gr.update(value=""),
None,
[msg_input]
)
return demo
def launch(self, **kwargs):
"""Launch the Gradio interface."""
self.demo.queue()
self.demo.launch(**kwargs)
def main():
"""Main entry point for the application."""
try:
# Load configuration
config = Config.load("config.json")
# Initialize and launch interface
interface = GradioInterface()
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=True
)
except Exception as e:
logger.error(f"Application startup failed: {e}")
raise
if __name__ == "__main__":
main()