import json
import os
import re
from xml.dom.minidom import parseString

import nltk
import numpy as np
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem.snowball import SnowballStemmer
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer


class BasicSearch:
    """TF-IDF search over a JSON dataset of titled documents.

    On construction the instance loads the XML/JSON data files, builds
    stemmed corpora, fits a ``CountVectorizer`` + ``TfidfTransformer`` on the
    documents whose title matches the selected ``doctype`` and keeps the
    resulting matrices so that queries can be ranked by dot product
    (the document rows are L2-normalised by the transformer).
    """

    # Regular expressions selecting documents/references for each doctype.
    _DOCTYPE_TEMPLATES = {
        # courts' decisions
        'court-decisions': r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд)',
        # Minfin letters
        'minfin-letters': r'[Пп]исьмо [Мм]инфина',
        # FNS letters
        'fns-letters': r'[Пп]исьмо (ФНС|фнс)',
        # courts' decisions + Minfin letters + FNS letters
        'all-letters': r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд|[Пп]исьмо [Мм]инфина|[Пп]исьмо (ФНС|фнс))',
    }

    # Characters replaced by a space before tokenisation (punctuation,
    # line breaks and digits).
    _NOISE_TEMPLATE = r'[\'\"\.\,\?\!\:\;\-\+\%\^\&\*\@\~\_\=/\\\>\<\#\$\(\)\|\n\r\d]'

    def __init__(self, doctype='minfin-letters'):
        """Store ``doctype`` and immediately load data and build the index.

        Args:
            doctype: one of the keys of ``_DOCTYPE_TEMPLATES``.
        """
        self.doctype = doctype
        self.load_everything()

    # ------------------------------------------------------------------
    # data loading
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_xml_file(path):
        """Read ``path`` as UTF-8 and return the parsed minidom document.

        NOTE(review): the content is parsed as-is, so every file must be a
        well-formed XML document with a single root element.
        """
        with open(path, "r", encoding="utf-8") as xml_file:
            data = xml_file.read()
        return parseString(data)

    def load_basic_data(self, data_directory='data'):
        """Load the XML collections and the per-question JSON files.

        Sets ``title``/``text`` (taxcode.xml), ``atitle``/``atext``
        (K2-answer.xml), ``qtitle``/``qtext`` (K2-question.xml) and, from the
        ``data_jsons_20240104`` directory, ``questions``, ``answers``,
        ``added_refs`` and ``missed_refs``.

        Args:
            data_directory: directory that contains the data files.
        """
        document = self._parse_xml_file(os.path.join(data_directory, 'taxcode.xml'))
        self.title = document.getElementsByTagName('title')
        self.text = document.getElementsByTagName('text')

        document = self._parse_xml_file(os.path.join(data_directory, 'K2-answer.xml'))
        self.atitle = document.getElementsByTagName('title')
        self.atext = document.getElementsByTagName('text')

        document = self._parse_xml_file(os.path.join(data_directory, 'K2-question.xml'))
        self.qtitle = document.getElementsByTagName('title')
        self.qtext = document.getElementsByTagName('text')

        # The two reference files are read and parsed but nothing from them is
        # stored on the instance; they are still parsed so that a missing or
        # malformed file keeps failing loudly.
        # NOTE(review): confirm whether these files are still needed at all.
        for name in ('references-04-12-2023.xml', 'references-Vlad-11-12-2023.xml'):
            self._parse_xml_file(os.path.join(data_directory, name))

        # reading Vlad's json data
        datadir = os.path.join(data_directory, 'data_jsons_20240104')
        filelist = sorted(os.listdir(datadir))
        size = len(filelist)
        questions = [''] * size
        answers = [''] * size
        # independent inner lists (``[[]] * size`` would alias a single list)
        added_refs = [[] for _ in range(size)]
        missed_refs = [[] for _ in range(size)]
        count = 0
        for filename in filelist:
            dot = filename.find('.')
            if dot == -1:
                print('ERROR :', filename)
                continue  # no extension: there is no numeric stem to use
            stem = filename[:dot]
            if not stem.isdecimal():
                continue
            i = int(stem)
            with open(os.path.join(datadir, filename), encoding="utf-8") as json_file:
                d = json.load(json_file)
            questions[i] = d['question']
            answers[i] = d['answer']
            missed_refs[i] = d['refs']
            # keys are unique already; only the empty key is dropped
            added_refs[i] = [ref for ref in d['added_refs'] if ref != '']
            count += 1
        # NOTE(review): slicing by ``count`` presumes the numeric file names
        # are exactly 0..count-1 — confirm against the data directory.
        self.questions = questions[:count]
        self.answers = answers[:count]
        self.added_refs = added_refs[:count]
        self.missed_refs = missed_refs[:count]

    def load_text_processing(self):
        """Create the Russian stop-word set and Snowball stemmer.

        Requires the NLTK ``stopwords`` corpus to be available
        (``nltk.download('stopwords')``); ``analyze`` additionally needs the
        tokenizer data (``nltk.download('punkt')``).
        """
        self.stop_words = set(stopwords.words('russian'))
        self.stemmer = SnowballStemmer("russian")

    def analyze(self, s):
        """Normalise ``s`` into a space-separated string of stemmed tokens.

        Punctuation, line breaks and digits become spaces, the text is
        tokenised, stop words are removed and the remaining tokens stemmed.

        Args:
            s: raw text.

        Returns:
            The processed text as a single string.
        """
        s = re.sub(self._NOISE_TEMPLATE, ' ', s)
        s = re.sub(' +', ' ', s)
        tokens = word_tokenize(s)
        tokens = [t for t in tokens if t not in self.stop_words and t != ' ']
        return ' '.join(self.stemmer.stem(word) for word in tokens)

    def load_medium_dataset(self, data_directory='data'):
        """Load ``search_data/medium_dataset.json`` into ``dataset_medium``.

        Args:
            data_directory: directory that contains ``search_data``.
        """
        path = os.path.join(data_directory, 'search_data', 'medium_dataset.json')
        with open(path, 'r', encoding="utf-8") as infile:
            self.dataset_medium = json.load(infile)

    # ------------------------------------------------------------------
    # index construction
    # ------------------------------------------------------------------
    def create_filtered_refs(self):
        """Keep, per question, only the added refs matching the doctype.

        Sets ``filtered_refs`` (list of lists, parallel to ``added_refs``)
        and ``doctype_template`` (the regex string used for filtering).

        Raises:
            ValueError: if ``self.doctype`` is not a known doctype.
        """
        try:
            doctype_template = self._DOCTYPE_TEMPLATES[self.doctype]
        except KeyError:
            raise ValueError('Error : wrong doctype') from None

        pattern = re.compile(doctype_template)
        self.filtered_refs = [
            [ref for ref in refs if pattern.search(ref)]
            for refs in self.added_refs
        ]
        self.doctype_template = doctype_template

    def _analyze_nodes(self, nodes):
        """Run ``analyze`` on the first child value of every XML node.

        Prints the running index every 100 nodes as a progress indicator.
        """
        corpus = []
        for i, node in enumerate(nodes):
            if not i % 100:
                print(i, end=' ')
            corpus.append(self.analyze(node.firstChild.nodeValue))
        return corpus

    def create_corpora(self):
        """Build the analysed corpora used for the TF-IDF embedding.

        Sets ``qcorpus`` (questions), ``nkcorpus`` (tax code texts) and, for
        dataset entries whose title matches ``doctype_template`` and whose
        text is present and contains fewer than 12000 spaces, the parallel
        lists ``pmfcorpus``, ``pmfrefs``, ``pmfids`` and ``items``.
        """
        self.qcorpus = self._analyze_nodes(self.qtext)
        self.nkcorpus = self._analyze_nodes(self.text)

        self.pmfcorpus = []
        self.pmfrefs = []
        self.pmfids = []
        self.items = []
        pattern = re.compile(self.doctype_template)
        i = 0
        for key, value in self.dataset_medium.items():
            if pattern.search(key) is None or value is None:
                continue
            s = value.replace('\n', ' ')
            if s.count(' ') >= 12000:
                continue  # overly long documents are left out of the index
            if not i % 100:
                print(i, end=' ')
            self.pmfcorpus.append(self.analyze(s))
            self.pmfrefs.append(key)
            self.pmfids.append(i)
            self.items.append({'title': key, 'text': value})
            i += 1

    def create_TFIDF(self):
        """Fit the vectorizer/transformer on ``pmfcorpus``.

        Sets ``vectorizer``, ``transformer``, ``TFIDF`` (documents) and
        ``QTFIDF`` (questions transformed with the fitted models).
        """
        self.vectorizer = CountVectorizer()
        self.transformer = TfidfTransformer(smooth_idf=False, norm='l2', sublinear_tf=True)
        X = self.vectorizer.fit_transform(self.pmfcorpus)
        QX = self.vectorizer.transform(self.qcorpus)
        self.TFIDF = self.transformer.fit_transform(X)
        self.QTFIDF = self.transformer.transform(QX)

    # ------------------------------------------------------------------
    # ranking
    # ------------------------------------------------------------------
    def _ranked(self, query_row):
        """Rank all indexed documents against one sparse query row.

        Args:
            query_row: sparse 1 x n_features TF-IDF row.

        Returns:
            ``(refs, scores)``: document titles and their scores, both
            ordered by descending score (ties keep corpus order).
        """
        product = self.TFIDF.dot(query_row.transpose())
        scores = np.asarray(product.todense()).ravel()
        # stable sort makes the order of equal scores deterministic
        order = np.argsort(-scores, kind='stable')
        refs = [self.pmfrefs[j] for j in order]
        return refs, scores[order]

    def getTop(self, i, top):
        """Return the titles of the ``top`` best documents for question ``i``."""
        refs, _ = self._ranked(self.QTFIDF[i])
        return refs[:top]

    def getTopByScoreValue(self, i, value):
        """Return titles scoring strictly above ``value`` for question ``i``."""
        refs, scores = self._ranked(self.QTFIDF[i])
        return [ref for ref, score in zip(refs, scores) if score > value]

    def getTopByScoreRelValue(self, i, ratio):
        """Return titles scoring strictly above ``best score * ratio``.

        Returns an empty list when the index holds no documents.
        """
        refs, scores = self._ranked(self.QTFIDF[i])
        if not refs:
            return []
        threshold = scores[0] * ratio
        return [ref for ref, score in zip(refs, scores) if score > threshold]

    # ------------------------------------------------------------------
    # evaluation
    # ------------------------------------------------------------------
    @staticmethod
    def _prf(tp, fp, fn):
        """Return ``(recall, precision, f1)`` for the given counts.

        With no true positives the result is all ones when there was nothing
        to find and nothing was returned, and all zeros otherwise.
        """
        if tp == 0:
            score = 1 if fp == 0 and fn == 0 else 0
            return score, score, score
        return tp / (tp + fn), tp / (tp + fp), 2 * tp / (2 * tp + fp + fn)

    @staticmethod
    def _mean(values):
        """Arithmetic mean; NaN for an empty sequence instead of crashing."""
        return sum(values) / len(values) if values else float('nan')

    def _collect_metrics(self, retrieve):
        """Evaluate ``retrieve(i)`` for every question against filtered refs.

        Args:
            retrieve: callable mapping a question index to a list of titles.

        Returns:
            Four parallel lists: recall, precision, f1, result sizes.
        """
        recall, precision, f1, topsize = [], [], [], []
        for i in range(len(self.qtext)):
            refs = set(self.filtered_refs[i])
            resp = retrieve(i)
            tp = len(refs & set(resp))
            r, p, f = self._prf(tp, len(resp) - tp, len(refs) - tp)
            recall.append(r)
            precision.append(p)
            f1.append(f)
            topsize.append(len(resp))
        return recall, precision, f1, topsize

    def _print_means(self, recall, precision, f1, topsize=None):
        """Print the mean metrics (and mean result size when given)."""
        print()
        print('mean recall:', self._mean(recall))
        print('mean precision:', self._mean(precision))
        print('mean F1:', self._mean(f1))
        if topsize is not None:
            print('mean top size: ', self._mean(topsize))

    def test_TFIDF_top(self, top=40):
        """Print mean recall/precision/F1 for the ``top`` best documents."""
        recall, precision, f1, _ = self._collect_metrics(
            lambda i: self.getTop(i, top))
        self._print_means(recall, precision, f1)

    def test_TFIDF_value(self, value=.4):
        """Print metrics for documents with score > ``value``."""
        recall, precision, f1, topsize = self._collect_metrics(
            lambda i: self.getTopByScoreValue(i, value))
        self._print_means(recall, precision, f1, topsize)

    def test_TFIDF_ratio(self, ratio=.9):
        """Print metrics for documents with score > best score * ``ratio``."""
        recall, precision, f1, topsize = self._collect_metrics(
            lambda i: self.getTopByScoreRelValue(i, ratio))
        # sanity check: F1 must lie between recall and precision
        for i, (r, p, f) in enumerate(zip(recall, precision, f1)):
            if (f > r and f > p) or (f < r and f < p):
                print('ERROR :', i, r, p, f)
        self._print_means(recall, precision, f1, topsize)

    # ------------------------------------------------------------------
    # orchestration
    # ------------------------------------------------------------------
    def load_everything(self):
        """Load all data and build the TF-IDF index, printing progress."""
        self.load_basic_data()
        self.load_text_processing()
        # smoke test of the text analyser on a sample sentence
        s = '|()><.,!?:;=*-/\\8. Форма \n \r Cчета-фактуры и порядок его заполнения, формы и порядок ведения журнала учета полученных и выставленных счетов-фактур, книг покупок и книг продаж устанавливаются Правительством Российской Федерации.'
        print(self.analyze(s))
        self.load_medium_dataset()
        self.create_filtered_refs()
        self.create_corpora()
        print(len(self.pmfcorpus))
        self.create_TFIDF()

    def test_everything(self):
        """Run the three evaluation routines with their default settings."""
        self.test_TFIDF_top(top=40)
        self.test_TFIDF_value(value=.4)
        self.test_TFIDF_ratio(ratio=.9)

    def search(self, query, top=10):
        """Return the ``top`` best matching documents for a free-text query.

        Args:
            query: raw query text; it is passed through ``analyze``.
            top: maximum number of results.

        Returns:
            ``(titles, docs)``: two parallel lists, best match first, where
            ``docs[k]`` is the dataset text stored under ``titles[k]``.
        """
        analyzed_query = self.analyze(query)
        query_TF = self.vectorizer.transform([analyzed_query])
        query_TFIDF = self.transformer.transform(query_TF)
        refs, _ = self._ranked(query_TFIDF[0])
        titles = refs[:top]
        # only the returned titles are resolved, keeping docs parallel to titles
        docs = [self.dataset_medium[title] for title in titles]
        return titles, docs