| import sys |
| import os |
| import pandas as pd |
| from datetime import datetime, timedelta |
| from bs4 import BeautifulSoup |
| import requests |
| import re |
| import time |
| import feedparser |
| import gradio as gr |
|
|
| |
# Source RSS feed that is polled for new articles.
RSS_URL = 'https://vecherka.su/rss/'
# Local CSV used as a lightweight article database (';'-separated, utf-8-sig).
CSV_FILE_PATH = 'bd.csv'
|
|
|
|
|
|
def parse_article_date(published_date_str):
    """Parse an RSS publication date string into a ``datetime``.

    Tries several known formats in order of likelihood; returns the
    first successful parse, or None (after logging) when no format
    matches.
    """
    known_formats = (
        '%a, %d %b %Y %H:%M:%S %z',
        '%a, %d %b %Y %H:%M:%S %Z',
        '%d %b %Y',
    )
    for candidate in known_formats:
        try:
            parsed = datetime.strptime(published_date_str, candidate)
        except ValueError:
            continue
        return parsed
    print(f"Could not parse date: '{published_date_str}'")
    return None
|
|
def is_recent_article(parsed_date):
    """Return True if *parsed_date* falls on today's or yesterday's date.

    The original implementation formatted both sides to 'DD-MM-YYYY'
    strings and compared those; comparing ``date`` objects directly is
    equivalent, clearer, and avoids the string round-trip.
    """
    today = datetime.now().date()
    yesterday = today - timedelta(days=1)
    return parsed_date.date() in (today, yesterday)
|
|
|
|
|
|
|
|
|
|
def extract_images_from_entry(entry, max_images=3):
    """Extract up to *max_images* image URLs from an RSS entry.

    Sources are checked in priority order: MRSS ``media_content``
    attachments, enclosure links, then ``<img>`` tags inside the HTML
    summary/content. Duplicate URLs are skipped.

    Args:
        entry: a feedparser entry (supports both item and attribute access).
        max_images: cap on the number of URLs returned (default 3,
            preserving the original hard-coded limit).

    Returns:
        list[str]: collected image URLs, possibly empty.
    """
    image_urls = []

    # 1) MRSS media attachments.
    if 'media_content' in entry and len(image_urls) < max_images:
        for media in entry.media_content:
            url = media.get('url')
            if media.get('type', '').startswith('image/') and url and url not in image_urls:
                image_urls.append(url)
                if len(image_urls) == max_images:
                    break

    # 2) Enclosure links.
    if 'links' in entry and len(image_urls) < max_images:
        for link_entry in entry.links:
            href = link_entry.get('href')
            if (link_entry.get('rel') == 'enclosure'
                    and link_entry.get('type', '').startswith('image/')
                    and href and href not in image_urls):
                image_urls.append(href)
                if len(image_urls) == max_images:
                    break

    # 3) <img> tags inside the entry's HTML summary/content.
    html_content = entry.get('summary', '') or (
        entry.get('content', [{}])[0].get('value', '') if entry.get('content') else '')
    if html_content and len(image_urls) < max_images:
        soup = BeautifulSoup(html_content, 'html.parser')
        for img in soup.find_all('img'):
            src = img.get('src')
            if src and src not in image_urls:
                image_urls.append(src)
                if len(image_urls) == max_images:
                    break
    return image_urls
|
|
|
|
|
|
def fetch_article_text(news_link):
    """Download an article page and return its cleaned body text.

    Returns None when the request fails, the expected content div is
    missing, or the text is flagged as an advertisement.
    """
    try:
        response = requests.get(news_link, timeout=10)
        response.raise_for_status()
        page = BeautifulSoup(response.text, 'html.parser')
        body_div = page.find('div', class_='detail-text')

        if body_div is None:
            print(f"Could not find 'detail-text' div for article: {news_link}")
            return None

        text = body_div.get_text(separator=' ', strip=True)
        # Drop whole sentences that ask readers to subscribe, then
        # collapse runs of whitespace into single spaces.
        text = re.sub(r'[^.!?]*\bподписывайтесь\b[^.!?]*[?.!]', '', text, flags=re.IGNORECASE)
        text = re.sub(r'\s+', ' ', text).strip()

        # Articles marked as advertising are skipped entirely.
        if re.search(r'\bРеклама\b', text, re.IGNORECASE):
            print(f"Skipping article due to 'Реклама' in full text.")
            return None
        return text
    except requests.exceptions.RequestException as e:
        print(f"Error fetching content for {news_link}: {e}")
        return None
    except Exception as e:
        print(f"Error parsing content for {news_link}: {e}")
        return None
|
|
|
|
|
|
|
|
def check_for_new_articles_init():
    """Log the start of a feed check and return (today_str, yesterday_str).

    NOTE(review): this helper was originally (mis)named
    ``check_for_new_articles`` and was silently shadowed by the real
    entry point of the same name defined later in the file.  That
    entry point calls ``check_for_new_articles_init()`` — a name that
    did not exist, so running the script raised NameError.  Renaming
    the shadowed helper fixes this; no caller could reach it under its
    old name anyway.

    Returns:
        tuple[str, str]: today's and yesterday's dates as 'DD-MM-YYYY'.
    """
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Checking for new articles...")

    today_date = datetime.now()
    yesterday_date = today_date - timedelta(days=1)
    return today_date.strftime('%d-%m-%Y'), yesterday_date.strftime('%d-%m-%Y')
|
|
|
|
|
|
def load_existing_articles():
    """Load previously saved articles so duplicates can be skipped.

    Returns:
        tuple: (set of already-processed article links, the loaded
        DataFrame, or None when the CSV is absent or unreadable).
    """
    links_seen = set()
    articles_df = None

    if os.path.exists(CSV_FILE_PATH):
        try:
            articles_df = pd.read_csv(CSV_FILE_PATH, encoding='utf-8-sig', sep=';')
            links_seen = set(articles_df['link'].tolist())
            print(f"Loaded {len(links_seen)} existing articles from {CSV_FILE_PATH}.")
        except Exception as e:
            print(f"Error loading existing CSV: {e}. Starting with an empty processed_links set.")

    return links_seen, articles_df
|
|
|
|
|
|
|
|
def fetch_and_parse_rss():
    """Download the RSS feed and return its entries (empty list if none)."""
    feed = feedparser.parse(RSS_URL)
    entries = feed.entries

    if not entries:
        print("No entries found in the RSS feed.")
        return []

    print(f"Found {len(entries)} entries in RSS feed.")
    return entries
|
|
|
|
|
|
|
|
def process_single_article(entry, today_str, yesterday_str, processed_links):
    """Build the article record for one RSS entry, or return None.

    An entry is skipped when it has no link, was already processed,
    lacks a parsable publication date, is older than yesterday, or its
    full text cannot be fetched.

    Returns:
        dict | None: row data ready for the CSV, or None when skipped.
    """
    news_link = getattr(entry, 'link', None)
    title = getattr(entry, 'title', 'No Title')

    # Skip entries without a usable link or already saved earlier.
    if not news_link or news_link in processed_links:
        return None

    published = getattr(entry, 'published', None)
    if not published:
        print(f"Skipping entry '{title}' due to missing publication date.")
        return None

    parsed_date = parse_article_date(published)
    if parsed_date is None:
        return None

    # Keep only articles dated today or yesterday.
    article_date_str = parsed_date.strftime('%d-%m-%Y')
    if article_date_str not in (today_str, yesterday_str):
        return None

    image_urls = extract_images_from_entry(entry)

    full_text = fetch_article_text(news_link)
    if not full_text:
        return None

    # Teaser shown in listings: first 200 characters of the body.
    short_text = full_text[:200]

    return {
        'title': title,
        'published': article_date_str,
        'image_urls': image_urls,
        'link': news_link,
        'full_text': full_text,
        'Status': 'Off',
        'short_text': short_text,
        'Constant': '',
    }
|
|
|
|
def save_new_articles(new_articles_data, existing_df):
    """Write freshly collected articles to the CSV.

    Appends without a header when the file already holds rows,
    otherwise (re)creates it with a header row.

    Returns:
        int: number of articles written (0 when there was nothing new).
    """
    if not new_articles_data:
        print("No new articles found to add.")
        return 0

    frame = pd.DataFrame(new_articles_data)
    # The CSV stores the image URL list as one comma-separated cell.
    frame['image_urls'] = frame['image_urls'].apply(', '.join)

    appending = existing_df is not None and not existing_df.empty
    frame.to_csv(
        CSV_FILE_PATH,
        mode='a' if appending else 'w',
        header=not appending,
        index=False,
        encoding='utf-8-sig',
        sep=';'
    )

    added = len(new_articles_data)
    print(f"Added {added} new articles to {CSV_FILE_PATH}.")
    return added
|
|
|
|
|
|
def check_for_new_articles():
    """Check the RSS feed for new articles and persist them to the CSV.

    Bug fix: the original body called ``check_for_new_articles_init()``,
    a name that is not defined anywhere in the file (the intended
    helper was accidentally defined under this function's own name and
    shadowed), so every invocation raised NameError.  The date setup is
    now done inline, matching the shadowed helper's behavior — log
    line included.

    Returns:
        int: number of new articles added.
    """
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Checking for new articles...")

    # Only articles dated today or yesterday are kept.
    today_date = datetime.now()
    today_str = today_date.strftime('%d-%m-%Y')
    yesterday_str = (today_date - timedelta(days=1)).strftime('%d-%m-%Y')

    # Known links prevent duplicate rows across runs.
    processed_links, existing_df = load_existing_articles()

    entries = fetch_and_parse_rss()
    if not entries:
        return 0

    new_articles_data = []
    for entry in entries:
        article_data = process_single_article(entry, today_str, yesterday_str, processed_links)
        if article_data:
            new_articles_data.append(article_data)

    return save_new_articles(new_articles_data, existing_df)
|
|