##packages code import streamlit as st from shapely.geometry import Point import pandas as pd from tqdm import tqdm import numpy as np import json, requests import pandas as pd #from pandas.io.json import json_normalize import matplotlib.pyplot as plt import seaborn as sns from math import radians, cos, sin, asin, sqrt from sentence_transformers import SentenceTransformer, util path = "Climate_site/python_scripts/" @st.cache_resource def model_nlp(): model = SentenceTransformer('all-MiniLM-L6-v2') return model @st.cache_data # 👈 Add the caching decorator def load_data(): url = path + "institutions.tsv" dic = pd.read_csv(url, delimiter = "\t" , index_col = 0).to_dict('index') return dic dic_institutions = load_data() import unicodedata from metaphone import doublemetaphone from fuzzywuzzy import fuzz from difflib import SequenceMatcher import re import geopandas as gpd from geopandas import GeoDataFrame #################### General Functions ############################# def URL(base_URL , entity_type , filters): url = base_URL + entity_type + filters return url def get_data(url): url = requests.get(url) text = url.text import json data = json.loads(text) return data ## encoding the abstract def reconstruction_abstract(abstract_inverted_index): # return the abstract is the abstract exists in the database, else, return None if abstract_inverted_index != None: list_values = list(abstract_inverted_index.values()) list_keys = list(abstract_inverted_index.keys()) #from the words in the abstract (keys of abstract_inverted_index) and their position in the text (values of abstract_inverted_index), reconstruct the abstract size_abstract = max([ max(elem) for elem in abstract_inverted_index.values() ] ) abstract = [""]*(size_abstract +1) for i in range(len(list_values)): for pos in list_values[i]: abstract[pos] = list_keys[i] return " ".join(list(abstract)) else: return None ## calculate efficiently the dot product between two vectors def norm(vector): return np.sqrt(sum(x * x for x in vector)) def cosine_similarity2(vec_a, vec_b): norm_a = norm(vec_a) norm_b = norm(vec_b) dot = sum(a * b for a, b in zip(vec_a, vec_b)) return dot / (norm_a * norm_b) ## Extracted texts def print_extracted_text(name_file): file = open('iea.txt', "r", encoding='utf8') lines = file.readlines() count = 0 for index, line in enumerate(lines): read_line = line.strip() print(read_line) file.close() iea.txt def details(name_file , display): file = open(path + "iea.txt", "r") lines = file.readlines() mark = 0 dic_details = {} count = -1 for index, line in enumerate(lines): line = line.strip() if line == "Close explanation": break if line != "" and (line[0].isnumeric() and ">" in line and " " in line) : count += 1 if mark == 1 and line != "" and line[0] == "*": if display == True: print(count) print(text) print(" ") dic_details[count] = text mark = 0 if mark == 1: text = text + line + " " if line.split(" ")[-1] == "Details" or line.split(" ")[-1] == "Hide": mark = 1 text = "" return dic_details def key_initiatives(name_file , display ): file = open(path + 'iea.txt', "r", encoding='utf8') lines = file.readlines() mark = 0 dic_key_initiatives = {} count = -1 for index, line in enumerate(lines): line = line.strip() if line == "Close explanation": break if line != "" and (line[0].isnumeric() and ">" in line and " " in line) : count += 1 if mark == 1 and line != "" and ( (line[0].isnumeric() and ">" in line and " " in line) or line == "*Deployment targets:*" or line == "*Announced development targets:*"): if display == True: print(count) print(text) print(" ") dic_key_initiatives[count] = text mark = 0 if mark == 1: text = text + line + " " if line == "*Key initiatives:*": mark = 1 text = "" return dic_key_initiatives def deployment_target(name_file , display): file = open(path + 'iea.txt', "r", encoding='utf8') lines = file.readlines() mark = 0 dic_target = {} count = -1 for index, line in enumerate(lines): line = line.strip() if line == "Close explanation": break if line != "" and (line[0].isnumeric() and ">" in line and " " in line) : count += 1 if mark == 1 and line != "" and ((line[0].isnumeric() and ">" in line and " " in line) or line == "*Announced cost reduction targets:*" or line == "*Announced development targets:*"): if display == True: print(count) print(text) print(" ") dic_target[count] = text mark = 0 if mark == 1: text = text + line + " " if line == "*Deployment targets:*" or line == "*Announced development targets:*": mark = 1 text = "" return dic_target def cost_reduction_target(name_file , display): file = open(path + 'iea.txt', "r", encoding='utf8') lines = file.readlines() mark = 0 dic_cost = {} count = -1 for index, line in enumerate(lines): line = line.strip() if line == "Close explanation": break if line != "" and (line[0].isnumeric() and ">" in line and " " in line) : count += 1 if mark == 1 and line != "" and (line[0].isnumeric() and ">" in line and " " in line) : if display == True: print(count) print(text) print(" ") dic_cost[count] = text mark = 0 if mark == 1: text = text + line + " " if line == "*Announced cost reduction targets:*": mark = 1 text = "" return dic_cost def key_words(name_file, display ): file = open(path + 'iea.txt', "r", encoding='utf8') lines = file.readlines() list_categories = [] count = -1 for index, line in enumerate(lines): line = line.strip() if line == "Close explanation": break if line != "" and (line[0].isnumeric() and ">" in line and " " in line) : count += 1 if display == True: print("Technologies" , count+1 , ":") if line != "": if line[0].isnumeric() and ">" in line and " " in line: i = 0 try: line = line.split(" ")[2] except: print(line) break if "Details" not in lines[index] and "Moderate" not in lines[index]: while " " not in line: i += 1 if "Details"==lines[index + i][:7] or "End-use"==lines[index + i][:7]: break else: line = line + " " + lines[index + i] #if " Production" in line: #line = line.replace(" Production" , "") line = line.replace("\n" , " ") line = line.replace("/" , " ") line = line.replace("-" , " ") line = line.split(" ")[0] if " " in line: line = line.replace(" ", " ") line = line.split(">") if "(" in line[-1]: line[-1] = line[-1].split("(")[0] for i in range(len(line)): # remove multiple spaces line[i] = re.sub(' +', ' ', line[i]) # remove trailing spaces line[i] = line[i].strip() if display == True: print(line) print(" ") if '' in line: line.remove('') list_categories.append([count , line]) return list_categories def technology(name_file, display ): # Filepath too specific, need to change to relative path file = open(path + 'iea.txt', "r", encoding='utf8') lines = file.readlines() list_categories = [] count = -1 for index, line in enumerate(lines): line = line.strip() if line == "Close explanation": break if line != "" and (line[0].isnumeric() and ">" in line and " " in line) : count += 1 if display == True: print("Technologies" , count+1 , ":") if line != "": if line[0].isnumeric() and ">" in line and " " in line: i = 0 try: line = line.split(" ")[1] except: print(line) break line = line.replace("\n" , " ") line = line.replace("/" , " ") line = line.replace("-" , " ") line = line.strip() line = re.sub(' +', ' ', line) line = line.split(" ")[0] line = line.split(">") if "(" in line[-1]: line[-1] = line[-1].split("(")[0] for i in range(len(line)): # remove multiple spaces line[i] = re.sub(' +', ' ', line[i]) # remove trailing spaces line[i] = line[i].strip() if display == True: print(line) print(" ") list_categories.append([count , line]) return list_categories #################### Patent Functions ############################# def related_patents(number_technology , research_words, carbon_related , display): name_file = "iea" list_categories = key_words( name_file, False) dic_patents = {} max_count = 0 base_URL_PV = "https://api.patentsview.org/" filter_works = "patents/query?" filter_PV = "q={%22_and%22:[{%22_text_all%22:{%22patent_abstract%22:%22" filter_PV += research_words.replace("," , "") if carbon_related == True: filter_PV += "%22}},{%22_eq%22:{%22cpc_group_id%22:%22Y02E%22}}]}&f=[%22patent_number%22,%22patent_title%22,%22assignee_country%22,%22patent_date%22,%22inventor_id%22,%22assignee_organization%22,%22inventor_longitude%22,%22inventor_latitude%22,%22inventor_last_name%22,%22inventor_first_name%22,%22cpc_subsection_title%22,%22assignee_city%22,%22patent_abstract%22,%22patent_kind%22,%22cpc_group_id%22,%22assignee_organization%22,%22citedby_patent_number%22]&s={%22patent_date%22:%22desc%22}&o={%22per_page%22:200}" else: filter_PV += "%22}}]}&f=[%22patent_number%22,%22patent_title%22,%22assignee_country%22,%22patent_date%22,%22assignee_organization%22,%22inventor_longitude%22,%22inventor_latitude%22,%22inventor_last_name%22,%22inventor_id%22,%22inventor_first_name%22,%22cpc_subsection_title%22,%22assignee_city%22,%22patent_abstract%22,%22patent_kind%22,%22cpc_group_id%22,%22assignee_organization%22,%22citedby_patent_number%22]&s={%22patent_date%22:%22desc%22}&o={%22per_page%22:200}" filter_PV = filter_PV.replace(" " , "%20") url = URL(base_URL_PV , filter_works, filter_PV) data = get_data(url) if display == True: print( data["total_patent_count"] , elem[-1] ) print(url) for i in range(data["count"]): dic_patents[ "US-" + data["patents"][i]["patent_number"]] = {} dic_patents[ "US-" + data["patents"][i]["patent_number"]]["title"] = data["patents"][i]["patent_title"] dic_patents["US-" + data["patents"][i]["patent_number"]]["abstract"] = data["patents"][i]["patent_abstract"] dic_patents[ "US-" + data["patents"][i]["patent_number"]]["assignee"] = str(data["patents"][i]["assignees"][0]["assignee_organization"]) dic_patents["US-" + data["patents"][i]["patent_number"]]["assignee_city"] = str(data["patents"][i]["assignees"][0]["assignee_city"]) dic_patents["US-" + data["patents"][i]["patent_number"]]["assignee_country"] = str(data["patents"][i]["assignees"][0]["assignee_country"]) for j in range(1, len(data["patents"][i]["assignees"])): dic_patents[ "US-" + data["patents"][i]["patent_number"]]["assignee"] += ", " + str(data["patents"][i]["assignees"][j]["assignee_organization"]) dic_patents[ "US-" + data["patents"][i]["patent_number"]]["assignee_city"] += ", " + str(data["patents"][i]["assignees"][j]["assignee_city"]) dic_patents["US-" + data["patents"][i]["patent_number"]]["assignee_country"] += ", " + str(data["patents"][i]["assignees"][j]["assignee_country"]) dic_patents[ "US-" + data["patents"][i]["patent_number"]]["list_inventors"] = data["patents"][i]["inventors"] dic_patents[ "US-" + data["patents"][i]["patent_number"]]["inventors"] = str(data["patents"][i]["inventors"][0]["inventor_first_name"]) + " " + str(data["patents"][i]["inventors"][0]["inventor_last_name"]) for j in range(1, len(data["patents"][i]["inventors"])): dic_patents[ "US-" + data["patents"][i]["patent_number"]]["inventors"] += ", " + str(data["patents"][i]["inventors"][j]["inventor_first_name"]) + " " + str(data["patents"][i]["inventors"][j]["inventor_last_name"]) dic_patents["US-" + data["patents"][i]["patent_number"]]["date"] = data["patents"][i]["patent_date"] dic_patents["US-" + data["patents"][i]["patent_number"]]["number_citations"] = len(data["patents"][i]["citedby_patents"]) if display == True: print(" ") return dic_patents ## Ranking Patents def ranking_patents(number_technology , research_words, carbon_related , display): model = model_nlp() name_file = "iea" list_categories = key_words( name_file, False) dic_details = details(name_file , False) dic_patents = related_patents(number_technology , research_words, carbon_related , display) dic_scores = {} if display == True: print("Key words: " , list_categories[number_technology][1]) if number_technology in dic_details: reference_text = dic_details[number_technology] if display == True: print("Technology details: " , reference_text) print(" ") encoded_text = model.encode(reference_text, convert_to_tensor=False).tolist() if len(dic_patents ) == 0: return "Select other key words" else: for ids in list(dic_patents.keys()): dic_scores[ids] = {} encoded_title = model.encode(dic_patents[ids]["title"], convert_to_tensor=False).tolist() score_title = cosine_similarity2(encoded_title, encoded_text) if dic_patents[ids]["abstract"] != None: encoded_abstract = model.encode(dic_patents[ids]["abstract"], convert_to_tensor=False).tolist() score_abstract = cosine_similarity2(encoded_abstract, encoded_text) else: score_abstract = None dic_scores[ids]["title comparision"] = score_title dic_scores[ids]["abstract comparison"] = score_abstract dic_scores[ids]["title"] = dic_patents[ids]["title"] dic_scores[ids]["citations"] = dic_patents[ids]["number_citations"] dic_scores[ids]["date"] = dic_patents[ids]["date"][:4] dic_scores[ids]["assignee"] = dic_patents[ids]["assignee"] #dic_scores[ids]["assignee_city"] = dic_patents[ids]["assignee_city"] #dic_scores[ids]["assignee_country"] = dic_patents[ids]["assignee_country"] dic_scores[ids]["inventors"] = dic_patents[ids]["inventors"] dic_scores[ids]["number of co-inventors"] = len(dic_patents[ids]["inventors"].split(",")) return dic_patents , dic_scores def get_ranking_patents(technologies, number_technology ,category , carbon_related , size): dic , dic_patents = ranking_patents(number_technology , category, carbon_related , False) if dic_patents == {}: return "No patent found, select other key words" elif dic_patents == "Select other key words": return dic_patents else: return pd.DataFrame(dic_patents).T.sort_values(by="abstract comparison" , ascending = False).head(size) #normalize a string dat that represents often a name. def normalize(data): normal = unicodedata.normalize('NFKD', data).encode('ASCII', 'ignore') val = normal.decode("utf-8") # lower full name in upper val = re.sub(r"[A-Z]{3,}", lambda x: x.group().lower(), val) # add space in front of upper case val = re.sub(r"(\w)([A-Z])", r"\1 \2", val) # Lower case val = val.lower() # remove special characters val = re.sub('[^A-Za-z0-9 ]+', ' ', val) # remove multiple spaces val = re.sub(' +', ' ', val) # remove trailing spaces val = val.strip() return val def main_inventors(technologies, number_technology , carbon_related , category , size ): display = False dic_patents , dic_ranked = ranking_patents(number_technology , category, carbon_related , display) dic_patents_co_inventors = {} for patent in list(dic_ranked.keys())[:size]: for k in range(len(dic_patents[patent]["list_inventors"])): inventor_id = dic_patents[patent]["list_inventors"][k]["inventor_id"] inventor_name = dic_patents[patent]["list_inventors"][k]["inventor_first_name"] + " " + dic_patents[patent]["list_inventors"][k]["inventor_last_name"] inventor_name_norm = normalize(inventor_name).split() inventor_name_norm = inventor_name_norm[0] + " " + inventor_name_norm[-1] if inventor_name_norm not in dic_patents_co_inventors: dic_patents_co_inventors[inventor_name_norm] = {} dic_patents_co_inventors[inventor_name_norm]["Inventor's name"] = inventor_name dic_patents_co_inventors[inventor_name_norm]["PatentsView inventor's id"] = inventor_id dic_patents_co_inventors[inventor_name_norm]["Number of occurence"] = 1 dic_patents_co_inventors[inventor_name_norm]["Number of related citations"] = dic_patents[patent]["number_citations"] else: if inventor_id not in dic_patents_co_inventors[inventor_name_norm]["PatentsView inventor's id"] : dic_patents_co_inventors[inventor_name_norm]["PatentsView inventor's id"] += ", " + inventor_id if inventor_name not in dic_patents_co_inventors[inventor_name_norm]["Inventor's name"] : dic_patents_co_inventors[inventor_name_norm]["Inventor's name"] += ", " + inventor_name dic_patents_co_inventors[inventor_name_norm]["Number of occurence"] += 1 dic_patents_co_inventors[inventor_name_norm]["Number of related citations"] += dic_patents[patent]["number_citations"] dic_patents_co_inventors = {k: v for k, v in sorted(dic_patents_co_inventors.items(), key=lambda item: item[1]["Number of occurence"] , reverse = True)} if dic_patents_co_inventors == {}: return "No patent, select other key words" else: for inventor_name_norm in list(dic_patents_co_inventors.keys()): list_inventors = dic_patents_co_inventors[inventor_name_norm]["PatentsView inventor's id"].split(", ") work_count = 0 cited_by_count = 0 for elem in list_inventors: url = "https://api.patentsview.org/inventors/query?q={%22inventor_id%22:[%22" + elem + "%22]}&f=[%22inventor_total_num_patents%22,%22patent_num_cited_by_us_patents%22]" data = get_data(url)["inventors"][0] work_count += int(data["inventor_total_num_patents"]) for k in range(len(data["patents"])): cited_by_count += int(data["patents"][k]["patent_num_cited_by_us_patents"]) dic_patents_co_inventors[inventor_name_norm]["Number of patents"] = work_count dic_patents_co_inventors[inventor_name_norm]["Number of US patents citations"] = cited_by_count return pd.DataFrame(dic_patents_co_inventors , index = ["Inventor's name", "PatentsView inventor's id", "Number of occurence" , "Number of patents" ,"Number of US patents citations" , "Number of related citations"]).T def map_inventors(technologies, number_technology , carbon_related , category , size): display = False dic_patents , dic_ranked = ranking_patents(number_technology , category, carbon_related , display) dic_patents_co_inventors = {} count = 0 for patent in list(dic_ranked.keys())[:size]: for k in range(len(dic_patents[patent]["list_inventors"])): dic_patents_co_inventors[count] = {} dic_patents_co_inventors[count]["latitude"] = dic_patents[patent]["list_inventors"][k]["inventor_latitude"] dic_patents_co_inventors[count]["longitude"] = dic_patents[patent]["list_inventors"][k]["inventor_longitude"] dic_patents_co_inventors[count]["longitude"] = dic_patents[patent]["list_inventors"][k]["inventor_longitude"] dic_patents_co_inventors[count]["inventor_name"] = str(dic_patents[patent]["list_inventors"][k]["inventor_first_name"]) + " " + str(dic_patents[patent]["list_inventors"][k]["inventor_last_name"]) dic_patents_co_inventors[count]["patent_date"] = dic_patents[patent]["date"] count += 1 if dic_patents_co_inventors == {}: return "No patent, select other key words" map_df = pd.DataFrame(dic_patents_co_inventors).T map_df["longitude"]=map_df['longitude'].astype(float) map_df['latitude']=map_df['latitude'].astype(float) map_df = map_df[map_df["latitude"].notnull()] return map_df #geometry = [Point(xy) for xy in zip(map_df['Longitude'], map_df['Latitude'])] #gdf = GeoDataFrame(map_df, geometry=geometry) #this is a simple map that goes with geopandas #world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres')) #gdf.plot(ax=world.plot( color='white', edgecolor='black' ), marker='o', color='red', markersize=15 , zorder = 1); #plt.xlim([-180, 180]) #plt.ylim([-90, 90]) #plt.title("Main inventors: geographic location") #plt.xlabel("Longitude") #plt.ylabel("Latitude") #plt.show() ## Extract quantitative data def extract_sentences_with_numbers(text , text_name): if text != None: text = text.replace("CO 2" , "CO2") text = text.replace("CO 3" , "CO3") text = text.replace("CO(2)" , "CO2") text = text.replace("CO(3)" , "CO3") print(text_name + ": " , text) print(" ") list_text = list(text) for i in range(1,len(list_text)-1): if list_text[i] == " " and list_text[i-1] == "." and list_text[i+1].isupper(): list_text[i] = "~" text = "".join(list_text) text = text.split("~") for sentence in text: if any(char.isdigit() for char in sentence): if "CO2" in sentence: print("\x1b[31mCARBON RELATED:\x1b[0m", sentence) print(" ") if "GJ" in sentence or "MJ" in sentence: print("\x1b[31mENERGY:\x1b[0m" , sentence) print(" ") ##price if "€" in sentence or "$" in sentence or "EUR" in sentence or "dollars" in sentence.lower(): print("\x1b[31mPRICE:\x1b[0m" , sentence) print(" ") ##dates digits = [] for word in sentence.replace("," , "").replace("%" , "").replace("." , " ").split(): if word.isdigit() and 1850 < int(word) < 2200 : digits.append(word) if digits != []: print("\x1b[31mDATE:\x1b[0m" , sentence) print(" ") ##CO quantity if "Mt" in sentence or "tC" in sentence or "t-C" in sentence: print("\x1b[31mCARBON QUANTITY:\x1b[0m" , sentence) print(" ") print(" ") def extract_quantitative_data_technology(technologies, number_technology): count = 0 name_file = "iea" dic_target = deployment_target(name_file , False) dic_cost = cost_reduction_target(name_file , False) dic_details = details(name_file , False) cost_target_text = 'No information' cost_text = 'No information' if number_technology in dic_details: reference_text = dic_details[number_technology] #print("\033[96mFROM IEA website: ") #print("\033[92mTechnology details: \x1b[0m" , reference_text) #print(" ") #encoded_text = model.encode(reference_text, convert_to_tensor=False).tolist() if number_technology in dic_target: cost_target_text = dic_target[number_technology] #print("\033[96mFROM IEA website: ") #sentences = extract_sentences_with_numbers(cost_target_text , "\033[92mDeployment target and Announced development target\x1b[0m") if number_technology in dic_cost: cost_text = dic_cost[number_technology] #print("\033[96mFROM IEA website: ") #sentences = extract_sentences_with_numbers(cost_text , "\033[92mAnnounced cost reduction targets\x1b[0m") return reference_text, cost_target_text , cost_text def extract_quantitative_data_patent(patent_id): patent_id = patent_id[3:] url = "https://api.patentsview.org/patents/query?q={%22patent_id%22:%22" + str(patent_id) + "%22}&f=[%22patent_number%22,%22patent_title%22,%22patent_abstract%22,%22patent_date%22,%22inventor_last_name%22,%22inventor_first_name%22,%22assignee_organization%22]" url_google = "https://patents.google.com/patent/US" + str(patent_id) data = get_data(url)["patents"][0] title = data["patent_title"] abstract = data["patent_abstract"] co_inventors = ", ".join([ data["inventors"][i]["inventor_first_name"] + " " + data["inventors"][i]["inventor_last_name"] for i in range(len(data["inventors"])) ]) assignees = ", ".join([ str(data["assignees"][i]["assignee_organization"]) for i in range(len(data["assignees"])) ] ) return url_google , title , abstract , data["patent_date"] , co_inventors , assignees ################################### Extracted texts ############################################################### #@title Which patents are related to the technology? def finder(): name_file = 'iea' res = technology("iea", False ) list_categories_tech = [] list_categories = key_words(name_file , False) list_technologies = [ ( ", ".join(list_categories[i][1]) , i ) for i in range(len(list_categories)) ] dic_technologies = {} for i in range(len(res)): names = res[i][1] if ", ".join(names) not in list_categories_tech: list_categories_tech.append(", ".join(names)) dic_technologies[", ".join(names)] = [] dic_technologies[", ".join(names)].append( (", ".join(list_categories[i][1]) , i )) list_climate = [ ("Any related patents" , False ) , ("Climate related patents" , True)] dic_categories = {} for elem in list_technologies: list_words = elem[0].split(",")[-3:] dic_categories[elem[1]] = [ " ".join(list_words[-1].split()[:3]) , " ".join(list_words[-2].split()[:3]) if len(list_words) > 1 else "" , " ".join(list_words[-3].split()[:3]) if len(list_words) > 2 else "" ] return dic_technologies, dic_categories, list_categories_tech, list_technologies