"""Universal scraper dashboard: a Gradio UI plus an optional Telegram bot.

Both front ends call the same scraper HTTP API through ``call_scraper_api``.
The Telegram bot is started in a daemon thread at import time when
``TELEGRAM_BOT_TOKEN`` is set; its recent activity is kept in ``telegram_log``
and shown in the dashboard.
"""
import asyncio
import json
import logging
import os
import threading
from collections import deque
from datetime import datetime

import gradio as gr
import requests

# Telegram imports
from telegram import Update
from telegram.ext import (
    Application,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

logger = logging.getLogger(__name__)

# ---------- Configuration ----------
API_URL = "https://api.apilayer.com/advanced_scraper/page"  # Replace with actual endpoint
API_KEY = os.environ.get("SCRAPER_API_KEY")
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")

# Telegram rejects messages longer than 4096 characters; stay a little below
# that so multi-unit characters (emoji) cannot push a message over the limit.
TELEGRAM_SAFE_LENGTH = 4000
_TRUNCATION_SUFFIX = "\n...(truncated)"

# Characters that must be backslash-escaped outside entities in Telegram's
# legacy "Markdown" parse mode.
_MARKDOWN_ESCAPES = str.maketrans({ch: "\\" + ch for ch in "_*`["})

# Platform presets: default selector / render settings per site.
PLATFORMS = {
    "Suno": {
        "render": True,
        "selector": "audio source[src]",
        "wait_for_navigation": False,
        "headers": {},
    },
    "Twitter": {
        "render": True,
        "selector": "article[data-testid='tweet']",
        "wait_for_navigation": False,
        "headers": {},
    },
    "Facebook": {
        "render": True,
        "selector": "div[data-pagelet='FeedUnit_0']",
        "wait_for_navigation": False,
        "headers": {},
    },
    "Instagram": {
        "render": True,
        "selector": "article",
        "wait_for_navigation": False,
        "headers": {},
    },
    "TikTok": {
        "render": True,
        "selector": "video source",
        "wait_for_navigation": False,
        "headers": {},
    },
    "YouTube": {
        "render": True,
        "selector": "meta[property='og:title']",
        "wait_for_navigation": False,
        "headers": {},
    },
    "Custom": {
        "render": False,
        "selector": "",
        "wait_for_navigation": False,
        "headers": {},
    },
}

# Case-insensitive lookup so "#suno" and "#Suno" select the same preset.
_PLATFORM_LOOKUP = {name.lower(): name for name in PLATFORMS}

# ---------- Global log for Telegram activity ----------
# Newest entry first; the deque drops the oldest entry once 20 are stored.
# It is written by the bot thread and read by the Gradio callbacks.
telegram_log = deque(maxlen=20)


def add_telegram_log(message):
    """Prepend a timestamped entry to ``telegram_log`` (keeps the last 20)."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    telegram_log.appendleft(f"[{timestamp}] {message}")


# ---------- Scraper API call (shared by the bot and the dashboard) ----------
def call_scraper_api(url, platform, custom_selector=None, render=None, wait=None, custom_headers=None):
    """Request a page from the scraper API.

    Args:
        url: Page to scrape.
        platform: Key of ``PLATFORMS``; unknown names fall back to "Custom".
        custom_selector: CSS selector overriding the preset; ``None`` keeps
            the preset's selector.
        render: Overrides the preset's ``render`` flag; ``None`` keeps it.
        wait: Overrides the preset's ``wait_for_navigation`` flag; ``None``
            keeps it.
        custom_headers: Optional mapping; each entry is sent as an
            ``X-<name>`` request header.

    Returns:
        The decoded JSON body on HTTP 200, otherwise an error string that
        starts with "❌". This function does not raise for request failures.
    """
    if not API_KEY:
        return "❌ API key not set. Please add SCRAPER_API_KEY as a secret."

    url = (url or "").strip()
    if not url:
        return "❌ No URL provided."

    preset = PLATFORMS.get(platform, PLATFORMS["Custom"])
    selector = custom_selector if custom_selector is not None else preset["selector"]
    render_val = render if render is not None else preset["render"]
    wait_val = wait if wait is not None else preset["wait_for_navigation"]

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Accept": "application/json",
    }
    if custom_headers:
        for name, value in custom_headers.items():
            headers[f"X-{name}"] = value

    params = {
        "url": url,
        "render": "true" if render_val else "false",
        "selector": selector,
        "wait_for_navigation": "true" if wait_val else "false",
        "country": "",
        "timeout": 30,
    }

    try:
        response = requests.get(API_URL, headers=headers, params=params, timeout=60)
        if response.status_code != 200:
            return f"❌ API error {response.status_code}: {response.text}"
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers an undecodable JSON body and header values that
        # cannot be encoded (UnicodeError is a ValueError subclass).
        return f"❌ Request failed: {exc}"


# ---------- Format scraped result for Telegram (short and safe) ----------
def _escape_markdown(text):
    """Backslash-escape legacy-Markdown special characters in *text*."""
    return text.translate(_MARKDOWN_ESCAPES)


def _truncate(text, limit):
    """Cut *text* to about *limit* characters, marking it when shortened."""
    if len(text) <= limit:
        return text
    keep = max(limit - len(_TRUNCATION_SUFFIX), 0)
    return text[:keep] + _TRUNCATION_SUFFIX


def format_for_telegram(result, url, platform):
    """Render a ``call_scraper_api`` result as one legacy-Markdown message.

    A dict is treated as success and its ``"data"`` entry is shown in a code
    block; anything else is shown as an error. The returned text is budgeted
    as a whole (header and code fences included) so that it fits into a
    single Telegram message instead of being split mid-entity.
    """
    if not isinstance(result, dict):
        prefix = "❌ *Error:* "
        detail = _escape_markdown(str(result))
        if len(prefix) + len(detail) > TELEGRAM_SAFE_LENGTH:
            keep = TELEGRAM_SAFE_LENGTH - len(prefix) - len(_TRUNCATION_SUFFIX)
            # Do not leave half of an escape sequence at the cut point.
            detail = detail[:keep].rstrip("\\") + _TRUNCATION_SUFFIX
        return prefix + detail

    # Extract data (might be dict or list)
    data = result.get("data", "")
    if isinstance(data, (dict, list)):
        data = json.dumps(data, indent=2)
    else:
        data = str(data)
    # A literal code fence inside the payload would end the code block early.
    data = data.replace("```", "'''")

    header = f"✅ *Scraped from {platform}*\nURL: {_escape_markdown(str(url))}\n\n"
    fence_overhead = len("```\n\n```")
    data = _truncate(data, TELEGRAM_SAFE_LENGTH - len(header) - fence_overhead)
    return f"{header}```\n{data}\n```"


# ---------- Telegram Bot Handlers ----------
def _parse_request(text):
    """Split "<url> #Platform" into ``(url, platform)``.

    The platform tag is matched case-insensitively. When the text after the
    last "#" is not a known platform, the whole text is treated as the URL
    (so URL fragments keep working) and the platform is "Custom".
    """
    url, sep, tag = text.rpartition("#")
    if sep:
        platform = _PLATFORM_LOOKUP.get(tag.strip().lower())
        if platform is not None:
            return url.strip(), platform
    return text, "Custom"


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: greet the user and list the platform presets."""
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "🤖 Hello! Send me a URL and I'll scrape it for you.\n"
        "You can specify the platform by adding `#platform` after the URL, e.g.:\n"
        "`https://suno.com/song/123 #Suno`\n"
        "Supported platforms: " + ", ".join(PLATFORMS.keys())
    )


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help: show usage instructions."""
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "Usage:\n"
        "• Send a URL to scrape with the default 'Custom' settings.\n"
        "• Add `#PlatformName` to use a preset (e.g., `#Suno`).\n"
        "• /platforms – list available platform presets.\n"
        "• /help – show this message."
    )


async def platforms(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /platforms: list the available platform presets."""
    message = update.effective_message
    if message is None:
        return
    msg = "Supported platforms:\n" + "\n".join(PLATFORMS.keys())
    await message.reply_text(msg)


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Scrape the URL contained in a text message and reply with the result."""
    # effective_message also covers edited messages, where update.message
    # is None.
    message = update.effective_message
    if message is None or not message.text:
        return
    text = message.text.strip()
    if not text:
        return

    url, platform = _parse_request(text)

    user = update.effective_user
    if user is None:
        sender = "unknown"
    else:
        sender = user.username or user.id
    add_telegram_log(f"Received from @{sender}: {url} (platform={platform})")

    await message.reply_text(f"🔄 Scraping {url} with platform '{platform}'...")

    # requests is blocking; run it in a worker thread so the bot keeps
    # serving other updates while the scrape is in flight.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, call_scraper_api, url, platform)

    # format_for_telegram keeps the text within one message, so no splitting
    # (which would cut through Markdown entities) is needed here.
    formatted = format_for_telegram(result, url, platform)
    await message.reply_text(formatted, parse_mode="Markdown")
    add_telegram_log(f"Replied to {url}")


# ---------- Start Telegram bot in background thread ----------
def start_telegram_bot():
    """Build the bot application and poll for updates until it stops.

    Intended to run in a background thread: it creates an event loop for the
    current thread and disables signal handling, which is only possible in
    the main thread. Failures are recorded in ``telegram_log``.
    """
    if not TELEGRAM_TOKEN:
        add_telegram_log("⚠️ TELEGRAM_BOT_TOKEN not set. Bot not started.")
        return
    try:
        # Threads other than the main one have no event loop by default.
        asyncio.set_event_loop(asyncio.new_event_loop())

        app = Application.builder().token(TELEGRAM_TOKEN).build()
        app.add_handler(CommandHandler("start", start))
        app.add_handler(CommandHandler("help", help_command))
        app.add_handler(CommandHandler("platforms", platforms))
        app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
        add_telegram_log("🚀 Telegram bot started and polling...")
        # stop_signals=None: signal handlers cannot be installed outside the
        # main thread; the daemon thread ends with the process instead.
        app.run_polling(allowed_updates=Update.ALL_TYPES, stop_signals=None)
    except Exception as e:
        # Thread boundary: record the failure instead of losing it silently.
        logger.exception("Telegram bot stopped with an error")
        add_telegram_log(f"❌ Telegram bot error: {e}")


# Start the bot in a separate thread (non-blocking)
if TELEGRAM_TOKEN:
    bot_thread = threading.Thread(target=start_telegram_bot, daemon=True)
    bot_thread.start()
else:
    add_telegram_log("ℹ️ No Telegram token provided. Bot disabled.")


# ---------- Gradio Dashboard (with a Telegram log panel) ----------
def get_telegram_log():
    """Return the recent Telegram activity, newest first, one entry per line."""
    return "\n".join(telegram_log)


def scrape_with_ui(url, platform, custom_selector, render, wait, headers_str):
    """Run a scrape from the dashboard inputs and return Markdown output.

    A blank selector and unchecked "render" / "wait" boxes are passed on as
    ``None`` so the selected platform preset applies; a filled-in selector or
    a checked box overrides the preset.
    """
    # Convert "key: value" lines to a dict, skipping malformed lines.
    headers_dict = {}
    for line in (headers_str or "").splitlines():
        key, sep, value = line.partition(":")
        key = key.strip()
        if sep and key:
            headers_dict[key] = value.strip()

    selector = (custom_selector or "").strip() or None
    result = call_scraper_api(
        url,
        platform,
        selector,
        True if render else None,
        True if wait else None,
        headers_dict,
    )

    if not isinstance(result, dict):
        return f"**Error:** {result}"

    report = f"**Scraped at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}**\n\n"
    report += f"**URL:** {url}\n"
    report += f"**Platform:** {platform}\n"
    report += f"**Selector:** {result.get('options', {}).get('selector', 'N/A')}\n\n"
    report += "**Data:**\n"
    report += f"{json.dumps(result.get('data', {}), indent=2)}\n"
    report += f"\n**Response headers:** {json.dumps(result.get('response_header', {}), indent=2)}"
    return report


with gr.Blocks(title="Universal Scraper Dashboard") as demo:
    gr.Markdown("# 🌐 Universal Scraper Dashboard")
    gr.Markdown("Enter a URL, choose the platform, adjust advanced options, and scrape.")

    with gr.Row():
        with gr.Column(scale=2):
            url_input = gr.Textbox(label="Target URL", placeholder="https://...")
            platform_dropdown = gr.Dropdown(
                choices=list(PLATFORMS.keys()),
                value="Custom",
                label="Platform (presets apply default CSS selectors & render)",
            )
            custom_selector = gr.Textbox(
                label="Custom CSS Selector (overrides preset)",
                placeholder="e.g., div.content",
            )
            with gr.Accordion("Advanced Options", open=False):
                render_check = gr.Checkbox(label="Render JavaScript (headless browser)", value=False)
                wait_check = gr.Checkbox(label="Wait for navigation after JS actions", value=False)
                custom_headers = gr.Textbox(
                    label="Custom HTTP Headers (one per line, key:value)",
                    placeholder="User-Agent: my-bot\nCookie: session=abc",
                    lines=3,
                )
            scrape_btn = gr.Button("Scrape", variant="primary")
        with gr.Column(scale=3):
            output = gr.Markdown(label="Scraped Result")

    with gr.Accordion("Telegram Bot Log", open=False):
        bot_log = gr.Textbox(label="Recent activity", lines=10, interactive=False)
        refresh_log_btn = gr.Button("Refresh Log")

    # Wire the buttons to their callbacks.
    scrape_btn.click(
        fn=scrape_with_ui,
        inputs=[url_input, platform_dropdown, custom_selector, render_check, wait_check, custom_headers],
        outputs=output,
    )
    refresh_log_btn.click(
        fn=get_telegram_log,
        outputs=bot_log,
    )

if __name__ == "__main__":
    demo.launch()