# nn-search-api-v5 / BasicSearch.py
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
import re
from xml.dom.minidom import parseString
import os
import json
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.snowball import SnowballStemmer
class BasicSearch:
    # TF-IDF based search over letters / court decisions referenced from the Q&A data
    def __init__(self, doctype = 'minfin-letters') :
        self.doctype = doctype
        self.load_everything()
    # read the XML source data and Vlad's JSON question/answer data
    def load_basic_data(self, data_directory = 'data') :
        with open(os.path.join(data_directory, 'taxcode.xml'), "r", encoding="utf-8") as text_file :
            data = text_file.read()
        document = parseString('<data>' + data + '</data>')
        self.title = document.getElementsByTagName('title')
        self.text = document.getElementsByTagName('text')
        with open(os.path.join(data_directory, 'K2-answer.xml'), "r", encoding="utf-8") as text_file :
            textdata = text_file.read()
        document = parseString('<data>' + textdata + '</data>')
        self.atitle = document.getElementsByTagName('title')
        self.atext = document.getElementsByTagName('text')
        with open(os.path.join(data_directory, 'K2-question.xml'), "r", encoding="utf-8") as text_file :
            textdata = text_file.read()
        document = parseString('<data>' + textdata + '</data>')
        self.qtitle = document.getElementsByTagName('title')
        self.qtext = document.getElementsByTagName('text')
        # the two reference files below are read but not used elsewhere in this class
        with open(os.path.join(data_directory, 'references-04-12-2023.xml'), "r", encoding="utf-8") as text_file :
            textdata = text_file.read()
        document = parseString('<data>' + textdata + '</data>')
        reftext = document.getElementsByTagName('text')
        with open(os.path.join(data_directory, 'references-Vlad-11-12-2023.xml'), "r", encoding="utf-8") as text_file :
            textdata = text_file.read()
        document = parseString('<data>' + textdata + '</data>')
        reftext2 = document.getElementsByTagName('text')
        # reading Vlad's json data
        datadir = os.path.join(data_directory, 'data_jsons_20240104')
        filelist = os.listdir(datadir)
        filelist.sort()
        questions = [''] * len(filelist)
        answers = [''] * len(filelist)
        # comprehensions here: [[]] * n would make every slot alias the same list
        added_refs = [[] for _ in filelist]
        missed_refs = [[] for _ in filelist]
        count = 0
        for filename in filelist :
            x = filename.find('.')
            if x == -1 :
                print('ERROR :', filename)
                continue
            if filename[:x].isnumeric() :
                i = int(filename[:x])
                with open(os.path.join(datadir, filename), encoding="utf-8") as f :
                    d = json.load(f)
                refs = set(d['added_refs'].keys())
                refs -= {''}
                refs = list(refs)
                questions[i] = d['question']
                answers[i] = d['answer']
                missed_refs[i] = d['refs']
                added_refs[i] = refs
                count += 1
        # assumes the numeric file names form a contiguous 0..count-1 range
        self.questions = questions[:count]
        self.answers = answers[:count]
        self.added_refs = added_refs[:count]
        self.missed_refs = missed_refs[:count]
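    # Each JSON file read by load_basic_data above is expected to have this shape
    # (inferred from the fields accessed in the loop; not confirmed against the data):
    # { "question": "...", "answer": "...", "refs": [...], "added_refs": {"<ref title>": ...} }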
    def load_text_processing(self) :
        # the NLTK resources may need to be fetched on first run:
        # nltk.download('punkt')
        # nltk.download('stopwords')
        # (a spaCy lemmatizer, ru_core_news_md, was tried as an alternative to stemming)
        self.stop_words = set(stopwords.words('russian'))
        self.stemmer = SnowballStemmer("russian")
    # normalize a string: strip punctuation and digits, tokenize, drop stop words, stem
    def analyze(self, s) :
        template = r'[\'\"\.\,\?\!\:\;\-\+\%\^\&\*\@\~\_\=/\\\>\<\#\$\(\)\|\n\r\d]'
        s = re.sub(template, ' ', s)
        s = re.sub(' +', ' ', s)
        # lower-case so that capitalized stop words are filtered out as well
        s = s.lower()
        tokens = word_tokenize(s)
        tokens = [t for t in tokens if t not in self.stop_words and t != ' ']
        tokens = [self.stemmer.stem(word) for word in tokens]
        return ' '.join(tokens)
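    # Illustrative example (exact stems depend on NLTK's Russian Snowball stemmer):
    # analyze('Порядок заполнения счета-фактуры!') returns a single space-joined
    # string of stems, something like 'порядок заполнен счет фактур'.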
    # load medium dataset
    def load_medium_dataset(self) :
        with open(os.path.join('data', 'search_data', 'medium_dataset.json'), 'r', encoding="utf-8") as infile :
            self.dataset_medium = json.load(infile)
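    # dataset_medium is assumed to map document title -> full document text,
    # which is how create_corpora and search consume it below.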
    # create a filtered list of references for Vlad's json data
    def create_filtered_refs(self) :
        doctype = self.doctype
        added_refs = self.added_refs
        # full reference template, kept for reference:
        # r'(НКРФ|ГКРФ|ТКРФ|ФЗ|[Зз]акон|Минфин|ФНС|Правительства|ФАС|АС|КС|ВС|[Сс]удебн|[Сс]уд)'
        if doctype == 'court-decisions' :
            doctype_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд)' # courts' decisions
        elif doctype == 'minfin-letters' :
            doctype_template = r'[Пп]исьмо [Мм]инфина' # Minfin letters
        elif doctype == 'fns-letters' :
            doctype_template = r'[Пп]исьмо (ФНС|фнс)' # FNS letters
        elif doctype == 'all-letters' :
            doctype_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд|[Пп]исьмо [Мм]инфина|[Пп]исьмо (ФНС|фнс))' # courts' decisions + Minfin letters + FNS letters
        else :
            # fail fast instead of leaving doctype_template undefined below
            raise ValueError('wrong doctype: ' + str(doctype))
        filtered_refs = []
        for i in range(len(added_refs)) :
            refs = []
            for j in range(len(added_refs[i])) :
                s = added_refs[i][j]
                if re.search(doctype_template, s) is not None :
                    refs.append(s)
            filtered_refs.append(refs)
        self.filtered_refs = filtered_refs
        self.doctype_template = doctype_template
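    # For example, under doctype = 'minfin-letters' a reference title such as
    # 'Письмо Минфина России от 01.02.2023 N 03-07-11/1234' (hypothetical title)
    # matches the template, while court decisions and FNS letters are filtered out.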
    # creating corpora for the TF-IDF embedding
    def create_corpora(self) :
        self.qcorpus = []
        for i in range(len(self.qtext)) :
            if not i % 100 : print(i, end = ' ') # progress indicator
            s = self.qtext[i].firstChild.nodeValue
            s = self.analyze(s)
            self.qcorpus.append(s)
        self.nkcorpus = []
        for i in range(len(self.text)) :
            if not i % 100 : print(i, end = ' ') # progress indicator
            s = self.text[i].firstChild.nodeValue
            s = self.analyze(s)
            self.nkcorpus.append(s)
        self.pmfcorpus = []
        self.pmfrefs = []
        self.pmfids = []
        i = 0
        self.items = []
        for key, value in self.dataset_medium.items() :
            if re.search(self.doctype_template, key) is not None :
                s = value
                if s is not None :
                    s = s.replace('\n', ' ')
                # skip missing texts and very long documents (roughly > 12000 words)
                if s is not None and s.count(' ') < 12000 :
                    if not i % 100 : print(i, end = ' ') # progress indicator
                    s = self.analyze(s)
                    self.pmfcorpus.append(s)
                    self.pmfrefs.append(key)
                    self.pmfids.append(i)
                    self.items.append({'title' : key, 'text' : value})
                    i += 1
    # build up the TF-IDF representation
    def create_TFIDF(self) :
        self.vectorizer = CountVectorizer()
        self.transformer = TfidfTransformer(smooth_idf = False, norm = 'l2', sublinear_tf = True)
        X = self.vectorizer.fit_transform(self.pmfcorpus)
        QX = self.vectorizer.transform(self.qcorpus)
        self.TFIDF = self.transformer.fit_transform(X)
        self.QTFIDF = self.transformer.transform(QX)
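    # With norm = 'l2' the rows of TFIDF and QTFIDF are unit vectors, so the
    # sparse dot products computed in getTop and search below are exactly the
    # cosine similarities between a query and every document.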
    # get top letters sorted by TF-IDF cosine similarity
    def getTop(self, i, top) :
        v = self.QTFIDF[i]
        vt = v.transpose()
        # scores[j] = cosine similarity between query i and document j
        scores = self.TFIDF.dot(vt)[:, 0].todense()
        scores = np.squeeze(np.asarray(scores))
        df = pd.DataFrame()
        df[0] = scores
        df[1] = self.pmfrefs
        df.sort_values(0, ascending = False, inplace = True)
        ids = df[1]
        return ids[:top].tolist()
    def test_TFIDF_top(self, top = 40) :
        N = len(self.qtext)
        recall = []
        precision = []
        f1 = []
        for i in range(N) :
            refs = set(self.filtered_refs[i])
            resp = self.getTop(i, top)
            serp = set(resp)
            hits = len(refs & serp)
            tp = hits
            fp = top - tp
            fn = len(refs) - hits
            if tp == 0 :
                if fp == 0 and fn == 0 :
                    # no relevant refs and nothing retrieved: count as a perfect query
                    recall.append(1)
                    precision.append(1)
                    f1.append(1)
                else :
                    recall.append(0)
                    precision.append(0)
                    f1.append(0)
            else :
                recall.append(tp / (tp + fn))
                precision.append(tp / (tp + fp))
                f1.append(2 * tp / (2 * tp + fp + fn))
        print()
        print('mean recall:', sum(recall) / len(recall))
        print('mean precision:', sum(precision) / len(precision))
        print('mean F1:', sum(f1) / len(f1))
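    # Worked example: with top = 40 and 3 relevant refs of which 2 are retrieved:
    # tp = 2, fp = 38, fn = 1 -> recall = 2/3, precision = 2/40 = 0.05,
    # F1 = 2*2 / (2*2 + 38 + 1) = 4/43 ≈ 0.093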
# get letters with TF-IDF cosine similarity score > value
def getTopByScoreValue(self, i, value) :
v = self.QTFIDF[i]
vt = v.transpose()
scores = self.TFIDF.dot(vt)[:, 0].todense()
scores = np.squeeze(np.asarray(scores))
df = pd.DataFrame()
df[0] = scores
df[1] = self.pmfrefs
df.sort_values(0, ascending = False, inplace = True)
df1 = df.loc[df[0] > value]
ids = df1[1]
return ids.tolist()
    # calculate metrics for letters with TF-IDF cosine similarity score > value
    def test_TFIDF_value(self, value = .4) :
        N = len(self.qtext)
        recall = []
        precision = []
        f1 = []
        topsize = []
        count = 0
for i in range(N) :
# if not i % 10 : print(i, end = ' ')
refs = set(self.filtered_refs[i])
resp = self.getTopByScoreValue(i, value)
serp = set(resp)
hits = len(refs & serp)
top = len(resp)
topsize.append(top)
if top > 0 :
count += 1
tp = hits
fp = top - tp
fn = len(refs) - hits
if tp == 0 :
if fp == 0 and fn == 0 :
recall.append(1)
precision.append(1)
f1.append(1)
else :
recall.append(0)
precision.append(0)
f1.append(0)
else :
recall.append(tp / (tp + fn))
precision.append(tp / (tp + fp))
f1.append(2 * tp / (2 * tp + fp + fn))
print()
print('mean recall:', sum(recall) / len(recall))
print('mean precision:', sum(precision) / len(precision))
print('mean F1:', sum(f1) / len(f1))
print('mean top size: ', sum(topsize) / len(topsize))
        print('queries with a non-empty result set:', count, '/', N)
# get letters with TF-IDF cosine similarity score > top score * ratio
def getTopByScoreRelValue(self, i, ratio) :
v = self.QTFIDF[i]
vt = v.transpose()
scores = self.TFIDF.dot(vt)[:, 0].todense()
scores = np.squeeze(np.asarray(scores))
df = pd.DataFrame()
df[0] = scores
df[1] = self.pmfrefs
df.sort_values(0, ascending = False, inplace = True)
value = df.iloc[0, 0]
df1 = df.loc[df[0] > value * ratio]
ids = df1[1]
return ids.tolist()
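    # E.g. if the best score for a query is 0.50 and ratio = 0.9, every document
    # scoring above 0.45 is returned, so the cut-off adapts to each query.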
    # calculate metrics for letters with TF-IDF cosine similarity score > top score * ratio
    def test_TFIDF_ratio(self, ratio = .9) :
        N = len(self.qtext)
        recall = []
        precision = []
        f1 = []
        topsize = []
        for i in range(N) :
            refs = set(self.filtered_refs[i])
            resp = self.getTopByScoreRelValue(i, ratio)
            serp = set(resp)
            hits = len(refs & serp)
            top = len(resp)
            topsize.append(top)
            tp = hits
            fp = top - tp
            fn = len(refs) - hits
            if tp == 0 :
                if fp == 0 and fn == 0 :
                    r, p, f = 1, 1, 1
                else :
                    r, p, f = 0, 0, 0
            else :
                r = tp / (tp + fn)
                p = tp / (tp + fp)
                f = 2 * tp / (2 * tp + fp + fn)
            recall.append(r)
            precision.append(p)
            f1.append(f)
            # sanity check: F1 is a harmonic mean, so it must lie between precision and recall
            if (f > r and f > p) or (f < r and f < p) :
                print('ERROR :', i, r, p, f)
print()
print('mean recall:', sum(recall) / len(recall))
print('mean precision:', sum(precision) / len(precision))
print('mean F1:', sum(f1) / len(f1))
print('mean top size: ', sum(topsize) / len(topsize))
    def load_everything(self) :
        self.load_basic_data()
        self.load_text_processing()
        # quick smoke test of the analyzer on a deliberately noisy sample string
        s = '|()><.,!?:;=*-/\\8. Форма \n \r Cчета-фактуры и порядок его заполнения, формы и порядок ведения журнала учета полученных и выставленных счетов-фактур, книг покупок и книг продаж устанавливаются Правительством Российской Федерации.'
        print(self.analyze(s))
        self.load_medium_dataset()
        self.create_filtered_refs()
        self.create_corpora()
        print(len(self.pmfcorpus)) # corpus size
        self.create_TFIDF()
def test_everything(self) :
self.test_TFIDF_top(top = 40)
self.test_TFIDF_value(value = .4)
self.test_TFIDF_ratio(ratio = .9)
    # return the top document titles and their full texts for a free-text query
    def search(self, query, top = 10) :
        analyzed_query = self.analyze(query)
        query_TF = self.vectorizer.transform([analyzed_query])
        query_TFIDF = self.transformer.transform(query_TF)
        v = query_TFIDF[0]
        vt = v.transpose()
        scores = self.TFIDF.dot(vt)[:, 0].todense()
        scores = np.squeeze(np.asarray(scores))
        df = pd.DataFrame()
        df[0] = scores
        df[1] = self.pmfrefs
        df.sort_values(0, ascending = False, inplace = True)
        ids = df[1]
        titles = ids[:top].tolist()
        # fetch full texts for the top titles only, not the entire sorted list
        docs = [self.dataset_medium[title] for title in titles]
        return titles, docs
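
# Minimal usage sketch. Assumes the data files referenced above exist under
# ./data; the query string is just an illustrative example.
if __name__ == '__main__' :
    searcher = BasicSearch(doctype = 'minfin-letters') # builds the TF-IDF index on construction
    titles, docs = searcher.search('вычет НДС по счету-фактуре', top = 5)
    for title in titles :
        print(title)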