Spaces:
Running
Running
| import json | |
| import logging | |
| import os | |
| import pandas as pd | |
| from rapidfuzz import process, fuzz | |
| import requests | |
| from tabulate import tabulate | |
class TickerFinder:
    """
    A class for finding the best matching ticker for a given company name or ticker.
    Uses data from https://www.sec.gov/files/company_tickers.json and rapidfuzz package for fuzzy matching.
    """

    # Seconds to wait for the SEC server before giving up; without a timeout
    # requests.get() may block forever on a stalled connection.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self) -> None:
        """
        Initialize the TickerFinder object.
        This method sets the file paths and reads the ticker data into the self.df attribute.
        """
        self.logger = logging.getLogger(__name__)
        # The data directories live one level above the directory holding this file.
        self.rootdir = os.path.dirname(os.path.dirname(__file__))
        self.fname_raw = os.path.join(self.rootdir, 'data_raw', 'sec_gov_company_tickers_test.json')
        self.fname_compact = os.path.join(self.rootdir, 'data', 'sec_gov_company_tickers_compact.json')
        self.df = self.read_ticker_data()
        self.logger.info('Initialized TickerFinder object')

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """Create the directory that will contain `path` if it does not exist yet."""
        parent = os.path.dirname(path)
        # dirname is '' for a bare file name; os.makedirs('') would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def read_ticker_data(
        self
    ) -> pd.DataFrame:
        """
        Read compact ticker data from a local file.
        If the compact file does not exist yet, it is created first
        (see compact_ticker_data).
        Returns
            df : pandas DataFrame
                One row per company, with the columns 'ticker' and 'title'
        """
        # if the compact data is not available, create it
        if not os.path.exists(self.fname_compact):
            self.logger.info('Compact ticker data was not found at %s, creating it', self.fname_compact)
            self.compact_ticker_data()
        with open(self.fname_compact, 'r', encoding='utf-8') as f:
            data = json.load(f)
        df = pd.DataFrame.from_dict(data, orient='columns')
        self.logger.info('Read compact ticker data from %s', self.fname_compact)
        return df

    def compact_ticker_data(
        self
    ) -> None:
        """
        Compact the raw ticker data by extracting only the ticker and title fields and
        saving them to a local file.
        If the raw data is not available, this method will download it first.
        The raw file is expected to map integer-like string keys to records that
        contain at least the 'ticker' and 'title' fields.
        """
        if not os.path.exists(self.fname_raw):
            self.logger.info('Raw ticker data was not found at %s, downloading it', self.fname_raw)
            self.download_ticker_data()
        # read the raw data
        with open(self.fname_raw, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # Order the records by their numeric key instead of using the key as a
        # list index, so gaps in the numbering cannot cause an IndexError.
        records = [data[key] for key in sorted(data, key=int)]
        data_compact = {
            'ticker': [record['ticker'] for record in records],
            'title': [record['title'] for record in records],
        }
        # save the compact data (the target directory may not exist yet)
        self._ensure_parent_dir(self.fname_compact)
        with open(self.fname_compact, 'w', encoding='utf-8') as f:
            json.dump(data_compact, f)
        self.logger.info('Compacted raw ticker data into %s', self.fname_compact)

    def download_ticker_data(
        self
    ) -> None:
        """
        Download the raw ticker data from https://www.sec.gov/files/company_tickers.json
        using the requests package. The data is saved to a local file.
        If the download is successful, the raw data is saved as a JSON file.
        If the download fails, an exception is raised.
        Raises:
            Exception
                If the server answers with a status code other than 200
        """
        url = "https://www.sec.gov/files/company_tickers.json"
        headers = {
            "User-Agent": "censored_email_address",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.sec.gov",
            "Connection": "keep-alive"
        }
        response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS)
        if response.status_code != 200:
            raise Exception(f"Error downloading ticker data from url.\nResponse status code: {response.status_code}")
        # Store the decoded payload directly: passing a literal JSON string to
        # pd.read_json is deprecated, and a DataFrame round trip adds dtype
        # inference that is not needed just to persist the file.
        data = response.json()
        # save the raw data (the target directory may not exist yet)
        self._ensure_parent_dir(self.fname_raw)
        with open(self.fname_raw, 'w', encoding='utf-8') as f:
            json.dump(data, f)
        self.logger.info('Dowloaded raw ticker data into %s', self.fname_raw)

    def _fuzzy_search(
        self,
        query: str,
        choices: pd.Series,
        top_n: int
    ) -> pd.DataFrame:
        """
        Fuzzy-match a query against one column of self.df.
        Args:
            query : str
                The already normalized string to search for
            choices : pd.Series
                The candidate strings, aligned row by row with self.df
            top_n : int
                The maximum number of matches to return
        Returns:
            results : pd.DataFrame
                A pd.df with the columns 'Title', 'Ticker' and 'Score'
        """
        # A plain list makes the third element of every match a 0-based
        # position, which is what the .iloc lookups below require.
        matches = process.extract(
            query,
            choices.tolist(),
            scorer=fuzz.ratio,
            limit=top_n)
        titles = self.df["title"]
        tickers = self.df["ticker"]
        rows = [(titles.iloc[pos], tickers.iloc[pos], score) for _, score, pos in matches]
        return pd.DataFrame(rows, columns=["Title", "Ticker", "Score"])

    def find_best_matching_title(
        self,
        input_name: str,
        top_n: int = 5
    ) -> pd.DataFrame:
        """
        Find the best matching company title for a given company name.
        The comparison is case-insensitive.
        Args:
            input_name : str
                The name to search for
            top_n : int, default=5
                The number of top matches to return
        Returns:
            results : pd.DataFrame
                A pd.df containing the matched title, ticker, and fuzzy matching score
        """
        return self._fuzzy_search(input_name.lower(), self.df["title"].str.lower(), top_n)

    def find_best_matching_ticker(
        self,
        ticker: str,
        top_n: int = 5
    ) -> pd.DataFrame:
        """
        Find the best matching company ticker for a given ticker.
        The input is upper-cased before it is compared with the stored tickers.
        Args:
            ticker : str
                The ticker to search for
            top_n : int, default=5
                The number of top matches to return
        Returns:
            results : pd.DataFrame
                A pd.df containing the title, matched ticker, and fuzzy matching score
        """
        return self._fuzzy_search(ticker.upper(), self.df["ticker"], top_n)

    def find_best_matching_ticker_or_title(
        self,
        user_input: str
    ) -> str:
        """
        Find the best matching company ticker for a given user input, which may be a ticker or a title.
        Args:
            user_input : str
                The user input to search for
        Returns:
            results : str
                A string containing the best matching titles and tickers,
                formatted by df_to_pretty_string
        """
        # user may be trying to write a ticker, in which case find the best matching ticker:
        ticker_matches = self.find_best_matching_ticker(user_input)
        # user may be trying to write a title, in which case find the best matching title:
        title_matches = self.find_best_matching_title(user_input)
        # total matches:
        c_matches = pd.concat([ticker_matches, title_matches])
        # deduplicate: a company found by both searches gets the sum of both scores
        c_matches_dedup = c_matches.groupby(['Ticker', 'Title'], as_index=False)['Score'].sum()
        # sort by score:
        c_matches_sorted = c_matches_dedup.sort_values(by='Score', ascending=False)
        # convert results into a pretty string:
        return self.df_to_pretty_string(c_matches_sorted)

    def df_to_pretty_string(
        self,
        df: pd.DataFrame,
        num_rows: int = 5
    ) -> str:
        """
        Convert a pd.DataFrame into a pretty string, using the tabulate package.
        Args:
            df : pd.DataFrame
                The dataframe to convert; must contain the columns 'Title' and 'Ticker'
            num_rows : int, default=5
                The maximum number of leading rows to include
        Returns:
            pretty_string : str
                A string containing the dataframe formatted as an HTML table
        """
        df = df.rename(columns={'Title': 'Company Name', 'Ticker': 'Ticker Symbol'})
        df_subset = df[['Company Name', 'Ticker Symbol']].iloc[0:num_rows]
        pretty_table = tabulate(
            df_subset,
            headers='keys',
            tablefmt='html',
            showindex=False,
            numalign='left',
            stralign='left')
        return pretty_table

    def does_ticker_exist(
        self,
        ticker: str
    ) -> bool:
        """
        Check whether a given ticker exists in the ticker data.
        The comparison is exact, i.e. case-sensitive.
        Args:
            ticker : str
                The ticker to check
        Returns:
            exists : bool
                True if the ticker exists, False otherwise
        """
        return bool((self.df['ticker'] == ticker).any())
| # if __name__ == "__main__": | |
| # results = TickerFinder().find_best_matching_ticker_or_title("microsoft") | |
| # print(results) | |
| # exists = TickerFinder().does_ticker_exist('bbbbb') | |
| # print(f'Ticker exists') if exists else print(f'Ticker does not exist') | |