File size: 1,882 Bytes
ccefd0b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | import unittest
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
import asyncio
import apache_beam as beam
from openbb_yfinance.models.equity_quote import YFinanceEquityQuoteFetcher as quote_fetcher
from openbb_yfinance.models.equity_profile import YFinanceEquityProfileFetcher as profile_fetcher
from openbb_yfinance.models.company_news import YFinanceCompanyNewsFetcher as news_fetcher
class AsyncProcess(beam.DoFn):
    """Beam ``DoFn`` that calls an async fetcher once per input element.

    The element is sent to the fetcher as the ``symbol`` parameter and each
    record that comes back is converted to a plain ``dict``.
    """

    def __init__(self, credentials, fetcher):
        """Remember what ``process`` needs in order to call the fetcher.

        Args:
            credentials: Forwarded unchanged as the second argument of
                ``fetcher.fetch_data``.
            fetcher: Object (or class) exposing an awaitable
                ``fetch_data(params, credentials)``.
        """
        # Give the DoFn base classes the chance to initialise themselves.
        super().__init__()
        self.credentials = credentials
        self.fetcher = fetcher

    async def fetch_data(self, element: str):
        """Fetch the records for one symbol and dump them to dictionaries.

        Args:
            element: Value passed to the fetcher as ``symbol``.

        Returns:
            A list holding one ``model_dump(exclude_none=True)`` result per
            record returned by the fetcher.
        """
        params = {"symbol": element}
        data = await self.fetcher.fetch_data(params, self.credentials)
        # NOTE(review): some fetchers appear to be typed to return one bare
        # record instead of a list -- confirm against the fetcher contract.
        # Anything that itself exposes ``model_dump`` is treated as a single
        # record and wrapped, so the comprehension below sees a list either way.
        if hasattr(data, "model_dump"):
            data = [data]
        return [d.model_dump(exclude_none=True) for d in data]

    def process(self, element: str):
        """Drive ``fetch_data`` to completion and return its list of dicts.

        ``asyncio.run`` creates a new event loop for each call, so this
        method must not be invoked from a thread that already has a running
        event loop.

        Args:
            element: The input element, used as the symbol to look up.

        Returns:
            The list produced by ``fetch_data`` for ``element``.
        """
        return asyncio.run(self.fetch_data(element))
class MyTestCase(unittest.TestCase):
    """Smoke test that drives the yfinance fetchers through a Beam pipeline."""

    def test_sample_pipeline(self):
        """Fetch quote, profile and news for one ticker and print each record.

        Nothing is asserted about the fetched data: the test fails only if
        constructing or running the pipeline raises.
        NOTE(review): the fetchers presumably call live endpoints, so this
        likely needs network access -- confirm before running it in CI.
        """
        credentials = {}  # Running OBB endpoints which do not require credentials
        debug_sink = beam.Map(print)
        ticker = 'AAPL'
        # One independent branch per fetcher; the name is only used to build
        # the step labels, which must be unique within a pipeline.
        branches = (
            ('Quote', quote_fetcher),
            ('Profile', profile_fetcher),
            ('News', news_fetcher),
        )
        with TestPipeline(options=PipelineOptions()) as p:
            for name, fetcher in branches:
                # The branch outputs are never read, so they are discarded.
                _ = (p
                     | f'Start {name}' >> beam.Create([ticker])
                     | f'Run {name}' >> beam.ParDo(AsyncProcess(credentials, fetcher))
                     | f'Print {name.lower()}' >> debug_sink)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|