|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""File-system agnostic IO APIs"""
|
|
|
|
|
|
import hashlib
|
|
|
import os
|
|
|
import shutil
|
|
|
import tempfile
|
|
|
|
|
|
try:
|
|
|
from hdfs_io import copy, exists, makedirs
|
|
|
except ImportError:
|
|
|
from .hdfs_io import copy, exists, makedirs
|
|
|
|
|
|
# Names exported by `from <module> import *`: the helpers imported from hdfs_io above.
__all__ = ["copy", "exists", "makedirs"]
|
|
|
|
|
|
# URI scheme prefix used by is_non_local() to recognise HDFS (non-local) paths.
_HDFS_PREFIX = "hdfs://"
|
|
|
|
|
|
|
|
|
def is_non_local(path):
    """Report whether ``path`` points at a non-local (HDFS) location.

    Args:
        path (str): Path or URI to classify.

    Returns:
        bool: True when ``path`` begins with ``_HDFS_PREFIX``, False otherwise.
    """
    has_hdfs_prefix = path.startswith(_HDFS_PREFIX)
    return has_hdfs_prefix
|
|
|
|
|
|
|
|
|
def md5_encode(path: str) -> str:
    """Return the hexadecimal MD5 digest of ``path``.

    The string is converted to bytes with ``str.encode()`` (default codec)
    before hashing. The digest is used here as a stable name, not for security.

    Args:
        path (str): Text to hash.

    Returns:
        str: 32-character lowercase hex digest.
    """
    hasher = hashlib.md5()
    hasher.update(path.encode())
    return hasher.hexdigest()
|
|
|
|
|
|
|
|
|
def get_local_temp_path(hdfs_path: str, cache_dir: str) -> str:
    """Build the local cache location for an HDFS resource.

    A subdirectory named after the MD5 hex digest of ``hdfs_path`` is created
    under ``cache_dir`` (if it does not exist yet), so that resources sharing
    a basename but coming from different HDFS paths do not collide.

    Args:
        hdfs_path (str): Source HDFS path to be cached.
        cache_dir (str): Local directory under which cached files are stored.

    Returns:
        str: Path of the form
            {cache_dir}/{md5(hdfs_path)}/{basename(hdfs_path)}
    """
    bucket_name = md5_encode(hdfs_path)
    bucket_dir = os.path.join(cache_dir, bucket_name)
    # Only the per-source directory is created here; the resource itself is
    # not copied by this function.
    os.makedirs(bucket_dir, exist_ok=True)
    return os.path.join(bucket_dir, os.path.basename(hdfs_path))
|
|
|
|
|
|
|
|
|
def _record_directory_structure(folder_path):
|
|
|
record_file = os.path.join(folder_path, ".directory_record.txt")
|
|
|
with open(record_file, "w") as f:
|
|
|
for root, dirs, files in os.walk(folder_path):
|
|
|
for dir_name in dirs:
|
|
|
relative_dir = os.path.relpath(os.path.join(root, dir_name), folder_path)
|
|
|
f.write(f"dir:{relative_dir}\n")
|
|
|
for file_name in files:
|
|
|
if file_name != ".directory_record.txt":
|
|
|
relative_file = os.path.relpath(os.path.join(root, file_name), folder_path)
|
|
|
f.write(f"file:{relative_file}\n")
|
|
|
return record_file
|
|
|
|
|
|
|
|
|
def _check_directory_structure(folder_path, record_file):
|
|
|
if not os.path.exists(record_file):
|
|
|
return False
|
|
|
existing_entries = set()
|
|
|
for root, dirs, files in os.walk(folder_path):
|
|
|
for dir_name in dirs:
|
|
|
relative_dir = os.path.relpath(os.path.join(root, dir_name), folder_path)
|
|
|
existing_entries.add(f"dir:{relative_dir}")
|
|
|
for file_name in files:
|
|
|
if file_name != ".directory_record.txt":
|
|
|
relative_file = os.path.relpath(os.path.join(root, file_name), folder_path)
|
|
|
existing_entries.add(f"file:{relative_file}")
|
|
|
with open(record_file) as f:
|
|
|
recorded_entries = set(f.read().splitlines())
|
|
|
return existing_entries == recorded_entries
|
|
|
|
|
|
|
|
|
def copy_to_local(src: str, cache_dir=None, filelock=".file.lock", verbose=False, always_recopy=False) -> str:
    """Make ``src`` available on the local filesystem and return its path.

    All arguments are forwarded unchanged to ``copy_local_path_from_hdfs``,
    which does the actual work.

    Args:
        src (str): Source path - HDFS path (hdfs://...) or local filesystem path
        cache_dir (str, optional): Local directory for cached files. Uses system tempdir if None
        filelock (str): Base name for file lock. Defaults to ".file.lock"
        verbose (bool): Enable copy operation logging. Defaults to False
        always_recopy (bool): Force fresh copy ignoring cache. Defaults to False

    Returns:
        str: Local filesystem path to copied resource
    """
    return copy_local_path_from_hdfs(
        src,
        cache_dir=cache_dir,
        filelock=filelock,
        verbose=verbose,
        always_recopy=always_recopy,
    )
|
|
|
|
|
|
|
|
|
def copy_local_path_from_hdfs(src: str, cache_dir=None, filelock=".file.lock", verbose=False, always_recopy=False) -> str:
    """Deprecated. Please use copy_to_local instead.

    Resolve ``src`` to a path on the local filesystem. A source that
    ``is_non_local`` rejects is returned untouched; otherwise it is copied
    into a per-source cache location (see ``get_local_temp_path``) while
    holding a file lock, and the cached path is returned.

    Args:
        src (str): Source path. Must not end with "/".
        cache_dir (str, optional): Root of the local cache. When None,
            ``tempfile.gettempdir()`` is used. Created if missing.
        filelock (str): Kept in the signature, but the value is not used:
            the lock file name is always derived from the MD5 of ``src``.
        verbose (bool): Print a message before each copy.
        always_recopy (bool): Remove any cached copy first so that the
            resource is fetched again.

    Returns:
        str: ``src`` itself for local sources, otherwise the cached local path.

    Raises:
        AssertionError: If ``src`` ends with "/", or ``cache_dir`` does not
            exist after the attempt to create it.
    """
    from filelock import FileLock

    assert src[-1] != "/", f"Make sure the last char in src is not / because it will cause error. Got {src}"

    # Local sources need no caching at all.
    if not is_non_local(src):
        return src

    if cache_dir is None:
        cache_dir = tempfile.gettempdir()
    os.makedirs(cache_dir, exist_ok=True)
    assert os.path.exists(cache_dir)

    local_path = get_local_temp_path(src, cache_dir)

    # The lock name is derived from the source path, so the lock is specific
    # to this source rather than shared by the whole cache directory.
    lock_path = os.path.join(cache_dir, md5_encode(src) + ".lock")
    with FileLock(lock_file=lock_path):
        # Forced refresh: drop whatever is cached before deciding what to do.
        if always_recopy and os.path.exists(local_path):
            if not os.path.isdir(local_path):
                os.remove(local_path)
            else:
                shutil.rmtree(local_path, ignore_errors=True)

        if os.path.exists(local_path):
            # Only cached directories are validated; a cached file is reused as is.
            if os.path.isdir(local_path):
                manifest = os.path.join(local_path, ".directory_record.txt")
                if not _check_directory_structure(local_path, manifest):
                    if verbose:
                        print(f"Recopy from {src} to {local_path} due to missing files or directories.")
                    shutil.rmtree(local_path, ignore_errors=True)
                    copy(src, local_path)
                    _record_directory_structure(local_path)
        else:
            if verbose:
                print(f"Copy from {src} to {local_path}")
            copy(src, local_path)
            # Record a manifest so a later call can detect an incomplete directory.
            if os.path.isdir(local_path):
                _record_directory_structure(local_path)

    return local_path
|
|
|
|