File size: 5,402 Bytes
77320e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
import csv
import io
import logging
import os
import shutil
import chardet
from .logger import get_logger
# Locate the project root: start at this module's absolute path and walk up
# until a path component whose name contains 'infiagent' is found.
root_directory = os.path.abspath(__file__)
while 'infiagent' not in os.path.basename(root_directory):
    _parent_directory = os.path.dirname(root_directory)
    if _parent_directory == root_directory:
        # Reached the filesystem root: dirname() no longer shortens the path,
        # so without this guard the loop would spin forever at import time.
        # Fall back to the directory that contains this module.
        root_directory = os.path.dirname(os.path.abspath(__file__))
        break
    root_directory = _parent_directory

# Base directory for uploads; upload_files() joins a per-sandbox id onto it.
TEMP_FILE_UPLOAD_DIR = f"{root_directory}/tmp/upload_files/"
# Largest accepted upload in bytes (1024 ** 3); checked in _process_files().
MAX_INPUT_FILE_SIZE = 1024 * 1024 * 1024
# Number of leading bytes read for encoding and delimiter detection.
SAMPLE_FILE_SIZE = 2048
# Delimiter that CSV uploads are normalised to.
CSV_DEFAULT_DELIMITER = ","
# Candidate delimiters handed to csv.Sniffer.
CSV_DELIMITERS = [',', '\t', ';', '|', ' ']
logger = get_logger()
def clear_files(upload_file_dir):
    """
    Delete everything inside a directory and then the directory itself.
    Args:
    - upload_file_dir (str): Directory to wipe and remove
    Returns:
    - None
    Notes:
    - A failure on an individual entry is printed and skipped; the final
      removal of the directory is not guarded and may raise.
    """
    for entry_name in os.listdir(upload_file_dir):
        entry_path = os.path.join(upload_file_dir, entry_name)
        try:
            is_file_or_link = os.path.isfile(entry_path) or os.path.islink(entry_path)
            if is_file_or_link:
                os.unlink(entry_path)
                continue
            if os.path.isdir(entry_path):
                shutil.rmtree(entry_path)
        except Exception as error:
            print('Failed to delete %s. Error: %s' % (entry_path, error))
    # Finally drop the (now normally empty) directory itself.
    shutil.rmtree(upload_file_dir)
def upload_files(uploaded_files, sandbox_id):
    """
    Save uploaded files into the sandbox's upload directory.
    Any previous content of that directory is discarded first.
    Args:
    - uploaded_files: Sized iterable of uploaded file objects exposing
      ``name``, ``type`` and ``size``; may be empty or None
    - sandbox_id (str): Name of the sub-directory under TEMP_FILE_UPLOAD_DIR
    Returns:
    - list: Values returned by _process_files for each file, in input order
      (empty when nothing was uploaded)
    """
    uploaded_files_list = []
    if not uploaded_files:
        logging.info("No file upload")
        return uploaded_files_list

    logging.info("Got {} files to upload.".format(len(uploaded_files)))
    file_dir = os.path.join(TEMP_FILE_UPLOAD_DIR, sandbox_id)
    if os.path.exists(file_dir):
        clear_files(file_dir)
    # clear_files() removes the directory itself, not only its content, so the
    # directory must be (re)created in both cases before anything is written.
    os.makedirs(file_dir, exist_ok=True)

    for uploaded_file in uploaded_files:
        # Collect the file's basic information for the log
        file_details = {"FileName": uploaded_file.name, "FileType": uploaded_file.type, "FileSize": uploaded_file.size}
        logging.info(file_details)
        uploaded_files_list.append(_process_files(uploaded_file, file_dir))
    logging.info("All files saved to disk.")
    return uploaded_files_list
def _process_files(uploaded_file, output_dir):
    """
    Validate one uploaded file and store it in the output directory.
    Files whose name ends with '.csv' are delegated to
    _process_local_csv_file; every other file is written unchanged.
    Args:
    - uploaded_file: Uploaded file object exposing ``name``, ``size`` and
      ``getvalue()``
    - output_dir (str): Directory where the file should be saved
    Returns:
    - str: The path to the saved file
    Raises:
    - ValueError: If the file exceeds MAX_INPUT_FILE_SIZE or its name does
      not contain a usable file name
    """
    # Check if file size is more than 1 GB
    if uploaded_file.size > MAX_INPUT_FILE_SIZE:
        raise ValueError(f"File {uploaded_file.name} is larger than 1 GB")
    # Check if the file is a CSV and if the delimiter meets requirement
    if uploaded_file.name.endswith('.csv'):
        return _process_local_csv_file(uploaded_file, output_dir)

    # The name comes from the client: keep only its last path component so
    # that '../' segments or an absolute path cannot escape output_dir.
    safe_name = os.path.basename(uploaded_file.name)
    if safe_name in ("", ".", ".."):
        raise ValueError(f"File name {uploaded_file.name!r} is not a valid file name")
    new_file_path = os.path.join(output_dir, safe_name)
    with open(new_file_path, 'wb') as new_file:
        new_file.write(uploaded_file.getvalue())
    return new_file_path
def _process_local_csv_file(uploaded_file, output_dir):
    """
    Process the uploaded file to convert the delimiter if needed and save the content in the output directory.
    Args:
    - uploaded_file: File-like object of the uploaded file
    - output_dir (str): Directory where the processed file should be saved
    Returns:
    - str: The path to the saved file
    Raises:
    - ValueError: If the upload's name does not contain a usable file name
    """
    # The name comes from the client: keep only its last path component so
    # that '../' segments or an absolute path cannot escape output_dir.
    safe_name = os.path.basename(uploaded_file.name)
    if safe_name in ("", ".", ".."):
        raise ValueError(f"File name {uploaded_file.name!r} is not a valid file name")
    # Read the raw bytes of the uploaded file into an in-memory stream
    content_stream = io.BytesIO(uploaded_file.read())
    # Normalise the delimiter; the "was converted" flag is not needed here
    converted_file_stream, _ = convert_delimiter_to_comma(content_stream)
    # Construct the output path
    new_file_path = os.path.join(output_dir, safe_name)
    # Write the processed content to the output path
    with open(new_file_path, 'wb') as file:
        file.write(converted_file_stream.getvalue())
    return new_file_path
def convert_delimiter_to_comma(content_stream: io.BytesIO) -> (io.BytesIO, bool):
    """
    Detects the delimiter of a CSV content stream and converts it to comma if it's not already.
    The delimiter and encoding are guessed from the first SAMPLE_FILE_SIZE
    bytes. When the delimiter cannot be determined, or is already a comma,
    the original stream is returned rewound to position 0. A converted
    stream is always encoded as UTF-8 with '\\n' line terminators.
    Args:
    - content_stream (io.BytesIO): Stream containing CSV content
    Returns:
    - tuple: New content stream with updated delimiter, flag indicating if conversion was done
    """
    sample = content_stream.read(SAMPLE_FILE_SIZE)
    content_stream.seek(0)
    # Use chardet to detect the encoding
    detected = chardet.detect(sample)
    encoding = detected.get('encoding', 'utf-8') or 'utf-8'
    try:
        # Decoding sits inside the try so that a codec name Python does not
        # know (LookupError) also falls back to returning the original file.
        decoded_sample = sample.decode(encoding, errors='replace')
        delimiter = csv.Sniffer().sniff(decoded_sample, delimiters=''.join(CSV_DELIMITERS)).delimiter
    except (csv.Error, UnicodeDecodeError, LookupError) as e:
        logger.warning("Unable to confidently determine the delimiter for the CSV content. Return original file. "
                       "error: {}".format(str(e)))
        return content_stream, False
    if delimiter == CSV_DEFAULT_DELIMITER:
        logger.info("Original CSV file delimiter is ',', no need to convert.")
        return content_stream, False
    logger.info("Original CSV file delimiter is '{}', converting it to ','.".format(delimiter))

    raw_bytes = content_stream.getvalue()
    try:
        # Prefer strict UTF-8: the detection only saw a small sample, which
        # may be pure ASCII even though later bytes are multi-byte UTF-8.
        text = raw_bytes.decode('utf-8')
    except UnicodeDecodeError:
        # Not UTF-8: use the detected encoding instead of failing the upload.
        text = raw_bytes.decode(encoding, errors='replace')

    # newline='' lets the csv module handle line endings itself, so newlines
    # inside quoted fields survive and only real row terminators split rows.
    reader = csv.reader(io.StringIO(text, newline=''), delimiter=delimiter)
    temp_output = io.StringIO()  # Temporary StringIO to hold string representation
    writer = csv.writer(temp_output, delimiter=CSV_DEFAULT_DELIMITER, lineterminator='\n')
    writer.writerows(reader)
    # Convert the text back to bytes; a fresh BytesIO starts at position 0
    output_stream = io.BytesIO(temp_output.getvalue().encode('utf-8'))
    return output_stream, True
def get_file_name_and_path(input_file: str):
    """
    Split a '/'-separated path into its file name and the prefix before it.
    Args:
    - input_file (str): Path using '/' as separator
    Returns:
    - tuple: (file_name, tos_path) where file_name is the text after the last
      '/' and tos_path is everything before it, including that trailing '/'
      (empty string when the input contains no '/')
    """
    # rpartition cuts only at the LAST '/'. Removing the name with
    # str.replace would also strip identical directory names earlier in the
    # path (e.g. "data/data" -> "/").
    directory, separator, file_name = input_file.rpartition("/")
    tos_path = directory + separator
    return file_name, tos_path