Jman666 committed
Commit 4db6283 · verified · 1 parent: b09895f

Upload utils.py

Files changed (1)
  1. utils.py +291 -0
utils.py ADDED
@@ -0,0 +1,291 @@
import requests
from bs4 import BeautifulSoup
import pandas as pd
from typing import List, Dict, Any
import numpy as np
from transformers import pipeline, AutoModel, AutoTokenizer
import urllib.parse
from sklearn.feature_extraction.text import TfidfVectorizer
import tldextract
from deep_translator import GoogleTranslator
from playsound import playsound
import soundfile as sf

def search_news(company_name: str, num_articles: int = 2) -> List[str]:
    """Scrape Google News search results and return up to num_articles article URLs."""
    query = urllib.parse.quote_plus(f"{company_name} news")
    search_url = f"https://www.google.com/search?q={query}&tbm=nws"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }

    try:
        response = requests.get(search_url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        article_links = []
        # '.SoaBEf' is the class Google currently uses for news result cards;
        # it is unofficial and may change without notice.
        for article in soup.select('.SoaBEf'):
            link_element = article.select_one('a')
            if link_element and 'href' in link_element.attrs:
                href = link_element['href']
                if href.startswith('/url?q='):
                    # Redirect-style link: recover the target URL from the query string
                    url = href.split('/url?q=')[1].split('&')[0]
                    url = urllib.parse.unquote(url)
                    article_links.append(url)
                elif href.startswith('http'):
                    article_links.append(href)

            if len(article_links) >= num_articles:
                break

        return article_links
    except Exception as e:
        print(f"Error fetching news articles: {e}")
        return []

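# Illustrative usage (a sketch only: real output depends on Google's live markup
# and may be empty if the page layout has changed):
#   >>> search_news("Tesla", num_articles=2)
#   ['https://www.reuters.com/...', 'https://www.bbc.com/...']
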
def extract_article_content(url: str) -> Dict[str, Any]:
    """Fetch a news page and pull out its title, body text, and publication date."""
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}

    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        title = soup.find("h1").get_text().strip() if soup.find("h1") else "No title found"

        # Try common semantic containers first, then fall back to typical CMS class names
        content_element = soup.find("article") or soup.find("main") or soup.find("div", class_=["content", "article", "story"])
        content = " ".join([p.get_text().strip() for p in content_element.find_all("p")]) if content_element else "No content found"

        date_element = soup.find("time")
        date = date_element["datetime"] if date_element and "datetime" in date_element.attrs else None

        return {
            'url': url,
            'title': title,
            'content': content,
            'date': date
        }
    except Exception as e:
        print(f"Error extracting content from {url}: {e}")
        return {
            'url': url,
            'title': "Error extracting content",
            'content': "Error extracting content",
            'date': None
        }

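# Illustrative usage (the same four keys come back even on failure, so callers
# never need to guard against missing keys):
#   >>> extract_article_content("https://www.example.com/some-article")
#   {'url': '...', 'title': '...', 'content': '...', 'date': '2024-01-01T00:00:00Z'}
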
def get_company_news(company_name: str) -> List[Dict[str, Any]]:
    """
    Fetch exactly 10 news articles for a given company.
    If fewer than 10 articles are retrieved initially, retry fetching more,
    then pad with placeholders so the caller always receives 10 entries.
    """
    max_articles = 10
    articles = []
    retries = 3  # Number of retries to fetch missing articles

    for attempt in range(retries):
        # Fetch article URLs for however many articles are still missing
        article_urls = search_news(company_name, num_articles=max_articles - len(articles))

        # Process each URL to extract content
        for url in article_urls:
            try:
                article_data = extract_article_content(url)
                # Avoid duplicates by checking the URL
                if article_data['url'] not in [a['url'] for a in articles]:
                    articles.append(article_data)
            except Exception as e:
                print(f"Error extracting from {url}: {e}")

        # Break if we have enough articles
        if len(articles) >= max_articles:
            break

    # If still fewer than 10 articles, fill with placeholders
    while len(articles) < max_articles:
        articles.append({
            'url': 'N/A',
            'title': 'No Title Available',
            'content': 'No Content Available',
            'date': None
        })

    return articles

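# Illustrative usage: the return value is always a list of exactly 10 dicts,
# padded with 'N/A' placeholders when scraping comes up short:
#   >>> articles = get_company_news("Tesla")
#   >>> len(articles)
#   10
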
def summarize_article(content: str, max_length: int = 50) -> str:
    """Summarize a single article; the input is truncated to fit the model's window."""
    summarizer = pipeline("summarization")
    max_input_length = summarizer.model.config.max_position_embeddings  # Model's max input length in tokens

    # Character-level truncation: a token is usually several characters, so
    # slicing to the token count keeps the input comfortably under the limit
    truncated_content = content[:max_input_length]

    summary = summarizer(truncated_content, max_length=max_length, min_length=0, do_sample=False)
    return summary[0]['summary_text']

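# Illustrative usage (the first call downloads transformers' default
# summarization model, which takes a while):
#   >>> summarize_article("Long article text ...", max_length=50)
#   'A short abstractive summary.'
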
def analyze_sentiment(text: str) -> Dict[str, Any]:
    """
    Analyze sentiment of the given text.

    Args:
        text: The text to analyze.

    Returns:
        Dictionary containing sentiment category and score.
    """
    try:
        # Initialize sentiment analyzer; truncation=True guards against over-long inputs
        sentiment_analyzer = pipeline("sentiment-analysis", truncation=True)

        # Also truncate manually: 512 words is a rough proxy for the
        # 512-token limit most transformer models enforce
        max_token_limit = 512
        words = text.split()
        if len(words) > max_token_limit:
            text = ' '.join(words[:max_token_limit])

        # Perform sentiment analysis
        result = sentiment_analyzer(text)

        # Determine sentiment category based on label and score
        sentiment_category = "Positive" if result[0]['label'] == "POSITIVE" else "Negative"
        score = result[0]['score']

        # Low-confidence predictions are treated as neutral
        if 0.4 <= score <= 0.6:
            sentiment_category = "Neutral"

        return {
            'sentiment': sentiment_category,
            'score': score
        }
    except Exception as e:
        print(f"Error in sentiment analysis: {e}")
        return {
            'sentiment': "Unknown",
            'score': 0.0
        }

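# Illustrative usage (score shown is a plausible value, not a fixed output):
#   >>> analyze_sentiment("The company reported record profits.")
#   {'sentiment': 'Positive', 'score': 0.99}
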
def extract_key_topics(text: str, num_topics: int = 5) -> List[str]:
    """Return the num_topics highest-scoring TF-IDF terms from the text."""
    if len(text.split()) < 10:
        return ["Not enough text to extract topics"]

    vectorizer = TfidfVectorizer(stop_words='english', max_features=100)
    tfidf_matrix = vectorizer.fit_transform([text])
    feature_names = vectorizer.get_feature_names_out()
    tfidf_scores = tfidf_matrix.toarray()[0]
    sorted_indices = np.argsort(tfidf_scores)[::-1]  # Highest scores first
    top_topics = [feature_names[idx] for idx in sorted_indices[:num_topics]]

    return top_topics

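# Illustrative usage (terms shown are hypothetical; actual output depends on
# the text's term frequencies):
#   >>> extract_key_topics("Tesla shares rose after strong quarterly deliveries ...", num_topics=3)
#   ['tesla', 'deliveries', 'shares']
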
def perform_comparative_analysis(articles: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Aggregate sentiment and topics across articles. Each article dict is
    expected to already carry a 'sentiment' result (from analyze_sentiment)
    and a 'topics' list (from extract_key_topics) alongside its 'url'.
    """
    sentiment_counts = {
        'Positive': len([a for a in articles if a['sentiment']['sentiment'] == 'Positive']),
        'Neutral': len([a for a in articles if a['sentiment']['sentiment'] == 'Neutral']),
        'Negative': len([a for a in articles if a['sentiment']['sentiment'] == 'Negative'])
    }

    # Count how often each topic appears across all articles
    all_topics = [topic for article in articles for topic in article['topics']]
    topic_frequency = {}
    for topic in all_topics:
        topic_frequency[topic] = topic_frequency.get(topic, 0) + 1

    common_topics = sorted(topic_frequency.items(), key=lambda x: x[1], reverse=True)

    # Group sentiment labels by news source domain
    sentiment_by_source = {}
    for article in articles:
        source = extract_source_from_url(article['url'])
        if source not in sentiment_by_source:
            sentiment_by_source[source] = []
        sentiment_by_source[source].append(article['sentiment']['sentiment'])

    return {
        'sentiment_distribution': sentiment_counts,
        'common_topics': common_topics[:10],
        'sentiment_by_source': sentiment_by_source
    }

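# Note: get_company_news() alone does not attach 'sentiment' or 'topics', so
# enrich each article first. A minimal sketch:
#   for a in articles:
#       a['sentiment'] = analyze_sentiment(a['content'])
#       a['topics'] = extract_key_topics(a['content'])
#   report = perform_comparative_analysis(articles)
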
def extract_source_from_url(url: str) -> str:
    """Return the registered domain (e.g. 'reuters' for reuters.com) as the source name."""
    extracted_info = tldextract.extract(url)
    return extracted_info.domain

def get_combined_summary(articles, max_length: int = 100) -> str:
    """
    Generate a combined summary from multiple news articles.

    Args:
        articles: List of article dictionaries containing content
        max_length: Maximum length of the final summary

    Returns:
        A comprehensive summary combining insights from all articles
    """
    # Combine all article contents with titles as context
    combined_content = ""
    for article in articles:
        # Use .get() with default values to handle missing keys
        title = article.get('title', 'No Title')
        content = article.get('content', 'Content not available')
        combined_content += f"Article: {title}\n{content}\n\n"

    # Initialize the summarizer
    summarizer = pipeline("summarization")

    # Handle token limit constraints (character-level truncation, as in summarize_article)
    max_input_length = summarizer.model.config.max_position_embeddings
    truncated_content = combined_content[:max_input_length]

    # Generate the combined summary
    summary = summarizer(truncated_content, max_length=max_length, min_length=30, do_sample=False)

    # The pipeline normally returns a list of dicts; keep a fallback just in case
    if isinstance(summary, list):
        return summary[0]['summary_text']
    else:
        return summary['summary_text']

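# Illustrative usage, building on the sketch above:
#   >>> articles = get_company_news("Tesla")
#   >>> get_combined_summary(articles, max_length=100)
#   'A single summary stitched together from all ten articles.'
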
def generate_hindi_summary(combined_summary: str) -> str:
    """
    Translate the combined summary to Hindi using deep-translator.

    Args:
        combined_summary: The English combined summary

    Returns:
        The Hindi translation of the combined summary
    """
    try:
        translator = GoogleTranslator(source='auto', target='hi')
        hindi_summary = translator.translate(text=combined_summary)
        return hindi_summary
    except Exception as e:
        print(f"Error in translation: {e}")
        return "Translation failed"

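# Illustrative usage (needs network access; the Hindi shown is a rough example
# of what the service might return, not a guaranteed output):
#   >>> generate_hindi_summary("Tesla had a strong quarter.")
#   'टेस्ला की तिमाही मजबूत रही।'
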
def generate_hindi_speech(hindi_summary: str):
    """
    Convert the Hindi summary to speech using AI4Bharat's VITS-Rasa-13 model and play it.

    Args:
        hindi_summary: Hindi text summary to synthesize (max 500 characters)
    """
    try:
        # Load pre-trained model (requires a CUDA-enabled GPU)
        model = AutoModel.from_pretrained("ai4bharat/vits_rasa_13", trust_remote_code=True).to("cuda")
        tokenizer = AutoTokenizer.from_pretrained("ai4bharat/vits_rasa_13", trust_remote_code=True)

        # Tokenize the text and move it to the same device as the model
        inputs = tokenizer(text=hindi_summary, return_tensors="pt").to("cuda")

        # Use default Indian voice profile (speaker_id=16 for male, 17 for female)
        outputs = model(inputs['input_ids'], speaker_id=16, emotion_id=0)

        # Convert to a numpy array and save as a temporary file
        audio_data = outputs.waveform.squeeze().cpu().numpy()
        sf.write("temp_hindi_speech.wav", audio_data, model.config.sampling_rate)

        # Play the audio using playsound
        playsound("temp_hindi_speech.wav")

    except Exception as e:
        print(f"Error in speech generation or playback: {e}")

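# A minimal end-to-end sketch tying the helpers together. "Tesla" is just a
# placeholder company; the enrichment step is assumed (see the note after
# perform_comparative_analysis), and the speech step needs a CUDA GPU.
if __name__ == "__main__":
    company = "Tesla"
    articles = get_company_news(company)
    for a in articles:
        a['sentiment'] = analyze_sentiment(a['content'])
        a['topics'] = extract_key_topics(a['content'])
    report = perform_comparative_analysis(articles)
    print(report['sentiment_distribution'])
    english_summary = get_combined_summary(articles)
    hindi_summary = generate_hindi_summary(english_summary)
    generate_hindi_speech(hindi_summary)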