Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| """app.ipynb | |
| Automatically generated by Colaboratory. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1EabZcU6Wk8QL6n0_cXYso4djYWFXPOIJ | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| import tensorflow as tf | |
| from tensorflow import keras | |
| import gradio as gr | |
| from pathlib import Path | |
| import os | |
| import subprocess | |
# Run the command as a subprocess
def run_subprocess(command, subprocess_result_Q=True):
    """Run *command* as a child process and optionally echo its output.

    Args:
        command: Argument list (program followed by its arguments). It is
            executed directly, without a shell.
        subprocess_result_Q: When true, print the captured stdout and stderr
            once the process has finished.

    Returns:
        The exit status of the process, so a caller can detect a failed
        command. Callers that ignore the return value are unaffected.
    """
    print(f'\n{command}\n')
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        # Decode the streams so the log shows readable text instead of the
        # repr of a bytes object; undecodable bytes must not abort the run.
        text=True,
        errors='replace',
        check=False,
    )
    if subprocess_result_Q:
        # Print the output and errors if any
        print(f"\nOutput: {completed.stdout}\n")
        print(f"\nErrors: {completed.stderr}\n")
    return completed.returncode
import sys

# Install/upgrade the runtime dependencies at start-up.
# pip is invoked as "<this interpreter> -m pip" rather than through whatever
# "python", "pip" or "pip3" happens to be first on PATH, so the packages land
# in the environment that is actually running this script.
pip_install = [sys.executable, "-m", "pip", "install"]
for extra_args in (
    ["--upgrade", "pip"],
    ["transformers"],
    ["--upgrade", "ultralytics"],  # notebook equivalent: !pip install ultralytics
):
    command = pip_install + extra_args
    run_subprocess(command)

# Imported only after the install step above has run.
from ultralytics import YOLO
| """# set up paths""" | |
# Alternative root that was used previously:
# project_path = Path('Fu-Chuen/Fungus_Classification_Genus_Species/tree/main')

# Every location below is expressed relative to the project root.
project_path = Path('./')

# Model files.
model_path = project_path / 'Model'
weight_path = model_path / 'best_Yolo_v8n.pt'

# Folder the uploaded images are copied into; created on start-up.
test_path = project_path / 'Test'
os.makedirs(test_path, exist_ok=True)

# Result folder; created on start-up.
Yolov8n_path = project_path / 'Result/Yolov8n'
os.makedirs(Yolov8n_path, exist_ok=True)

detect_path = project_path / 'runs/detect'

# Working directory (previously hard-coded):
# working_directory = '/home/user/app'
working_directory = os.getcwd()
print(f'\nworking directory: {working_directory}\n')
# Recursively collect everything below a directory.
def get_all_files(directory):
    """Walk *directory* and return the paths of all files and sub-directories.

    Both lists are also printed, one path per line, under the headings
    "所有的文件" ("all files") and "所有的目录" ("all directories").

    Args:
        directory: Root of the tree to walk with ``os.walk``.

    Returns:
        A ``(files, directories)`` tuple of two lists of path strings, each
        path being the walked root joined with the entry name.
    """
    file_paths = []
    directory_paths = []
    for current_root, subdir_names, file_names in os.walk(directory):
        file_paths.extend(os.path.join(current_root, name) for name in file_names)
        directory_paths.extend(os.path.join(current_root, name) for name in subdir_names)

    # Print every file path.
    print("\n所有的文件:\n")
    for found in file_paths:
        print(found)

    # Print every directory path.
    print("\n所有的目录:\n")
    for found in directory_paths:
        print(found)

    return file_paths, directory_paths


# List every file under the working directory and its sub-directories:
# get_all_files(working_directory)
| """# load MobileNet model""" | |
# Genus classifier, loaded once at start-up from the model folder.
MobileNet_model = keras.models.load_model(
    model_path / 'MobileNetV3Large_Genus_5th_fold_model'
)

# Genus names; genus_classify_images pairs entry i with output i of the model.
genus_labels = [
    'Aspergillus',
    'Cladosporium',
    'Penicillium',
    'Trichophyton',
    'Others',
]
| """# find the most frequent element""" | |
def most_frequent_element(my_list):
    """Return the most common element(s) of *my_list* and how often they occur.

    Args:
        my_list: Iterable of hashable items.

    Returns:
        A ``(elements, frequency)`` tuple. ``elements`` lists every item that
        reaches the highest count, in order of first appearance, and
        ``frequency`` is that count. An empty input yields ``([], 0)``
        instead of raising.
    """
    from collections import Counter

    element_count = Counter(my_list)
    if not element_count:
        # max() of an empty sequence would raise ValueError.
        return [], 0

    max_frequency = max(element_count.values())
    most_frequent_elements = [
        element
        for element, frequency in element_count.items()
        if frequency == max_frequency
    ]
    return most_frequent_elements, max_frequency
| """# genus classify using MobileNet model""" | |
def genus_classify_images(files, threshold):
    """Classify the genus of each uploaded image and take a majority vote.

    Every image is resized to 224x224, preprocessed for MobileNetV3 and run
    through ``MobileNet_model``. The top-1 genus of each image is one vote.

    Args:
        files: Uploaded file objects whose ``name`` attribute is the path of
            the image on disk. ``None`` is treated as no files.
        threshold: Minimum share of *all* submitted images (failed ones
            included) that must agree on a genus for it to be accepted.

    Returns:
        A ``(predict_result, results)`` tuple: the winning genus, or
        ``"Unclassified"`` when the vote does not reach the threshold or no
        image could be classified; and a text report with one line per image
        (its top-3 predictions or the error that occurred).
    """
    files = list(files or [])
    report_lines = []
    predict = []
    for i, file_path in enumerate(files):
        print(f'\ngenus_classify_image {i+1}: {file_path.name} \n')
        try:
            inp = tf.keras.preprocessing.image.load_img(file_path.name, target_size=(224, 224))
            inp = tf.keras.preprocessing.image.img_to_array(inp)
            inp = inp.reshape((-1, 224, 224, 3))
            inp = tf.keras.applications.mobilenet_v3.preprocess_input(inp)
            prediction = MobileNet_model.predict(inp).flatten()
            confidences = {
                label: float(prediction[idx])
                for idx, label in enumerate(genus_labels)
            }
            top_3 = sorted(confidences.items(), key=lambda item: item[1], reverse=True)[:3]
            predict.append(top_3[0][0])
            summary = ', '.join(f'{label}: {probab:.3f}' for label, probab in top_3)
            report_lines.append(f"Image {i+1}: Top 3 predictions: {summary}\n")
        except Exception as e:
            # One unreadable image must not abort the batch; record the
            # failure in the report and carry on with the next image.
            report_lines.append(f"Image: {file_path.name} - Error: {str(e)}\n")

    if predict:
        ensemble_predict, max_frequency = most_frequent_element(predict)
        # On a tie the genus that was seen first wins.
        agreed = max_frequency / len(files) >= threshold
        predict_result = ensemble_predict[0] if agreed else "Unclassified"
    else:
        # No files, or every image failed: there is nothing to vote on.
        predict_result = "Unclassified"

    results = f'Prediction of genus: {predict_result}\n\n' + ''.join(report_lines)
    return predict_result, results
| """# Aspergillus classify and count statistics""" | |
# Real-time detection, 2013-10-05: OK
# --source     path of the images to run detection on
# --save-txt   save the label files
# --conf 0.25  keep only detections whose confidence score is above 0.25
def delete_files_in_folder(path):
    """Delete every file that ``os.walk`` reports below *path*.

    Directories are left in place; each visited root and each removed file
    name is printed.

    Args:
        path: Root of the tree whose files are removed.
    """
    for current_root, _subdirs, file_names in os.walk(path):
        print(f'root: {current_root}')
        for name in file_names:
            print(f'file: {name}')
            os.remove(os.path.join(current_root, name))
def find_the_most_recent_predict_path(path):
    """Pick the newest ``predict*`` run directory from a list of directories.

    Run directories are numbered ``predict``, ``predict2``, ``predict3`` ...
    so they are ranked by their trailing number (no number counts as 0), not
    alphabetically: a plain string sort would place ``predict9`` after
    ``predict10`` and return a stale run.

    Args:
        path: Iterable of directory paths. Only those whose last component
            contains ``'predict'`` are considered.

    Returns:
        The selected directory path as a string.

    Raises:
        ValueError: If no directory in *path* has ``'predict'`` in its name.
    """
    import re

    def run_order(directory):
        name = os.path.basename(directory)
        trailing_number = re.search(r'(\d+)$', name)
        return (int(trailing_number.group(1)) if trailing_number else 0, name)

    candidates = [
        str(directory)
        for directory in path
        if 'predict' in os.path.basename(str(directory))
    ]
    if not candidates:
        raise ValueError("no directory named 'predict*' was found")
    # A stable sort keeps the original behaviour for equal keys: the entry
    # that comes last in the input wins.
    return sorted(candidates, key=run_order)[-1]
def _count_label_classes(label_dir):
    """Tally the class ids found in each ``.txt`` label file of *label_dir*.

    Returns a dict mapping the file name without ``.txt`` to a ``Counter`` of
    class-id strings (the first field of every non-blank line). Files without
    any line are omitted; a missing directory yields an empty dict.
    """
    from collections import Counter

    per_image = {}
    if not os.path.isdir(label_dir):
        return per_image
    for entry in sorted(os.listdir(label_dir)):
        if not entry.endswith('.txt'):
            continue
        with open(os.path.join(label_dir, entry), 'r') as handle:
            class_ids = [line.split()[0] for line in handle if line.strip()]
        if class_ids:
            per_image[entry[:-4]] = Counter(class_ids)
    return per_image


def _species_shares(per_image, class_names):
    """Return ``(total detections, [[species, share], ...])``, largest share first.

    Shares are rounded to three decimals before ranking; species with equal
    rounded shares stay in alphabetical order.
    """
    from collections import Counter

    by_species = Counter()
    for class_counts in per_image.values():
        for class_id, count in class_counts.items():
            # Unknown class ids are reported under the id itself.
            by_species[class_names.get(class_id, class_id)] += count
    total = sum(by_species.values())
    shares = [[name, round(by_species[name] / total, 3)] for name in sorted(by_species)]
    shares.sort(key=lambda item: item[1], reverse=True)
    return total, shares


def Aspergillus_Detect(threshold):
    """Detect Aspergillus species in the uploaded images and summarise the counts.

    Runs the YOLO model stored at ``weight_path`` on every image in
    ``test_path`` with label saving enabled, reads the label files written by
    that run and reports how the detections are distributed over the species.

    Args:
        threshold: Minimum share of all detections the leading species needs
            in order to be reported as the prediction.

    Returns:
        A multi-line text report: the predicted species (or "Unclassified"),
        the number of images with at least one detection, the number of
        detections, and one line per detected species with its share. When
        nothing was detected the counts are zero and the prediction is
        "Unclassified".
    """
    class_names = {'0': 'flavus-oryzae', '1': 'fumigatus', '2': 'niger', '3': 'terreus', '4': 'versicolor'}

    model = YOLO(weight_path)
    predictions = model.predict(test_path, save=True, save_txt=True)

    # Prefer the run directory reported by the prediction results themselves.
    # NOTE(review): relies on Ultralytics results exposing `save_dir`; when it
    # is absent the working directory is scanned for the newest run instead.
    save_dir = getattr(predictions[0], 'save_dir', None) if predictions else None
    if save_dir:
        image_path = Path(save_dir)
    else:
        _, all_directories = get_all_files(working_directory)
        image_path = Path(find_the_most_recent_predict_path(all_directories))
    label_path = Path(image_path, 'labels')
    print(f'\nmost_recent_predict_path: {image_path}\n')

    files = sorted(os.listdir(label_path)) if os.path.isdir(label_path) else []
    print(f'\nfiles in label_path: {files}\n')

    per_image = _count_label_classes(label_path)
    detect_number, a = _species_shares(per_image, class_names)
    print(f'\na: {a}\n')

    # `a` is empty when no image produced a detection.
    top_species = a[0][0] if a and a[0][1] >= threshold else "Unclassified"
    result = [
        f'Prediction of species of Aspergillus: {top_species}',
        f'There are {len(per_image)} mold images.',
        f'Yolov8n detects {detect_number} instances.',
        f'The top {len(a)} percentage of specifies:',
    ]
    result.extend(f'{name} {share}' for name, share in a)
    return '\n'.join(result)
| """# classify images: genus and Aspergillus""" | |
def classify_images(files, threshold):
    """Classify the genus of the uploaded images, then the species if Aspergillus.

    Args:
        files: Uploaded file objects, passed on to ``genus_classify_images``.
        threshold: Agreement threshold used by both classification stages.

    Returns:
        The genus report; when the predicted genus is ``'Aspergillus'`` the
        species report of ``Aspergillus_Detect`` is appended after a blank
        line.
    """
    print(f'threshold = {threshold}')
    genus, genus_report = genus_classify_images(files, threshold)

    # Only Aspergillus has a second, species-level stage.
    if genus != 'Aspergillus':
        return genus_report

    species_report = Aspergillus_Detect(threshold)
    return f'{genus_report}\n\n{species_report}'
| import shutil | |
def upload_file(files):
    """Replace the contents of the test folder with the uploaded files.

    The test folder is emptied first, then every upload is copied into it
    under its own file name.

    Args:
        files: Uploaded file objects whose ``name`` attribute is the path of
            the temporary file on disk. ``None`` is treated as no files.

    Returns:
        The list of destination paths that were copied successfully. Files
        that could not be copied are reported on stdout and skipped.
    """
    delete_folder(test_path)
    file_paths = []
    for file in files or []:
        # Save the uploaded image to the 'Test' folder
        file_path = Path(test_path, Path(file.name).name)
        print(f'\nupload_file:{file}, {file.name}, {os.path.abspath(file_path)}\n')
        try:
            shutil.copy(file.name, file_path)
        except OSError as error:
            # Copy failures are OSError subclasses; report the reason and
            # keep going instead of hiding every possible exception.
            print(f"\nError occurred while copying file: {error}\n")
        else:
            file_paths.append(file_path)
            print("File copied successfully.")
    return file_paths
# Example target:
# directory_path = test_path
def delete_folder(directory_path):
    """Empty *directory_path* without removing the directory itself.

    Files and symbolic links are unlinked and sub-directories are removed
    recursively. An entry that cannot be deleted is reported on stdout and
    skipped.

    Args:
        directory_path: Directory whose entries are deleted.
    """
    for entry_name in os.listdir(directory_path):
        target = os.path.join(directory_path, entry_name)
        unlinkable = os.path.isfile(target) or os.path.islink(target)
        try:
            if unlinkable:
                os.unlink(target)
            elif os.path.isdir(target):
                shutil.rmtree(target)
        except Exception as error:
            print(f"Failed to delete {target}: {error}")


# delete_folder(test_path)
| """# main of gradio""" | |
def _upload_and_classify(files, threshold_value):
    """Stage the uploaded files in the test folder, then classify them.

    Both steps run in one handler because the species stage reads the test
    folder that the staging step clears and refills; as two independent
    upload listeners they could run at the same time.

    Returns:
        ``(gallery_paths, report)`` for the gallery and the result textbox.
    """
    gallery_paths = upload_file(files)
    report = classify_images(files, threshold_value)
    return gallery_paths, report


# Default threshold: 0.6
with gr.Blocks() as fungus_classification:
    threshold = gr.Slider(minimum=0.1, maximum=1, step=0.1, value=0.6, label="Threshold", info="Choose between 0.1 and 1")
    image_output = gr.Gallery(label='Images of Molds')
    predict_outputs = gr.Textbox(label='Prediction Result')
    upload_button = gr.UploadButton("Click to Upload Files", file_types=["image", "video"], file_count="multiple")
    upload_button.upload(
        _upload_and_classify,
        [upload_button, threshold],
        [image_output, predict_outputs],
    )

fungus_classification.launch(share=True, debug=True)
# fungus_classification.launch(share=True, debug=True, enable_queue=True)