# coding=utf-8 import pandas as pd import numpy as np from sklearn.feature_extraction.text import CountVectorizer from sklearn.feature_extraction.text import TfidfTransformer from scipy import sparse import re from xml.dom.minidom import parseString #, parse import os import sys import json # alpha = 1.15 # beta = .2 # gamma = .4 # delta = .31 # epsilon = 0 alpha = 0 beta = .55 gamma = .0 delta = .2 epsilon = 0 zeta = .65 # stemmer class class Porter: PERFECTIVEGROUND = re.compile(u"((ив|ивши|ившись|ыв|ывши|ывшись)|((?<=[ая])(в|вши|вшись)))$") REFLEXIVE = re.compile(u"(с[яь])$") ADJECTIVE = re.compile(u"(ее|ие|ые|ое|ими|ыми|ей|ий|ый|ой|ем|им|ым|ом|его|ого|ему|ому|их|ых|ую|юю|ая|яя|ою|ею)$") PARTICIPLE = re.compile(u"((ивш|ывш|ующ)|((?<=[ая])(ем|нн|вш|ющ|щ)))$") VERB = re.compile(u"((ила|ыла|ена|ейте|уйте|ите|или|ыли|ей|уй|ил|ыл|им|ым|ен|ило|ыло|ено|ят|ует|уют|ит|ыт|ены|ить|ыть|ишь|ую|ю)|((?<=[ая])(ла|на|ете|йте|ли|й|л|ем|н|ло|но|ет|ют|ны|ть|ешь|нно)))$") NOUN = re.compile(u"(а|ев|ов|ие|ье|е|иями|ями|ами|еи|ии|и|ией|ей|ой|ий|й|иям|ям|ием|ем|ам|ом|о|у|ах|иях|ях|ы|ь|ию|ью|ю|ия|ья|я)$") RVRE = re.compile(u"^(.*?[аеиоуыэюя])(.*)$") DERIVATIONAL = re.compile(u".*[^аеиоуыэюя]+[аеиоуыэюя].*ость?$") DER = re.compile(u"ость?$") SUPERLATIVE = re.compile(u"(ейше|ейш)$") I = re.compile(u"и$") P = re.compile(u"ь$") NN = re.compile(u"нн$") def stem(word): # word = word.lower() word = word.replace(u'ё', u'е') m = re.match(Porter.RVRE, word) if m and m.groups(): pre = m.group(1) rv = m.group(2) temp = Porter.PERFECTIVEGROUND.sub('', rv, 1) if temp == rv: rv = Porter.REFLEXIVE.sub('', rv, 1) temp = Porter.ADJECTIVE.sub('', rv, 1) if temp != rv: rv = temp rv = Porter.PARTICIPLE.sub('', rv, 1) else: temp = Porter.VERB.sub('', rv, 1) if temp == rv: rv = Porter.NOUN.sub('', rv, 1) else: rv = temp else: rv = temp rv = Porter.I.sub('', rv, 1) if re.match(Porter.DERIVATIONAL, rv): rv = Porter.DER.sub('', rv, 1) temp = Porter.P.sub('', rv, 1) if temp == rv: rv = Porter.SUPERLATIVE.sub('', rv, 1) rv = Porter.NN.sub(u'н', rv, 1) else: rv = temp word = pre+rv return word stem = staticmethod(stem) class BasicSearch: # constructor function def __init__(self, doctype = 'minfin-letters', data_directory = './') : self.doctype = doctype self.load_everything(data_directory=data_directory) def read_xml(self, path): with open(path, "r", encoding="utf-8") as text_file: data = text_file.read() document = parseString('' + data + '') return [ document.getElementsByTagName('title'), document.getElementsByTagName('text') ] def getRefsNK(self, s) : i = 0 refs = set() x = 0 while x != -1 : x = s.lower().find(' ст.', x) if x != -1 : # x += 1 y = s.lower().find('нк рф', x) if y != -1 : # print(i) # print(x, y) dx = 4 if s[x + dx] == ' ' : dx = 5 if y - x <= 13 and y - x > 5 : # print(s[x + 4: y + 5]) ref = 'Статья ' + s[x + dx: y - 1] if ref in self.refid : refs.add(ref) x = y else : # print('error: ', s[x + 4: y + 5]) x += 1 i += 1 if i > 1000 : break return list(refs) def getRefsNK1(self, s, debug = False, altrefs = set()) : i = 0 refs = set() x = 0 slen = len(s) s0 = s s = s.replace('(',' ') s = s.replace(')',' ') s = s.replace(';',' ') s = s.replace(':',' ') s = s.replace(',',' ') while x != -1 : # print(x) x1 = s.lower().find('нк рф', x) if x1 == -1 : break # print(x) x2 = x1 - 12 x2 = max(x2, 0) x31 = s.lower().find('ст.', x2) x32 = s.lower().find('ьей', x2) x33 = s.lower().find('ьёй', x2) x34 = s.lower().find('ями', x2) x35 = s.lower().find('тьи', x2) x36 = s.lower().find('тье', x2) if x31 == -1 : x31 = slen if x32 == -1 : x32 = slen if x33 == -1 : x33 = slen if x34 == -1 : x34 = slen if x35 == -1 : x35 = slen if x36 == -1 : x36 = slen x3 = min(x31, x32, x33, x34, x35, x36) # print(x1, x2, x3) # if x3 > x1 : # print('not found: ', s0[x2 : x1 + 5]) x = x3 # print(x) if x != -1 : # x += 1 y = s.lower().find('нк рф', x) if y != -1 : # print(i) # print(y) # print(s) dx = 3 if s[x + dx] == ' ' : dx += 1 if y - x <= 13 and y - x > 4 : # print(s[x + 4: y + 5]) ref = 'Статья ' + s[x + dx: y - 1] if ref in self.refid : refs.add(ref) if debug and (ref not in altrefs): print('...' + s0[y - 40 : y + 5]) x = y + 1 else : # print('error: ', s[x + 4: y + 5]) x += 1 i += 1 if i > 1000 : break return list(refs) def getRefsNK2(self, s, debug = False, altrefs = set()) : i = 0 refs = set() x = 0 slen = len(s) s0 = s s = s.replace('(',' ') s = s.replace(')',' ') s = s.replace(';',' ') s = s.replace(':',' ') s = s.replace(',',' ') while x != -1 : # print(x) x1 = s.lower().find('нкрф', x) if x1 == -1 : break # print(x) x2 = x1 - 12 x2 = max(x2, 0) x3 = s.lower().find('ст.', x2) # print(x1, x2, x3) # if x3 > x1 : # print('not found: ', s0[x2 : x1 + 5]) x = x3 # print(x) if x != -1 : # x += 1 y = s.lower().find('нкрф', x) if y != -1 : # print(i) # print(y) # print(s) dx = 3 if s[x + dx] == ' ' : dx += 1 if y - x <= 13 and y - x > 4 : # print(s[x + 4: y + 5]) ref = 'Статья ' + s[x + dx: y - 1] if ref in self.refid : refs.add(ref) if debug and (ref not in altrefs): print('...' + s0[y - 40 : y + 5]) x = y + 1 else : # print('error: ', s[x + 4: y + 5]) x += 1 i += 1 if i > 1000 : break return list(refs) # read data def load_basic_data(self, data_directory = 'data') : # global title # global text # global qtitle # global qtext # global atitle # global atext # global questions # global answers # global added_refs # global missed_refs self.title, self.text = self.read_xml(os.path.join(data_directory, 'taxcode.xml')) self.atitle, self.atext = self.read_xml(os.path.join(data_directory, 'K2-answer.xml')) self.qtitle, self.qtext = self.read_xml(os.path.join(data_directory, 'K2-question.xml')) _, reftext = self.read_xml(os.path.join(data_directory, 'references-04-12-2023.xml')) reflist = [set()] * len(self.qtitle) reflist1 = [set()] * len(self.qtitle) qreflist = [set()] * len(self.qtitle) def getRefNK(s) : x = s.find('. ') y = s.find(' (') if x == -1 : x = sys.maxsize if y == -1 : y = sys.maxsize x = min(x, y) id = s[:x] return id self.refid = {} self.titleref = {} self.idref = [0] * len(self.title) for i in range(len(self.title)) : s = self.title[i].firstChild.nodeValue id = getRefNK(s) self.refid[id] = i self.titleref[s] = id self.idref[i] = id for i in range(len(self.qtext)) : # for i in range(1,2) : doctext = self.atext[i].firstChild.nodeValue qdoctext = self.qtext[i].firstChild.nodeValue refdoctext = reftext[i].firstChild.nodeValue refs = self.getRefsNK1(doctext) qrefs = self.getRefsNK1(qdoctext) refs1 = self.getRefsNK2(refdoctext) # print(refs, qrefs) intrefs = [] intrefs1 = [] intqrefs = [] for ref in refs : intrefs.append(self.refid[ref]) for ref in refs1 : intrefs1.append(self.refid[ref]) for ref in qrefs : intqrefs.append(self.refid[ref]) reflist[i] = set(intrefs) reflist1[i] = set(intrefs1) qreflist[i] = set(intqrefs) for i in range(len(reflist)) : reflist[i] |= reflist1[i] self.nk_refs = [] for i in range(len(reflist)) : refs = list(reflist[i]) newrefs = [] for j in range(len(refs)) : ref = self.idref[refs[j]] m = re.search('(\d+\.\d+|\d+)', ref) s = ref[m.start() : m.end()] ref1 = 'ст.' + s + ' НКРФ' newrefs.append(ref1) self.nk_refs.append(newrefs) # reading Vlad's json data # datadir = os.path.join(data_directory, 'data_jsons_20240104') datadir = os.path.join(data_directory, 'data_jsons_20240119') filelist = os.listdir(datadir) filelist = [x for x in filelist if re.search(r'\d+.json', x)] filelist.sort() questions = [''] * len(filelist) answers = [''] * len(filelist) added_refs = [[]] * len(filelist) missed_refs = [[]] * len(filelist) count = 0 for filename in filelist : x = filename.find('.') if x == -1 : print('ERROR :', filename) if filename[:x].isnumeric() : i = int(filename[:x]) # print(i) with open(os.path.join(datadir, filename), 'r', encoding='utf-8') as f: d = json.load(f) refs = set(d['added_refs'].keys()) refs -= {''} refs = list(refs) questions[i] = d['question'] answers[i] = d['answer'] missed_refs[i] = d['refs'] added_refs[i] = refs count += 1 self.questions = questions#[:count] self.answers = answers#[:count] self.added_refs = added_refs#[:count] self.missed_refs = missed_refs#[:count] def load_text_processing(self) : # globals stop_words # global stemmer # nltk.download('punkt') # nltk.download('stopwords') # nlp = ru_core_news_md.load() # self.stop_words = set(stopwords.words('russian')) self.stop_words = {'а', 'без', 'более', 'больше', 'будет', 'будто', 'бы', 'был', 'была', 'были', 'было', 'быть', 'в', 'вам', 'вас', 'вдруг', 'ведь', 'во', 'вот', 'впрочем', 'все', 'всегда', 'всего', 'всех', 'всю', 'вы', 'где', 'да', 'даже', 'два', 'для', 'до', 'другой', 'его', 'ее', 'ей', 'ему', 'если', 'есть', 'еще', 'ж', 'же', 'за', 'зачем', 'здесь', 'и', 'из', 'или', 'им', 'иногда', 'их', 'к', 'как', 'какая', 'какой', 'когда', 'конечно', 'кто', 'куда', 'ли', 'лучше', 'между', 'меня', 'мне', 'много', 'может', 'можно', 'мой', 'моя', 'мы', 'на', 'над', 'надо', 'наконец', 'нас', 'не', 'него', 'нее', 'ней', 'нельзя', 'нет', 'ни', 'нибудь', 'никогда', 'ним', 'них', 'ничего', 'но', 'ну', 'о', 'об', 'один', 'он', 'она', 'они', 'опять', 'от', 'перед', 'по', 'под', 'после', 'потом', 'потому', 'почти', 'при', 'про', 'раз', 'разве', 'с', 'сам', 'свою', 'себе', 'себя', 'сейчас', 'со', 'совсем', 'так', 'такой', 'там', 'тебя', 'тем', 'теперь', 'то', 'тогда', 'того', 'тоже', 'только', 'том', 'тот', 'три', 'тут', 'ты', 'у', 'уж', 'уже', 'хорошо', 'хоть', 'чего', 'чем', 'через', 'что', 'чтоб', 'чтобы', 'чуть', 'эти', 'этого', 'этой', 'этом', 'этот', 'эту', 'я'} # self.stemmer = SnowballStemmer("russian") self.stemmer = Porter() def analyze(self, s) : template = r'[\'\"\.\,\?\!\:\;\-\+\%\^\&\*\@\~\_\=/\\\>\<\#\$\(\)\|\n\r\d]' s = re.sub(template, ' ', s) # template = r'( \w |^\w | \w$)' # s = re.sub(template, ' ', s) # s = re.sub(' +', ' ', s) s = ' '.join( [w for w in s.split() if len(w) > 1] ) # tokens = nlp(s) # tokens = [str(t.lemma_) for t in tokens] # tokens = word_tokenize(s) tokens = s.strip().lower().split(' ') # tokens = [t for t in tokens if t not in self.stop_words and t != ' '] # tokens = [self.stemmer.stem(word) for word in tokens] tokens = [self.stemmer.stem(word) for word in tokens if word not in self.stop_words] newtext = ' '.join(tokens) return newtext # load medium dataset def load_medium_dataset(self, path) : # global dataset_medium with open(path, 'r', encoding='utf-8') as infile: self.dataset_medium = json.load(infile) # data_path = "./legal_info_search_data/data_jsons_20240119" # all_docs = {} # for filename in os.listdir(data_path): # with open(os.path.join(data_path, filename), "r", encoding="utf-8") as f: # all_docs[int(filename.split(".")[0])] = json.load(f) # # filter out docs with no added_refs # dataset_small = {} # for key, value in all_docs.items() : # added_refs = value['added_refs'] # dataset_small.update(added_refs) # # self.dataset_medium = dataset_small # dataset_new = {} # for key in dataset_small : # m = re.search(r'(ст.(\d+\.\d+|\d+) [НГТ]КРФ|Федеральный закон|Постановление Правительства РФ|Приказ ФНС РФ|Решение Коллегии Евразийской экономической комиссии)', key) # s = key # if m != None : # s = key[m.start() : ] # if s in self.dataset_medium : # dataset_new[s] = self.dataset_medium[s] # elif s in dataset_small : # dataset_new[s] = dataset_small[s] # else : # dataset_new[key] = dataset_small[key] # # print(key, 'is absent') # self.dataset_medium = dataset_new # create a filtered list of references for Vlad's json data def create_filtered_refs(self) : doctype = self.doctype added_refs = self.added_refs # global filtered_refs # global doctype_template # t = r'(НКРФ|ГКРФ|ТКРФ|ФЗ|[Зз]акон|Минфин|ФНС|Правительства|ФАС|АС|КС|ВС|[Сс]удебн|[Сс]уд)' if doctype == 'court-decisions' : doctype_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд)' # courts' decisions ref_template = doctype_template elif doctype == 'minfin-letters' : doctype_template = r'[Пп]исьмо [Мм]инфина' # Minfin letters ref_template = doctype_template elif doctype == 'fns-letters' : doctype_template = r'[Пп]исьмо (ФНС|фнс)' # FNS letters ref_template = doctype_template elif doctype == 'all-letters' : doctype_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд|[Пп]исьмо [Мм]инфина|[Пп]исьмо (ФНС|фнс))' # courts' decisions + Minfin letters + FNS letters ref_template = doctype_template elif doctype == 'taxcode' : doctype_template = r'^ст.(\d+\.\d+|\d+) НКРФ' ref_template = r'ст.(\d+\.\d+|\d+) НКРФ' # taxcode ref formst differs from doctype format elif doctype == 'other-laws' : doctype_template = r'(^ст.(\d+\.\d+|\d+) [ГТ]КРФ|^Федеральный закон|Приказ ФНС РФ|Постановление Правительства РФ|Решение Коллегии Евразийской экономической комиссии)' # courts' decisions + Minfin letters + FNS letters + taxcode ref_template = r'(ст.(\d+\.\d+|\d+) [ГТ]КРФ|Федеральный закон|Приказ ФНС РФ|Постановление Правительства РФ|Решение Коллегии Евразийской экономической комиссии)' # taxcode ref formst differs from doctype format elif doctype == 'all-docs' : # doctype_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд|[Пп]исьмо [Мм]инфина|[Пп]исьмо (ФНС|фнс)|^ст.(\d+\.\d+|\d+) НКРФ)' # courts' decisions + Minfin letters + FNS letters + taxcode # ref_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд|[Пп]исьмо [Мм]инфина|[Пп]исьмо (ФНС|фнс)|ст.(\d+\.\d+|\d+) НКРФ)' # taxcode ref formst differs from doctype format doctype_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд|[Пп]исьмо [Мм]инфина|[Пп]исьмо (ФНС|фнс)|^ст.(\d+\.\d+|\d+) НКРФ|^ст.(\d+\.\d+|\d+) [ГТ]КРФ|^Федеральный закон|Приказ ФНС РФ|Постановление Правительства РФ|^Решение Коллегии Евразийской экономической комиссии)' # courts' decisions + Minfin letters + FNS letters + taxcode ref_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд|[Пп]исьмо [Мм]инфина|[Пп]исьмо (ФНС|фнс)|ст.(\d+\.\d+|\d+) НКРФ|ст.(\d+\.\d+|\d+) [ГТ]КРФ|Федеральный закон|Приказ ФНС РФ|Постановление Правительства РФ|Решение Коллегии Евразийской экономической комиссии)' # taxcode ref formst differs from doctype format else : print('Error : wrong doctype "' + doctype + '"') filtered_refs = [] nk_mask = [] for i in range(len(added_refs)) : refs = [] for j in range(len(added_refs[i])) : s = added_refs[i][j] if re.search(ref_template, s) != None: m = re.search(r'(ст.(\d+\.\d+|\d+) [НГТ]КРФ|Федеральный закон|Постановление Правительства РФ|Приказ ФНС РФ|Решение Коллегии Евразийской экономической комиссии)', s) if m != None : s = s[m.start() : ] if s in self.dataset_medium : refs.append(s) # print(i, j, s) if doctype_template.find('НКРФ') != -1 : refs += self.nk_refs[i] refs = list(set(refs)) filtered_refs.append(refs) self.filtered_refs = filtered_refs self.doctype_template = doctype_template # creating corpora fo TF-IDF embedding def create_corpora(self) : self.qcorpus = [] for i in range(len(self.qtext)) : if not i % 100 : print(i, end = ' ') # s = self.qtext[i].firstChild.nodeValue s = self.qtitle[i].firstChild.nodeValue + ' ' + self.qtext[i].firstChild.nodeValue s = self.analyze(s) self.qcorpus.append(s) self.acorpus = [] for i in range(len(self.qtext)) : s = self.atext[i].firstChild.nodeValue s = self.analyze(s) self.acorpus.append(s) # self.nkcorpus = [] # for i in range(len(self.text)) : # if not i % 100 : print(i, end = ' ') # s = self.text[i].firstChild.nodeValue # s = self.analyze(s) # self.nkcorpus.append(s) self.pmfcorpus = [] self.pmfrefs = [] self.pmfids = [] self.pmflengths = [] self.nk_mask = [] self.laws_mask = [] i = 0 self.items = [] for key, value in self.dataset_medium.items() : # print('test') # break if re.search(self.doctype_template, key) != None : s = value ss = key m = re.search(r'(ст.(\d+\.\d+|\d+) [НГТ]КРФ|Федеральный закон|Постановление Правительства РФ|Приказ ФНС РФ|Решение Коллегии Евразийской экономической комиссии)', ss) if m != None : ss = ss[m.start() : ] if s != None : s = s.replace('\n', ' ') if s != None and s.count(' ') : if not i % 100 : print(i, end = ' ') # print('test') # break s = self.analyze(s) if s.count(' ') : self.pmfcorpus.append(s) self.pmfrefs.append(ss) self.pmfids.append(i) self.items.append({'title' : key, 'text' : value}) self.pmflengths.append(s.count(' ')) # if ss.find('НКРФ') != -1 : if re.search(r'НКРФ', ss) : self.nk_mask.append(1) else: self.nk_mask.append(0) if re.search(r'([ГТ]КРФ|Федеральный закон|Приказ ФНС РФ|Постановление Правительства РФ|Решение Коллегии Евразийской экономической комиссии)', ss) : self.laws_mask.append(1) else: self.laws_mask.append(0) i += 1 self.refids = {} for i in range(len(self.pmfrefs)) : key = self.pmfrefs[i] self.refids[key] = i # build up TF-IDF representation def create_TFIDF(self) : self.vectorizer = CountVectorizer() # self.transformer = TfidfTransformer(smooth_idf = False, norm = 'l2', sublinear_tf = True) self.transformer = TfidfTransformer(smooth_idf = False, norm = None, sublinear_tf = True) X = self.vectorizer.fit_transform(self.pmfcorpus) QX = self.vectorizer.transform(self.qcorpus) self.TFIDF = self.transformer.fit_transform(X) self.QTFIDF = self.transformer.transform(QX) # self.norm = [] # for i in range(self.TFIDF.shape[0]) : # n = scipy.sparse.linalg.norm(self.TFIDF[i]) # self.norm.append(n) # self.TFIDF[i] /= n # for i in range(self.QTFIDF.shape[0]) : # qn = scipy.sparse.linalg.norm(self.QTFIDF[i]) # self.QTFIDF[i] /= qn n = np.sqrt(self.TFIDF.multiply(self.TFIDF).sum(axis = 1)) self.TFIDF = self.TFIDF.multiply(sparse.csr_matrix(1 / n)) self.norm = n.flatten().tolist()[0] n = np.sqrt(self.QTFIDF.multiply(self.QTFIDF).sum(axis = 1)) self.QTFIDF = self.QTFIDF.multiply(sparse.csr_matrix(1 / n)) self.avectorizer = CountVectorizer() self.atransformer = TfidfTransformer(smooth_idf = False, norm = 'l2', sublinear_tf = True) # self.atransformer = TfidfTransformer(smooth_idf = False, norm = None, sublinear_tf = True) AX = self.avectorizer.fit_transform(self.acorpus) AQX = self.avectorizer.transform(self.qcorpus) self.ATFIDF = self.atransformer.fit_transform(AX) self.AQTFIDF = self.atransformer.transform(AQX) # get top letters sorted by TF-IDF cosine similarity def getKNNScores(self, v, i = -1) : # v = self.AQTFIDF[i] vt = v.transpose() ascores = self.ATFIDF.dot(vt)[:, 0].todense() ascores = np.squeeze(np.asarray(ascores)) scores = [0] * len(self.refids) for j in range(len(self.filtered_refs)) : score = ascores[j] refs = self.filtered_refs[j] for k in range(len(refs)) : ref = refs[k] m = re.search(r'ст.(\d+\.\d+|\d+) НКРФ', ref) if i != j and m != None : key = ref[m.start() : ] if key in self.refids : id = self.refids[key] if scores[id] < score : scores[id] = score return scores def getScores(self, v1, v2, i = -1) : # v = self.QTFIDF[i] vt = v1.transpose() scores = self.TFIDF.dot(vt)[:, 0].todense() scores = np.squeeze(np.asarray(scores)) nk_scores = self.getKNNScores(v2, i) df = pd.DataFrame() df[0] = scores df[1] = nk_scores df[2] = self.norm df[3] = self.nk_mask df[4] = 1 - df[3] df[5] = (1 - np.sign(df[1])) * df[3] df[0] = df[0] * df[4] + df[1] + df[5] * df[0] * zeta # df[0] = df[0] * df[4] + np.maximum(df[1], df[0] * zeta) df[0] *= np.log(df[2]) ** alpha df[0] *= (1 + df[3] * beta) df[0] += df[3] * gamma df[4] = self.laws_mask df[0] *= (1 + df[4] * delta) df[0] += df[4] * epsilon return df[0].tolist() def getTop(self, i, top) : v1 = self.QTFIDF[i] v2 = self.AQTFIDF[i] df = pd.DataFrame() df[0] = self.getScores(v1, v2, i) # df[0] = self.getKNNScores(i) df[1] = self.pmfrefs df.sort_values(0, ascending = False, inplace = True) # df.sort_values(0, ascending = True, inplace = True) ids = df[1].tolist() scores = df[0].tolist() filtered_ids = [] for i in range(len(ids)) : id = ids[i] score = scores[i] if id not in filtered_ids : filtered_ids.append(id) if len(filtered_ids) == top : break # return ids[:top].tolist() return filtered_ids def test_TFIDF_top(self, top = 40, metric = '') : N = len(self.qtext) allhits = 0 allrefs = 0 recall = [] precision = [] f1 = [] for i in range(N) : # if not i % 10 : print(i, end = ' ') refs = set(self.filtered_refs[i]) resp = self.getTop(i, top) serp = set(resp) hits = len(refs & serp) allhits += hits allrefs += len(refs) tp = hits fp = top - tp fn = len(refs) - hits if tp == 0 and metric == 'corrected': if fp == 0 and fn == 0 : # print(i, len(refs), fp, fn) recall.append(1) precision.append(1) f1.append(1) else : # print(i, len(refs), fp, fn) recall.append(0) precision.append(0) f1.append(0) elif tp + fn > 0 : recall.append(tp / (tp + fn)) precision.append(tp / (tp + fp)) f1.append(2 * tp / (2 * tp + fp + fn)) print('\ntotal: ', allhits, allrefs, allhits / (allrefs + .00001)) print('mean recall:', sum(recall) / len(recall)) print('mean precision:', sum(precision) / len(precision)) print('mean F1:', sum(f1) / len(f1)) # get letters with TF-IDF cosine similarity score > value def getTopByScoreValue(self, i, value) : # v = self.QTFIDF[i] # vt = v.transpose() # scores = self.TFIDF.dot(vt)[:, 0].todense() # scores = np.squeeze(np.asarray(scores)) # df = pd.DataFrame() # df[0] = scores # df[1] = self.pmfrefs v1 = self.QTFIDF[i] v2 = self.AQTFIDF[i] df = pd.DataFrame() df[0] = self.getScores(v1, v2, i) df[1] = self.pmfrefs df.sort_values(0, ascending = False, inplace = True) df1 = df.loc[df[0] > value] ids = df1[1] return ids.tolist() # calculate metrics for letters with TF-IDF cosine similarity score > value def test_TFIDF_value(self, value = .4) : N = len(self.qtext) allhits = 0 allrefs = 0 recall = [] precision = [] f1 = [] topsize = [] count = 0 for i in range(N) : # if not i % 10 : print(i, end = ' ') refs = set(self.filtered_refs[i]) resp = self.getTopByScoreValue(i, value) serp = set(resp) hits = len(refs & serp) top = len(resp) topsize.append(top) if top > 0 : count += 1 tp = hits fp = top - tp fn = len(refs) - hits if tp == 0 : if fp == 0 and fn == 0 : recall.append(1) precision.append(1) f1.append(1) else : recall.append(0) precision.append(0) f1.append(0) else : recall.append(tp / (tp + fn)) precision.append(tp / (tp + fp)) f1.append(2 * tp / (2 * tp + fp + fn)) allhits += hits allrefs += len(refs) print('\ntotal: ', allhits, allrefs, allhits / (allrefs + .00001)) print('mean recall:', sum(recall) / len(recall)) print('mean precision:', sum(precision) / len(precision)) print('mean F1:', sum(f1) / len(f1)) print('mean top size: ', sum(topsize) / len(topsize)) print('non-empty top:', count) print('non-empty top share:', count / 517) # return topsize # get letters with TF-IDF cosine similarity score > top score * ratio def getTopByScoreRelValue(self, i, ratio) : # v = self.QTFIDF[i] # vt = v.transpose() # scores = self.TFIDF.dot(vt)[:, 0].todense() # scores = np.squeeze(np.asarray(scores)) # df = pd.DataFrame() # df[0] = scores # df[1] = self.pmfrefs v1 = self.QTFIDF[i] v2 = self.AQTFIDF[i] df = pd.DataFrame() df[0] = self.getScores(v1, v2, i) df[1] = self.pmfrefs df.sort_values(0, ascending = False, inplace = True) value = df.iloc[0, 0] df1 = df.loc[df[0] > value * ratio] ids = df1[1] return ids.tolist() # calculate metrics for letters with TF-IDF cosine similarity score > top score * ratio def test_TFIDF_ratio(self, ratio = .9) : N = len(self.qtext) allhits = 0 allrefs = 0 recall = [] precision = [] f1 = [] topsize = [] count = 0 for i in range(N) : # if not i % 10 : print(i, end = ' ') refs = set(self.filtered_refs[i]) resp = self.getTopByScoreRelValue(i, ratio) serp = set(resp) hits = len(refs & serp) top = len(resp) topsize.append(top) tp = hits fp = top - tp fn = len(refs) - hits r = 0 p = 0 f = 0 if tp == 0 : if fp == 0 and fn == 0 : recall.append(1) precision.append(1) f1.append(1) r = 1 p = 1 f = 1 else : recall.append(0) precision.append(0) f1.append(0) else : recall.append(tp / (tp + fn)) precision.append(tp / (tp + fp)) f1.append(2 * tp / (2 * tp + fp + fn)) r = tp / (tp + fn) p = tp / (tp + fp) f = 2 * tp / (2 * tp + fp + fn) if (f > r and f > p) or (f < r and f < p) : print('ERROR :', i, r, p, f) allhits += hits allrefs += len(refs) print('\ntotal: ', allhits, allrefs, allhits / (allrefs + .00001)) print('mean recall:', sum(recall) / len(recall)) print('mean precision:', sum(precision) / len(precision)) print('mean F1:', sum(f1) / len(f1)) print('mean top size: ', sum(topsize) / len(topsize)) # return topsize # def getTopForQuery(self, i, top, query) : # v = QTFIDF[i] # vt = v.transpose() # scores = TFIDF.dot(vt)[:, 0].todense() # scores = np.squeeze(np.asarray(scores)) # df = pd.DataFrame() # df[0] = scores # df[1] = pmfrefs # df.sort_values(0, ascending = False, inplace = True) # # df.sort_values(0, ascending = True, inplace = True) # # ids = df.index # ids = df[1] # # print(df) # return ids[:top].tolist() def load_everything(self, data_directory = 'data') : self.load_basic_data(data_directory=data_directory) self.load_text_processing() s = '|()><.,!?:;=*-/\\8. Форма \n \r Cчета-фактуры и порядок его заполнения, формы и порядок ведения журнала учета полученных и выставленных счетов-фактур, книг покупок и книг продаж устанавливаются Правительством Российской Федерации.' print(self.analyze(s)) self.load_medium_dataset(path=os.path.join(data_directory, 'search_data', 'medium_dataset.json')) self.create_filtered_refs() self.create_corpora() print(len(self.pmfcorpus)) self.create_TFIDF() def test_everything(self) : self.test_TFIDF_top(top = 40) self.test_TFIDF_value(value = .2) self.test_TFIDF_ratio(ratio = .9) def search(self, query, top = 10) : analyzed_query = self.analyze(query) query_TF = self.vectorizer.transform([analyzed_query]) query_TFIDF = self.transformer.transform(query_TF) n = np.sqrt(query_TFIDF.multiply(query_TFIDF).sum(axis = 1)) query_TFIDF = query_TFIDF.multiply(sparse.csr_matrix(1 / n)) query_ATF = self.avectorizer.transform([analyzed_query]) query_ATFIDF = self.atransformer.transform(query_ATF) v1 = query_TFIDF[0] v2 = query_ATFIDF[0] # vt = v.transpose() # scores = self.TFIDF.dot(vt)[:, 0].todense() # scores = np.squeeze(np.asarray(scores)) # df = pd.DataFrame() # df[0] = scores # df[1] = self.pmfrefs # df[2] = self.norm # df[3] = self.nk_mask # df[0] *= np.log(df[2]) ** alpha # df[0] *= (1 + df[3] * beta) # df[0] += df[3] * gamma # df[4] = self.laws_mask # df[0] *= (1 + df[4] * delta) # df[0] += df[4] * epsilon # df.sort_values(0, ascending = False, inplace = True) # # df.sort_values(0, ascending = True, inplace = True) # if top == 'auto' : # value = df.iloc[0, 0] # ratio = 0.81 # df1 = df.loc[df[0] > value * ratio] # ids = df1[1] # top = len(ids) # else : # ids = df[1][:top] # # print(df) df = pd.DataFrame() df[0] = self.getScores(v1, v2) # df[0] = self.getKNNScores(i) df[1] = self.pmfrefs df.sort_values(0, ascending = False, inplace = True) # df.sort_values(0, ascending = True, inplace = True) titles = df[1].tolist() # titles = ids.tolist() docs = [] for i in range(len(titles)) : id = df.iloc[i, 1] docs.append(self.dataset_medium[id]) # print() # print (i, df.iloc[i, 0], id) # print(self.dataset_medium[id]) scores = df[0][:top].tolist() return titles, docs, scores # bsearch = BasicSearch('taxcode') # bsearch = BasicSearch('minfin-letters') # bsearch = BasicSearch('fns-letters') # bsearch = BasicSearch('other-laws') # bsearch = BasicSearch('all-docs', './data') # bsearch.test_TFIDF_top(40) # query = 'Форма счета-фактуры и порядок его заполнения' # titles, docs, scores = bsearch.search(query, top = 40) # print() # print('top size', len(scores)) # print('top score', scores[0]) # print('\n', titles[0], ':\n') # print(docs[0])