Spaces:
Sleeping
Sleeping
| import requests | |
| from bs4 import BeautifulSoup | |
| from transformers import pipeline | |
| import gtts | |
| import io | |
| import os | |
| from tts import TextToSpeechConverter | |
| from datetime import datetime | |
| import xml.etree.ElementTree as ET | |
| from fake_useragent import UserAgent | |
| import locale | |
# Topic taxonomy used by extract_topics(): maps a topic label to the keyword
# substrings whose occurrences are counted (lowercased) in an article's text.
# NOTE(review): short keywords like "IT" or "new" will match inside unrelated
# words ("with", "news") because matching is plain substring count — confirm
# that is acceptable before tightening.
news_topics = {
    "Technology": ["tech", "digital", "software", "hardware", "IT"],
    "AI": ["artificial intelligence", "machine learning", "deep learning", "neural network"],
    "Business": ["company", "corporate", "firm", "enterprise", "startup", "market"],
    "Finance": ["finance", "investment", "stock", "economy", "trading", "bank"],
    "Partnership": ["partner", "collaboration", "alliance", "merger", "acquisition"],
    "Social Media": ["social", "platform", "tweet", "facebook", "instagram", "linkedin", "post"],
    "Innovation": ["innovate", "new", "advance", "breakthrough", "disruption"],
    "Outage": ["outage", "downtime", "disrupt", "service interruption"],
    "Launch": ["launch", "release", "introduce", "unveil"],
    "Publicity": ["public", "campaign", "promo", "advertisement"],
    "Privacy": ["privacy", "data", "security", "breach"],
    "Entertainment": ["entertain", "media", "show", "movie", "series"],
    "Leadership": ["promotion", "leader", "executive", "ceo", "chairman", "manager"],
    "Mergers & Acquisitions": ["merger", "acquisition", "buyout", "takeover"]
}
def fetch_news(company, language=None, region=None):
    """Fetch up to 10 Google News RSS articles mentioning *company*.

    Args:
        company: Company name; wrapped in quotes for an exact-phrase query.
        language: Locale code such as "en-US". Defaults to the system locale
            when detectable, otherwise "en-US".
        region: Two-letter country code for the "gl"/"ceid" parameters.
            Defaults to "US".

    Returns:
        A list of dicts with keys "title", "summary", "link" and "pub_date";
        an empty list when the HTTP request fails outright. Falls back to
        parse_with_elementtree() when BeautifulSoup parsing yields nothing.
    """
    base_url = "https://news.google.com/rss/search"
    # BUG FIX: locale.getdefaultlocale() can return (None, None); the old
    # one-liner called .replace() on None and raised AttributeError before
    # its 'en-US' fallback could ever apply.
    if language is None:
        system_locale = locale.getdefaultlocale()[0]
        language = system_locale.replace('_', '-').lower() if system_locale else 'en-US'
    region = region or 'US'
    params = {
        "q": f'"{company}"',
        "hl": language,
        "gl": region,
        # Google News expects e.g. "US:en" (country:bare-language).
        "ceid": f"{region}:{language.split('-')[0]}"
    }
    # Randomized User-Agent; presumably to avoid feed throttling — TODO confirm.
    headers = {"User-Agent": UserAgent().random, "Accept": "application/xml"}
    print(f"Fetching news for {company} with URL: {base_url}?{'&'.join(f'{k}={v}' for k, v in params.items())}")
    response = None  # lets the generic handler below detect "no response yet"
    try:
        response = requests.get(base_url, headers=headers, params=params, timeout=15)
        print(f"Response status for {company}: {response.status_code}")
        response.raise_for_status()
        soup = BeautifulSoup(response.content, features="xml")
        if not soup:
            print("Error: BeautifulSoup returned None. Falling back to ElementTree.")
            return parse_with_elementtree(response.content, company)
        items = soup.find_all("item")[:10]
        if not items:
            print(f"No news items found in the RSS feed for {company} with BeautifulSoup.")
            return parse_with_elementtree(response.content, company)
        print(f"Found {len(items)} items with BeautifulSoup.")
        articles = []
        for item in items:
            title = getattr(item.title, 'text', "No title") if item.title else "No title"
            desc = getattr(item.description, 'text', title) if item.description else title
            # The article URL sits in a text node right after the <link/> tag.
            link = item.link.next_sibling.strip() if item.link and item.link.next_sibling else "No link"
            raw_date = getattr(item.pubDate, 'text', "Date not available") if item.pubDate else "Date not available"
            try:
                # RFC-822 style date, e.g. "Mon, 01 Jan 2024 12:00:00 GMT".
                pub_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %Z").strftime("%a, %d %b %Y")
            except ValueError:
                pub_date = "Date not available"
            # Descriptions are small HTML snippets; strip the markup.
            desc_soup = BeautifulSoup(desc, "html.parser")
            full_text = desc_soup.get_text(separator=" ").strip()
            summary = full_text.replace(title, "").strip()
            summary_words = summary.split()
            # Titles look like "Headline - Publisher"; the tail is the source.
            source = title.split(" - ")[-1].strip() if " - " in title else "Unknown Source"
            final_summary = " ".join(summary_words[:80]) + f" - {source}" if len(summary_words) > 10 else f"{title} - {source}"
            articles.append({
                "title": title,
                "summary": final_summary,
                "link": link,
                "pub_date": pub_date
            })
        print(f"Successfully fetched {len(articles)} articles for {company} with BeautifulSoup")
        return articles
    except requests.exceptions.RequestException as e:
        print(f"Request failed for {company}: {str(e)}")
        return []
    except Exception as e:
        print(f"Error processing news for {company} with BeautifulSoup: {str(e)}. Falling back to ElementTree.")
        # BUG FIX: the old handler dereferenced `response.content` even when
        # the failure happened before a response existed.
        if response is None:
            return []
        return parse_with_elementtree(response.content, company)
def parse_with_elementtree(content, company):
    """Fallback RSS parser for when BeautifulSoup fails or finds no items.

    Args:
        content: Raw RSS feed bytes/str.
        company: Company name, used only for log messages.

    Returns:
        A list of dicts with keys "title", "summary", "link" and "pub_date"
        (at most 10 entries); an empty list on any parse error.
    """
    print("Attempting to parse with ElementTree...")
    try:
        root = ET.fromstring(content)
        items = root.findall(".//item")[:10]
        if not items:
            print(f"No news items found in the RSS feed for {company} with ElementTree")
            return []
        articles = []
        for item in items:
            # BUG FIX: use findtext + `or` so a present-but-empty element
            # (whose .text is None) still gets the default instead of
            # propagating None into BeautifulSoup()/str methods below.
            title = item.findtext("title") or "No title"
            desc = item.findtext("description") or title
            link = item.findtext("link") or "No link"
            raw_date = item.findtext("pubDate") or "Date not available"
            try:
                # RFC-822 style date, e.g. "Mon, 01 Jan 2024 12:00:00 GMT".
                pub_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %Z").strftime("%a, %d %b %Y")
            except ValueError:
                pub_date = "Date not available"
            # Descriptions are small HTML snippets; strip the markup.
            desc_soup = BeautifulSoup(desc, "html.parser")
            full_text = desc_soup.get_text(separator=" ").strip()
            summary = full_text if full_text else title
            summary_words = summary.split()
            # Titles look like "Headline - Publisher"; the tail is the source.
            source = title.split(" - ")[-1].strip() if " - " in title else "Unknown Source"
            final_summary = " ".join(summary_words[:80]) + f" - {source}" if len(summary_words) > 10 else f"{title} - {source}"
            articles.append({
                "title": title,
                "summary": final_summary,
                "link": link,
                "pub_date": pub_date
            })
        print(f"Successfully fetched {len(articles)} articles for {company} with ElementTree")
        return articles
    except Exception as e:
        print(f"Error processing news for {company} with ElementTree: {str(e)}")
        return []
# Module-level singleton: the HF sentiment pipeline (SST-2 fine-tuned
# DistilBERT) is loaded once at import time and reused by analyze_sentiment().
sentiment_analyzer = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
def analyze_sentiment(text):
    """Classify *text* as "Positive", "Negative" or "Neutral".

    Input is truncated to 512 characters before hitting the model.
    Low-confidence predictions (score < 0.7) and how-to/review style
    texts are forced to "Neutral"; any analysis failure also yields
    "Neutral" as a safe default.
    """
    try:
        prediction = sentiment_analyzer(text[:512])[0]
        lowered = text.lower()
        low_confidence = prediction["score"] < 0.7
        looks_informational = "how to" in lowered or "review" in lowered
        if low_confidence or looks_informational:
            return "Neutral"
        return "Positive" if prediction["label"] == "POSITIVE" else "Negative"
    except Exception as exc:
        print(f"Sentiment analysis error: {exc}")
        return "Neutral"
def extract_topics(text, max_topics=2, topics=None):
    """Return up to *max_topics* topic labels ranked by keyword frequency.

    Args:
        text: Text to scan (matching is case-insensitive substring counting).
        max_topics: Maximum number of labels to return.
        topics: Optional mapping of topic label -> keyword list. Defaults to
            the module-level news_topics table (backward compatible).

    Returns:
        Labels sorted by descending keyword-hit count (ties keep mapping
        order), or ["General News"] when nothing matches.
    """
    if topics is None:
        topics = news_topics
    haystack = text.lower()
    topic_scores = {}
    for topic, keywords in topics.items():
        hits = sum(haystack.count(keyword.lower()) for keyword in keywords)
        if hits > 0:
            topic_scores[topic] = hits
    if not topic_scores:
        return ["General News"]
    # sorted() is stable, so equal scores preserve the mapping's order.
    ranked = sorted(topic_scores.items(), key=lambda kv: kv[1], reverse=True)
    return [topic for topic, _ in ranked[:max_topics]]
# Shared project TTS converter instance; used by generate_tts() for the
# Hindi ('hi') code path.
tts_converter = TextToSpeechConverter()
def generate_tts(text, language='hi'):
    """Synthesize *text* to speech and return it as an in-memory audio buffer.

    Args:
        text: Text to vocalize.
        language: Language code. 'hi' routes to the project
            TextToSpeechConverter; any other code is handed to gTTS.

    Returns:
        A file-like audio buffer (seeked to 0 for the gTTS path), or None
        when generation fails.
    """
    try:
        if language == 'hi':
            result = tts_converter.generate_speech(text)
            if result["success"]:
                print(f"Hindi audio generated in memory")
                return result["audio_buffer"]
            else:
                print(f"Hindi audio error: {result['message']}")
                return None
        else:
            # BUG FIX: the requested language was previously ignored here and
            # hard-coded to 'en'; pass it through so other gTTS-supported
            # languages actually work. Unsupported codes raise inside gTTS
            # and fall into the handler below (returns None).
            tts = gtts.gTTS(text=text, lang=language, slow=False)
            audio_buffer = io.BytesIO()
            tts.write_to_fp(audio_buffer)
            audio_buffer.seek(0)
            return audio_buffer
    except Exception as e:
        print(f"Audio generation error for {language}: {str(e)}")
        return None