Ali Abdullah committed on
Commit
56b13cf
·
verified ·
1 Parent(s): a751990

Upload web_scraper.py

Browse files
Files changed (1) hide show
  1. web_scraper.py +226 -0
web_scraper.py ADDED
@@ -0,0 +1,226 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ from bs4 import BeautifulSoup
3
+ import re
4
+ from typing import List, Dict
5
+ from urllib.parse import urljoin, urlparse
6
+ import time
7
+ import nltk
8
+ from nltk.tokenize import sent_tokenize
9
+
10
# Download required NLTK data.
# NLTK >= 3.8.2 resolves sentence tokenization through the "punkt_tab"
# resource in addition to the legacy "punkt" model, so ensure both are
# available before sent_tokenize() is first called.
for _resource in ('punkt', 'punkt_tab'):
    try:
        nltk.data.find(f'tokenizers/{_resource}')
    except LookupError:
        nltk.download(_resource)
15
+
16
class WebScraper:
    """Polite HTML article scraper built on requests + BeautifulSoup."""

    def __init__(self, delay: float = 1.0):
        """
        Initialize web scraper.

        Args:
            delay: Delay in seconds between requests to be respectful to servers.
        """
        self.delay = delay
        self.session = requests.Session()
        # A browser-like User-Agent avoids trivial bot blocking.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def scrape_article(self, url: str) -> Dict[str, str]:
        """
        Scrape article content from a URL.

        Args:
            url: URL to scrape.

        Returns:
            Dictionary with title, content, and metadata. On failure an
            'error' key holds the exception text and both counts are zero.
            (NOTE: the count values are ints despite the Dict[str, str] hint.)
        """
        try:
            print(f"Scraping: {url}")
            response = self.session.get(url, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract title, then main content, then normalize the text.
            title = self._extract_title(soup)
            content = self._extract_content(soup)
            cleaned_content = self._clean_text(content)

            time.sleep(self.delay)  # Be respectful to the server

            return {
                'url': url,
                'title': title,
                'content': cleaned_content,
                'word_count': len(cleaned_content.split()),
                'char_count': len(cleaned_content)
            }

        except Exception as e:
            print(f"Error scraping {url}: {str(e)}")
            return {
                'url': url,
                'title': '',
                'content': '',
                'error': str(e),
                'word_count': 0,
                'char_count': 0
            }

    def _extract_title(self, soup: "BeautifulSoup") -> str:
        """Extract the page title, trying common selectors in order."""
        title_selectors = [
            'h1',
            'title',
            '.title',
            '.article-title',
            '[data-testid="headline"]'
        ]

        for selector in title_selectors:
            element = soup.select_one(selector)
            if element and element.get_text().strip():
                return element.get_text().strip()

        return "No title found"

    def _extract_content(self, soup: "BeautifulSoup") -> str:
        """Extract the main article text, falling back to all <p> tags."""
        # Remove boilerplate elements. ('ad' is not a standard HTML tag,
        # but decomposing it is harmless when absent.)
        for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'ad']):
            element.decompose()

        # Content selectors in order of preference
        content_selectors = [
            'article',
            '.article-content',
            '.post-content',
            '.entry-content',
            '.content',
            'main',
            '.main-content',
            '[role="main"]'
        ]

        for selector in content_selectors:
            element = soup.select_one(selector)
            if element:
                return element.get_text()

        # Fallback: get all paragraph text
        paragraphs = soup.find_all('p')
        return '\n'.join([p.get_text() for p in paragraphs])

    def _clean_text(self, text: str) -> str:
        """Clean and normalize scraped text.

        Bug fix vs. the original: line filtering now runs BEFORE whitespace
        collapsing. The old order flattened every newline into a space first,
        so the 'short line' filter always saw one giant line and never removed
        navigation/ad fragments (and silently discarded any article of 20
        characters or fewer).
        """
        # Remove special characters but keep basic punctuation.
        # Apostrophe added to the keep-set so contractions like "don't" survive.
        text = re.sub(r'[^\w\s.,!?;:()\-\'"]', '', text)

        # Drop very short lines (likely navigation/ads) while newlines
        # still delimit them.
        lines = text.split('\n')
        meaningful_lines = [line.strip() for line in lines if len(line.strip()) > 20]

        # Collapse remaining whitespace runs in the surviving text.
        return re.sub(r'\s+', ' ', ' '.join(meaningful_lines)).strip()
132
+
133
class TextChunker:
    """Split text into overlapping, sentence-aligned chunks for indexing."""

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """
        Initialize text chunker.

        Args:
            chunk_size: Maximum words per chunk.
            overlap: Approximate number of words repeated between
                consecutive chunks (whole sentences are carried over,
                so the actual overlap may slightly exceed this).
        """
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_text(self, text: str, metadata: Dict = None) -> List[Dict]:
        """
        Split text into overlapping chunks.

        Args:
            text: Text to chunk.
            metadata: Optional extra keys merged into every chunk dict.

        Returns:
            List of chunk dictionaries; empty list for blank input.
        """
        if not text.strip():
            return []

        # Sentence tokenization gives cleaner chunk boundaries than
        # cutting at an arbitrary word offset.
        sentences = sent_tokenize(text)
        chunks: List[Dict] = []
        current_chunk: List[str] = []
        current_length = 0

        for sentence in sentences:
            sentence_length = len(sentence.split())

            # Close the current chunk once this sentence would overflow it.
            if current_length + sentence_length > self.chunk_size and current_chunk:
                chunks.append(self._create_chunk_dict(' '.join(current_chunk), metadata, len(chunks)))

                # Bug fix: the original carried over `overlap // 10`
                # *sentences*, ignoring the word-based overlap the docstring
                # promises, and could even carry the entire previous chunk
                # forward. Carry trailing sentences up to the word budget.
                overlap_sentences = self._tail_overlap(current_chunk)
                current_chunk = overlap_sentences + [sentence]
                current_length = sum(len(s.split()) for s in current_chunk)
            else:
                current_chunk.append(sentence)
                current_length += sentence_length

        # Flush the final partial chunk.
        if current_chunk:
            chunks.append(self._create_chunk_dict(' '.join(current_chunk), metadata, len(chunks)))

        return chunks

    def _tail_overlap(self, sentences: List[str]) -> List[str]:
        """Return trailing sentences totalling ~self.overlap words,
        but never the entire previous chunk."""
        tail: List[str] = []
        words = 0
        for sentence in reversed(sentences):
            # Stop once the budget is met, or if taking one more sentence
            # would duplicate the whole previous chunk.
            if words >= self.overlap or len(tail) == len(sentences) - 1:
                break
            tail.insert(0, sentence)
            words += len(sentence.split())
        return tail

    def _create_chunk_dict(self, text: str, metadata: Dict, chunk_id: int) -> Dict:
        """Build one chunk record; metadata keys are merged in last and
        can shadow the base keys."""
        chunk_dict = {
            'chunk_id': chunk_id,
            'text': text,
            'word_count': len(text.split()),
            'char_count': len(text)
        }

        if metadata:
            chunk_dict.update(metadata)

        return chunk_dict
198
+
199
# Example usage
if __name__ == "__main__":
    # Exercise the scraper and chunker end to end against a sample page.
    demo_scraper = WebScraper()
    demo_chunker = TextChunker()

    # Test URL (replace with your target URL)
    target = "https://medium.com/@aminajavaid30/building-a-rag-system-the-data-ingestion-pipeline-d04235fd17ea"

    # Fetch and clean the article
    article = demo_scraper.scrape_article(target)
    print(f"Title: {article['title']}")
    print(f"Content length: {article['word_count']} words")

    # Chunk the cleaned text, tagging every chunk with its source
    if article['content']:
        doc_chunks = demo_chunker.chunk_text(
            article['content'],
            metadata={'url': article['url'], 'title': article['title']}
        )
        print(f"Created {len(doc_chunks)} chunks")

        # Preview the first chunk
        if doc_chunks:
            print(f"First chunk: {doc_chunks[0]['text'][:200]}...")