File size: 5,520 Bytes
e99a4e1
 
f993cdc
e99a4e1
f993cdc
c67f649
e99a4e1
 
f993cdc
e99a4e1
 
c67f649
f993cdc
 
e99a4e1
 
 
 
 
 
 
 
 
 
 
 
 
 
f993cdc
e99a4e1
 
 
 
 
 
 
 
 
f993cdc
e99a4e1
f993cdc
e99a4e1
 
f993cdc
c67f649
 
 
 
 
 
f993cdc
e99a4e1
 
 
f993cdc
 
e99a4e1
f993cdc
 
 
 
 
 
 
 
 
 
 
 
 
c67f649
 
 
 
 
 
 
 
f993cdc
c67f649
 
 
 
 
f993cdc
c67f649
 
 
f993cdc
c67f649
 
 
 
 
 
f993cdc
c67f649
 
 
 
 
 
 
 
 
 
 
 
 
f993cdc
c67f649
 
 
 
 
 
 
 
f993cdc
c67f649
f993cdc
c67f649
 
 
 
f993cdc
 
c67f649
f993cdc
c67f649
 
 
f993cdc
 
 
 
 
 
 
 
e99a4e1
f993cdc
b576cce
f993cdc
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
# coding: utf-8
 
# ytdlbot - instagram.py
 
import time
import pathlib
import re
 
import filetype
import requests
from engine.base import BaseDownloader
 
 
class InstagramDownload(BaseDownloader):
    """Download the media behind ``self._url`` via the local helper service.

    The media links are obtained from the HTTP service at
    ``http://instagram:15000`` and every image/video it lists is streamed
    into ``self._tempdir``. ``self._url``, ``self._tempdir``,
    ``self._bot_msg``, ``self.download_hook`` and ``self._upload`` are not
    defined in this class (presumably provided by ``BaseDownloader``).
    """

    def extract_code(self):
        """Return an identifier for ``self._url``, or ``None`` if none matches.

        For a stories-highlights URL the whole URL is returned unchanged;
        for posts, reels, TV and Threads posts the first capture group
        (the short code) is returned. Patterns are tried in that order.
        """
        # Stories highlights are identified by the full URL, not a short code.
        highlights_pattern = r"/stories/highlights/([a-zA-Z0-9_-]+)/"
        if re.search(highlights_pattern, self._url):
            return self._url

        code_patterns = [
            # Posts
            r"/p/([a-zA-Z0-9_-]+)/",
            # Reels
            r"/reel/([a-zA-Z0-9_-]+)/",
            # TV
            r"/tv/([a-zA-Z0-9_-]+)/",
            # Threads post (both with @username and without)
            r"(?:https?://)?(?:www\.)?(?:threads\.net)(?:/[@\w.]+)?(?:/post)?/([\w-]+)(?:/?\?.*)?$",
        ]
        for pattern in code_patterns:
            match = re.search(pattern, self._url)
            if match:
                return match.group(1)

        return None

    def _setup_formats(self) -> list | None:
        """No format selection is performed for this downloader."""
        pass

    @staticmethod
    def _speed_and_eta(downloaded, length, elapsed_time):
        """Return ``(speed_str, eta_str)`` for the current download progress.

        :param downloaded: bytes received so far.
        :param length: expected total bytes, or ``0`` when unknown.
        :param elapsed_time: seconds since the transfer started.
        :return: human-readable speed (B/s, KB/s or MB/s) and remaining time
            (s, m or h); ``"N/A"`` wherever a value cannot be computed.
        """
        if elapsed_time <= 0:
            return "N/A", "N/A"

        speed = downloaded / elapsed_time  # bytes per second
        if speed >= 1024 * 1024:
            speed_str = f"{speed / (1024 * 1024):.2f}MB/s"
        elif speed >= 1024:
            speed_str = f"{speed / 1024:.2f}KB/s"
        else:
            speed_str = f"{speed:.2f}B/s"

        if length <= 0 or speed <= 0:
            return speed_str, "N/A"

        # Clamp at zero: the received byte count can exceed the advertised
        # length, which would otherwise yield a negative ETA.
        eta_seconds = max(length - downloaded, 0) / speed
        if eta_seconds >= 3600:
            eta_str = f"{eta_seconds / 3600:.1f}h"
        elif eta_seconds >= 60:
            eta_str = f"{eta_seconds / 60:.1f}m"
        else:
            eta_str = f"{eta_seconds:.0f}s"
        return speed_str, eta_str

    def _fetch_to_file(self, link, save_path):
        """Stream ``link`` into ``save_path``, reporting progress per chunk.

        ``self.download_hook`` is called after every written chunk with a
        dict carrying ``status``, ``downloaded_bytes``, ``total_bytes``,
        ``_speed_str`` and ``_eta_str``. Any exception from the request or
        the file write propagates to the caller.
        """
        chunk_size = 8192
        # Use the response as a context manager so the streamed connection is
        # released even if writing fails part-way through.
        with requests.get(link, stream=True) as req:
            length = int(req.headers.get("content-length", 0) or req.headers.get("x-full-image-content-length", 0))
            downloaded = 0
            start_time = time.time()

            with open(save_path, "wb") as fp:
                for chunk in req.iter_content(chunk_size):
                    if not chunk:
                        continue
                    downloaded += len(chunk)
                    fp.write(chunk)

                    speed_str, eta_str = self._speed_and_eta(downloaded, length, time.time() - start_time)
                    # dictionary for calling the download_hook
                    self.download_hook(
                        {
                            "status": "downloading",
                            "downloaded_bytes": downloaded,
                            "total_bytes": length,
                            "_speed_str": speed_str,
                            "_eta_str": eta_str,
                        }
                    )

    def _download(self, formats=None):
        """Fetch every image/video the helper service lists for ``self._url``.

        Sets ``self._format`` to ``"video"``, ``"photo"`` or ``"document"``
        depending on the media types encountered.

        :param formats: unused here.
        :return: list of saved file paths (as strings). An empty list is
            returned, after editing ``self._bot_msg`` with the error, when
            the service request or any single media download fails.
        """
        try:
            # Let requests percent-encode the target URL; interpolating it raw
            # would truncate it at the first "&" or "#" it contains.
            resp = requests.get("http://instagram:15000/", params={"url": self._url}).json()
        except Exception as e:
            self._bot_msg.edit_text(f"Download failed!❌\n\n`{e}`")
            # Without a response there is nothing to iterate; bail out the
            # same way a failed media download does.
            return []

        code = self.extract_code()
        # extract_code() can return a whole URL (stories highlights); strip
        # anything that is not filename-safe so "/" cannot create
        # non-existent sub-directories. Ordinary short codes pass unchanged.
        safe_code = re.sub(r"[^\w-]+", "_", str(code))
        counter = 1
        video_paths = []
        found_media_types = set()

        for media in resp.get("data") or []:
            link = media["link"]
            media_type = media["type"]

            if media_type == "image":
                fallback_ext = "jpg"
                found_media_types.add("photo")
            elif media_type == "video":
                fallback_ext = "mp4"
                found_media_types.add("video")
            else:
                # Unknown media kinds are skipped.
                continue

            try:
                save_path = pathlib.Path(self._tempdir.name, f"Instagram_{safe_code}-{counter}")
                self._fetch_to_file(link, save_path)

                # Prefer the extension sniffed from the file content; fall
                # back to the one implied by the reported media type.
                ext = filetype.guess_extension(save_path) or fallback_ext
                new_path = save_path.with_suffix(f".{ext}")
                save_path.rename(new_path)

                video_paths.append(str(new_path))
                counter += 1
            except Exception as e:
                self._bot_msg.edit_text(f"Download failed!❌\n\n`{e}`")
                return []

        if "video" in found_media_types:
            self._format = "video"
        elif "photo" in found_media_types:
            self._format = "photo"
        else:
            self._format = "document"

        return video_paths

    def _start(self):
        """Download the media, then pass the resulting files to ``self._upload``."""
        downloaded_files = self._download()
        self._upload(files=downloaded_files)