|
|
import json |
|
|
import regex as re |
|
|
import gradio as gr |
|
|
import tiktoken |
|
|
import asyncio |
|
|
|
|
|
from application import * |
|
|
from pdfminer.high_level import extract_text |
|
|
from pdfminer.pdfparser import PDFParser |
|
|
from pdfminer.pdfdocument import PDFDocument |
|
|
|
|
|
# module-level tiktoken encoding ("cl100k_base"); est_cost uses it to count tokens
encoding = tiktoken.get_encoding("cl100k_base")
|
|
|
|
|
''' |
|
|
universal system functions |
|
|
''' |
|
|
|
|
|
def aterminal_print(afunc):
    '''
    decorator that prints a timestamped line before and after an async function runs

    Parameters
    ----------
    afunc : coroutine function
        async function to be wrapped

    Returns
    -------
    coroutine function
        wrapper that awaits ``afunc`` with the same arguments and returns its result unchanged
    '''
    from datetime import datetime
    from functools import wraps

    # wraps() copies __name__/__doc__ onto the wrapper so the decorated
    # function still identifies itself correctly
    @wraps(afunc)
    async def wrapper(*args, **kwargs):
        start = datetime.now()
        print(f"{start.strftime('%y-%m-%d %H:%M:%S')} - executing function: {afunc.__name__}")

        result = await afunc(*args, **kwargs)

        end = datetime.now()
        # end-start is a timedelta, so the runtime is printed in H:MM:SS.ffffff form
        print(f"{end.strftime('%y-%m-%d %H:%M:%S')} - completed function: {afunc.__name__}, runtime: {end-start} seconds")

        return result

    return wrapper
|
|
|
|
|
def terminal_print(func):
    '''
    decorator that prints a timestamped line before and after a function runs

    Parameters
    ----------
    func : callable
        function to be wrapped

    Returns
    -------
    callable
        wrapper that calls ``func`` with the same arguments and returns its result unchanged
    '''
    from datetime import datetime
    from functools import wraps

    # wraps() copies __name__/__doc__ onto the wrapper so the decorated
    # function still identifies itself correctly
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = datetime.now()
        print(f"{start.strftime('%y-%m-%d %H:%M:%S')} - executing function: {func.__name__}")

        result = func(*args, **kwargs)

        end = datetime.now()
        # end-start is a timedelta, so the runtime is printed in H:MM:SS.ffffff form
        print(f"{end.strftime('%y-%m-%d %H:%M:%S')} - completed function: {func.__name__}, runtime: {end-start} seconds")

        return result

    return wrapper
|
|
|
|
|
|
|
|
''' |
|
|
following functions are for file manipulation |
|
|
''' |
|
|
|
|
|
@terminal_print
def read_pdf(file_path):
    '''
    this function reads a pdf file and returns its cleaned text together with its metadata

    Parameters
    ----------
    file_path : str or object with a ``name`` attribute
        path to the pdf file, or an object whose ``name`` attribute holds the path
        (presumably an uploaded file wrapper - confirm against callers)

    Returns
    -------
    text : str
        text extracted from the pdf file, passed through remove_symbols and remove_citation
    meta
        the ``info`` attribute of the parsed ``PDFDocument``
    '''
    path = file_path if isinstance(file_path, str) else file_path.name

    # the context manager closes the handle even when extraction or parsing raises
    with open(path, 'rb') as file_obj:
        text = extract_text(file_obj)
        text = remove_symbols(text)
        text = remove_citation(text)

        # the same handle is reused to parse the document-level metadata
        parser = PDFParser(file_obj)
        doc = PDFDocument(parser)
        meta = doc.info

    return text, meta
|
|
|
|
|
'''
following functions are for formatting the standard response
'''
|
|
|
|
|
def format_response(code,data):
    '''
    build the response dictionary handed back to the client.
    per the original note, this shape is used with the lambda serverless framework.

    Parameters
    ----------
    code : int
        status code, stored under "statusCode"
    data : dict
        payload; it is serialised with json.dumps and stored under "body"

    Returns
    -------
    dict
        response with the keys "statusCode", "headers", "body" and "isBase64Encoded"
    '''
    cors_json_headers = {
        "Access-Control-Allow-Origin": "*",
        "Content-Type": "application/json",
    }
    response = dict(
        statusCode=code,
        headers=cors_json_headers,
        body=json.dumps(data),
        isBase64Encoded=False,
    )
    return response
|
|
|
|
|
''' |
|
|
following functions are for string manipulation |
|
|
''' |
|
|
|
|
|
@terminal_print
def format_text(text,remove_char_ls = ("\\n--\\n","\\n\\n","\n")):
    '''
    this function formats the text output by removing excessive characters

    Parameters
    ----------
    text : str
        text to be processed
    remove_char_ls : iterable of str
        substrings deleted from the text, applied in the given order.
        the first two defaults are written with a literal backslash followed by "n"
        (escaped newlines), the last default is a real newline character.
        the default is a tuple so that no mutable object is shared between calls.

    Returns
    -------
    str
        processed text
    '''
    for fragment in remove_char_ls:
        text = text.replace(fragment,"")

    return text
|
|
|
|
|
@terminal_print
def remove_symbols(text):
    '''
    this function drops every character that cannot be encoded as ascii and
    deletes each hyphen that is directly followed by a line break (together with that line break)

    Parameters
    ----------
    text : str
        text to be processed

    Returns
    -------
    str
        processed text
    '''
    # characters outside ascii are discarded by the "ignore" error handler
    ascii_bytes = text.encode("ascii", errors="ignore")
    ascii_text = ascii_bytes.decode()
    return ascii_text.replace('-\n', '')
|
|
|
|
|
@terminal_print
def remove_citation(text):
    '''
    this function strips every "(cid:<digits>)" marker from the text

    Parameters
    ----------
    text : str
        text to be processed

    Returns
    -------
    str
        text with all markers removed
    '''
    cid_marker = r'\(cid:\d+\)'
    cleaned = re.sub(cid_marker,'',text)
    return cleaned
|
|
|
|
|
@terminal_print
def str_to_tuple(s):
    '''
    this function turns a string such as "(a,b)" into a tuple of strings

    all round brackets are removed and the remainder is split on commas.
    the pieces are kept exactly as they are (no whitespace trimming, no type conversion).

    Parameters
    ----------
    s : str
        string to be converted

    Returns
    -------
    tuple
        tuple of the comma separated pieces
    '''
    without_brackets = s.replace("(","").replace(")","")
    pieces = without_brackets.split(",")
    return tuple(pieces)
|
|
|
|
|
@terminal_print
def replace_symbols(s):
    '''
    this function rewrites a string so that it can be used in a file name

    spaces, hyphens and slashes become underscores; commas, dots, round brackets,
    colons, semicolons and both kinds of quotes are removed.

    Parameters
    ----------
    s : str
        string to be replaced

    Returns
    -------
    str
        replaced string
    '''
    # a single translation pass gives the same result as replacing one symbol at a time,
    # because the only inserted character ("_") is never a replacement target itself
    symbol_table = str.maketrans({
        " ": "_",
        "-": "_",
        "/": "_",
        ",": None,
        ".": None,
        "(": None,
        ")": None,
        ":": None,
        ";": None,
        "'": None,
        '"': None,
    })
    return s.translate(symbol_table)
|
|
|
|
|
''' |
|
|
following functions are for dynamodb data manipulation |
|
|
''' |
|
|
|
|
|
|
|
|
def db_map_to_py_dict(db_map):
    '''
    this function converts a dynamodb map data structure to a python dictionary

    Parameters
    ----------
    db_map : dict
        dynamodb map, i.e. attribute name -> {type tag: value}

    Returns
    -------
    dict
        python dictionary.
        "M" values are converted recursively, "L" values through db_list_to_py_list,
        "N" values become int when they are whole numbers and float otherwise,
        "NULL" becomes None and every other type tag keeps its value as is.
    '''
    def to_number(raw):
        number = float(raw)
        # inf/nan and fractional values stay floats
        if not number.is_integer():
            return number
        try:
            # exact conversion for plain integer strings of any size
            return int(raw)
        except ValueError:
            # whole numbers written like "1.0" or "1e2" are not int literals
            return int(number)

    py_dict = {}
    for name, attribute in db_map.items():
        for type_tag, raw in attribute.items():
            if type_tag == "M":
                py_dict[name] = db_map_to_py_dict(raw)
            elif type_tag == "L":
                py_dict[name] = db_list_to_py_list(raw)
            elif type_tag == "N":
                py_dict[name] = to_number(raw)
            elif type_tag == "NULL":
                py_dict[name] = None
            else:
                # "S", "BS", "BOOL" and any other tag: value is passed through untouched
                py_dict[name] = raw

    return py_dict
|
|
|
|
|
|
|
|
def py_dict_to_db_map(py_dict):
    '''
    this function converts a python dictionary to a dynamodb map data structure

    Parameters
    ----------
    py_dict : dict
        python dictionary; keys are converted with str()

    Returns
    -------
    dict
        dynamodb map, i.e. attribute name -> {type tag: value}.
        list, tuple and set values are all stored as "L" through py_list_to_db_list.
        values of any other unsupported type are left out of the result.
    '''
    db_map = {}
    for key,value in py_dict.items():
        key = str(key)
        # exact type checks on purpose: bool is a subclass of int, so isinstance
        # would file True/False under "N"
        value_type = type(value)
        if value_type is str:
            db_map[key] = {"S":value}
        elif value_type is int or value_type is float:
            db_map[key] = {"N":str(value)}
        elif value_type is dict:
            db_map[key] = {"M":py_dict_to_db_map(value)}
        elif value_type is list or value_type is tuple or value_type is set:
            # tuples are accepted here as well, matching py_list_to_db_list
            db_map[key] = {"L":py_list_to_db_list(value)}
        elif value_type is bytes:
            db_map[key] = {"B":value}
        elif value_type is bool:
            db_map[key] = {"BOOL":value}
        elif value is None:
            db_map[key] = {"NULL":True}
    return db_map
|
|
|
|
|
|
|
|
def db_list_to_py_list(db_list):
    '''
    this function converts a dynamodb list data structure to a python list

    Parameters
    ----------
    db_list : list
        dynamodb list, i.e. a list of {type tag: value} dictionaries

    Returns
    -------
    list
        python list.
        "M" and "L" values are converted recursively, "N" values become float when
        they contain a "." or are not a plain integer literal and int otherwise,
        "B" values become bytes, "BS" values become a list of bytes,
        "NULL" becomes None and "S", "BOOL", "SS", "NS" keep their value as is.
    '''
    def as_bytes(raw):
        # str payloads are utf-8 encoded (the original behaviour);
        # payloads that are already bytes-like are passed through as bytes
        return bytes(raw,"utf-8") if isinstance(raw, str) else bytes(raw)

    py_list = []
    for element in db_list:
        for type_tag, raw in element.items():
            if type_tag == "M":
                py_list.append(db_map_to_py_dict(raw))
            elif type_tag == "L":
                py_list.append(db_list_to_py_list(raw))
            elif type_tag == "N":
                if "." in raw:
                    py_list.append(float(raw))
                else:
                    try:
                        py_list.append(int(raw))
                    except ValueError:
                        # exponent notation such as "1e2" has no "." but is not an int literal
                        py_list.append(float(raw))
            elif type_tag in ("S", "BOOL", "SS", "NS"):
                py_list.append(raw)
            elif type_tag == "B":
                py_list.append(as_bytes(raw))
            elif type_tag == "BS":
                if isinstance(raw, str):
                    # a bare string is still encoded as a whole, as before
                    py_list.append(as_bytes(raw))
                else:
                    py_list.append([as_bytes(member) for member in raw])
            elif type_tag == "NULL":
                py_list.append(None)
            else:
                # unknown type tag: treated as a map, as the original code did
                py_list.append(db_map_to_py_dict(raw))

    return py_list
|
|
|
|
|
|
|
|
def py_list_to_db_list(py_list):
    '''
    this function converts a python list to a dynamodb list data structure

    Parameters
    ----------
    py_list : list
        python list (any iterable of supported values works, e.g. tuple or set)

    Returns
    -------
    list
        dynamodb list, i.e. a list of {type tag: value} dictionaries.
        nested list, tuple and set values are all stored as "L".

    Raises
    ------
    TypeError
        if an element has a type that has no dynamodb representation here.
        previously such an element either crashed with an unbound variable or
        silently repeated the element before it.
    '''
    db_list = []
    for value in py_list:
        # exact type checks on purpose: bool is a subclass of int, so isinstance
        # would file True/False under "N"
        value_type = type(value)
        if value_type is str:
            item = {"S":value}
        elif value_type is int or value_type is float:
            item = {"N":str(value)}
        elif value_type is dict:
            item = {"M":py_dict_to_db_map(value)}
        elif value_type is list or value_type is tuple or value_type is set:
            item = {"L":py_list_to_db_list(value)}
        elif value_type is bytes:
            item = {"B":value}
        elif value_type is bool:
            item = {"BOOL":value}
        elif value is None:
            item = {"NULL":True}
        else:
            raise TypeError(f"unsupported type for dynamodb list element: {value_type.__name__}")

        db_list.append(item)

    return db_list
|
|
|
|
|
def list_dict_to_dict(ls,key):
    '''
    this function indexes a list of dictionaries by the value each one holds under ``key``

    Parameters
    ----------
    ls : list
        list of dictionaries
    key
        key whose value becomes the index; dictionaries without it are skipped

    Returns
    -------
    dict
        value under ``key`` -> the dictionary itself (not a copy).
        when several dictionaries share a value, the last one wins.
    '''
    return {entry[key]: entry for entry in ls if key in entry}
|
|
|
|
|
''' |
|
|
following functions are for markdown table creation |
|
|
''' |
|
|
|
|
|
@terminal_print
def create_md_table(array):
    '''
    render a list of rows as a markdown table.

    the first row is treated as the header: a "---" separator row with the same
    number of columns is written right after it.

    Parameters
    ----------
    array: list
        a table in the form of a list of lists

    Returns
    -------
    md_table: str
        markdown text, one line per row, each ending with a newline
    '''
    lines = []

    for index, row in enumerate(array):
        cells = "".join(f"| {item} " for item in row)
        lines.append(cells + "|\n")

        if index == 0:
            divider = " | ".join(["---"] * len(row))
            lines.append(f"| {divider} |\n")

    return "".join(lines)
|
|
|
|
|
''' |
|
|
following functions are used for business logic. (to be moved to business logic layer) |
|
|
''' |
|
|
|
|
|
@terminal_print
def est_cost(text,rate):
    '''
    this function estimates the cost of processing a text.
    the text is tokenised with the module-level ``encoding`` and charged per 1000 tokens;
    per the original note, this mirrors openai's api pricing structure.

    Parameters
    ----------
    text : str
        text whose tokens are counted
    rate : float
        rate per 1000 tokens

    Returns
    -------
    float
        estimated cost, rounded to 4 decimal places
    '''
    token_ids = encoding.encode(text)
    token_count = len(token_ids)
    return round(rate*token_count/1000,4)
|
|
|
|
|
def create_md_tables(items):
    '''
    create a markdown table for the articles.

    the first column is "ID" and holds the key of each entry. the remaining column
    headers are the keys of the first entry. every entry then contributes one row
    with its values in their own order, so entries are expected to share the same keys.

    Parameters
    ----------
    items: dict
        a dictionary of items, i.e. id -> dictionary of fields.
        an empty dictionary yields a table that only has the "ID" header.

    Returns
    -------
    md_text: str
        markdown text, one line per row, each ending with a newline
    '''
    # default of {} keeps an empty input from raising; it previously failed on the first-key lookup
    sample = next(iter(items.values()), {})
    headers = ["ID", *sample.keys()]

    lines = [
        "".join(f"| {header} " for header in headers) + "|\n",
        "| --- " * len(headers) + "|\n",
    ]

    for key, content in items.items():
        cells = [key, *content.values()]
        lines.append("".join(f"| {cell} " for cell in cells) + "|\n")

    return "".join(lines)