# Hugging Face Spaces status banner captured along with this source (not code):
# Spaces: Sleeping
# Sleeping
| import asyncio | |
| import os | |
| import re | |
| import io | |
| import sqlite3 | |
| import hashlib | |
| from collections import deque, defaultdict | |
| from datetime import datetime, timedelta, timezone | |
| from mimetypes import guess_extension | |
| from typing import List, Tuple, Optional, Dict | |
| from rapidfuzz import fuzz | |
| from telethon import TelegramClient, events | |
| from telethon.sessions import StringSession, MemorySession | |
| from telethon.errors.rpcerrorlist import FloodWaitError | |
# ========= Configuration via Environment =========
API_ID = int(os.environ.get("API_ID", "0"))          # Telegram API id (my.telegram.org)
API_HASH = os.environ.get("API_HASH", "" )           # Telegram API hash
STRING_SESSION = os.environ.get("STRING_SESSION", "")  # persistent Telethon session string
# --- Sources split into CORE vs SUPPORT (fill with your own data) ---
CORE_CHATS = [
    "https://t.me/PEPE_Calls28",
    "https://t.me/HenryGems",
    "https://t.me/ChinaPumpCommunity",
    "https://t.me/SephirothGemCalls1",
    "https://t.me/GM_Degencalls",
    "https://t.me/Enthucalls",
    "https://t.me/kobecalls",
    "https://t.me/Kulture_Kall",
]
SUPPORT_CHATS = [
    "https://t.me/TheDonALPHAJournal",
    "https://t.me/savascalls",
    "https://t.me/Tanjirocall",
    "https://t.me/Zen_call",
    "https://t.me/ChapoInsider",
    "https://t.me/millionsgems",
    "https://t.me/Milagrosdegencalls",
    "https://t.me/kariusgemscalls",
    "https://t.me/Dwen_Exchange",
    "https://t.me/bat_gamble",
    "https://t.me/BatmanGamble",
    "https://t.me/hulkgemscalls_real",
    "https://t.me/MineGems",
]
SOURCE_CHATS = CORE_CHATS + SUPPORT_CHATS
TARGET_CHAT = os.environ.get("TARGET_CHAT", "https://t.me/MidasTouchsignalll")
# Topic keywords; the bare '$' symbol is deliberately kept as a keyword
THEME_KEYWORDS = [
    "call", "signal", "entry", "buy", "sell", "tp", "sl",
    "pump", "spot", "futures", "setup",
    "pepe", "bnb", "eth", "btc", "sol", "meme", "$",
]
# Relevance scoring weights: exact keyword hits vs fuzzy window similarity
KEYWORD_WEIGHT = 1.0
FUZZ_WEIGHT = 0.6
RELEVANCE_THRESHOLD = float(os.environ.get("RELEVANCE_THRESHOLD", "1.0"))
# Messages containing any of these phrases are dropped outright
EXCLUDE_PHRASES = [
    "achievement unlocked",
]
# Media forwarding policy
INCLUDE_MEDIA = os.environ.get("INCLUDE_MEDIA", "1") == "1"
MAX_MEDIA_MB = float(os.environ.get("MAX_MEDIA_MB", "12"))
SKIP_STICKERS = os.environ.get("SKIP_STICKERS", "1") == "1"
ALLOW_GIFS_VIDEOS = os.environ.get("ALLOW_GIFS_VIDEOS", "0") == "1"
INITIAL_BACKFILL = int(os.environ.get("INITIAL_BACKFILL", "20"))        # messages replayed per source at startup
DEDUP_BUFFER_SIZE = int(os.environ.get("DEDUP_BUFFER_SIZE", "800"))     # size of the rolling dedup deques
CLASS_WINDOW_MINUTES = int(os.environ.get("CLASS_WINDOW_MINUTES", "20"))  # aggregation window for unique-group counting
SUPPORT_MIN_UNIQUE = int(os.environ.get("SUPPORT_MIN_UNIQUE", "2"))     # min unique SUPPORT groups when no CORE anchor
# DRY RUN (send nothing to TARGET_CHAT)
DRY_RUN = os.environ.get("DRY_RUN", "0") == "1"
# Backfill buffer: ignore messages older than (startup_time - buffer)
BACKFILL_BUFFER_MINUTES = int(os.environ.get("BACKFILL_BUFFER_MINUTES", "3"))
# === Update behavior strategy ===
# edit  : edit the entity's last message on UPDATE
# reply : reply (reply_to) to the entity's first message
# new   : send a new message for every UPDATE
# NOTE(review): the original comment called "edit" the default, but the env
# fallback below is "reply" — confirm which is intended.
UPDATE_STRATEGY = os.environ.get("UPDATE_STRATEGY", "reply").lower()
UPDATE_COOLDOWN_SEC = int(os.environ.get("UPDATE_COOLDOWN_SEC", "5"))
# ========= Client bootstrap =========
def build_client() -> TelegramClient:
    """Construct the Telethon client, preferring a persistent StringSession."""
    if not STRING_SESSION:
        print(">> Using MemorySession (login tiap run).")
        return TelegramClient(MemorySession(), API_ID, API_HASH)
    print(">> Using StringSession (persistent).")
    return TelegramClient(StringSession(STRING_SESSION), API_ID, API_HASH)
client = build_client()
# Rolling dedup buffers; bounded deques silently drop the oldest entries.
recent_hashes: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)             # per message+media hash
recent_content_hashes: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)  # content-only dedup
# entity-based dedup (contract address / $ticker)
recent_entity_keys: deque[str] = deque(maxlen=DEDUP_BUFFER_SIZE)
# Map chat_id -> "core" / "support"
chat_roles: Dict[int, str] = {}  # populated at startup after entity resolution
# Startup timestamp, used by the backfill-age safety cutoff in process_message.
startup_time_utc = datetime.now(timezone.utc)
| # ========= Persistence (SQLite) ========= | |
| DB_PATH = os.environ.get("BOTSIGNAL_DB", "/tmp/botsignal.db") | |
| def _db(): | |
| conn = sqlite3.connect(DB_PATH) | |
| conn.execute("PRAGMA journal_mode=WAL;") | |
| return conn | |
def _init_db():
    """Create the persistence tables if they do not exist yet (idempotent)."""
    conn = _db()
    try:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS last_posted (
                keyword TEXT PRIMARY KEY,
                msg_id INTEGER NOT NULL,
                tier TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS kw_group_seen (
                keyword TEXT NOT NULL,
                group_key TEXT NOT NULL,
                last_ts INTEGER NOT NULL,
                PRIMARY KEY (keyword, group_key)
            );
            """
        )
        conn.commit()
    finally:
        # Close the handle even when the DDL raises (was leaked before).
        conn.close()
def db_load_state():
    """Load last_posted & kw_group_seen into memory on startup.

    Returns (last, kw_map): `last` maps keyword -> {"msg_id", "tier"};
    `kw_map` maps keyword -> {group_key -> aware UTC datetime}.
    """
    conn = _db()
    try:
        last = {}
        for kw, mid, tier in conn.execute("SELECT keyword, msg_id, tier FROM last_posted"):
            last[kw] = {"msg_id": mid, "tier": tier}
        kw_map: defaultdict[str, dict[str, datetime]] = defaultdict(dict)
        for kw, gk, ts in conn.execute("SELECT keyword, group_key, last_ts FROM kw_group_seen"):
            # Timestamps are stored as unix seconds; restore as aware UTC datetimes.
            kw_map[kw][gk] = datetime.fromtimestamp(ts, tz=timezone.utc)
        return last, kw_map
    finally:
        # Close even if a SELECT raises (connection was leaked before).
        conn.close()
def db_save_last_posted(keyword: str, msg_id: int, tier: str):
    """Upsert the (message id, tier) last posted to TARGET_CHAT for *keyword*."""
    conn = _db()
    try:
        conn.execute(
            "INSERT INTO last_posted(keyword, msg_id, tier) VALUES(?,?,?) "
            "ON CONFLICT(keyword) DO UPDATE SET msg_id=excluded.msg_id, tier=excluded.tier",
            (keyword, msg_id, tier),
        )
        conn.commit()
    finally:
        # Close even if the INSERT raises (connection was leaked before).
        conn.close()
def db_upsert_kw_seen(keyword: str, group_key: str, ts: datetime):
    """Record that *group_key* mentioned *keyword* at *ts* (stored as unix seconds)."""
    conn = _db()
    try:
        conn.execute(
            "INSERT INTO kw_group_seen(keyword, group_key, last_ts) VALUES(?,?,?) "
            "ON CONFLICT(keyword, group_key) DO UPDATE SET last_ts=excluded.last_ts",
            (keyword, group_key, int(ts.timestamp())),
        )
        conn.commit()
    finally:
        # Close even if the INSERT raises (connection was leaked before).
        conn.close()
def db_prune_expired(cutoff: datetime):
    """Delete keyword/group sightings older than *cutoff* from the database."""
    conn = _db()
    try:
        conn.execute("DELETE FROM kw_group_seen WHERE last_ts < ?", (int(cutoff.timestamp()),))
        conn.commit()
    finally:
        # Close even if the DELETE raises (connection was leaked before).
        conn.close()
# ========= Utilities =========
def debug_log(reason: str, content: str = "") -> None:
    """Print a one-line diagnostic, flattening newlines and truncating to 160 chars."""
    snippet = (content or "").replace("\n", " ")[:160]
    print(f"[DEBUG] {reason}: {snippet}")
def normalize_for_filter(text: str) -> str:
    """Drop quoted ('>') lines and collapse all whitespace to single spaces."""
    if not text:
        return ""
    without_quotes = re.sub(r"(?m)^>.*", "", text)
    return re.sub(r"\s+", " ", without_quotes).strip()
| def _tokenize_words(s: str) -> List[str]: | |
| return re.findall(r"[a-z0-9\$\#]{1,64}", s.lower()) | |
| def _windows(tokens: List[str], size: int = 20): | |
| for i in range(0, len(tokens), size): | |
| yield " ".join(tokens[i : i + size]) | |
| # --- Bersihkan URL/CA untuk kepentingan SCORING relevansi --- | |
| CA_SOL_RE = re.compile(r"\b[1-9A-HJ-NP-Za-km-z]{32,48}\b") # Solana base58 (perkiraan) | |
| CA_EVM_RE = re.compile(r"\b0x[a-fA-F0-9]{40}\b") # EVM address | |
| CA_LABEL_RE = re.compile(r"\bCA\s*[:=]\s*\S+", re.IGNORECASE) # "CA: ..." potong tokennya | |
| def _strip_urls_and_mentions(s: str) -> str: | |
| s = re.sub(r"https?://\S+", "", s) | |
| s = re.sub(r"t\.me/[A-Za-z0-9_]+", "", s) | |
| s = re.sub(r"@[A-Za-z0-9_]+", "", s) | |
| return re.sub(r"\s+", " ", s).strip() | |
| def strip_contracts_for_scoring(s: str) -> str: | |
| """ | |
| Hilangkan URL/mention, alamat kontrak, dan token setelah 'CA:' | |
| agar kata 'pump' pada CA/URL (mis. pump.fun) tidak memengaruhi skor. | |
| """ | |
| s0 = _strip_urls_and_mentions(s) | |
| s1 = CA_LABEL_RE.sub(" ", s0) | |
| s2 = CA_EVM_RE.sub(" ", s1) | |
| s3 = CA_SOL_RE.sub(" ", s2) | |
| return re.sub(r"\s+", " ", s3).strip() | |
def score_relevance(text: str, keywords: List[str]) -> float:
    """Relevance score: unique exact keyword hits + fuzzy windowed similarity.

    The fuzzy part averages the best per-window score over the top-3 windows
    of 20 tokens so long texts are scored fairly. URLs/contract addresses are
    stripped first so 'pump' inside a pump.fun link does not count.
    """
    if not text:
        return 0.0
    t = strip_contracts_for_scoring(text).lower()
    # Exact hits over unique keywords. The original also ran a \b-bounded
    # regex fallback, but a word-bounded match implies a substring match, so
    # that branch could never fire when `kw in t` was False — dead code removed.
    exact_hits = sum(1 for kw in set(keywords) if kw in t)
    exact_score = exact_hits * KEYWORD_WEIGHT
    tokens = _tokenize_words(t)
    if not tokens:
        return exact_score
    # Best keyword similarity per 20-token window.
    window_best = []
    for w in _windows(tokens, 20):
        best = max((fuzz.partial_ratio(kw, w) / 100.0 for kw in keywords), default=0.0)
        window_best.append(best)
    fuzzy_top3 = sorted(window_best, reverse=True)[:3]
    fuzzy_score = (sum(fuzzy_top3) / max(1, len(fuzzy_top3))) * FUZZ_WEIGHT if fuzzy_top3 else 0.0
    return exact_score + fuzzy_score
def hash_for_dedup(text: str, msg) -> str:
    """Legacy mixed hash over text + message id + media ids (per-message dedup)."""
    pieces = [text or ""]
    if getattr(msg, "id", None) is not None:
        pieces.append(str(msg.id))
    doc = getattr(msg, "document", None)
    if doc and getattr(doc, "id", None) is not None:
        pieces.append(f"doc:{doc.id}")
    photo = getattr(msg, "photo", None)
    if photo is not None:
        photo_id = getattr(photo, "id", None)
        if photo_id is not None:
            pieces.append(f"photo:{photo_id}")
    blob = "|".join(pieces).encode("utf-8", errors="ignore")
    return hashlib.sha1(blob).hexdigest()
def content_only_hash(text: str) -> str:
    """Content-only hash (ignores ids/URLs/mentions) to catch cross-group reposts."""
    canonical = _strip_urls_and_mentions(normalize_for_filter(text))
    digest = hashlib.sha1(canonical.encode("utf-8", errors="ignore"))
    return digest.hexdigest()
# ========= Class aggregator (windowed unique groups) =========
# keyword -> {group_key -> last time that group mentioned the keyword}
keyword_group_last_seen: defaultdict[str, dict[str, datetime]] = defaultdict(dict)
def _prune_expired(now: datetime) -> None:
    """Drop sightings older than the aggregation window, in memory and in the DB."""
    cutoff = now - timedelta(minutes=CLASS_WINDOW_MINUTES)
    # in-memory prune (snapshot the keys so we can delete while iterating)
    for keyword in list(keyword_group_last_seen):
        sightings = keyword_group_last_seen[keyword]
        stale = [gk for gk, ts in sightings.items() if ts < cutoff]
        for gk in stale:
            del sightings[gk]
        if not sightings:
            del keyword_group_last_seen[keyword]
    # db prune
    db_prune_expired(cutoff)
def update_and_classify(keyword: str, group_key: str, now: Optional[datetime] = None) -> Tuple[str, int]:
    """Record a sighting of *keyword* from *group_key* and classify its strength.

    Returns (tier, unique_group_count): 'kuat' for >=4 unique groups in the
    window, 'sedang' for >=2, otherwise 'rendah'.
    """
    now = now or datetime.now(timezone.utc)
    _prune_expired(now)
    sightings = keyword_group_last_seen[keyword]
    sightings[group_key] = now
    db_upsert_kw_seen(keyword, group_key, now)
    count = len(sightings)
    if count >= 4:
        tier = "kuat"
    elif count >= 2:
        tier = "sedang"
    else:
        tier = "rendah"
    return tier, count
# ========= Sentence-level invite filter (smarter) =========
# Patterns that mark a sentence as promo/invite text (English + Indonesian).
INVITE_PATTERNS = [
    r"\bjoin\b", r"\bjoin (us|our|channel|group)\b",
    r"\bdm\b", r"\bdm (me|gw|gue|gua|saya|admin)\b",
    r"\bpm\b", r"\binbox\b", r"\bcontact\b", r"\bkontak\b", r"\bhubungi\b",
    r"\bvip\b", r"\bpremium\b", r"\bberbayar\b", r"\bpaid\b", r"\bexclusive\b",
    r"\bwhitelist\b", r"\bprivate( group| channel)?\b", r"\bmembership?\b",
    r"\bsubscribe\b", r"\blangganan\b",
    # promo/advertising keywords
    r"\bpromo\b", r"\bpromosi\b", r"\biklan\b",
    r"\badvert\b", r"\badvertise\b", r"\badvertisement\b",
    # invite links / link shorteners
    r"(t\.me\/joinchat|t\.me\/\+|telegram\.me\/|discord\.gg\/|wa\.me\/|whatsapp\.com\/)",
    r"(bit\.ly|tinyurl\.com|linktr\.ee)",
    # UPGRADE: the aggressive rule that blocked every plain t.me link stays removed
    # r"t\.me\/[A-Za-z0-9_]+",
]
INVITE_REGEXES = [re.compile(p, re.IGNORECASE) for p in INVITE_PATTERNS]
# Whitelist: sentences that look like genuine signals must never be dropped,
# even when they also contain an invite word.
WHITELIST_STRONG_SIGNAL = [
    r"\$[a-z0-9]{2,10}",  # $TICKER
    r"\b(entry|entries|buy|sell)\b",
    r"\bsl\b", r"\btp\b", r"\btp\d\b",
]
WHITELIST_REGEXES = [re.compile(p, re.IGNORECASE) for p in WHITELIST_STRONG_SIGNAL]
def _is_invite_sentence(s: str) -> bool:
    """True when the sentence looks like invite/promo text with no strong signal."""
    sentence = s.strip()
    if not sentence:
        return False
    # Sentences carrying strong signal markers ($TICKER, entry/TP/SL) survive
    # even when they contain invite wording.
    for rx in WHITELIST_REGEXES:
        if rx.search(sentence):
            return False
    return any(rx.search(sentence) for rx in INVITE_REGEXES)
def filter_invite_sentences(text: str) -> str:
    """Drop invite/promo sentences; keep the rest, one sentence per line."""
    if not text:
        return text
    sentences = re.split(r'(?<=[\.!\?])\s+|\n+', text, flags=re.UNICODE)
    survivors = []
    for part in sentences:
        if part and not _is_invite_sentence(part):
            survivors.append(part.strip())
    joined = "\n".join(survivors).strip()
    return re.sub(r"\n{3,}", "\n\n", joined)
# ========= Media helpers =========
def is_image_message(msg) -> bool:
    """Decide whether *msg* carries media we are willing to forward.

    True for:
      - native Telegram photos
      - image/* documents; image/webp (stickers) honors SKIP_STICKERS
      - GIFs/videos only when ALLOW_GIFS_VIDEOS is enabled
    """
    if getattr(msg, "photo", None):
        return True
    doc = getattr(msg, "document", None)
    if not doc:
        return False
    mime = (getattr(doc, "mime_type", "") or "").lower()
    # Stickers usually arrive as image/webp.
    if mime == "image/webp" and SKIP_STICKERS:
        return False
    if mime == "image/gif" or mime.startswith("video/"):
        return ALLOW_GIFS_VIDEOS
    return mime.startswith("image/")
def media_too_big(msg) -> bool:
    """True when the attached document exceeds MAX_MEDIA_MB; unknown sizes pass."""
    doc = getattr(msg, "document", None)
    if not doc:
        return False
    size_bytes = getattr(doc, "size", None)
    if size_bytes is None:
        return False
    return size_bytes / (1024 * 1024) > MAX_MEDIA_MB
# ========= Post-on-threshold with EDIT/REPLY/NEW (persisted) =========
# Ordering used to detect tier upgrades (higher = stronger signal).
TIER_ORDER = {"rendah": 0, "sedang": 1, "kuat": 2}
last_posted: Dict[str, Dict[str, object]] = {}  # keyword -> {"msg_id": int, "tier": str}
# Last posted body & last update timestamp per entity (anti-spam bookkeeping).
last_body: Dict[str, str] = {}
last_update_ts: Dict[str, float] = {}
async def _send_initial(msg, text: str) -> int:
    """Send the first message for an entity to TARGET_CHAT and return its id.

    Attaches the source media (photo or image document) with *text* as the
    caption when the media policy allows it, otherwise falls back to a plain
    text message. Returns -1 in DRY_RUN mode. Retries itself after a
    FloodWaitError.
    """
    if DRY_RUN:
        print("[DRY_RUN] send_initial:", text[:140])
        return -1
    # Send with media when present & allowed by the media policy.
    if INCLUDE_MEDIA and is_image_message(msg) and not media_too_big(msg):
        try:
            if getattr(msg, "photo", None):
                m = await client.send_file(
                    TARGET_CHAT, msg.photo, caption=text, caption_entities=None, force_document=False
                )
                return m.id
            doc = getattr(msg, "document", None)
            if doc:
                # Re-upload via an in-memory buffer so the file carries a
                # sensible extension guessed from its mime type.
                data = await client.download_media(msg, file=bytes)
                if data:
                    bio = io.BytesIO(data)
                    ext = ".jpg"
                    mt = (getattr(doc, "mime_type", "") or "").lower()
                    if mt:
                        ext_guess = guess_extension(mt) or ".jpg"
                        if ext_guess == ".jpe":  # normalize the odd '.jpe' guess
                            ext_guess = ".jpg"
                        ext = ext_guess
                    bio.name = f"media{ext}"
                    m = await client.send_file(
                        TARGET_CHAT, bio, caption=text, caption_entities=None, force_document=False
                    )
                    return m.id
        except FloodWaitError as e:
            # NOTE(review): unbounded recursion if Telegram keeps flood-waiting.
            await asyncio.sleep(e.seconds + 1)
            return await _send_initial(msg, text)
        except Exception as e:
            debug_log("Gagal kirim media awal, fallback text", str(e))
    # Text-only fallback path.
    try:
        m = await client.send_message(TARGET_CHAT, text, link_preview=True)
        return m.id
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        return await _send_initial(msg, text)
async def post_or_update(keyword: str, body: str, new_tier: str, src_msg, *, update_like: bool = False) -> None:
    """Post the first message for *keyword*, or update the existing one.

    Behavior:
      - first sighting: send a new (media-aware) message and persist its id
      - tier upgrade:   always edit the original message in place
      - same tier:      only when *update_like*, apply UPDATE_STRATEGY
                        (edit / reply / new) behind an anti-spam guard
    FloodWaitError triggers a sleep and a recursive retry.
    """
    prefix = f"[{new_tier.upper()}] "
    text = prefix + body
    prev = last_posted.get(keyword)
    now_ts = datetime.now().timestamp()
    # First post for this keyword.
    if not prev:
        msg_id = await _send_initial(src_msg, text)
        last_posted[keyword] = {"msg_id": msg_id, "tier": new_tier}
        last_body[keyword] = body
        last_update_ts[keyword] = now_ts
        if msg_id != -1:  # -1 means DRY_RUN: don't persist placeholder ids
            db_save_last_posted(keyword, msg_id, new_tier)
        return
    # Tier went up: always edit the original message.
    if TIER_ORDER.get(new_tier, 0) > TIER_ORDER.get(prev["tier"], 0):
        try:
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text)
            prev["tier"] = new_tier
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
            await post_or_update(keyword, body, new_tier, src_msg, update_like=update_like)
        except Exception as e:
            debug_log("Edit gagal (tier naik), fallback kirim baru", str(e))
            msg_id = await _send_initial(src_msg, text)
            last_posted[keyword] = {"msg_id": msg_id, "tier": new_tier}
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if msg_id != -1:
                db_save_last_posted(keyword, msg_id, new_tier)
        return
    # Same tier: only proceed for update-like messages.
    if not update_like:
        return
    # Anti-spam guard. NOTE(review): the original comment said "same body OR
    # still in cooldown", but the code requires BOTH conditions — confirm intent.
    if body.strip() == last_body.get(keyword, "").strip() and (now_ts - last_update_ts.get(keyword, 0) < UPDATE_COOLDOWN_SEC):
        return
    try:
        if UPDATE_STRATEGY == "edit":
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
        elif UPDATE_STRATEGY == "reply":
            # Reply to the entity's first message.
            await client.send_message(TARGET_CHAT, text, reply_to=prev["msg_id"], link_preview=True)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
        elif UPDATE_STRATEGY == "new":
            await client.send_message(TARGET_CHAT, text, link_preview=True)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
        else:
            # Unknown strategy value -> fall back to edit.
            await client.edit_message(TARGET_CHAT, prev["msg_id"], text)
            last_body[keyword] = body
            last_update_ts[keyword] = now_ts
            if prev["msg_id"] != -1:
                db_save_last_posted(keyword, prev["msg_id"], new_tier)
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        await post_or_update(keyword, body, new_tier, src_msg, update_like=update_like)
    except Exception as e:
        debug_log("Update gagal (strategy), diabaikan", str(e))
# ========= Core actions (fallback kept) =========
async def send_as_is(msg, text_override: Optional[str] = None) -> None:
    """Forward *msg* to TARGET_CHAT as-is (media + caption, or plain text).

    When *text_override* is given it replaces the original text and the
    original formatting entities are dropped. Falls back to a text-only send
    if the media path fails; FloodWaitError is waited out once per layer.
    """
    if DRY_RUN:
        print("[DRY_RUN] send_as_is:", (text_override or msg.message or "")[:140])
        return
    if text_override is not None:
        orig_text = text_override
        entities = None
    else:
        orig_text = msg.message or (getattr(msg, "raw_text", None) or "")
        entities = getattr(msg, "entities", None)
    if INCLUDE_MEDIA and is_image_message(msg) and not media_too_big(msg):
        try:
            if getattr(msg, "photo", None):
                await client.send_file(
                    TARGET_CHAT, msg.photo, caption=orig_text, caption_entities=entities, force_document=False
                )
                return
            doc = getattr(msg, "document", None)
            if doc:
                # Re-upload through a named in-memory buffer so the media keeps
                # a sensible extension derived from its mime type.
                data = await client.download_media(msg, file=bytes)
                if data:
                    bio = io.BytesIO(data)
                    ext = ".jpg"
                    mt = (getattr(doc, "mime_type", "") or "").lower()
                    if mt:
                        ext_guess = guess_extension(mt) or ".jpg"
                        if ext_guess == ".jpe":  # normalize the odd '.jpe' guess
                            ext_guess = ".jpg"
                        ext = ext_guess
                    bio.name = f"media{ext}"
                    await client.send_file(
                        TARGET_CHAT, bio, caption=orig_text, caption_entities=entities, force_document=False
                    )
                    return
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds + 1)
        except Exception as e:
            debug_log("Gagal kirim sebagai media, fallback ke text", str(e))
    # Text-only fallback; retried once after a flood wait.
    try:
        await client.send_message(TARGET_CHAT, orig_text, formatting_entities=entities, link_preview=True)
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds + 1)
        await client.send_message(TARGET_CHAT, orig_text, formatting_entities=entities, link_preview=True)
| # ========= Keyword extraction ($ticker-aware) ========= | |
| TICKER_CLEAN_RE = re.compile(r"\$[A-Za-z0-9]{2,12}") | |
| TICKER_NOISY_RE = re.compile(r"\$[A-Za-z0-9](?:[^A-Za-z0-9]+[A-Za-z0-9]){1,11}") | |
| def _extract_tickers(text_norm: str) -> List[str]: | |
| """ | |
| Ambil $TICKER dengan dua cara: | |
| - Bersih: $ABC, $JBCOIN | |
| - Noisy: $J*BCOIN -> dinormalisasi jadi $JBCOIN untuk *keyword* saja. | |
| (Teks asli tetap dikirim apa adanya.) | |
| """ | |
| found = [] | |
| # bersih | |
| for m in TICKER_CLEAN_RE.finditer(text_norm): | |
| found.append(m.group(0).lower()) | |
| # noisy -> normalisasi internal | |
| for m in TICKER_NOISY_RE.finditer(text_norm): | |
| raw = m.group(0) | |
| norm = "$" + re.sub(r"[^A-Za-z0-9]+", "", raw[1:]) | |
| if 3 <= len(norm) <= 13: # termasuk '$' | |
| found.append(norm.lower()) | |
| # unik & pertahankan urutan | |
| seen = set() | |
| uniq = [] | |
| for x in found: | |
| if x not in seen: | |
| uniq.append(x) | |
| seen.add(x) | |
| return uniq | |
def _extract_all_keywords(text_norm: str) -> List[str]:
    """Detect every matching THEME_KEYWORDS entry plus $tickers.

    The '$' symbol itself stays a valid keyword (by design). Returns unique
    keywords in order of first appearance.
    """
    # Tolerant theme matching: search text with '$' prefixes stripped.
    bare = re.sub(r"\$([a-z0-9]+)", r"\1", text_norm, flags=re.I)
    hits: List[str] = []
    for kw in THEME_KEYWORDS:
        if re.search(rf"(^|\W){re.escape(kw)}(\W|$)", bare, flags=re.I):
            hits.append(kw.lower())
    # Merge in detected $tickers.
    hits.extend(_extract_tickers(text_norm))
    ordered: List[str] = []
    seen = set()
    for kw in hits:
        if kw not in seen:
            ordered.append(kw)
            seen.add(kw)
    return ordered
| def _choose_dominant_keyword(text_norm: str, kws: List[str]) -> Optional[str]: | |
| if not kws: | |
| return None | |
| # pilih berdasarkan frekuensi kemunculan + preferensi $ticker + posisi paling awal | |
| score = {} | |
| for kw in kws: | |
| cnt = len(re.findall(rf"(^|\W){re.escape(kw)}(\W|$)", text_norm, flags=re.I)) | |
| first = re.search(rf"(^|\W){re.escape(kw)}(\W|$)", text_norm, flags=re.I) | |
| first_idx = first.start() if first else 1_000_000 | |
| bonus = 1 if kw.startswith("$") else 0 # prefer $ticker saat imbang | |
| score[kw] = (cnt, bonus, -first_idx) | |
| chosen = sorted(score.items(), key=lambda x: (x[1][0], x[1][1], x[1][2]), reverse=True)[0][0] | |
| return chosen | |
def _role_of(chat_id: int) -> str:
    """Return 'core' or 'support' for *chat_id*.

    Unknown chats default to 'support' so an untagged chat can never be
    mistakenly treated as a core anchor.
    """
    return chat_roles.get(chat_id, "support")
def _unique_counts_by_role(keyword: str) -> Tuple[int, int]:
    """Count unique groups mentioning *keyword* in the active window.

    Returns (core_count, support_count); unknown chats count as support so
    they cannot anchor a signal.
    """
    sightings = keyword_group_last_seen.get(keyword, {})
    core = 0
    support = 0
    for group_key in sightings:
        if chat_roles.get(int(group_key), "support") == "core":
            core += 1
        else:
            support += 1
    return core, support
# ========= Entity-key extraction (CA > $ticker) =========
def extract_entity_key(text: str) -> Optional[str]:
    """Canonical 'sameness' key for a message.

    Priority: contract address ('ca:evm:<0x...>' / 'ca:sol:<base58>'),
    then the first $ticker ('ticker:<name>'), else None.
    """
    normalized = normalize_for_filter(text)
    # Contract addresses win over tickers.
    match = CA_EVM_RE.search(normalized) or CA_SOL_RE.search(normalized)
    if match:
        address = match.group(0).lower()
        kind = "evm" if address.startswith("0x") else "sol"
        return f"ca:{kind}:{address}"
    # Fall back to the existing $ticker detection.
    tickers = _extract_tickers(normalized.lower())
    if tickers:
        return f"ticker:{tickers[0][1:].lower()}"
    return None
async def process_message(msg, source_chat_id: int) -> None:
    """Full curation pipeline for one source message.

    Steps: explicit exclusions -> entity/content/message dedup -> relevance
    scoring (with CA bypass) -> dominant keyword/entity selection -> windowed
    tier aggregation -> CORE-anchored support gating -> invite-sentence
    filtering -> backfill-age guard -> post/edit/reply to TARGET_CHAT.
    """
    orig_text = msg.message or (getattr(msg, "raw_text", None) or "")
    text_norm = normalize_for_filter(orig_text).lower()
    # Explicit exclusions.
    for phrase in EXCLUDE_PHRASES:
        if phrase.lower() in text_norm:
            debug_log("Dilewati karena EXCLUDE_PHRASES", orig_text)
            return
    # === Entity duplicate handling + UPDATE detection ===
    entity_key = extract_entity_key(orig_text)
    duplicate_entity = bool(entity_key and entity_key in recent_entity_keys)
    # NOTE(review): these hints are rebuilt and recompiled on every message;
    # hoisting the compiled regex to module level would avoid the per-call cost.
    UPDATE_HINTS = [
        r"\bupdate\b", r"\bupd\b", r"\bmcap\b", r"\bmarket\s*cap\b",
        r"\bhit\b", r"\btp\d?\b", r"\btarget\b", r"\bath\b", r"\bnew\s*high\b",
        r"\bliq(uidity)?\b", r"\bvolume\b", r"\bnow\b", r"⇒|->|→",
        r"\bfrom\b.*\bto\b", r"\b\d+(\.\d+)?\s*[km]?\b",
    ]
    # NOTE(review): the bare-number hint matches almost any digits, so nearly
    # every price-bearing message classifies as "update-like" — confirm intended.
    UPDATE_RE = re.compile("|".join(UPDATE_HINTS), re.IGNORECASE)
    update_like = bool(UPDATE_RE.search(orig_text or ""))
    # A duplicate entity that is not an update is held back.
    if duplicate_entity and not update_like:
        debug_log("Entity-duplicate (non-update), dilewati", orig_text)
        return
    # Content-only dedup (across groups).
    ch = content_only_hash(orig_text)
    if ch in recent_content_hashes:
        debug_log("Content-duplicate (global), dilewati", orig_text)
        return
    recent_content_hashes.append(ch)
    # Legacy dedup (per message/media).
    h = hash_for_dedup(text_norm, msg)
    if h in recent_hashes:
        debug_log("Duplikat (pesan/media), dilewati", orig_text)
        return
    recent_hashes.append(h)
    # Relevance, computed on text with URLs/contract addresses neutralized.
    score = score_relevance(text_norm, THEME_KEYWORDS)
    # >>> Bypass the relevance gate when a contract address is present, so
    # "CA-only" posts are not blocked.
    allow_by_ca = bool(entity_key and entity_key.startswith("ca:"))
    debug_log(f"Skor relevansi={score:.2f} | allow_by_ca={allow_by_ca}", orig_text)
    if score < RELEVANCE_THRESHOLD and not allow_by_ca:
        return
    role = _role_of(source_chat_id)  # 'core' / 'support'
    # Multiple keywords -> pick one dominant for aggregation (fallback when no entity).
    all_kws = _extract_all_keywords(text_norm)
    main_kw = _choose_dominant_keyword(text_norm, all_kws)
    # Topic key = entity_key (CA/$ticker) when available, else the dominant keyword.
    topic_key = entity_key or main_kw
    if not topic_key:
        debug_log("Tak ada keyword/entitas cocok, dilewati", orig_text)
        return
    # Aggregate & classify tier (keyed by topic_key).
    group_key = str(source_chat_id)
    now = datetime.now(timezone.utc)
    class_label, unique_groups = update_and_classify(topic_key, group_key, now)
    # SUPPORT gating (CORE-anchored): a support chat only passes once a core
    # chat mentioned the topic, or enough unique support chats did.
    if role != "core":
        core_u, sup_u = _unique_counts_by_role(topic_key)
        if core_u >= 1:
            pass
        elif sup_u < SUPPORT_MIN_UNIQUE:
            debug_log(
                f"Support ditahan (core_u={core_u}, sup_u={sup_u} < {SUPPORT_MIN_UNIQUE})",
                orig_text,
            )
            return
    # Invite-sentence filter (whitelist-aware).
    cleaned_body = filter_invite_sentences(orig_text)
    if not cleaned_body.strip():
        debug_log("Semua kalimat terfilter (kosong), dilewati", orig_text)
        return
    # Backfill safety: right after startup, skip messages that are too old.
    cutoff = startup_time_utc - timedelta(
        minutes=CLASS_WINDOW_MINUTES + BACKFILL_BUFFER_MINUTES
    )
    if getattr(msg, "date", None):
        msg_dt = msg.date
        # NOTE(review): replace(tzinfo=utc) relabels rather than converts; it
        # assumes msg.date is already UTC (naive or aware) — confirm.
        if isinstance(msg_dt, datetime) and msg_dt.replace(tzinfo=timezone.utc) < cutoff:
            debug_log("Lama (lewat cutoff backfill safety), dilewati", orig_text)
            return
    # Remember the entity key to throttle non-update repeats; updates are
    # recorded too so repeated updates do not spam.
    if entity_key:
        recent_entity_keys.append(entity_key)
    await post_or_update(topic_key, cleaned_body, class_label, msg, update_like=update_like)
    debug_log(
        f"Posted/Edited (role={role}, unique_groups={unique_groups}, key={topic_key}, tier={class_label}, update_like={update_like})",
        orig_text,
    )
async def backfill_history(entity, limit: int) -> None:
    """Replay up to *limit* most-recent messages of *entity* through process_message."""
    if limit <= 0:
        return
    print(f"[Backfill] Tarik {limit} pesan terakhir dari {getattr(entity, 'title', 'Unknown')} ...")
    async for m in client.iter_messages(entity, limit=limit):
        try:
            peer = m.peer_id
            # A peer id may live under channel_id, chat_id or user_id.
            chat_id = (
                getattr(peer, 'channel_id', None)
                or getattr(peer, 'chat_id', None)
                or getattr(peer, 'user_id', None)
            )
            if chat_id:
                await process_message(m, source_chat_id=abs(chat_id))
        except Exception as e:
            debug_log(f"Error saat memproses backfill untuk pesan ID {m.id}", str(e))
# ========= Event handlers =========
async def on_new_message(event):
    """NewMessage callback: route the event through the full curation pipeline.

    NOTE(review): no @client.on / add_event_handler registration for this
    handler is visible in this file — confirm it is registered somewhere,
    otherwise live messages are silently ignored.
    """
    try:
        await process_message(event.message, source_chat_id=abs(event.chat_id))
    except Exception as e:
        print(f"Process error di chat {event.chat_id}: {e}")
# ========= Entry points =========
async def _resolve_and_tag_chats(raw_list, role_label: str) -> list:
    """Resolve each source URL/username to an entity and tag its id with *role_label*.

    Sources that fail to resolve are logged and skipped.
    """
    resolved = []
    for src in raw_list:
        try:
            entity = await client.get_entity(src)
            resolved.append(entity)
            chat_roles[abs(int(entity.id))] = role_label
        except Exception as e:
            print(f"Gagal resolve sumber {src}: {e}")
    return resolved
async def start_bot_background() -> None:
    """Start the curator as a background task (for embedding in another app).

    Connects the client, loads persisted state, resolves & role-tags the
    sources, backfills recent history, registers the live-message handler and
    leaves client.run_until_disconnected() running as a background task.
    """
    await client.start()
    _init_db()
    # Load persisted state
    global last_posted, keyword_group_last_seen, _runner_task
    last_posted, keyword_group_last_seen = db_load_state()
    resolved_core = await _resolve_and_tag_chats(CORE_CHATS, "core")
    resolved_support = await _resolve_and_tag_chats(SUPPORT_CHATS, "support")
    resolved_sources = resolved_core + resolved_support
    for ent in resolved_sources:
        try:
            await backfill_history(ent, INITIAL_BACKFILL)
        except Exception as e:
            print(f"Backfill gagal untuk {getattr(ent, 'title', 'Unknown')}: {e}")
    # BUGFIX: on_new_message was defined but never registered with the client,
    # so live messages were silently ignored.
    client.add_event_handler(on_new_message, events.NewMessage(chats=resolved_sources))
    print("Kurator berjalan (background task). Menunggu pesan baru...")
    # BUGFIX: keep a reference to the task — asyncio holds only weak refs, so
    # an unreferenced task may be garbage-collected mid-flight.
    _runner_task = asyncio.create_task(client.run_until_disconnected())
async def app_main() -> None:
    """Run the curator in the foreground until the client disconnects."""
    await client.start()
    _init_db()
    # Load persisted state
    global last_posted, keyword_group_last_seen
    last_posted, keyword_group_last_seen = db_load_state()
    resolved_core = await _resolve_and_tag_chats(CORE_CHATS, "core")
    resolved_support = await _resolve_and_tag_chats(SUPPORT_CHATS, "support")
    resolved_sources = resolved_core + resolved_support
    for ent in resolved_sources:
        await backfill_history(ent, INITIAL_BACKFILL)
    # BUGFIX: register the live NewMessage handler — it was defined but never
    # attached, so only backfilled messages were ever processed.
    client.add_event_handler(on_new_message, events.NewMessage(chats=resolved_sources))
    print("Kurator berjalan. Menunggu pesan baru... (Stop dengan interrupt).")
    await client.run_until_disconnected()
if __name__ == "__main__":
    asyncio.run(app_main())