from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import requests
from bs4 import BeautifulSoup
import json

app = FastAPI()

class ArticleScraper:
    def __init__(self):
        self.scraper_api_key = "24610cfe7680c5a15d77bd32cfd23fc3"
        self.scraper_api_url = "http://api.scraperapi.com/"

    def scrape_bloomberg(self, article_url):
        # Fetch the page through the ScraperAPI proxy to get past
        # Bloomberg's bot detection.
        params = {
            "api_key": self.scraper_api_key,
            "url": article_url,
        }
        response = requests.get(self.scraper_api_url, params=params)
        html = response.text
        soup = BeautifulSoup(html, 'html.parser')
        # Bloomberg is a Next.js site: the structured article body lives in
        # the JSON embedded in the __NEXT_DATA__ script tag.
        script = soup.find('script', {'id': '__NEXT_DATA__'})
        json_data = json.loads(script.text)
        props = json_data['props']['pageProps']
        contents = props['story']['body']['content']
        article_text = []
        for item in contents:
            text = self.extract_text(item)
            if text:
                article_text.append(text)
        return '\n\n'.join(article_text)
    def scrape_financial_times(self, article_url):
        # A Twitter referer plus a GDPR consent cookie; FT has served full
        # articles to social-media referrals, which this relies on.
        headers = {
            'Referer': 'https://twitter.com'
        }
        cookies = {
            'FTCookieConsentGDPR': 'true',
            'FTAllocation': '00000000-0000-0000-0000-000000000000'
        }
        response = requests.get(article_url, headers=headers, cookies=cookies)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            # FT embeds the full article body in a JSON-LD metadata block.
            article_script = soup.find('script', {'type': 'application/ld+json'})
            if article_script:
                article_data = json.loads(article_script.string)
                return article_data.get('articleBody', '')
            else:
                return "Article content not found in the expected format."
        else:
            return f"Failed to retrieve the webpage. Status code: {response.status_code}"
    def extract_text(self, content_item):
        # Walk one node of Bloomberg's structured body and render it as Markdown.
        if content_item['type'] == 'paragraph':
            text_parts = []
            for item in content_item['content']:
                if item['type'] == 'text':
                    text_parts.append(item['value'])
                elif item['type'] == 'entity':
                    if 'link' in item['data'] and item['data']['link']['destination'].get('web'):
                        url = item['data']['link']['destination']['web']
                        text = ' '.join([sub_item['value'] for sub_item in item['content'] if sub_item['type'] == 'text'])
                        text_parts.append(f"[{text}]({url})")
                    else:
                        text_parts.extend([sub_item['value'] for sub_item in item['content'] if sub_item['type'] == 'text'])
                elif item['type'] == 'link':
                    url = item['data']['destination'].get('web', '')
                    text = ' '.join([sub_item['value'] for sub_item in item['content'] if sub_item['type'] == 'text'])
                    if url:
                        text_parts.append(f"[{text}]({url})")
                    else:
                        text_parts.append(text)
            return ' '.join(text_parts)
        elif content_item['type'] == 'entity' and content_item['subType'] == 'story':
            # Related-story entities become "Read More" lines.
            url = content_item['data']['link']['destination'].get('web', '')
            text = ' '.join([sub_item['value'] for sub_item in content_item['content'] if sub_item['type'] == 'text'])
            return f"Read More: [{text}]({url})"
        elif content_item['type'] == 'media' and content_item['subType'] == 'photo':
            photo_data = content_item['data']['photo']
            caption = photo_data.get('caption', '')
            credit = photo_data.get('credit', '')
            src = photo_data.get('src', '')  # captured but not rendered below
            alt = photo_data.get('alt', '')  # captured but not rendered below
            return f"\n\n*{caption}* {credit}\n"
        elif content_item['type'] == 'media' and content_item['subType'] == 'chart':
            chart_data = content_item['data']['chart']
            attachment = content_item['data']['attachment']
            title = attachment.get('title', '')
            subtitle = attachment.get('subtitle', '')
            source = attachment.get('source', '')
            fallback_image = chart_data.get('fallback', '')  # captured but not rendered below
            footnote = attachment.get('footnote', '')
            return f"\n\n**{title}**\n*{subtitle}*\n{footnote}\n{source}\n"
        return ''
    def scrape_article(self, url):
        if 'bloomberg.com' in url:
            return self.scrape_bloomberg(url)
        elif 'ft.com' in url:
            return self.scrape_financial_times(url)
        else:
            return "Unsupported website. Please provide a URL from Bloomberg or Financial Times."

class ArticleRequest(BaseModel):
    url: str

# NOTE: the route decorators were missing from the original listing; without
# them FastAPI never registers these handlers. The paths below are assumed.
@app.get("/")
async def hello():
    return {"response": "Greetings from SS!"}


@app.post("/scrape")
async def scrape_article(request: ArticleRequest):
    scraper = ArticleScraper()
    content = scraper.scrape_article(request.url)
    if "Unsupported website" in content:
        raise HTTPException(status_code=400, detail=content)
    return {"content": content}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
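
# Example client call (a sketch, assuming the /scrape path chosen above and a
# server running locally on port 7860; not part of the original Space):
#
#   import requests
#
#   resp = requests.post(
#       "http://localhost:7860/scrape",
#       json={"url": "https://www.bloomberg.com/news/articles/..."},
#   )
#   resp.raise_for_status()
#   print(resp.json()["content"])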