Spaces:
Runtime error
Runtime error
| import json | |
| import regex as re | |
| import tiktoken | |
| import asyncio | |
| from application import * | |
| from pdfminer.high_level import extract_text | |
| from pdfminer.pdfparser import PDFParser | |
| from pdfminer.pdfdocument import PDFDocument | |
| encoding = tiktoken.get_encoding("cl100k_base") | |
| ''' | |
| universal system functions | |
| ''' | |
def aterminal_print(afunc):
    '''
    decorator that prints a timestamped message before and after an async
    function runs, including how long the call took
    Parameters
    ----------
    afunc : coroutine function
        async function to be wrapped
    Returns
    -------
    coroutine function
        wrapper that awaits afunc with the given arguments and returns its result
    '''
    from datetime import datetime
    from functools import wraps

    # wraps copies __name__ / __doc__ onto the wrapper so the decorated
    # function still identifies itself correctly (e.g. when decorators stack)
    @wraps(afunc)
    async def wrapper(*args, **kwargs):
        start = datetime.now()
        print(f"{start.strftime('%y-%m-%d %H:%M:%S')} - executing function: {afunc.__name__}")
        result = await afunc(*args, **kwargs)
        end = datetime.now()
        # total_seconds() makes the printed number match its "seconds" label;
        # a raw timedelta would print as H:MM:SS.ffffff
        print(f"{end.strftime('%y-%m-%d %H:%M:%S')} - completed function: {afunc.__name__}, runtime: {(end-start).total_seconds()} seconds")
        return result
    return wrapper
def terminal_print(func):
    '''
    decorator that prints a timestamped message before and after a function
    runs, including how long the call took
    Parameters
    ----------
    func : callable
        function to be wrapped
    Returns
    -------
    callable
        wrapper that calls func with the given arguments and returns its result
    '''
    from datetime import datetime
    from functools import wraps

    # wraps copies __name__ / __doc__ onto the wrapper so the decorated
    # function still identifies itself correctly (e.g. when decorators stack)
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = datetime.now()
        print(f"{start.strftime('%y-%m-%d %H:%M:%S')} - executing function: {func.__name__}")
        result = func(*args, **kwargs)
        end = datetime.now()
        # total_seconds() makes the printed number match its "seconds" label;
        # a raw timedelta would print as H:MM:SS.ffffff
        print(f"{end.strftime('%y-%m-%d %H:%M:%S')} - completed function: {func.__name__}, runtime: {(end-start).total_seconds()} seconds")
        return result
    return wrapper
| ''' | |
| following functions are for file manipulation | |
| ''' | |
def read_pdf(file_path):
    '''
    this function reads a pdf file and returns its cleaned text and metadata
    Parameters
    ----------
    file_path : str or object with a ``name`` attribute
        path to the pdf file, or an object whose ``name`` attribute holds the
        path (presumably a temporary file wrapper - confirm against callers)
    Returns
    -------
    text : str
        text extracted from the pdf file, after remove_citation and
        remove_symbols have been applied
    meta
        the ``info`` attribute of the parsed PDFDocument
    '''
    path = file_path if isinstance(file_path, str) else file_path.name
    # the context manager closes the file even when extraction or parsing raises
    with open(path, 'rb') as file_obj:
        text = extract_text(file_obj)
        # strip "(cid:N)" markers before remove_symbols: remove_symbols turns
        # their punctuation into spaces, after which the pattern cannot match
        text = remove_citation(text)
        text = remove_symbols(text)
        parser = PDFParser(file_obj)
        doc = PDFDocument(parser)
        meta = doc.info
    return text, meta
| ''' | |
| following functions are for formatting standard responses | |
| ''' | |
def format_response(code,data):
    '''
    build the response dictionary returned to the client.
    the layout is the one used by the lambda serverless framework.
    Parameters
    ----------
    code : int
        status code, stored under "statusCode"
    data : dict
        payload, serialised with json.dumps and stored under "body"
    Returns
    -------
    dict
        response with the keys statusCode, headers, body and isBase64Encoded
    '''
    # fixed headers: allow any origin and declare a json payload
    cors_json_headers = {
        "Access-Control-Allow-Origin": "*",
        "Content-Type": "application/json",
    }
    response = dict(statusCode=code, headers=cors_json_headers)
    response["body"] = json.dumps(data)
    response["isBase64Encoded"] = False
    return response
| ''' | |
| following functions are for string manipulation | |
| ''' | |
def format_text(text,remove_char_ls = ("\\n--\\n","\\n\\n","\n")):
    '''
    this function formats the text output by removing excessive characters
    Parameters
    ----------
    text : str
        text to be processed
    remove_char_ls : iterable of str
        substrings to delete from the text, applied in order.
        the default is an immutable tuple so it cannot be modified between calls
    Returns
    -------
    str
        processed text
    '''
    for c in remove_char_ls:
        text = text.replace(c,"")
    return text
def remove_symbols(text):
    '''
    this function joins words that were hyphenated across a line break and
    replaces every run of characters other than ascii letters, digits,
    newline and carriage return with a single space
    Parameters
    ----------
    text : str
        text to be processed
    Returns
    -------
    str
        processed text
    '''
    import re
    # undo end-of-line hyphenation first: once the substitution below has run,
    # every "-" has become a space and the hyphen-newline pair no longer exists
    text = text.replace('-\n', '')
    text = re.sub(r"[^a-zA-Z0-9\n\r]+", ' ', text)
    return text
def remove_citation(text):
    '''
    delete every "(cid:<digits>)" marker from the text
    Parameters
    ----------
    text : str
        text to be processed
    Returns
    -------
    str
        text with all such markers removed
    '''
    cid_marker = r'\(cid:\d+\)'
    cleaned = re.sub(cid_marker, '', text)
    return cleaned
def str_to_tuple(s):
    '''
    split a string such as "(a,b)" into a tuple of its comma separated parts
    Parameters
    ----------
    s : str
        string to be converted; every "(" and ")" is discarded
    Returns
    -------
    tuple
        the comma separated pieces as strings (whitespace is kept as is)
    '''
    # drop both kinds of parenthesis in one pass, then split on commas
    without_parens = s.translate(str.maketrans("", "", "()"))
    pieces = without_parens.split(",")
    return tuple(pieces)
def replace_symbols(s):
    '''
    make a string usable as a file name by replacing or dropping symbols
    Parameters
    ----------
    s : str
        string to be replaced
    Returns
    -------
    str
        string where space, "-" and "/" became "_" and the characters
        , . ( ) : ; ' " were removed
    '''
    to_underscore = " -/"
    to_delete = ",.():;'\""
    # one translation table covers both groups; no replacement output is
    # itself a mapped symbol, so a single pass equals successive replaces
    table = str.maketrans(to_underscore, "_" * len(to_underscore), to_delete)
    return s.translate(table)
| ''' | |
| following functions are for dynamodb data manipulation | |
| ''' | |
| # @terminal_print | |
def db_map_to_py_dict(db_map):
    '''
    this function converts a dynamodb map data structure to a python dictionary
    Parameters
    ----------
    db_map : dict
        dynamodb map: attribute name -> {type tag: value}
    Returns
    -------
    dict
        python dictionary. "M" values are converted recursively, "L" values
        through db_list_to_py_list, "N" values become int when the string is
        an integer literal and float otherwise, "NULL" becomes None and every
        other tag keeps its value unchanged
    '''
    py_dict = {}
    for name, typed_value in db_map.items():
        for type_tag, raw in typed_value.items():
            if type_tag == "M":
                py_dict[name] = db_map_to_py_dict(raw)
            elif type_tag == "L":
                py_dict[name] = db_list_to_py_list(raw)
            elif type_tag == "N":
                # int() rejects strings such as "1.0" or "1e2"; fall back to
                # float for those instead of raising ValueError
                try:
                    py_dict[name] = int(raw)
                except ValueError:
                    py_dict[name] = float(raw)
            elif type_tag == "NULL":
                py_dict[name] = None
            else:
                # S, BS, BOOL and any other tag are passed through as is
                py_dict[name] = raw
    return py_dict
| # @terminal_print | |
def py_dict_to_db_map(py_dict):
    '''
    this function converts a python dictionary to a dynamodb map data structure
    Parameters
    ----------
    py_dict : dict
        python dictionary; keys are converted with str()
    Returns
    -------
    dict
        dynamodb map: attribute name -> {type tag: value}.
        values whose type is not handled below are left out of the result
    '''
    db_map = {}
    for key, value in py_dict.items():
        key = str(key)
        # exact type checks on purpose: bool is a subclass of int, so
        # isinstance would tag True/False as numbers
        value_type = type(value)
        if value_type is str:
            db_map[key] = {"S": value}
        elif value_type is int or value_type is float:
            db_map[key] = {"N": str(value)}
        elif value_type is dict:
            db_map[key] = {"M": py_dict_to_db_map(value)}
        elif value_type is list or value_type is tuple or value_type is set:
            # tuples are accepted here as they are inside py_list_to_db_list;
            # sets are written as lists in their iteration order
            db_map[key] = {"L": py_list_to_db_list(value)}
        elif value_type is bytes:
            db_map[key] = {"B": value}
        elif value_type is bool:
            db_map[key] = {"BOOL": value}
        elif value is None:
            db_map[key] = {"NULL": True}
    return db_map
| # @terminal_print | |
def db_list_to_py_list(db_list):
    '''
    this function converts a dynamodb list data structure to a python list
    Parameters
    ----------
    db_list : list
        dynamodb list: each element is a {type tag: value} dictionary
    Returns
    -------
    list
        python list. "M" values go through db_map_to_py_dict, "L" values are
        converted recursively, "N" values become int when the string is an
        integer literal and float otherwise, "NULL" becomes None, "B"/"BS"
        string values are utf-8 encoded and S, BOOL, SS, NS keep their value
    '''
    py_list = []
    for typed_value in db_list:
        for type_tag, raw in typed_value.items():
            if type_tag == "M":
                py_list.append(db_map_to_py_dict(raw))
            elif type_tag == "L":
                py_list.append(db_list_to_py_list(raw))
            elif type_tag == "N":
                # int() rejects "1.5" as well as exponent forms like "1e2";
                # fall back to float for those instead of raising ValueError
                try:
                    py_list.append(int(raw))
                except ValueError:
                    py_list.append(float(raw))
            elif type_tag in ("S", "BOOL", "SS", "NS"):
                py_list.append(raw)
            elif type_tag in ("B", "BS"):
                # only text needs encoding; values that are already bytes (or
                # a collection of them) are kept, where bytes(v, "utf-8") raised
                py_list.append(raw.encode("utf-8") if isinstance(raw, str) else raw)
            elif type_tag == "NULL":
                py_list.append(None)
            else:
                # NOTE(review): unknown tags are treated as maps, unlike
                # db_map_to_py_dict which passes them through - confirm intent
                py_list.append(db_map_to_py_dict(raw))
    return py_list
| # @terminal_print | |
def py_list_to_db_list(py_list):
    '''
    this function converts a python list to a dynamodb list data structure
    Parameters
    ----------
    py_list : list, tuple or set
        python values to convert; nested lists, tuples and sets are converted
        recursively (sets in their iteration order)
    Returns
    -------
    list
        dynamodb list: one {type tag: value} dictionary per element
    Raises
    ------
    TypeError
        if an element has a type that cannot be converted
    '''
    db_list = []
    for value in py_list:
        # exact type checks on purpose: bool is a subclass of int, so
        # isinstance would tag True/False as numbers
        value_type = type(value)
        if value_type is str:
            item = {"S": value}
        elif value_type is int or value_type is float:
            item = {"N": str(value)}
        elif value_type is dict:
            item = {"M": py_dict_to_db_map(value)}
        elif value_type is list or value_type is tuple or value_type is set:
            item = {"L": py_list_to_db_list(value)}
        elif value_type is bytes:
            item = {"B": value}
        elif value_type is bool:
            item = {"BOOL": value}
        elif value is None:
            item = {"NULL": True}
        else:
            # without this branch `item` would be unbound for a first element
            # or silently repeat the previous element's item
            raise TypeError(f"cannot convert value of type {value_type.__name__} to a dynamodb attribute")
        db_list.append(item)
    return db_list
def list_dict_to_dict(ls,key):
    '''
    index a list of dictionaries by the value each one holds under key
    Parameters
    ----------
    ls : list
        list of dictionaries
    key : hashable
        key that must be present in every dictionary
    Returns
    -------
    dict
        value under key -> dictionary; an empty dict (after printing a
        message) when at least one dictionary lacks the key
    '''
    has_key = [key in entry for entry in ls]
    if not all(has_key):
        print("key not found in all dictionaries")
        return {}
    indexed = {}
    for entry in ls:
        indexed[entry[key]] = entry
    return indexed
| ''' | |
| following functions are for markdown table creation | |
| ''' | |
def create_md_table(array):
    '''
    render an array as a markdown table.
    the first row is followed by a "---" separator row of the same width.
    Parameters
    ----------
    array: list
        a table in the form of a list of lists
    Returns
    -------
    md_table: str
        one line per row, each terminated by a newline
    '''
    lines = []
    for row_index, row in enumerate(array):
        cells = "".join(f"| {cell} " for cell in row)
        lines.append(cells + "|\n")
        if row_index == 0:
            # separator under the first row, one "---" per column
            dashes = " | ".join(["---"] * len(row))
            lines.append(f"| {dashes} |\n")
    return "".join(lines)
| ''' | |
| following functions are used for business logic. (to be moved to business logic layer) | |
| ''' | |
def est_cost(text,rate):
    '''
    estimate the cost of processing a text from its token count.
    the text is tokenised with the module level encoding and charged at
    rate per 1000 tokens.
    Parameters
    ----------
    text : str
        text whose tokens are counted
    rate : float
        rate per 1000 tokens
    Returns
    -------
    float
        estimated cost, rounded to 4 decimal places'''
    token_count = len(encoding.encode(text))
    cost = rate * token_count / 1000
    return round(cost, 4)