demoss / src /engine /instagram.py
nothere990's picture
update
f993cdc
#!/usr/bin/env python3
# coding: utf-8
# ytdlbot - instagram.py
import time
import pathlib
import re
import filetype
import requests
from engine.base import BaseDownloader
class InstagramDownload(BaseDownloader):
    """Downloader for Instagram (and Threads) links.

    Media links are resolved by an HTTP service reachable at
    ``http://instagram:15000``; every resolved image/video is then streamed
    into the downloader's temporary directory and handed to ``_upload``.
    """

    def extract_code(self):
        """Return an identifier for ``self._url``.

        Returns:
            * the URL itself when it is a story-highlights link,
            * the short code (first regex group) for post, reel, TV and
              Threads links,
            * ``None`` when no known URL shape matches.
        """
        url = self._url

        # Story highlights: the whole URL is returned, not the captured id.
        if re.search(r"/stories/highlights/([a-zA-Z0-9_-]+)/", url):
            return url

        # The look-ahead accepts a code that is followed by "/", "?", "#" or
        # the end of the URL, so links without a trailing slash also match.
        code_patterns = [
            # Posts
            r"/p/([a-zA-Z0-9_-]+)(?=[/?#]|$)",
            # Reels
            r"/reel/([a-zA-Z0-9_-]+)(?=[/?#]|$)",
            # TV
            r"/tv/([a-zA-Z0-9_-]+)(?=[/?#]|$)",
            # Threads post (both with @username and without)
            r"(?:https?://)?(?:www\.)?(?:threads\.net)(?:/[@\w.]+)?(?:/post)?/([\w-]+)(?:/?\?.*)?$",
        ]
        for pattern in code_patterns:
            if match := re.search(pattern, url):
                return match.group(1)
        return None

    def _setup_formats(self) -> list | None:
        """No format selection is performed for this downloader."""

    @staticmethod
    def _progress_status(downloaded, length, elapsed_time):
        """Build the progress dict passed to ``download_hook``.

        Args:
            downloaded: bytes written so far.
            length: expected total size in bytes, ``0`` when unknown.
            elapsed_time: seconds since the transfer started.
        """
        speed_str = "N/A"
        eta_str = "N/A"
        if elapsed_time > 0 and downloaded > 0:
            speed = downloaded / elapsed_time  # bytes per second
            if speed >= 1024 * 1024:
                speed_str = f"{speed / (1024 * 1024):.2f}MB/s"
            elif speed >= 1024:
                speed_str = f"{speed / 1024:.2f}KB/s"
            else:
                speed_str = f"{speed:.2f}B/s"

            if length > 0:
                eta_seconds = (length - downloaded) / speed
                if eta_seconds >= 3600:
                    eta_str = f"{eta_seconds / 3600:.1f}h"
                elif eta_seconds >= 60:
                    eta_str = f"{eta_seconds / 60:.1f}m"
                else:
                    eta_str = f"{eta_seconds:.0f}s"

        return {
            "status": "downloading",
            "downloaded_bytes": downloaded,
            "total_bytes": length,
            "_speed_str": speed_str,
            "_eta_str": eta_str,
        }

    def _stream_media(self, link, save_path):
        """Stream ``link`` into ``save_path``, reporting progress per chunk.

        Raises:
            requests.HTTPError: when the server answers with an error status,
                so an error page is never stored as media.
        """
        # The context manager releases the connection even if writing fails.
        with requests.get(link, stream=True) as req:
            req.raise_for_status()
            length = int(
                req.headers.get("content-length", 0)
                or req.headers.get("x-full-image-content-length", 0)
            )
            downloaded = 0
            # Monotonic clock: elapsed time cannot go backwards on clock changes.
            started = time.monotonic()
            with open(save_path, "wb") as fp:
                for chunk in req.iter_content(8192):
                    if not chunk:
                        continue
                    fp.write(chunk)
                    downloaded += len(chunk)
                    self.download_hook(
                        self._progress_status(downloaded, length, time.monotonic() - started)
                    )

    def _download(self, formats=None):
        """Resolve ``self._url`` and download every image/video it contains.

        Args:
            formats: unused here.

        Returns:
            A list of saved file paths (``str``). The list is empty when the
            resolver returned no media or when any step failed; failures are
            reported by editing ``self._bot_msg``.

        Side effects:
            Sets ``self._format`` to ``"video"``, ``"photo"`` or
            ``"document"`` according to the media types encountered.
        """
        try:
            # ``params`` URL-encodes the target link, so "&" or "#" inside it
            # cannot truncate the ``url`` query parameter.
            resp = requests.get("http://instagram:15000/", params={"url": self._url}).json()
        except Exception as e:
            self._bot_msg.edit_text(f"Download failed!❌\n\n`{e}`")
            return []

        # A payload that is not a JSON object is treated as "no media".
        media_items = resp.get("data") if isinstance(resp, dict) else None

        # extract_code() may return a full URL (story highlights); collapse
        # anything that is not a word character or "-" so the stem is a valid
        # single file name. Ordinary short codes pass through unchanged.
        stem = re.sub(r"[^\w-]+", "_", str(self.extract_code()))

        # resolver media type -> (fallback extension, upload category)
        known_types = {"image": ("jpg", "photo"), "video": ("mp4", "video")}

        saved_paths = []
        found_media_types = set()
        for media in media_items or []:
            try:
                media_type = media["type"]
                if media_type not in known_types:
                    continue
                fallback_ext, category = known_types[media_type]
                found_media_types.add(category)

                filename = f"Instagram_{stem}-{len(saved_paths) + 1}"
                save_path = pathlib.Path(self._tempdir.name, filename)
                self._stream_media(media["link"], save_path)

                # Prefer the sniffed type; fall back to the extension implied
                # by the resolver's media type so the file is never left
                # without a suffix.
                ext = filetype.guess_extension(save_path) or fallback_ext
                new_path = save_path.with_name(f"{save_path.name}.{ext}")
                save_path.rename(new_path)
                saved_paths.append(str(new_path))
            except Exception as e:
                self._bot_msg.edit_text(f"Download failed!❌\n\n`{e}`")
                return []

        if "video" in found_media_types:
            self._format = "video"
        elif "photo" in found_media_types:
            self._format = "photo"
        else:
            self._format = "document"
        return saved_paths

    def _start(self):
        """Download the media and pass the resulting files to ``_upload``."""
        downloaded_files = self._download()
        self._upload(files=downloaded_files)