Spaces:
Sleeping
Sleeping
| import numpy, string, functools, itertools, json | |
| from underthesea import pos_tag, ner | |
# --- Module-level resources -------------------------------------------------
# Vietnamese stopwords, one per line; multi-word entries are underscore-joined
# to match the tokenizer's output format.
with open('resources/stopwords_small.txt', encoding='utf-8') as f:
    stopwords = set(w.replace(' ', '_') for w in f.read().split('\n'))

# Punctuation to strip during n-gram generation, including common unicode
# quotes/dashes (duplicates in the original literal removed — sets dedupe).
punct_set = set(string.punctuation) | {'“', '”', "...", "–", "…", "..", "•"}

# Map fine-grained POS tags to the coarse categories used by the synonym map.
map_pos = {'M': 'noun', 'Y': 'noun', 'Nb': 'noun', 'Nc': 'noun', 'Ni': 'noun',
           'Np': 'noun', 'N': 'noun', 'X': 'adj', 'Nu': 'noun', 'Ny': 'noun',
           'V': 'verb', 'Vb': 'verb', 'Vy': 'verb', 'A': 'adj', 'Ab': 'adj',
           'R': 'adj'}

# Synonym dictionary — presumably {pos_category: {word: [synonyms, ...]}}
# given how generateCombinations indexes it.
with open('resources/synonym.json', encoding='utf-8') as f:
    map_synonym = json.load(f)

# Corpus frequency table ("term<TAB>count" per line) used by makovCal;
# terms are unigrams or underscore-joined bigrams.
markov_score = {}
with open('resources/bigram.txt', encoding='utf-8') as f:
    data = f.read().split('\n')
data = data[:-1]  # drop trailing empty line
for line in data:
    word, score = line.split('\t')
    # some score of words in corpus
    markov_score[word] = int(score)
del data  # free the raw file contents
def makovCal(a, b):
    """Markov-style transition score for the word pair (a, b).

    Looks up the bigram "a_b" and the unigram "a" in the corpus frequency
    table (defaulting both to 1 when absent) and returns
    freq(bigram) / smoothed-freq(unigram).
    """
    bigram_freq = markov_score.get(a + "_" + b, 1)
    unigram_freq = markov_score.get(a, 1)
    # Smooth the denominator so very rare unigrams don't inflate the ratio.
    if unigram_freq < 5:
        unigram_freq = 5000  # 2000
    else:
        unigram_freq += 5000  # 2000
    return float(bigram_freq) / unigram_freq
def generateCombinations(tokens, thresh_hold):
    """For each token, produce the list of acceptable surface forms.

    tokens: list of (word, pos_tag) pairs (multi-word tokens underscore-joined).
    thresh_hold: minimum makovCal score for a synonym to be accepted.

    Returns a list parallel to `tokens`; each element is a candidate list
    whose first entry is always the original lower-cased word.
    """
    combinations = []
    for i in range(0, len(tokens)):
        word = tokens[i][0].lower()
        # Stopwords are never substituted.
        if word in stopwords:
            combinations.append([word])
            continue
        pos = tokens[i][1]
        # No coarse POS category or no synonyms known: keep the word alone.
        if pos not in map_pos:
            combinations.append([word])
            continue
        pos = map_pos[pos]
        if word not in map_synonym[pos]:
            combinations.append([word])
            continue
        # Context words are loop-invariant: compute once, not per synonym
        # (the original recomputed these inside the synonym loop).
        pre_word = 'NONE' if i == 0 else tokens[i - 1][0].lower()
        next_word = 'NONE' if i == len(tokens) - 1 else tokens[i + 1][0].lower()
        possible_synonym = []
        for syn in map_synonym[pos][word]:
            # Accept a synonym when it fits either the left or right context.
            if makovCal(pre_word, syn) > thresh_hold or makovCal(syn, next_word) > thresh_hold:
                possible_synonym.append(syn)
        combinations.append([word] + possible_synonym)
    return combinations
def generateVariants(untokenize_text):
    """Generate paraphrase variants of the text via synonym substitution.

    The text is POS-tagged, multi-word tokens are underscore-joined, and
    per-token candidate lists are built by generateCombinations(); the
    cartesian product of the candidate lists yields the variants.  The
    synonym-acceptance threshold is doubled until at most 10000 variants
    remain.
    """
    words = pos_tag(untokenize_text)
    for i in range(0, len(words)):
        words[i] = (words[i][0].replace(' ', '_'), words[i][1])
    tokens = words
    base_line = 0.001
    combinations = generateCombinations(tokens, base_line)
    # Bug fix: give reduce() an initializer so empty input text yields
    # num_variants == 1 instead of raising TypeError on an empty sequence.
    num_variants = functools.reduce(lambda x, y: x * y, [len(c) for c in combinations], 1)
    while num_variants > 10000:
        base_line = base_line * 2
        combinations = generateCombinations(tokens, base_line)
        num_variants = functools.reduce(lambda x, y: x * y, [len(c) for c in combinations], 1)
    combinations = list(itertools.product(*combinations))
    combinations = [' '.join(e) for e in combinations]
    return combinations
def generateNgram(paper, ngram=2, deli='_', rmSet=frozenset()):
    """Build the list of n-grams of `paper`, skipping windows touching rmSet.

    paper: whitespace-tokenised text.
    ngram: window size.
    deli:  separator used to join the words of one n-gram.
    rmSet: words that disqualify any n-gram containing them.

    Returns a list of joined n-grams; single-word input returns [].
    """
    words = paper.split()
    if len(words) == 1:
        # Bug fix: return [] (not '') so the return type is consistent.
        # Callers wrap the result in set(), for which set('') == set([]).
        return []
    ngrams = []
    for i in range(0, len(words) - ngram + 1):
        block = words[i:i + ngram]
        # Drop any window that contains a removed (stop/punct) word.
        if not any(w in rmSet for w in block):
            ngrams.append(deli.join(block))
    return ngrams
def generatePassages(document, n):
    """Cut a document into passages.

    Paragraphs (separated by blank lines) of at most 8 sentences are kept
    whole; longer paragraphs become sliding windows of n sentences, with
    sentences containing '?' dropped from each window.
    """
    passages = []
    for para in document.split('\n\n'):
        sentences = para.rsplit(' . ')
        # Short paragraphs are emitted as a single passage.
        if len(sentences) <= 8:
            passages.append(' '.join(sentences))
            continue
        last_start = len(sentences) - n
        for start in range(last_start + 1):
            window = [s for s in sentences[start:start + n] if '?' not in s]
            passages.append(' '.join(window))
    return passages
def passage_score(q_ngrams, passage):
    """Score how well `passage` overlaps the question's n-gram sets.

    q_ngrams: dict with 'unigram'/'bigram'/'trigram'/'fourgram' sets.
    Higher-order overlaps are weighted more heavily.  Returns 0 on any
    scoring failure — ranking is best-effort and must not crash.
    """
    try:
        passage = passage.lower()
        p_unigram = set(generateNgram(passage, 1, '_', punct_set | stopwords))
        uni_score = len(p_unigram & q_ngrams['unigram'])
        p_bigram = set(generateNgram(passage, 2, '_', punct_set | stopwords))
        p_trigram = set(generateNgram(passage, 3, '_', punct_set | stopwords))
        p_fourgram = set(generateNgram(passage, 4, '_', punct_set))
        bi_score = len(p_bigram & q_ngrams['bigram'])
        tri_score = len(p_trigram & q_ngrams['trigram'])
        four_score = len(p_fourgram & q_ngrams['fourgram'])
        emd_sim = 0  # placeholder for an embedding-similarity term
        return uni_score + bi_score * 2 + tri_score * 3 + four_score * 4 + emd_sim * 3
    except Exception:
        # Bug fix: bare `except:` also swallowed SystemExit/KeyboardInterrupt.
        return 0
def passage_score_wrap(args):
    """Adapter for pool.map-style callers: unpack (q_ngrams, passage) and delegate."""
    q_ngrams, passage = args
    return passage_score(q_ngrams, passage)
def keyword_extraction(question):
    """Extract non-stopword keywords from the question.

    Returns a list of single-element lists ([[kw], ...]) — the nested
    shape is what isRelevant() expects.
    """
    question = question.replace('_', ' ')
    words = pos_tag(question)
    keywords = []
    # The POS tag of each token is unused here (the original bound it to a
    # dead local); only the surface form matters.
    for token, _tag in words:
        word = token.replace(' ', '_')
        if word not in stopwords:
            # Split multi-word tokens back into their component words.
            keywords += word.lower().split('_')
    keywords = list(set(keywords))
    keywords = [[w] for w in keywords]
    return keywords
def isRelevant(text, keywords):
    """Return True when at least one non-stopword keyword occurs in `text`.

    keywords: nested list of keyword groups ([[kw], ...]); groups are
    flattened before matching (substring match, lower-cased text).
    """
    haystack = text.lower().replace('_', ' ')
    candidates = set(w for group in keywords for w in group)
    return any(w in haystack and w not in stopwords for w in candidates)
def removeDuplicate(documents):
    """Drop documents whose unigram set overlaps >= 80% with a kept one.

    Order-preserving: the first member of each near-duplicate group wins.
    """
    # Perf fix: compute each document's unigram *set* once up front; the
    # original stored lists and rebuilt set() on every pairwise comparison.
    mapUnigram = {}
    for doc in documents:
        mapUnigram[doc] = set(generateNgram(doc.lower(), 1, '_', punct_set | stopwords))
    uniqueDocs = []
    for doc in documents:
        check_doc = mapUnigram[doc]
        is_unique = True
        for kept in uniqueDocs:
            exists_doc = mapUnigram[kept]
            overlap_score = len(check_doc & exists_doc)
            # 80% containment in either direction counts as a duplicate.
            if overlap_score >= 0.8 * len(check_doc) or overlap_score >= 0.8 * len(exists_doc):
                is_unique = False
                break  # one near-duplicate is enough to reject
        if is_unique:
            uniqueDocs.append(doc)
    return uniqueDocs
def rel_ranking(question, documents):
    """Return passages from `documents` ranked by relevance to `question`.

    Pipeline: build question n-gram sets (higher-order sets are expanded
    with synonym variants of the question), filter documents and passages
    by keyword relevance, score each passage, sort by descending score,
    then remove near-duplicate passages.
    """
    #Return ranked list of passages from list of documents
    q_variants = generateVariants(question)
    q_keywords = keyword_extraction(question)
    # Unigrams come from the literal question only; bigram/trigram/fourgram
    # sets are accumulated over every generated variant below.
    q_ngrams = {'unigram': set(generateNgram(question.lower(), 1, '_', punct_set | stopwords)),
                'bigram' : set([]), 'trigram': set([]), 'fourgram': set([])}
    for q in q_variants:
        q = q.lower()
        q_ngrams['bigram'] = q_ngrams['bigram'] | set(generateNgram(q, 2, '_', punct_set | stopwords))
        q_ngrams['trigram'] = q_ngrams['trigram'] | set(generateNgram(q, 3, '_', punct_set | stopwords))
        q_ngrams['fourgram'] = q_ngrams['fourgram'] | set(generateNgram(q, 4, '_', punct_set))
    # Keep only documents that mention at least one question keyword.
    documents = [d for d in documents if isRelevant(d, q_keywords)]
    passages = [generatePassages(d, 8) for d in documents]
    passages = [j for i in passages for j in i]  # flatten list-of-lists
    # Normalise internal whitespace, deduplicate, and re-filter for relevance.
    passages = [' '.join([_.strip() for _ in p.split()]) for p in passages]
    passages = list(set(passages))
    passages = [p for p in passages if isRelevant(p,q_keywords)]
    p_scores = []
    for p in passages:
        p_scores += [passage_score_wrap((q_ngrams, p))]
    # argsort over negated scores => passage indices in descending score order.
    p_res = numpy.argsort([-s for s in p_scores])
    relevantDocs = []
    for i in range(0, len(passages)):
        relevantDocs.append(passages[p_res[i]])
    relevantDocs = removeDuplicate(relevantDocs)
    return relevantDocs