Spaces:
Runtime error
Runtime error
| import os | |
| import glob | |
| import json | |
| import math | |
| import time | |
| import pickle | |
| import openai | |
| import requests | |
| import pandas as pd | |
| import gradio as gr | |
| import plotly.graph_objects as go | |
| from googlesearch import search | |
| from scrapingbee import ScrapingBeeClient | |
| global scraped_folder_path, category_folder_path, log_file_path, openapikey, scrapingbeekey, excel_file | |
| # definition to create folder | |
| def create_folder(_path): | |
| try: | |
| os.makedirs(_path) | |
| return _path | |
| except: | |
| return _path | |
| data_folder = 'data_folder' | |
| scraped_folder = 'scrapped_files' | |
| category_folder = 'categories' | |
| excel_file = 'Scorecard_Final.xlsx' | |
| log_folder = 'log_files' | |
| create_folder(data_folder) | |
| scraped_folder_path = create_folder(f'{data_folder}/{scraped_folder}') | |
| category_folder_path = create_folder(f'{data_folder}/{category_folder}') | |
| log_file_path = create_folder(f'{data_folder}/{log_folder}') | |
| categories_found = [item.split('/')[-1] for item in glob.glob(f'{category_folder_path}/*')] | |
| json_found = [item.split('/')[-1] for item in glob.glob(f'{scraped_folder_path}/*.json')] | |
| def update_json_(x): | |
| new_options = [] | |
| for count_item, item in enumerate(sorted(glob.glob(f'{scraped_folder_path}/*'))): | |
| # print(count_item+1) | |
| new_options.append(item.split('/')[-1]) | |
| return gr.Dropdown.update(choices=new_options, interactive=True) | |
| # get file score | |
| def get_raw_score(data): | |
| score_dict = {} | |
| for key in data.keys(): | |
| score = 0 | |
| if "Questions" in data[key].keys(): | |
| for question in data[key]['Questions'].keys(): | |
| if len(data[key]['Questions'][question]) > 0: | |
| score += 1 | |
| if data[key]['Topic'] not in score_dict: | |
| score_dict[data[key]['Topic']] = 0 | |
| score_dict[data[key]['Topic']] += score | |
| df = pd.DataFrame(score_dict, index=[0]).T.reset_index() | |
| df.columns = ['theta', 'r'] | |
| return df, min(score_dict.values()), sum(score_dict.values()) | |
| # getting the overall score | |
| def get_overall_score(name): | |
| # get the raw score | |
| try: | |
| data = json.load(open(f'{scraped_folder_path}/{name}')) | |
| except: | |
| data = json.load(open(name)) | |
| score, level, experience = get_raw_score(data) | |
| # adding the polar plots | |
| fig = go.Figure() | |
| fig.add_trace(go.Scatterpolar( | |
| r = score.r, | |
| theta = score.theta, | |
| marker=dict(size=10, color = "magenta"), | |
| fill='toself', | |
| )) | |
| file_type = name.replace('.json', '') | |
| fig.update_traces(mode="markers", marker=dict(line_color='white', opacity=0.7)) | |
| fig.update_layout(title_text=f'{file_type} >> level:{level}, exp:{experience}/100', | |
| polar=dict(radialaxis=dict(range = [0, 20], visible=True,)), | |
| showlegend=False) | |
| return fig | |
| def get_comparative_score(file, group=''): | |
| if group != '': | |
| json_files = glob.glob(f'{category_folder_path}/{group}/*.json') | |
| if len(json_files) > 0: | |
| for enum_jf, jf in enumerate(json_files): | |
| print(enum_jf) | |
| data = json.load(open(jf)) | |
| if enum_jf == 0: | |
| df, _, _ = get_raw_score(data) | |
| continue | |
| temp_df, _, _ = get_raw_score(data) | |
| df = pd.concat([df, temp_df]) | |
| else: | |
| df = None | |
| fig = get_overall_score(file) | |
| if group != '': | |
| if df is not None: | |
| # adding the polar plots | |
| fig.add_trace(go.Scatterpolar( | |
| r = df.r, | |
| theta = df.theta, | |
| marker=dict(size=7, color = "limegreen"), | |
| )) | |
| fig.update_traces(mode="markers", marker=dict(line_color='white', opacity=0.7)) | |
| fig.update_layout(polar=dict(radialaxis=dict(range = [0, 20], visible=True,)), | |
| showlegend=False) | |
| return fig | |
| def create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls): | |
| tries = 0 | |
| gpt_filter_answer = '' | |
| while tries < 3 and gpt_filter_answer == '': | |
| try: | |
| tries += 1 | |
| gpt_filter_response = openai.ChatCompletion.create( | |
| model="gpt-3.5-turbo", | |
| messages=[ | |
| {"role": "system", "content": "You are an assistant who helps to extract information of a startup from its homepage. You should answer if the text is about a specific topic. You should only answer with either yes or no as the first word and explain why you made your choice"}, | |
| {"role": "user", "content": f"{gpt_filter_prompt} \n {text}" }, | |
| ]) | |
| for choice in gpt_filter_response.choices: | |
| gpt_filter_answer += choice.message.content | |
| except Exception as e: | |
| print("filter tries: ", tries) | |
| print(e) | |
| # if the website is about the general question, then proceed to ask the scoring questions | |
| if gpt_filter_answer == '': | |
| print("Error The gpt filter responded with an empty string") | |
| completely_useless_urls.append([url, gpt_filter_answer]) | |
| return question_dictionary, useless_urls, completely_useless_urls | |
| if gpt_filter_answer[:3].lower() == "yes" or gpt_filter_answer[:2].lower() == "ja": | |
| for question in questions: | |
| if type(question) != str: | |
| continue | |
| if question not in question_dictionary.keys(): | |
| question_dictionary[question] = [] | |
| tries = 0 | |
| question_answer = "" | |
| while tries < 3 and question_answer == '': | |
| try: | |
| tries += 1 | |
| question_response = openai.ChatCompletion.create( | |
| model="gpt-3.5-turbo", | |
| messages=[ | |
| {"role": "system", "content": "You are an assistant who tries to answer always with yes or no in the first place. When yes, you explain the reason for it in 80 words by using a list of short descriptions"}, | |
| {"role": "user", "content": f"{question} \n {text}" }, | |
| ]) | |
| for choice in question_response.choices: | |
| question_answer += choice.message.content | |
| except Exception as e: | |
| print("question tries: ", tries) | |
| print(e) | |
| # If the question is answered yes, save the reason and website | |
| if question_answer[:3].lower() == "yes" or question_answer[:2].lower() == "ja": | |
| # save question, url and answer? | |
| question_dictionary[question].append([url, question_answer]) | |
| else: | |
| useless_urls.append([url, question_answer]) | |
| else: | |
| # safe url that didnt pass the filter | |
| completely_useless_urls.append([url, gpt_filter_answer]) | |
| return question_dictionary, useless_urls, completely_useless_urls | |
| def log_to_file(f, general_question, question_dictionary, useless_urls, completely_useless_urls): | |
| f.write("Frage: " + general_question + "\n") | |
| print("Frage: ", general_question) | |
| print("\tAuf den Folgenden Websiten wurden Informationen zu dieser Frage gefunden mit Punktevergabe:") | |
| f.write("\tAuf den Folgenden Websiten wurden Informationen zu dieser Frage gefunden mit Punktevergabe:\n") | |
| for question in question_dictionary.keys(): | |
| question_list = question_dictionary[question] | |
| if len(question_list) > 0: | |
| for url, answer in question_list: | |
| print("\t\tQuelle: ", url) | |
| f.write("\t\t--Quelle: " + url + " | ") | |
| print("\t\tAntwort: ", answer) | |
| f.write("Antwort: " + answer.replace("\n", " ") + "\n") | |
| print("\n\tAuf den Folgenden Websiten wurden Informationen zu dieser Frage gefunden, aber keine Punktevergabe:\n") | |
| f.write("\n\tAuf den Folgenden Websiten wurden Informationen zu dieser Frage gefunden, aber keine Punktevergabe:\n") | |
| for u_url in useless_urls: | |
| print("\t\t", u_url) | |
| f.write("\t\t--" + u_url[0] + " | " + u_url[1].replace("\n", " ") + "\n") | |
| print("\n\t Auf den Folgenden Websiten wurden keine Informationen zu dieser Frage gefunden:\n") | |
| f.write("\n\tAuf den Folgenden Websiten wurden keine Informationen zu dieser Frage gefunden:\n") | |
| for cu_url in completely_useless_urls: | |
| print("\t\t", cu_url) | |
| f.write("\t\t--" + cu_url[0]+ " | " + cu_url[1].replace("\n", " ") + "\n") | |
| f.write("---------------------------------------------------------------------------------------------------------------------------------\n") | |
| print("----------------------------------------------------------------------\n") | |
| # constants = pickle.load(open('aux_files/aux_file.exii', 'rb')) | |
| scarpingbeekey = os.environ['getkey'] | |
| openai.api_key = os.environ['chatkey'] | |
| # openapikey = constants['openapi_key'] | |
| # scarpingbeekey = constants['scrapingbee_key'] | |
| # os.environ['OPEN_API_KEY'] = openapikey | |
| # openai.api_key = openapikey | |
| def send_request(google_prompt): | |
| response = requests.get( | |
| url="https://app.scrapingbee.com/api/v1/store/google", | |
| params={ | |
| "api_key": scarpingbeekey, | |
| "search": google_prompt, | |
| "add_html": True, | |
| "nb_results": 1 | |
| }, | |
| ) | |
| return response.json() | |
| def get_json_dict(df, web_list, progress, name): | |
| filename="/" + name + "_log.txt" | |
| with open(log_file_path+filename, 'w+', encoding='utf-8') as f: | |
| output_dictionary = {} | |
| topics = df['Topic'].unique() | |
| for topic in progress.tqdm(topics, desc='Topic'): ########## | |
| time.sleep(0.2) | |
| print(topic) | |
| df_topic = df[df['Topic'] == topic] | |
| general_questions = df_topic['Questions'].unique() | |
| for general_question in progress.tqdm(general_questions, desc='GenQ'): ########### | |
| time.sleep(0.3) | |
| output_dictionary[general_question] = {"Topic": topic} | |
| df_question = df_topic[df_topic['Questions']==general_question].reset_index() | |
| google_prompt = df_question['Google Prompts'].values[0] | |
| gpt_filter_prompt = df_question['GPT Filter Prompt'].values[0] | |
| questions = df_question.iloc[0, 5:].values.tolist() | |
| question_dictionary = {} | |
| useless_urls = [] # a list of urls that have the information that we are looking for but are answered not with yes | |
| completely_useless_urls = [] # a list of urls that dont have the information that we are looking | |
| # scrape google with google_prompt | |
| request_json = send_request(google_prompt) | |
| search_results = [] | |
| num_urls = 1 | |
| if 'organic_results' in request_json.keys(): | |
| if len(request_json['organic_results']) == 0: | |
| print("organic_results are empty") | |
| else: | |
| for i in range(num_urls): | |
| search_results.append(request_json['organic_results'][i]['url']) | |
| else: | |
| print("organic_results not in request_json") | |
| # adding the extra user defined prompts | |
| search_results = list(set(search_results + web_list)) | |
| if len(search_results) == 0: | |
| print("Didnt have any search results for googleprompt:", google_prompt) | |
| continue | |
| # print the first 10 URLs | |
| for url in progress.tqdm(search_results, desc='url'): | |
| time.sleep(0.4) | |
| # scrape the text of the website | |
| client = ScrapingBeeClient(api_key=scarpingbeekey) | |
| url_text = client.get(url, | |
| params = { | |
| 'json_response': 'True', | |
| 'extract_rules': {"text": "body",}, | |
| } | |
| ) | |
| json_object = json.loads(url_text.text) | |
| if 'body' not in json_object.keys(): | |
| print("json_object has no key: body") | |
| continue | |
| if "text" in json_object['body'].keys(): | |
| text_content = json_object['body']['text'] | |
| else: | |
| print("json_object['body'] has no key: text") | |
| continue | |
| if len(text_content) == 0: | |
| continue | |
| splitsize = 10000 | |
| if len(text_content) > splitsize: | |
| num_splits = math.ceil(len(text_content) / splitsize) | |
| #for i in range(num_splits-1): | |
| for i in range(1): | |
| text = text_content[i*splitsize:(i+1)*splitsize] | |
| question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls) | |
| text = text_content[(i+1)*splitsize:] | |
| question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls) | |
| else: | |
| question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text_content, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls) | |
| log_to_file(f, general_question, question_dictionary, useless_urls, completely_useless_urls) | |
| output_dictionary[general_question]['Questions'] = question_dictionary | |
| return output_dictionary | |
| def scrape_me(name, progress=gr.Progress()): | |
| if name == '': | |
| return f"Scraping not possible, empty entries found !" | |
| #load the excel file | |
| df_reference = pd.read_excel(excel_file, sheet_name="Sheet1") | |
| # working with prompts in first brackets | |
| if ',' in name: | |
| entity_names = name.split(',') | |
| else: | |
| entity_names = [name] | |
| entity_websites = [] | |
| for en_num, en in enumerate(entity_names): | |
| if "(" in en: | |
| start = en.find("(") | |
| end = en.find(")") | |
| websites = en[start+1:end].split(";") | |
| entity_names[en_num] = en[:start].strip() | |
| else: | |
| entity_names[en_num] = en.strip() | |
| websites = [] | |
| entity_websites.append(websites) | |
| # looping thru' the entity and the prompts | |
| count_web = 0 | |
| for en in progress.tqdm(entity_names, desc='iterating through searchable entities'): | |
| time.sleep(0.1) | |
| # replacing the corporate name in the question string | |
| enum_df = df_reference.replace({"<corporate>": en}, regex=True) | |
| # retrieving the scraped data dictionary | |
| json_dict = get_json_dict(enum_df.head(25), entity_websites[count_web], progress, en) | |
| # converting and saving the json file | |
| json_object = json.dumps(json_dict, indent = 4) | |
| json_file = f'{scraped_folder_path}/{en}.json' | |
| with open(json_file, "w") as outfile: | |
| outfile.write(json_object) | |
| count_web += 1 | |
| return f"Scraped results for the following entities: {name} !" | |
| with gr.Blocks(title='EXii Startup Scapper') as demo: | |
| with gr.Tab("Scraping Toolbox"): | |
| result_text = gr.Textbox(label='Debug Information', placeholder='Debug Information') | |
| with gr.Row(): | |
| scrapin_it_digga = gr.Text(label="Startup to scrape", | |
| info='Separate two startups by a "," and force to seach in custom URLs within "()" and separate URLs ";"', | |
| placeholder='saturn (https://www.saturn.de/; https://www.mediamarkt.de/), cyberport (https://www.cyberport.de/)') | |
| with gr.Row(): | |
| scrape_button = gr.Button("Start scraping") | |
| with gr.Column(): | |
| with gr.Row(): | |
| scrapes_found = gr.Dropdown(json_found, label="Scraped startups", info="Select a scraped json files") | |
| with gr.Row(): | |
| json_update_button = gr.Button("Update scrapped data") | |
| with gr.Column(): | |
| # with gr.Row(): | |
| # show_the_score = gr.Button('Plot score') | |
| sexy_plot = gr.Plot(label='Exponential Growth Score') | |
| json_update_button.click(update_json_, inputs=scrapes_found, outputs=scrapes_found) | |
| scrape_button.click(scrape_me, inputs=scrapin_it_digga, outputs=result_text) | |
| # show_the_score.click(get_comparative_score, inputs=[scrapes_found], outputs=sexy_plot) | |
| scrapes_found.change(get_comparative_score, inputs=[scrapes_found], outputs=sexy_plot) | |
| demo.queue(concurrency_count=4).launch(debug=True) |