Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| # coding: utf-8 | |
| # ytdlbot - instagram.py | |
| import time | |
| import pathlib | |
| import re | |
| import filetype | |
| import requests | |
| from engine.base import BaseDownloader | |
| class InstagramDownload(BaseDownloader): | |
| def extract_code(self): | |
| patterns = [ | |
| # Instagram stories highlights | |
| r"/stories/highlights/([a-zA-Z0-9_-]+)/", | |
| # Posts | |
| r"/p/([a-zA-Z0-9_-]+)/", | |
| # Reels | |
| r"/reel/([a-zA-Z0-9_-]+)/", | |
| # TV | |
| r"/tv/([a-zA-Z0-9_-]+)/", | |
| # Threads post (both with @username and without) | |
| r"(?:https?://)?(?:www\.)?(?:threads\.net)(?:/[@\w.]+)?(?:/post)?/([\w-]+)(?:/?\?.*)?$", | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, self._url) | |
| if match: | |
| if pattern == patterns[0]: # Check if it's the stories highlights pattern | |
| # Return the URL as it is | |
| return self._url | |
| else: | |
| # Return the code part (first group) | |
| return match.group(1) | |
| return None | |
| def _setup_formats(self) -> list | None: | |
| pass | |
| def _download(self, formats=None): | |
| try: | |
| resp = requests.get(f"http://instagram:15000/?url={self._url}").json() | |
| except Exception as e: | |
| self._bot_msg.edit_text(f"Download failed!❌\n\n`{e}`") | |
| pass | |
| code = self.extract_code() | |
| counter = 1 | |
| video_paths = [] | |
| found_media_types = set() | |
| if url_results := resp.get("data"): | |
| for media in url_results: | |
| link = media["link"] | |
| media_type = media["type"] | |
| if media_type == "image": | |
| ext = "jpg" | |
| found_media_types.add("photo") | |
| elif media_type == "video": | |
| ext = "mp4" | |
| found_media_types.add("video") | |
| else: | |
| continue | |
| try: | |
| req = requests.get(link, stream=True) | |
| length = int(req.headers.get("content-length", 0) or req.headers.get("x-full-image-content-length", 0)) | |
| filename = f"Instagram_{code}-{counter}" | |
| save_path = pathlib.Path(self._tempdir.name, filename) | |
| chunk_size = 8192 | |
| downloaded = 0 | |
| start_time = time.time() | |
| with open(save_path, "wb") as fp: | |
| for chunk in req.iter_content(chunk_size): | |
| if chunk: | |
| downloaded += len(chunk) | |
| fp.write(chunk) | |
| elapsed_time = time.time() - start_time | |
| if elapsed_time > 0: | |
| speed = downloaded / elapsed_time # bytes per second | |
| if speed >= 1024 * 1024: # MB/s | |
| speed_str = f"{speed / (1024 * 1024):.2f}MB/s" | |
| elif speed >= 1024: # KB/s | |
| speed_str = f"{speed / 1024:.2f}KB/s" | |
| else: # B/s | |
| speed_str = f"{speed:.2f}B/s" | |
| if length > 0: | |
| eta_seconds = (length - downloaded) / speed | |
| if eta_seconds >= 3600: | |
| eta_str = f"{eta_seconds / 3600:.1f}h" | |
| elif eta_seconds >= 60: | |
| eta_str = f"{eta_seconds / 60:.1f}m" | |
| else: | |
| eta_str = f"{eta_seconds:.0f}s" | |
| else: | |
| eta_str = "N/A" | |
| else: | |
| speed_str = "N/A" | |
| eta_str = "N/A" | |
| # dictionary for calling the download_hook | |
| d = { | |
| "status": "downloading", | |
| "downloaded_bytes": downloaded, | |
| "total_bytes": length, | |
| "_speed_str": speed_str, | |
| "_eta_str": eta_str | |
| } | |
| self.download_hook(d) | |
| if ext := filetype.guess_extension(save_path): | |
| new_path = save_path.with_suffix(f".{ext}") | |
| save_path.rename(new_path) | |
| save_path = new_path | |
| video_paths.append(str(save_path)) | |
| counter += 1 | |
| except Exception as e: | |
| self._bot_msg.edit_text(f"Download failed!❌\n\n`{e}`") | |
| return [] | |
| if "video" in found_media_types: | |
| self._format = "video" | |
| elif "photo" in found_media_types: | |
| self._format = "photo" | |
| else: | |
| self._format = "document" | |
| return video_paths | |
| def _start(self): | |
| downloaded_files = self._download() | |
| self._upload(files=downloaded_files) |