File size: 8,435 Bytes
61b2df0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac40eee
61b2df0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac40eee
412440d
61b2df0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac40eee
61b2df0
 
 
 
ac40eee
 
 
61b2df0
 
 
 
 
 
 
 
 
ac40eee
 
 
 
 
 
61b2df0
 
 
ac40eee
bb2e7f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61b2df0
 
bb2e7f7
ac40eee
 
bb2e7f7
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import json
import logging
import os
import pandas as pd
from rapidfuzz import process, fuzz
import requests
from tabulate import tabulate


class TickerFinder:
    """
    A class for finding the best matching ticker for a given company name or ticker.
    Uses data from https://www.sec.gov/files/company_tickers.json and rapidfuzz package for fuzzy matching.

    Data flow: the raw SEC JSON (self.fname_raw) is downloaded on demand, reduced to
    two parallel lists of tickers and titles (self.fname_compact), and the compact
    file is what gets loaded into self.df.
    """

    # Seconds to wait for the SEC server. requests applies no timeout by default,
    # so without this a stalled connection would block forever.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self):
        """
        Initialize the TickerFinder object.
        This method sets the file paths and reads the ticker data into the self.df attribute.
        """
        self.logger = logging.getLogger(__name__)
        # Data directories live one level above the directory containing this file.
        self.rootdir = os.path.dirname(os.path.dirname(__file__))
        self.fname_raw = os.path.join(self.rootdir, 'data_raw', 'sec_gov_company_tickers_test.json')
        self.fname_compact = os.path.join(self.rootdir, 'data', 'sec_gov_company_tickers_compact.json')
        self.df = self.read_ticker_data()
        self.logger.info('Initialized TickerFinder object')

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """
        Create the directory that will contain path, if it does not exist yet.
        Args:
        path : str
            The file path about to be written
        """
        parent = os.path.dirname(path)
        # A bare file name has no parent component; os.makedirs('') would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def read_ticker_data(
            self
            ) -> pd.DataFrame:
        """
        Read compact ticker data from a local file.
        If the compact file does not exist it is created first.
        Returns:
        df : pandas DataFrame
            A dataframe with the columns 'ticker' and 'title'
        """
        # if the compact data is not available, create it
        if not os.path.exists(self.fname_compact):
            self.logger.info(f'Compact ticker data was not found at {self.fname_compact}, creating it')
            self.compact_ticker_data()
        with open(self.fname_compact, 'r', encoding='utf-8') as f:
            data = json.load(f)
        df = pd.DataFrame.from_dict(data, orient='columns')
        self.logger.info(f'Read compact ticker data from {self.fname_compact}')
        return df

    def compact_ticker_data(
            self
            ) -> None:
        """
        Compact the raw ticker data by extracting only the ticker and title fields and
        saving them to a local file.
        If the raw data is not available, this method will download it first.
        """
        if not os.path.exists(self.fname_raw):
            self.logger.info(f'Raw ticker data was not found at {self.fname_raw}, downloading it')
            self.download_ticker_data()
        # read the raw data
        with open(self.fname_raw, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # Order the records by their numeric key instead of using the key as a
        # list index: the result is identical for keys 0..n-1, but a gap or an
        # offset in the keys no longer raises IndexError.
        records = [v for _, v in sorted(data.items(), key=lambda kv: int(kv[0]))]
        data_compact = {
            'ticker': [record['ticker'] for record in records],
            'title': [record['title'] for record in records],
        }
        # save the compact data
        self._ensure_parent_dir(self.fname_compact)
        with open(self.fname_compact, 'w', encoding='utf-8') as f:
            json.dump(data_compact, f)
        self.logger.info(f'Compacted raw ticker data into {self.fname_compact}')

    def download_ticker_data(
            self
            ) -> None:
        """
        Download the raw ticker data from https://www.sec.gov/files/company_tickers.json
        using the requests package. The data is saved to a local file.
        If the download is successful, the raw data is saved as a JSON file.
        Raises:
        RuntimeError
            If the server answers with a status code other than 200
        """
        url = "https://www.sec.gov/files/company_tickers.json"
        headers = {
            "User-Agent": "censored_email_address",
            "Accept-Encoding": "gzip, deflate",
            "Host": "www.sec.gov",
            "Connection": "keep-alive"
        }
        response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS)
        if response.status_code != 200:
            raise RuntimeError(f"Error downloading ticker data from url.\nResponse status code: {response.status_code}")
        # Decode the body directly rather than round-tripping through a DataFrame:
        # passing a literal JSON string to pd.read_json is deprecated.
        payload = response.json()
        # save the raw data
        self._ensure_parent_dir(self.fname_raw)
        with open(self.fname_raw, 'w', encoding='utf-8') as f:
            json.dump(payload, f)
        self.logger.info(f'Dowloaded raw ticker data into {self.fname_raw}')

    def _fuzzy_search(
            self,
            query: str,
            choices: pd.Series,
            top_n: int
            ) -> pd.DataFrame:
        """
        Run a fuzz.ratio search of query against choices and look the hits up in self.df.
        Args:
        query : str
            The already-normalized search string
        choices : pd.Series
            The candidates to compare against, aligned with self.df
        top_n : int
            The maximum number of matches to return
        Returns:
        results : pd.DataFrame
            A pd.df with the columns 'Title', 'Ticker' and 'Score'
        """
        matches = process.extract(
            query,
            choices,
            scorer=fuzz.ratio,
            limit=top_n)
        rows = []
        # Each match is a (choice, score, key) triple. The key is used as a row
        # position, which relies on self.df keeping the default 0..n-1 index it
        # gets in read_ticker_data.
        for _choice, score, idx in matches:
            record = self.df.iloc[idx]
            rows.append((record["title"], record["ticker"], score))
        return pd.DataFrame(rows, columns=["Title", "Ticker", "Score"])

    def find_best_matching_title(
            self,
            input_name: str,
            top_n: int = 5
            ) -> pd.DataFrame:
        """
        Find the best matching company title for a given company name.
        The comparison is case-insensitive.
        Args:
        input_name : str
            The name to search for
        top_n : int, default=5
            The number of top matches to return
        Returns:
        results : pd.DataFrame
            A pd.df containing the matched title, ticker, and fuzzy matching score
        """
        return self._fuzzy_search(
            input_name.lower(),
            self.df["title"].str.lower(),
            top_n)

    def find_best_matching_ticker(
            self,
            ticker: str,
            top_n: int = 5
            ) -> pd.DataFrame:
        """
        Find the best matching company ticker for a given ticker.
        The input is upper-cased before it is compared with the stored tickers.
        Args:
        ticker : str
            The ticker to search for
        top_n : int, default=5
            The number of top matches to return
        Returns:
        results : pd.DataFrame
            A pd.df containing the title, matched ticker, and fuzzy matching score
        """
        return self._fuzzy_search(
            ticker.upper(),
            self.df["ticker"],
            top_n)

    def find_best_matching_ticker_or_title(
            self,
            user_input: str
            ) -> str:
        """
        Find the best matching company ticker for a given user input, which may be a ticker or a title.
        Args:
        user_input : str
            The user input to search for
        Returns:
        results : str
            A string containing the best matching titles and tickers, formatted as a table
        """
        # user may be trying to write a ticker, in which case find the best matching ticker:
        ticker_matches = self.find_best_matching_ticker(user_input)
        # user may be trying to write a title, in which case find the best matching title:
        title_matches = self.find_best_matching_title(user_input)
        # total matches:
        c_matches = pd.concat([ticker_matches, title_matches])
        # deduplicate; a company found by both searches gets the sum of its two scores:
        c_matches_dedup = c_matches.groupby(['Ticker', 'Title'], as_index=False)['Score'].sum()
        # sort by score:
        c_matches_sorted = c_matches_dedup.sort_values(by='Score', ascending=False)
        # convert results into a pretty string:
        return self.df_to_pretty_string(c_matches_sorted)

    def df_to_pretty_string(
            self,
            df: pd.DataFrame,
            num_rows: int = 5
            ) -> str:
        """
        Convert a pd.DataFrame into a pretty string, using the tabulate package.
        Args:
        df : pd.DataFrame
            The dataframe to convert; must have the columns 'Title' and 'Ticker'
        num_rows : int, default=5
            The maximum number of leading rows to include
        Returns:
        pretty_string : str
            A string containing the dataframe formatted as an HTML table
        """
        df = df.rename(columns={'Title': 'Company Name', 'Ticker': 'Ticker Symbol'})
        df_subset = df[['Company Name', 'Ticker Symbol']].iloc[0:num_rows]
        pretty_table = tabulate(
            df_subset,
            headers='keys',
            tablefmt='html',
            showindex=False,
            numalign='left',
            stralign='left')
        return pretty_table

    def does_ticker_exist(
            self,
            ticker: str
            ) -> bool:
        """
        Check whether a given ticker exists in the ticker data.
        The check is an exact, case-sensitive comparison.
        Args:
        ticker : str
            The ticker to check
        Returns:
        exists : bool
            True if the ticker exists, False otherwise
        """
        return ticker in self.df['ticker'].values


# if __name__ == "__main__":
#       results = TickerFinder().find_best_matching_ticker_or_title("microsoft")
#       print(results)
      # exists = TickerFinder().does_ticker_exist('bbbbb')
      # print(f'Ticker exists') if exists else print(f'Ticker does not exist')