# NOTE: "Spaces: Sleeping / Sleeping" below was hosting-platform status text
# captured together with this file during export; it is not part of the program.
| import csv | |
| import json | |
| import os | |
| import datetime | |
| from processor.processor import Processor | |
| from constants.constants import * | |
| from search.search_by_id import Searcher | |
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| import uvicorn | |
| from pydantic import BaseModel | |
| import pandas as pd | |
| from tmp.utils import update_products_csv | |
| from search.matching_judge import compare_matching_with_manual | |
# Developer-only example invocation of the matching-vs-manual comparison tool,
# deliberately disabled by wrapping it in a bare string literal (a no-op statement).
# NOTE(review): the third path contains a single backslash before "m1-..." —
# verify the path before ever re-enabling this call.
'''compare_matching_with_manual("C:\\Projects (Mediterra)\\!TechLead\\WineMatching\\Data (New5)\\products.csv",
"C:\\Projects (Mediterra)\\!TechLead\\WineMatching\\Data (New4)\\ws-items-for-test.csv",
"C:\\Projects (Mediterra)\\!TechLead\\WineMatching\m1-50-250325-133739.csv",
"C:\\Projects (Mediterra)\\!TechLead\\WineMatching\\Data (New4)\\matching-20250318.csv")'''
# Module-level matching engine, configured with the vocabulary/merge constant
# tables star-imported from constants.constants; used by match().
processor = Processor(LONG_TYPES_LIST,
                      SHORT_TYPES_LIST,
                      SOUR,
                      WINE_TYPES,
                      GBS,
                      COLORS_FOR_TRIM,
                      GRAPES,
                      OTHER_WORDS,
                      SOUR_MERGE_DICT,
                      TYPES_WINES_DICT,
                      COLOR_MERGE_DICT)
# Module-level id-lookup helper; used by get_matched_by_id().
searcher = Searcher()
class item_by_id(BaseModel):
    """Request body for looking up a single item in a saved result CSV."""
    # Name of a result CSV file located in the results directory.
    result_file: str
    # Item identifier as a string; parsed with int() by get_matched_by_id.
    id: str
class match_request(BaseModel):
    """Request body for starting a matching run."""
    # Name of the items CSV file to match against products.
    items: str
    # Matching score threshold; a falsy value is replaced with 50 in match().
    threshold: int
    # Forwarded to Processor.process — semantics defined there; TODO confirm.
    items_first: int
def get_data_dir():
    """Return the root data directory for products, items and results.

    The deployment path is the default; it can be overridden with the
    WINEMATCH_DATA_DIR environment variable (e.g. "_data" for local runs,
    which the previous commented-out code hard-coded).
    """
    return os.environ.get("WINEMATCH_DATA_DIR", "/home/user/app/_data/")
def get_products_dir():
    """Directory that holds the master products.csv file."""
    products_dir = os.path.join(get_data_dir(), "products")
    return products_dir
def get_items_dir():
    """Directory where uploaded items CSV files are stored."""
    items_dir = os.path.join(get_data_dir(), "items")
    return items_dir
def get_results_dir():
    """Directory where matching result CSV files are written."""
    results_dir = os.path.join(get_data_dir(), "results")
    return results_dir
| app = FastAPI() | |
async def get_result_csv():
    """List result CSV filenames.

    Returns a JSON-encoded string (not a Python list) to preserve the
    original response shape.
    """
    results = [name for name in os.listdir(get_results_dir())
               if name.endswith(".csv")]
    return json.dumps(results)
async def upload_result_csv(file: UploadFile = File(...)):
    """Save an uploaded result CSV into the results directory.

    Raises HTTPException(500) on any failure; the upload stream is always
    closed.
    """
    try:
        contents = file.file.read()
        with open(os.path.join(get_results_dir(), file.filename), 'wb') as f:
            f.write(contents)
    except Exception as exc:
        # Chain the original cause so the real error is visible in tracebacks
        # instead of being silently replaced by the generic 500.
        raise HTTPException(status_code=500, detail='Something went wrong') from exc
    finally:
        file.file.close()
    return {"message": f"Successfully uploaded {file.filename}"}
async def upload_products_csv(file: UploadFile, overwrite_existing: int):
    """Merge an uploaded products CSV into the master products.csv.

    The upload is first written to a temporary file, then merged via
    update_products_csv(temp, master, overwrite_existing).

    Fixes vs. original: the temporary file is removed even when the merge
    fails (it previously leaked on any exception), and the original
    exception is chained into the HTTPException.
    """
    datadir = get_products_dir()
    temp_path = os.path.join(datadir, "products.csv_upload")
    try:
        os.makedirs(datadir, exist_ok=True)
        contents = file.file.read()
        with open(temp_path, 'wb') as f:
            f.write(contents)
        fullfn = os.path.join(datadir, "products.csv")
        update_products_csv(temp_path, fullfn, overwrite_existing)
    except Exception as exc:
        raise HTTPException(status_code=500, detail='Something went wrong') from exc
    finally:
        file.file.close()
        # Never leave the temporary upload behind, even on failure.
        if os.path.exists(temp_path):
            os.remove(temp_path)
    return {"message": f"Successfully uploaded {file.filename}"}
#@app.post("/api/upload_items_csv")
def upload_items_csv(file: UploadFile):
    """Persist an uploaded items CSV and return its full path on disk.

    Called internally by match(); raises HTTPException(500) on failure.
    """
    try:
        itemsdir = get_items_dir()
        os.makedirs(itemsdir, exist_ok=True)
        contents = file.file.read()
        fullfn = os.path.join(itemsdir, file.filename)
        with open(fullfn, 'wb') as f:
            f.write(contents)
    except Exception as exc:
        # Chain the cause so the underlying I/O error is not lost.
        raise HTTPException(status_code=500, detail='Something went wrong') from exc
    finally:
        file.file.close()
    return fullfn
async def get_items_csv():
    """List uploaded items CSV filenames.

    Returns a JSON-encoded string (not a Python list) to preserve the
    original response shape.
    """
    names = [name for name in os.listdir(get_items_dir())
             if name.endswith(".csv")]
    return json.dumps(names)
async def match(items_file: UploadFile, threshold: int, items_first: int):
    """Match uploaded items against the master products list.

    Stores the uploaded items file, runs processor.process, writes the
    result to a timestamped CSV in the results directory, and returns
    {"Status": "Success", "result_file": <name>} or an error dict.

    Fix vs. original: the uploaded temp items file is now removed even when
    pd.read_csv fails (it previously leaked on a parse error).
    """
    prods_file = os.path.join(get_products_dir(), "products.csv")
    if not os.path.isfile(prods_file):
        return {"Status": "Error", "ErrorDesc": "File 'Products.csv' not found"}
    items_fn = upload_items_csv(items_file)
    if not threshold:
        # Falsy (0/None) threshold falls back to the default score of 50.
        threshold = 50
    try:
        row_items = pd.read_csv(items_fn, sep='\t')
    finally:
        os.remove(items_fn)
    row_products = pd.read_csv(prods_file, sep='\t', on_bad_lines='skip')
    df, items, products = processor.process(row_products, row_items, items_first, threshold)
    results_dir = get_results_dir()
    os.makedirs(results_dir, exist_ok=True)
    # Result name encodes the threshold and a timestamp, e.g. m1-50-250325-133739.csv
    output_csv = f"m1-{threshold}-{datetime.datetime.now().strftime('%y%m%d-%H%M%S')}.csv"
    df.to_csv(os.path.join(results_dir, output_csv), sep='\t', index=False)
    return {"Status": "Success", "result_file": output_csv}
async def get_matched_by_id(item: item_by_id):
    """Look up one item id in a previously produced result CSV.

    Returns a status dict; "Data" carries the matching rows as a JSON
    records string, or "" when nothing was found.
    """
    result_path = os.path.join(get_results_dir(), item.result_file)
    if not os.path.isfile(result_path):
        return {"Status": "Error", "ErrorDesc": "Specified result CSV file not found"}
    df, is_alternative = searcher.search(result_path, int(item.id))
    if df.empty:
        return {"Status": "Success", "IsAlternative": False, "Data": ""}
    return {"Status": "Success", "IsAlternative": is_alternative, "Data": df.to_json(orient='records')}
if __name__ == "__main__":
    # Development entry point: serve on all interfaces with verbose logging.
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")