import re
from datetime import datetime
from html import unescape
from typing import List

import feedparser

from app.models import Article
|
|
class RSSParser:
    """RSS feed parser for news sources.

    Google News feeds are sliced apart with regexes (their items are simple
    enough that a full XML parser is not required); cloud-provider feeds are
    parsed with feedparser.
    """

    async def parse_google_news(self, content: str, category: str) -> List[Article]:
        """Parse a Google News RSS feed into Articles.

        Args:
            content: Raw RSS XML text of the feed.
            category: Category label stamped onto every resulting Article.

        Returns:
            Up to 20 Articles; an empty list if parsing fails entirely.
        """
        try:
            articles = []

            # Non-greedy so each <item>...</item> is captured separately.
            item_regex = r'<item>([\s\S]*?)</item>'
            matches = re.findall(item_regex, content)

            for item in matches[:20]:  # cap each feed at 20 entries
                title = self._extract_tag(item, 'title') or 'No title'
                link = self._extract_tag(item, 'link') or self._extract_tag(item, 'guid') or ''
                description = (self._extract_tag(item, 'description')
                               or self._extract_tag(item, 'content:encoded') or '')
                pub_date = (self._extract_tag(item, 'pubDate')
                            or self._extract_tag(item, 'published')
                            or datetime.now().isoformat())

                image = self._extract_image_from_xml(item, description, category, title)

                # Google News wraps the originating outlet's name in an <a>
                # tag inside the description; fall back to 'Google News'.
                source_match = re.search(r'<a[^>]*>([^<]+)</a>', description)
                article_source = source_match.group(1) if source_match else 'Google News'

                cleaned_description = self._clean_google_news_description(description)

                article = Article(
                    title=self._clean_html(title),
                    description=cleaned_description,
                    url=link,
                    image_url=image,
                    published_at=pub_date,
                    source=self._clean_html(article_source),
                    category=category
                )
                articles.append(article)

            return articles
        except Exception as e:
            # Best-effort: a malformed feed yields an empty list, not a crash.
            print(f"Error parsing Google News: {e}")
            return []

    def _extract_image_from_xml(self, item: str, description: str, category: str, title: str) -> str:
        """Extract an image URL from an RSS item, trying several sources.

        Order: <media:content>/<media:thumbnail>, <enclosure>, an <img> tag
        inside the description, then an og:image attribute pair in the
        description. ``category`` and ``title`` are currently unused — kept
        for interface compatibility with existing callers.

        Returns:
            The first URL found, or '' when no image is present.
        """
        media_match = re.search(r'<media:(content|thumbnail)[^>]*url="([^"]+)"', item)
        if media_match:
            return media_match.group(2)

        enclosure_match = re.search(r'<enclosure[^>]*url="([^"]+)"', item)
        if enclosure_match:
            return enclosure_match.group(1)

        img_match = re.search(r'<img[^>]+src=["\']([^"\']+)["\']', description)
        if img_match:
            return img_match.group(1)

        og_match = re.search(r'property=["\']og:image["\'][^>]*content=["\']([^"\']+)["\']', description)
        if og_match:
            return og_match.group(1)

        return ""

    def _clean_google_news_description(self, description: str) -> str:
        """Clean a Google News description.

        Google News descriptions typically contain only a link back to the
        article rather than real content, so bare redirect links are dropped.

        Returns:
            Up to 200 characters of usable text, or '' when the description
            carries no real content.
        """
        # Pure Google News redirect links carry no readable content.
        if 'news.google.com/rss/articles' in description:
            return ''

        # Some feeds append a snippet after the closing </a> of the source link.
        after_link_match = re.search(r'</a>([\s\S]*)', description)
        if after_link_match:
            extracted = self._clean_html(after_link_match.group(1))
            if len(extracted) > 30:  # ignore trivially short leftovers
                return extracted[:200]

        # Otherwise use the whole description if it reads as prose, not a URL.
        full_clean = self._clean_html(description)
        if len(full_clean) > 30 and not full_clean.startswith('http'):
            return full_clean[:200]

        return ''

    def _extract_tag(self, xml: str, tag_name: str) -> str:
        """Return the stripped text of the first <tag_name> element, or ''."""
        pattern = f'<{tag_name}[^>]*>([\\s\\S]*?)</{tag_name}>'
        match = re.search(pattern, xml, re.IGNORECASE)
        return match.group(1).strip() if match else ''

    def _clean_html(self, html: str) -> str:
        """Remove HTML tags, unwrap CDATA, decode entities, collapse whitespace."""
        # Unwrap CDATA sections, keeping their contents.
        text = re.sub(r'<!\[CDATA\[([\s\S]*?)\]\]>', r'\1', html)

        # Strip complete tags first, then any dangling '<...' fragment and
        # stray '>' characters left behind by truncated markup.
        text = re.sub(r'<[^>]+>', '', text)
        text = re.sub(r'<[^>]*', '', text)
        text = text.replace('>', '')

        # Decode named and numeric HTML entities with the stdlib. The old
        # hand-rolled table had been corrupted into identity mappings
        # (e.g. '&': '&') and numeric entities were deleted outright.
        text = unescape(text)

        # Collapse runs of whitespace.
        text = re.sub(r'\s+', ' ', text).strip()

        return text

    async def parse_provider_rss(self, content: str, provider: str) -> List[Article]:
        """Parse a cloud-provider RSS feed with feedparser.

        Args:
            content: Raw feed text.
            provider: Provider slug (e.g. 'aws'); upper-cased as the Article
                source and embedded into the 'cloud-<provider>' category.

        Returns:
            Up to 20 Articles; an empty list if parsing fails.
        """
        try:
            feed = feedparser.parse(content)
            articles = []

            for entry in feed.entries[:20]:
                image_url = self._extract_image_from_entry(entry)
                published_at = self._parse_date(entry.get('published', ''))

                description = entry.get('summary', '')
                if description:
                    # Strip tags and truncate to a 200-character teaser.
                    description = re.sub(r'<[^>]+>', '', description)
                    if len(description) > 200:
                        description = description[:200] + '...'

                article = Article(
                    title=entry.get('title', ''),
                    description=description,
                    url=entry.get('link', ''),
                    image_url=image_url,
                    published_at=published_at,
                    source=provider.upper(),
                    category=f'cloud-{provider}'
                )
                articles.append(article)

            return articles
        except Exception as e:
            # Best-effort: a malformed feed yields an empty list, not a crash.
            print(f"Error parsing provider RSS: {e}")
            return []

    def _extract_image_from_entry(self, entry) -> str:
        """Extract an image URL from a feedparser entry.

        Checks media:content, media:thumbnail, image enclosures, then an
        <img> tag inside the entry content/summary.

        Returns:
            The first URL found, or '' when no image is present.
        """
        if hasattr(entry, 'media_content') and entry.media_content:
            return entry.media_content[0].get('url', '')

        if hasattr(entry, 'media_thumbnail') and entry.media_thumbnail:
            return entry.media_thumbnail[0].get('url', '')

        if hasattr(entry, 'enclosures') and entry.enclosures:
            for enclosure in entry.enclosures:
                if enclosure.get('type', '').startswith('image'):
                    return enclosure.get('href', '')

        content = ''
        if hasattr(entry, 'content') and entry.content:
            content = entry.content[0].get('value', '')
        elif hasattr(entry, 'summary'):
            content = entry.summary

        if content:
            # `re` is imported at module level; the old local import was redundant.
            img_match = re.search(r'<img[^>]+src=["\']([^"\']+)["\']', content)
            if img_match:
                return img_match.group(1)

        return ""

    def _parse_date(self, date_str: str) -> datetime:
        """Parse an RSS date string; fall back to now() on any failure.

        NOTE(review): python-dateutil is imported lazily at call time; an
        ImportError is absorbed by the fallback, preserving the original
        best-effort behavior.
        """
        try:
            from dateutil import parser
            return parser.parse(date_str)
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            return datetime.now()
|
|