Spaces:
Runtime error
Runtime error
| import requests | |
| import csv | |
| from datetime import datetime, timedelta | |
| import time | |
| import os | |
# Aylien News API credentials, taken from the process environment.
AppID = os.environ.get('APP_ID')  # Aylien application ID
APIKey = os.environ.get('API_KEY')  # Aylien API key

# Polygon credential, taken from the process environment.
PolygonAPIKey = os.environ.get('POLYGON_API_KEY')  # Polygon API key
| # Function to get authentication header | |
def get_auth_header(appid, apikey):
    """Build the authentication headers sent with every Aylien request.

    Args:
        appid: Value for the 'X-Application-Id' header.
        apikey: Value for the 'X-Application-Key' header.

    Returns:
        A new dict holding exactly those two headers.
    """
    header_names = ('X-Application-Id', 'X-Application-Key')
    credentials = (appid, apikey)
    return dict(zip(header_names, credentials))
| # Function to fetch stories for specific companies within a date range | |
def fetch_stories_for_date_range(ticker, headers, start_date, end_date, *, per_page=2, timeout=30):
    """Fetch English-language stories mentioning *ticker* published in a date range.

    Pages through the Aylien stories endpoint, newest first, until a page is
    empty, no further cursor is advertised, or a request fails.

    Args:
        ticker: Stock ticker used for the 'entities.stock_tickers' filter.
        headers: Authentication headers for the request.
        start_date: datetime marking the start of the publication window.
        end_date: datetime marking the end of the publication window.
        per_page: Number of stories requested per page (default 2, the value
            this script has always used).
        timeout: Seconds to wait for the server before giving up on a request.

    Returns:
        A list of story dicts. On an HTTP or network failure the stories
        collected so far are returned after a message is printed.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%SZ'
    all_stories = []
    params = {
        'entities.stock_tickers': ticker,
        'published_at.start': start_date.strftime(timestamp_format),
        'published_at.end': end_date.strftime(timestamp_format),
        'language': 'en',
        'per_page': per_page,
        'sort_by': 'published_at',
        'sort_direction': 'desc',
    }

    while True:
        time.sleep(1)  # Stay under the API rate limit: one call per second.
        try:
            # Without a timeout a stalled connection would block the run forever.
            response = requests.get(
                'https://api.aylien.com/news/stories',
                headers=headers,
                params=params,
                timeout=timeout,
            )
        except requests.RequestException as exc:
            # Treat network failures like HTTP failures: report and stop paging.
            print(f"Failed to fetch stories for ticker {ticker}: {exc}")
            break

        if response.status_code != 200:
            print(f"Failed to fetch stories for ticker {ticker}: {response.status_code} - {response.text}")
            break

        data = response.json()
        stories = data.get('stories', [])
        if not stories:
            break
        all_stories.extend(stories)

        # NOTE(review): the cursor is read from data['links']['next'] as in the
        # original code; confirm this matches the field the API actually
        # returns, otherwise only the first page is ever fetched.
        next_cursor = (data.get('links') or {}).get('next')
        if not next_cursor:
            break
        params['cursor'] = next_cursor

    return all_stories
| # Function to fetch stock data for a given symbol within a date range using Polygon API | |
def get_stock_data(api_key, symbol, start_date, end_date, timeout=30):
    """Fetch daily open/close prices for *symbol* from the Polygon aggregates API.

    Args:
        api_key: Polygon API key.
        symbol: Ticker symbol to query.
        start_date: First day of the range, formatted 'YYYY-MM-DD'.
        end_date: Last day of the range, formatted 'YYYY-MM-DD'.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A dict mapping 'YYYY-MM-DD' to {'open': ..., 'close': ...}. The dict
        is empty when the response carries no bars. Returns None when the
        request fails (a message is printed).
    """
    from datetime import timezone  # local import: the file only imports datetime/timedelta

    time.sleep(1)  # Stay under the API rate limit: one call per second.
    base_url = f"https://api.polygon.io/v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}"
    try:
        # Pass the key as a query parameter so it is URL-encoded and never
        # baked into the URL string itself.
        response = requests.get(base_url, params={'apiKey': api_key}, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to fetch stock data for {symbol} from Polygon API: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch stock data for {symbol} from Polygon API: {response.status_code} - {response.text}")
        return None

    # A successful response with no bars may omit 'results' entirely, so do
    # not index it directly.
    results = response.json().get('results') or []

    # Convert the millisecond timestamp in UTC so the resulting date key does
    # not depend on the timezone of the machine running the script.
    return {
        datetime.fromtimestamp(bar['t'] / 1000, tz=timezone.utc).strftime('%Y-%m-%d'): {
            'open': bar['o'],
            'close': bar['c'],
        }
        for bar in results
    }
| # Save data to CSV file | |
def _resolve_stock_prices(stock_data, stock_date, max_lookback_days=3):
    """Return (close, open) for *stock_date*, falling back to an earlier close.

    When no close price exists for *stock_date*, the closes of the preceding
    *max_lookback_days* days are tried in order; the first one found is used
    for both values. 'N/A' is returned for anything that cannot be resolved.
    """
    missing = 'N/A'
    bar = stock_data.get(stock_date)
    if bar is None:
        close_price = open_price = missing
    else:
        close_price = bar.get('close', missing)
        open_price = bar.get('open', missing)

    if close_price == missing:
        anchor = datetime.strptime(stock_date, '%Y-%m-%d')
        for days_back in range(1, max_lookback_days + 1):
            earlier_date = (anchor - timedelta(days=days_back)).strftime('%Y-%m-%d')
            earlier_bar = stock_data.get(earlier_date)
            if earlier_bar is None:
                continue
            close_price = open_price = earlier_bar.get('close', missing)
            if close_price != missing:
                break

    return close_price, open_price


def _percentage_change(close_price, open_price):
    """Return the open-to-close change in percent, or 'N/A' if not computable."""
    try:
        close_value = float(close_price)
        open_value = float(open_price)
    except (TypeError, ValueError):
        return 'N/A'
    if open_value == 0:
        return 'N/A'
    return ((close_value - open_value) / open_value) * 100


def save_data_to_csv(ticker, all_stories, stock_data):
    """Write one CSV row per story, joined with next-day stock prices.

    The file is written to 'Database/<ticker>_db.csv'; the directory is
    created if it does not exist. Each story is paired with the stock price
    of the day after its publication date, falling back to the close of up
    to three earlier days. Values that cannot be determined are written as
    'N/A' instead of aborting the export.

    Args:
        ticker: Ticker symbol, used to build the output file name.
        all_stories: Iterable of story dicts (keys read: 'published_at',
            'summary', 'keywords', 'sentiment').
        stock_data: Dict mapping 'YYYY-MM-DD' to {'open': ..., 'close': ...}.
    """
    file_name = f"Database/{ticker}_db.csv"
    # open() does not create missing directories.
    os.makedirs(os.path.dirname(file_name), exist_ok=True)

    with open(file_name, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow([
            'Publication Date', 'Summary', 'Sentiment Polarity', 'Sentiment Confidence',
            'Keywords', 'stock_date', 'stock_price', 'percentage_change',
        ])

        # Every story is exported; the original slice [:-1] dropped the last one.
        for story in all_stories:
            summary = ' '.join(story.get('summary', {}).get('sentences', []))
            keywords = ", ".join(story.get('keywords', []))
            sentiment = story.get('sentiment', {})
            # Polarity comes from the title sentiment and confidence from the
            # body sentiment score, as in the original export.
            sentiment_polarity = sentiment.get('title', {}).get('polarity', 'N/A')
            sentiment_confidence = sentiment.get('body', {}).get('score', 'N/A')

            try:
                published = datetime.strptime(story.get('published_at', ''), '%Y-%m-%dT%H:%M:%SZ')
            except (TypeError, ValueError):
                # Keep the story but mark everything that depends on the date.
                publication_date = stock_date = 'N/A'
                stock_price = open_stock_price = 'N/A'
            else:
                publication_date = published.strftime('%Y-%m-%d')
                # Truncate to the calendar day, then move to the following day.
                stock_date = (
                    datetime.strptime(publication_date, '%Y-%m-%d') + timedelta(days=1)
                ).strftime('%Y-%m-%d')
                stock_price, open_stock_price = _resolve_stock_prices(stock_data, stock_date)

            percentage_change = _percentage_change(stock_price, open_stock_price)
            writer.writerow([
                publication_date, summary, sentiment_polarity, sentiment_confidence,
                keywords, stock_date, stock_price, percentage_change,
            ])

    print(f"Data has been written to {file_name}")
def main():
    """Export a news/price CSV for each configured ticker.

    For every ticker, daily stock prices are fetched for the 180 days ending
    on the configured end date. If prices are available, stories are fetched
    one day at a time over the same window and written out together with the
    prices.
    """
    tickers = ['META']  # Example tickers

    # Fail early with a clear message instead of sending unauthenticated requests.
    required = (('APP_ID', AppID), ('API_KEY', APIKey), ('POLYGON_API_KEY', PolygonAPIKey))
    missing = [name for name, value in required if not value]
    if missing:
        print(f"Missing required environment variables: {', '.join(missing)}")
        return

    headers = get_auth_header(AppID, APIKey)

    # The window is anchored to the fixed end date. Deriving the start from
    # datetime.now() would put it after the end date once the end date lies in
    # the past, leaving nothing to fetch.
    end_date = datetime.strptime("2024-02-12", '%Y-%m-%d')
    start_date = end_date - timedelta(days=180)

    for ticker in tickers:
        stock_data = get_stock_data(
            PolygonAPIKey,
            ticker,
            start_date.strftime("%Y-%m-%d"),
            end_date.strftime("%Y-%m-%d"),
        )
        print(stock_data)
        if not stock_data:
            # Without prices there is nothing to join the stories against.
            continue

        all_stories = []
        current_date = start_date
        # Query one day at a time, clamping the final window to end_date.
        while current_date < end_date:
            next_day = min(current_date + timedelta(days=1), end_date)
            print(f"Fetching stories for {ticker}...")
            all_stories.extend(fetch_stories_for_date_range(ticker, headers, current_date, next_day))
            current_date = next_day

        save_data_to_csv(ticker, all_stories, stock_data)


if __name__ == '__main__':
    main()