|
|
from dotenv import dotenv_values, load_dotenv |
|
|
load_dotenv(override=True) |
|
|
|
|
|
import os |
|
|
import asyncio |
|
|
from quart import Quart, jsonify |
|
|
from twikit import Client |
|
|
from datetime import datetime |
|
|
import diskcache as dc |
|
|
import re |
|
|
import requests |
|
|
import xmltodict |
|
|
import urllib |
|
|
import traceback |
|
|
import threading |
|
|
import json |
|
|
import subprocess |
|
|
import queue |
|
|
from threads_util.main import Threads |
|
|
import asyncpraw |
|
|
import instagrapi |
|
|
import time |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Single shared queue carrying JSON objects parsed off the Node.js helper's
# stdout (filled by read_from_node, drained by send_to_node).
response_queue = queue.Queue()
|
|
|
|
|
def read_from_node(proc):
    """Continuously read lines from the Node.js process's stdout and parse JSON when possible.

    Lines are accumulated in ``buffer`` until the whole buffer parses as one
    JSON document (responses may span multiple lines); each parsed document
    is pushed onto the module-level ``response_queue``.  When stdout hits EOF
    (the Node.js process died), a replacement process is spawned and fresh
    reader threads are attached to it.
    """
    buffer = ""
    for line in iter(proc.stdout.readline, ''):
        buffer += line
        try:
            print(buffer)  # echo raw output for debugging
            data = json.loads(buffer)
            response_queue.put(data)
            buffer = ""  # document complete; start accumulating the next one
        except json.JSONDecodeError:
            # Incomplete JSON so far — keep accumulating lines.
            continue

    # readline returned '' => the Node.js process exited; restart it.
    print("Node.js process terminated unexpectedly.")
    global node_proc
    node_proc = subprocess.Popen(
        ["npx", "ts-node", "scraper.ts"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )

    # Attach new reader threads to the replacement process; this thread ends here.
    threading.Thread(target=read_from_node, args=(node_proc,), daemon=True).start()
    threading.Thread(target=read_stderr, args=(node_proc,), daemon=True).start()
|
|
|
|
|
def read_stderr(proc):
    """Drain the Node.js process's stderr line by line, echoing each line to our stdout."""
    while True:
        err_line = proc.stderr.readline()
        if err_line == '':
            break  # EOF: the process has exited
        print(f"Node.js Error: {err_line.strip()}")
|
|
|
|
|
def send_to_node(proc, message, timeout=30):
    """Send *message* to the Node.js process and wait for its JSON reply.

    Returns the decoded response object on success, or a human-readable
    error **string** on failure (callers distinguish success from failure
    with ``isinstance(result, str)``).
    """
    try:
        # BUG FIX: drain replies left over from an earlier call that timed
        # out — otherwise a late answer sits in the queue and is returned
        # to the *next*, unrelated request.
        while True:
            try:
                response_queue.get_nowait()
            except queue.Empty:
                break

        proc.stdin.write(message + "\n")
        proc.stdin.flush()

        try:
            return response_queue.get(timeout=timeout)
        except queue.Empty:
            return "Error: No response received from Node.js"

    except BrokenPipeError:
        return "Error: Broken pipe - Node.js process might have exited."
    except Exception as e:
        return f"Error writing to Node process: {e}"
|
|
|
|
|
# Launch the Node.js scraper helper. Requests are written to its stdin as
# plain lines and responses come back as JSON on stdout (see send_to_node /
# read_from_node). text=True gives str pipes so readline()'s EOF sentinel is ''.
node_proc = subprocess.Popen(
    ["npx", "ts-node", "scraper.ts"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)

# Daemon reader threads: one parses stdout into response_queue, one logs stderr.
threading.Thread(target=read_from_node, args=(node_proc,), daemon=True).start()
threading.Thread(target=read_stderr, args=(node_proc,), daemon=True).start()
|
|
# Per-account cookie/session files so logins survive restarts.
TWI_COOKIE_PATH = f"social_session/{os.environ.get('TWI_USERNAME')}_cookies.json"
INSTA_COOKIE_PATH = f"social_session/insta_{os.environ.get('INSTA_USERNAME')}_cookies.json"
os.makedirs(os.path.dirname(TWI_COOKIE_PATH), exist_ok=True)
os.makedirs(os.path.dirname(INSTA_COOKIE_PATH), exist_ok=True)


app = Quart("Auto notifier thingy")

# Disk-backed cache shared by all routes (entries expire after 300 s).
cache = dc.Cache('/tmp/cache_dir/')


# Social-network clients. reddit_client is created lazily in initialize_client().
x_client = Client('en-US')

insta_client = instagrapi.Client()

reddit_client = None


# Creating the Threads client can fail transiently, so retry a few times
# at import time before giving up and aborting startup.
max_attempts = 5
for attempt in range(1, max_attempts + 1):
    try:
        threads_client = Threads()
        print("Threads client created successfully.")
        break
    except Exception as e:
        print(f"Attempt {attempt} failed: {e}")
        traceback.print_exc()
        if attempt < max_attempts:
            print("Retrying in 5 seconds...")
            time.sleep(5)
        else:
            print("Max attempts reached. Raising exception.")
            raise
|
|
|
|
|
async def login_instagram():
    """Log in to Instagram, reusing a saved session file when one exists.

    NOTE(review): instagrapi calls here are synchronous and will block the
    event loop; this runs once at startup (before_serving) so it is tolerated.
    """
    if not os.path.exists(INSTA_COOKIE_PATH):
        # First run: plain username/password login, then persist the session.
        print("logging in")
        insta_client.login(
            os.environ.get("INSTA_USERNAME"), os.environ.get("INSTA_PASSWORD")
        )
        insta_client.dump_settings(INSTA_COOKIE_PATH)
    else:
        # Reuse the saved session, then log in (instagrapi validates/refreshes
        # the session during login when settings are preloaded).
        session = insta_client.load_settings(INSTA_COOKIE_PATH)
        insta_client.set_settings(session)

        insta_client.login(
            os.environ.get("INSTA_USERNAME"), os.environ.get("INSTA_PASSWORD")
        )
        try:
            # Cheap API call to verify the session is actually usable.
            insta_client.get_timeline_feed()
        except instagrapi.exceptions.LoginRequired:
            print("Session is invalid, need to login via username and password")
            old_session = insta_client.get_settings()

            # Keep the device uuids from the stale session so Instagram sees
            # the same "device", but otherwise start from clean settings.
            insta_client.set_settings({})
            insta_client.set_uuids(old_session["uuids"])

            insta_client.login( os.environ.get("INSTA_USERNAME"), os.environ.get("INSTA_PASSWORD"))
        # Persist whichever session ended up valid.
        insta_client.dump_settings(INSTA_COOKIE_PATH)
|
|
|
|
|
async def initialize_client():
    """Create the Reddit client and log in to Twitter (cookie file when available)."""
    global reddit_client
    reddit_client = asyncpraw.Reddit(
        client_id=os.environ.get("REDDIT_CLIENTAPI"),
        client_secret=os.environ.get("REDDIT_CLIENTSECRET"),
        username=os.environ.get("REDDIT_USERNAME"),
        password=os.environ.get("REDDIT_PASSWORD"),
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
    )

    if not os.path.exists(TWI_COOKIE_PATH):
        # First run: credential login, then persist cookies for next time.
        print(TWI_COOKIE_PATH, "Login twi using pw")
        await x_client.login(
            auth_info_1=os.environ.get('TWI_USERNAME'),
            auth_info_2=os.environ.get('TWI_EMAIL'),
            password=os.environ.get('TWI_PASSWORD')
        )
        x_client.save_cookies(TWI_COOKIE_PATH)
    else:
        x_client.load_cookies(TWI_COOKIE_PATH)
|
|
|
|
|
|
|
|
@app.before_serving
async def before_serving():
    """Run the one-time client logins before Quart starts serving requests."""
    for bootstrap in (login_instagram, initialize_client):
        await bootstrap()
|
|
|
|
|
def remove_circular(obj, seen=None):
    """Return a copy of *obj* with circular references replaced by None.

    Dicts, lists and tuples are rebuilt recursively; objects with a
    ``__dict__`` are reduced to that dict; everything else passes through.

    BUG FIXES vs. the previous version:
    - Primitives are returned before any id-tracking. CPython interns small
      ints and short strings, so the old code turned the *second* occurrence
      of e.g. ``1`` in a structure into None.
    - Ids are removed from *seen* after a container is processed, so only
      true cycles (an object containing itself) become None; shared but
      acyclic references are kept.
    """
    # Immutable scalars can never participate in a cycle.
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj

    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        # This object is already on the current recursion path: a real cycle.
        return None
    seen.add(obj_id)

    try:
        if isinstance(obj, dict):
            return {k: remove_circular(v, seen) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [remove_circular(item, seen) for item in obj]
        elif isinstance(obj, tuple):
            return tuple(remove_circular(item, seen) for item in obj)
        elif hasattr(obj, '__dict__'):
            return remove_circular(obj.__dict__, seen)
        else:
            return obj
    finally:
        # Only track the current path, not everything ever visited.
        seen.discard(obj_id)
|
|
|
|
|
def remove_client(data):
    """Recursively strip '_client' and 'user' keys from nested dicts/lists."""
    if isinstance(data, list):
        return [remove_client(entry) for entry in data]
    if not isinstance(data, dict):
        return data
    return {
        key: remove_client(value)
        for key, value in data.items()
        if key not in ('_client', 'user')
    }
|
|
|
|
|
def serialize_tweet(tweet):
    """Flatten a twikit tweet object into a plain JSON-serializable dict."""

    def parse_date(date_str):
        # Twitter timestamps look like "Mon Jan 01 00:00:00 +0000 2024";
        # convert to ISO-8601, falling back to the raw value on failure.
        try:
            return datetime.strptime(date_str, "%a %b %d %H:%M:%S %z %Y").isoformat()
        except (ValueError, AttributeError):
            return date_str

    author = tweet.user
    return {
        "id": tweet.id,
        "text": tweet.full_text,
        "created_at": parse_date(tweet.created_at),
        "lang": tweet.lang,
        "retweet_count": tweet.retweet_count,
        "favorite_count": tweet.favorite_count,
        "user": {
            "id": author.id,
            "screen_name": author.screen_name,
            "name": author.name,
            "followers_count": author.followers_count,
        },
        # Shallow-copy each media mapping; tweet.media may be None.
        "media": [dict(item) for item in (tweet.media or [])],
    }
|
|
|
|
|
async def fetch_user_and_tweets(username):
    """Resolve a Twitter handle and fetch that user's two most recent tweets.

    Returns ``(user, tweets)``; ``(None, None)`` when the handle is unknown.
    """
    user = await x_client.get_user_by_screen_name(screen_name=username)
    if user is None:
        return None, None

    recent = await x_client.get_user_tweets(user_id=user.id, tweet_type='tweets', count=2)
    return user, recent
|
|
|
|
|
@app.route('/twitter/<username>')
async def get_posts_twitter(username):
    """Return the latest tweets for *username* as JSON, cached for 5 minutes."""
    cache_key = f"user_tweets_{username}"

    if cache_key in cache:
        return jsonify(cache[cache_key])

    userid, tweets = await fetch_user_and_tweets(username)
    if userid is None:
        return jsonify({"error": True, "errorlog": "User not found"}), 404
    if tweets is None or len(tweets) == 0:
        return jsonify({"error": True, "errorlog": "No tweets found for this user"}), 404

    serialized_tweets = [serialize_tweet(tweet) for tweet in tweets]
    cache.set(cache_key, serialized_tweets, expire=300)
    return jsonify(serialized_tweets)
|
|
|
|
|
|
|
|
@app.route('/tiktok/<username>')
async def get_posts_tiktok(username):
    """Return TikTok posts for *username* via the Node.js scraper, cached for 5 minutes."""
    cache_key = f"user_tiktok_{username}"

    if cache_key in cache:
        return jsonify(cache[cache_key])

    result = send_to_node(node_proc, username)
    # send_to_node signals failure by returning a string.
    if isinstance(result, str):
        return jsonify({"error": True, "errorlog": result}), 500

    cache.set(cache_key, result, expire=300)
    return jsonify(result)
|
|
|
|
|
|
|
|
@app.route('/youtube/<path:channel_url>')
async def get_uploads_youtube(channel_url):
    """Return a YouTube channel's uploads feed (videos.xml parsed to a dict), cached 5 min.

    *channel_url* is a URL-encoded channel or @handle URL; the channel id is
    resolved by scraping the canonical <link> off the channel page.
    """

    def format_youtube_url(url):
        # Normalize the two supported URL shapes to a canonical form.
        if "/channel/" in url:
            return f"https://youtube.com/channel/{url.split('/')[-1]}"
        if "youtube.com/@" in url:
            return f"https://www.youtube.com/@{url.split('@')[-1]}"
        return url

    decoded_url = urllib.parse.unquote(channel_url)
    yt_url = format_youtube_url(decoded_url)

    cache_key = f"user_uploads_{yt_url}"
    if cache_key in cache:
        return jsonify(cache[cache_key])

    headers = {"User-Agent": "Mozilla/5.0"}

    def get_last_url_segment(html):
        """Extracts the last segment of the canonical URL from the HTML content."""
        match = re.search(r'<link[^>]+rel=[\'"]canonical[\'"][^>]+href=[\'"]([^\'"]+)[\'"]', html)
        if match:
            return match.group(1).rstrip('/').split('/')[-1]
        return None

    def get_last_url_segment_from_webpage(url):
        """Fetch *url*; return the canonical link's last segment (str), None, or an error response."""
        try:
            response = requests.get(url, headers=headers, timeout=10)
        except requests.exceptions.RequestException as e:
            # BUG FIX: the old message used JS-style "${e}" and never interpolated.
            return jsonify({"error": True, "errorlog": f"Request Failed {e}"})
        if response.status_code != 200:
            return jsonify({"error": True, "errorlog": f"Failed to fetch page. Status code: {response.status_code}"})
        return get_last_url_segment(response.text)

    last_segment = get_last_url_segment_from_webpage(yt_url)
    if last_segment is None:
        # BUG FIX: previously a page without a canonical link made the route
        # return None, which Quart rejects with an internal error.
        return jsonify({"error": True, "errorlog": "Could not resolve channel id from page"}), 404
    if not isinstance(last_segment, str):
        # An error response built inside the helper: pass it straight through.
        return last_segment

    response = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id=" + last_segment)
    if response.status_code != 200:
        return jsonify({"error": True, "errorlog": f"Failed to fetch page videos.xml. Status code: {response.status_code}"})

    uploads = xmltodict.parse(response.text)
    if uploads is None or len(uploads) == 0:
        return jsonify({"error": True, "errorlog": "No uploads found for this user"}), 404

    cache.set(cache_key, uploads, expire=300)
    return jsonify(uploads)
|
|
|
|
|
@app.route('/twitch/<username>')
async def is_live_twitch(username):
    """Report Twitch live status for *username* via the Node.js helper; successes cached 5 min."""
    cache_key = f"user_twitch_{username}"

    if cache_key in cache:
        return jsonify(cache[cache_key])

    result = send_to_node(node_proc, f"twitch {username}")
    # send_to_node signals failure by returning a string.
    if isinstance(result, str):
        return jsonify({"error": True, "errorlog": result}), 500

    # Only cache clean lookups; error payloads are returned but not stored.
    if not result.get("error"):
        cache.set(cache_key, result, expire=300)
    return jsonify(result)
|
|
|
|
|
|
|
|
|
|
|
@app.route('/threads/<username>')
async def threads_user(username):
    """Return recent Threads posts for *username*, cached for 5 minutes."""
    cache_key = f"user_threads_{username}"

    if cache_key in cache:
        return jsonify(cache[cache_key])

    user_id = threads_client.get_user_id(username)
    # The client reports failures as strings.
    if isinstance(user_id, str):
        return jsonify({"error": True, "errorlog": user_id}), 500

    posts = threads_client.get_user_threads(user_id)
    cache.set(cache_key, posts, expire=300)
    return jsonify(posts)
|
|
|
|
|
def extract_serializable_sum(obj, visited=None):
    """
    Recursively extract the JSON-serializable parts of an object.

    Primitives pass through unchanged; lists/tuples/sets become lists;
    dict keys are coerced to str; objects are reduced to their ``__dict__``
    (skipping asyncpraw's ``_reddit*`` back-references). A circular
    reference — an object already on the current recursion path — becomes
    None. Anything json can't handle falls back to ``str(obj)``.

    BUG FIXES vs. the previous version:
    - Primitives are returned before any id-tracking. CPython interns small
      ints and short strings, so the old code turned the *second* occurrence
      of e.g. ``1`` in a structure into None.
    - Ids are removed from *visited* after a container is processed, so
      shared but acyclic references are fully serialized.
    """
    # Immutable scalars can never form a cycle; never id-track them.
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj

    if visited is None:
        visited = set()
    obj_id = id(obj)
    if obj_id in visited:
        # Already on the current recursion path: a true circular reference.
        return None
    visited.add(obj_id)

    try:
        if isinstance(obj, (list, tuple, set)):
            return [extract_serializable_sum(item, visited) for item in obj]

        if isinstance(obj, dict):
            new_dict = {}
            for key, value in obj.items():
                # JSON object keys must be strings.
                if not isinstance(key, str):
                    key = str(key)
                new_dict[key] = extract_serializable_sum(value, visited)
            return new_dict

        if hasattr(obj, '__dict__'):
            data = {}
            for key, value in obj.__dict__.items():
                # Skip asyncpraw client back-references (huge and circular).
                if key.startswith('_reddit'):
                    continue
                data[key] = extract_serializable_sum(value, visited)
            return data

        try:
            json.dumps(obj)
            return obj
        except (TypeError, OverflowError):
            return str(obj)
    finally:
        # Track only the current path, not everything ever visited.
        visited.discard(obj_id)
|
|
|
|
|
@app.route('/reddit/user/<username>')
async def reddit_user(username):
    """Return the 10 newest submissions by redditor *username*, cached for 5 minutes."""
    cache_key = f"user_redditor_{username}"

    if cache_key in cache:
        return jsonify(cache[cache_key])

    redditor = await reddit_client.redditor(username)
    submissions = [
        extract_serializable_sum(post) async for post in redditor.new(limit=10)
    ]
    cache.set(cache_key, submissions, expire=300)
    return jsonify(submissions)
|
|
|
|
|
@app.route('/reddit/subreddit/<subreddit>')
async def reddit_subreddit(subreddit):
    """Return the 10 newest submissions in *subreddit*, cached for 5 minutes."""
    cache_key = f"group_reddit_{subreddit}"

    if cache_key in cache:
        return jsonify(cache[cache_key])

    print(subreddit)
    sub = await reddit_client.subreddit(subreddit)
    submissions = [
        extract_serializable_sum(post) async for post in sub.new(limit=10)
    ]
    cache.set(cache_key, submissions, expire=300)
    return jsonify(submissions)
|
|
|
|
|
|
|
|
@app.route('/instagram/<username>')
async def instagram_profile(username):
    """Return an Instagram user's profile info and media, cached for 5 minutes.

    *username* may be a handle or a numeric user id.
    """
    cache_key = f"user_instagram_{username}"

    if cache_key in cache:
        serialized_data = cache[cache_key]
    else:
        print(username)
        if username.isnumeric():
            print("numeric")
            # BUG FIX: the old code kept the raw string here, but the code
            # below dereferences .pk — fetch the user object by id instead.
            user_info = insta_client.user_info(username)
        else:
            user_info = insta_client.user_info_by_username(username)

        media_items = []
        for media_obj in insta_client.user_medias_paginated(user_info.pk):
            media_items.append(extract_serializable_sum(media_obj))

        serialized_data = {"userinfo": extract_serializable_sum(user_info), "data": media_items}
        cache.set(cache_key, serialized_data, expire=300)

    return jsonify(extract_serializable_sum(serialized_data))
|
|
|
|
|
@app.route('/kick/<username>')
async def kick_streaming(username):
    """Report Kick live status for *username* via the Node.js helper; successes cached 5 min."""
    cache_key = f"user_kick{username}"

    if cache_key in cache:
        return jsonify(extract_serializable_sum(cache[cache_key]))

    result = send_to_node(node_proc, f"kick {username}")
    # send_to_node signals failure by returning a string.
    if isinstance(result, str):
        return jsonify({"error": True, "errorlog": result}), 500

    # Only cache clean lookups; error payloads are returned but not stored.
    if not result.get("error"):
        cache.set(cache_key, result, expire=300)

    return jsonify(extract_serializable_sum(result))
|
|
|
|
|
@app.route('/backinstagram/<username>')
async def backinstagram_profile(username):
    """Fetch an Instagram profile via the anonymous web_profile_info endpoint.

    Uses browser-like headers so the request resembles the web client.
    """
    cache_key = f"user_backinstagram_{username}"

    if cache_key in cache:
        serialized_data = cache[cache_key]
    else:
        # Headers copied from a real Chrome session; the x-ig-app-id header is
        # what authorizes the anonymous web API call.
        headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
            'cache-control': 'no-cache',
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'sec-ch-prefers-color-scheme': 'dark',
            'sec-ch-ua': '"Not A(Brand";v="8", "Chromium";v="132", "Google Chrome";v="132"',
            'sec-ch-ua-full-version-list': '"Not A(Brand";v="8.0.0.0", "Chromium";v="132.0.6834.160", "Google Chrome";v="132.0.6834.160"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-model': '""',
            'sec-ch-ua-platform': '"Windows"',
            'sec-ch-ua-platform-version': '"19.0.0"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
            'x-asbd-id': '129477',
            'x-ig-app-id': '936619743392459',
            'x-ig-www-claim': '0',
            'x-requested-with': 'XMLHttpRequest',
        }

        params = {
            'username': username,
        }

        response = requests.get('https://www.instagram.com/api/v1/users/web_profile_info/', params=params, headers=headers)
        serialized_data = response.json()
        # BUG FIX: the result was never stored, so the cache lookup above
        # could never hit; cache it like every other route (5 minutes).
        cache.set(cache_key, serialized_data, expire=300)

    return jsonify(serialized_data)
|
|
|
|
|
|
|
|
@app.route('/')
async def main_route_defaultpage():
    """Render a bare-bones HTML listing of every registered route."""
    message = "Hello, there isn't any docs, so if you see this, you shouldn't be here<br><br>"
    route_lines = [f"{rule.methods} {rule}<br>" for rule in app.url_map.iter_rules()]
    return message + "".join(route_lines)
|
|
@app.errorhandler(404)
async def handle_not_found(error):
    """JSON 404 response for unknown routes."""
    body = {"error": True, "errorlog": "Not found"}
    return jsonify(body), 404
|
|
|
|
|
@app.errorhandler(Exception)
async def handle_error(error):
    """Catch-all handler: log the traceback and return it as JSON with HTTP 500."""
    stack_trace = traceback.format_exc()
    print(error, stack_trace)

    payload = {
        "error": True,
        "errorlog": str(error),
        "stack_trace": stack_trace,
    }
    return jsonify(payload), 500
|
|
|
|
|
if __name__ == '__main__':
    # Dev entry point; for production run under an ASGI server instead.
    app.run(port=7860)
|
|
|