Spaces:
Sleeping
Sleeping
File size: 8,435 Bytes
61b2df0 ac40eee 61b2df0 ac40eee 412440d 61b2df0 ac40eee 61b2df0 ac40eee 61b2df0 ac40eee 61b2df0 ac40eee bb2e7f7 61b2df0 bb2e7f7 ac40eee bb2e7f7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
import json
import logging
import os
import pandas as pd
from rapidfuzz import process, fuzz
import requests
from tabulate import tabulate
class TickerFinder:
    """
    A class for finding the best matching ticker for a given company name or ticker.

    Uses data from https://www.sec.gov/files/company_tickers.json and the
    rapidfuzz package for fuzzy matching.
    """

    def __init__(self):
        """
        Initialize the TickerFinder object.

        This method sets the file paths (relative to the parent of this file's
        directory) and reads the ticker data into the self.df attribute.
        """
        self.logger = logging.getLogger(__name__)
        self.rootdir = os.path.dirname(os.path.dirname(__file__))
        self.fname_raw = os.path.join(self.rootdir, 'data_raw', 'sec_gov_company_tickers_test.json')
        self.fname_compact = os.path.join(self.rootdir, 'data', 'sec_gov_company_tickers_compact.json')
        self.df = self.read_ticker_data()
        self.logger.info('Initialized TickerFinder object')

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """Create the directory that will contain `path` if it does not exist yet."""
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def _matches_to_frame(self, matches) -> pd.DataFrame:
        """Convert rapidfuzz (choice, score, key) tuples into a Title/Ticker/Score frame."""
        rows = []
        for _, score, idx in matches:
            # idx is the key rapidfuzz reports for the matched choice; it is used
            # positionally here, which lines up with the default integer index of
            # the frame built by read_ticker_data.
            record = self.df.iloc[idx]
            rows.append((record["title"], record["ticker"], score))
        return pd.DataFrame(rows, columns=["Title", "Ticker", "Score"])

    def read_ticker_data(
        self
    ) -> pd.DataFrame:
        """
        Read compact ticker data from a local file.

        If the compact file does not exist it is created first via
        compact_ticker_data().

        Returns:
            df : pd.DataFrame
                A dataframe with the columns stored in the compact file
                ('ticker' and 'title' when written by compact_ticker_data)
        """
        # if the compact data is not available, create it
        if not os.path.exists(self.fname_compact):
            self.logger.info(f'Compact ticker data was not found at {self.fname_compact}, creating it')
            self.compact_ticker_data()
        with open(self.fname_compact, 'r', encoding='utf-8') as f:
            data = json.load(f)
        df = pd.DataFrame.from_dict(data, orient='columns')
        self.logger.info(f'Read compact ticker data from {self.fname_compact}')
        return df

    def compact_ticker_data(
        self
    ) -> None:
        """
        Compact the raw ticker data by extracting only the ticker and title fields and
        saving them to a local file.

        If the raw data is not available, this method will download it first.
        Entries are written in ascending order of their numeric key in the raw file.
        """
        if not os.path.exists(self.fname_raw):
            self.logger.info(f'Raw ticker data was not found at {self.fname_raw}, downloading it')
            self.download_ticker_data()
        # read the raw data
        with open(self.fname_raw, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # order the entries by their numeric key; sorting (instead of indexing
        # into a preallocated list) also works when the keys are not 0..n-1
        records = [v for _, v in sorted(data.items(), key=lambda kv: int(kv[0]))]
        data_compact = {
            'ticker': [record['ticker'] for record in records],
            'title': [record['title'] for record in records],
        }
        # save the compact data
        self._ensure_parent_dir(self.fname_compact)
        with open(self.fname_compact, 'w', encoding='utf-8') as f:
            json.dump(data_compact, f)
        self.logger.info(f'Compacted raw ticker data into {self.fname_compact}')

    def download_ticker_data(
        self
    ) -> None:
        """
        Download the raw ticker data from https://www.sec.gov/files/company_tickers.json
        using the requests package. The data is saved to a local file.

        If the download is successful, the raw data is saved as a JSON file.
        If the download fails, an exception is raised.

        Raises:
            Exception
                If the response status code is not 200. Network errors, timeouts
                and invalid JSON raise the corresponding requests exceptions.
        """
        url = "https://www.sec.gov/files/company_tickers.json"
        headers = {
            "User-Agent": "censored_email_address",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.sec.gov",
            "Connection": "keep-alive"
        }
        # a timeout keeps object construction from hanging on a stalled connection
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code != 200:
            raise Exception(f"Error downloading ticker data from {url}.\nResponse status code: {response.status_code}")
        # parse the payload to validate it, then store it as plain JSON
        data = response.json()
        # save the raw data
        self._ensure_parent_dir(self.fname_raw)
        with open(self.fname_raw, 'w', encoding='utf-8') as f:
            json.dump(data, f)
        self.logger.info(f'Dowloaded raw ticker data into {self.fname_raw}')

    def find_best_matching_title(
        self,
        input_name: str,
        top_n: int = 5
    ) -> pd.DataFrame:
        """
        Find the best matching company title for a given company name.

        The comparison is case-insensitive: both the input and the titles are
        lower-cased before scoring.

        Args:
            input_name : str
                The name to search for
            top_n : int, default=5
                The number of top matches to return
        Returns:
            results : pd.DataFrame
                A pd.df containing the matched title, ticker, and fuzzy matching score
        """
        # alternative scorers that were tried: fuzz.WRatio, fuzz.partial_ratio
        matches = process.extract(
            input_name.lower(),
            self.df["title"].str.lower(),
            scorer=fuzz.ratio,
            limit=top_n)
        return self._matches_to_frame(matches)

    def find_best_matching_ticker(
        self,
        ticker: str,
        top_n: int = 5
    ) -> pd.DataFrame:
        """
        Find the best matching company ticker for a given ticker.

        The input is upper-cased before it is compared against the stored tickers.

        Args:
            ticker : str
                The ticker to search for
            top_n : int, default=5
                The number of top matches to return
        Returns:
            results : pd.DataFrame
                A pd.df containing the title, matched ticker, and fuzzy matching score
        """
        # alternative scorers that were tried: fuzz.WRatio, fuzz.partial_ratio
        matches = process.extract(
            ticker.upper(),
            self.df["ticker"],
            scorer=fuzz.ratio,
            limit=top_n)
        return self._matches_to_frame(matches)

    def find_best_matching_ticker_or_title(
        self,
        user_input: str
    ) -> str:
        """
        Find the best matching company ticker for a given user input, which may be a ticker or a title.

        Args:
            user_input : str
                The user input to search for
        Returns:
            results : str
                A string containing the best matching titles and tickers,
                formatted by df_to_pretty_string
        """
        # user may be trying to write a ticker, in which case find the best matching ticker:
        ticker_matches = self.find_best_matching_ticker(user_input)
        # user may be trying to write a title, in which case find the best matching title:
        title_matches = self.find_best_matching_title(user_input)
        # total matches:
        c_matches = pd.concat([ticker_matches, title_matches])
        # deduplicate: a company found by both searches gets the sum of its two scores
        c_matches_dedup = c_matches.groupby(['Ticker', 'Title'], as_index=False)['Score'].sum()
        # sort by score:
        c_matches_sorted = c_matches_dedup.sort_values(by='Score', ascending=False)
        # convert results into a pretty string:
        return self.df_to_pretty_string(c_matches_sorted)

    def df_to_pretty_string(
        self,
        df: pd.DataFrame,
        num_rows: int = 5
    ) -> str:
        """
        Convert a pd.DataFrame into a pretty string, using the tabulate package.

        Args:
            df : pd.DataFrame
                The dataframe to convert; must contain 'Title' and 'Ticker' columns
            num_rows : int, default=5
                The maximum number of leading rows to include in the output
        Returns:
            pretty_string : str
                A string containing the dataframe formatted as an HTML table
        """
        df = df.rename(columns={'Title': 'Company Name', 'Ticker': 'Ticker Symbol'})
        df_subset = df[['Company Name', 'Ticker Symbol']].iloc[0:num_rows]
        # tablefmt='plain' is the text-only alternative to the HTML output
        pretty_table = tabulate(df_subset,
                                headers='keys',
                                tablefmt='html',
                                showindex=False,
                                numalign='left',
                                stralign='left')
        return pretty_table

    def does_ticker_exist(
        self,
        ticker: str
    ) -> bool:
        """
        Check whether a given ticker exists in the ticker data.

        The ticker is matched exactly first; if that fails, its upper-cased form
        is tried as well, mirroring the upper-casing done in
        find_best_matching_ticker.

        Args:
            ticker : str
                The ticker to check
        Returns:
            exists : bool
                True if the ticker exists, False otherwise
        """
        known_tickers = self.df['ticker'].values
        if ticker in known_tickers:
            return True
        return isinstance(ticker, str) and ticker.upper() in known_tickers
# if __name__ == "__main__":
# results = TickerFinder().find_best_matching_ticker_or_title("microsoft")
# print(results)
# exists = TickerFinder().does_ticker_exist('bbbbb')
# print(f'Ticker exists') if exists else print(f'Ticker does not exist')
|