import asyncio
import os
import re
import io
import sqlite3
import hashlib
from collections import deque, defaultdict
from datetime import datetime, timedelta, timezone
from mimetypes import guess_extension
from typing import List, Tuple, Optional, Dict

from rapidfuzz import fuzz
from telethon import TelegramClient, events
from telethon.sessions import StringSession, MemorySession
from telethon.errors.rpcerrorlist import FloodWaitError

# ========= Configuration via Environment =========
API_ID = int(os.environ.get("API_ID", "0"))
API_HASH = os.environ.get("API_HASH", "")
STRING_SESSION = os.environ.get("STRING_SESSION", "")

# --- Sources are split into CORE vs SUPPORT tiers ---
CORE_CHATS = [
    "https://t.me/PEPE_Calls28",
    "https://t.me/HenryGems",
    "https://t.me/ChinaPumpCommunity",
    "https://t.me/SephirothGemCalls1",
    "https://t.me/GM_Degencalls",
    "https://t.me/Enthucalls",
    "https://t.me/kobecalls",
    "https://t.me/Kulture_Kall",
]
SUPPORT_CHATS = [
    "https://t.me/TheDonALPHAJournal",
    "https://t.me/savascalls",
    "https://t.me/Tanjirocall",
    "https://t.me/Zen_call",
    "https://t.me/ChapoInsider",
    "https://t.me/millionsgems",
    "https://t.me/Milagrosdegencalls",
    "https://t.me/kariusgemscalls",
    "https://t.me/Dwen_Exchange",
    "https://t.me/bat_gamble",
    "https://t.me/BatmanGamble",
    "https://t.me/hulkgemscalls_real",
    "https://t.me/MineGems",
]
SOURCE_CHATS = CORE_CHATS + SUPPORT_CHATS
TARGET_CHAT = os.environ.get("TARGET_CHAT", "https://t.me/MidasTouchsignalll")

# Topic keywords; the bare '$' symbol is intentionally kept as a keyword.
THEME_KEYWORDS = [
    "call", "signal", "entry", "buy", "sell", "tp", "sl", "pump",
    "spot", "futures", "setup", "pepe", "bnb", "eth", "btc", "sol",
    "meme", "$",
]
KEYWORD_WEIGHT = 1.0
FUZZ_WEIGHT = 0.6
RELEVANCE_THRESHOLD = float(os.environ.get("RELEVANCE_THRESHOLD", "1.0"))

EXCLUDE_PHRASES = [
    "achievement unlocked",
]

INCLUDE_MEDIA = os.environ.get("INCLUDE_MEDIA", "1") == "1"
MAX_MEDIA_MB = float(os.environ.get("MAX_MEDIA_MB", "12"))
SKIP_STICKERS = os.environ.get("SKIP_STICKERS", "1") == "1"
ALLOW_GIFS_VIDEOS = os.environ.get("ALLOW_GIFS_VIDEOS", "0") == "1"
INITIAL_BACKFILL = int(os.environ.get("INITIAL_BACKFILL", "20"))
DEDUP_BUFFER_SIZE = int(os.environ.get("DEDUP_BUFFER_SIZE", "800"))
CLASS_WINDOW_MINUTES = int(os.environ.get("CLASS_WINDOW_MINUTES", "20"))
SUPPORT_MIN_UNIQUE = int(os.environ.get("SUPPORT_MIN_UNIQUE", "2"))

# DRY RUN (do not send anything to TARGET_CHAT)
DRY_RUN = os.environ.get("DRY_RUN", "0") == "1"

# Backfill buffer: ignore messages older than (startup_time - buffer)
BACKFILL_BUFFER_MINUTES = int(os.environ.get("BACKFILL_BUFFER_MINUTES", "3"))

# === Update behavior strategy ===
# edit  : edit the entity's last message on UPDATE
# reply : reply (reply_to) to the entity's first message (default here)
# new   : send a brand-new message for every UPDATE
UPDATE_STRATEGY = os.environ.get("UPDATE_STRATEGY", "reply").lower()
UPDATE_COOLDOWN_SEC = int(os.environ.get("UPDATE_COOLDOWN_SEC", "5"))


# ========= Client bootstrap =========
def build_client() -> TelegramClient:
    """Create the Telethon client: persistent StringSession when provided,
    otherwise an in-memory session that requires login on each run."""
    if STRING_SESSION:
        print(">> Using StringSession (persistent).")
        return TelegramClient(StringSession(STRING_SESSION), API_ID, API_HASH)
    print(">> Using MemorySession (login tiap run).")
    return TelegramClient(MemorySession(), API_ID, API_HASH)


client = build_client()

# Rolling dedup buffers (bounded so memory stays flat).
recent_hashes: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)
recent_content_hashes: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)  # content-only dedup
# entity-based dedup (CA / $ticker)
recent_entity_keys: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)

# Map chat_id -> "core" / "support"; filled at startup after entity resolution.
chat_roles: Dict[int, str] = {}

startup_time_utc = datetime.now(timezone.utc)

# ========= Persistence (SQLite) =========
DB_PATH = os.environ.get("BOTSIGNAL_DB", "/tmp/botsignal.db")


def _db():
    """Open a new SQLite connection with WAL enabled (safer for concurrent reads)."""
    conn = sqlite3.connect(DB_PATH)
    conn.execute("PRAGMA journal_mode=WAL;")
    return conn


def _init_db():
    """Create the persistence tables if they do not exist yet."""
    conn = _db()
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS last_posted (
            keyword TEXT PRIMARY KEY,
            msg_id INTEGER NOT NULL,
            tier TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS kw_group_seen (
            keyword TEXT NOT NULL,
            group_key TEXT NOT NULL,
            last_ts INTEGER NOT NULL,
            PRIMARY KEY (keyword, group_key)
        );
        """
    )
    conn.commit()
    conn.close()


def db_load_state():
    """Load last_posted & kw_group_seen into memory on startup."""
    conn = _db()
    last = {}
    for kw, mid, tier in conn.execute("SELECT keyword, msg_id, tier FROM last_posted"):
        last[kw] = {"msg_id": mid, "tier": tier}
    kw_map: defaultdict[str, dict[str, datetime]] = defaultdict(dict)
    for kw, gk, ts in conn.execute("SELECT keyword, group_key, last_ts FROM kw_group_seen"):
        kw_map[kw][gk] = datetime.fromtimestamp(ts, tz=timezone.utc)
    conn.close()
    return last, kw_map


def db_save_last_posted(keyword: str, msg_id: int, tier: str):
    """Upsert the last posted message id + tier for a keyword/entity."""
    conn = _db()
    conn.execute(
        "INSERT INTO last_posted(keyword, msg_id, tier) VALUES(?,?,?) "
        "ON CONFLICT(keyword) DO UPDATE SET msg_id=excluded.msg_id, tier=excluded.tier",
        (keyword, msg_id, tier),
    )
    conn.commit()
    conn.close()


def db_upsert_kw_seen(keyword: str, group_key: str, ts: datetime):
    """Record (or refresh) the last time `group_key` mentioned `keyword`."""
    conn = _db()
    conn.execute(
        "INSERT INTO kw_group_seen(keyword, group_key, last_ts) VALUES(?,?,?) "
        "ON CONFLICT(keyword, group_key) DO UPDATE SET last_ts=excluded.last_ts",
        (keyword, group_key, int(ts.timestamp())),
    )
    conn.commit()
    conn.close()


def db_prune_expired(cutoff: datetime):
    """Delete keyword/group sightings older than `cutoff`."""
    conn = _db()
    conn.execute("DELETE FROM kw_group_seen WHERE last_ts < ?", (int(cutoff.timestamp()),))
    conn.commit()
    conn.close()


# ========= Utilities =========
def debug_log(reason: str, content: str = "") -> None:
    """Print a short single-line debug trace (content truncated to 160 chars)."""
    short = (content or "").replace("\n", " ")[:160]
    print(f"[DEBUG] {reason}: {short}")


def normalize_for_filter(text: str) -> str:
    """Strip quoted ('>') lines and collapse whitespace for filtering purposes."""
    if not text:
        return ""
    s = re.sub(r"(?m)^>.*", "", text)
    s = re.sub(r"\s+", " ", s).strip()
    return s


def _tokenize_words(s: str) -> List[str]:
    """Lowercase word tokens, keeping '$' and '#' so tickers/hashtags survive."""
    return re.findall(r"[a-z0-9\$\#]{1,64}", s.lower())


def _windows(tokens: List[str], size: int = 20):
    """Yield non-overlapping windows of `size` tokens, joined into strings."""
    for i in range(0, len(tokens), size):
        yield " ".join(tokens[i : i + size])


# --- Strip URLs / contract addresses before relevance SCORING ---
CA_SOL_RE = re.compile(r"\b[1-9A-HJ-NP-Za-km-z]{32,48}\b")  # Solana base58 (approximate)
CA_EVM_RE = re.compile(r"\b0x[a-fA-F0-9]{40}\b")  # EVM address
CA_LABEL_RE = re.compile(r"\bCA\s*[:=]\s*\S+", re.IGNORECASE)  # cut the token after "CA:"


def _strip_urls_and_mentions(s: str) -> str:
    """Remove http(s) URLs, t.me links and @mentions, then collapse whitespace."""
    s = re.sub(r"https?://\S+", "", s)
    s = re.sub(r"t\.me/[A-Za-z0-9_]+", "", s)
    s = re.sub(r"@[A-Za-z0-9_]+", "", s)
    return re.sub(r"\s+", " ", s).strip()


def strip_contracts_for_scoring(s: str) -> str:
    """
    Remove URLs/mentions, contract addresses and the token after 'CA:'
    so that e.g. the word 'pump' inside a CA/URL (pump.fun) does not
    influence the relevance score.
    """
    s0 = _strip_urls_and_mentions(s)
    s1 = CA_LABEL_RE.sub(" ", s0)
    s2 = CA_EVM_RE.sub(" ", s1)
    s3 = CA_SOL_RE.sub(" ", s2)
    return re.sub(r"\s+", " ", s3).strip()


def score_relevance(text: str, keywords: List[str]) -> float:
    """Score = exact keyword hits + windowed fuzzy match (mean of top-3),
    so long texts are treated fairly."""
    if not text:
        return 0.0
    # Use the URL/CA-free version so 'pump' inside a CA is not counted.
    t = strip_contracts_for_scoring(text).lower()

    # exact hits (unique); the substring test already implies the regex hit,
    # the word-boundary search is kept as a (redundant) safety net
    exact_hits = 0
    for kw in set(keywords):
        if kw in t or re.search(rf"\b{re.escape(kw)}\b", t):
            exact_hits += 1
    exact_score = exact_hits * KEYWORD_WEIGHT

    # fuzzy windowed: take the top-3 window scores over 20-token windows
    tokens = _tokenize_words(t)
    if not tokens:
        return exact_score
    scores = []
    for w in _windows(tokens, 20):
        best = 0.0
        for kw in keywords:
            sc = fuzz.partial_ratio(kw, w) / 100.0
            if sc > best:
                best = sc
        scores.append(best)
    fuzzy_top3 = sorted(scores, reverse=True)[:3]
    fuzzy_score = (sum(fuzzy_top3) / max(1, len(fuzzy_top3))) * FUZZ_WEIGHT if fuzzy_top3 else 0.0
    return exact_score + fuzzy_score


def hash_for_dedup(text: str, msg) -> str:
    """Mixed hash (legacy) — dedups per message+media.
    NOTE: it includes msg.id, so it only blocks re-delivery of the very
    same message, not identical text from another message."""
    parts = [text or ""]
    if getattr(msg, "id", None) is not None:
        parts.append(str(msg.id))
    doc = getattr(msg, "document", None)
    if doc and getattr(doc, "id", None) is not None:
        parts.append(f"doc:{doc.id}")
    if getattr(msg, "photo", None) is not None:
        ph = msg.photo
        ph_id = getattr(ph, "id", None)
        if ph_id is not None:
            parts.append(f"photo:{ph_id}")
    raw = "|".join(parts).encode("utf-8", errors="ignore")
    return hashlib.sha1(raw).hexdigest()


def content_only_hash(text: str) -> str:
    """Content-only hash (for cross-group crossposts)."""
    norm = _strip_urls_and_mentions(normalize_for_filter(text))
    return hashlib.sha1(norm.encode("utf-8", errors="ignore")).hexdigest()


# ========= Class aggregator (windowed unique groups) =========
# keyword -> {group_key -> last time that group mentioned it}
keyword_group_last_seen: defaultdict[str, dict[str, datetime]] = defaultdict(dict)


def _prune_expired(now: datetime) -> None:
    """Drop sightings older than the class window, both in memory and in the DB."""
    window = timedelta(minutes=CLASS_WINDOW_MINUTES)
    cutoff = now - window
    # in-memory prune
    for kw, m in list(keyword_group_last_seen.items()):
        for gk, ts in list(m.items()):
            if ts < cutoff:
                del m[gk]
        if not m:
            del keyword_group_last_seen[kw]
    # db prune
    db_prune_expired(cutoff)


def update_and_classify(keyword: str, group_key: str, now: Optional[datetime] = None) -> Tuple[str, int]:
    """Record a sighting of `keyword` by `group_key` and classify its strength
    by how many unique groups mentioned it inside the active window.

    Returns (tier_label, unique_group_count):
        >= 4 groups -> "kuat" (strong), >= 2 -> "sedang" (medium), else "rendah" (weak).
    """
    if not now:
        now = datetime.now(timezone.utc)
    _prune_expired(now)
    bucket = keyword_group_last_seen[keyword]
    bucket[group_key] = now
    db_upsert_kw_seen(keyword, group_key, now)
    unique_groups = len(bucket)
    if unique_groups >= 4:
        return "kuat", unique_groups
    elif unique_groups >= 2:
        return "sedang", unique_groups
    else:
        return "rendah", unique_groups


# ========= Sentence-level invite filter (smarter) =========
INVITE_PATTERNS = [
    r"\bjoin\b", r"\bjoin (us|our|channel|group)\b",
    r"\bdm\b", r"\bdm (me|gw|gue|gua|saya|admin)\b",
    r"\bpm\b", r"\binbox\b", r"\bcontact\b", r"\bkontak\b", r"\bhubungi\b",
    r"\bvip\b", r"\bpremium\b", r"\bberbayar\b", r"\bpaid\b", r"\bexclusive\b",
    r"\bwhitelist\b", r"\bprivate( group| channel)?\b",
    r"\bmembership?\b", r"\bsubscribe\b", r"\blangganan\b",
    # promo/advertising keywords
    r"\bpromo\b", r"\bpromosi\b", r"\biklan\b",
    r"\badvert\b", r"\badvertise\b", r"\badvertisement\b",
    # invite links / shorteners
    r"(t\.me\/joinchat|t\.me\/\+|telegram\.me\/|discord\.gg\/|wa\.me\/|whatsapp\.com\/)",
    r"(bit\.ly|tinyurl\.com|linktr\.ee)",
    # UPGRADE: the aggressive rule blocking all plain t.me links stays removed
    # r"t\.me\/[A-Za-z0-9_]+",
]
INVITE_REGEXES = [re.compile(p, re.IGNORECASE) for p in INVITE_PATTERNS]

# whitelist: sentences that look like "real signals" must never be dropped
WHITELIST_STRONG_SIGNAL = [
    r"\$[a-z0-9]{2,10}",  # $TICKER
    r"\b(entry|entries|buy|sell)\b",
    r"\bsl\b", r"\btp\b", r"\btp\d\b",
]
WHITELIST_REGEXES = [re.compile(p, re.IGNORECASE) for p in WHITELIST_STRONG_SIGNAL]


def _is_invite_sentence(s: str) -> bool:
    """True if the sentence looks like an invite/ad AND carries no strong signal."""
    t = s.strip()
    if not t:
        return False
    # If the sentence contains a strong signal, keep it even with invite words
    if any(r.search(t) for r in WHITELIST_REGEXES):
        return False
    # One or more invite patterns -> drop it
    return any(r.search(t) for r in INVITE_REGEXES)


def filter_invite_sentences(text: str) -> str:
    """Split into sentences/lines and drop the ones flagged as invites."""
    if not text:
        return text
    parts = re.split(r'(?<=[\.!\?])\s+|\n+', text, flags=re.UNICODE)
    kept = [p.strip() for p in parts if p and not _is_invite_sentence(p)]
    cleaned = "\n".join(kept).strip()
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned


# ========= Media helpers =========
def is_image_message(msg) -> bool:
    """
    True for:
      - a native Telegram photo
      - an image/* document; 'image/webp' (sticker) honors SKIP_STICKERS
      - GIF/video only when ALLOW_GIFS_VIDEOS is True
    """
    if getattr(msg, "photo", None):
        return True
    doc = getattr(msg, "document", None)
    if not doc:
        return False
    mt = (getattr(doc, "mime_type", "") or "").lower()
    # Skip stickers (commonly image/webp)
    if mt == "image/webp" and SKIP_STICKERS:
        return False
    if mt.startswith("image/"):
        # GIF allowed only when ALLOW_GIFS_VIDEOS is True
        if mt == "image/gif":
            return ALLOW_GIFS_VIDEOS
        return True
    # Video allowed only when ALLOW_GIFS_VIDEOS is True
    if mt.startswith("video/"):
        return ALLOW_GIFS_VIDEOS
    return False


def media_too_big(msg) -> bool:
    """True when the document exceeds MAX_MEDIA_MB (photos have no doc size)."""
    doc = getattr(msg, "document", None)
    if not doc:
        return False
    size = getattr(doc, "size", None)
    if size is None:
        return False
    return (size / (1024 * 1024)) > MAX_MEDIA_MB


# ========= Post-on-threshold with EDIT/REPLY/NEW (persisted) =========
TIER_ORDER = {"rendah": 0, "sedang": 1, "kuat": 2}
last_posted: Dict[str, Dict[str, object]] = {}  # keyword -> {"msg_id": int, "tier": str}
# last body & last update timestamp per entity (anti-spam)
last_body: Dict[str, str] = {}
last_update_ts: Dict[str, float] = {}


async def _send_initial(msg, text: str) -> int:
    """Send the first post for an entity (media when allowed, else text).

    Returns the posted message id, or -1 in DRY_RUN mode.
    """
    if DRY_RUN:
        print("[DRY_RUN] send_initial:", text[:140])
        return -1
    # send media when present & allowed
    if INCLUDE_MEDIA and is_image_message(msg) and not media_too_big(msg):
        try:
            if getattr(msg, "photo", None):
                # FIX: Telethon's send_file uses `formatting_entities`,
                # not `caption_entities` (that name is Pyrogram/Bot-API).
                m = await client.send_file(
                    TARGET_CHAT, msg.photo, caption=text,
                    formatting_entities=None, force_document=False
                )
                return m.id
            doc = getattr(msg, "document", None)
            if doc:
                data = await client.download_media(msg, file=bytes)
                if data:
                    bio = io.BytesIO(data)
                    ext = ".jpg"
                    mt = (getattr(doc, "mime_type", "") or "").lower()
                    if mt:
                        ext_guess = guess_extension(mt) or ".jpg"
                        if ext_guess == ".jpe":
                            ext_guess = ".jpg"
                        ext = ext_guess
                    bio.name = f"media{ext}"
                    m = await client.send_file(
                        TARGET_CHAT, bio, caption=text,
                        formatting_entities=None, force_document=False
                    )
                    return m.id
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
            return await _send_initial(msg, text)
        except Exception as e:
            debug_log("Gagal kirim media awal, fallback text", str(e))
    try:
        m = await client.send_message(TARGET_CHAT, text, link_preview=True)
        return m.id
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        return await _send_initial(msg, text)


async def post_or_update(keyword: str, body: str, new_tier: str, src_msg, *, update_like: bool = False) -> None:
    """Post a new entity message, or edit/reply/new according to UPDATE_STRATEGY.

    Rules:
      - first sighting: send a fresh message and persist its id+tier
      - tier increased: always edit the existing message in place
      - same tier: only act when `update_like`, with cooldown/no-op guards
    """
    prefix = f"[{new_tier.upper()}] "
    text = prefix + body
    prev = last_posted.get(keyword)
    now_ts = datetime.now().timestamp()

    # first post for this entity
    if not prev:
        msg_id = await _send_initial(src_msg, text)
        last_posted[keyword] = {"msg_id": msg_id, "tier": new_tier}
        last_body[keyword] = body
        last_update_ts[keyword] = now_ts
        if msg_id != -1:
            db_save_last_posted(keyword, msg_id, new_tier)
        return

    # tier went up: always edit
    if TIER_ORDER.get(new_tier, 0) > TIER_ORDER.get(prev["tier"], 0):
        try:
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text)
            prev["tier"] = new_tier
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
            await post_or_update(keyword, body, new_tier, src_msg, update_like=update_like)
        except Exception as e:
            debug_log("Edit gagal (tier naik), fallback kirim baru", str(e))
            msg_id = await _send_initial(src_msg, text)
            last_posted[keyword] = {"msg_id": msg_id, "tier": new_tier}
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if msg_id != -1:
                db_save_last_posted(keyword, msg_id, new_tier)
        return

    # same tier: only proceed for update-like messages
    if not update_like:
        return

    # anti-spam: identical body or still within cooldown -> no-op
    if body.strip() == last_body.get(keyword, "").strip() and (now_ts - last_update_ts.get(keyword, 0) < UPDATE_COOLDOWN_SEC):
        return

    try:
        if UPDATE_STRATEGY == "edit":
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
        elif UPDATE_STRATEGY == "reply":
            # reply to the entity's first message
            await client.send_message(TARGET_CHAT, text, reply_to=prev["msg_id"], link_preview=True)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
        elif UPDATE_STRATEGY == "new":
            await client.send_message(TARGET_CHAT, text, link_preview=True)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
        else:
            # fallback to edit on an invalid strategy value
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        await post_or_update(keyword, body, new_tier, src_msg, update_like=update_like)
    except Exception as e:
        debug_log("Update gagal (strategy), diabaikan", str(e))


# ========= Core actions (fallback kept) =========
async def send_as_is(msg, text_override: Optional[str] = None) -> None:
    """Forward the message content verbatim (media + caption when allowed,
    otherwise plain text with the original formatting entities)."""
    if DRY_RUN:
        print("[DRY_RUN] send_as_is:", (text_override or msg.message or "")[:140])
        return
    if text_override is not None:
        orig_text = text_override
        entities = None
    else:
        orig_text = msg.message or (getattr(msg, "raw_text", None) or "")
        entities = getattr(msg, "entities", None)
    if INCLUDE_MEDIA and is_image_message(msg) and not media_too_big(msg):
        try:
            if getattr(msg, "photo", None):
                # FIX: `formatting_entities` is Telethon's caption-entities kwarg.
                await client.send_file(
                    TARGET_CHAT, msg.photo, caption=orig_text,
                    formatting_entities=entities, force_document=False
                )
                return
            doc = getattr(msg, "document", None)
            if doc:
                data = await client.download_media(msg, file=bytes)
                if data:
                    bio = io.BytesIO(data)
                    ext = ".jpg"
                    mt = (getattr(doc, "mime_type", "") or "").lower()
                    if mt:
                        ext_guess = guess_extension(mt) or ".jpg"
                        if ext_guess == ".jpe":
                            ext_guess = ".jpg"
                        ext = ext_guess
                    bio.name = f"media{ext}"
                    await client.send_file(
                        TARGET_CHAT, bio, caption=orig_text,
                        formatting_entities=entities, force_document=False
                    )
                    return
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
        except Exception as e:
            debug_log("Gagal kirim sebagai media, fallback ke text", str(e))
    try:
        await client.send_message(TARGET_CHAT, orig_text, formatting_entities=entities, link_preview=True)
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        await client.send_message(TARGET_CHAT, orig_text, formatting_entities=entities, link_preview=True)


# ========= Keyword extraction ($ticker-aware) =========
TICKER_CLEAN_RE = re.compile(r"\$[A-Za-z0-9]{2,12}")
TICKER_NOISY_RE = re.compile(r"\$[A-Za-z0-9](?:[^A-Za-z0-9]+[A-Za-z0-9]){1,11}")


def _extract_tickers(text_norm: str) -> List[str]:
    """
    Extract $TICKERs two ways:
      - clean: $ABC, $JBCOIN
      - noisy: $J*BCOIN -> normalized to $JBCOIN for the *keyword* only.
        (The original text is still sent untouched.)
    """
    found = []
    # clean
    for m in TICKER_CLEAN_RE.finditer(text_norm):
        found.append(m.group(0).lower())
    # noisy -> internal normalization
    for m in TICKER_NOISY_RE.finditer(text_norm):
        raw = m.group(0)
        norm = "$" + re.sub(r"[^A-Za-z0-9]+", "", raw[1:])
        if 3 <= len(norm) <= 13:  # including '$'
            found.append(norm.lower())
    # unique, preserving order
    seen = set()
    uniq = []
    for x in found:
        if x not in seen:
            uniq.append(x)
            seen.add(x)
    return uniq


def _extract_all_keywords(text_norm: str) -> List[str]:
    """
    Detect ALL keywords from THEME_KEYWORDS + $tickers.
    The '$' symbol itself is intentionally not stripped.
    """
    # tolerant lookup for theme keywords (as before)
    t = re.sub(r"\$([a-z0-9]+)", r"\1", text_norm, flags=re.I)
    found = []
    for kw in THEME_KEYWORDS:
        if re.search(rf"(^|\W){re.escape(kw)}(\W|$)", t, flags=re.I):
            found.append(kw.lower())
    # merge in $ticker hits
    tickers = _extract_tickers(text_norm)
    found.extend(tickers)
    # unique, first-occurrence order
    uniq = []
    seen = set()
    for kw in found:
        if kw not in seen:
            uniq.append(kw)
            seen.add(kw)
    return uniq


def _choose_dominant_keyword(text_norm: str, kws: List[str]) -> Optional[str]:
    """Pick one keyword by occurrence count, then $ticker preference, then
    earliest position in the text."""
    if not kws:
        return None
    score = {}
    for kw in kws:
        cnt = len(re.findall(rf"(^|\W){re.escape(kw)}(\W|$)", text_norm, flags=re.I))
        first = re.search(rf"(^|\W){re.escape(kw)}(\W|$)", text_norm, flags=re.I)
        first_idx = first.start() if first else 1_000_000
        bonus = 1 if kw.startswith("$") else 0  # prefer $ticker on ties
        score[kw] = (cnt, bonus, -first_idx)
    chosen = sorted(score.items(), key=lambda x: (x[1][0], x[1][1], x[1][2]), reverse=True)[0][0]
    return chosen


def _role_of(chat_id: int) -> str:
    """Role lookup; DEFAULTS TO SUPPORT so untagged chats are never over-trusted."""
    return chat_roles.get(chat_id, "support")


def _unique_counts_by_role(keyword: str) -> Tuple[int, int]:
    """
    Count unique groups mentioning `keyword` within the active window,
    split CORE vs SUPPORT. Returns (core_count, support_count).
    """
    bucket = keyword_group_last_seen.get(keyword, {})
    core_ids, sup_ids = set(), set()
    for gk in bucket.keys():
        role = chat_roles.get(int(gk), "support")  # default support, to be safe
        (core_ids if role == "core" else sup_ids).add(gk)
    return len(core_ids), len(sup_ids)


# ========= Entity-key extraction (CA > $ticker) =========
def extract_entity_key(text: str) -> Optional[str]:
    """Return a canonical entity key used for "sameness" decisions:
        - contract address present -> 'ca:evm:<0x...>' or 'ca:sol:<addr>'
        - else a $ticker present   -> 'ticker:<name>'
        - else None
    """
    t = normalize_for_filter(text)
    # Prefer CA first
    m = CA_EVM_RE.search(t) or CA_SOL_RE.search(t)
    if m:
        addr = m.group(0)
        kind = "evm" if addr.lower().startswith("0x") else "sol"
        return f"ca:{kind}:{addr.lower()}"
    # Fall back to $ticker (reusing the existing detector)
    tickers = _extract_tickers(t.lower())
    if tickers:
        return f"ticker:{tickers[0][1:].lower()}"
    return None


# Hoisted from process_message so the pattern is built & compiled once,
# not on every incoming message.
UPDATE_HINTS = [
    r"\bupdate\b", r"\bupd\b", r"\bmcap\b", r"\bmarket\s*cap\b", r"\bhit\b",
    r"\btp\d?\b", r"\btarget\b", r"\bath\b", r"\bnew\s*high\b",
    r"\bliq(uidity)?\b", r"\bvolume\b", r"\bnow\b", r"⇒|->|→",
    r"\bfrom\b.*\bto\b", r"\b\d+(\.\d+)?\s*[km]?\b",
]
UPDATE_RE = re.compile("|".join(UPDATE_HINTS), re.IGNORECASE)


async def process_message(msg, source_chat_id: int) -> None:
    """
    Full pipeline: explicit excludes, entity/content/message dedup, relevance,
    multi-keyword -> dominant pick, tier aggregation, CORE-anchored support
    gating, invite-sentence filtering, then POST/EDIT/REPLY/NEW.
    """
    orig_text = msg.message or (getattr(msg, "raw_text", None) or "")
    text_norm = normalize_for_filter(orig_text).lower()

    # explicit exclusions
    for phrase in EXCLUDE_PHRASES:
        if phrase.lower() in text_norm:
            debug_log("Dilewati karena EXCLUDE_PHRASES", orig_text)
            return

    # === Entity duplicate handling + UPDATE detection ===
    entity_key = extract_entity_key(orig_text)
    duplicate_entity = bool(entity_key and entity_key in recent_entity_keys)
    update_like = bool(UPDATE_RE.search(orig_text or ""))

    # duplicate entity and not an update -> hold back
    if duplicate_entity and not update_like:
        debug_log("Entity-duplicate (non-update), dilewati", orig_text)
        return

    # Content-only dedup (cross-group)
    # NOTE: the hash is recorded even when a later filter rejects the message,
    # so an identical repost from another group stays blocked.
    ch = content_only_hash(orig_text)
    if ch in recent_content_hashes:
        debug_log("Content-duplicate (global), dilewati", orig_text)
        return
    recent_content_hashes.append(ch)

    # legacy dedup (per message/media)
    h = hash_for_dedup(text_norm, msg)
    if h in recent_hashes:
        debug_log("Duplikat (pesan/media), dilewati", orig_text)
        return
    recent_hashes.append(h)

    # Relevance (on text with CA/URLs neutralized)
    score = score_relevance(text_norm, THEME_KEYWORDS)
    # >>> BYPASS relevance when a CA is present (so "CA-only" posts pass)
    allow_by_ca = bool(entity_key and entity_key.startswith("ca:"))
    debug_log(f"Skor relevansi={score:.2f} | allow_by_ca={allow_by_ca}", orig_text)
    if score < RELEVANCE_THRESHOLD and not allow_by_ca:
        return

    role = _role_of(source_chat_id)  # 'core' / 'support'

    # multi-keyword -> pick one dominant for aggregation (fallback when no entity)
    all_kws = _extract_all_keywords(text_norm)
    main_kw = _choose_dominant_keyword(text_norm, all_kws)

    # topic key = entity_key (CA/$ticker) when present, else main_kw
    topic_key = entity_key or main_kw
    if not topic_key:
        debug_log("Tak ada keyword/entitas cocok, dilewati", orig_text)
        return

    # aggregation & tier (by topic_key)
    group_key = str(source_chat_id)
    now = datetime.now(timezone.utc)
    class_label, unique_groups = update_and_classify(topic_key, group_key, now)

    # SUPPORT gating (CORE-anchored): support posts pass only when at least
    # one CORE chat already mentioned the topic, or enough unique supports did.
    if role != "core":
        core_u, sup_u = _unique_counts_by_role(topic_key)
        if core_u >= 1:
            pass
        elif sup_u < SUPPORT_MIN_UNIQUE:
            debug_log(
                f"Support ditahan (core_u={core_u}, sup_u={sup_u} < {SUPPORT_MIN_UNIQUE})",
                orig_text,
            )
            return

    # invite-sentence filter (whitelist-aware)
    cleaned_body = filter_invite_sentences(orig_text)
    if not cleaned_body.strip():
        debug_log("Semua kalimat terfilter (kosong), dilewati", orig_text)
        return

    # backfill safety: on startup, skip messages that are too old
    cutoff = startup_time_utc - timedelta(
        minutes=CLASS_WINDOW_MINUTES + BACKFILL_BUFFER_MINUTES
    )
    if getattr(msg, "date", None):
        msg_dt = msg.date
        if isinstance(msg_dt, datetime) and msg_dt.replace(tzinfo=timezone.utc) < cutoff:
            debug_log("Lama (lewat cutoff backfill safety), dilewati", orig_text)
            return

    # remember the entity key to throttle non-updates; updates are recorded
    # too so they cannot spam
    if entity_key:
        recent_entity_keys.append(entity_key)

    await post_or_update(topic_key, cleaned_body, class_label, msg, update_like=update_like)
    debug_log(
        f"Posted/Edited (role={role}, unique_groups={unique_groups}, key={topic_key}, tier={class_label}, update_like={update_like})",
        orig_text,
    )


def _bare_chat_id(msg) -> Optional[int]:
    """Bare (unmarked) chat id from a message's peer — the same id space as
    `entity.id`, which is how `chat_roles` is keyed. Returns None when no
    peer id is available."""
    peer = getattr(msg, "peer_id", None)
    if peer is None:
        return None
    return (getattr(peer, "channel_id", None)
            or getattr(peer, "chat_id", None)
            or getattr(peer, "user_id", None))


async def backfill_history(entity, limit: int) -> None:
    """Pull the last `limit` messages from `entity` through the pipeline."""
    if limit <= 0:
        return
    print(f"[Backfill] Tarik {limit} pesan terakhir dari {getattr(entity, 'title', 'Unknown')} ...")
    async for m in client.iter_messages(entity, limit=limit):
        try:
            chat_id = _bare_chat_id(m)
            if chat_id:
                await process_message(m, source_chat_id=abs(chat_id))
        except Exception as e:
            debug_log(f"Error saat memproses backfill untuk pesan ID {m.id}", str(e))


# ========= Event handlers =========
@client.on(events.NewMessage(chats=SOURCE_CHATS))
async def on_new_message(event):
    try:
        # FIX: event.chat_id is the *marked* id (-100...) for channels, while
        # chat_roles is keyed by bare entity ids; abs() alone does not convert
        # between them. Use the bare peer id (same as backfill) and fall back
        # to the old behavior only when no peer is present.
        chat_id = _bare_chat_id(event.message) or abs(event.chat_id)
        await process_message(event.message, source_chat_id=abs(chat_id))
    except Exception as e:
        print(f"Process error di chat {event.chat_id}: {e}")


# ========= Entry points =========
async def _resolve_and_tag_chats(raw_list, role_label: str) -> list:
    """Resolve chat URLs to entities and tag each entity id with its role."""
    resolved = []
    for src in raw_list:
        try:
            ent = await client.get_entity(src)
            resolved.append(ent)
            chat_roles[abs(int(ent.id))] = role_label
        except Exception as e:
            print(f"Gagal resolve sumber {src}: {e}")
    return resolved


async def start_bot_background() -> None:
    """Start the curator as a background task (for embedding in another app)."""
    await client.start()
    _init_db()
    # Load persisted state
    global last_posted, keyword_group_last_seen
    last_posted, keyword_group_last_seen = db_load_state()
    resolved_core = await _resolve_and_tag_chats(CORE_CHATS, "core")
    resolved_support = await _resolve_and_tag_chats(SUPPORT_CHATS, "support")
    resolved_sources = resolved_core + resolved_support
    for ent in resolved_sources:
        try:
            await backfill_history(ent, INITIAL_BACKFILL)
        except Exception as e:
            print(f"Backfill gagal untuk {getattr(ent, 'title', 'Unknown')}: {e}")
    print("Kurator berjalan (background task). Menunggu pesan baru...")
    asyncio.create_task(client.run_until_disconnected())


async def app_main() -> None:
    """Run the curator in the foreground until disconnected/interrupted."""
    await client.start()
    _init_db()
    global last_posted, keyword_group_last_seen
    last_posted, keyword_group_last_seen = db_load_state()
    resolved_core = await _resolve_and_tag_chats(CORE_CHATS, "core")
    resolved_support = await _resolve_and_tag_chats(SUPPORT_CHATS, "support")
    resolved_sources = resolved_core + resolved_support
    for ent in resolved_sources:
        await backfill_history(ent, INITIAL_BACKFILL)
    print("Kurator berjalan. Menunggu pesan baru... (Stop dengan interrupt).")
    await client.run_until_disconnected()


if __name__ == "__main__":
    asyncio.run(app_main())