import asyncio import json import os import traceback import time from datetime import datetime from aiohttp import web from TikTokLive import TikTokLiveClient from TikTokLive.events import ( ConnectEvent, DisconnectEvent, CommentEvent, GiftEvent, FollowEvent, JoinEvent, LikeEvent, RoomUserSeqEvent, ControlEvent ) # ========================================== # 1. Global State # ========================================== connected_clients = set() app_state = { "username": "", "connected": False, "connecting": False, "last_error": "", "comments": [], "followers": [], "gifts": {"count": 0, "diamonds": 0}, "likes": 0, "viewers": 0, "gift_alerts": [], } tiktok_client = None tiktok_task = None last_connect_attempt = 0 CONNECT_COOLDOWN = 30 MAX_COMMENTS = 20 MAX_FOLLOWERS = 5 MAX_GIFT_ALERTS = 2 # Only 2 cards max def log_print(msg): ts = datetime.now().strftime("%H:%M:%S") print(f"[{ts}] {msg}", flush=True) # ========================================== # 2. WebSocket Broadcast # ========================================== async def broadcast(msg): msg["time"] = datetime.now().strftime("%H:%M:%S") dead = set() for ws in connected_clients: try: await ws.send_json(msg) except Exception: dead.add(ws) connected_clients.difference_update(dead) # ========================================== # 3. Safe User Extraction # ========================================== def safe_extract_user(event): """Safely extract user info, avoiding nickName bug""" try: user = event.user if user: return user except Exception as e: log_print(f"USER_FALLBACK: {str(e)[:60]}") try: user_info = getattr(event, 'user_info', None) or getattr(event, 'userInfo', None) if user_info: class SimpleUser: pass su = SimpleUser() su.unique_id = getattr(user_info, 'uniqueId', None) or getattr(user_info, 'unique_id', None) or getattr(user_info, 'userId', None) or 'unknown' su.nickname = getattr(user_info, 'nickname', None) or getattr(user_info, 'nickName', None) or getattr(user_info, 'nick_name', None) or su.unique_id # Avatar - try ALL possible fields avatar = None for attr in ['avatarThumb', 'avatar_thumb', 'avatar', 'avatarMedium', 'avatar_medium', 'avatarLarger', 'avatar_larger']: avatar = getattr(user_info, attr, None) if avatar: break if avatar and hasattr(avatar, 'url'): su.avatar = avatar.url elif avatar and isinstance(avatar, str): su.avatar = avatar else: su.avatar = None return su except Exception as e2: log_print(f"USER_EXTRACT_FAILED: {str(e2)[:60]}") class DummyUser: unique_id = 'unknown' nickname = 'مستخدم' avatar = None return DummyUser() def get_user_avatar(user): """Extract real avatar URL from user object""" # Try all possible avatar fields for attr in ['avatar', 'avatar_thumb', 'avatarThumb', 'avatarMedium', 'avatar_medium', 'avatarLarger', 'avatar_larger']: val = getattr(user, attr, None) if val: if isinstance(val, str) and val.startswith('http'): return val if hasattr(val, 'url'): url = val.url if url and url.startswith('http'): return url if hasattr(val, 'urls') and val.urls: url = val.urls[0] if url and url.startswith('http'): return url # Fallback to generated uid = getattr(user, 'unique_id', None) or getattr(user, 'uniqueId', None) or 'user' return f"https://api.dicebear.com/7.x/avataaars/svg?seed={uid}" def get_user_name(user): return getattr(user, 'nickname', None) or getattr(user, 'nickName', None) or getattr(user, 'nick_name', None) or getattr(user, 'unique_id', None) or getattr(user, 'uniqueId', None) or 'مستخدم' # ========================================== # 4. Gift Icon URL Extractor # ========================================== def get_gift_icon_url(gift): """Extract real gift icon URL from TikTok CDN""" # Try icon field icon = getattr(gift, 'icon', None) if icon: if hasattr(icon, 'url_list') and icon.url_list: return icon.url_list[0] if hasattr(icon, 'url') and icon.url: return icon.url if hasattr(icon, 'urls') and icon.urls: return icon.urls[0] # Try image field image = getattr(gift, 'image', None) if image: if hasattr(image, 'url_list') and image.url_list: return image.url_list[0] if hasattr(image, 'url') and image.url: return image.url # Try gift_image field gift_image = getattr(gift, 'gift_image', None) or getattr(gift, 'giftImage', None) if gift_image: if hasattr(gift_image, 'url_list') and gift_image.url_list: return gift_image.url_list[0] if hasattr(gift_image, 'url') and gift_image.url: return gift_image.url return "" # ========================================== # 5. TikTok Handlers # ========================================== async def start_tiktok_stream(username): global tiktok_client, last_connect_attempt if not username.startswith('@'): username = '@' + username now = time.time() elapsed = now - last_connect_attempt if elapsed < CONNECT_COOLDOWN: wait = CONNECT_COOLDOWN - elapsed await broadcast({"type": "toast", "msg": f"⏳ انتظر {wait:.0f} ثانية", "color": "#f59e0b"}) await asyncio.sleep(wait) last_connect_attempt = time.time() app_state["connecting"] = True app_state["last_error"] = "" await broadcast({"type": "connecting", "msg": "جاري الاتصال بـ TikTok..."}) log_print(f"Connecting to TikTok: {username}") try: tiktok_client = TikTokLiveClient( unique_id=username, web_kwargs={}, ws_kwargs={"ping_interval": 20, "ping_timeout": 10} ) except Exception as e: app_state["connecting"] = False app_state["last_error"] = str(e) await broadcast({"type": "status", "connected": False}) await broadcast({"type": "toast", "msg": f"❌ خطأ: {str(e)[:100]}", "color": "#ef4444"}) return @tiktok_client.on(ConnectEvent) async def on_connect(e): app_state["connected"] = True app_state["connecting"] = False app_state["username"] = username log_print(f"CONNECTED: {username}") await broadcast({"type": "status", "connected": True, "username": username}) await broadcast({"type": "toast", "msg": f"✅ متصل ببث: {username}", "color": "#10b981"}) @tiktok_client.on(DisconnectEvent) async def on_disconnect(e): app_state["connected"] = False app_state["connecting"] = False await broadcast({"type": "status", "connected": False}) await broadcast({"type": "toast", "msg": "❌ انقطع الاتصال", "color": "#ef4444"}) @tiktok_client.on(CommentEvent) async def on_comment(e): try: user = safe_extract_user(e) comment_data = { "type": "comment", "name": get_user_name(user), "avatar": get_user_avatar(user), "text": e.comment, "id": getattr(user, 'unique_id', None) or getattr(user, 'uniqueId', None) or 'unknown' } app_state["comments"].insert(0, comment_data) if len(app_state["comments"]) > MAX_COMMENTS: app_state["comments"] = app_state["comments"][:MAX_COMMENTS] await broadcast({"type": "comment", "data": comment_data, "total": len(app_state["comments"])}) except Exception as ex: log_print(f"COMMENT_ERROR: {str(ex)[:80]}") @tiktok_client.on(GiftEvent) async def on_gift(e): try: gift = e.gift user = safe_extract_user(e) diamonds = getattr(gift, 'diamond_count', 0) or getattr(gift, 'diamondCount', 0) or 0 repeat_count = getattr(gift, 'repeat_count', 1) or 1 gift_name = getattr(gift, 'name', None) or getattr(gift, 'giftName', None) or 'هدية' if hasattr(gift, 'repeat_end') and getattr(gift, 'repeat_end', 1): diamonds *= repeat_count icon_url = get_gift_icon_url(gift) app_state["gifts"]["count"] += 1 app_state["gifts"]["diamonds"] += int(diamonds) # Check if same user sending same gift - merge into existing alert merged = False for alert in app_state["gift_alerts"]: if alert["user"] == get_user_name(user) and alert["gift_name"] == gift_name: alert["repeat"] += repeat_count alert["diamonds"] += int(diamonds) alert["updated"] = time.time() merged = True break if not merged: alert = { "user": get_user_name(user), "gift_name": gift_name, "gift_icon": icon_url, "diamonds": int(diamonds), "repeat": repeat_count, "id": f"gift_{int(time.time()*1000)}", "updated": time.time() } app_state["gift_alerts"].insert(0, alert) if len(app_state["gift_alerts"]) > MAX_GIFT_ALERTS: app_state["gift_alerts"] = app_state["gift_alerts"][:MAX_GIFT_ALERTS] await broadcast({ "type": "gift", "name": gift_name, "icon": icon_url, "diamonds": int(diamonds), "repeat": repeat_count, "total_diamonds": app_state["gifts"]["diamonds"], "total_count": app_state["gifts"]["count"], "user": get_user_name(user), "alerts": app_state["gift_alerts"], "merged": merged }) except Exception as ex: log_print(f"GIFT_ERROR: {str(ex)[:80]}") @tiktok_client.on(FollowEvent) async def on_follow(e): try: user = safe_extract_user(e) follower_data = { "name": get_user_name(user), "avatar": get_user_avatar(user), "id": getattr(user, 'unique_id', None) or getattr(user, 'uniqueId', None) or 'unknown' } app_state["followers"].insert(0, follower_data) if len(app_state["followers"]) > MAX_FOLLOWERS: app_state["followers"] = app_state["followers"][:MAX_FOLLOWERS] await broadcast({"type": "follower", "data": follower_data, "list": app_state["followers"]}) except Exception as ex: log_print(f"FOLLOW_ERROR: {str(ex)[:80]}") @tiktok_client.on(JoinEvent) async def on_join(e): pass # Use RoomUserSeqEvent for accurate count @tiktok_client.on(LikeEvent) async def on_like(e): try: # LikeEvent gives us the increment, not total count = getattr(e, 'like_count', 1) app_state["likes"] += int(count) await broadcast({"type": "like", "count": app_state["likes"]}) except Exception as ex: log_print(f"LIKE_ERROR: {str(ex)[:80]}") @tiktok_client.on(RoomUserSeqEvent) async def on_room_user_seq(e): """Accurate viewer count from TikTok""" try: viewer_count = getattr(e, 'viewer_count', None) or getattr(e, 'viewerCount', None) if viewer_count: app_state["viewers"] = int(viewer_count) await broadcast({"type": "viewers", "count": app_state["viewers"]}) # Some versions also send total likes in this event total_likes = getattr(e, 'total_likes', None) or getattr(e, 'totalLikes', None) if total_likes: app_state["likes"] = int(total_likes) await broadcast({"type": "like", "count": app_state["likes"]}) except Exception as ex: log_print(f"ROOM_SEQ_ERROR: {str(ex)[:80]}") @tiktok_client.on(ControlEvent) async def on_control(e): try: action = getattr(e, 'action', None) if action == 3: await broadcast({"type": "toast", "msg": "🏁 انتهى البث", "color": "#f59e0b"}) await broadcast({"type": "status", "connected": False}) except Exception as ex: pass try: log_print(f"STARTING_CLIENT: {username}") await tiktok_client.start() except Exception as e: app_state["connected"] = False app_state["connecting"] = False err_str = str(e) log_print(f"CONNECTION_ERROR: {err_str}") if "RATE_LIMIT" in err_str or "rate_limit" in err_str or "Too many" in err_str: app_state["last_error"] = "RATE_LIMIT" await broadcast({"type": "status", "connected": False, "error": "rate_limit"}) await broadcast({"type": "toast", "msg": "⚠️ تقييد TikTok: انتظر 30 ثانية", "color": "#f59e0b"}) else: app_state["last_error"] = err_str await broadcast({"type": "status", "connected": False}) await broadcast({"type": "toast", "msg": f"⚠️ خطأ: {err_str[:120]}", "color": "#f59e0b"}) # ========================================== # 6. Web Server # ========================================== async def websocket_handler(request): ws = web.WebSocketResponse() await ws.prepare(request) connected_clients.add(ws) await ws.send_json({"type": "init", "state": app_state}) try: async for msg in ws: if msg.type == web.WSMsgType.TEXT: data = json.loads(msg.data) global tiktok_task, tiktok_client if data["action"] == "connect": if tiktok_task and not tiktok_task.done(): tiktok_task.cancel() if tiktok_client: try: await tiktok_client.disconnect() except: pass app_state["comments"] = [] app_state["followers"] = [] app_state["gifts"] = {"count": 0, "diamonds": 0} app_state["likes"] = 0 app_state["viewers"] = 0 app_state["gift_alerts"] = [] app_state["last_error"] = "" tiktok_task = asyncio.create_task(start_tiktok_stream(data["username"])) elif data["action"] == "disconnect": if tiktok_task and not tiktok_task.done(): tiktok_task.cancel() if tiktok_client: try: await tiktok_client.disconnect() except: pass app_state["connected"] = False app_state["connecting"] = False await broadcast({"type": "status", "connected": False}) except Exception as e: log_print(f"WS_ERROR: {str(e)[:80]}") finally: connected_clients.discard(ws) return ws async def html_handler(request): return web.Response(text=HTML_CONTENT, content_type='text/html') # ========================================== # 7. Mobile UI - Professional Design # ========================================== HTML_CONTENT = """ TikTok Live Dashboard

جاري الاتصال...

قد يستغرق 30 ثانية

غير متصل
👁 0 مشاهد

TikTok Live

لوحة التحكم التفاعلية

0
الهدايا
0
الماس 💎
0
إعجابات ❤️

آخر المتابعين

0
لا يوجد متابعين جدد

التعليقات المباشرة

0
انتظر الاتصال بالبث...
🔧 Debug
""" async def init_app(): app = web.Application() app.router.add_get('/', html_handler) app.router.add_get('/ws', websocket_handler) return app if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) log_print(f"\n🚀 TikTok Live Dashboard running on port {port}") log_print(f"🌍 Open: http://0.0.0.0:{port}\n") web.run_app(init_app(), host='0.0.0.0', port=port)