import os
import re
import struct
import binascii
import datetime
import csv
import json
import requests
from transformers import AutoTokenizer, AutoModel
import torch
from torch import Tensor
import torch.nn.functional as F
import numpy as np
from scipy.spatial.distance import cdist
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from markdownify import markdownify
model_name = "cl-nagoya/ruri-large"
# "mixedbread-ai/mxbai-embed-large-v1"
# "intfloat/multilingual-e5-large"

input_dir = 'input'
vectors_dir = 'vectors'

model = None
tokenizer = None
device = None
vectors = {}

os.makedirs(input_dir, exist_ok=True)
os.makedirs(vectors_dir, exist_ok=True)
def ddg(text, max_results=5):
    with DDGS() as ddgs:
        results = [r for r in ddgs.text(text, max_results=max_results)]
    print(results)
    return results
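# Note: in the duckduckgo_search versions this code appears to target, each
# result is a dict that typically carries 'title', 'href', and 'body' keys
# (this can vary by library version), e.g.:
#   hits = ddg("vector search", max_results=3)
#   urls = [h['href'] for h in hits]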
def bs4(url):
    html = requests.get(url).text
    soup = BeautifulSoup(html, features="html.parser")
    # kill all script and style elements
    for script in soup(["script", "style"]):
        script.extract()  # rip it out
    # get text
    text = soup.get_text()
    # break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # break multi-headlines into a line each (split on runs of two spaces)
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text
def md(url):
    html = requests.get(url).text.replace("\r", '')
    mdtxt = markdownify(html)
    mdtxt = re.sub("\n+", "\n", mdtxt, flags=(re.MULTILINE | re.DOTALL))
    return mdtxt
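# Quick usage sketch for the two fetch helpers (the URL is a placeholder, not
# from the original code):
#   plain_text = bs4("https://example.com")  # plain text, scripts/styles stripped
#   markdown = md("https://example.com")     # Markdown with blank lines collapsed
# Either output can then be chunked into a CSV and stored via upload().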
def upload(name, filename, content):
    os.makedirs(f"{input_dir}/{name}", exist_ok=True)
    srcpath = f"{input_dir}/{name}/{filename}"
    with open(srcpath, 'w', encoding='utf-8') as f:
        f.write(content)

def delete(name, filename):
    srcpath = f"{input_dir}/{name}/{filename}"
    dstpath = f"{vectors_dir}/{name}/{filename}"
    if os.path.exists(srcpath):
        os.unlink(srcpath)
    if os.path.exists(dstpath):
        os.unlink(dstpath)
def load_model():
    global model, tokenizer, device
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Check whether CUDA is available; fall back to CPU otherwise
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    # Move the model to the selected device
    model = AutoModel.from_pretrained(model_name).to(device)

def average_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor:
    # Mean-pool the token embeddings, ignoring padding positions
    last_hidden = last_hidden_states.masked_fill(~attention_mask[..., None].bool(), 0.0)
    return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]
def cosine_similarity(v1, v2):
    return 1 - cdist([v1], [v2], 'cosine')[0][0]
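# Because every stored embedding is L2-normalized before serialization (see
# get_vector_string below), the cosine similarity here is effectively a dot
# product, and cosine_similarity(v, v) == 1.0 for any nonzero v.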
def embedding():
    for name in os.listdir(input_dir):
        os.makedirs(f"{input_dir}/{name}", exist_ok=True)
        os.makedirs(f"{vectors_dir}/{name}", exist_ok=True)
        for filename in os.listdir(f"{input_dir}/{name}"):
            embedding_file(name, filename)
def embedding_file(name, filename):
    srcpath = f"{input_dir}/{name}/{filename}"
    dstpath = f"{vectors_dir}/{name}/{filename}"
    if os.path.isdir(srcpath):
        return
    if os.path.exists(dstpath):  # already embedded; skip
        return
    print(srcpath)
    chunks = []
    with open(srcpath, 'r', encoding='utf-8') as csv_file:
        reader = csv.reader(csv_file)
        for r in reader:
            if not r:
                continue
            if r[0] == 'chunk':  # header row
                continue
            if len(r) == 1:
                r.append('')
            chunks.append(r)
    # Open the output CSV and write one row per chunk
    with open(dstpath, mode='w', encoding='utf-8', newline='') as csv_file:
        fieldnames = ['chunk', 'output', 'vector']
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for r in chunks:
            writer.writerow({'chunk': r[0], 'output': r[1], 'vector': get_vector_string(r[0])})
def get_vector_string(chunk):
    global model, tokenizer, device
    inputs = tokenizer(chunk, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
    with torch.no_grad():  # no gradients needed for inference
        outputs = model(**inputs)
        embeddings = average_pool(outputs.last_hidden_state, inputs['attention_mask'])
        embeddings = F.normalize(embeddings, p=2, dim=1)
    # Serialize the vector: pack each float64 big-endian, zero-pad to 16 hex
    # digits, and keep only the top 9 (the low 7 are dropped to save space).
    # Zero-padding matters here: plain hex() drops leading zeros, which would
    # make small values unparseable when decoded in load_vectors().
    vector_string = ",".join(
        format(struct.unpack('>Q', struct.pack('>d', x))[0], '016x')[:9]
        for x in embeddings[0].cpu().numpy()
    )
    return vector_string
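# A minimal round-trip sanity check for the serialization above (the helper
# name `_roundtrip_demo` is mine, not part of the original module):
def _roundtrip_demo():
    value = 0.12345
    bits = struct.unpack('>Q', struct.pack('>d', value))[0]
    encoded = format(bits, '016x')[:9]  # keep the top 36 of 64 bits
    decoded = struct.unpack('>d', binascii.unhexlify(encoded + '0000000'))[0]
    # Only the low 28 mantissa bits are dropped, so the error stays tiny
    assert abs(value - decoded) < 1e-6, (value, decoded)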
def load_vectors():
    global vectors
    vectors = {}
    for name in os.listdir(vectors_dir):
        vectors[name] = []
        for filename in os.listdir(f"{vectors_dir}/{name}"):
            filepath = f"{vectors_dir}/{name}/{filename}"
            with open(filepath, mode='r', encoding='utf-8') as csv_file:
                reader = csv.DictReader(csv_file)
                for row in reader:
                    # Re-pad each 9-digit hex prefix to 16 digits and unpack as float64
                    vector = np.array([struct.unpack('>d', binascii.unhexlify(x + '0000000'))[0] for x in row['vector'].split(',')])
                    vectors[name].append([row['chunk'], row['output'], vector])
def search(name, query_text, num=3):
    dt = datetime.datetime.now()
    # Embed the query text
    inputs = tokenizer(query_text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
    with torch.no_grad():
        outputs = model(**inputs)
        query_embeddings = average_pool(outputs.last_hidden_state, inputs['attention_mask'])
        query_embeddings = F.normalize(query_embeddings, p=2, dim=1).cpu().numpy()[0]
    # Compute the similarity between the query and every stored record
    similarities = []
    for row in vectors[name]:
        similarity = cosine_similarity(query_embeddings, row[2])
        similarities.append((row, similarity))
    # Sort by similarity and keep the top `num` results
    top_matches = sorted(similarities, key=lambda x: x[1], reverse=True)[:num]
    result = ''
    for i, (row, similarity) in enumerate(top_matches, 1):
        if not row[1]:  # fall back to the chunk text when no output is stored
            row[1] = row[0]
        result += f"#{i} {similarity*100:.2f}%\n{row[1]}\n\n"
    print(result)
    print(datetime.datetime.now() - dt)
    return result
load_model()
load_vectors()

if __name__ == '__main__':
    embedding()
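# End-to-end usage sketch (collection/file names and texts are illustrative,
# not from the original code). Input files are CSVs with a 'chunk' column and
# an optional 'output' column; note that load_vectors() must be re-run after
# embedding() before search() can see newly embedded files:
#   upload('docs', 'faq.csv', 'chunk,output\n"What is RAG?","Retrieval-augmented generation."\n')
#   embedding()
#   load_vectors()
#   search('docs', 'Explain RAG', num=3)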