Ali Abdullah committed on
Commit
063051a
·
verified ·
1 Parent(s): ae7c986

Update web_scraper.py

Browse files
Files changed (1) hide show
  1. web_scraper.py +138 -225
web_scraper.py CHANGED
@@ -1,226 +1,139 @@
1
- import requests
2
- from bs4 import BeautifulSoup
3
- import re
4
- from typing import List, Dict
5
- from urllib.parse import urljoin, urlparse
6
- import time
7
- import nltk
8
- from nltk.tokenize import sent_tokenize
9
-
10
- # Download required NLTK data
11
- try:
12
- nltk.data.find('tokenizers/punkt')
13
- except LookupError:
14
- nltk.download('punkt')
15
-
16
- class WebScraper:
17
- def __init__(self, delay: float = 1.0):
18
- """
19
- Initialize web scraper
20
- Args:
21
- delay: Delay between requests to be respectful to servers
22
- """
23
- self.delay = delay
24
- self.session = requests.Session()
25
- self.session.headers.update({
26
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
27
- })
28
-
29
- def scrape_article(self, url: str) -> Dict[str, str]:
30
- """
31
- Scrape article content from a URL
32
- Args:
33
- url: URL to scrape
34
- Returns:
35
- Dictionary with title, content, and metadata
36
- """
37
- try:
38
- print(f"Scraping: {url}")
39
- response = self.session.get(url, timeout=10)
40
- response.raise_for_status()
41
-
42
- soup = BeautifulSoup(response.content, 'html.parser')
43
-
44
- # Extract title
45
- title = self._extract_title(soup)
46
-
47
- # Extract main content
48
- content = self._extract_content(soup)
49
-
50
- # Clean and process content
51
- cleaned_content = self._clean_text(content)
52
-
53
- time.sleep(self.delay) # Be respectful to the server
54
-
55
- return {
56
- 'url': url,
57
- 'title': title,
58
- 'content': cleaned_content,
59
- 'word_count': len(cleaned_content.split()),
60
- 'char_count': len(cleaned_content)
61
- }
62
-
63
- except Exception as e:
64
- print(f"Error scraping {url}: {str(e)}")
65
- return {
66
- 'url': url,
67
- 'title': '',
68
- 'content': '',
69
- 'error': str(e),
70
- 'word_count': 0,
71
- 'char_count': 0
72
- }
73
-
74
- def _extract_title(self, soup: BeautifulSoup) -> str:
75
- """Extract title from HTML"""
76
- # Try different title selectors
77
- title_selectors = [
78
- 'h1',
79
- 'title',
80
- '.title',
81
- '.article-title',
82
- '[data-testid="headline"]'
83
- ]
84
-
85
- for selector in title_selectors:
86
- element = soup.select_one(selector)
87
- if element and element.get_text().strip():
88
- return element.get_text().strip()
89
-
90
- return "No title found"
91
-
92
- def _extract_content(self, soup: BeautifulSoup) -> str:
93
- """Extract main content from HTML"""
94
- # Remove unwanted elements
95
- for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'ad']):
96
- element.decompose()
97
-
98
- # Try different content selectors in order of preference
99
- content_selectors = [
100
- 'article',
101
- '.article-content',
102
- '.post-content',
103
- '.entry-content',
104
- '.content',
105
- 'main',
106
- '.main-content',
107
- '[role="main"]'
108
- ]
109
-
110
- for selector in content_selectors:
111
- element = soup.select_one(selector)
112
- if element:
113
- return element.get_text()
114
-
115
- # Fallback: get all paragraph text
116
- paragraphs = soup.find_all('p')
117
- return '\n'.join([p.get_text() for p in paragraphs])
118
-
119
- def _clean_text(self, text: str) -> str:
120
- """Clean and normalize text"""
121
- # Remove extra whitespace
122
- text = re.sub(r'\s+', ' ', text)
123
-
124
- # Remove special characters but keep basic punctuation
125
- text = re.sub(r'[^\w\s.,!?;:()\-"]', '', text)
126
-
127
- # Remove very short lines (likely navigation/ads)
128
- lines = text.split('\n')
129
- meaningful_lines = [line.strip() for line in lines if len(line.strip()) > 20]
130
-
131
- return ' '.join(meaningful_lines).strip()
132
-
133
- class TextChunker:
134
- def __init__(self, chunk_size: int = 500, overlap: int = 50):
135
- """
136
- Initialize text chunker
137
- Args:
138
- chunk_size: Maximum tokens per chunk
139
- overlap: Overlap between chunks
140
- """
141
- self.chunk_size = chunk_size
142
- self.overlap = overlap
143
-
144
- def chunk_text(self, text: str, metadata: Dict = None) -> List[Dict]:
145
- """
146
- Split text into overlapping chunks
147
- Args:
148
- text: Text to chunk
149
- metadata: Additional metadata to include
150
- Returns:
151
- List of chunk dictionaries
152
- """
153
- if not text.strip():
154
- return []
155
-
156
- # Use sentence tokenization for better chunk boundaries
157
- sentences = sent_tokenize(text)
158
- chunks = []
159
- current_chunk = []
160
- current_length = 0
161
-
162
- for sentence in sentences:
163
- sentence_length = len(sentence.split())
164
-
165
- # If adding this sentence would exceed chunk size, create a new chunk
166
- if current_length + sentence_length > self.chunk_size and current_chunk:
167
- chunk_text = ' '.join(current_chunk)
168
- chunks.append(self._create_chunk_dict(chunk_text, metadata, len(chunks)))
169
-
170
- # Start new chunk with overlap
171
- overlap_sentences = current_chunk[-self.overlap//10:] if len(current_chunk) >= self.overlap//10 else current_chunk
172
- current_chunk = overlap_sentences + [sentence]
173
- current_length = sum(len(s.split()) for s in current_chunk)
174
- else:
175
- current_chunk.append(sentence)
176
- current_length += sentence_length
177
-
178
- # Add the last chunk
179
- if current_chunk:
180
- chunk_text = ' '.join(current_chunk)
181
- chunks.append(self._create_chunk_dict(chunk_text, metadata, len(chunks)))
182
-
183
- return chunks
184
-
185
- def _create_chunk_dict(self, text: str, metadata: Dict, chunk_id: int) -> Dict:
186
- """Create a chunk dictionary with metadata"""
187
- chunk_dict = {
188
- 'chunk_id': chunk_id,
189
- 'text': text,
190
- 'word_count': len(text.split()),
191
- 'char_count': len(text)
192
- }
193
-
194
- if metadata:
195
- chunk_dict.update(metadata)
196
-
197
- return chunk_dict
198
-
199
- # Example usage
200
- if __name__ == "__main__":
201
- # Test the scraper
202
- scraper = WebScraper()
203
- chunker = TextChunker()
204
-
205
- # Test URL (replace with your target URL)
206
- test_url = "https://medium.com/@aminajavaid30/building-a-rag-system-the-data-ingestion-pipeline-d04235fd17ea"
207
-
208
- # Scrape content
209
- article_data = scraper.scrape_article(test_url)
210
- print(f"Title: {article_data['title']}")
211
- print(f"Content length: {article_data['word_count']} words")
212
-
213
- # Create chunks
214
- if article_data['content']:
215
- chunks = chunker.chunk_text(
216
- article_data['content'],
217
- metadata={
218
- 'url': article_data['url'],
219
- 'title': article_data['title']
220
- }
221
- )
222
- print(f"Created {len(chunks)} chunks")
223
-
224
- # Show first chunk
225
- if chunks:
226
  print(f"First chunk: {chunks[0]['text'][:200]}...")
 
1
+ import requests
2
+ from bs4 import BeautifulSoup
3
+ import re
4
+ from typing import List, Dict
5
+ from urllib.parse import urljoin, urlparse
6
+ import time
7
+ import nltk
8
+ from nltk.tokenize import sent_tokenize
9
+
10
# Download required NLTK sentence-tokenizer data on first use.
# NOTE(review): NLTK >= 3.8.2 resolves 'punkt_tab' (not 'punkt') when
# sent_tokenize runs, so fetch both resource names; on older NLTK the
# extra download is a no-op/warning rather than an error — confirm
# against the pinned nltk version.
for _resource in ('punkt', 'punkt_tab'):
    try:
        nltk.data.find(f'tokenizers/{_resource}')
    except LookupError:
        nltk.download(_resource)
15
+
16
+ from newspaper import Article
17
+
18
class WebScraper:
    """Fetch and parse online articles via newspaper3k's ``Article``."""

    def __init__(self, delay: float = 1.0):
        """
        Initialize web scraper.
        Args:
            delay: Seconds to sleep after each successful fetch, to be
                respectful to the target server.
        """
        self.delay = delay

    def scrape_article(self, url: str) -> Dict[str, str]:
        """
        Scrape article content from a URL.
        Args:
            url: URL to scrape.
        Returns:
            Dict with 'url', 'title', 'content', 'word_count',
            'char_count'; on failure the dict additionally carries an
            'error' key and has empty title/content and zero counts.
        """
        try:
            article = Article(url)
            article.download()
            article.parse()

            # BUG FIX: the refactor away from requests dropped the
            # rate-limiting sleep while still configuring self.delay —
            # restore it so batch scraping stays polite.
            time.sleep(self.delay)

            # article.text can be None for pages newspaper3k cannot
            # extract from; normalize so the counts below never crash.
            text = article.text or ''
            return {
                'url': url,
                'title': article.title or 'Untitled',
                'content': text,
                'word_count': len(text.split()),
                'char_count': len(text)
            }

        except Exception as e:
            # Best-effort contract: report the failure in-band rather
            # than raising, so callers looping over many URLs keep going.
            return {
                'url': url,
                'title': '',
                'content': '',
                'error': str(e),
                'word_count': 0,
                'char_count': 0
            }
45
+
46
class TextChunker:
    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """
        Initialize text chunker.
        Args:
            chunk_size: Maximum words per chunk.
            overlap: Approximate word overlap between consecutive chunks
                (carried over as roughly overlap // 10 trailing sentences).
        """
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_text(self, text: str, metadata: Dict = None) -> List[Dict]:
        """
        Split text into overlapping, sentence-aligned chunks.
        Args:
            text: Text to chunk.
            metadata: Extra key/values merged into every chunk dict.
        Returns:
            List of chunk dictionaries (see _create_chunk_dict).
        """
        if not text.strip():
            return []

        # Sentence tokenization gives cleaner chunk boundaries than
        # cutting at a fixed word offset.
        sentences = sent_tokenize(text)
        chunks = []
        current_chunk = []
        current_length = 0

        # Number of trailing sentences carried into the next chunk.
        # BUG FIX: the old code sliced current_chunk[-self.overlap//10:],
        # and for overlap < 10 that is lst[-0:] — the WHOLE list, not
        # zero sentences — so every chunk re-contained its entire
        # predecessor. Guard the zero case explicitly.
        overlap_count = self.overlap // 10

        for sentence in sentences:
            sentence_length = len(sentence.split())

            # Flush the current chunk once adding this sentence would
            # push it past chunk_size (never flush an empty chunk).
            if current_length + sentence_length > self.chunk_size and current_chunk:
                chunks.append(self._create_chunk_dict(' '.join(current_chunk), metadata, len(chunks)))

                # Seed the next chunk with the last few sentences for
                # context continuity, then add the current sentence.
                overlap_sentences = current_chunk[-overlap_count:] if overlap_count > 0 else []
                current_chunk = overlap_sentences + [sentence]
                current_length = sum(len(s.split()) for s in current_chunk)
            else:
                current_chunk.append(sentence)
                current_length += sentence_length

        # Flush whatever remains after the last sentence.
        if current_chunk:
            chunks.append(self._create_chunk_dict(' '.join(current_chunk), metadata, len(chunks)))

        return chunks

    def _create_chunk_dict(self, text: str, metadata: Dict, chunk_id: int) -> Dict:
        """Build one chunk record; metadata keys (if any) are merged in."""
        chunk_dict = {
            'chunk_id': chunk_id,
            'text': text,
            'word_count': len(text.split()),
            'char_count': len(text)
        }

        if metadata:
            chunk_dict.update(metadata)

        return chunk_dict
111
+
112
# Example usage
if __name__ == "__main__":
    # Smoke-test the scrape -> chunk pipeline end to end.
    article_scraper = WebScraper()
    text_chunker = TextChunker()

    # Test URL (replace with your target URL)
    target = "https://medium.com/@aminajavaid30/building-a-rag-system-the-data-ingestion-pipeline-d04235fd17ea"

    # Scrape content and report the basics.
    result = article_scraper.scrape_article(target)
    print(f"Title: {result['title']}")
    print(f"Content length: {result['word_count']} words")

    # Chunk only when scraping produced some text.
    body = result['content']
    if body:
        piece_meta = {'url': result['url'], 'title': result['title']}
        pieces = text_chunker.chunk_text(body, metadata=piece_meta)
        print(f"Created {len(pieces)} chunks")

        # Preview the opening of the first chunk.
        if pieces:
            print(f"First chunk: {pieces[0]['text'][:200]}...")