MandyDeep's picture
Create app.py
2d97fb5 verified
import os
import json
import threading
import logging
import requests
import gradio as gr
from datetime import datetime
# Telegram imports
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
# ---------- Configuration ----------
# Scraper endpoint. NOTE: placeholder value - replace with the actual endpoint.
API_URL = "https://api.apilayer.com/advanced_scraper/page"
# Credentials are read from the environment; each is None when its variable is unset.
API_KEY = os.getenv("SCRAPER_API_KEY")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Platform presets: default scraper options for each supported site
# One row per preset: (platform name, render JavaScript?, CSS selector).
_PRESET_TABLE = (
    ("Suno", True, "audio source[src]"),
    ("Twitter", True, "article[data-testid='tweet']"),
    ("Facebook", True, "div[data-pagelet='FeedUnit_0']"),
    ("Instagram", True, "article"),
    ("TikTok", True, "video source"),
    ("YouTube", True, "meta[property='og:title']"),
    ("Custom", False, ""),
)

# Maps platform name -> scraper options. Every preset leaves
# "wait_for_navigation" off and gets its own empty "headers" dict.
PLATFORMS = {
    name: {
        "render": render_js,
        "selector": css_selector,
        "wait_for_navigation": False,
        "headers": {},
    }
    for name, render_js, css_selector in _PRESET_TABLE
}
# ---------- Global log for Telegram activity ----------
# Newest-first list of timestamped activity lines; capped at 20 entries.
telegram_log = []


def add_telegram_log(message):
    """Record *message* at the front of ``telegram_log`` with a timestamp.

    Each entry has the form ``[YYYY-MM-DD HH:MM:SS] message`` (local time).
    Only the 20 most recent entries are kept.

    Args:
        message: Text to record; it is interpolated with ``str()`` semantics.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    telegram_log.insert(0, f"[{timestamp}] {message}")
    # Trim in place instead of rebinding the global: code that obtained the
    # list earlier (e.g. ``from app import telegram_log``) keeps seeing updates.
    del telegram_log[20:]
# ---------- Scraper API call (shared by the Telegram bot and the dashboard) ----------
def call_scraper_api(url, platform, custom_selector=None, render=None, wait=None, custom_headers=None):
    """Fetch *url* through the scraper API and return the decoded response.

    Args:
        url: Address of the page to scrape.
        platform: Key into ``PLATFORMS``; unknown names fall back to the
            "Custom" preset.
        custom_selector: CSS selector that replaces the preset's selector.
            ``None`` or a blank string keeps the preset selector.
        render: Overrides the preset's "render" flag; ``None`` keeps the preset.
        wait: Overrides the preset's "wait_for_navigation" flag; ``None``
            keeps the preset.
        custom_headers: Optional mapping of extra headers; each entry is sent
            as a request header with ``X-`` prefixed to its name.

    Returns:
        The parsed JSON body when the API answers with HTTP 200, otherwise a
        string starting with "❌" that describes the problem.
    """
    if not API_KEY:
        return "❌ API key not set. Please add SCRAPER_API_KEY as a secret."

    url = (url or "").strip()
    if not url:
        return "❌ No URL provided."

    preset = PLATFORMS.get(platform, PLATFORMS["Custom"])
    # An empty text box arrives as "" rather than None; treating it as an
    # override would silently discard every preset selector.
    selector = (custom_selector or "").strip() or preset["selector"]
    render_val = preset["render"] if render is None else render
    wait_val = preset["wait_for_navigation"] if wait is None else wait

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Accept": "application/json",
    }
    if custom_headers:
        for name, value in custom_headers.items():
            headers[f"X-{name}"] = value

    params = {
        "url": url,
        "render": "true" if render_val else "false",
        "selector": selector,
        "wait_for_navigation": "true" if wait_val else "false",
        "country": "",
        # Sent to the API as a query parameter; the separate timeout=60 below
        # is the HTTP client's own limit for the whole request.
        "timeout": 30,
    }

    try:
        response = requests.get(API_URL, headers=headers, params=params, timeout=60)
        if response.status_code == 200:
            return response.json()
        return f"❌ API error {response.status_code}: {response.text}"
    except Exception as e:
        # Boundary of the helper: callers expect an error string, not a raise.
        return f"❌ Request failed: {e}"
# ---------- Format scraped result for Telegram (short and safe) ----------
def format_for_telegram(result, url, platform):
    """Render a scraper result as one legacy-Markdown Telegram message.

    The returned text never exceeds Telegram's 4096-character message limit
    (unless the header alone is longer), so the code fence is never split
    across messages.

    Args:
        result: Value returned by the scraper call. A dict is treated as a
            successful response whose "data" entry is shown; anything else is
            treated as an error description.
        url: Scraped address, shown in the header.
        platform: Preset name, shown in bold in the header.

    Returns:
        The message text, meant to be sent with ``parse_mode="Markdown"``.
    """
    limit = 4096  # Telegram's maximum message length

    if not isinstance(result, dict):
        prefix = "❌ *Error:* "
        body = _escape_legacy_markdown(str(result))
        room = limit - len(prefix)
        if len(body) > room:
            # Do not leave a dangling escape backslash at the cut.
            body = body[:room].rstrip("\\")
        return prefix + body

    # Extract data (might be dict, list or a scalar).
    data = result.get("data", "")
    if isinstance(data, (dict, list)):
        # ensure_ascii=False keeps non-English text readable instead of \uXXXX.
        data = json.dumps(data, indent=2, ensure_ascii=False)
    else:
        data = str(data)
    # A literal fence inside the payload would close the code block early.
    data = data.replace("```", "'''")

    header = f"✅ *Scraped from {platform}*\nURL: {_escape_legacy_markdown(str(url))}\n\n"
    opening, closing, marker = "```\n", "\n```", "\n...(truncated)"
    room = limit - len(header) - len(opening) - len(closing)
    if len(data) > room:
        data = data[:max(room - len(marker), 0)] + marker
    return f"{header}{opening}{data}{closing}"


def _escape_legacy_markdown(text):
    """Backslash-escape the characters Telegram's legacy Markdown treats as markup."""
    for special in "_*`[":
        text = text.replace(special, "\\" + special)
    return text
# ---------- Telegram Bot Handlers ----------
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start with a short introduction and the list of presets."""
    # effective_message also covers edited commands, for which
    # update.message is None.
    await update.effective_message.reply_text(
        "🤖 Hello! Send me a URL and I'll scrape it for you.\n"
        "You can specify the platform by adding `#platform` after the URL, e.g.:\n"
        "`https://suno.com/song/123 #Suno`\n"
        "Supported platforms: " + ", ".join(PLATFORMS)
    )
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /help with usage instructions."""
    # effective_message also covers edited commands, for which
    # update.message is None.
    await update.effective_message.reply_text(
        "Usage:\n"
        "• Send a URL to scrape with the default 'Custom' settings.\n"
        "• Add `#PlatformName` to use a preset (e.g., `#Suno`).\n"
        "• /platforms – list available platform presets.\n"
        "• /help – show this message."
    )
async def platforms(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /platforms with one preset name per line."""
    msg = "Supported platforms:\n" + "\n".join(PLATFORMS)
    # effective_message also covers edited commands, for which
    # update.message is None.
    await update.effective_message.reply_text(msg)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Scrape the URL contained in a text message and reply with the result.

    The message is either a bare URL (scraped with the "Custom" preset) or
    ``<url> #<Platform>``. The text after the last ``#`` selects a preset
    only when it names an entry of ``PLATFORMS`` (compared case-insensitively);
    otherwise the ``#`` is taken to be part of the URL.
    """
    import asyncio

    from telegram.error import BadRequest

    # effective_message also covers edited messages and channel posts, for
    # which update.message is None.
    message = update.effective_message
    if message is None or not message.text:
        return
    text = message.text.strip()
    if not text:
        return

    # Parse platform from the message (e.g., "url #Twitter").
    platform = "Custom"
    url = text
    if "#" in text:
        head, _, tag = text.rpartition("#")
        canonical = {name.lower(): name for name in PLATFORMS}
        matched = canonical.get(tag.strip().lower())
        if matched is not None:
            platform = matched
            url = head.strip()

    if not url:
        await message.reply_text("⚠️ Please send a URL to scrape.")
        return

    # Channel posts carry no user.
    user = update.effective_user
    sender = user.username if user is not None else None
    add_telegram_log(f"Received from @{sender}: {url} (platform={platform})")
    await message.reply_text(f"🔄 Scraping {url} with platform '{platform}'...")

    # The scraper call is a blocking HTTP request; run it in the default
    # executor so the bot's event loop stays responsive meanwhile.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, call_scraper_api, url, platform)
    formatted = format_for_telegram(result, url, platform)

    # Send result (split if longer than Telegram's 4096-character limit).
    for offset in range(0, len(formatted), 4096):
        chunk = formatted[offset:offset + 4096]
        try:
            await message.reply_text(chunk, parse_mode="Markdown")
        except BadRequest:
            # Telegram rejects Markdown it cannot parse (unbalanced markers,
            # a fence cut by the split); deliver the chunk as plain text.
            await message.reply_text(chunk)
    add_telegram_log(f"Replied to {url}")
# ---------- Start Telegram bot in background thread ----------
def start_telegram_bot():
    """Build the Telegram application, register its handlers and poll.

    Blocks until polling stops. Does nothing except log a notice when
    ``TELEGRAM_TOKEN`` is not set. Any error is recorded in the Telegram
    activity log and in the standard logging output instead of being raised.
    """
    import asyncio

    if not TELEGRAM_TOKEN:
        add_telegram_log("⚠️ TELEGRAM_BOT_TOKEN not set. Bot not started.")
        return
    try:
        polling_kwargs = {"allowed_updates": Update.ALL_TYPES}
        if threading.current_thread() is not threading.main_thread():
            # Worker threads have no event loop by default, and signal
            # handlers can only be installed from the main thread; without
            # both adjustments run_polling() fails immediately.
            asyncio.set_event_loop(asyncio.new_event_loop())
            polling_kwargs["stop_signals"] = None

        app = Application.builder().token(TELEGRAM_TOKEN).build()
        app.add_handler(CommandHandler("start", start))
        app.add_handler(CommandHandler("help", help_command))
        app.add_handler(CommandHandler("platforms", platforms))
        app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
        add_telegram_log("🚀 Telegram bot started and polling...")
        app.run_polling(**polling_kwargs)
    except Exception as e:
        # Top-level boundary of the bot: keep the traceback in the process
        # log and surface a short message in the dashboard log.
        logging.getLogger(__name__).exception("Telegram bot stopped with an error")
        add_telegram_log(f"❌ Telegram bot error: {e}")
# Start the bot in a separate thread (non‑blocking)
# Without a token only a notice is logged; otherwise the bot runs on a
# daemon thread so the rest of the module keeps executing.
if not TELEGRAM_TOKEN:
    add_telegram_log("ℹ️ No Telegram token provided. Bot disabled.")
else:
    bot_thread = threading.Thread(target=start_telegram_bot, daemon=True)
    bot_thread.start()
# ---------- Gradio Dashboard (with a Telegram log panel) ----------
# The dashboard includes a section that displays telegram_log.
def get_telegram_log():
    """Return the recorded Telegram activity as one newline-separated string."""
    separator = "\n"
    return separator.join(telegram_log)
with gr.Blocks(title="Universal Scraper Dashboard") as demo:
    gr.Markdown("# 🌐 Universal Scraper Dashboard")
    gr.Markdown("Enter a URL, choose the platform, adjust advanced options, and scrape.")
    with gr.Row():
        # Left column: scrape inputs.
        with gr.Column(scale=2):
            url_input = gr.Textbox(label="Target URL", placeholder="https://...")
            platform_dropdown = gr.Dropdown(
                choices=list(PLATFORMS.keys()),
                value="Custom",
                label="Platform (presets apply default CSS selectors & render)"
            )
            custom_selector = gr.Textbox(label="Custom CSS Selector (overrides preset)", placeholder="e.g., div.content")
            with gr.Accordion("Advanced Options", open=False):
                render_check = gr.Checkbox(label="Render JavaScript (headless browser)", value=False)
                wait_check = gr.Checkbox(label="Wait for navigation after JS actions", value=False)
                custom_headers = gr.Textbox(
                    label="Custom HTTP Headers (one per line, key:value)",
                    placeholder="User-Agent: my-bot\nCookie: session=abc",
                    lines=3
                )
            scrape_btn = gr.Button("Scrape", variant="primary")
        # Right column: scrape result and the Telegram activity log.
        with gr.Column(scale=3):
            output = gr.Markdown(label="Scraped Result")
            with gr.Accordion("Telegram Bot Log", open=False):
                bot_log = gr.Textbox(label="Recent activity", lines=10, interactive=False)
                refresh_log_btn = gr.Button("Refresh Log")

    def apply_preset(platform):
        """Return the preset's (render, wait_for_navigation) flags for the checkboxes."""
        preset = PLATFORMS.get(platform, PLATFORMS["Custom"])
        return preset["render"], preset["wait_for_navigation"]

    def scrape_with_ui(url, platform, custom_selector, render, wait, headers_str):
        """Run one scrape with the dashboard's inputs and return a Markdown report.

        Args:
            url: Target address from the URL box.
            platform: Selected preset name.
            custom_selector: Selector box content; blank keeps the preset selector.
            render: State of the "Render JavaScript" checkbox.
            wait: State of the "Wait for navigation" checkbox.
            headers_str: Extra headers, one ``key:value`` pair per line.

        Returns:
            Markdown text describing the scraped data, or the error message.
        """
        # Convert the headers text to a dict; lines without ":" or with an
        # empty name are ignored.
        headers_dict = {}
        for line in (headers_str or "").splitlines():
            name, sep, value = line.partition(":")
            if sep and name.strip():
                headers_dict[name.strip()] = value.strip()

        # An empty selector box must not override the preset's selector.
        selector_override = (custom_selector or "").strip() or None
        result = call_scraper_api(url, platform, selector_override, render, wait, headers_dict)
        if not isinstance(result, dict):
            return f"**Error:** {result}"

        # JSON goes inside code fences so Markdown neither collapses its
        # line breaks nor interprets characters of the scraped content.
        data_json = json.dumps(result.get('data', {}), indent=2, ensure_ascii=False)
        headers_json = json.dumps(result.get('response_header', {}), indent=2, ensure_ascii=False)
        report = f"**Scraped at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}**\n\n"
        report += f"**URL:** {url}\n"
        report += f"**Platform:** {platform}\n"
        report += f"**Selector:** {result.get('options', {}).get('selector', 'N/A')}\n\n"
        report += "**Data:**\n"
        report += f"```json\n{data_json}\n```\n"
        report += f"\n**Response headers:**\n```json\n{headers_json}\n```"
        return report

    # Choosing a platform pre-sets the advanced checkboxes to that preset's
    # values; the checkbox states are what gets sent on "Scrape".
    platform_dropdown.change(
        fn=apply_preset,
        inputs=platform_dropdown,
        outputs=[render_check, wait_check]
    )
    scrape_btn.click(
        fn=scrape_with_ui,
        inputs=[url_input, platform_dropdown, custom_selector, render_check, wait_check, custom_headers],
        outputs=output
    )
    refresh_log_btn.click(
        fn=get_telegram_log,
        outputs=bot_log
    )
# Launch the dashboard only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()