| import os |
| import time |
| import requests |
| from typing import Optional |
| from bot.logger import get_logger |
|
|
# Module-level logger, obtained from the project's logger factory under the
# name "uploader".
logger = get_logger("uploader")
# Relative path of the cookie file from which the TikTok ``sessionid`` is read
# (passed to ``_parse_sessionid`` by ``upload_to_tiktok``).
COOKIE_FILE = "cookies/cookies.txt"
# Hashtags placed on a new line after the video title to build the caption.
HASHTAGS = "#movierecap #myanmar #fyp #viral #trending"
|
|
|
|
def _parse_sessionid(cookie_file: str) -> Optional[str]:
    """Read the ``sessionid`` cookie value from a cookies.txt file.

    Each line is checked against two layouts, in this order:

    * A Netscape cookie-jar row: at least seven tab-separated fields with the
      cookie name in the sixth and its value in the seventh. Rows carrying
      the ``#HttpOnly_`` prefix (written by curl and browser exporters for
      HttpOnly cookies) are accepted as well.
    * A raw ``sessionid=<value>`` pair anywhere in the line, with pairs
      separated by whitespace and/or ``;``.

    Blank lines and ordinary ``#`` comment lines are ignored.

    Args:
        cookie_file: Path of the cookie file to read (decoded as UTF-8).

    Returns:
        The session id of the first matching line, or ``None`` when the file
        is missing, cannot be read, or has no ``sessionid`` entry. Failures
        are logged rather than raised.
    """
    http_only_prefix = "#HttpOnly_"
    try:
        with open(cookie_file, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()

                # "#HttpOnly_<domain>\t..." is a real cookie row, not a
                # comment; drop the marker so it is parsed like any other.
                if line.startswith(http_only_prefix):
                    line = line[len(http_only_prefix):]

                if not line or line.startswith("#"):
                    continue

                fields = line.split("\t")
                if len(fields) >= 7 and fields[5] == "sessionid":
                    return fields[6]

                if "sessionid" in line:
                    # Treat ';' as a separator too, so "a=1;sessionid=x;b=2"
                    # works even without spaces between the pairs.
                    for pair in line.replace(";", " ").split():
                        if pair.startswith("sessionid="):
                            return pair.split("=", 1)[1]

        logger.error("sessionid not found in cookies.txt")
        return None
    except FileNotFoundError:
        logger.error(f"Cookie file not found: {cookie_file}")
        return None
    except Exception as e:
        # Best-effort boundary: any read/decode problem means "no session".
        logger.error(f"Cookie parse error: {e}")
        return None
|
|
|
|
def upload_to_tiktok(video: dict, file_path: str) -> bool:
    """Upload a local video file to TikTok and request that it be published.

    Steps, each of which logs and returns ``False`` on failure:

    1. Read the ``sessionid`` cookie from ``COOKIE_FILE``.
    2. Check that ``file_path`` exists and is not empty.
    3. POST to the inbox "init" endpoint to obtain ``upload_url`` and
       ``publish_id``.
    4. PUT the file to ``upload_url`` as one single chunk.
    5. POST to the publish "init" endpoint with the caption
       (title, newline, ``HASHTAGS``; truncated to 2200 characters).

    Args:
        video: Metadata mapping; only ``video["title"]`` is read.
        file_path: Path of the video file to upload.

    Returns:
        ``True`` when the final publish response reports error code ``"ok"``;
        ``False`` for every handled failure (no session id, unusable file,
        rejected request, network or parsing error).

    Raises:
        KeyError: If ``video`` has no ``"title"`` entry (checked after the
            session id has been loaded).
    """
    sessionid = _parse_sessionid(COOKIE_FILE)
    if not sessionid:
        logger.error("Cannot upload: no sessionid")
        return False

    title = video["title"]
    caption = f"{title}\n{HASHTAGS}"

    # Validate the file before any network call so that a missing or
    # unreadable file is reported as False instead of escaping as OSError.
    try:
        file_size = os.path.getsize(file_path)
    except OSError as e:
        logger.error(f"Cannot upload: video file not accessible: {e}")
        return False
    if file_size == 0:
        # A zero-byte file would yield the malformed range "bytes 0--1/0".
        logger.error(f"Cannot upload: video file is empty: {file_path}")
        return False

    # NOTE(review): the open.tiktokapis.com endpoints are documented as
    # requiring an OAuth "Authorization: Bearer <token>" header -- confirm
    # that a sessionid cookie is actually accepted here.
    headers = {
        "Cookie": f"sessionid={sessionid}",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Referer": "https://www.tiktok.com/",
    }

    init_url = "https://open.tiktokapis.com/v2/post/publish/inbox/video/init/"
    # The whole file is declared as one chunk.
    init_payload = {
        "source_info": {
            "source": "FILE_UPLOAD",
            "video_size": file_size,
            "chunk_size": file_size,
            "total_chunk_count": 1,
        }
    }

    try:
        resp = requests.post(init_url, json=init_payload, headers=headers, timeout=30)
        resp_data = resp.json()
        logger.debug(f"TikTok init response: {resp_data}")

        if resp_data.get("error", {}).get("code") != "ok":
            logger.error(f"TikTok init failed: {resp_data}")
            return False

        upload_url = resp_data["data"]["upload_url"]
        publish_id = resp_data["data"]["publish_id"]

        upload_headers = {
            **headers,
            "Content-Type": "video/mp4",
            "Content-Length": str(file_size),
            "Content-Range": f"bytes 0-{file_size-1}/{file_size}",
        }
        # Hand the open file to requests so the body is streamed from disk
        # rather than loading the entire video into memory first.
        with open(file_path, "rb") as vf:
            up_resp = requests.put(upload_url, data=vf,
                                   headers=upload_headers, timeout=300)
        if up_resp.status_code not in (200, 201, 206):
            logger.error(f"Upload PUT failed: {up_resp.status_code} {up_resp.text}")
            return False

        # NOTE(review): this second init passes the upload_url back as a
        # PULL_FROM_URL source. TikTok's Content Posting API describes
        # PULL_FROM_URL as taking a publicly hosted video URL, and describes
        # post_info as part of the same init call as FILE_UPLOAD -- confirm
        # that this extra step is intended and accepted by the API.
        pub_url = "https://open.tiktokapis.com/v2/post/publish/video/init/"
        pub_payload = {
            "post_info": {
                "title": caption[:2200],
                "privacy_level": "PUBLIC_TO_EVERYONE",
                "disable_duet": False,
                "disable_comment": False,
                "disable_stitch": False,
            },
            "source_info": {
                "source": "PULL_FROM_URL",
                "video_url": upload_url,
            }
        }
        pub_resp = requests.post(pub_url, json=pub_payload, headers=headers, timeout=30)
        pub_data = pub_resp.json()
        logger.debug(f"TikTok publish response: {pub_data}")

        if pub_data.get("error", {}).get("code") == "ok":
            logger.info(f"TikTok upload SUCCESS: '{title}' (publish_id={publish_id})")
            return True

        logger.error(f"TikTok publish failed: {pub_data}")
        return False

    except Exception as e:
        # Top-level boundary: network errors, non-JSON bodies and unexpected
        # response shapes are all reported to the caller as a failed upload.
        logger.error(f"TikTok upload exception: {e}")
        return False
|
|