| import os |
| import json |
| import threading |
| import logging |
| import requests |
| import gradio as gr |
| from datetime import datetime |
|
|
| |
| from telegram import Update |
| from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes |
|
|
| |
# Endpoint of the APILayer scraper service that every scrape request targets.
API_URL = "https://api.apilayer.com/advanced_scraper/page"

# Credentials are read from the environment once, at import time. Either value
# is None when the corresponding variable is not set.
API_KEY = os.getenv("SCRAPER_API_KEY")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
|
|
| |
def _preset(selector, render=True):
    """Build one preset entry; every call returns fresh dict objects."""
    return {
        "render": render,
        "selector": selector,
        "wait_for_navigation": False,
        "headers": {},
    }


# Scraping presets keyed by platform display name. Each entry carries the
# default CSS selector, whether JavaScript rendering is requested, whether to
# wait for navigation, and a (currently empty) header mapping.
PLATFORMS = {
    "Suno": _preset("audio source[src]"),
    "Twitter": _preset("article[data-testid='tweet']"),
    "Facebook": _preset("div[data-pagelet='FeedUnit_0']"),
    "Instagram": _preset("article"),
    "TikTok": _preset("video source"),
    "YouTube": _preset("meta[property='og:title']"),
    # Fallback preset: no selector and no rendering.
    "Custom": _preset("", render=False),
}
|
|
| |
# Bot activity lines for the dashboard, most recent first.
telegram_log = []


def add_telegram_log(message):
    """Put *message* at the head of the activity log, stamped with local time.

    The module-level ``telegram_log`` name is rebound to a new list holding
    only the 20 newest entries.
    """
    global telegram_log
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    # New entry first, followed by at most 19 of the previous entries.
    telegram_log = [f"[{stamp}] {message}"] + telegram_log[:19]
|
|
| |
def call_scraper_api(url, platform, custom_selector=None, render=None, wait=None, custom_headers=None):
    """Request a scrape of *url* from the APILayer endpoint and return the result.

    Args:
        url: Address of the page to scrape; surrounding whitespace is ignored.
        platform: Key into ``PLATFORMS``. Unknown names use the "Custom" preset.
        custom_selector: CSS selector that replaces the preset's selector.
            ``None`` or a blank string keeps the preset selector.
        render: Override for JavaScript rendering; ``None`` keeps the preset.
        wait: Override for waiting on navigation; ``None`` keeps the preset.
        custom_headers: Optional mapping; each entry is sent as ``X-<name>``.

    Returns:
        The decoded JSON body when the API answers with HTTP 200. Otherwise a
        string starting with "❌" describing the problem (missing key, missing
        URL, non-200 status, or a failed request). Failures of the HTTP call
        are reported this way rather than raised.
    """
    if not API_KEY:
        return "❌ API key not set. Please add SCRAPER_API_KEY as a secret."

    url = (url or "").strip()
    if not url:
        # Nothing to scrape; avoid spending an API call on an empty target.
        return "❌ No URL provided."

    preset = PLATFORMS.get(platform, PLATFORMS["Custom"])
    # A blank selector (what an empty UI textbox delivers) means "no override",
    # so it must not wipe out the preset selector.
    selector = (custom_selector or "").strip() or preset["selector"]
    render_val = preset["render"] if render is None else render
    wait_val = preset["wait_for_navigation"] if wait is None else wait

    # NOTE(review): APILayer's marketplace docs show authentication through an
    # `apikey` header; confirm that the Bearer scheme is accepted here.
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Accept": "application/json",
    }
    # Caller-supplied headers are sent under an "X-" prefix.
    headers.update({f"X-{k}": v for k, v in (custom_headers or {}).items()})

    params = {
        "url": url,
        "render": "true" if render_val else "false",
        "selector": selector,
        "wait_for_navigation": "true" if wait_val else "false",
        "country": "",
        "timeout": 30,
    }

    try:
        response = requests.get(API_URL, headers=headers, params=params, timeout=60)
        if response.status_code != 200:
            return f"❌ API error {response.status_code}: {response.text}"
        return response.json()
    except Exception as e:
        # Deliberate catch-all: callers expect an error string, not an exception
        # (covers network failures as well as a non-JSON 200 body).
        return f"❌ Request failed: {str(e)}"
|
|
| |
# Longest message text Telegram accepts in a single sendMessage call.
_TELEGRAM_MAX_LEN = 4096


def _escape_markdown(text):
    """Backslash-escape the characters Telegram's legacy Markdown treats as markup."""
    for ch in ("_", "*", "`", "["):
        text = text.replace(ch, "\\" + ch)
    return text


def format_for_telegram(result, url, platform):
    """Render a scraper result as a legacy-Markdown Telegram message.

    Args:
        result: A dict (successful API response, its ``"data"`` entry is shown)
            or any other value, which is reported as an error.
        url: The scraped address, shown in the message header.
        platform: Preset name shown in the message title.

    Returns:
        The message text. For dict results the data sits inside one code
        fence and is truncated so the whole message stays within Telegram's
        4096-character limit whenever the header itself leaves room.
    """
    if not isinstance(result, dict):
        # Error text often contains "_" (e.g. JSON keys), which would otherwise
        # open an unterminated italic entity and make Telegram reject the message.
        return f"❌ *Error:* {_escape_markdown(str(result))}"

    data = result.get("data", "")
    if isinstance(data, (dict, list)):
        data = json.dumps(data, indent=2)
    else:
        data = str(data)
    # A literal fence inside the payload would close our code block early.
    data = data.replace("```", "'''")

    header = f"✅ *Scraped from {platform}*\nURL: {_escape_markdown(str(url))}\n\n```\n"
    footer = "\n```"
    notice = "\n...(truncated)"

    # Keep the historical 4000-character cap, but also leave room for the
    # header and closing fence so the caller never has to split the message
    # in the middle of the code block.
    room = _TELEGRAM_MAX_LEN - len(header) - len(footer)
    if len(data) > min(4000, room):
        keep = max(min(4000, room - len(notice)), 0)
        data = data[:keep] + notice

    return f"{header}{data}{footer}"
|
|
| |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer /start with a greeting, a usage example and the preset names."""
    # effective_message also covers edited commands, for which update.message
    # is None.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "🤖 Hello! Send me a URL and I'll scrape it for you.\n"
        "You can specify the platform by adding `#platform` after the URL, e.g.:\n"
        "`https://suno.com/song/123 #Suno`\n"
        "Supported platforms: " + ", ".join(PLATFORMS),
        # The text uses backticks for inline code; without a parse mode they
        # would be shown literally.
        parse_mode="Markdown",
    )
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer /help with a short usage summary of the bot."""
    # effective_message also covers edited commands, for which update.message
    # is None.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "Usage:\n"
        "• Send a URL to scrape with the default 'Custom' settings.\n"
        "• Add `#PlatformName` to use a preset (e.g., `#Suno`).\n"
        "• /platforms – list available platform presets.\n"
        "• /help – show this message.",
        # Needed so the backtick-quoted examples render as inline code.
        parse_mode="Markdown",
    )
|
|
async def platforms(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer /platforms with one preset name per line."""
    # effective_message also covers edited commands, for which update.message
    # is None.
    message = update.effective_message
    if message is None:
        return
    msg = "Supported platforms:\n" + "\n".join(PLATFORMS)
    await message.reply_text(msg)
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Scrape the URL contained in a text message and reply with the result.

    The message may end in ``#PlatformName`` to select a preset from
    ``PLATFORMS`` (matched case-insensitively). When the tag is missing or
    unknown, the whole text is treated as the URL and the "Custom" preset is
    used.
    """
    import asyncio

    from telegram.error import BadRequest

    # effective_message covers edited messages and channel posts, for which
    # update.message is None.
    message = update.effective_message
    if message is None or not message.text:
        return
    text = message.text.strip()
    if not text:
        return

    platform = "Custom"
    url = text
    if "#" in text:
        head, _, tag = text.rpartition("#")
        by_folded_name = {name.casefold(): name for name in PLATFORMS}
        matched = by_folded_name.get(tag.strip().casefold())
        # An unknown tag is most likely a URL fragment, so keep the full text.
        if matched is not None:
            platform = matched
            url = head.strip()

    if not url:
        await message.reply_text("❌ Please send a URL to scrape.")
        return

    # Channel posts have no user, and users may have no username.
    user = update.effective_user
    username = user.username if user is not None else None
    sender = f"@{username}" if username else "unknown sender"
    add_telegram_log(f"Received from {sender}: {url} (platform={platform})")

    await message.reply_text(f"🔄 Scraping {url} with platform '{platform}'...")

    # call_scraper_api blocks on HTTP for up to a minute; run it in a worker
    # thread so the bot keeps serving other updates meanwhile.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, call_scraper_api, url, platform, None, None, None, None
    )
    formatted = format_for_telegram(result, url, platform)

    if len(formatted) > 4096:
        # Cutting at a fixed offset can split Markdown entities, which Telegram
        # rejects, so oversized replies are sent as plain text.
        for i in range(0, len(formatted), 4096):
            await message.reply_text(formatted[i:i + 4096])
    else:
        try:
            await message.reply_text(formatted, parse_mode="Markdown")
        except BadRequest:
            # Typically unparseable Markdown in scraped content: resend raw.
            await message.reply_text(formatted)

    add_telegram_log(f"Replied to {url}")
|
|
| |
def start_telegram_bot():
    """Build the Telegram application, register handlers and poll until stopped.

    Does nothing except log a warning when ``TELEGRAM_TOKEN`` is unset. Any
    exception raised while building or running the bot is written to the
    activity log instead of propagating. Safe to use as a thread target.
    """
    import asyncio

    if not TELEGRAM_TOKEN:
        add_telegram_log("⚠️ TELEGRAM_BOT_TOKEN not set. Bot not started.")
        return

    try:
        polling_kwargs = {"allowed_updates": Update.ALL_TYPES}
        if threading.current_thread() is not threading.main_thread():
            # Worker threads have no event loop by default, and signal
            # handlers can only be installed from the main thread. Without
            # both adjustments run_polling fails immediately.
            asyncio.set_event_loop(asyncio.new_event_loop())
            polling_kwargs["stop_signals"] = None

        app = Application.builder().token(TELEGRAM_TOKEN).build()
        app.add_handler(CommandHandler("start", start))
        app.add_handler(CommandHandler("help", help_command))
        app.add_handler(CommandHandler("platforms", platforms))
        app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

        add_telegram_log("🚀 Telegram bot started and polling...")
        app.run_polling(**polling_kwargs)
    except Exception as e:
        # Top-level boundary of the bot thread: surface the failure in the log.
        add_telegram_log(f"❌ Telegram bot error: {e}")
|
|
| |
# Launch the bot alongside the dashboard at import time. The thread is a
# daemon so it does not keep the process alive on exit.
if not TELEGRAM_TOKEN:
    add_telegram_log("ℹ️ No Telegram token provided. Bot disabled.")
else:
    bot_thread = threading.Thread(target=start_telegram_bot, daemon=True)
    bot_thread.start()
|
|
| |
| |
|
|
def get_telegram_log():
    """Return the current bot activity log as one newline-separated string."""
    line_break = "\n"
    return line_break.join(telegram_log)
|
|
def _parse_header_lines(headers_str):
    """Turn ``key:value`` lines into a dict, skipping lines without a usable key."""
    parsed = {}
    for line in (headers_str or "").splitlines():
        name, sep, value = line.partition(":")
        if sep and name.strip():
            parsed[name.strip()] = value.strip()
    return parsed


def _preset_toggles(platform):
    """Return the (render, wait_for_navigation) defaults of a platform preset."""
    preset = PLATFORMS.get(platform, PLATFORMS["Custom"])
    return preset["render"], preset["wait_for_navigation"]


def scrape_with_ui(url, platform, custom_selector, render, wait, headers_str):
    """Run one scrape from the dashboard inputs and return Markdown to display.

    Args:
        url: Target address from the URL textbox.
        platform: Selected preset name.
        custom_selector: Selector textbox content; blank means "use the preset".
        render: State of the "Render JavaScript" checkbox.
        wait: State of the "Wait for navigation" checkbox.
        headers_str: Header textbox content, one ``key:value`` pair per line.

    Returns:
        A Markdown report for a successful scrape, or ``**Error:** …`` when
        ``call_scraper_api`` returned something other than a dict.
    """
    # An empty textbox must not override the preset selector.
    selector_override = (custom_selector or "").strip() or None
    result = call_scraper_api(
        url, platform, selector_override, render, wait, _parse_header_lines(headers_str)
    )
    if not isinstance(result, dict):
        return f"**Error:** {result}"

    data_json = json.dumps(result.get("data", {}), indent=2)
    headers_json = json.dumps(result.get("response_header", {}), indent=2)
    # JSON goes inside code fences; as plain Markdown its indentation and line
    # breaks would be collapsed.
    return (
        f"**Scraped at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}**\n\n"
        f"**URL:** {url}\n"
        f"**Platform:** {platform}\n"
        f"**Selector:** {result.get('options', {}).get('selector', 'N/A')}\n\n"
        "**Data:**\n"
        f"```json\n{data_json}\n```\n"
        f"\n**Response headers:**\n```json\n{headers_json}\n```"
    )


# Dashboard layout: inputs on the left, scrape result and bot log on the right.
with gr.Blocks(title="Universal Scraper Dashboard") as demo:
    gr.Markdown("# 🌐 Universal Scraper Dashboard")
    gr.Markdown("Enter a URL, choose the platform, adjust advanced options, and scrape.")

    with gr.Row():
        with gr.Column(scale=2):
            url_input = gr.Textbox(label="Target URL", placeholder="https://...")
            platform_dropdown = gr.Dropdown(
                choices=list(PLATFORMS.keys()),
                value="Custom",
                label="Platform (presets apply default CSS selectors & render)",
            )
            custom_selector = gr.Textbox(
                label="Custom CSS Selector (overrides preset)",
                placeholder="e.g., div.content",
            )
            with gr.Accordion("Advanced Options", open=False):
                render_check = gr.Checkbox(label="Render JavaScript (headless browser)", value=False)
                wait_check = gr.Checkbox(label="Wait for navigation after JS actions", value=False)
                custom_headers = gr.Textbox(
                    label="Custom HTTP Headers (one per line, key:value)",
                    placeholder="User-Agent: my-bot\nCookie: session=abc",
                    lines=3,
                )
            scrape_btn = gr.Button("Scrape", variant="primary")

        with gr.Column(scale=3):
            output = gr.Markdown(label="Scraped Result")
            with gr.Accordion("Telegram Bot Log", open=False):
                bot_log = gr.Textbox(label="Recent activity", lines=10, interactive=False)
                refresh_log_btn = gr.Button("Refresh Log")

    # The checkboxes always deliver a bool, so presets could never supply
    # their own render/wait defaults. Load them whenever the platform changes;
    # the user can still toggle them afterwards.
    platform_dropdown.change(
        fn=_preset_toggles,
        inputs=platform_dropdown,
        outputs=[render_check, wait_check],
    )

    scrape_btn.click(
        fn=scrape_with_ui,
        inputs=[url_input, platform_dropdown, custom_selector, render_check, wait_check, custom_headers],
        outputs=output,
    )

    refresh_log_btn.click(
        fn=get_telegram_log,
        outputs=bot_log,
    )
|
|
# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()