Spaces:
Build error
Build error
| import pandas as pd | |
| import numpy as np | |
| from sklearn.feature_extraction.text import CountVectorizer | |
| from sklearn.feature_extraction.text import TfidfTransformer | |
| import re | |
| from xml.dom.minidom import parseString | |
| import os | |
| import json | |
| import nltk | |
| from nltk.tokenize import word_tokenize | |
| from nltk.corpus import stopwords | |
| from nltk.stem.snowball import SnowballStemmer | |
class BasicSearch:
    """TF-IDF based search over Russian tax-law corpora (tax code articles,
    K2 question/answer letters, and a medium dataset of referenced documents).
    """

    # constructor function
    def __init__(self, doctype: str = 'minfin-letters') :
        # doctype selects which reference documents are kept by
        # create_filtered_refs(); accepted values: 'court-decisions',
        # 'minfin-letters', 'fns-letters', 'all-letters'.
        self.doctype = doctype
        # Eagerly loads every dataset and builds the TF-IDF index
        # (file I/O + corpus analysis happen here, not lazily).
        self.load_everything()
| # read data | |
| def load_basic_data(self, data_directory = 'data') : | |
| # global title | |
| # global text | |
| # global qtitle | |
| # global qtext | |
| # global atitle | |
| # global atext | |
| # global questions | |
| # global answers | |
| # global added_refs | |
| # global missed_refs | |
| text_file = open(os.path.join(data_directory, 'taxcode.xml'), "r", encoding="utf-8") | |
| data = text_file.read() | |
| text_file.close() | |
| document = parseString('<data>' + data + '</data>') | |
| self.title = document.getElementsByTagName('title') | |
| self.text = document.getElementsByTagName('text') | |
| text_file = open(os.path.join(data_directory, 'K2-answer.xml'), "r", encoding="utf-8") | |
| textdata = text_file.read() | |
| text_file.close() | |
| document = parseString('<data>' + textdata + '</data>') | |
| self.atitle = document.getElementsByTagName('title') | |
| self.atext = document.getElementsByTagName('text') | |
| text_file = open(os.path.join(data_directory, 'K2-question.xml'), "r", encoding="utf-8") | |
| textdata = text_file.read() | |
| text_file.close() | |
| document = parseString('<data>' + textdata + '</data>') | |
| self.qtitle = document.getElementsByTagName('title') | |
| self.qtext = document.getElementsByTagName('text') | |
| # fname2 = 'references-04-12-2023.xml' | |
| text_file = open(os.path.join(data_directory, 'references-04-12-2023.xml'), "r", encoding="utf-8") | |
| textdata = text_file.read() | |
| text_file.close() | |
| document = parseString('<data>' + textdata + '</data>') | |
| reftext = document.getElementsByTagName('text') | |
| text_file = open(os.path.join(data_directory, 'references-Vlad-11-12-2023.xml'), "r", encoding="utf-8") | |
| textdata = text_file.read() | |
| text_file.close() | |
| document = parseString('<data>' + textdata + '</data>') | |
| reftext2 = document.getElementsByTagName('text') | |
| # reading Vlad's json data | |
| datadir = os.path.join(data_directory, 'data_jsons_20240104') | |
| filelist = os.listdir(datadir) | |
| filelist.sort() | |
| questions = [''] * len(filelist) | |
| answers = [''] * len(filelist) | |
| added_refs = [[]] * len(filelist) | |
| missed_refs = [[]] * len(filelist) | |
| count = 0 | |
| for filename in filelist : | |
| x = filename.find('.') | |
| if x == -1 : | |
| print('ERROR :', filename) | |
| if filename[:x].isnumeric() : | |
| i = int(filename[:x]) | |
| # print(i) | |
| f = open(os.path.join(datadir, filename), encoding="utf-8") | |
| d = json.load(f) | |
| refs = set(d['added_refs'].keys()) | |
| refs -= {''} | |
| refs = list(refs) | |
| questions[i] = d['question'] | |
| answers[i] = d['answer'] | |
| missed_refs[i] = d['refs'] | |
| added_refs[i] = refs | |
| count += 1 | |
| self.questions = questions[:count] | |
| self.answers = answers[:count] | |
| self.added_refs = added_refs[:count] | |
| self.missed_refs = missed_refs[:count] | |
    def load_text_processing(self) :
        """Prepare the Russian stop-word set and stemmer used by analyze().

        Requires the NLTK 'stopwords' corpus (and 'punkt' for analyze) to be
        downloaded already -- see the commented nltk.download lines below.
        """
        # globals stop_words
        # global stemmer
        # nltk.download('punkt')
        # nltk.download('stopwords')
        # nlp = ru_core_news_md.load()
        self.stop_words = set(stopwords.words('russian'))
        self.stemmer = SnowballStemmer("russian")
| def analyze(self, s) : | |
| template = r'[\'\"\.\,\?\!\:\;\-\+\%\^\&\*\@\~\_\=/\\\>\<\#\$\(\)\|\n\r\d]' | |
| s = re.sub(template, ' ', s) | |
| s = re.sub(' +', ' ', s) | |
| # tokens = nlp(s) | |
| # tokens = [str(t.lemma_) for t in tokens] | |
| tokens = word_tokenize(s) | |
| tokens = [t for t in tokens if t not in self.stop_words and t != ' '] | |
| tokens = [self.stemmer.stem(word) for word in tokens] | |
| newtext = ' '.join(tokens) | |
| return newtext | |
| # load medium dataset | |
| def load_medium_dataset(self) : | |
| # global dataset_medium | |
| infile = open(os.path.join('data', 'search_data', 'medium_dataset.json'), 'r', encoding="utf-8") | |
| self.dataset_medium = json.load(infile) | |
| # create a filtered list of references for Vlad's json data | |
| def create_filtered_refs(self) : | |
| doctype = self.doctype | |
| added_refs = self.added_refs | |
| # global filtered_refs | |
| # global doctype_template | |
| # t = r'(НКРФ|ГКРФ|ТКРФ|ФЗ|[Зз]акон|Минфин|ФНС|Правительства|ФАС|АС|КС|ВС|[Сс]удебн|[Сс]уд)' | |
| if doctype == 'court-decisions' : | |
| doctype_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд)' # courts' decisions | |
| elif doctype == 'minfin-letters' : | |
| doctype_template = r'[Пп]исьмо [Мм]инфина' # Minfin letters | |
| elif doctype == 'fns-letters' : | |
| doctype_template = r'[Пп]исьмо (ФНС|фнс)' # FNS letters | |
| elif doctype == 'all-letters' : | |
| doctype_template = r'(ФАС |АС |КС |ВС |[Сс]удебн|[Сс]уд|[Пп]исьмо [Мм]инфина|[Пп]исьмо (ФНС|фнс))' # courts' decisions + Minfin letters + FNS letters | |
| else : | |
| print('Error : wrong doctype') | |
| filtered_refs = [] | |
| for i in range(len(added_refs)) : | |
| refs = [] | |
| for j in range(len(added_refs[i])) : | |
| s = added_refs[i][j] | |
| if re.search(doctype_template, s) != None: | |
| refs.append(s) | |
| # print(i, j, s) | |
| filtered_refs.append(refs) | |
| self.filtered_refs = filtered_refs | |
| self.doctype_template = doctype_template | |
    # creating corpora fo TF-IDF embedding
    def create_corpora(self) :
        """Build the analyzed text corpora used for TF-IDF.

        Populates:
          qcorpus   -- analyzed K2 question texts
          nkcorpus  -- analyzed tax-code article texts
          pmfcorpus -- analyzed dataset_medium documents whose title
                       matches self.doctype_template
          pmfrefs   -- titles of the kept documents (parallel to pmfcorpus)
          pmfids    -- positions of the kept documents (see NOTE below)
          items     -- raw {'title', 'text'} dicts of the kept documents

        Progress indices are printed every 100 items.
        """
        self.qcorpus = []
        for i in range(len(self.qtext)) :
            if not i % 100 : print(i, end = ' ')
            s = self.qtext[i].firstChild.nodeValue
            s = self.analyze(s)
            self.qcorpus.append(s)
        self.nkcorpus = []
        for i in range(len(self.text)) :
            if not i % 100 : print(i, end = ' ')
            s = self.text[i].firstChild.nodeValue
            s = self.analyze(s)
            self.nkcorpus.append(s)
        self.pmfcorpus = []
        self.pmfrefs = []
        self.pmfids = []
        i = 0
        self.items = []
        for key, value in self.dataset_medium.items() :
            # print('test')
            # break
            if re.search(self.doctype_template, key) != None :
                s = value
                ss = key
                if s != None :
                    s = s.replace('\n', ' ')
                # skip very long documents (space count used as a cheap
                # proxy for word count; cap of 12000 -- presumably to bound
                # analyze() cost, TODO confirm)
                if s != None and s.count(' ') < 12000 :
                    if not i % 100 : print(i, end = ' ')
                    # print('test')
                    # break
                    s = self.analyze(s)
                    self.pmfcorpus.append(s)
                    self.pmfrefs.append(ss)
                    self.pmfids.append(i)
                    self.items.append({'title' : key, 'text' : value})
            # NOTE(review): nesting reconstructed from a whitespace-mangled
            # paste; i is taken to count EVERY dataset_medium item so that
            # pmfids records positions in the full dataset -- confirm
            # against the original source.
            i += 1
| # build up TF-IDF representation | |
| def create_TFIDF(self) : | |
| # global TFIDF | |
| # global QTFIDF | |
| # global vectorizer | |
| # global transformer | |
| self.vectorizer = CountVectorizer() | |
| self.transformer = TfidfTransformer(smooth_idf = False, norm = 'l2', sublinear_tf = True) | |
| X = self.vectorizer.fit_transform(self.pmfcorpus) | |
| QX = self.vectorizer.transform(self.qcorpus) | |
| self.TFIDF = self.transformer.fit_transform(X) | |
| self.QTFIDF = self.transformer.transform(QX) | |
| # get top letters sorted by TF-IDF cosine similarity | |
| def getTop(self, i, top) : | |
| v = self.QTFIDF[i] | |
| vt = v.transpose() | |
| scores = self.TFIDF.dot(vt)[:, 0].todense() | |
| scores = np.squeeze(np.asarray(scores)) | |
| df = pd.DataFrame() | |
| df[0] = scores | |
| df[1] = self.pmfrefs | |
| df.sort_values(0, ascending = False, inplace = True) | |
| # df.sort_values(0, ascending = True, inplace = True) | |
| # ids = df.index | |
| ids = df[1] | |
| # print(df) | |
| return ids[:top].tolist() | |
| def test_TFIDF_top(self, top = 40) : | |
| N = len(self.qtext) | |
| allhits = 0 | |
| allrefs = 0 | |
| recall = [] | |
| precision = [] | |
| f1 = [] | |
| for i in range(N) : | |
| # if not i % 10 : print(i, end = ' ') | |
| refs = set(self.filtered_refs[i]) | |
| resp = self.getTop(i, top) | |
| serp = set(resp) | |
| hits = len(refs & serp) | |
| tp = hits | |
| fp = top - tp | |
| fn = len(refs) - hits | |
| if tp == 0 : | |
| if fp == 0 and fn == 0 : | |
| # print(i, len(refs), fp, fn) | |
| recall.append(1) | |
| precision.append(1) | |
| f1.append(1) | |
| else : | |
| # print(i, len(refs), fp, fn) | |
| recall.append(0) | |
| precision.append(0) | |
| f1.append(0) | |
| else : | |
| recall.append(tp / (tp + fn)) | |
| precision.append(tp / (tp + fp)) | |
| f1.append(2 * tp / (2 * tp + fp + fn)) | |
| print() | |
| print('mean recall:', sum(recall) / len(recall)) | |
| print('mean precision:', sum(precision) / len(precision)) | |
| # print('mean F1:', 2 / (len(recall) / sum(recall) + len(precision) / sum(precision))) | |
| print('mean F1:', sum(f1) / len(f1)) | |
| # get letters with TF-IDF cosine similarity score > value | |
| def getTopByScoreValue(self, i, value) : | |
| v = self.QTFIDF[i] | |
| vt = v.transpose() | |
| scores = self.TFIDF.dot(vt)[:, 0].todense() | |
| scores = np.squeeze(np.asarray(scores)) | |
| df = pd.DataFrame() | |
| df[0] = scores | |
| df[1] = self.pmfrefs | |
| df.sort_values(0, ascending = False, inplace = True) | |
| df1 = df.loc[df[0] > value] | |
| ids = df1[1] | |
| return ids.tolist() | |
| # calculate metrics for letters with TF-IDF cosine similarity score > value | |
| def test_TFIDF_value(self, value = .4) : | |
| N = len(self.qtext) | |
| allhits = 0 | |
| allrefs = 0 | |
| recall = [] | |
| precision = [] | |
| f1 = [] | |
| topsize = [] | |
| count = 0 | |
| for i in range(N) : | |
| # if not i % 10 : print(i, end = ' ') | |
| refs = set(self.filtered_refs[i]) | |
| resp = self.getTopByScoreValue(i, value) | |
| serp = set(resp) | |
| hits = len(refs & serp) | |
| top = len(resp) | |
| topsize.append(top) | |
| if top > 0 : | |
| count += 1 | |
| tp = hits | |
| fp = top - tp | |
| fn = len(refs) - hits | |
| if tp == 0 : | |
| if fp == 0 and fn == 0 : | |
| recall.append(1) | |
| precision.append(1) | |
| f1.append(1) | |
| else : | |
| recall.append(0) | |
| precision.append(0) | |
| f1.append(0) | |
| else : | |
| recall.append(tp / (tp + fn)) | |
| precision.append(tp / (tp + fp)) | |
| f1.append(2 * tp / (2 * tp + fp + fn)) | |
| print() | |
| print('mean recall:', sum(recall) / len(recall)) | |
| print('mean precision:', sum(precision) / len(precision)) | |
| print('mean F1:', sum(f1) / len(f1)) | |
| print('mean top size: ', sum(topsize) / len(topsize)) | |
| count, count / 517 | |
| # get letters with TF-IDF cosine similarity score > top score * ratio | |
| def getTopByScoreRelValue(self, i, ratio) : | |
| v = self.QTFIDF[i] | |
| vt = v.transpose() | |
| scores = self.TFIDF.dot(vt)[:, 0].todense() | |
| scores = np.squeeze(np.asarray(scores)) | |
| df = pd.DataFrame() | |
| df[0] = scores | |
| df[1] = self.pmfrefs | |
| df.sort_values(0, ascending = False, inplace = True) | |
| value = df.iloc[0, 0] | |
| df1 = df.loc[df[0] > value * ratio] | |
| ids = df1[1] | |
| return ids.tolist() | |
| # calculate metrics for letters with TF-IDF cosine similarity score > top score * ratio | |
| def test_TFIDF_ratio(self, ratio = .9) : | |
| N = len(self.qtext) | |
| allhits = 0 | |
| allrefs = 0 | |
| recall = [] | |
| precision = [] | |
| f1 = [] | |
| topsize = [] | |
| count = 0 | |
| for i in range(N) : | |
| # if not i % 10 : print(i, end = ' ') | |
| refs = set(self.filtered_refs[i]) | |
| resp = self.getTopByScoreRelValue(i, ratio) | |
| serp = set(resp) | |
| hits = len(refs & serp) | |
| top = len(resp) | |
| topsize.append(top) | |
| tp = hits | |
| fp = top - tp | |
| fn = len(refs) - hits | |
| r = 0 | |
| p = 0 | |
| f = 0 | |
| if tp == 0 : | |
| if fp == 0 and fn == 0 : | |
| recall.append(1) | |
| precision.append(1) | |
| f1.append(1) | |
| r = 1 | |
| p = 1 | |
| f = 1 | |
| else : | |
| recall.append(0) | |
| precision.append(0) | |
| f1.append(0) | |
| else : | |
| recall.append(tp / (tp + fn)) | |
| precision.append(tp / (tp + fp)) | |
| f1.append(2 * tp / (2 * tp + fp + fn)) | |
| r = tp / (tp + fn) | |
| p = tp / (tp + fp) | |
| f = 2 * tp / (2 * tp + fp + fn) | |
| if (f > r and f > p) or (f < r and f < p) : | |
| print('ERROR :', i, r, p, f) | |
| print() | |
| print('mean recall:', sum(recall) / len(recall)) | |
| print('mean precision:', sum(precision) / len(precision)) | |
| print('mean F1:', sum(f1) / len(f1)) | |
| print('mean top size: ', sum(topsize) / len(topsize)) | |
| # def getTopForQuery(self, i, top, query) : | |
| # v = QTFIDF[i] | |
| # vt = v.transpose() | |
| # scores = TFIDF.dot(vt)[:, 0].todense() | |
| # scores = np.squeeze(np.asarray(scores)) | |
| # df = pd.DataFrame() | |
| # df[0] = scores | |
| # df[1] = pmfrefs | |
| # df.sort_values(0, ascending = False, inplace = True) | |
| # # df.sort_values(0, ascending = True, inplace = True) | |
| # # ids = df.index | |
| # ids = df[1] | |
| # # print(df) | |
| # return ids[:top].tolist() | |
| def load_everything(self) : | |
| self.load_basic_data() | |
| self.load_text_processing() | |
| s = '|()><.,!?:;=*-/\\8. Форма \n \r Cчета-фактуры и порядок его заполнения, формы и порядок ведения журнала учета полученных и выставленных счетов-фактур, книг покупок и книг продаж устанавливаются Правительством Российской Федерации.' | |
| print(self.analyze(s)) | |
| self.load_medium_dataset() | |
| self.create_filtered_refs() | |
| self.create_corpora() | |
| print(len(self.pmfcorpus)) | |
| self.create_TFIDF() | |
| def test_everything(self) : | |
| self.test_TFIDF_top(top = 40) | |
| self.test_TFIDF_value(value = .4) | |
| self.test_TFIDF_ratio(ratio = .9) | |
| def search(self, query, top = 10) : | |
| analyzed_query = self.analyze(query) | |
| query_TF = self.vectorizer.transform([analyzed_query]) | |
| query_TFIDF = self.transformer.transform(query_TF) | |
| v = query_TFIDF[0] | |
| vt = v.transpose() | |
| scores = self.TFIDF.dot(vt)[:, 0].todense() | |
| scores = np.squeeze(np.asarray(scores)) | |
| df = pd.DataFrame() | |
| df[0] = scores | |
| df[1] = self.pmfrefs | |
| df.sort_values(0, ascending = False, inplace = True) | |
| # df.sort_values(0, ascending = True, inplace = True) | |
| # ids = df.index | |
| ids = df[1] | |
| # print(df) | |
| titles = ids[:top].tolist() | |
| docs = [] | |
| for id in ids : | |
| docs.append(self.dataset_medium[id]) | |
| return titles, docs | |