# Empty / threads_util /main.py
# r.go
# test
# 9770efd
import requests
import json
import re
from urllib.parse import urlencode
class Threads:
    """Small client for the threads.net web GraphQL endpoint.

    Constructing an instance performs one HTTP GET against ``TOKEN_URL`` to
    scrape a token from the page. That token is then sent with every API
    call, both as the ``X-FB-LSD`` header and as the ``lsd`` form field.

    All HTTP calls use ``REQUEST_TIMEOUT`` and raise the usual ``requests``
    exceptions on transport errors or non-2xx responses.
    """

    API_URL = "https://www.threads.net/api/graphql"
    TOKEN_URL = "https://www.threads.net/@instagram"
    MAX_REDIRECTS = 20
    # Seconds before an HTTP call is abandoned. Without an explicit timeout
    # requests waits indefinitely for a stalled server.
    REQUEST_TIMEOUT = 30
    USER_AGENT = "python-requests"
    # Value of the ``doc_id`` form field for each supported operation.
    DOC_IDS = {
        "getUser": "23996318473300828",
        "getUserThreads": "6232751443445612",
        "getUserReplies": "6307072669391286",
        "getPost": "5587632691339264",
        "getPostLikers": "9360915773983802",
    }
    # Compiled once at class creation instead of on every call.
    _TOKEN_RE = re.compile(r'"token":"(\w+?)"')
    _USER_ID_RE = re.compile(r'"user_id":"(\d+)"')
    # Headers that accompany GraphQL calls only; removed for page loads.
    _API_ONLY_HEADERS = ("X-ASBD-ID", "X-FB-LSD", "X-IG-App-ID")

    def __init__(self):
        """Fetch a token and build the headers shared by all API calls."""
        self.token = self.get_token()
        self.default_headers = {
            "Authority": "www.threads.net",
            "Accept": "*/*",
            "Accept-Language": "en-US,en;q=0.9",
            "Cache-Control": "no-cache",
            "Content-Type": "application/x-www-form-urlencoded",
            "Connection": "keep-alive",
            "Origin": "https://www.threads.net",
            "Pragma": "no-cache",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": self.USER_AGENT,
            "X-ASBD-ID": "129477",
            "X-IG-App-ID": "238260118697367",
            "X-FB-LSD": self.token,
        }

    @staticmethod
    def _first_group(pattern, text, error_message):
        """Return group 1 of the first *pattern* match in *text*.

        Raises:
            ValueError: with *error_message* when *pattern* does not match.
        """
        match = pattern.search(text)
        if match is None:
            raise ValueError(error_message)
        return match.group(1)

    @staticmethod
    def _profile_url(username):
        """Return the profile page URL for *username*, percent-encoded."""
        # Local import keeps this class independent of the module's import
        # block, which only brings in ``urlencode``.
        from urllib.parse import quote

        # safe="" also escapes "/", so the username cannot alter the path.
        return f"https://www.threads.net/@{quote(str(username), safe='')}"

    def _form_body(self, variables, doc_id):
        """Return the URL-encoded form body for one GraphQL call."""
        return urlencode(
            {
                "lsd": self.token,
                "variables": json.dumps(variables),
                "doc_id": doc_id,
            }
        )

    def get_token(self):
        """Scrape the token from the page at ``TOKEN_URL``.

        Returns:
            The token string captured from the page text.

        Raises:
            ValueError: if the page contains no ``"token":"..."`` entry.
        """
        # The context manager closes the session's connection pool, which
        # would otherwise stay open until garbage collection.
        with requests.Session() as session:
            session.max_redirects = self.MAX_REDIRECTS
            response = session.get(
                self.TOKEN_URL,
                headers={"User-Agent": self.USER_AGENT},
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            page = response.text
        return self._first_group(
            self._TOKEN_RE, page, "Token not found in response"
        )

    def post_request(self, variables, doc_id, extra_headers=None):
        """POST one GraphQL call and return the decoded JSON response.

        Args:
            variables: JSON-serialisable object sent as the ``variables`` field.
            doc_id: value of the ``doc_id`` form field (see ``DOC_IDS``).
            extra_headers: optional mapping merged over the default headers.
        """
        headers = self.default_headers.copy()
        if extra_headers:
            headers.update(extra_headers)
        response = requests.post(
            self.API_URL,
            headers=headers,
            data=self._form_body(variables, doc_id),
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def get_post(self, post_id):
        """Run the ``getPost`` query for *post_id*."""
        return self.post_request(
            {"postID": post_id},
            self.DOC_IDS["getPost"],
            {"X-FB-Friendly-Name": "BarcelonaPostPageQuery"},
        )

    def get_post_likers(self, media_id):
        """Run the ``getPostLikers`` query for *media_id*."""
        return self.post_request(
            {"mediaID": media_id}, self.DOC_IDS["getPostLikers"]
        )

    def get_user(self, user_id):
        """Run the ``getUser`` query for *user_id*."""
        return self.post_request(
            {"userID": user_id},
            self.DOC_IDS["getUser"],
            {"X-FB-Friendly-Name": "BarcelonaProfileRootQuery"},
        )

    def get_user_threads(self, user_id):
        """Run the ``getUserThreads`` query for *user_id*."""
        return self.post_request(
            {"userID": user_id},
            self.DOC_IDS["getUserThreads"],
            {"X-FB-Friendly-Name": "BarcelonaProfileThreadsTabQuery"},
        )

    def get_user_replies(self, user_id):
        """Run the ``getUserReplies`` query for *user_id*."""
        return self.post_request(
            {"userID": user_id},
            self.DOC_IDS["getUserReplies"],
            {"X-FB-Friendly-Name": "BarcelonaProfileRepliesTabQuery"},
        )

    def get_user_id(self, username):
        """Look up the numeric user id by scraping the profile page.

        Args:
            username: account name without the leading ``@``.

        Returns:
            The id as an ``int``.

        Raises:
            ValueError: if the page contains no ``"user_id":"<digits>"`` entry.
        """
        url = self._profile_url(username)
        headers = self.default_headers.copy()
        headers.update({
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "Referer": url,
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "cross-site",
            "Sec-Fetch-User": "?1",
            "Upgrade-Insecure-Requests": "1",
        })
        # This is a page navigation, not a GraphQL call.
        for name in self._API_ONLY_HEADERS:
            headers.pop(name, None)
        response = requests.get(
            url, headers=headers, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return int(
            self._first_group(
                self._USER_ID_RE, response.text, "User ID not found in response"
            )
        )