vibertron committed on
Commit
52ba620
·
verified ·
1 Parent(s): 8b83d9f

Update data_processor.py

Browse files
Files changed (1) hide show
  1. data_processor.py +381 -379
data_processor.py CHANGED
@@ -1,379 +1,381 @@
1
- # -------------------------------------------------------------
2
- # Data Processing Module for Financial QA System
3
- # Handles PDF extraction, text cleaning, and Q&A pair generation
4
- # -------------------------------------------------------------
5
-
6
- # -------------------
7
- # Importing libraries
8
- # -------------------
9
- import os
10
- import re
11
- import json
12
- import nltk
13
- import PyPDF2
14
- import logging
15
- import pdfplumber
16
- import pandas as pd
17
- from pathlib import Path
18
- from nltk.corpus import stopwords
19
- from typing import List, Dict, Tuple, Optional
20
- from nltk.tokenize import sent_tokenize, word_tokenize
21
- logging.basicConfig(level=logging.INFO)
22
- logger = logging.getLogger(__name__)
23
-
24
- # ---------------------------
25
- # Download required NLTK data
26
- # ---------------------------
27
- try:
28
- nltk.download('punkt', quiet=True)
29
- nltk.download('stopwords', quiet=True)
30
- nltk.download('averaged_perceptron_tagger', quiet=True)
31
- except:
32
- pass
33
-
34
- class FinancialDataProcessor:
35
- """Processes financial documents and generates Q&A pairs"""
36
-
37
- def __init__(self, data_dir: str = "financial_statements"):
38
- self.data_dir = Path(data_dir)
39
- self.processed_texts = {}
40
- self.qa_pairs = []
41
- self.stop_words = set(stopwords.words('english'))
42
-
43
- def extract_text_from_pdf(self, pdf_path: Path) -> str:
44
- """Extract text from PDF using multiple methods"""
45
- text = ""
46
-
47
- # ------------------------------------------------------
48
- # Try pdfplumber first (better for structured documents)
49
- # ------------------------------------------------------
50
- try:
51
- with pdfplumber.open(pdf_path) as pdf:
52
- for page in pdf.pages:
53
- page_text = page.extract_text()
54
- if page_text:
55
- text += page_text + "\n"
56
- logger.info(f"Successfully extracted text using pdfplumber from {pdf_path.name}")
57
- except Exception as e:
58
- logger.warning(f"pdfplumber failed for {pdf_path.name}: {e}")
59
-
60
- # ------------------
61
- # Fallback to PyPDF2
62
- # ------------------
63
- try:
64
- with open(pdf_path, 'rb') as file:
65
- pdf_reader = PyPDF2.PdfReader(file)
66
- for page in pdf_reader.pages:
67
- page_text = page.extract_text()
68
- if page_text:
69
- text += page_text + "\n"
70
- logger.info(f"Successfully extracted text using PyPDF2 from {pdf_path.name}")
71
- except Exception as e2:
72
- logger.error(f"Both PDF extraction methods failed for {pdf_path.name}: {e2}")
73
- return ""
74
-
75
- return text
76
-
77
- def clean_text(self, text: str) -> str:
78
- """Clean extracted text by removing noise and formatting"""
79
-
80
- # -------------------------------------
81
- # Remove extra whitespace and normalize
82
- # -------------------------------------
83
- text = re.sub(r'\s+', ' ', text)
84
- text = re.sub(r'[^\w\s\.\,\$\-\(\)\%]', '', text)
85
-
86
- # -------------------------------
87
- # Remove page numbers and headers
88
- # -------------------------------
89
- text = re.sub(r'Page \d+ of \d+', '', text)
90
- text = re.sub(r'^\d+\s*$', '', text, flags=re.MULTILINE)
91
-
92
- # ------------------------------------------
93
- # Remove common financial document artifacts
94
- # ------------------------------------------
95
- text = re.sub(r'CONSOLIDATED|FINANCIAL STATEMENTS|QUARTER ENDED|YEAR ENDED', '', text, flags=re.IGNORECASE)
96
-
97
- return text.strip()
98
-
99
- def segment_financial_sections(self, text: str) -> Dict[str, str]:
100
- """Segment text into logical financial sections"""
101
- sections = {
102
- 'income_statement': '',
103
- 'balance_sheet': '',
104
- 'cash_flow': '',
105
- 'notes': ''
106
- }
107
-
108
- # ---------------------------------
109
- # Simple keyword-based segmentation
110
- # ---------------------------------
111
- lines = text.split('\n')
112
- current_section = 'notes'
113
-
114
- for line in lines:
115
- line_lower = line.lower()
116
-
117
- if any(keyword in line_lower for keyword in ['revenue', 'income', 'earnings', 'net income']):
118
- current_section = 'income_statement'
119
- elif any(keyword in line_lower for keyword in ['assets', 'liabilities', 'equity', 'total assets']):
120
- current_section = 'balance_sheet'
121
- elif any(keyword in line_lower for keyword in ['cash flow', 'operating activities', 'investing activities']):
122
- current_section = 'cash_flow'
123
-
124
- sections[current_section] += line + '\n'
125
-
126
- return sections
127
-
128
- def extract_financial_metrics(self, text: str) -> Dict[str, str]:
129
- """Extract key financial metrics from text"""
130
- metrics = {}
131
-
132
- # ----------------
133
- # Revenue patterns
134
- # ----------------
135
- revenue_patterns = [
136
- r'revenue.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?',
137
- r'total revenue.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?',
138
- r'net revenue.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?'
139
- ]
140
-
141
- for pattern in revenue_patterns:
142
- matches = re.findall(pattern, text, re.IGNORECASE)
143
- if matches:
144
- metrics['revenue'] = matches[0][0] + ' ' + (matches[0][1] or '')
145
- break
146
-
147
- # -------------------
148
- # Net income patterns
149
- # -------------------
150
- net_income_patterns = [
151
- r'net income.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?',
152
- r'net earnings.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?'
153
- ]
154
-
155
- for pattern in net_income_patterns:
156
- matches = re.findall(pattern, text, re.IGNORECASE)
157
- if matches:
158
- metrics['net_income'] = matches[0][0] + ' ' + (matches[0][1] or '')
159
- break
160
-
161
- # ---------------
162
- # Assets patterns
163
- # ---------------
164
- assets_patterns = [
165
- r'total assets.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?',
166
- r'assets.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?'
167
- ]
168
-
169
- for pattern in assets_patterns:
170
- matches = re.findall(pattern, text, re.IGNORECASE)
171
- if matches:
172
- metrics['total_assets'] = matches[0][0] + ' ' + (matches[0][1] or '')
173
- break
174
-
175
- return metrics
176
-
177
- def generate_qa_pairs(self, processed_texts: Dict[str, str]) -> List[Dict[str, str]]:
178
- """Generate Q&A pairs based on extracted financial data"""
179
- qa_pairs = []
180
-
181
- # ----------------------------------
182
- # Extract metrics from all documents
183
- # ----------------------------------
184
- all_metrics = {}
185
- for doc_name, text in processed_texts.items():
186
- metrics = self.extract_financial_metrics(text)
187
- all_metrics[doc_name] = metrics
188
-
189
- # ------------------------------
190
- # Generate Q&A pairs for revenue
191
- # ------------------------------
192
- for doc_name, metrics in all_metrics.items():
193
- if 'revenue' in metrics:
194
- year = doc_name.split('_')[0] if '_' in doc_name else '2024'
195
- qa_pairs.append({
196
- 'question': f'What was the company\'s revenue in {year}?',
197
- 'answer': f'The company\'s revenue in {year} was {metrics["revenue"]}.',
198
- 'source': doc_name,
199
- 'category': 'revenue'
200
- })
201
-
202
- # ---------------------------------
203
- # Generate Q&A pairs for net income
204
- # ---------------------------------
205
- for doc_name, metrics in all_metrics.items():
206
- if 'net_income' in metrics:
207
- year = doc_name.split('_')[0] if '_' in doc_name else '2024'
208
- qa_pairs.append({
209
- 'question': f'What was the company\'s net income in {year}?',
210
- 'answer': f'The company\'s net income in {year} was {metrics["net_income"]}.',
211
- 'source': doc_name,
212
- 'category': 'net_income'
213
- })
214
-
215
- # -----------------------------------
216
- # Generate Q&A pairs for total assets
217
- # -----------------------------------
218
- for doc_name, metrics in all_metrics.items():
219
- if 'total_assets' in metrics:
220
- year = doc_name.split('_')[0] if '_' in doc_name else '2024'
221
- qa_pairs.append({
222
- 'question': f'What were the company\'s total assets in {year}?',
223
- 'answer': f'The company\'s total assets in {year} were {metrics["total_assets"]}.',
224
- 'source': doc_name,
225
- 'category': 'total_assets'
226
- })
227
-
228
- # ------------------------------------
229
- # Add some general financial questions
230
- # ------------------------------------
231
- qa_pairs.extend([
232
- {
233
- 'question': 'What type of company is this?',
234
- 'answer': 'This is Apple Inc., a technology company that designs, manufactures, and markets smartphones, personal computers, tablets, wearables, and accessories.',
235
- 'source': 'general',
236
- 'category': 'company_info'
237
- },
238
- {
239
- 'question': 'What are the main business segments?',
240
- 'answer': 'Apple\'s main business segments include iPhone, Mac, iPad, Wearables, Home and Accessories, and Services.',
241
- 'source': 'general',
242
- 'category': 'business_segments'
243
- }
244
- ])
245
-
246
- return qa_pairs
247
-
248
- def process_all_documents(self) -> Tuple[Dict[str, str], List[Dict[str, str]]]:
249
- """Process all financial documents and generate Q&A pairs"""
250
- logger.info("Starting document processing...")
251
-
252
- # ---------------------
253
- # Process each PDF file
254
- # ---------------------
255
- for pdf_file in self.data_dir.glob("*.pdf"):
256
- logger.info(f"Processing {pdf_file.name}...")
257
-
258
- # ------------
259
- # Extract text
260
- # ------------
261
- raw_text = self.extract_text_from_pdf(pdf_file)
262
- if not raw_text:
263
- continue
264
-
265
- # ----------
266
- # Clean text
267
- # ----------
268
- cleaned_text = self.clean_text(raw_text)
269
-
270
- # --------------------
271
- # Store processed text
272
- # --------------------
273
- doc_name = pdf_file.stem
274
- self.processed_texts[doc_name] = cleaned_text
275
-
276
- logger.info(f"Successfully processed {pdf_file.name}")
277
-
278
- # ------------------
279
- # Generate Q&A pairs
280
- # ------------------
281
- dynamic_qa_pairs = self.generate_qa_pairs(self.processed_texts)
282
-
283
- # ----------------------------------
284
- # Load static Q&A pairs if available
285
- # ----------------------------------
286
- static_qa_pairs = []
287
- static_qa_path = "processed_data/qa_pairs_static.json"
288
- try:
289
- with open(static_qa_path, 'r', encoding='utf-8') as f:
290
- static_qa_pairs = json.load(f)
291
- logger.info(f"Loaded {len(static_qa_pairs)} static Q&A pairs from {static_qa_path}")
292
- except Exception as e:
293
- logger.warning(f"Could not load static Q&A pairs: {e}")
294
-
295
- # --------------------------------
296
- # Concatenate static + dynamic Q&A
297
- # --------------------------------
298
- self.qa_pairs = static_qa_pairs + dynamic_qa_pairs
299
- logger.info(f"Generated {len(self.qa_pairs)} Q&A pairs")
300
-
301
- return self.processed_texts, self.qa_pairs
302
-
303
- def save_processed_data(self, output_dir: str = "processed_data"):
304
- """Save processed texts and Q&A pairs"""
305
- output_path = Path(output_dir)
306
- output_path.mkdir(exist_ok=True)
307
-
308
- # --------------------
309
- # Save processed texts
310
- # --------------------
311
- for doc_name, text in self.processed_texts.items():
312
- with open(output_path / f"{doc_name}_processed.txt", 'w', encoding='utf-8') as f:
313
- f.write(text)
314
-
315
- # --------------
316
- # Save Q&A pairs
317
- # --------------
318
- with open(output_path / "qa_pairs.json", 'w', encoding='utf-8') as f:
319
- json.dump(self.qa_pairs, f, indent=2)
320
-
321
- # ---------------------------
322
- # Save as CSV for fine-tuning
323
- # ---------------------------
324
- qa_df = pd.DataFrame(self.qa_pairs)
325
- qa_df.to_csv(output_path / "qa_pairs.csv", index=False)
326
-
327
- logger.info(f"Saved processed data to {output_dir}")
328
-
329
- def get_text_chunks(self, chunk_size: int = 400, overlap: int = 50) -> List[Dict[str, str]]:
330
- """Split processed texts into chunks for RAG"""
331
- chunks = []
332
-
333
- for doc_name, text in self.processed_texts.items():
334
- sentences = sent_tokenize(text)
335
- current_chunk = ""
336
- chunk_id = 0
337
-
338
- for sentence in sentences:
339
- if len(current_chunk.split()) + len(sentence.split()) <= chunk_size:
340
- current_chunk += sentence + " "
341
- else:
342
- if current_chunk.strip():
343
- chunks.append({
344
- 'id': f"{doc_name}_chunk_{chunk_id}",
345
- 'text': current_chunk.strip(),
346
- 'source': doc_name,
347
- 'chunk_size': len(current_chunk.split())
348
- })
349
- chunk_id += 1
350
- current_chunk = sentence + " "
351
-
352
- # ------------------
353
- # Add the last chunk
354
- # ------------------
355
- if current_chunk.strip():
356
- chunks.append({
357
- 'id': f"{doc_name}_chunk_{chunk_id}",
358
- 'text': current_chunk.strip(),
359
- 'source': doc_name,
360
- 'chunk_size': len(current_chunk.split())
361
- })
362
-
363
- return chunks
364
-
365
- if __name__ == "__main__":
366
-
367
- # -----------------------
368
- # Test the data processor
369
- # -----------------------
370
- processor = FinancialDataProcessor()
371
- processed_texts, qa_pairs = processor.process_all_documents()
372
- processor.save_processed_data()
373
-
374
- # ---------------
375
- # Generate chunks
376
- # ---------------
377
- chunks = processor.get_text_chunks()
378
- print(f"Generated {len(chunks)} text chunks")
379
- print(f"Generated {len(qa_pairs)} Q&A pairs")
 
 
 
1
+ # -------------------------------------------------------------
2
+ # Data Processing Module for Financial QA System
3
+ # Handles PDF extraction, text cleaning, and Q&A pair generation
4
+ # -------------------------------------------------------------
5
+
6
+ # -------------------
7
+ # Importing libraries
8
+ # -------------------
9
+ import os
10
+ import re
11
+ import json
12
+ import nltk
13
+ import PyPDF2
14
+ import logging
15
+ import pdfplumber
16
+ import pandas as pd
17
+ from pathlib import Path
18
+ from nltk.corpus import stopwords
19
+ from typing import List, Dict, Tuple, Optional
20
+ from nltk.tokenize import sent_tokenize, word_tokenize
21
+ logging.basicConfig(level=logging.INFO)
22
+ logger = logging.getLogger(__name__)
23
+
24
+ # ---------------------------
25
+ # Download required NLTK data
26
+ # ---------------------------
27
+ """
28
+ try:
29
+ nltk.download('punkt', quiet=True)
30
+ nltk.download('stopwords', quiet=True)
31
+ nltk.download('averaged_perceptron_tagger', quiet=True)
32
+ except:
33
+ pass
34
+ """
35
+
36
class FinancialDataProcessor:
    """Processes financial documents and generates Q&A pairs.

    Pipeline (see process_all_documents): extract text from each PDF in
    ``data_dir`` (pdfplumber with a PyPDF2 fallback), clean the text with
    regex passes, pull key metrics via keyword patterns, then build Q&A
    pairs and sentence-based chunks for RAG retrieval.
    """
38
+
39
+ def __init__(self, data_dir: str = "financial_statements"):
40
+ self.data_dir = Path(data_dir)
41
+ self.processed_texts = {}
42
+ self.qa_pairs = []
43
+ self.stop_words = set(stopwords.words('english'))
44
+
45
+ def extract_text_from_pdf(self, pdf_path: Path) -> str:
46
+ """Extract text from PDF using multiple methods"""
47
+ text = ""
48
+
49
+ # ------------------------------------------------------
50
+ # Try pdfplumber first (better for structured documents)
51
+ # ------------------------------------------------------
52
+ try:
53
+ with pdfplumber.open(pdf_path) as pdf:
54
+ for page in pdf.pages:
55
+ page_text = page.extract_text()
56
+ if page_text:
57
+ text += page_text + "\n"
58
+ logger.info(f"Successfully extracted text using pdfplumber from {pdf_path.name}")
59
+ except Exception as e:
60
+ logger.warning(f"pdfplumber failed for {pdf_path.name}: {e}")
61
+
62
+ # ------------------
63
+ # Fallback to PyPDF2
64
+ # ------------------
65
+ try:
66
+ with open(pdf_path, 'rb') as file:
67
+ pdf_reader = PyPDF2.PdfReader(file)
68
+ for page in pdf_reader.pages:
69
+ page_text = page.extract_text()
70
+ if page_text:
71
+ text += page_text + "\n"
72
+ logger.info(f"Successfully extracted text using PyPDF2 from {pdf_path.name}")
73
+ except Exception as e2:
74
+ logger.error(f"Both PDF extraction methods failed for {pdf_path.name}: {e2}")
75
+ return ""
76
+
77
+ return text
78
+
79
+ def clean_text(self, text: str) -> str:
80
+ """Clean extracted text by removing noise and formatting"""
81
+
82
+ # -------------------------------------
83
+ # Remove extra whitespace and normalize
84
+ # -------------------------------------
85
+ text = re.sub(r'\s+', ' ', text)
86
+ text = re.sub(r'[^\w\s\.\,\$\-\(\)\%]', '', text)
87
+
88
+ # -------------------------------
89
+ # Remove page numbers and headers
90
+ # -------------------------------
91
+ text = re.sub(r'Page \d+ of \d+', '', text)
92
+ text = re.sub(r'^\d+\s*$', '', text, flags=re.MULTILINE)
93
+
94
+ # ------------------------------------------
95
+ # Remove common financial document artifacts
96
+ # ------------------------------------------
97
+ text = re.sub(r'CONSOLIDATED|FINANCIAL STATEMENTS|QUARTER ENDED|YEAR ENDED', '', text, flags=re.IGNORECASE)
98
+
99
+ return text.strip()
100
+
101
+ def segment_financial_sections(self, text: str) -> Dict[str, str]:
102
+ """Segment text into logical financial sections"""
103
+ sections = {
104
+ 'income_statement': '',
105
+ 'balance_sheet': '',
106
+ 'cash_flow': '',
107
+ 'notes': ''
108
+ }
109
+
110
+ # ---------------------------------
111
+ # Simple keyword-based segmentation
112
+ # ---------------------------------
113
+ lines = text.split('\n')
114
+ current_section = 'notes'
115
+
116
+ for line in lines:
117
+ line_lower = line.lower()
118
+
119
+ if any(keyword in line_lower for keyword in ['revenue', 'income', 'earnings', 'net income']):
120
+ current_section = 'income_statement'
121
+ elif any(keyword in line_lower for keyword in ['assets', 'liabilities', 'equity', 'total assets']):
122
+ current_section = 'balance_sheet'
123
+ elif any(keyword in line_lower for keyword in ['cash flow', 'operating activities', 'investing activities']):
124
+ current_section = 'cash_flow'
125
+
126
+ sections[current_section] += line + '\n'
127
+
128
+ return sections
129
+
130
+ def extract_financial_metrics(self, text: str) -> Dict[str, str]:
131
+ """Extract key financial metrics from text"""
132
+ metrics = {}
133
+
134
+ # ----------------
135
+ # Revenue patterns
136
+ # ----------------
137
+ revenue_patterns = [
138
+ r'revenue.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?',
139
+ r'total revenue.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?',
140
+ r'net revenue.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?'
141
+ ]
142
+
143
+ for pattern in revenue_patterns:
144
+ matches = re.findall(pattern, text, re.IGNORECASE)
145
+ if matches:
146
+ metrics['revenue'] = matches[0][0] + ' ' + (matches[0][1] or '')
147
+ break
148
+
149
+ # -------------------
150
+ # Net income patterns
151
+ # -------------------
152
+ net_income_patterns = [
153
+ r'net income.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?',
154
+ r'net earnings.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?'
155
+ ]
156
+
157
+ for pattern in net_income_patterns:
158
+ matches = re.findall(pattern, text, re.IGNORECASE)
159
+ if matches:
160
+ metrics['net_income'] = matches[0][0] + ' ' + (matches[0][1] or '')
161
+ break
162
+
163
+ # ---------------
164
+ # Assets patterns
165
+ # ---------------
166
+ assets_patterns = [
167
+ r'total assets.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?',
168
+ r'assets.*?(\$[\d,]+\.?\d*)\s*(billion|million|thousand)?'
169
+ ]
170
+
171
+ for pattern in assets_patterns:
172
+ matches = re.findall(pattern, text, re.IGNORECASE)
173
+ if matches:
174
+ metrics['total_assets'] = matches[0][0] + ' ' + (matches[0][1] or '')
175
+ break
176
+
177
+ return metrics
178
+
179
+ def generate_qa_pairs(self, processed_texts: Dict[str, str]) -> List[Dict[str, str]]:
180
+ """Generate Q&A pairs based on extracted financial data"""
181
+ qa_pairs = []
182
+
183
+ # ----------------------------------
184
+ # Extract metrics from all documents
185
+ # ----------------------------------
186
+ all_metrics = {}
187
+ for doc_name, text in processed_texts.items():
188
+ metrics = self.extract_financial_metrics(text)
189
+ all_metrics[doc_name] = metrics
190
+
191
+ # ------------------------------
192
+ # Generate Q&A pairs for revenue
193
+ # ------------------------------
194
+ for doc_name, metrics in all_metrics.items():
195
+ if 'revenue' in metrics:
196
+ year = doc_name.split('_')[0] if '_' in doc_name else '2024'
197
+ qa_pairs.append({
198
+ 'question': f'What was the company\'s revenue in {year}?',
199
+ 'answer': f'The company\'s revenue in {year} was {metrics["revenue"]}.',
200
+ 'source': doc_name,
201
+ 'category': 'revenue'
202
+ })
203
+
204
+ # ---------------------------------
205
+ # Generate Q&A pairs for net income
206
+ # ---------------------------------
207
+ for doc_name, metrics in all_metrics.items():
208
+ if 'net_income' in metrics:
209
+ year = doc_name.split('_')[0] if '_' in doc_name else '2024'
210
+ qa_pairs.append({
211
+ 'question': f'What was the company\'s net income in {year}?',
212
+ 'answer': f'The company\'s net income in {year} was {metrics["net_income"]}.',
213
+ 'source': doc_name,
214
+ 'category': 'net_income'
215
+ })
216
+
217
+ # -----------------------------------
218
+ # Generate Q&A pairs for total assets
219
+ # -----------------------------------
220
+ for doc_name, metrics in all_metrics.items():
221
+ if 'total_assets' in metrics:
222
+ year = doc_name.split('_')[0] if '_' in doc_name else '2024'
223
+ qa_pairs.append({
224
+ 'question': f'What were the company\'s total assets in {year}?',
225
+ 'answer': f'The company\'s total assets in {year} were {metrics["total_assets"]}.',
226
+ 'source': doc_name,
227
+ 'category': 'total_assets'
228
+ })
229
+
230
+ # ------------------------------------
231
+ # Add some general financial questions
232
+ # ------------------------------------
233
+ qa_pairs.extend([
234
+ {
235
+ 'question': 'What type of company is this?',
236
+ 'answer': 'This is Apple Inc., a technology company that designs, manufactures, and markets smartphones, personal computers, tablets, wearables, and accessories.',
237
+ 'source': 'general',
238
+ 'category': 'company_info'
239
+ },
240
+ {
241
+ 'question': 'What are the main business segments?',
242
+ 'answer': 'Apple\'s main business segments include iPhone, Mac, iPad, Wearables, Home and Accessories, and Services.',
243
+ 'source': 'general',
244
+ 'category': 'business_segments'
245
+ }
246
+ ])
247
+
248
+ return qa_pairs
249
+
250
+ def process_all_documents(self) -> Tuple[Dict[str, str], List[Dict[str, str]]]:
251
+ """Process all financial documents and generate Q&A pairs"""
252
+ logger.info("Starting document processing...")
253
+
254
+ # ---------------------
255
+ # Process each PDF file
256
+ # ---------------------
257
+ for pdf_file in self.data_dir.glob("*.pdf"):
258
+ logger.info(f"Processing {pdf_file.name}...")
259
+
260
+ # ------------
261
+ # Extract text
262
+ # ------------
263
+ raw_text = self.extract_text_from_pdf(pdf_file)
264
+ if not raw_text:
265
+ continue
266
+
267
+ # ----------
268
+ # Clean text
269
+ # ----------
270
+ cleaned_text = self.clean_text(raw_text)
271
+
272
+ # --------------------
273
+ # Store processed text
274
+ # --------------------
275
+ doc_name = pdf_file.stem
276
+ self.processed_texts[doc_name] = cleaned_text
277
+
278
+ logger.info(f"Successfully processed {pdf_file.name}")
279
+
280
+ # ------------------
281
+ # Generate Q&A pairs
282
+ # ------------------
283
+ dynamic_qa_pairs = self.generate_qa_pairs(self.processed_texts)
284
+
285
+ # ----------------------------------
286
+ # Load static Q&A pairs if available
287
+ # ----------------------------------
288
+ static_qa_pairs = []
289
+ static_qa_path = "processed_data/qa_pairs_static.json"
290
+ try:
291
+ with open(static_qa_path, 'r', encoding='utf-8') as f:
292
+ static_qa_pairs = json.load(f)
293
+ logger.info(f"Loaded {len(static_qa_pairs)} static Q&A pairs from {static_qa_path}")
294
+ except Exception as e:
295
+ logger.warning(f"Could not load static Q&A pairs: {e}")
296
+
297
+ # --------------------------------
298
+ # Concatenate static + dynamic Q&A
299
+ # --------------------------------
300
+ self.qa_pairs = static_qa_pairs + dynamic_qa_pairs
301
+ logger.info(f"Generated {len(self.qa_pairs)} Q&A pairs")
302
+
303
+ return self.processed_texts, self.qa_pairs
304
+
305
+ def save_processed_data(self, output_dir: str = "processed_data"):
306
+ """Save processed texts and Q&A pairs"""
307
+ output_path = Path(output_dir)
308
+ output_path.mkdir(exist_ok=True)
309
+
310
+ # --------------------
311
+ # Save processed texts
312
+ # --------------------
313
+ for doc_name, text in self.processed_texts.items():
314
+ with open(output_path / f"{doc_name}_processed.txt", 'w', encoding='utf-8') as f:
315
+ f.write(text)
316
+
317
+ # --------------
318
+ # Save Q&A pairs
319
+ # --------------
320
+ with open(output_path / "qa_pairs.json", 'w', encoding='utf-8') as f:
321
+ json.dump(self.qa_pairs, f, indent=2)
322
+
323
+ # ---------------------------
324
+ # Save as CSV for fine-tuning
325
+ # ---------------------------
326
+ qa_df = pd.DataFrame(self.qa_pairs)
327
+ qa_df.to_csv(output_path / "qa_pairs.csv", index=False)
328
+
329
+ logger.info(f"Saved processed data to {output_dir}")
330
+
331
+ def get_text_chunks(self, chunk_size: int = 400, overlap: int = 50) -> List[Dict[str, str]]:
332
+ """Split processed texts into chunks for RAG"""
333
+ chunks = []
334
+
335
+ for doc_name, text in self.processed_texts.items():
336
+ sentences = sent_tokenize(text)
337
+ current_chunk = ""
338
+ chunk_id = 0
339
+
340
+ for sentence in sentences:
341
+ if len(current_chunk.split()) + len(sentence.split()) <= chunk_size:
342
+ current_chunk += sentence + " "
343
+ else:
344
+ if current_chunk.strip():
345
+ chunks.append({
346
+ 'id': f"{doc_name}_chunk_{chunk_id}",
347
+ 'text': current_chunk.strip(),
348
+ 'source': doc_name,
349
+ 'chunk_size': len(current_chunk.split())
350
+ })
351
+ chunk_id += 1
352
+ current_chunk = sentence + " "
353
+
354
+ # ------------------
355
+ # Add the last chunk
356
+ # ------------------
357
+ if current_chunk.strip():
358
+ chunks.append({
359
+ 'id': f"{doc_name}_chunk_{chunk_id}",
360
+ 'text': current_chunk.strip(),
361
+ 'source': doc_name,
362
+ 'chunk_size': len(current_chunk.split())
363
+ })
364
+
365
+ return chunks
366
+
367
if __name__ == "__main__":

    # Smoke-test the full pipeline end to end.
    processor = FinancialDataProcessor()
    texts, pairs = processor.process_all_documents()
    processor.save_processed_data()

    # Chunk the processed texts for the retrieval index.
    doc_chunks = processor.get_text_chunks()
    print(f"Generated {len(doc_chunks)} text chunks")
    print(f"Generated {len(pairs)} Q&A pairs")