"""
with open(html_file, "w", encoding="utf-8") as f:
f.write(html_content)
return html_file
# --- Data Fetching Functions ---
def fetch_coingecko(session: requests.Session) -> List[Dict[str, Any]]:
threading.current_thread().name = f"user_{user_id}"
tokens: List[Dict[str, Any]] = []
use_key = bool(COINGECKO_API_KEY and COINGECKO_API_KEY != "CONFIG_REQUIRED_CG")
for page in range(1, 5):
try:
url = "https://api.coingecko.com/api/v3/coins/markets"
params = {"vs_currency": "usd", "order": "market_cap_desc", "per_page": 250, "page": page}
headers = STEALTH_HEADERS.copy()
if use_key:
if page == 1: print(" ⚡ Scanning CoinGecko...")
headers["x-cg-demo-api-key"] = COINGECKO_API_KEY
delay = 0.05
else:
if page == 1: print(" 🐌 Scanning CoinGecko (Slow Mode)...")
delay = 0.2
r = session.get(url, params=params, headers=headers, timeout=15)
if use_key and r.status_code in [401, 403, 429]:
use_key = False
delay = 0.2
r = session.get(url, params=params, headers=STEALTH_HEADERS, timeout=15)
r.raise_for_status()
for t in r.json():
symbol = (t.get("symbol") or "").upper()
if symbol in STABLECOINS: continue
vol, mc = float(t.get("total_volume") or 0), float(t.get("market_cap") or 0)
# Fetching pre-filter
if mc > 0 and (vol / mc) >= FETCH_THRESHOLD:
tokens.append({"symbol": symbol, "marketcap": mc, "volume": vol, "source": "CG"})
time.sleep(delay)
except Exception: continue
print(f" ✅ CoinGecko: {len(tokens)} tokens")
return tokens
def fetch_coinmarketcap(session: requests.Session) -> List[Dict[str, Any]]:
threading.current_thread().name = f"user_{user_id}"
tokens: List[Dict[str, Any]] = []
if not CMC_API_KEY or CMC_API_KEY == "CONFIG_REQUIRED_CMC": return tokens
print(" ⚡ Scanning CoinMarketCap...")
headers = STEALTH_HEADERS.copy()
headers["X-CMC_PRO_API_KEY"] = CMC_API_KEY
for start in range(1, 1001, 100):
try:
r = session.get("https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest",
headers=headers, params={"start": start, "limit": 100, "convert": "USD"}, timeout=15)
r.raise_for_status()
for t in r.json().get("data", []):
symbol = (t.get("symbol") or "").upper()
if symbol in STABLECOINS: continue
q = t.get("quote", {}).get("USD", {})
vol, mc = float(q.get("volume_24h") or 0), float(q.get("market_cap") or 0)
if mc > 0 and (vol / mc) >= FETCH_THRESHOLD:
tokens.append({"symbol": symbol, "marketcap": mc, "volume": vol, "source": "CMC"})
time.sleep(0.2)
except Exception: continue
print(f" ✅ CoinMarketCap: {len(tokens)} tokens")
return tokens
def fetch_livecoinwatch(session: requests.Session) -> List[Dict[str, Any]]:
threading.current_thread().name = f"user_{user_id}"
tokens: List[Dict[str, Any]] = []
if not LIVECOINWATCH_API_KEY or LIVECOINWATCH_API_KEY == "CONFIG_REQUIRED_LCW": return tokens
print(" ⚡ Scanning LiveCoinWatch...")
headers = STEALTH_HEADERS.copy()
headers.update({"content-type": "application/json", "x-api-key": LIVECOINWATCH_API_KEY})
payload = {"currency": "USD", "sort": "rank", "order": "ascending", "offset": 0, "limit": 1000, "meta": True}
try:
r = session.post("https://api.livecoinwatch.com/coins/list", json=payload, headers=headers, timeout=20)
r.raise_for_status()
for t in r.json():
symbol = (t.get("code") or "").upper()
if symbol in STABLECOINS: continue
vol, mc = float(t.get("volume") or 0), float(t.get("cap") or 0)
if mc > 0 and (vol / mc) >= FETCH_THRESHOLD:
tokens.append({"symbol": symbol, "marketcap": mc, "volume": vol, "source": "LCW"})
except Exception: pass
print(f" ✅ LiveCoinWatch: {len(tokens)} tokens")
return tokens
def fetch_all_sources() -> Tuple[List[Dict[str, Any]], int]:
print(" 🔍 Scanning for high-volumed tokens...")
print(" ⬆️ CoinGecko data for accuracy... ")
sources = [fetch_coingecko, fetch_coinmarketcap, fetch_livecoinwatch]
results = []
with ThreadPoolExecutor(max_workers=3) as exe:
futures = [exe.submit(fn, requests.Session()) for fn in sources]
for f in as_completed(futures):
try:
res = f.result(timeout=60)
if res: results.extend(res)
except Exception: continue
print(f" 📊 Total raw results: {len(results)}")
return results, len(results)
# --- Processing Logic ---
raw_tokens, _ = fetch_all_sources()
all_data = {}
for t in raw_tokens:
all_data.setdefault(t['symbol'], []).append(t)
verified_tokens = []
for sym, tokens in all_data.items():
# Identify CoinGecko entry specifically
cg_data = next((t for t in tokens if t['source'] == 'CG'), None)
if cg_data:
# --- GATEKEEPER RULE: COINGECKO IS SOVEREIGN ---
# If CG has it, we ignore all other sources and use CG metrics alone
volume, marketcap = cg_data['volume'], cg_data['marketcap']
ratio = volume / marketcap
is_large = (marketcap > LC_THRESHOLD)
# Flexible Thresholds
if (is_large and ratio >= MIN_LC_VTMR and ratio <= MAX_VTMR) or \
(not is_large and ratio >= MIN_VTMR and ratio <= MAX_VTMR):
verified_tokens.append({
"symbol": sym, "marketcap": marketcap, "volume": volume,
"flipping_multiple": ratio, "source_count": len(tokens), "large_cap": is_large
})
else:
# --- FALLBACK RULE: MULTI-SOURCE VERIFICATION ---
# If CG is missing, the token MUST have at least 2 other sources to even be considered
if len(tokens) >= 2:
volume = sum(t['volume'] for t in tokens) / len(tokens)
marketcap = sum(t['marketcap'] for t in tokens) / len(tokens)
ratio = volume / marketcap
is_large = any(t['marketcap'] > LC_THRESHOLD for t in tokens)
if (is_large and ratio >= MIN_LC_VTMR and ratio <= MAX_VTMR) or \
(not is_large and ratio >= MIN_VTMR and ratio <= MAX_VTMR):
verified_tokens.append({
"symbol": sym, "marketcap": marketcap, "volume": volume,
"flipping_multiple": ratio, "source_count": len(tokens), "large_cap": is_large
})
hot_tokens = sorted(verified_tokens, key=lambda x: x["flipping_multiple"], reverse=True)
html_file = create_html_report(hot_tokens)
#-- Print
report_filename = html_file.name
now_h = datetime.datetime.now().strftime("%H:%M:%S")
print(f" 💎 Found {len(hot_tokens)} high-volume tokens at {now_h}")
print(f" 📂 HTML report saved: /reports-list/{report_filename}")
print(" 🏁 Spot volume analysis completed!")