saeid1999 commited on
Commit
55e47e9
·
verified ·
1 Parent(s): b575b51

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +85 -288
app.py CHANGED
@@ -1,47 +1,16 @@
1
- from fastapi import FastAPI, HTTPException, Query
2
- from fastapi.responses import FileResponse
3
- from pydantic import BaseModel
4
  from pytubefix import YouTube
5
  from pytubefix.cli import on_progress
 
6
  import os
7
- import csv
8
- import uuid
9
- from datetime import datetime
10
- from typing import List, Optional
11
  import re
12
 
13
- app = FastAPI(title="YouTube Download API", version="1.0.0")
14
 
15
- # Database file
16
- DB_FILE = "download_logs.csv"
17
- DOWNLOAD_DIR = "downloads"
18
-
19
- # Initialize CSV database
20
- def init_database():
21
- """Initialize CSV database if it doesn't exist"""
22
- os.makedirs(DOWNLOAD_DIR, exist_ok=True)
23
-
24
- if not os.path.exists(DB_FILE):
25
- with open(DB_FILE, 'w', newline='', encoding='utf-8') as f:
26
- writer = csv.writer(f)
27
- writer.writerow(['id', 'username', 'youtube_url', 'choice_type', 'quality', 'timestamp', 'status'])
28
-
29
- def log_download(username: str, youtube_url: str, choice_type: str, quality: str, status: str):
30
- """Log download to CSV database"""
31
- download_id = str(uuid.uuid4())
32
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
33
-
34
- with open(DB_FILE, 'a', newline='', encoding='utf-8') as f:
35
- writer = csv.writer(f)
36
- writer.writerow([download_id, username, youtube_url, choice_type, quality, timestamp, status])
37
-
38
- return download_id
39
-
40
- def is_youtube_url(url: str) -> bool:
41
- """Validate YouTube URL"""
42
  if not url or not isinstance(url, str):
43
  return False
44
-
45
  youtube_regex = (
46
  r'(https?://)?(www\.)?'
47
  '(youtube|youtu|youtube-nocookie)\.(com|be)/'
@@ -49,267 +18,95 @@ def is_youtube_url(url: str) -> bool:
49
  )
50
  return bool(re.match(youtube_regex, url))
51
 
52
- # Pydantic models
53
- class VideoChoice(BaseModel):
54
- id: str
55
- type: str # "video" or "audio"
56
- quality: str
57
- resolution: Optional[str] = None
58
- fps: Optional[int] = None
59
- file_size: Optional[str] = None
60
- mime_type: str
61
-
62
- class VideoInfoResponse(BaseModel):
63
- title: str
64
- duration: str
65
- thumbnail: str
66
- author: str
67
- views: int
68
- choices: List[VideoChoice]
69
-
70
- class DownloadRequest(BaseModel):
71
- youtube_url: str
72
- choice_id: str
73
- username: str
74
 
75
- class DownloadResponse(BaseModel):
76
- id: str
77
- status: str
78
- message: str
79
- download_url: Optional[str] = None
80
-
81
- # API Endpoints
82
-
83
- @app.get("/")
84
- def read_root():
85
- """Root endpoint with API documentation"""
86
- return {
87
- "message": "YouTube Download API",
88
- "version": "1.0.0",
89
- "endpoints": {
90
- "/get-choices": "GET - Get available download options for a YouTube video",
91
- "/download": "POST - Download video with selected options",
92
- "/logs": "GET - View download logs (admin)",
93
- "/health": "GET - Health check"
94
- }
95
- }
96
-
97
- @app.get("/get-choices", response_model=VideoInfoResponse)
98
- def get_video_choices(url: str = Query(..., description="YouTube video URL")):
99
- """
100
- Get all available download choices for a YouTube video
101
-
102
- Returns:
103
- - Video information (title, duration, thumbnail, etc.)
104
- - Available video qualities (MP4)
105
- - Available audio qualities (MP3)
106
- """
107
-
108
  if not is_youtube_url(url):
109
- raise HTTPException(status_code=400, detail="Invalid YouTube URL")
110
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
111
  try:
112
- yt = YouTube(url, on_progress_callback=on_progress)
113
-
114
- # Get video info
115
- title = yt.title
116
- duration = yt.length
117
- duration_str = f"{duration // 60}:{duration % 60:02d}"
118
- thumbnail = yt.thumbnail_url
119
- author = yt.author
120
- views = yt.views
121
-
122
- choices = []
123
-
124
- # Get video streams (MP4)
125
- video_streams = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc()
126
-
127
- for idx, stream in enumerate(video_streams):
128
- file_size_mb = stream.filesize / (1024 * 1024) if stream.filesize else 0
129
- choices.append(VideoChoice(
130
- id=f"video_{idx}_{stream.itag}",
131
- type="video",
132
- quality=f"{stream.resolution} - {stream.fps}fps",
133
- resolution=stream.resolution,
134
- fps=stream.fps,
135
- file_size=f"{file_size_mb:.2f} MB",
136
- mime_type=stream.mime_type
137
- ))
138
-
139
- # Get audio-only streams (MP3)
140
- audio_streams = yt.streams.filter(only_audio=True).order_by('abr').desc()
141
-
142
- for idx, stream in enumerate(audio_streams):
143
- file_size_mb = stream.filesize / (1024 * 1024) if stream.filesize else 0
144
- choices.append(VideoChoice(
145
- id=f"audio_{idx}_{stream.itag}",
146
- type="audio",
147
- quality=stream.abr if stream.abr else "Unknown",
148
- file_size=f"{file_size_mb:.2f} MB",
149
- mime_type=stream.mime_type
150
- ))
151
-
152
- return VideoInfoResponse(
153
- title=title,
154
- duration=duration_str,
155
- thumbnail=thumbnail,
156
- author=author,
157
- views=views,
158
- choices=choices
159
- )
160
-
161
  except Exception as e:
162
- raise HTTPException(status_code=500, detail=f"Error fetching video info: {str(e)}")
163
 
164
- @app.post("/download", response_model=DownloadResponse)
165
- def download_video(request: DownloadRequest):
166
- """
167
- Download video with selected quality/format
168
-
169
- Body:
170
- - youtube_url: YouTube video URL
171
- - choice_id: Choice ID from /get-choices endpoint
172
- - username: Username for logging
173
- """
174
-
175
- if not is_youtube_url(request.youtube_url):
176
- raise HTTPException(status_code=400, detail="Invalid YouTube URL")
177
-
178
- if not request.username:
179
- raise HTTPException(status_code=400, detail="Username is required")
180
-
181
  try:
182
- yt = YouTube(request.youtube_url, on_progress_callback=on_progress)
183
-
184
- # Parse choice_id to get itag
185
- choice_parts = request.choice_id.split('_')
186
- if len(choice_parts) < 3:
187
- raise HTTPException(status_code=400, detail="Invalid choice_id format")
188
-
189
- choice_type = choice_parts[0] # "video" or "audio"
190
- itag = int(choice_parts[2])
191
-
192
- # Get the stream by itag
193
- stream = yt.streams.get_by_itag(itag)
194
-
 
 
 
 
 
 
 
 
 
 
 
 
195
  if not stream:
196
- raise HTTPException(status_code=404, detail="Selected stream not found")
197
-
198
- # Download the file
199
- output_path = stream.download(output_path=DOWNLOAD_DIR)
200
-
201
- # If audio, convert extension to mp3 (rename)
202
- if choice_type == "audio":
203
- base_name = os.path.splitext(output_path)[0]
204
- new_path = f"{base_name}.mp3"
205
- os.rename(output_path, new_path)
206
- output_path = new_path
207
-
208
- # Log to database
209
- quality = f"{stream.resolution}" if choice_type == "video" else f"{stream.abr}"
210
- download_id = log_download(
211
- username=request.username,
212
- youtube_url=request.youtube_url,
213
- choice_type=choice_type,
214
- quality=quality,
215
- status="success"
216
- )
217
-
218
- # Get filename for download URL
219
- filename = os.path.basename(output_path)
220
-
221
- return DownloadResponse(
222
- id=download_id,
223
- status="success",
224
- message="Download completed successfully",
225
- download_url=f"/download-file/{filename}"
226
- )
227
-
228
  except Exception as e:
229
- # Log failed download
230
- log_download(
231
- username=request.username,
232
- youtube_url=request.youtube_url,
233
- choice_type="unknown",
234
- quality="unknown",
235
- status=f"failed: {str(e)}"
236
- )
237
-
238
- raise HTTPException(status_code=500, detail=f"Download error: {str(e)}")
239
-
240
- @app.get("/download-file/{filename}")
241
- def download_file(filename: str):
242
- """
243
- Download the actual file
244
- """
245
- file_path = os.path.join(DOWNLOAD_DIR, filename)
246
-
247
- if not os.path.exists(file_path):
248
- raise HTTPException(status_code=404, detail="File not found")
249
-
250
- return FileResponse(
251
- path=file_path,
252
- filename=filename,
253
- media_type='application/octet-stream'
254
- )
255
 
256
- @app.get("/logs")
257
- def get_logs(limit: int = Query(100, description="Number of logs to retrieve")):
258
- """
259
- Get download logs from CSV database
260
- """
261
- if not os.path.exists(DB_FILE):
262
- return {"logs": [], "message": "No logs found"}
263
-
264
- logs = []
265
- with open(DB_FILE, 'r', newline='', encoding='utf-8') as f:
266
- reader = csv.DictReader(f)
267
- for row in reader:
268
- logs.append(row)
269
-
270
- # Return last N logs
271
- return {
272
- "total": len(logs),
273
- "logs": logs[-limit:] if len(logs) > limit else logs
274
- }
275
-
276
- @app.get("/logs/user/{username}")
277
- def get_user_logs(username: str):
278
- """
279
- Get download logs for a specific user
280
- """
281
- if not os.path.exists(DB_FILE):
282
- return {"logs": [], "message": "No logs found"}
283
-
284
- logs = []
285
- with open(DB_FILE, 'r', newline='', encoding='utf-8') as f:
286
- reader = csv.DictReader(f)
287
- for row in reader:
288
- if row['username'] == username:
289
- logs.append(row)
290
-
291
- return {
292
- "username": username,
293
- "total": len(logs),
294
- "logs": logs
295
- }
296
-
297
- @app.get("/health")
298
- def health_check():
299
- """Health check endpoint"""
300
- return {
301
- "status": "healthy",
302
- "database": "connected" if os.path.exists(DB_FILE) else "not initialized",
303
- "timestamp": datetime.now().isoformat()
304
- }
305
-
306
- # Initialize database on startup
307
- @app.on_event("startup")
308
- def startup_event():
309
- init_database()
310
- print("✅ Database initialized")
311
- print("✅ YouTube Download API is ready!")
312
 
313
  if __name__ == "__main__":
314
  import uvicorn
315
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
+ from fastapi import FastAPI, Query
2
+ from fastapi.responses import FileResponse, JSONResponse
 
3
  from pytubefix import YouTube
4
  from pytubefix.cli import on_progress
5
+ from moviepy.editor import AudioFileClip
6
  import os
 
 
 
 
7
  import re
8
 
9
+ app = FastAPI(title="YouTube Downloader API")
10
 
11
+ def is_youtube_url(url):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  if not url or not isinstance(url, str):
13
  return False
 
14
  youtube_regex = (
15
  r'(https?://)?(www\.)?'
16
  '(youtube|youtu|youtube-nocookie)\.(com|be)/'
 
18
  )
19
  return bool(re.match(youtube_regex, url))
20
 
21
+ def estimate_compressed_size(original_size_mb, bitrate_factor=0.6):
22
+ if not original_size_mb:
23
+ return "?"
24
+ compressed = original_size_mb * bitrate_factor
25
+ return f"{round(compressed, 2)} MB (est.)"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
 
27
+ def get_download_options(url):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  if not is_youtube_url(url):
29
+ return None, "Invalid YouTube URL"
30
+ yt = YouTube(url, on_progress_callback=on_progress)
31
+ streams = yt.streams.order_by('resolution').desc()
32
+ choices = []
33
+ for s in streams:
34
+ res = s.resolution or "Audio only"
35
+ type_ = "video+audio" if s.is_progressive else ("video only" if s.includes_video_track else "audio only")
36
+ ext = s.mime_type.split("/")[-1]
37
+ size_mb = round(s.filesize / 1048576, 2) if s.filesize else None
38
+ est_size = estimate_compressed_size(size_mb)
39
+ choices.append({
40
+ "label": f"{res} | {type_} | {ext} | {est_size}",
41
+ "resolution": res,
42
+ "type": type_,
43
+ "extension": ext
44
+ })
45
+ choices.append({"label": "Convert to MP3 – High Quality (320kbps)", "type": "mp3_high"})
46
+ choices.append({"label": "Convert to MP3 – Medium Quality (192kbps)", "type": "mp3_medium"})
47
+ choices.append({"label": "Convert to MP3 – Low Quality (128kbps)", "type": "mp3_low"})
48
+ return choices, yt.title
49
+
50
+ def convert_to_mp3(temp_path, title, quality):
51
+ output_dir = "downloads"
52
+ os.makedirs(output_dir, exist_ok=True)
53
+ mp3_path = os.path.join(output_dir, f"{title} ({quality}).mp3")
54
+ clip = AudioFileClip(temp_path)
55
+ bitrate = {"High": "320k", "Medium": "192k", "Low": "128k"}[quality]
56
+ clip.write_audiofile(mp3_path, bitrate=bitrate)
57
+ clip.close()
58
+ os.remove(temp_path)
59
+ return mp3_path
60
+
61
+ @app.get("/get_choices")
62
+ def api_get_choices(url: str = Query(..., description="YouTube video URL")):
63
  try:
64
+ choices, title = get_download_options(url)
65
+ if not choices:
66
+ return JSONResponse(status_code=400, content={"error": title})
67
+ return {"title": title, "choices": choices}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  except Exception as e:
69
+ return JSONResponse(status_code=500, content={"error": str(e)})
70
 
71
+ @app.get("/download")
72
+ def api_download(url: str = Query(...), choice: str = Query(...)):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  try:
74
+ if not is_youtube_url(url):
75
+ return JSONResponse(status_code=400, content={"error": "Invalid YouTube URL"})
76
+
77
+ yt = YouTube(url, on_progress_callback=on_progress)
78
+ output_dir = "downloads"
79
+ os.makedirs(output_dir, exist_ok=True)
80
+
81
+ if "mp3" in choice.lower():
82
+ stream = yt.streams.filter(only_audio=True).first()
83
+ if not stream:
84
+ return JSONResponse(status_code=400, content={"error": "No audio stream available"})
85
+ temp_path = stream.download(output_path=output_dir, filename="temp.mp4")
86
+ if "high" in choice.lower():
87
+ mp3_path = convert_to_mp3(temp_path, yt.title, "High")
88
+ elif "medium" in choice.lower():
89
+ mp3_path = convert_to_mp3(temp_path, yt.title, "Medium")
90
+ else:
91
+ mp3_path = convert_to_mp3(temp_path, yt.title, "Low")
92
+ return FileResponse(mp3_path, filename=os.path.basename(mp3_path))
93
+
94
+ res = choice.split(" | ")[0]
95
+ ext = choice.split(" | ")[2]
96
+ stream = yt.streams.filter(res=res, mime_type=f"video/{ext}").first()
97
+ if not stream:
98
+ stream = yt.streams.filter(res=res).first()
99
  if not stream:
100
+ return JSONResponse(status_code=400, content={"error": "Selected quality not available"})
101
+ output_path = stream.download(output_path=output_dir)
102
+ return FileResponse(output_path, filename=os.path.basename(output_path))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
  except Exception as e:
104
+ return JSONResponse(status_code=500, content={"error": str(e)})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
 
106
+ @app.get("/")
107
+ def home():
108
+ return {"message": "YouTube Downloader API running! Use /get_choices and /download endpoints."}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
109
 
110
  if __name__ == "__main__":
111
  import uvicorn
112
+ uvicorn.run("app:app", host="0.0.0.0", port=7860)