# bot-signal-telegram / botsignal.py
# Provenance (Hugging Face page residue, converted to a comment so the file
# is valid Python): uploaded by agus1111, commit dd72ac2 (verified),
# "Update botsignal.py", raw / history / blame, 31.9 kB.
import asyncio
import os
import re
import io
import sqlite3
import hashlib
from collections import deque, defaultdict
from datetime import datetime, timedelta, timezone
from mimetypes import guess_extension
from typing import List, Tuple, Optional, Dict
from rapidfuzz import fuzz
from telethon import TelegramClient, events
from telethon.sessions import StringSession, MemorySession
from telethon.errors.rpcerrorlist import FloodWaitError
# ========= Configuration via Environment =========
# Telegram API credentials (from https://my.telegram.org). Defaults are
# intentionally unusable so real values must come from the environment.
API_ID = int(os.environ.get("API_ID", "0"))
API_HASH = os.environ.get("API_HASH", "" )
# Persistent Telethon string session; empty -> in-memory login each run.
STRING_SESSION = os.environ.get("STRING_SESSION", "")
# --- Sources are split into CORE vs SUPPORT (fill in with your own data) ---
# CORE chats anchor topics: a single CORE mention lets SUPPORT mentions
# of the same topic through the gating in process_message().
CORE_CHATS = [
    "https://t.me/PEPE_Calls28",
    "https://t.me/HenryGems",
    "https://t.me/ChinaPumpCommunity",
    "https://t.me/SephirothGemCalls1",
    "https://t.me/GM_Degencalls",
    "https://t.me/Enthucalls",
    "https://t.me/kobecalls",
    "https://t.me/Kulture_Kall",
]
# SUPPORT-only topics need SUPPORT_MIN_UNIQUE distinct groups to be posted.
SUPPORT_CHATS = [
    "https://t.me/TheDonALPHAJournal",
    "https://t.me/savascalls",
    "https://t.me/Tanjirocall",
    "https://t.me/Zen_call",
    "https://t.me/ChapoInsider",
    "https://t.me/millionsgems",
    "https://t.me/Milagrosdegencalls",
    "https://t.me/kariusgemscalls",
    "https://t.me/Dwen_Exchange",
    "https://t.me/bat_gamble",
    "https://t.me/BatmanGamble",
    "https://t.me/hulkgemscalls_real",
    "https://t.me/MineGems",
]
SOURCE_CHATS = CORE_CHATS + SUPPORT_CHATS
# Destination channel for curated signals.
TARGET_CHAT = os.environ.get("TARGET_CHAT", "https://t.me/MidasTouchsignalll")
# Topic keywords; the bare '$' symbol is deliberately kept as a keyword.
THEME_KEYWORDS = [
    "call", "signal", "entry", "buy", "sell", "tp", "sl",
    "pump", "spot", "futures", "setup",
    "pepe", "bnb", "eth", "btc", "sol", "meme", "$",
]
# Relevance scoring weights: exact keyword hits vs fuzzy window matches.
KEYWORD_WEIGHT = 1.0
FUZZ_WEIGHT = 0.6
# Minimum combined relevance score to forward a message (CA posts bypass it).
RELEVANCE_THRESHOLD = float(os.environ.get("RELEVANCE_THRESHOLD", "1.0"))
# Messages containing any of these phrases are dropped outright.
EXCLUDE_PHRASES = [
    "achievement unlocked",
]
# Media forwarding policy.
INCLUDE_MEDIA = os.environ.get("INCLUDE_MEDIA", "1") == "1"
MAX_MEDIA_MB = float(os.environ.get("MAX_MEDIA_MB", "12"))
SKIP_STICKERS = os.environ.get("SKIP_STICKERS", "1") == "1"
ALLOW_GIFS_VIDEOS = os.environ.get("ALLOW_GIFS_VIDEOS", "0") == "1"
# How many historical messages to pull per source at startup.
INITIAL_BACKFILL = int(os.environ.get("INITIAL_BACKFILL", "20"))
# Size of the rolling dedup deques.
DEDUP_BUFFER_SIZE = int(os.environ.get("DEDUP_BUFFER_SIZE", "800"))
# Aggregation window (minutes) for counting unique groups per topic.
CLASS_WINDOW_MINUTES = int(os.environ.get("CLASS_WINDOW_MINUTES", "20"))
# How many distinct SUPPORT groups are needed when no CORE group has posted.
SUPPORT_MIN_UNIQUE = int(os.environ.get("SUPPORT_MIN_UNIQUE", "2"))
# DRY RUN (log instead of sending anything to TARGET_CHAT).
DRY_RUN = os.environ.get("DRY_RUN", "0") == "1"
# Backfill buffer: ignore messages older than (startup_time - window - buffer).
BACKFILL_BUFFER_MINUTES = int(os.environ.get("BACKFILL_BUFFER_MINUTES", "3"))
# === Update behavior strategy ===
# edit  : edit the entity's last message on UPDATE
# reply : reply (reply_to) to the entity's first message (default here)
# new   : send a brand-new message for every UPDATE
UPDATE_STRATEGY = os.environ.get("UPDATE_STRATEGY", "reply").lower()
# Minimum seconds between same-tier updates for one entity (anti-spam).
UPDATE_COOLDOWN_SEC = int(os.environ.get("UPDATE_COOLDOWN_SEC", "5"))
# ========= Client bootstrap =========
def build_client() -> TelegramClient:
    """Create the Telegram client, preferring a persistent StringSession.

    Falls back to an in-memory session (re-login every run) when no
    STRING_SESSION is configured in the environment.
    """
    if not STRING_SESSION:
        print(">> Using MemorySession (login tiap run).")
        return TelegramClient(MemorySession(), API_ID, API_HASH)
    print(">> Using StringSession (persistent).")
    return TelegramClient(StringSession(STRING_SESSION), API_ID, API_HASH)
client = build_client()
# Rolling dedup buffers (bounded deques; oldest entries are evicted).
recent_hashes: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)  # per message+media hash
recent_content_hashes: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)  # content-only dedup
# entity-based dedup (CA / $ticker keys)
recent_entity_keys: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)
# Map chat_id -> "core" / "support"
chat_roles: Dict[int, str] = {}  # filled at startup after entity resolution
startup_time_utc = datetime.now(timezone.utc)  # anchor for the backfill age cutoff
# ========= Persistence (SQLite) =========
DB_PATH = os.environ.get("BOTSIGNAL_DB", "/tmp/botsignal.db")
def _db():
conn = sqlite3.connect(DB_PATH)
conn.execute("PRAGMA journal_mode=WAL;")
return conn
def _init_db():
    """Create the persistence tables (last_posted, kw_group_seen) if absent."""
    connection = _db()
    connection.executescript(
        """
        CREATE TABLE IF NOT EXISTS last_posted (
            keyword TEXT PRIMARY KEY,
            msg_id INTEGER NOT NULL,
            tier TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS kw_group_seen (
            keyword TEXT NOT NULL,
            group_key TEXT NOT NULL,
            last_ts INTEGER NOT NULL,
            PRIMARY KEY (keyword, group_key)
        );
        """
    )
    connection.commit()
    connection.close()
def db_load_state():
    """Restore last_posted and kw_group_seen from SQLite at startup.

    Returns a tuple (last_posted_map, keyword -> {group_key: last_seen})
    where last_seen timestamps are rebuilt as timezone-aware UTC datetimes.
    """
    conn = _db()
    last_map = {
        kw: {"msg_id": mid, "tier": tier}
        for kw, mid, tier in conn.execute("SELECT keyword, msg_id, tier FROM last_posted")
    }
    seen_map: defaultdict[str, dict[str, datetime]] = defaultdict(dict)
    for kw, gk, ts in conn.execute("SELECT keyword, group_key, last_ts FROM kw_group_seen"):
        seen_map[kw][gk] = datetime.fromtimestamp(ts, tz=timezone.utc)
    conn.close()
    return last_map, seen_map
def db_save_last_posted(keyword: str, msg_id: int, tier: str):
    """Persist (upsert) the most recent posted message id/tier for *keyword*."""
    connection = _db()
    connection.execute(
        "INSERT INTO last_posted(keyword, msg_id, tier) VALUES(?,?,?) "
        "ON CONFLICT(keyword) DO UPDATE SET msg_id=excluded.msg_id, tier=excluded.tier",
        (keyword, msg_id, tier),
    )
    connection.commit()
    connection.close()
def db_upsert_kw_seen(keyword: str, group_key: str, ts: datetime):
    """Record (or refresh) when *group_key* last mentioned *keyword*."""
    connection = _db()
    connection.execute(
        "INSERT INTO kw_group_seen(keyword, group_key, last_ts) VALUES(?,?,?) "
        "ON CONFLICT(keyword, group_key) DO UPDATE SET last_ts=excluded.last_ts",
        (keyword, group_key, int(ts.timestamp())),
    )
    connection.commit()
    connection.close()
def db_prune_expired(cutoff: datetime):
    """Delete kw_group_seen rows whose last sighting is older than *cutoff*."""
    connection = _db()
    connection.execute("DELETE FROM kw_group_seen WHERE last_ts < ?", (int(cutoff.timestamp()),))
    connection.commit()
    connection.close()
# ========= Utilities =========
def debug_log(reason: str, content: str = "") -> None:
    """Print a one-line debug trace, flattening newlines and truncating to 160 chars."""
    snippet = (content or "").replace("\n", " ")[:160]
    print(f"[DEBUG] {reason}: {snippet}")
def normalize_for_filter(text: str) -> str:
    """Drop quoted ('>'-prefixed) lines and collapse whitespace runs to one space."""
    if not text:
        return ""
    without_quotes = re.sub(r"(?m)^>.*", "", text)
    return re.sub(r"\s+", " ", without_quotes).strip()
def _tokenize_words(s: str) -> List[str]:
return re.findall(r"[a-z0-9\$\#]{1,64}", s.lower())
def _windows(tokens: List[str], size: int = 20):
for i in range(0, len(tokens), size):
yield " ".join(tokens[i : i + size])
# --- Strip URLs/contract addresses for relevance SCORING purposes only ---
CA_SOL_RE = re.compile(r"\b[1-9A-HJ-NP-Za-km-z]{32,48}\b")  # Solana base58 (approximate)
CA_EVM_RE = re.compile(r"\b0x[a-fA-F0-9]{40}\b")  # EVM address
CA_LABEL_RE = re.compile(r"\bCA\s*[:=]\s*\S+", re.IGNORECASE)  # drop the token after "CA:" / "CA="
def _strip_urls_and_mentions(s: str) -> str:
s = re.sub(r"https?://\S+", "", s)
s = re.sub(r"t\.me/[A-Za-z0-9_]+", "", s)
s = re.sub(r"@[A-Za-z0-9_]+", "", s)
return re.sub(r"\s+", " ", s).strip()
def strip_contracts_for_scoring(s: str) -> str:
    """Neutralise URLs/mentions, contract addresses and 'CA:' tokens.

    Keeps words like 'pump' inside a CA or URL (e.g. pump.fun) from
    inflating the relevance score. Used for SCORING only, never for output.
    """
    cleaned = _strip_urls_and_mentions(s)
    for pattern in (CA_LABEL_RE, CA_EVM_RE, CA_SOL_RE):
        cleaned = pattern.sub(" ", cleaned)
    return re.sub(r"\s+", " ", cleaned).strip()
def score_relevance(text: str, keywords: List[str]) -> float:
    """Relevance score: exact keyword hits + fuzzy windowed matching.

    Exact hits count unique keywords (weighted by KEYWORD_WEIGHT); the
    fuzzy part averages the top-3 window scores (20-token windows, best
    rapidfuzz partial_ratio per window, weighted by FUZZ_WEIGHT) so long
    texts are not unfairly advantaged.
    """
    if not text:
        return 0.0
    # Score a version WITHOUT URLs/CAs so e.g. 'pump' inside a contract
    # address or pump.fun link is not counted.
    t = strip_contracts_for_scoring(text).lower()
    # Exact hits (unique keywords; substring OR word-boundary match).
    exact_hits = 0
    for kw in set(keywords):
        if kw in t or re.search(rf"\b{re.escape(kw)}\b", t):
            exact_hits += 1
    exact_score = exact_hits * KEYWORD_WEIGHT
    # Fuzzy windowed: best keyword score per 20-token window, keep top-3.
    tokens = _tokenize_words(t)
    if not tokens:
        return exact_score
    scores = []
    for w in _windows(tokens, 20):
        best = 0.0
        for kw in keywords:
            sc = fuzz.partial_ratio(kw, w) / 100.0
            if sc > best:
                best = sc
        scores.append(best)
    fuzzy_top3 = sorted(scores, reverse=True)[:3]
    fuzzy_score = (sum(fuzzy_top3) / max(1, len(fuzzy_top3))) * FUZZ_WEIGHT if fuzzy_top3 else 0.0
    return exact_score + fuzzy_score
def hash_for_dedup(text: str, msg) -> str:
    """Legacy mixed hash over text + message id + media ids (per-message dedup)."""
    pieces = [text or ""]
    msg_id = getattr(msg, "id", None)
    if msg_id is not None:
        pieces.append(str(msg_id))
    document = getattr(msg, "document", None)
    if document and getattr(document, "id", None) is not None:
        pieces.append(f"doc:{document.id}")
    photo = getattr(msg, "photo", None)
    if photo is not None:
        photo_id = getattr(photo, "id", None)
        if photo_id is not None:
            pieces.append(f"photo:{photo_id}")
    blob = "|".join(pieces).encode("utf-8", errors="ignore")
    return hashlib.sha1(blob).hexdigest()
def content_only_hash(text: str) -> str:
    """SHA-1 of normalised, link-stripped text (cross-group crosspost dedup)."""
    canonical = _strip_urls_and_mentions(normalize_for_filter(text))
    digest = hashlib.sha1(canonical.encode("utf-8", errors="ignore"))
    return digest.hexdigest()
# ========= Class aggregator (windowed unique groups) =========
# keyword/topic -> {group_key: last time that group mentioned it (UTC)}
keyword_group_last_seen: defaultdict[str, dict[str, datetime]] = defaultdict(dict)
def _prune_expired(now: datetime) -> None:
    """Drop keyword/group sightings older than the class window (memory + DB)."""
    cutoff = now - timedelta(minutes=CLASS_WINDOW_MINUTES)
    # In-memory prune: remove stale sightings, then empty keyword buckets.
    for keyword in list(keyword_group_last_seen):
        sightings = keyword_group_last_seen[keyword]
        for group_key in [gk for gk, ts in sightings.items() if ts < cutoff]:
            del sightings[group_key]
        if not sightings:
            del keyword_group_last_seen[keyword]
    # Mirror the same cutoff in the SQLite copy.
    db_prune_expired(cutoff)
def update_and_classify(keyword: str, group_key: str, now: Optional[datetime] = None) -> Tuple[str, int]:
    """Record a sighting and classify signal strength by unique groups.

    Returns (tier, unique_group_count): 'kuat' at 4+ unique groups in the
    window, 'sedang' at 2-3, otherwise 'rendah'.
    """
    now = now or datetime.now(timezone.utc)
    _prune_expired(now)
    bucket = keyword_group_last_seen[keyword]
    bucket[group_key] = now
    db_upsert_kw_seen(keyword, group_key, now)
    unique_groups = len(bucket)
    if unique_groups >= 4:
        tier = "kuat"
    elif unique_groups >= 2:
        tier = "sedang"
    else:
        tier = "rendah"
    return tier, unique_groups
# ========= Sentence-level invite filter (smarter) =========
# Patterns marking a sentence as promo/invite spam (compiled
# case-insensitively below). Mixed English/Indonesian vocabulary.
INVITE_PATTERNS = [
    r"\bjoin\b", r"\bjoin (us|our|channel|group)\b",
    r"\bdm\b", r"\bdm (me|gw|gue|gua|saya|admin)\b",
    r"\bpm\b", r"\binbox\b", r"\bcontact\b", r"\bkontak\b", r"\bhubungi\b",
    r"\bvip\b", r"\bpremium\b", r"\bberbayar\b", r"\bpaid\b", r"\bexclusive\b",
    r"\bwhitelist\b", r"\bprivate( group| channel)?\b", r"\bmembership?\b",
    r"\bsubscribe\b", r"\blangganan\b",
    # promo / advertising keywords
    r"\bpromo\b", r"\bpromosi\b", r"\biklan\b",
    r"\badvert\b", r"\badvertise\b", r"\badvertisement\b",
    # invite links / URL shorteners
    r"(t\.me\/joinchat|t\.me\/\+|telegram\.me\/|discord\.gg\/|wa\.me\/|whatsapp\.com\/)",
    r"(bit\.ly|tinyurl\.com|linktr\.ee)",
    # UPGRADE: the aggressive rule that blocked every plain t.me link stays removed
    # r"t\.me\/[A-Za-z0-9_]+",
]
INVITE_REGEXES = [re.compile(p, re.IGNORECASE) for p in INVITE_PATTERNS]
# Whitelist: sentences that look like genuine signals are never dropped.
WHITELIST_STRONG_SIGNAL = [
    r"\$[a-z0-9]{2,10}",  # $TICKER
    r"\b(entry|entries|buy|sell)\b",
    r"\bsl\b", r"\btp\b", r"\btp\d\b",
]
WHITELIST_REGEXES = [re.compile(p, re.IGNORECASE) for p in WHITELIST_STRONG_SIGNAL]
def _is_invite_sentence(s: str) -> bool:
    """True when a sentence looks like promo/invite spam worth dropping.

    Sentences containing strong signal markers ($ticker, entry/buy/sell,
    TP/SL) are always kept, even when invite wording is also present.
    """
    sentence = s.strip()
    if not sentence:
        return False
    if any(rx.search(sentence) for rx in WHITELIST_REGEXES):
        return False
    return any(rx.search(sentence) for rx in INVITE_REGEXES)
def filter_invite_sentences(text: str) -> str:
    """Split text into sentences/lines and drop the ones flagged as invite spam."""
    if not text:
        return text
    sentences = re.split(r'(?<=[\.!\?])\s+|\n+', text, flags=re.UNICODE)
    survivors = [s.strip() for s in sentences if s and not _is_invite_sentence(s)]
    rebuilt = "\n".join(survivors).strip()
    # Collapse any runs of 3+ blank lines left behind by the removals.
    return re.sub(r"\n{3,}", "\n\n", rebuilt)
# ========= Media helpers =========
def is_image_message(msg) -> bool:
    """Decide whether *msg* carries media we are willing to forward.

    Accepted:
      - native Telegram photos, always;
      - image/* documents, except image/webp (stickers) when SKIP_STICKERS;
      - image/gif and video/* only when ALLOW_GIFS_VIDEOS is enabled.
    """
    if getattr(msg, "photo", None):
        return True
    document = getattr(msg, "document", None)
    if not document:
        return False
    mime = (getattr(document, "mime_type", "") or "").lower()
    # Stickers commonly arrive as image/webp documents.
    if mime == "image/webp" and SKIP_STICKERS:
        return False
    # GIFs and videos are gated behind the same flag.
    if mime == "image/gif" or mime.startswith("video/"):
        return ALLOW_GIFS_VIDEOS
    return mime.startswith("image/")
def media_too_big(msg) -> bool:
    """True when the attached document exceeds MAX_MEDIA_MB megabytes.

    Messages without a document (or without a known size) are never
    considered too big.
    """
    document = getattr(msg, "document", None)
    if not document:
        return False
    size = getattr(document, "size", None)
    if size is None:
        return False
    return (size / (1024 * 1024)) > MAX_MEDIA_MB
# ========= Post-on-threshold with EDIT/REPLY/NEW (persisted) =========
TIER_ORDER = {"rendah": 0, "sedang": 1, "kuat": 2}  # low < medium < strong
last_posted: Dict[str, Dict[str, object]] = {}  # keyword -> {"msg_id": int, "tier": str}
# Last posted body text and last update timestamp per entity (anti-spam cooldown).
last_body: Dict[str, str] = {}
last_update_ts: Dict[str, float] = {}
async def _send_initial(msg, text: str) -> int:
    """Send the first message for an entity to TARGET_CHAT; return its id.

    Media (photo or an allowed document) is forwarded with *text* as the
    caption; otherwise plain text is sent. Returns -1 in DRY_RUN mode.
    FloodWaitError triggers a sleep-and-retry; any other media failure
    falls back to a plain text send.
    """
    if DRY_RUN:
        print("[DRY_RUN] send_initial:", text[:140])
        return -1
    # Send with media when present & allowed by the configured gates.
    if INCLUDE_MEDIA and is_image_message(msg) and not media_too_big(msg):
        try:
            if getattr(msg, "photo", None):
                m = await client.send_file(
                    TARGET_CHAT, msg.photo, caption=text, caption_entities=None, force_document=False
                )
                return m.id
            doc = getattr(msg, "document", None)
            if doc:
                # Re-upload the document from bytes so it is sent as inline
                # media, guessing a file extension from the MIME type.
                data = await client.download_media(msg, file=bytes)
                if data:
                    bio = io.BytesIO(data)
                    ext = ".jpg"
                    mt = (getattr(doc, "mime_type", "") or "").lower()
                    if mt:
                        ext_guess = guess_extension(mt) or ".jpg"
                        if ext_guess == ".jpe":  # normalise mimetypes' odd '.jpe' guess
                            ext_guess = ".jpg"
                        ext = ext_guess
                    bio.name = f"media{ext}"
                    m = await client.send_file(
                        TARGET_CHAT, bio, caption=text, caption_entities=None, force_document=False
                    )
                    return m.id
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
            return await _send_initial(msg, text)
        except Exception as e:
            debug_log("Gagal kirim media awal, fallback text", str(e))
    # Text-only path (also the fallback when media sending failed).
    try:
        m = await client.send_message(TARGET_CHAT, text, link_preview=True)
        return m.id
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        return await _send_initial(msg, text)
async def post_or_update(keyword: str, body: str, new_tier: str, src_msg, *, update_like: bool = False) -> None:
    """Post a new signal, or edit/reply/new-post on tier change or update.

    Behaviour:
      - first sighting of *keyword*: send a fresh message (with media);
      - tier increased: always edit the existing message in place;
      - same tier + update_like: apply UPDATE_STRATEGY (edit/reply/new),
        skipping unchanged bodies still inside UPDATE_COOLDOWN_SEC.
    FloodWaitError is retried after sleeping; other errors either fall
    back to a fresh send or are logged and ignored.
    """
    prefix = f"[{new_tier.upper()}] "
    text = prefix + body
    prev = last_posted.get(keyword)
    now_ts = datetime.now().timestamp()
    # First time this keyword is posted: send a brand-new message.
    if not prev:
        msg_id = await _send_initial(src_msg, text)
        last_posted[keyword] = {"msg_id": msg_id, "tier": new_tier}
        last_body[keyword] = body
        last_update_ts[keyword] = now_ts
        if msg_id != -1:  # -1 is the DRY_RUN sentinel; never persisted
            db_save_last_posted(keyword, msg_id, new_tier)
        return
    # Tier went up: always edit the original message in place.
    if TIER_ORDER.get(new_tier, 0) > TIER_ORDER.get(prev["tier"], 0):
        try:
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text)
            prev["tier"] = new_tier
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
            await post_or_update(keyword, body, new_tier, src_msg, update_like=update_like)
        except Exception as e:
            debug_log("Edit gagal (tier naik), fallback kirim baru", str(e))
            msg_id = await _send_initial(src_msg, text)
            last_posted[keyword] = {"msg_id": msg_id, "tier": new_tier}
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if msg_id != -1:
                db_save_last_posted(keyword, msg_id, new_tier)
        return
    # Same tier: only proceed when this message looks like an update.
    if not update_like:
        return
    # Anti-spam: identical body within the cooldown window is a no-op.
    if body.strip() == last_body.get(keyword, "").strip() and (now_ts - last_update_ts.get(keyword, 0) < UPDATE_COOLDOWN_SEC):
        return
    try:
        if UPDATE_STRATEGY == "edit":
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
        elif UPDATE_STRATEGY == "reply":
            # Reply to the entity's first posted message.
            await client.send_message(TARGET_CHAT, text, reply_to=prev["msg_id"], link_preview=True)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
        elif UPDATE_STRATEGY == "new":
            await client.send_message(TARGET_CHAT, text, link_preview=True)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
        else:
            # Unknown strategy value: fall back to editing in place.
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        await post_or_update(keyword, body, new_tier, src_msg, update_like=update_like)
    except Exception as e:
        debug_log("Update gagal (strategy), diabaikan", str(e))
# ========= Core actions (fallback kept) =========
async def send_as_is(msg, text_override: Optional[str] = None) -> None:
    """Forward *msg* to TARGET_CHAT verbatim (media + caption, or plain text).

    When *text_override* is given it replaces the original text and the
    original formatting entities are dropped. Respects DRY_RUN and the
    media size/type gates; any media failure falls back to a text send.
    """
    if DRY_RUN:
        print("[DRY_RUN] send_as_is:", (text_override or msg.message or "")[:140])
        return
    if text_override is not None:
        orig_text = text_override
        entities = None
    else:
        orig_text = msg.message or (getattr(msg, "raw_text", None) or "")
        entities = getattr(msg, "entities", None)
    if INCLUDE_MEDIA and is_image_message(msg) and not media_too_big(msg):
        try:
            if getattr(msg, "photo", None):
                await client.send_file(
                    TARGET_CHAT, msg.photo, caption=orig_text, caption_entities=entities, force_document=False
                )
                return
            doc = getattr(msg, "document", None)
            if doc:
                # Re-upload the document from bytes so it is sent as inline
                # media, guessing a file extension from the MIME type.
                data = await client.download_media(msg, file=bytes)
                if data:
                    bio = io.BytesIO(data)
                    ext = ".jpg"
                    mt = (getattr(doc, "mime_type", "") or "").lower()
                    if mt:
                        ext_guess = guess_extension(mt) or ".jpg"
                        if ext_guess == ".jpe":  # normalise mimetypes' odd '.jpe' guess
                            ext_guess = ".jpg"
                        ext = ext_guess
                    bio.name = f"media{ext}"
                    await client.send_file(
                        TARGET_CHAT, bio, caption=orig_text, caption_entities=entities, force_document=False
                    )
                    return
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
        except Exception as e:
            debug_log("Gagal kirim sebagai media, fallback ke text", str(e))
    # Text path (also the fallback when media sending failed above).
    try:
        await client.send_message(TARGET_CHAT, orig_text, formatting_entities=entities, link_preview=True)
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        await client.send_message(TARGET_CHAT, orig_text, formatting_entities=entities, link_preview=True)
# ========= Keyword extraction ($ticker-aware) =========
TICKER_CLEAN_RE = re.compile(r"\$[A-Za-z0-9]{2,12}")
TICKER_NOISY_RE = re.compile(r"\$[A-Za-z0-9](?:[^A-Za-z0-9]+[A-Za-z0-9]){1,11}")
def _extract_tickers(text_norm: str) -> List[str]:
"""
Ambil $TICKER dengan dua cara:
- Bersih: $ABC, $JBCOIN
- Noisy: $J*BCOIN -> dinormalisasi jadi $JBCOIN untuk *keyword* saja.
(Teks asli tetap dikirim apa adanya.)
"""
found = []
# bersih
for m in TICKER_CLEAN_RE.finditer(text_norm):
found.append(m.group(0).lower())
# noisy -> normalisasi internal
for m in TICKER_NOISY_RE.finditer(text_norm):
raw = m.group(0)
norm = "$" + re.sub(r"[^A-Za-z0-9]+", "", raw[1:])
if 3 <= len(norm) <= 13: # termasuk '$'
found.append(norm.lower())
# unik & pertahankan urutan
seen = set()
uniq = []
for x in found:
if x not in seen:
uniq.append(x)
seen.add(x)
return uniq
def _extract_all_keywords(text_norm: str) -> List[str]:
    """Detect every THEME_KEYWORDS hit plus every $ticker, first-seen order.

    The '$' symbol stays in THEME_KEYWORDS on purpose; for theme matching
    the '$' prefix of tickers is tolerated by stripping it from the
    haystack first (the original text is never modified).
    """
    haystack = re.sub(r"\$([a-z0-9]+)", r"\1", text_norm, flags=re.I)
    hits = [
        kw.lower()
        for kw in THEME_KEYWORDS
        if re.search(rf"(^|\W){re.escape(kw)}(\W|$)", haystack, flags=re.I)
    ]
    hits.extend(_extract_tickers(text_norm))
    ordered: List[str] = []
    seen = set()
    for kw in hits:
        if kw not in seen:
            seen.add(kw)
            ordered.append(kw)
    return ordered
def _choose_dominant_keyword(text_norm: str, kws: List[str]) -> Optional[str]:
if not kws:
return None
# pilih berdasarkan frekuensi kemunculan + preferensi $ticker + posisi paling awal
score = {}
for kw in kws:
cnt = len(re.findall(rf"(^|\W){re.escape(kw)}(\W|$)", text_norm, flags=re.I))
first = re.search(rf"(^|\W){re.escape(kw)}(\W|$)", text_norm, flags=re.I)
first_idx = first.start() if first else 1_000_000
bonus = 1 if kw.startswith("$") else 0 # prefer $ticker saat imbang
score[kw] = (cnt, bonus, -first_idx)
chosen = sorted(score.items(), key=lambda x: (x[1][0], x[1][1], x[1][2]), reverse=True)[0][0]
return chosen
def _role_of(chat_id: int) -> str:
    """Role for a chat id; untagged chats default to 'support' (safe gating)."""
    try:
        return chat_roles[chat_id]
    except KeyError:
        return "support"
def _unique_counts_by_role(keyword: str) -> Tuple[int, int]:
    """Count unique groups mentioning *keyword* in the active window.

    Returns (core_count, support_count); chats without a role tag are
    counted as SUPPORT to stay on the safe side.
    """
    sightings = keyword_group_last_seen.get(keyword, {})
    core_ids, support_ids = set(), set()
    for group_key in sightings:
        if chat_roles.get(int(group_key), "support") == "core":
            core_ids.add(group_key)
        else:
            support_ids.add(group_key)
    return len(core_ids), len(support_ids)
# ========= Entity-key extraction (CA > $ticker) =========
def extract_entity_key(text: str) -> Optional[str]:
    """Canonical entity key used to decide 'same coin' across messages.

    Priority order: contract address ('ca:evm:<0x...>' or 'ca:sol:<base58>'),
    then the first $ticker ('ticker:<name>'); None when neither is present.
    """
    normalized = normalize_for_filter(text)
    # Contract addresses take precedence over tickers.
    match = CA_EVM_RE.search(normalized) or CA_SOL_RE.search(normalized)
    if match:
        address = match.group(0)
        kind = "evm" if address.lower().startswith("0x") else "sol"
        return f"ca:{kind}:{address.lower()}"
    # Fall back to the existing $ticker detection.
    tickers = _extract_tickers(normalized.lower())
    if not tickers:
        return None
    return f"ticker:{tickers[0][1:].lower()}"
async def process_message(msg, source_chat_id: int) -> None:
    """Full curation pipeline for one incoming message.

    Steps, in order: exclusion phrases -> entity dedup (CA/$ticker) with
    UPDATE detection -> content-only and per-message dedup -> relevance
    scoring (with a CA-only bypass) -> dominant-topic selection ->
    windowed tier aggregation -> SUPPORT gating anchored on CORE ->
    invite-sentence filtering -> backfill age safety -> post/edit/reply
    to TARGET_CHAT via post_or_update().
    """
    orig_text = msg.message or (getattr(msg, "raw_text", None) or "")
    text_norm = normalize_for_filter(orig_text).lower()
    # Explicit exclusions: drop messages containing any blacklisted phrase.
    for phrase in EXCLUDE_PHRASES:
        if phrase.lower() in text_norm:
            debug_log("Dilewati karena EXCLUDE_PHRASES", orig_text)
            return
    # === Entity duplicate handling + UPDATE detection ===
    entity_key = extract_entity_key(orig_text)
    duplicate_entity = bool(entity_key and entity_key in recent_entity_keys)
    # Heuristics for "this is a follow-up/update post" (mcap, TP hit, etc.).
    UPDATE_HINTS = [
        r"\bupdate\b", r"\bupd\b", r"\bmcap\b", r"\bmarket\s*cap\b",
        r"\bhit\b", r"\btp\d?\b", r"\btarget\b", r"\bath\b", r"\bnew\s*high\b",
        r"\bliq(uidity)?\b", r"\bvolume\b", r"\bnow\b", r"⇒|->|→",
        r"\bfrom\b.*\bto\b", r"\b\d+(\.\d+)?\s*[km]?\b",
    ]
    UPDATE_RE = re.compile("|".join(UPDATE_HINTS), re.IGNORECASE)
    update_like = bool(UPDATE_RE.search(orig_text or ""))
    # A duplicate entity that is NOT an update is held back.
    if duplicate_entity and not update_like:
        debug_log("Entity-duplicate (non-update), dilewati", orig_text)
        return
    # Content-only dedup (catches identical text crossposted across groups).
    ch = content_only_hash(orig_text)
    if ch in recent_content_hashes:
        debug_log("Content-duplicate (global), dilewati", orig_text)
        return
    recent_content_hashes.append(ch)
    # Legacy dedup (per message id + media ids).
    h = hash_for_dedup(text_norm, msg)
    if h in recent_hashes:
        debug_log("Duplikat (pesan/media), dilewati", orig_text)
        return
    recent_hashes.append(h)
    # Relevance score, computed on text with URLs/CAs neutralised.
    score = score_relevance(text_norm, THEME_KEYWORDS)
    # Bypass the relevance gate when a contract address is present, so
    # "CA-only" posts are not blocked.
    allow_by_ca = bool(entity_key and entity_key.startswith("ca:"))
    debug_log(f"Skor relevansi={score:.2f} | allow_by_ca={allow_by_ca}", orig_text)
    if score < RELEVANCE_THRESHOLD and not allow_by_ca:
        return
    role = _role_of(source_chat_id)  # 'core' / 'support'
    # Several keywords may match; pick one dominant keyword for aggregation
    # (only used as a fallback when there is no entity key).
    all_kws = _extract_all_keywords(text_norm)
    main_kw = _choose_dominant_keyword(text_norm, all_kws)
    # topic key = entity_key (CA/$ticker) when available, else dominant keyword
    topic_key = entity_key or main_kw
    if not topic_key:
        debug_log("Tak ada keyword/entitas cocok, dilewati", orig_text)
        return
    # Windowed aggregation & tier classification (keyed by topic_key).
    group_key = str(source_chat_id)
    now = datetime.now(timezone.utc)
    class_label, unique_groups = update_and_classify(topic_key, group_key, now)
    # SUPPORT gating (CORE-anchored): a support-only topic needs a quorum.
    if role != "core":
        core_u, sup_u = _unique_counts_by_role(topic_key)
        if core_u >= 1:
            pass  # at least one CORE group already mentioned it -> allow
        elif sup_u < SUPPORT_MIN_UNIQUE:
            debug_log(
                f"Support ditahan (core_u={core_u}, sup_u={sup_u} < {SUPPORT_MIN_UNIQUE})",
                orig_text,
            )
            return
    # Drop invite/promo sentences (whitelist-aware).
    cleaned_body = filter_invite_sentences(orig_text)
    if not cleaned_body.strip():
        debug_log("Semua kalimat terfilter (kosong), dilewati", orig_text)
        return
    # Backfill safety: around startup, skip messages older than the window.
    cutoff = startup_time_utc - timedelta(
        minutes=CLASS_WINDOW_MINUTES + BACKFILL_BUFFER_MINUTES
    )
    if getattr(msg, "date", None):
        msg_dt = msg.date
        # NOTE(review): replace(tzinfo=UTC) presumes msg.date is expressed in
        # UTC already (Telethon message dates are UTC-aware) — confirm if the
        # message source ever changes.
        if isinstance(msg_dt, datetime) and msg_dt.replace(tzinfo=timezone.utc) < cutoff:
            debug_log("Lama (lewat cutoff backfill safety), dilewati", orig_text)
            return
    # Remember the entity key to throttle non-update duplicates; updates are
    # recorded too so they cannot spam either.
    if entity_key:
        recent_entity_keys.append(entity_key)
    await post_or_update(topic_key, cleaned_body, class_label, msg, update_like=update_like)
    debug_log(
        f"Posted/Edited (role={role}, unique_groups={unique_groups}, key={topic_key}, tier={class_label}, update_like={update_like})",
        orig_text,
    )
async def backfill_history(entity, limit: int) -> None:
    """Feed the last *limit* messages of *entity* through process_message.

    The chat id is recovered from msg.peer_id (channel/chat/user variants)
    and normalised with abs() to match the keys stored in chat_roles.
    Per-message failures are logged and do not stop the backfill.
    """
    if limit <= 0:
        return
    print(f"[Backfill] Tarik {limit} pesan terakhir dari {getattr(entity, 'title', 'Unknown')} ...")
    async for m in client.iter_messages(entity, limit=limit):
        try:
            chat_id = getattr(m.peer_id, 'channel_id', None) or \
                getattr(m.peer_id, 'chat_id', None) or \
                getattr(m.peer_id, 'user_id', None)
            if chat_id:
                await process_message(m, source_chat_id=abs(chat_id))
        except Exception as e:
            debug_log(f"Error saat memproses backfill untuk pesan ID {m.id}", str(e))
# ========= Event handlers =========
@client.on(events.NewMessage(chats=SOURCE_CHATS))
async def on_new_message(event):
    """Live handler: run every new SOURCE_CHATS message through the pipeline."""
    try:
        await process_message(event.message, source_chat_id=abs(event.chat_id))
    except Exception as e:
        # Never let one bad message kill the event loop; just log it.
        print(f"Process error di chat {event.chat_id}: {e}")
# ========= Entry points =========
async def _resolve_and_tag_chats(raw_list, role_label: str) -> list:
    """Resolve chat links into entities and tag each chat id with *role_label*.

    Sources that fail to resolve are logged and skipped; the returned list
    only contains successfully resolved entities.
    """
    entities = []
    for src in raw_list:
        try:
            entity = await client.get_entity(src)
            entities.append(entity)
            chat_roles[abs(int(entity.id))] = role_label
        except Exception as e:
            print(f"Gagal resolve sumber {src}: {e}")
    return entities
# Strong reference to the background run-loop task. asyncio.create_task()
# only keeps a weak reference to scheduled tasks, so without this the task
# could be garbage-collected mid-flight (see asyncio docs on Task refs).
_bg_task: Optional["asyncio.Task"] = None
async def start_bot_background() -> None:
    """Background entry point: start the client, restore persisted state,
    resolve & role-tag sources, backfill, then run the event loop as a task.

    Unlike app_main(), this returns right after scheduling
    run_until_disconnected(); the task reference is kept in _bg_task.
    """
    global last_posted, keyword_group_last_seen, _bg_task
    await client.start()
    _init_db()
    # Replace the in-memory state with what was persisted in SQLite.
    last_posted, keyword_group_last_seen = db_load_state()
    resolved_core = await _resolve_and_tag_chats(CORE_CHATS, "core")
    resolved_support = await _resolve_and_tag_chats(SUPPORT_CHATS, "support")
    resolved_sources = resolved_core + resolved_support
    for ent in resolved_sources:
        try:
            await backfill_history(ent, INITIAL_BACKFILL)
        except Exception as e:
            print(f"Backfill gagal untuk {getattr(ent, 'title', 'Unknown')}: {e}")
    print("Kurator berjalan (background task). Menunggu pesan baru...")
    # FIX: keep a reference to the task; previously the bare create_task()
    # result was dropped, allowing the background loop to be GC'd.
    _bg_task = asyncio.create_task(client.run_until_disconnected())
async def app_main() -> None:
    """Foreground entry point: start client, restore state, backfill, block.

    Mirrors start_bot_background() but keeps run_until_disconnected() in
    the foreground (stop with an interrupt).
    """
    await client.start()
    _init_db()
    # Replace the in-memory state with what was persisted in SQLite.
    global last_posted, keyword_group_last_seen
    last_posted, keyword_group_last_seen = db_load_state()
    resolved_core = await _resolve_and_tag_chats(CORE_CHATS, "core")
    resolved_support = await _resolve_and_tag_chats(SUPPORT_CHATS, "support")
    resolved_sources = resolved_core + resolved_support
    for ent in resolved_sources:
        await backfill_history(ent, INITIAL_BACKFILL)
    print("Kurator berjalan. Menunggu pesan baru... (Stop dengan interrupt).")
    await client.run_until_disconnected()
if __name__ == "__main__":
    asyncio.run(app_main())