|
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
from tqdm import tqdm |
|
|
from pathlib import Path |
|
|
import os |
|
|
import shutil |
|
|
|
|
|
def save_notebook(notebook):
    """Save an uploaded notebook file into the /tmp/Notebook directory.

    The directory is wiped and recreated first, so only the latest
    uploaded notebook is ever present.

    Args:
        notebook: Uploaded-file object (e.g. a Streamlit ``UploadedFile``)
            exposing ``.name`` and ``.getbuffer()``. May be None/falsy,
            in which case only the directory reset is performed.
    """
    folder_path = Path("/tmp/Notebook")

    # Reset the target directory so stale files from a previous upload
    # cannot leak into the new session.
    if folder_path.exists() and folder_path.is_dir():
        shutil.rmtree(folder_path)

    folder_path.mkdir(parents=True, exist_ok=True)

    if notebook:
        # Keep only the basename: upload names are untrusted input and
        # could otherwise contain path components (directory traversal).
        file_name = os.path.basename(notebook.name)
        (folder_path / file_name).write_bytes(notebook.getbuffer())
|
|
|
|
|
def save_datasets(datasets):
    """Save uploaded dataset files into the /tmp/Notebook directory.

    Unlike ``save_notebook``, the directory is NOT wiped first, so
    datasets are added alongside a previously saved notebook.

    Args:
        datasets: Iterable of uploaded-file objects exposing ``.name``
            and ``.getbuffer()``. May be None/empty, in which case only
            the directory is ensured to exist.
    """
    folder_path = Path("/tmp/Notebook")

    folder_path.mkdir(parents=True, exist_ok=True)

    if datasets:
        for dataset in datasets:
            # Keep only the basename: upload names are untrusted input
            # and must not escape the target directory.
            file_name = os.path.basename(dataset.name)
            (folder_path / file_name).write_bytes(dataset.getbuffer())
|
|
|
|
|
def get_dataset_files(directory_path):
    """Return the dataset files sitting directly inside *directory_path*.

    A file counts as a dataset when its extension (case-insensitive) is
    one of: .csv, .xlsx, .xls, .json, .txt, .parquet.

    Args:
        directory_path: Directory to scan (string or Path).

    Returns:
        List of Path objects; empty if the directory does not exist.
    """
    root = Path(directory_path)
    if not root.exists():
        return []

    allowed = {'.csv', '.xlsx', '.xls', '.json', '.txt', '.parquet'}

    matches = []
    for entry in root.iterdir():
        if entry.is_file() and entry.suffix.lower() in allowed:
            matches.append(entry)

    return matches
|
|
|
|
|
def create_dataset_mapping(directory_path):
    """Build a lookup table from likely dataset-name aliases to real filenames.

    For every dataset file found in *directory_path*, registers its exact
    name, its extension-less stem, and the lowercase forms of both. Files
    whose names start with "data" also answer to the generic placeholders
    'data.csv' and 'dataset.csv', and the first CSV encountered becomes
    the 'default.csv' fallback.

    Args:
        directory_path: Directory to scan for dataset files.

    Returns:
        Dict mapping alias strings to the actual filename on disk.
    """
    mapping = {}

    for entry in get_dataset_files(directory_path):

        actual = entry.name
        stem = entry.stem

        # Exact name, stem, and their lowercase forms all resolve to
        # the real file on disk. Later files overwrite earlier aliases,
        # matching directory iteration order.
        for alias in (actual, stem, actual.lower(), stem.lower()):
            mapping[alias] = actual

        # Names beginning with "data" claim the generic placeholders
        # that notebooks commonly hard-code.
        if actual.lower().startswith('data'):
            mapping['data.csv'] = actual
            mapping['dataset.csv'] = actual

        # First CSV seen becomes the catch-all default.
        if entry.suffix.lower() == '.csv' and 'default.csv' not in mapping:
            mapping['default.csv'] = actual

    return mapping
|
|
|
|
|
def safe_concurrent_batch(chain, inputs, max_workers=2):
    """Run ``chain.invoke`` over *inputs* in a thread pool, capturing failures.

    Each input produces one result dict with keys "input", "output", and
    "status". An exception raised by an invocation is recorded as a
    failed status instead of propagating, so one bad input never aborts
    the whole batch. Results arrive in completion order, not input order.

    Args:
        chain: Object exposing ``invoke(input)``.
        inputs: Iterable of inputs to process.
        max_workers: Thread-pool size (kept small by default to avoid
            hammering rate-limited backends).

    Returns:
        List of result dicts, one per input.
    """
    collected = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(chain.invoke, item): item for item in inputs}

        progress = tqdm(as_completed(pending), total=len(inputs), desc="Processing")
        for done in progress:
            item = pending[done]
            try:
                result = done.result()
            except Exception as e:
                collected.append({
                    "input": item,
                    "output": None,
                    "status": f"failed: {str(e)}",
                })
            else:
                collected.append({
                    "input": item,
                    "output": result,
                    "status": "success",
                })
    return collected