saeid1999 commited on
Commit
70564c8
·
verified ·
1 Parent(s): 0838a22

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +280 -86
app.py CHANGED
@@ -1,11 +1,44 @@
1
- import gradio as gr
 
 
2
  from pytubefix import YouTube
3
  from pytubefix.cli import on_progress
4
  import os
 
 
 
 
5
  import re
6
 
7
- def is_youtube_url(url):
8
- """Check if the provided string is a valid YouTube URL"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  if not url or not isinstance(url, str):
10
  return False
11
 
@@ -16,106 +49,267 @@ def is_youtube_url(url):
16
  )
17
  return bool(re.match(youtube_regex, url))
18
 
19
- def download_youtube_video(url):
20
- """Download YouTube video and return the file path"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
- if not url or not url.strip():
23
- return None, "Please provide a YouTube URL"
 
 
 
24
 
25
  if not is_youtube_url(url):
26
- return None, "Invalid YouTube URL. Please provide a valid YouTube link."
27
 
28
  try:
29
- # Create output directory
30
- output_dir = "downloads"
31
- os.makedirs(output_dir, exist_ok=True)
32
-
33
- # Create YouTube object
34
  yt = YouTube(url, on_progress_callback=on_progress)
35
 
36
  # Get video info
37
- video_title = yt.title
38
  duration = yt.length
39
- duration_str = f"{duration // 60}:{duration % 60:02d}" if duration else "Unknown"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
 
41
- # Get highest resolution progressive stream (video + audio combined)
42
- stream = yt.streams.get_highest_resolution()
 
 
 
 
 
 
 
 
43
 
44
  if not stream:
45
- return None, " No suitable stream found for this video"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
 
47
- # Download the video
48
- output_path = stream.download(output_path=output_dir)
 
 
 
 
 
 
 
49
 
50
- if os.path.exists(output_path):
51
- success_msg = f"✅ Downloaded Successfully!\n\nTitle: {video_title}\nDuration: {duration_str}\nResolution: {stream.resolution}"
52
- return output_path, success_msg
53
- else:
54
- return None, "❌ Error: Video file not found after download"
55
-
56
  except Exception as e:
57
- error_msg = str(e)
58
- if "unavailable" in error_msg.lower():
59
- return None, "❌ This video is unavailable or private"
60
- elif "regex" in error_msg.lower():
61
- return None, "❌ YouTube changed their format. Please try updating pytubefix"
62
- else:
63
- return None, f"❌ Error: {error_msg}"
64
-
65
- def chat_youtube(message, history):
66
- """Process chat messages and download YouTube videos"""
67
-
68
- # Handle None or empty messages
69
- if not message:
70
- return "Please send me a YouTube video URL!"
71
-
72
- # Convert message to string
73
- message_text = str(message).strip()
74
-
75
- # Check if message contains a YouTube URL
76
- if is_youtube_url(message_text):
77
- video_path, status_msg = download_youtube_video(message_text)
78
- return status_msg
79
- else:
80
- # Regular chat response
81
- return """👋 Hi! I'm your YouTube Video Downloader assistant.
82
-
83
- **How to use:**
84
- 1. Send me a YouTube video URL
85
- 2. I'll download the video for you
86
- 3. You can then save it to your device
87
-
88
- **Example URLs:**
89
- - https://www.youtube.com/watch?v=jNQXAC9IVRw
90
- - https://youtu.be/jNQXAC9IVRw
91
-
92
- Just paste any YouTube link to get started!"""
93
-
94
- # Create Gradio interface
95
- with gr.Blocks(title="YouTube Video Downloader") as demo:
96
- gr.Markdown("# 🎥 YouTube Video Downloader")
97
- gr.Markdown("Send a YouTube video URL and download videos easily!")
98
-
99
- gr.ChatInterface(
100
- fn=chat_youtube,
101
- examples=[
102
- "How does this work?",
103
- "https://www.youtube.com/watch?v=jNQXAC9IVRw"
104
- ],
105
- type="messages"
106
  )
 
 
 
 
 
 
 
 
107
 
108
- gr.Markdown("""
109
- ### 📝 Instructions:
110
- - Paste any YouTube video URL (short or long format)
111
- - The bot will download the video in highest quality
112
- - Videos are saved and can be downloaded
113
 
114
- ### ⚠️ Note:
115
- - Download time depends on video length
116
- - Private/unavailable videos cannot be downloaded
117
- - Uses pytubefix library for reliable downloads
118
- """)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
 
120
  if __name__ == "__main__":
121
- demo.launch()
 
 
1
+ from fastapi import FastAPI, HTTPException, Query
2
+ from fastapi.responses import FileResponse
3
+ from pydantic import BaseModel
4
  from pytubefix import YouTube
5
  from pytubefix.cli import on_progress
6
  import os
7
+ import csv
8
+ import uuid
9
+ from datetime import datetime
10
+ from typing import List, Optional
11
  import re
12
 
13
+ app = FastAPI(title="YouTube Download API", version="1.0.0")
14
+
15
+ # Database file
16
+ DB_FILE = "download_logs.csv"
17
+ DOWNLOAD_DIR = "downloads"
18
+
19
+ # Initialize CSV database
20
+ def init_database():
21
+ """Initialize CSV database if it doesn't exist"""
22
+ os.makedirs(DOWNLOAD_DIR, exist_ok=True)
23
+
24
+ if not os.path.exists(DB_FILE):
25
+ with open(DB_FILE, 'w', newline='', encoding='utf-8') as f:
26
+ writer = csv.writer(f)
27
+ writer.writerow(['id', 'username', 'youtube_url', 'choice_type', 'quality', 'timestamp', 'status'])
28
+
29
+ def log_download(username: str, youtube_url: str, choice_type: str, quality: str, status: str):
30
+ """Log download to CSV database"""
31
+ download_id = str(uuid.uuid4())
32
+ timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
33
+
34
+ with open(DB_FILE, 'a', newline='', encoding='utf-8') as f:
35
+ writer = csv.writer(f)
36
+ writer.writerow([download_id, username, youtube_url, choice_type, quality, timestamp, status])
37
+
38
+ return download_id
39
+
40
+ def is_youtube_url(url: str) -> bool:
41
+ """Validate YouTube URL"""
42
  if not url or not isinstance(url, str):
43
  return False
44
 
 
49
  )
50
  return bool(re.match(youtube_regex, url))
51
 
52
+ # Pydantic models
53
+ class VideoChoice(BaseModel):
54
+ id: str
55
+ type: str # "video" or "audio"
56
+ quality: str
57
+ resolution: Optional[str] = None
58
+ fps: Optional[int] = None
59
+ file_size: Optional[str] = None
60
+ mime_type: str
61
+
62
+ class VideoInfoResponse(BaseModel):
63
+ title: str
64
+ duration: str
65
+ thumbnail: str
66
+ author: str
67
+ views: int
68
+ choices: List[VideoChoice]
69
+
70
+ class DownloadRequest(BaseModel):
71
+ youtube_url: str
72
+ choice_id: str
73
+ username: str
74
+
75
+ class DownloadResponse(BaseModel):
76
+ id: str
77
+ status: str
78
+ message: str
79
+ download_url: Optional[str] = None
80
+
81
+ # API Endpoints
82
+
83
+ @app.get("/")
84
+ def read_root():
85
+ """Root endpoint with API documentation"""
86
+ return {
87
+ "message": "YouTube Download API",
88
+ "version": "1.0.0",
89
+ "endpoints": {
90
+ "/get-choices": "GET - Get available download options for a YouTube video",
91
+ "/download": "POST - Download video with selected options",
92
+ "/logs": "GET - View download logs (admin)",
93
+ "/health": "GET - Health check"
94
+ }
95
+ }
96
+
97
+ @app.get("/get-choices", response_model=VideoInfoResponse)
98
+ def get_video_choices(url: str = Query(..., description="YouTube video URL")):
99
+ """
100
+ Get all available download choices for a YouTube video
101
 
102
+ Returns:
103
+ - Video information (title, duration, thumbnail, etc.)
104
+ - Available video qualities (MP4)
105
+ - Available audio qualities (MP3)
106
+ """
107
 
108
  if not is_youtube_url(url):
109
+ raise HTTPException(status_code=400, detail="Invalid YouTube URL")
110
 
111
  try:
 
 
 
 
 
112
  yt = YouTube(url, on_progress_callback=on_progress)
113
 
114
  # Get video info
115
+ title = yt.title
116
  duration = yt.length
117
+ duration_str = f"{duration // 60}:{duration % 60:02d}"
118
+ thumbnail = yt.thumbnail_url
119
+ author = yt.author
120
+ views = yt.views
121
+
122
+ choices = []
123
+
124
+ # Get video streams (MP4)
125
+ video_streams = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc()
126
+
127
+ for idx, stream in enumerate(video_streams):
128
+ file_size_mb = stream.filesize / (1024 * 1024) if stream.filesize else 0
129
+ choices.append(VideoChoice(
130
+ id=f"video_{idx}_{stream.itag}",
131
+ type="video",
132
+ quality=f"{stream.resolution} - {stream.fps}fps",
133
+ resolution=stream.resolution,
134
+ fps=stream.fps,
135
+ file_size=f"{file_size_mb:.2f} MB",
136
+ mime_type=stream.mime_type
137
+ ))
138
+
139
+ # Get audio-only streams (MP3)
140
+ audio_streams = yt.streams.filter(only_audio=True).order_by('abr').desc()
141
+
142
+ for idx, stream in enumerate(audio_streams):
143
+ file_size_mb = stream.filesize / (1024 * 1024) if stream.filesize else 0
144
+ choices.append(VideoChoice(
145
+ id=f"audio_{idx}_{stream.itag}",
146
+ type="audio",
147
+ quality=stream.abr if stream.abr else "Unknown",
148
+ file_size=f"{file_size_mb:.2f} MB",
149
+ mime_type=stream.mime_type
150
+ ))
151
+
152
+ return VideoInfoResponse(
153
+ title=title,
154
+ duration=duration_str,
155
+ thumbnail=thumbnail,
156
+ author=author,
157
+ views=views,
158
+ choices=choices
159
+ )
160
+
161
+ except Exception as e:
162
+ raise HTTPException(status_code=500, detail=f"Error fetching video info: {str(e)}")
163
+
164
+ @app.post("/download", response_model=DownloadResponse)
165
+ def download_video(request: DownloadRequest):
166
+ """
167
+ Download video with selected quality/format
168
+
169
+ Body:
170
+ - youtube_url: YouTube video URL
171
+ - choice_id: Choice ID from /get-choices endpoint
172
+ - username: Username for logging
173
+ """
174
+
175
+ if not is_youtube_url(request.youtube_url):
176
+ raise HTTPException(status_code=400, detail="Invalid YouTube URL")
177
+
178
+ if not request.username:
179
+ raise HTTPException(status_code=400, detail="Username is required")
180
+
181
+ try:
182
+ yt = YouTube(request.youtube_url, on_progress_callback=on_progress)
183
 
184
+ # Parse choice_id to get itag
185
+ choice_parts = request.choice_id.split('_')
186
+ if len(choice_parts) < 3:
187
+ raise HTTPException(status_code=400, detail="Invalid choice_id format")
188
+
189
+ choice_type = choice_parts[0] # "video" or "audio"
190
+ itag = int(choice_parts[2])
191
+
192
+ # Get the stream by itag
193
+ stream = yt.streams.get_by_itag(itag)
194
 
195
  if not stream:
196
+ raise HTTPException(status_code=404, detail="Selected stream not found")
197
+
198
+ # Download the file
199
+ output_path = stream.download(output_path=DOWNLOAD_DIR)
200
+
201
+ # If audio, convert extension to mp3 (rename)
202
+ if choice_type == "audio":
203
+ base_name = os.path.splitext(output_path)[0]
204
+ new_path = f"{base_name}.mp3"
205
+ os.rename(output_path, new_path)
206
+ output_path = new_path
207
+
208
+ # Log to database
209
+ quality = f"{stream.resolution}" if choice_type == "video" else f"{stream.abr}"
210
+ download_id = log_download(
211
+ username=request.username,
212
+ youtube_url=request.youtube_url,
213
+ choice_type=choice_type,
214
+ quality=quality,
215
+ status="success"
216
+ )
217
 
218
+ # Get filename for download URL
219
+ filename = os.path.basename(output_path)
220
+
221
+ return DownloadResponse(
222
+ id=download_id,
223
+ status="success",
224
+ message="Download completed successfully",
225
+ download_url=f"/download-file/{filename}"
226
+ )
227
 
 
 
 
 
 
 
228
  except Exception as e:
229
+ # Log failed download
230
+ log_download(
231
+ username=request.username,
232
+ youtube_url=request.youtube_url,
233
+ choice_type="unknown",
234
+ quality="unknown",
235
+ status=f"failed: {str(e)}"
236
+ )
237
+
238
+ raise HTTPException(status_code=500, detail=f"Download error: {str(e)}")
239
+
240
+ @app.get("/download-file/{filename}")
241
+ def download_file(filename: str):
242
+ """
243
+ Download the actual file
244
+ """
245
+ file_path = os.path.join(DOWNLOAD_DIR, filename)
246
+
247
+ if not os.path.exists(file_path):
248
+ raise HTTPException(status_code=404, detail="File not found")
249
+
250
+ return FileResponse(
251
+ path=file_path,
252
+ filename=filename,
253
+ media_type='application/octet-stream'
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
254
  )
255
+
256
+ @app.get("/logs")
257
+ def get_logs(limit: int = Query(100, description="Number of logs to retrieve")):
258
+ """
259
+ Get download logs from CSV database
260
+ """
261
+ if not os.path.exists(DB_FILE):
262
+ return {"logs": [], "message": "No logs found"}
263
 
264
+ logs = []
265
+ with open(DB_FILE, 'r', newline='', encoding='utf-8') as f:
266
+ reader = csv.DictReader(f)
267
+ for row in reader:
268
+ logs.append(row)
269
 
270
+ # Return last N logs
271
+ return {
272
+ "total": len(logs),
273
+ "logs": logs[-limit:] if len(logs) > limit else logs
274
+ }
275
+
276
+ @app.get("/logs/user/{username}")
277
+ def get_user_logs(username: str):
278
+ """
279
+ Get download logs for a specific user
280
+ """
281
+ if not os.path.exists(DB_FILE):
282
+ return {"logs": [], "message": "No logs found"}
283
+
284
+ logs = []
285
+ with open(DB_FILE, 'r', newline='', encoding='utf-8') as f:
286
+ reader = csv.DictReader(f)
287
+ for row in reader:
288
+ if row['username'] == username:
289
+ logs.append(row)
290
+
291
+ return {
292
+ "username": username,
293
+ "total": len(logs),
294
+ "logs": logs
295
+ }
296
+
297
+ @app.get("/health")
298
+ def health_check():
299
+ """Health check endpoint"""
300
+ return {
301
+ "status": "healthy",
302
+ "database": "connected" if os.path.exists(DB_FILE) else "not initialized",
303
+ "timestamp": datetime.now().isoformat()
304
+ }
305
+
306
+ # Initialize database on startup
307
+ @app.on_event("startup")
308
+ def startup_event():
309
+ init_database()
310
+ print("✅ Database initialized")
311
+ print("✅ YouTube Download API is ready!")
312
 
313
  if __name__ == "__main__":
314
+ import uvicorn
315
+ uvicorn.run(app, host="0.0.0.0", port=7860)