# Source path: InfiAgent/src/infiagent/utils/file_utils.py
# Hosting-page residue kept for reference: "g3eIL's picture", "Upload 80 files",
# revision 77320e4 (verified).
import csv
import io
import logging
import os
import shutil
import chardet
from .logger import get_logger
# Walk upwards from this file until a path component whose name contains
# 'infiagent' is found; that component is treated as the package root.
root_directory = os.path.abspath(__file__)
while 'infiagent' not in os.path.basename(root_directory):
    _parent_directory = os.path.dirname(root_directory)
    if _parent_directory == root_directory:
        # dirname() of the filesystem root is the root itself, so without this
        # guard the loop would spin forever at import time when no component
        # matches. Fall back to the directory that contains this module.
        root_directory = os.path.dirname(os.path.abspath(__file__))
        break
    root_directory = _parent_directory

# Base directory under which one sub-directory per sandbox is created.
TEMP_FILE_UPLOAD_DIR = f"{root_directory}/tmp/upload_files/"
# Largest accepted upload, in bytes (1 GiB).
MAX_INPUT_FILE_SIZE = 1024 * 1024 * 1024
# Number of leading bytes sampled for encoding and delimiter detection.
SAMPLE_FILE_SIZE = 2048
# Delimiter every uploaded CSV is normalised to.
CSV_DEFAULT_DELIMITER = ","
# Candidate delimiters offered to csv.Sniffer.
CSV_DELIMITERS = [',', '\t', ';', '|', ' ']

logger = get_logger()
def clear_files(upload_file_dir):
    """
    Remove every entry inside a directory and then the directory itself.
    Args:
    - upload_file_dir (str): Directory to wipe; it must exist.
    Notes:
    - A failure to delete an individual entry is printed and the loop moves on.
    - The final removal of upload_file_dir is not guarded, so it raises if the
      directory cannot be removed.
    """
    for entry_name in os.listdir(upload_file_dir):
        target = os.path.join(upload_file_dir, entry_name)
        try:
            if os.path.isdir(target) and not os.path.islink(target):
                # Real directory: remove it recursively.
                shutil.rmtree(target)
            elif os.path.isfile(target) or os.path.islink(target):
                # Regular file or any symlink (including one to a directory).
                os.unlink(target)
        except Exception as err:
            print('Failed to delete %s. Error: %s' % (target, err))

    # The directory itself goes too; callers must recreate it if needed.
    shutil.rmtree(upload_file_dir)
def upload_files(uploaded_files, sandbox_id):
    """
    Save uploaded files into the sandbox's temporary directory.
    Args:
    - uploaded_files: Sized iterable of uploaded file objects exposing `name`,
      `type` and `size`; a falsy value means there is nothing to upload
    - sandbox_id (str): Name of the sub-directory under TEMP_FILE_UPLOAD_DIR
    Returns:
    - list: Paths returned by _process_files, in input order (empty when there
      is nothing to upload)
    """
    uploaded_files_list = []

    if not uploaded_files:
        logging.info("No file upload")
        return uploaded_files_list

    logging.info("Got {} files to upload.".format(len(uploaded_files)))

    file_dir = os.path.join(TEMP_FILE_UPLOAD_DIR, sandbox_id)
    if os.path.exists(file_dir):
        # Drop whatever a previous upload left behind.
        clear_files(file_dir)
    # clear_files removes the directory itself, so it has to be (re)created in
    # both cases; otherwise the writes below fail with FileNotFoundError.
    os.makedirs(file_dir, exist_ok=True)

    for uploaded_file in uploaded_files:
        # Log the basic information of the file.
        file_details = {"FileName": uploaded_file.name, "FileType": uploaded_file.type, "FileSize": uploaded_file.size}
        logging.info(file_details)
        uploaded_files_list.append(_process_files(uploaded_file, file_dir))

    logging.info("All files saved to disk.")
    return uploaded_files_list
def _process_files(uploaded_file, output_dir):
    """
    Validate one uploaded file and store it in the output directory.
    Args:
    - uploaded_file: Uploaded file object exposing `name`, `size` and
      `getvalue()`
    - output_dir (str): Existing directory where the file should be saved
    Returns:
    - str: The path to the saved file
    Raises:
    - ValueError: If the file is larger than MAX_INPUT_FILE_SIZE
    """
    # Check if file size is more than 1 GB
    if uploaded_file.size > MAX_INPUT_FILE_SIZE:
        raise ValueError(f"File {uploaded_file.name} is larger than 1 GB")

    # CSV files go through delimiter normalisation before being saved.
    if uploaded_file.name.endswith('.csv'):
        return _process_local_csv_file(uploaded_file, output_dir)

    # The name comes from the client; keep only its final component so that a
    # name with directory parts (or an absolute path) cannot escape output_dir.
    safe_name = os.path.basename(uploaded_file.name)
    new_file_path = os.path.join(output_dir, safe_name)
    with open(new_file_path, 'wb') as new_file:
        new_file.write(uploaded_file.getvalue())
    return new_file_path
def _process_local_csv_file(uploaded_file, output_dir):
    """
    Process the uploaded file to convert the delimiter if needed and save the content in the output directory.
    Args:
    - uploaded_file: File-like object of the uploaded file (must expose `name` and `read()`)
    - output_dir (str): Directory where the processed file should be saved
    Returns:
    - str: The path to the saved file
    """
    # Read the raw bytes; decoding happens inside convert_delimiter_to_comma.
    content_stream = io.BytesIO(uploaded_file.read())

    # The "was it converted" flag is not needed here, only the resulting stream.
    converted_file_stream, _ = convert_delimiter_to_comma(content_stream)

    # The name comes from the client; keep only its final component so that a
    # name with directory parts (or an absolute path) cannot escape output_dir.
    safe_name = os.path.basename(uploaded_file.name)
    new_file_path = os.path.join(output_dir, safe_name)

    # Write the processed content to the output path
    with open(new_file_path, 'wb') as file:
        file.write(converted_file_stream.getvalue())
    return new_file_path
def _decode_csv_bytes(raw: bytes, detected_encoding: str) -> str:
    """Decode raw CSV bytes: strict UTF-8 first, then the detected encoding, then lossy UTF-8."""
    for candidate in ('utf-8', detected_encoding):
        try:
            return raw.decode(candidate)
        except (UnicodeDecodeError, LookupError):
            # Wrong guess, or a codec name Python does not provide.
            continue
    return raw.decode('utf-8', errors='replace')


def convert_delimiter_to_comma(content_stream: io.BytesIO) -> (io.BytesIO, bool):
    """
    Detects the delimiter of a CSV content stream and converts it to comma if it's not already.
    Args:
    - content_stream (io.BytesIO): Stream containing CSV content
    Returns:
    - tuple: New content stream with updated delimiter, flag indicating if conversion was done.
      When no conversion happens the original stream is returned, rewound to position 0.
      A converted stream is always UTF-8 encoded and uses '\\n' line endings.
    """
    sample = content_stream.read(SAMPLE_FILE_SIZE)
    content_stream.seek(0)

    # Use chardet to detect the encoding
    detected = chardet.detect(sample)
    encoding = detected.get('encoding', 'utf-8') or 'utf-8'
    try:
        # errors='replace' because the sample may end in the middle of a
        # multi-byte character.
        decoded_sample = sample.decode(encoding, errors='replace')
    except LookupError:
        # chardet can report encodings for which Python has no codec.
        encoding = 'utf-8'
        decoded_sample = sample.decode(encoding, errors='replace')

    sniffer = csv.Sniffer()
    try:
        delimiter = sniffer.sniff(decoded_sample, delimiters=''.join(CSV_DELIMITERS)).delimiter
    except (csv.Error, UnicodeDecodeError) as e:
        logger.warning("Unable to confidently determine the delimiter for the CSV content. Return original file. "
                       "error: {}".format(str(e)))
        return content_stream, False

    if delimiter == CSV_DEFAULT_DELIMITER:
        logger.info("Original CSV file delimiter is ',', no need to convert.")
        return content_stream, False

    logger.info("Original CSV file delimiter is '{}', converting it to ','.".format(delimiter))

    # Strict UTF-8 is tried first so that input which decoded before still
    # decodes identically; the detected encoding only rescues content that
    # previously raised UnicodeDecodeError.
    text = _decode_csv_bytes(content_stream.getvalue(), encoding)

    # Feed the reader a newline-preserving stream instead of str.splitlines():
    # splitlines() drops line breaks inside quoted fields and also splits on
    # separators such as form feed that are not CSV record boundaries.
    reader = csv.reader(io.StringIO(text, newline=''), delimiter=delimiter)
    temp_output = io.StringIO()  # Temporary StringIO to hold string representation
    writer = csv.writer(temp_output, delimiter=CSV_DEFAULT_DELIMITER, lineterminator='\n')
    writer.writerows(reader)

    # Re-encode as UTF-8; a freshly constructed BytesIO starts at position 0.
    output_stream = io.BytesIO(temp_output.getvalue().encode('utf-8'))
    return output_stream, True
def get_file_name_and_path(input_file: str) -> tuple:
    """
    Split a '/'-separated path into its final component and the prefix before it.
    Args:
    - input_file (str): Path using '/' as separator
    Returns:
    - tuple: (file_name, tos_path), where tos_path is everything up to and
      including the last '/' (empty string when there is no '/')
    """
    # rpartition cuts only at the last '/'. Removing the name with str.replace
    # would also delete earlier occurrences of it, e.g. turning "data/data"
    # into "/" instead of "data/".
    head, separator, file_name = input_file.rpartition("/")
    tos_path = head + separator
    return file_name, tos_path