from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from pathlib import Path
import os
import shutil

# Single working directory shared by the notebook upload and its datasets.
# Hoisted to a constant so the three functions below cannot drift apart.
NOTEBOOK_DIR = Path("/tmp/Notebook")


def save_notebook(notebook):
    """Save an uploaded notebook to /tmp/Notebook, wiping any previous contents.

    Args:
        notebook: Uploaded-file object exposing ``.name`` and ``.getbuffer()``
            (e.g. a Streamlit ``UploadedFile``), or a falsy value to just
            reset the directory without writing anything.
    """
    # Start from a clean slate so stale notebooks/datasets don't linger.
    if NOTEBOOK_DIR.exists() and NOTEBOOK_DIR.is_dir():
        shutil.rmtree(NOTEBOOK_DIR)
    NOTEBOOK_DIR.mkdir(parents=True, exist_ok=True)
    if notebook:
        with open(NOTEBOOK_DIR / notebook.name, "wb") as file:
            file.write(notebook.getbuffer())


def save_datasets(datasets):
    """Save uploaded dataset files to /tmp/Notebook.

    Unlike :func:`save_notebook`, this does NOT wipe the directory first,
    so datasets can be added alongside an already-saved notebook.

    Args:
        datasets: Iterable of uploaded-file objects exposing ``.name`` and
            ``.getbuffer()``, or a falsy value to do nothing.
    """
    # Ensure directory exists (save_notebook may not have run yet).
    NOTEBOOK_DIR.mkdir(parents=True, exist_ok=True)
    if datasets:
        for dataset in datasets:
            with open(NOTEBOOK_DIR / dataset.name, "wb") as file:
                file.write(dataset.getbuffer())


def get_dataset_files(directory_path):
    """Return dataset files found directly inside *directory_path*.

    A file counts as a dataset if its extension (case-insensitive) is one
    of: .csv, .xlsx, .xls, .json, .txt, .parquet.

    Args:
        directory_path: Directory to scan (str or Path).

    Returns:
        Sorted list of ``Path`` objects; an empty list if the directory
        does not exist. Sorting makes downstream choices (e.g. which CSV
        becomes 'default.csv' in create_dataset_mapping) deterministic
        instead of depending on filesystem iteration order.
    """
    folder_path = Path(directory_path)
    if not folder_path.exists():
        return []
    dataset_extensions = {'.csv', '.xlsx', '.xls', '.json', '.txt', '.parquet'}
    return sorted(
        f for f in folder_path.iterdir()
        if f.is_file() and f.suffix.lower() in dataset_extensions
    )


def create_dataset_mapping(directory_path):
    """Map likely dataset references to the actual filenames on disk.

    Notebooks often refer to data by guessed names ('data.csv', the stem
    without extension, lowercase variants, ...). This builds a lookup from
    each such alias to the real filename so those references can be
    rewritten.

    Args:
        directory_path: Directory containing the dataset files.

    Returns:
        dict mapping alias (str) -> actual filename (str). When several
        files produce the same alias, the alphabetically later file wins
        (files are processed in sorted order).
    """
    dataset_files = get_dataset_files(directory_path)
    mapping = {}
    for file in dataset_files:
        # Create various possible references to this file.
        filename = file.name
        name_without_ext = file.stem
        # Common patterns notebooks might use.
        mapping[filename] = filename
        mapping[name_without_ext] = filename
        mapping[filename.lower()] = filename
        mapping[name_without_ext.lower()] = filename
        # Common generic names.
        if filename.lower().startswith('data'):
            mapping['data.csv'] = filename
            mapping['dataset.csv'] = filename
        # If it's the first/only CSV, make it the default (first in sorted
        # order, so the choice is reproducible across runs).
        if file.suffix.lower() == '.csv' and 'default.csv' not in mapping:
            mapping['default.csv'] = filename
    return mapping


def safe_concurrent_batch(chain, inputs, max_workers=2):
    """Invoke ``chain`` on each input concurrently, capturing per-item errors.

    Each input is submitted as ``chain.invoke(inp)`` to a thread pool; a
    failure in one item never aborts the batch — it is recorded in that
    item's result instead.

    Args:
        chain: Object with an ``invoke(input) -> output`` method
            (e.g. a LangChain runnable).
        inputs: Iterable of inputs to process.
        max_workers: Thread-pool size (default 2).

    Returns:
        List of dicts, one per input, in COMPLETION order (not input
        order), each with keys:
            "input":  the original input,
            "output": the result, or None on failure,
            "status": "success" or "failed: <error message>".
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_input = {executor.submit(chain.invoke, inp): inp for inp in inputs}
        for future in tqdm(as_completed(future_to_input), total=len(future_to_input),
                           desc="Processing"):
            inp = future_to_input[future]
            try:
                output = future.result()
                results.append({
                    "input": inp,
                    "output": output,
                    "status": "success"
                })
            except Exception as e:
                # Record the failure on this item and keep processing the rest.
                results.append({
                    "input": inp,
                    "output": None,
                    "status": f"failed: {str(e)}"
                })
    return results