| from fastapi import FastAPI,UploadFile,File,Form |
| from typing import List,Dict |
| import difflib |
| import librosa |
| from collections import defaultdict |
| from pykakasi import kakasi |
| from dotenv import load_dotenv |
|
|
| import openai |
|
|
| from fastapi import FastAPI,Request |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.templating import Jinja2Templates |
| from fastapi.responses import HTMLResponse,JSONResponse |
|
|
| |
| from fastapi import Depends, HTTPException |
| from starlette.middleware.sessions import SessionMiddleware |
|
|
| import numpy as np |
| import json |
| import pickle |
| import csv |
| import os |
| import ast |
| import shutil |
|
|
| import torch |
| from transformers import Wav2Vec2ForPreTraining,Wav2Vec2Processor |
| from transformers import BertModel,BertJapaneseTokenizer,BertTokenizer |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
| |
| app = FastAPI() |
| app.mount("/static", StaticFiles(directory="static"), name="static") |
| templates = Jinja2Templates(directory="templates") |
|
|
| app.add_middleware(SessionMiddleware, secret_key="your_secret_key") |
|
|
| |
| nodes=dict() |
| p2c=defaultdict(list) |
| with open('data/all_BirdDBnode.tsv', mode='r', newline='', encoding='utf-8') as f: |
| for row in csv.DictReader(f, delimiter = '\t'): |
| nodes[row["id"]] = row |
| p2c[row["parent_taxon"]].append(row["id"]) |
| print("knowledge data loading is complete !") |
|
|
|
|
| |
| w2v2 = Wav2Vec2ForPreTraining.from_pretrained("model/wav2vec2-bird-jp-all") |
| print("sound model loading is complete !") |
|
|
|
|
| |
| with open('data/sound_vecs.json') as f: |
| sound_vecs = json.load(f) |
| print("sound vec data loading is complete !") |
|
|
|
|
| |
| |
| en_model = BertModel.from_pretrained('model/en_model') |
| en_tokenizer = BertTokenizer.from_pretrained('model/en_tokenizer') |
| ja_model = BertModel.from_pretrained('model/ja_model') |
| ja_tokenizer = BertJapaneseTokenizer.from_pretrained('model/ja_tokenizer') |
| print("language model loading is complete !") |
|
|
| |
| |
| |
| |
| |
|
|
| |
| with open('data/en_name_vecs.bin','rb') as bf: |
| en_name_vecs = pickle.load(bf) |
| print("en_name_vecs data loading is complete !") |
|
|
| with open('data/ja_name_vecs.bin','rb') as bf: |
| ja_name_vecs = pickle.load(bf) |
| print("ja_name_vecs data loading is complete !") |
|
|
| with open('data/en_aliases_vecs_all.bin','rb') as bf: |
| en_aliases_vecs = pickle.load(bf) |
| print("en_aliases_vecs_all data loading is complete !") |
|
|
| with open('data/ja_aliases_vecs.bin','rb') as bf: |
| ja_aliases_vecs = pickle.load(bf) |
| print("ja_aliases_vecs data loading is complete !") |
| print("language vec data loading is complete !") |
|
|
| |
| def raito(query,word): |
| raito = difflib.SequenceMatcher(None, query, word).ratio() |
| return raito |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
|
|
|
|
| |
| def raito_b(q_vec, w_vec): |
| |
| similarity = cosine_similarity(q_vec.unsqueeze(0).numpy(), w_vec.unsqueeze(0).numpy()) |
| return similarity[0][0] |
|
|
|
|
| def raito_bert(q_vec, word, en, aliases): |
| |
| if en == True: |
| tokenizer = en_tokenizer |
| model = en_model |
| |
| if aliases == False:w_vec = en_name_vecs[word] |
| else:w_vec = en_aliases_vecs[word] |
| |
| else: |
| tokenizer = ja_tokenizer |
| model = ja_model |
| |
| if aliases == False:w_vec = ja_name_vecs[word] |
| else:w_vec = ja_aliases_vecs[word] |
|
|
|
|
| similarity = cosine_similarity(q_vec.unsqueeze(0).numpy(), w_vec.unsqueeze(0).numpy()) |
| return similarity[0][0] |
|
|
|
|
| |
|
|
| def small_d(d): |
| if d != None: |
| small_d = {"id":d["id"], |
| "en_name":d["en_name"], |
| "ja_name":d["ja_name"], |
| "en_aliases":d["en_aliases"], |
| "ja_aliases":d["ja_aliases"], |
| "img_urls":d["img_urls"], |
| "taxon_name":d["taxon_name"], |
| "BR_id":d["BirdResearchDB_label01_32k_audio_id"], |
| "JP_id":d["BirdJPBookDB__data_audio_id"] |
| } |
| else: |
| small_d == None |
| return small_d |
|
|
|
|
| def id2ans(myself_id): |
| ans = {"myself":dict(),"my_parent":None,"my_children":None} |
| myself_d = nodes[myself_id] |
| parent_id = nodes[myself_id]["parent_taxon"] |
| parent_d = nodes[parent_id] |
|
|
| ans["myself"] = small_d(myself_d) |
| |
| if parent_id in nodes: |
| ans["my_parent"] = small_d(parent_d) |
| if myself_id in p2c: |
| ans["my_children"] = [small_d(nodes[chile_id]) for chile_id in p2c[myself_id]] |
| return ans |
|
|
|
|
| |
| def cos_sim(v1, v2): |
| return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)) |
|
|
|
|
| |
| def to_katakana(text): |
| |
| kakasi_instance = kakasi() |
| kakasi_instance.setMode("J", "K") |
| kakasi_instance.setMode("H", "K") |
| |
| |
| conv = kakasi_instance.getConverter() |
| katakana_text = conv.do(text) |
| return katakana_text |
|
|
|
|
| |
| @app.middleware("http") |
| async def some_middleware(request: Request, call_next): |
| response = await call_next(request) |
| session = request.cookies.get('session') |
| if session: |
| response.set_cookie(key='session', value=request.cookies.get('session'), httponly=True) |
| return response |
|
|
| |
|
|
| |
| |
| |
| |
|
|
| |
| def ask_gpt3(question,api_key,max_tokens=2600): |
| |
| |
| bird_prompt = "このデータを基にこの鳥について解説して:" |
|
|
| load_dotenv() |
| openai.api_key = api_key |
| debug_mode = True |
| json_string = json.dumps(question) |
|
|
| response = openai.Completion.create( |
| engine="text-davinci-003", |
| |
| prompt=bird_prompt+json_string, |
| max_tokens=max_tokens, |
| stop=None, |
| temperature=0.7, |
| ) |
| return response.choices[0].text.strip() |
|
|
| |
| def aliases_str(d_aliases): |
| if d_aliases == "{}": |
| return "" |
| else: |
| |
| d_aliases = ast.literal_eval(d_aliases) |
| aliases = "" |
| for k,v in d_aliases.items(): |
| aliases = aliases+v+"/" |
|
|
| aliases = aliases[:-1] |
| if aliases != "": |
| aliases = "("+aliases+")" |
| |
| return aliases |
|
|
| def imgs_list(d_img_urls): |
| img_urls = [] |
| if d_img_urls == "{}": |
| pass |
| else: |
| d_img_urls = json.loads(d_img_urls.replace("'",'"')) |
| for k,v in d_img_urls.items(): |
| img_urls.append(v) |
| return img_urls |
|
|
| def d4html(myself_d,self_or_parent,n): |
| print(self_or_parent+"_taxon_name"+n) |
| new_d ={ |
| self_or_parent+"_taxon_name"+n:myself_d["taxon_name"], |
| self_or_parent+"_ja_name"+n:myself_d["ja_name"], |
| self_or_parent+"_ja_aliases"+n:aliases_str(myself_d["ja_aliases"]), |
| self_or_parent+"_en_name"+n:myself_d["en_name"], |
| self_or_parent+"_en_aliases"+n:aliases_str(myself_d["en_aliases"]), |
| self_or_parent+"_link"+n:"https://www.wikidata.org/wiki/"+myself_d["id"], |
| self_or_parent+"_imgs_list"+n:imgs_list(myself_d["img_urls"]) |
| } |
| return new_d |
|
|
| def self_d4html(myself_d,n): |
| new_d ={"self_taxon_name"+n:myself_d["taxon_name"], |
| "self_ja_name"+n:myself_d["ja_name"], |
| "self_ja_aliases"+n:aliases_str(myself_d["ja_aliases"]), |
| "self_en_name"+n:myself_d["en_name"], |
| "self_en_aliases"+n:aliases_str(myself_d["en_aliases"]), |
| "self_link"+n:"https://www.wikidata.org/wiki/"+myself_d["id"], |
| "self_imgs_list"+n:imgs_list(myself_d["img_urls"]) |
| } |
| print(new_d) |
| return new_d |
|
|
| def parent_d4html(parent_d,n): |
| new_d ={"parent_taxon_name"+n:parent_d["taxon_name"], |
| "parent_ja_name"+n:parent_d["ja_name"], |
| "parent_ja_aliases"+n:aliases_str(parent_d["ja_aliases"]), |
| "parent_en_name"+n:parent_d["en_name"], |
| "parent_en_aliases"+n:aliases_str(parent_d["en_aliases"]), |
| "parent_link"+n:"https://www.wikidata.org/wiki/"+parent_d["id"], |
| "parent_imgs_list"+n:imgs_list(parent_d["img_urls"]) |
| } |
| return new_d |
|
|
| |
| @app.get("/word_search", response_class=HTMLResponse) |
| async def read_root(request:Request, api_key:str=""): |
| api_key = request.session.get("api_key", "please_input_your_api_key") |
| return templates.TemplateResponse("word_search.html", {"request": request,"api_key":api_key}) |
|
|
|
|
| @app.post("/word_search", response_class=HTMLResponse) |
| async def search_adjacent_nodes(request:Request, api_key: str = Form(...), query: str = Form(...)): |
|
|
| form_data = await request.form() |
| query = form_data["query"] |
| query = to_katakana(query) |
| max_cos_in_wikidata = 0.0 |
| max_id_in_wikidata = None |
|
|
| |
| en_tokens = en_tokenizer(query, return_tensors="pt", padding=True, truncation=True) |
| ja_tokens = ja_tokenizer(query, return_tensors="pt", padding=True, truncation=True) |
| |
| with torch.no_grad(): |
| en_model.eval() |
| en_output = en_model(**en_tokens) |
|
|
| ja_model.eval() |
| ja_output = ja_model(**ja_tokens) |
|
|
| en_q_vec = en_output.last_hidden_state[0][0] |
| ja_q_vec = ja_output.last_hidden_state[0][0] |
|
|
| |
| for node_id,node in nodes.items(): |
| |
| |
| r_in_node = set() |
|
|
| en_name_vec = en_name_vecs[node["en_name"]] |
| ja_name_vec = ja_name_vecs[node["ja_name"]] |
|
|
| r_in_node.add(raito_b(en_q_vec,en_name_vec)) |
| r_in_node.add(raito_b(ja_q_vec,ja_name_vec)) |
| |
| if isinstance(node["en_aliases"], dict): |
| for k,v in node["en_aliases"].items(): |
| en_aliases_vec = en_aliases_vecs[v] |
| r_in_node.add(raito_b(en_q_vec,en_aliases_vec,en=True,aliases=True)) |
| if isinstance(node["ja_aliases"], dict): |
| for k,v in node["ja_aliases"].items(): |
| ja_aliases_vec = ja_aliases_vecs[v] |
| r_in_node.add(raito_b(ja_q_vec,ja_aliases_vec,en=False,aliases=True)) |
|
|
|
|
| if max(r_in_node) != 0.0: |
| if max(r_in_node) > max_cos_in_wikidata: |
| max_cos_in_wikidata = max(r_in_node) |
| max_id_in_wikidata = node_id |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
| |
| |
|
|
| if max_id_in_wikidata!=None: |
| ans_json = id2ans(max_id_in_wikidata) |
|
|
| |
| |
| |
| |
| |
| gpt_ans_self = ask_gpt3(ans_json["myself"],api_key) |
| gpt_ans_parent = ask_gpt3(ans_json["my_parent"],api_key) |
| gpt_ans_children = ask_gpt3(ans_json["my_children"],api_key) |
| print(max_cos_in_wikidata) |
|
|
|
|
| return templates.TemplateResponse("word_search.html", |
| {**{"request": request, |
| "api_key":api_key, |
| "max_cos_in_wikidata":round(max_cos_in_wikidata,4), |
| "gpt_ans_self": gpt_ans_self, |
| "gpt_ans_parent": gpt_ans_parent, |
| "gpt_ans_children": gpt_ans_children}, |
| **d4html(ans_json["myself"],"self",""), |
| **d4html(ans_json["my_parent"],"parent","") |
| }) |
| else: |
| return None |
|
|
|
|
| |
| @app.get("/sound_search", response_class=HTMLResponse) |
| async def read_root(request:Request, api_key:str=""): |
| api_key = request.session.get("api_key", "please_input_your_api_key") |
| return templates.TemplateResponse("sound_search.html", {"request": request,"api_key":api_key}) |
|
|
| @app.post("/sound_search",response_class=HTMLResponse) |
| async def sound_search(request: Request, api_key: str = Form(...), file: UploadFile = File(...)): |
| |
|
|
| uploaded_dir = "uploaded" |
| shutil.rmtree(uploaded_dir) |
| os.mkdir(uploaded_dir) |
|
|
| with open(uploaded_dir+"/"+file.filename, "wb") as f: |
| f.write(await file.read()) |
|
|
| sound_data,_ = librosa.load("uploaded/"+file.filename, sr=16000) |
| result = w2v2(torch.tensor([sound_data])) |
| hidden_vecs = result.projected_states |
| input_vecs = np.mean(hidden_vecs[0].cpu().detach().numpy(), axis=0) |
|
|
| max_cos_sim = 0.0 |
| max_in_sounddata = None |
|
|
| id_cos_d = dict() |
|
|
| for d in sound_vecs: |
| cos = cos_sim(input_vecs,d["vector"]) |
| id_cos_d[d["id"][0]]=cos |
|
|
| id_cos_sorted = sorted(id_cos_d.items(), key=lambda x:x[1],reverse=True) |
|
|
| try: |
| ans_json_1 = id2ans(id_cos_sorted[0][0]) |
| ans_json_2 = id2ans(id_cos_sorted[1][0]) |
| ans_json_3 = id2ans(id_cos_sorted[2][0]) |
|
|
| |
| gpt_ans_self_1 = ask_gpt3(ans_json_1["myself"],api_key) |
| gpt_ans_parent_1 = ask_gpt3(ans_json_1["my_parent"],api_key) |
| gpt_ans_children_1 = ask_gpt3(ans_json_1["my_children"],api_key) |
|
|
| gpt_ans_self_2 = ask_gpt3(ans_json_2["myself"],api_key) |
| gpt_ans_parent_2 = ask_gpt3(ans_json_2["my_parent"],api_key) |
| gpt_ans_children_2 = ask_gpt3(ans_json_2["my_children"],api_key) |
|
|
| gpt_ans_self_3 = ask_gpt3(ans_json_3["myself"],api_key) |
| gpt_ans_parent_3 = ask_gpt3(ans_json_3["my_parent"],api_key) |
| gpt_ans_children_3 = ask_gpt3(ans_json_3["my_children"],api_key) |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| return templates.TemplateResponse("sound_search.html", |
| {**{"request": request, |
| "api_key":api_key, |
| "max_cos_in_wikidata_1":round(id_cos_sorted[0][1],4), |
| "max_cos_in_wikidata_2":round(id_cos_sorted[1][1],4), |
| "max_cos_in_wikidata_3":round(id_cos_sorted[2][1],4), |
| "gpt_ans_self_1": gpt_ans_self_1, |
| "gpt_ans_parent_1": gpt_ans_parent_1, |
| "gpt_ans_children_1": gpt_ans_children_1, |
| "gpt_ans_self_2": gpt_ans_self_2, |
| "gpt_ans_parent_2": gpt_ans_parent_2, |
| "gpt_ans_children_2": gpt_ans_children_2, |
| "gpt_ans_self_3": gpt_ans_self_3, |
| "gpt_ans_parent_3": gpt_ans_parent_3, |
| "gpt_ans_children_3": gpt_ans_children_3}, |
| **d4html(ans_json_1["myself"],"self","_1"), |
| **d4html(ans_json_1["my_parent"],"parent","_1"), |
| **d4html(ans_json_2["myself"],"self","_2"), |
| **d4html(ans_json_2["my_parent"],"parent","_2"), |
| **d4html(ans_json_3["myself"],"self","_3"), |
| **d4html(ans_json_3["my_parent"],"parent","_3") |
| }) |
| except: |
| return None |
|
|