pydeploy-studio / utils /file_handler.py
codeboosterstech's picture
Upload 12 files
fcd463d verified
"""
File handling utilities
"""
import io
import json
import zipfile
import tempfile
import os
from pathlib import Path
from typing import Union, Dict, Any
def create_zip_archive(files: Dict[str, Union[str, bytes]]) -> bytes:
    """Build a deflate-compressed ZIP archive entirely in memory.

    Args:
        files: Mapping of archive member name to its content. ``str``
            content is encoded as UTF-8; ``bytes`` content is stored as-is.

    Returns:
        The raw bytes of the finished ZIP archive.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
        for member_name, payload in files.items():
            data = payload.encode('utf-8') if isinstance(payload, str) else payload
            archive.writestr(member_name, data)
    # Read the buffer only after the archive is closed, so the central
    # directory has been written.
    return buffer.getvalue()
def read_file_content(file_obj) -> str:
    """Return the text held by a file-like object or stored at a path.

    Args:
        file_obj: ``None``, an object exposing ``read()``, or a ``str`` /
            ``Path`` pointing at a file.

    Returns:
        The text content. Bytes are decoded as UTF-8 with undecodable bytes
        dropped. An empty string is returned for ``None``, for a path that
        does not exist, for unsupported input types, and when reading fails
        (the error is printed, not raised).
    """
    if file_obj is None:
        return ""

    try:
        if hasattr(file_obj, 'read'):
            raw = file_obj.read()
            return raw.decode('utf-8', errors='ignore') if isinstance(raw, bytes) else str(raw)

        if isinstance(file_obj, (str, Path)):
            candidate = Path(file_obj)
            if candidate.exists():
                return candidate.read_text(encoding='utf-8', errors='ignore')
    except Exception as e:
        # Best-effort reader: report the problem and fall through to "".
        print(f"Error reading file: {e}")

    return ""
def _parse_notebook_json(file_path: str) -> str:
    """Extract code-cell sources by reading the notebook as plain JSON.

    Malformed cells (not a dict, or with a ``source`` that is neither a
    string nor a list of strings) are skipped individually, so one bad cell
    does not discard the rest of the notebook.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError, TypeError, RecursionError):
        # Unreadable file, invalid JSON/encoding, or an unusable path value.
        return ""

    if not isinstance(data, dict):
        return ""
    cells = data.get('cells', [])
    if not isinstance(cells, list):
        return ""

    code_cells = []
    for cell in cells:
        if not isinstance(cell, dict) or cell.get('cell_type') != 'code':
            continue
        source = cell.get('source', '')
        if isinstance(source, list):
            # Notebook JSON may store a cell's source as a list of line strings.
            if not all(isinstance(part, str) for part in source):
                continue
            source = ''.join(source)
        if isinstance(source, str) and source.strip():
            code_cells.append(source)
    return '\n\n'.join(code_cells)


def parse_notebook(file_path: str) -> str:
    """Extract the code cells from a Jupyter notebook file.

    The notebook is read with ``nbformat`` when that package can be imported
    and accepts the file. On any failure the file is parsed as plain JSON
    instead.

    Args:
        file_path: Path to the ``.ipynb`` file.

    Returns:
        The sources of all non-blank code cells, in order, separated by a
        blank line. An empty string is returned when the file cannot be read
        or parsed.
    """
    try:
        import nbformat

        with open(file_path, 'r', encoding='utf-8') as f:
            nb = nbformat.read(f, as_version=4)
        return '\n\n'.join(
            cell.source
            for cell in nb.cells
            if cell.cell_type == 'code' and cell.source.strip()
        )
    except Exception:
        # nbformat is optional and may reject the file; use the JSON fallback.
        return _parse_notebook_json(file_path)
def save_individual_file(filename: str, content: Union[str, bytes], binary: bool = False) -> str:
    """Write ``content`` to a file in a fresh temporary directory.

    Args:
        filename: Name for the file. Only its final path component is used,
            so directory parts, ``..`` segments and absolute paths cannot
            place the file outside the newly created temporary directory.
        content: Data to write. ``bytes`` are always written in binary mode.
            ``str`` is written as UTF-8 text, or UTF-8 encoded bytes when
            ``binary`` is true.
        binary: Write in binary mode.

    Returns:
        The path of the written file.
    """
    temp_dir = tempfile.mkdtemp()
    # Keep only the last path component so the file stays inside temp_dir.
    file_path = os.path.join(temp_dir, os.path.basename(filename))

    # Choose the mode from the actual content type as well as the flag, so a
    # bytes/str mismatch with `binary` does not raise TypeError on write.
    if binary or isinstance(content, (bytes, bytearray)):
        data = content.encode('utf-8') if isinstance(content, str) else content
        with open(file_path, 'wb') as f:
            f.write(data)
    else:
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
    return file_path
def cleanup_temp_files(file_paths: list):
    """Delete the given files, ignoring any that cannot be removed.

    Only the files themselves are removed; their containing directories are
    left in place.

    Args:
        file_paths: Paths of files to delete. ``None`` or an empty list is a
            no-op. Entries that are missing, are not regular files, or are
            not valid paths are skipped.
    """
    for file_path in file_paths or ():
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except (OSError, TypeError, ValueError):
            # Best-effort cleanup: skip entries that cannot be removed or are
            # not valid paths, but let KeyboardInterrupt/SystemExit through.
            continue