Spaces:
Paused
Paused
File size: 5,520 Bytes
e99a4e1 f993cdc e99a4e1 f993cdc c67f649 e99a4e1 f993cdc e99a4e1 c67f649 f993cdc e99a4e1 f993cdc e99a4e1 f993cdc e99a4e1 f993cdc e99a4e1 f993cdc c67f649 f993cdc e99a4e1 f993cdc e99a4e1 f993cdc c67f649 f993cdc c67f649 f993cdc c67f649 f993cdc c67f649 f993cdc c67f649 f993cdc c67f649 f993cdc c67f649 f993cdc c67f649 f993cdc c67f649 f993cdc c67f649 f993cdc e99a4e1 f993cdc b576cce f993cdc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 | #!/usr/bin/env python3
# coding: utf-8
# ytdlbot - instagram.py
import time
import pathlib
import re
import filetype
import requests
from engine.base import BaseDownloader
class InstagramDownload(BaseDownloader):
def extract_code(self):
patterns = [
# Instagram stories highlights
r"/stories/highlights/([a-zA-Z0-9_-]+)/",
# Posts
r"/p/([a-zA-Z0-9_-]+)/",
# Reels
r"/reel/([a-zA-Z0-9_-]+)/",
# TV
r"/tv/([a-zA-Z0-9_-]+)/",
# Threads post (both with @username and without)
r"(?:https?://)?(?:www\.)?(?:threads\.net)(?:/[@\w.]+)?(?:/post)?/([\w-]+)(?:/?\?.*)?$",
]
for pattern in patterns:
match = re.search(pattern, self._url)
if match:
if pattern == patterns[0]: # Check if it's the stories highlights pattern
# Return the URL as it is
return self._url
else:
# Return the code part (first group)
return match.group(1)
return None
def _setup_formats(self) -> list | None:
pass
def _download(self, formats=None):
try:
resp = requests.get(f"http://instagram:15000/?url={self._url}").json()
except Exception as e:
self._bot_msg.edit_text(f"Download failed!❌\n\n`{e}`")
pass
code = self.extract_code()
counter = 1
video_paths = []
found_media_types = set()
if url_results := resp.get("data"):
for media in url_results:
link = media["link"]
media_type = media["type"]
if media_type == "image":
ext = "jpg"
found_media_types.add("photo")
elif media_type == "video":
ext = "mp4"
found_media_types.add("video")
else:
continue
try:
req = requests.get(link, stream=True)
length = int(req.headers.get("content-length", 0) or req.headers.get("x-full-image-content-length", 0))
filename = f"Instagram_{code}-{counter}"
save_path = pathlib.Path(self._tempdir.name, filename)
chunk_size = 8192
downloaded = 0
start_time = time.time()
with open(save_path, "wb") as fp:
for chunk in req.iter_content(chunk_size):
if chunk:
downloaded += len(chunk)
fp.write(chunk)
elapsed_time = time.time() - start_time
if elapsed_time > 0:
speed = downloaded / elapsed_time # bytes per second
if speed >= 1024 * 1024: # MB/s
speed_str = f"{speed / (1024 * 1024):.2f}MB/s"
elif speed >= 1024: # KB/s
speed_str = f"{speed / 1024:.2f}KB/s"
else: # B/s
speed_str = f"{speed:.2f}B/s"
if length > 0:
eta_seconds = (length - downloaded) / speed
if eta_seconds >= 3600:
eta_str = f"{eta_seconds / 3600:.1f}h"
elif eta_seconds >= 60:
eta_str = f"{eta_seconds / 60:.1f}m"
else:
eta_str = f"{eta_seconds:.0f}s"
else:
eta_str = "N/A"
else:
speed_str = "N/A"
eta_str = "N/A"
# dictionary for calling the download_hook
d = {
"status": "downloading",
"downloaded_bytes": downloaded,
"total_bytes": length,
"_speed_str": speed_str,
"_eta_str": eta_str
}
self.download_hook(d)
if ext := filetype.guess_extension(save_path):
new_path = save_path.with_suffix(f".{ext}")
save_path.rename(new_path)
save_path = new_path
video_paths.append(str(save_path))
counter += 1
except Exception as e:
self._bot_msg.edit_text(f"Download failed!❌\n\n`{e}`")
return []
if "video" in found_media_types:
self._format = "video"
elif "photo" in found_media_types:
self._format = "photo"
else:
self._format = "document"
return video_paths
def _start(self):
downloaded_files = self._download()
self._upload(files=downloaded_files) |