# ttup / uploader.py
# Phoe2004's picture
# Upload 8 files
# 9b2524c verified
import os
import time
import requests
from typing import Optional
from bot.logger import get_logger
# Module-level logger, obtained from the project's logging helper.
logger = get_logger("uploader")
# Path of the cookie file that upload_to_tiktok hands to _parse_sessionid.
COOKIE_FILE = "cookies/cookies.txt"
# Hashtag line appended on its own line after the video title in the caption.
HASHTAGS = "#movierecap #myanmar #fyp #viral #trending"
def _parse_sessionid(cookie_file: str) -> Optional[str]:
    """Read the ``sessionid`` cookie value from a cookies.txt file.

    Two line layouts are recognised:

    * Netscape cookie-jar rows: at least seven tab-separated fields, with the
      cookie name in the sixth field and its value in the seventh. Rows that
      carry the ``#HttpOnly_`` prefix (used by curl-style exports to mark
      HttpOnly cookies) are treated as data, not as comments.
    * Header-style text containing a ``sessionid=<value>`` pair, separated
      from neighbouring pairs by whitespace and/or semicolons.

    Args:
        cookie_file: Path of the cookie file; it is read as UTF-8 text.

    Returns:
        The sessionid value, or ``None`` when the file is missing, cannot be
        read, or contains no sessionid entry. Failures are logged, never
        raised.
    """
    httponly_prefix = "#HttpOnly_"
    try:
        with open(cookie_file, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                # "#HttpOnly_<domain>..." is a real cookie row, not a comment;
                # drop the marker so the row is parsed like any other.
                if line.startswith(httponly_prefix):
                    line = line[len(httponly_prefix):]
                if not line or line.startswith("#"):
                    continue
                parts = line.split("\t")
                if len(parts) >= 7 and parts[5] == "sessionid":
                    return parts[6]
                # Fallback: "name=value" pairs as found in a Cookie header.
                if "sessionid" in line:
                    # Treat ';' as a separator too, so "sessionid=abc;x=1"
                    # yields "abc" rather than "abc;x=1".
                    for part in line.replace(";", " ").split():
                        if part.startswith("sessionid="):
                            return part.split("=", 1)[1]
        logger.error("sessionid not found in cookies.txt")
        return None
    except FileNotFoundError:
        logger.error(f"Cookie file not found: {cookie_file}")
        return None
    except Exception as e:
        # Deliberate best-effort boundary: any read/decode problem means
        # "no session available" for the caller rather than a crash.
        logger.error(f"Cookie parse error: {e}")
        return None
def upload_to_tiktok(video: dict, file_path: str) -> bool:
    """Upload a local video file to TikTok and request its publication.

    The function performs three HTTP calls in order: an "init" POST that
    returns an upload URL and a publish id, a single-chunk PUT of the video
    bytes to that URL, and a "publish" POST carrying the caption.

    Args:
        video: Mapping describing the video; only ``video["title"]`` is read.
        file_path: Path of the video file to upload.

    Returns:
        ``True`` only when the publish response reports error code ``"ok"``.
        ``False`` on every other outcome (no sessionid, unreadable or empty
        file, HTTP/JSON failure, API error); the reason is logged.

    Raises:
        KeyError: If ``video`` has no ``"title"`` key.
    """
    sessionid = _parse_sessionid(COOKIE_FILE)
    if not sessionid:
        logger.error("Cannot upload: no sessionid")
        return False

    title = video["title"]
    caption = f"{title}\n{HASHTAGS}"
    # NOTE(review): TikTok's published Content Posting API authenticates with
    # an OAuth bearer token; this code sends the browser session cookie
    # instead. Behaviour is kept as-is -- confirm it works against the API.
    headers = {
        "Cookie": f"sessionid={sessionid}",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Referer": "https://www.tiktok.com/",
    }

    # A missing/unreadable file is reported like every other failure (False)
    # instead of escaping as an OSError.
    try:
        file_size = os.path.getsize(file_path)
    except OSError as e:
        logger.error(f"Cannot upload: video file not accessible: {e}")
        return False
    if file_size == 0:
        # An empty file would produce the invalid range "bytes 0--1/0".
        logger.error(f"Cannot upload: video file is empty: {file_path}")
        return False

    # Step 1: Init upload (the whole file is declared as a single chunk)
    init_url = "https://open.tiktokapis.com/v2/post/publish/inbox/video/init/"
    init_payload = {
        "source_info": {
            "source": "FILE_UPLOAD",
            "video_size": file_size,
            "chunk_size": file_size,
            "total_chunk_count": 1,
        }
    }
    try:
        resp = requests.post(init_url, json=init_payload, headers=headers, timeout=30)
        resp_data = resp.json()
        logger.debug(f"TikTok init response: {resp_data}")
        if resp_data.get("error", {}).get("code") != "ok":
            logger.error(f"TikTok init failed: {resp_data}")
            return False
        upload_url = resp_data["data"]["upload_url"]
        publish_id = resp_data["data"]["publish_id"]

        # Step 2: Upload video binary
        upload_headers = {
            **headers,
            "Content-Type": "video/mp4",
            "Content-Length": str(file_size),
            "Content-Range": f"bytes 0-{file_size-1}/{file_size}",
        }
        # Pass the open file object so requests streams it from disk rather
        # than holding the entire video in memory.
        with open(file_path, "rb") as vf:
            up_resp = requests.put(upload_url, data=vf,
                                   headers=upload_headers, timeout=300)
        if up_resp.status_code not in (200, 201, 206):
            logger.error(f"Upload PUT failed: {up_resp.status_code} {up_resp.text}")
            return False

        # Step 3: Publish
        # NOTE(review): this re-submits the upload URL as a PULL_FROM_URL
        # source; verify against the TikTok docs that this is an accepted
        # way to finalise a FILE_UPLOAD transfer.
        pub_url = "https://open.tiktokapis.com/v2/post/publish/video/init/"
        pub_payload = {
            "post_info": {
                "title": caption[:2200],
                "privacy_level": "PUBLIC_TO_EVERYONE",
                "disable_duet": False,
                "disable_comment": False,
                "disable_stitch": False,
            },
            "source_info": {
                "source": "PULL_FROM_URL",
                "video_url": upload_url,
            }
        }
        pub_resp = requests.post(pub_url, json=pub_payload, headers=headers, timeout=30)
        pub_data = pub_resp.json()
        logger.debug(f"TikTok publish response: {pub_data}")
        if pub_data.get("error", {}).get("code") == "ok":
            logger.info(f"TikTok upload SUCCESS: '{title}' (publish_id={publish_id})")
            return True
        logger.error(f"TikTok publish failed: {pub_data}")
        return False
    except Exception as e:
        # Boundary handler: network errors, non-JSON bodies and missing
        # response keys all end up here and are reported as a failed upload.
        logger.error(f"TikTok upload exception: {e}")
        return False