File size: 2,315 Bytes
2592a7b
 
 
 
 
56aa734
 
2592a7b
 
 
56aa734
2592a7b
 
 
 
 
7419e5c
 
2592a7b
 
7419e5c
 
56aa734
7419e5c
2592a7b
 
 
 
7419e5c
 
2592a7b
 
7419e5c
2592a7b
 
 
56aa734
2592a7b
7419e5c
 
 
 
 
56aa734
2592a7b
7419e5c
2592a7b
 
7419e5c
 
 
 
 
2592a7b
7419e5c
2592a7b
 
 
7419e5c
 
 
2592a7b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""
Represents a Youtube video
"""

import logging
import shutil
import uuid
from yt_dlp import YoutubeDL
import os
from pathlib import Path
import ffmpeg

format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.DEBUG,
                    datefmt="%H:%M:%S")

class YoutubeMedia:
    def __init__(self, url, dir, audio_format="wav", video_format="mp4"):
        self.url=url
        self.dir=dir
        self.audio_format=audio_format
        self.video_format=video_format

    def download(self):
        ydl_opts = {
            'outtmpl': os.path.join(self.dir, "%(id)s_%(epoch)s.%(ext)s"),
            'logger': logging,
            'progress_hooks': [self.progress_hook],
            'format': f'{self.video_format}/bestvideo',
            'keepvideo': True,
            'postprocessors': [{  # Extract audio using ffmpeg
                'key': 'FFmpegExtractAudio',
                'preferredcodec': self.audio_format
            }]
        }
        with YoutubeDL(ydl_opts) as ydl: 
            error_code = ydl.download([self.url.url])
    
    def resample(self,sr='16k'): #defaults to 16k sampling rate
        tmp_filename=os.path.join(self.dir,str(uuid.uuid4()))+"."+self.audio_format
        ffmpeg.input(self.audio_filename).output(tmp_filename,ar=sr).run()
        shutil.move(tmp_filename, self.audio_filename)
        logging.info(f"Succesfuly resampled {self.audio_filename}")

    def clean(self):
        if not self.audio_filename:
            logging.error("Audio not downloaded")
            return
        location=os.path.join(self.dir, self.audio_filename)
        if os.path.exists(self.audio_filename):
            os.remove(self.audio_filename)
            logging.info(f"File {self.audio_filename} successfully removed")
            self.audio_filename=None
        else:
            print(f"File {self.audio_filename} does not exist")

    def progress_hook(self, d):
        if d['status'] == 'finished':
            self.audio_filename=os.path.join(self.dir, Path(d.get('info_dict').get('_filename')).stem + "."+self.audio_format) 
            self.video_filename=os.path.join(self.dir, Path(d.get('info_dict').get('_filename')).stem + "."+self.video_format) 
            print(f'Done downloading {self.audio_filename}, now post-processing ...')