File size: 11,522 Bytes
d93103e
 
49e124f
a52125e
d93103e
 
 
a52125e
49e124f
a52125e
 
49e124f
91f2ea1
d6f2ed2
a52125e
49e124f
 
a52125e
49e124f
a52125e
d6f2ed2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a52125e
d93103e
 
 
cc15047
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49e124f
d93103e
 
a52125e
49e124f
 
 
8295114
d93103e
49e124f
 
d93103e
 
 
 
 
49e124f
 
 
 
 
 
 
 
 
a52125e
 
 
49e124f
 
 
 
 
 
a52125e
 
 
49e124f
a52125e
 
 
49e124f
a52125e
49e124f
 
 
 
 
a52125e
49e124f
 
 
 
 
 
 
 
a52125e
49e124f
 
a52125e
49e124f
 
a52125e
 
49e124f
 
a52125e
91f2ea1
d6f2ed2
 
 
 
 
49e124f
91f2ea1
 
d6f2ed2
91f2ea1
a52125e
91f2ea1
 
a52125e
91f2ea1
 
49e124f
91f2ea1
 
 
 
 
 
 
a52125e
96e399b
91f2ea1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc15047
 
 
 
 
 
49e124f
 
 
 
a52125e
49e124f
 
a52125e
cc15047
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a52125e
49e124f
 
 
 
cc15047
49e124f
 
 
 
 
 
a52125e
cc15047
 
49e124f
 
a52125e
49e124f
 
 
 
 
 
 
 
a52125e
 
49e124f
 
 
 
 
 
 
 
 
 
d93103e
49e124f
d93103e
49e124f
 
d93103e
 
49e124f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from src.inference import ClusterPredictor
from src.llm import PersonaExplainer
import os
import requests
import uuid
import time
import random
from dotenv import load_dotenv
from diskcache import Cache
from typing import Optional, Dict, Any

# Load environment variables
load_dotenv()

# Load all available Dune API Keys
DUNE_API_KEYS = []
# 1. Primary Key
if os.getenv("DUNE_API_KEY"):
    DUNE_API_KEYS.append(os.getenv("DUNE_API_KEY"))
# 2. Secondary Keys (DUNE_API_KEY_2, _3, etc.)
i = 2
while True:
    key = os.getenv(f"DUNE_API_KEY_{i}")
    if not key:
        break
    DUNE_API_KEYS.append(key)
    i += 1

print(f"Loaded {len(DUNE_API_KEYS)} Dune API Keys.")

# Initialize Limiter
limiter = Limiter(key_func=get_remote_address)

# --- Helper: Contract Check ---
def check_is_contract(wallet_address: str) -> bool:
    """
    Checks if an address is a smart contract using a public RPC.
    Returns True if contract, False if EOA (User).
    """
    rpc_url = "https://eth.llamarpc.com"
    payload = {
        "jsonrpc": "2.0",
        "method": "eth_getCode",
        "params": [wallet_address, "latest"],
        "id": 1
    }
    
    try:
        res = requests.post(rpc_url, json=payload, timeout=5)
        if res.status_code == 200:
            result = res.json().get("result")
            # '0x' means no code (EOA). Anything longer means Contract.
            return result != "0x"
    except Exception as e:
        print(f"Contract check failed: {e}")
        return False
    
    return False

app = FastAPI(title="Crypto Wallet Persona API", description="Async API with AI-powered Wallet Analysis.")
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

origins = [
    "http://localhost",
    "http://localhost:5173", # Default Vite port
    "http://localhost:5173", # Vite port when accessed via IP
    "http://127.0.0.1:5173", # Vite port when accessed via IP
]

# Add Vercel Frontend URL from Env (if set)
vercel_frontend = os.getenv("FRONTEND_URL")
if vercel_frontend:
    origins.append(vercel_frontend)

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Configuration & Initialization ---
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODEL_PATH = os.path.join(BASE_DIR, "kmeans_model.pkl")
PREPROCESSOR_PATH = os.path.join(BASE_DIR, "wallet_power_transformer.pkl")
CACHE_DIR = os.path.join(BASE_DIR, "cache_data")

# Initialize Components
cache = Cache(CACHE_DIR)
# Expire cache entries after 24 hours (86400 seconds)
CACHE_TTL = 86400 

try:
    predictor = ClusterPredictor(model_path=MODEL_PATH, preprocessor_path=PREPROCESSOR_PATH)
    explainer = PersonaExplainer()
except FileNotFoundError as e:
    print(f"Startup Error: {e}")
    predictor = None
    explainer = None

# --- Models ---
class JobResponse(BaseModel):
    job_id: str
    status: str # "queued", "processing", "completed", "failed"
    wallet_address: str

class AnalysisResult(BaseModel):
    status: str
    wallet_address: Optional[str] = None
    persona: Optional[str] = None
    confidence_scores: Optional[Dict[str, float]] = None
    explanation: Optional[str] = None
    stats: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

# --- Background Worker ---
def process_wallet_analysis(job_id: str, wallet_address: str):
    """
    Background task that fetches data, predicts persona, and generates AI explanation.
    Updates the cache state as it progresses.
    """
    try:
        # Update status to processing
        cache.set(job_id, {"status": "processing", "wallet": wallet_address}, expire=CACHE_TTL)
        
        # 1. Execute Dune Query (Start New Run)
        if not DUNE_API_KEYS:
            raise Exception("Dune API configuration missing.")

        # Select a random API key to distribute load
        selected_api_key = random.choice(DUNE_API_KEYS)

        # Step A: Submit Execution
        execute_url = "https://api.dune.com/api/v1/query/6252521/execute"
        headers = {"X-Dune-API-Key": selected_api_key}
        payload = {"query_parameters": {"wallet": wallet_address}}
        
        print(f"Submitting Dune query for {wallet_address}...")
        exec_res = requests.post(execute_url, headers=headers, json=payload, timeout=10)
        
        if exec_res.status_code != 200:
            raise Exception(f"Dune Execution Failed: {exec_res.status_code} - {exec_res.text}")
            
        execution_id = exec_res.json().get("execution_id")
        if not execution_id:
            raise Exception("No execution_id returned from Dune.")

        # Step B: Poll for Completion
        print(f"Polling Dune execution {execution_id}...")
        status_url = f"https://api.dune.com/api/v1/execution/{execution_id}/status"
        
        max_retries = 150 # 150 * 2s = 300s (5 mins) max wait
        for i in range(max_retries):
            status_res = requests.get(status_url, headers=headers, timeout=10)
            if status_res.status_code != 200:
                 # Temporary network glitch? Wait and retry.
                 time.sleep(2)
                 continue
                 
            state = status_res.json().get("state")
            if state == "QUERY_STATE_COMPLETED":
                break
            elif state == "QUERY_STATE_FAILED":
                raise Exception("Dune Query Execution FAILED internally.")
            elif state == "QUERY_STATE_CANCELLED":
                raise Exception("Dune Query was CANCELLED.")
                
            time.sleep(2)
        else:
            raise Exception("Dune Query Timed Out (60s).")

        # Step C: Fetch Results
        results_url = f"https://api.dune.com/api/v1/execution/{execution_id}/results"
        results_res = requests.get(results_url, headers=headers, timeout=15)
        
        if results_res.status_code != 200:
             raise Exception(f"Failed to fetch results: {results_res.status_code}")

        data_json = results_res.json()
        rows = data_json.get("result", {}).get("rows", [])
        
        if not rows:
            # If the query ran but returned no rows (maybe empty wallet?)
            # We can either fail or proceed with zero-filled data.
            # Given the SQL logic, it usually returns 1 row with 0s if empty, 
            # but let's be safe.
             raise Exception("Dune returned no data rows for this wallet.")
        
        # We trust the execution result because we just ran it with the param.
        row_data = rows[0]
            
        # 2. Heuristic Analysis & Prediction
        
        # A. Contract Check
        is_contract = check_is_contract(wallet_address)
        
        # B. Feature Extraction
        if predictor is None:
             raise Exception("Inference Model not loaded.")

        model_input = {}
        for feature in predictor.FEATURES:
            val = row_data.get(feature)
            model_input[feature] = float(val) if val is not None else 0.0

        # C. Decide Persona (Hybrid: Rules + AI)
        final_persona = None
        confidence = {}
        
        if is_contract:
            final_persona = "Smart Contract Protocol"
            # Flat scores for contract (or custom distribution)
            confidence = {
                "High-Value NFT & Crypto Traders (Degen Whales)": 0.0,
                "High-Frequency Bots / Automated Traders": 0.0,
                "Active Retail Users / Everyday Traders": 0.0,
                "Ultra-Whales / Institutional & Exchange Wallets": 0.0
            }
        elif model_input.get('tx_count', 0) < 30:
            final_persona = "Dormant / New User"
            # Flat scores for new user
            confidence = {
                "High-Value NFT & Crypto Traders (Degen Whales)": 0.0,
                "High-Frequency Bots / Automated Traders": 0.0,
                "Active Retail Users / Everyday Traders": 1.0, # Lean towards Retail
                "Ultra-Whales / Institutional & Exchange Wallets": 0.0
            }
        else:
            # Run AI Inference
            prediction_result = predictor.predict(model_input)
            final_persona = prediction_result['persona']
            confidence = prediction_result['probabilities']
        
        # 3. Generate AI Explanation
        explanation = "AI Analysis unavailable."
        if explainer:
            explanation = explainer.generate_explanation(
                final_persona, 
                model_input
            )

        # 4. Save Final Result to Cache
        final_result = {
            "status": "completed",
            "wallet_address": wallet_address,
            "persona": final_persona,
            "confidence_scores": confidence,
            "explanation": explanation,
            "stats": model_input
        }
        
        # Cache key for the JOB
        cache.set(job_id, final_result, expire=CACHE_TTL)
        
        # ALSO Cache key for the WALLET (for instant lookup later)
        # We prefix with 'wallet:' to distinguish from job_ids
        cache.set(f"wallet:{wallet_address}", final_result, expire=CACHE_TTL)
        print(f"Job {job_id} completed successfully.")

    except Exception as e:
        print(f"Job {job_id} failed: {e}")
        error_state = {
            "status": "failed", 
            "wallet_address": wallet_address,
            "error": str(e)
        }
        cache.set(job_id, error_state, expire=CACHE_TTL)

# --- Endpoints ---

@app.get("/", include_in_schema=False)
def home():
    return RedirectResponse(url="/docs")

@app.post("/analyze/start/{wallet_address}", response_model=JobResponse)
@limiter.limit("5/minute")
def start_analysis(wallet_address: str, background_tasks: BackgroundTasks, request: Request):
    """
    Starts the analysis job. Returns a job_id immediately.
    Checks cache first for instant results.
    """
    # 1. Check Cache for this wallet
    cached_result = cache.get(f"wallet:{wallet_address}")
    if cached_result and cached_result['status'] == 'completed':
        # Create a "virtual" completed job for API consistency
        # Or we could just return the result directly? 
        # For this pattern, let's return a completed job_id that points to this data
        job_id = f"cached_{uuid.uuid4().hex[:8]}"
        cache.set(job_id, cached_result, expire=300) # Short TTL for temp job pointer
        return {"job_id": job_id, "status": "completed", "wallet_address": wallet_address}

    # 2. Start New Job
    job_id = str(uuid.uuid4())
    cache.set(job_id, {"status": "queued", "wallet": wallet_address}, expire=CACHE_TTL)
    
    background_tasks.add_task(process_wallet_analysis, job_id, wallet_address)
    
    return {"job_id": job_id, "status": "queued", "wallet_address": wallet_address}

@app.get("/analyze/status/{job_id}", response_model=AnalysisResult)
def check_status(job_id: str):
    """
    Poll this endpoint to get the result.
    """
    result = cache.get(job_id)
    if not result:
        raise HTTPException(status_code=404, detail="Job ID not found or expired.")
    
    return result