# britto224's picture
# Upload 130 files
# 5669b22 verified
import os
import requests
import tarfile
from pathlib import Path
from tqdm import tqdm
from loguru import logger
def get_github_asset_url(
    owner: str,
    repo: str,
    release_tag: str,
    filename_without_ext: str,
    timeout: float = 30.0,
) -> str | None:
    """
    Fetch the URL of a GitHub release asset by its filename (without extension).

    The release is looked up by tag through the GitHub REST API, and the first
    asset whose name starts with ``filename_without_ext`` is returned.

    Args:
        owner (str): The owner of the repository.
        repo (str): The name of the repository.
        release_tag (str): The tag of the release.
        filename_without_ext (str): The filename to search for (without extension).
        timeout (float): Seconds to wait for the API to connect/respond before
            the request is abandoned.

    Returns:
        str | None: The download URL of the matched asset, or None if no match
        is found or the release data could not be fetched.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}/releases/tags/{release_tag}"
    headers = {}  # Add authentication headers if needed

    # Only the network call and the JSON decoding can fail here, so keep the
    # try body limited to them.
    try:
        # Without a timeout a stalled connection would block forever.
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        release_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # decoding error is not a RequestException subclass.
        logger.error(f"An error occurred while fetching release data: {e}")
        return None

    # Look for a matching file; tolerate asset entries with missing fields
    # instead of crashing with a KeyError.
    for asset in release_data.get("assets", []):
        name = asset.get("name") or ""
        download_url = asset.get("browser_download_url")
        if download_url and name.startswith(filename_without_ext):
            logger.info(f"Match found: {name}")
            return download_url

    # If no match found, log the error
    logger.error(
        f"No match found for filename: {filename_without_ext} in release {release_tag}."
    )
    return None
def download_and_extract(url: str, output_dir: str, timeout: float = 30.0) -> Path:
    """
    Download a file from a URL and extract it if it is a tar.bz2 archive.

    If the directory the archive would extract to already exists, nothing is
    downloaded and that directory is returned immediately.

    Args:
        url (str): The URL to download the file from.
        output_dir (str): The directory to save the downloaded file.
        timeout (float): Seconds to wait for the server to connect / send data
            before the request is abandoned (not a limit on the whole download).

    Returns:
        Path: Path to the extracted directory if it's a tar.bz2 file,
        otherwise Path to the downloaded file.

    Raises:
        ValueError: If no file name can be derived from ``url``.
        requests.exceptions.RequestException: If the download fails.
        tarfile.TarError: If the archive cannot be extracted.
    """
    # Get the file name from the URL. An empty name (URL ending in "/") would
    # make the "already extracted" check below match output_dir itself.
    file_name = url.split("/")[-1]
    if not file_name:
        raise ValueError(f"Cannot derive a file name from URL: {url!r}")

    # Create the output directory if it doesn't exist
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    file_path = os.path.join(output_dir, file_name)

    # Extract the root directory name from the filename (removing only a
    # trailing .tar.bz2, never an occurrence in the middle of the name)
    root_dir = file_name.removesuffix(".tar.bz2")
    extracted_dir_path = out_dir / root_dir

    # Check if the extracted directory already exists
    if extracted_dir_path.exists():
        logger.info(
            f"✅ The directory {extracted_dir_path} already exists. I would assume that the model is already downloaded and we are ready to go. Skipping download and extraction."
        )
        return extracted_dir_path

    # Download into a temporary ".part" file so that an interrupted transfer
    # never leaves a truncated archive under the final name.
    logger.info(f"🏃‍♂️Downloading {url} to {file_path}...")
    partial_path = file_path + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # Raise an error for bad status codes
            total_size = int(response.headers.get("content-length", 0))
            logger.debug(f"Total file size: {total_size / 1024 / 1024:.2f} MB")

            progress = tqdm(
                desc=file_name,
                total=total_size or None,  # unknown size -> open-ended bar
                unit="iB",
                unit_scale=True,
                unit_divisor=1024,
            )
            with open(partial_path, "wb") as f, progress as pbar:
                for chunk in response.iter_content(chunk_size=8192):
                    pbar.update(f.write(chunk))
        os.replace(partial_path, file_path)
    finally:
        # After a successful rename the partial file no longer exists, so this
        # only cleans up after a failed or interrupted download.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    logger.info(f"Downloaded {file_name} successfully.")

    # Anything that is not a tar.bz2 archive is returned as downloaded.
    if not file_name.endswith(".tar.bz2"):
        logger.warning("The downloaded file is not a tar.bz2 archive.")
        return Path(file_path)

    # Extract the tar.bz2 file
    logger.info(f"Extracting {file_name}...")
    with tarfile.open(file_path, "r:bz2") as tar:
        if hasattr(tarfile, "data_filter"):
            # Reject members that would escape output_dir (absolute paths,
            # "..", links pointing outside the destination).
            tar.extractall(path=output_dir, filter="data")
        else:
            # Interpreter predates extraction filters; fall back to the
            # unfiltered behaviour.
            tar.extractall(path=output_dir)
    logger.info("Extraction completed.")

    # Delete the compressed file
    os.remove(file_path)
    logger.debug(f"Deleted the compressed file: {file_name}")
    return extracted_dir_path
def check_and_extract_local_file(url: str, output_dir: str) -> Path | None:
    """
    Check if a local file exists and extract it if it is a tar.bz2 archive.

    The file name is taken from the last path segment of ``url``; nothing is
    downloaded by this function.

    Args:
        url (str): The URL of the file.
        output_dir (str): The directory to save the extracted files.

    Returns:
        Path | None: Path to the extracted directory if it already exists or
        the local tar.bz2 archive was extracted successfully, otherwise None.
    """
    # Get the file name from the URL. An empty name (URL ending in "/") would
    # make the existence check below match output_dir itself.
    file_name = url.split("/")[-1]
    if not file_name:
        logger.warning(f"Cannot derive a file name from URL: {url}")
        return None

    base_dir = Path(output_dir)
    compressed_path = base_dir / file_name
    # Strip only a trailing .tar.bz2, never an occurrence inside the name.
    extracted_dir = base_dir / file_name.removesuffix(".tar.bz2")

    if extracted_dir.exists():
        logger.info(
            f"✅ Extracted directory exists: {extracted_dir}, no operation needed."
        )
        return extracted_dir

    # Check if the compressed file exists and is a tar.bz2 archive
    if not (compressed_path.exists() and file_name.endswith(".tar.bz2")):
        logger.warning(
            f"Local file not found or not a tar.bz2 archive: {compressed_path}"
        )
        return None

    logger.info(f"🔍 Found local archive file: {compressed_path}")
    try:
        logger.info("⏳ Extracting archive file...")
        with tarfile.open(compressed_path, "r:bz2") as tar:
            if hasattr(tarfile, "data_filter"):
                # Reject members that would escape output_dir (absolute
                # paths, "..", links pointing outside the destination).
                tar.extractall(path=output_dir, filter="data")
            else:
                # Interpreter predates extraction filters; fall back to the
                # unfiltered behaviour.
                tar.extractall(path=output_dir)
        logger.success(f"Extracted archive to the path: {extracted_dir}")
        os.remove(compressed_path)  # Remove the compressed file
        return extracted_dir
    except Exception as e:
        # Deliberately broad: any failure here just means "no usable local
        # copy", which is reported to the caller as None.
        logger.error(f"Fail to extract file: {str(e)}")
        return None
if __name__ == "__main__":
    # Archive published under the sherpa-onnx "asr-models" release.
    model_url = "https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/sherpa-onnx-sense-voice-zh-en-ja-ko-yue-2024-07-17.tar.bz2"
    models_dir = "./models"

    # Use what is already on disk first; only hit the network when the local
    # check comes back empty-handed.
    extracted = check_and_extract_local_file(model_url, models_dir)
    if extracted is not None:
        logger.info("Extraction completed using local file.")
    else:
        logger.info("Local archive not found. Starting download...")
        download_and_extract(model_url, models_dir)