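# NOTE: the next three comment lines are web-page residue captured with the
# file (avatar alt text, commit title, commit hash); they are not program code.
# yiqing111's picture
# Upload 7 files
# 8255e91 verified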
import os
import glob
from typing import List, Dict
import fitz
import re
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
class PDFTextExtractor:
    """Extract plain text from the PDF files of one directory.

    Every PDF found directly inside ``input_dir`` is opened with PyMuPDF
    (``fitz``), its page text is cleaned by :meth:`clean_text`, and the
    result is written as a UTF-8 ``.txt`` file into ``output_dir``.
    """

    # Anything NOT in this class is deleted by clean_text. Kept: CJK
    # ideographs U+4E00-U+9FA5, ASCII letters and digits, basic ASCII
    # punctuation, the full-width Chinese punctuation marks
    # (, 。 ! ? 、 ; : 《 》 【 】 「 」) and whitespace.
    # The Chinese punctuation is written as \u escapes so that it cannot be
    # damaged again by a wrong file encoding.
    _DISALLOWED_CHARS = re.compile(
        r'[^\u4e00-\u9fa5a-zA-Z0-9.,!?;:()\'"'
        r'\uff0c\u3002\uff01\uff1f\u3001\uff1b\uff1a'
        r'\u300a\u300b\u3010\u3011\u300c\u300d\s]'
    )
    _WHITESPACE_RUN = re.compile(r'\s+')
    _LATIN_THEN_CJK = re.compile(r'([a-zA-Z])([\u4e00-\u9fa5])')
    _CJK_THEN_LATIN = re.compile(r'([\u4e00-\u9fa5])([a-zA-Z])')

    def __init__(self, input_dir: str, output_dir: str = None):
        """Remember the directories and create the output directory.

        Args:
            input_dir: Directory that is searched for PDF files.
            output_dir: Directory for the generated ``.txt`` files. Defaults
                to ``<input_dir>/extracted_text``.
        """
        self.input_dir = input_dir
        self.output_dir = output_dir or os.path.join(input_dir, "extracted_text")
        # Ensure output directory exists
        os.makedirs(self.output_dir, exist_ok=True)

    def get_pdf_files(self) -> List[str]:
        """Return the sorted paths of all PDF files directly in ``input_dir``.

        The extension is compared case-insensitively, so ``.pdf``, ``.PDF``
        and ``.Pdf`` are all found, and each file is listed exactly once even
        on platforms where globbing itself ignores case.
        """
        # Escape the directory so that characters such as '[' in its name are
        # taken literally; '*' keeps glob's rule of skipping dot-files.
        pattern = os.path.join(glob.escape(self.input_dir), "*")
        pdf_files = sorted(
            path
            for path in glob.glob(pattern)
            if os.path.splitext(path)[1].lower() == ".pdf"
        )
        print(f"Found {len(pdf_files)} PDF files in directory {self.input_dir}")
        return pdf_files

    def extract_text_from_pdf(self, pdf_path: str) -> Dict:
        """Extract and clean the text of a single PDF.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            A dict with the keys ``filename``, ``path``, ``success``,
            ``text`` (cleaned text, empty on failure), ``pages`` and
            ``error`` (``None`` on success, otherwise a message). Errors are
            reported through this dict; no exception is raised.
        """
        filename = os.path.basename(pdf_path)
        result = {
            "filename": filename,
            "path": pdf_path,
            "success": False,
            "text": "",
            "pages": 0,
            "error": None
        }
        try:
            # The context manager closes the document even when reading a
            # page fails part-way through.
            with fitz.open(pdf_path) as doc:
                result["pages"] = len(doc)
                # Use "text" mode to extract plain text, ignoring tables and images
                page_texts = [
                    doc.load_page(page_num).get_text("text")
                    for page_num in range(len(doc))
                ]
            # Blank lines between pages keep the last word of one page from
            # fusing with the first word of the next one during cleaning.
            result["text"] = self.clean_text("\n\n".join(page_texts))
            result["success"] = True
        except Exception as e:
            # Per-file boundary: one unreadable PDF must not stop the batch.
            error_msg = f"Error extracting {filename}: {str(e)}"
            print(error_msg)
            result["error"] = error_msg
        return result

    def clean_text(self, text: str) -> str:
        """Normalize extracted text into a single cleaned line.

        Steps, in order: delete every character outside the allowed set
        (see ``_DISALLOWED_CHARS``), collapse each run of whitespace --
        newlines included -- into one space, insert a space wherever an
        ASCII letter directly touches a Chinese character, and strip the
        ends. Because newlines are whitespace too, the result contains no
        line breaks.
        """
        text = self._DISALLOWED_CHARS.sub('', text)
        text = self._WHITESPACE_RUN.sub(' ', text)
        # Fix spacing issues between Chinese and English
        text = self._LATIN_THEN_CJK.sub(r'\1 \2', text)
        text = self._CJK_THEN_LATIN.sub(r'\1 \2', text)
        return text.strip()

    def save_extracted_text(self, extraction_result: Dict) -> None:
        """Save the extracted text to a file.

        Nothing is written when ``extraction_result["success"]`` is false.
        The output file is ``<output_dir>/<pdf name without extension>.txt``.

        Raises:
            OSError: If the output file cannot be written.
        """
        if not extraction_result["success"]:
            return
        # Create output filename based on original filename
        base_name = os.path.splitext(extraction_result["filename"])[0]
        output_path = os.path.join(self.output_dir, f"{base_name}.txt")
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(extraction_result["text"])
        print(f"Saved extracted text to {output_path}")

    def process_single_pdf(self, pdf_path: str) -> Dict:
        """Process a single PDF file and save results.

        Returns the result dict of :meth:`extract_text_from_pdf`. If the text
        was extracted but could not be written to disk, the result is marked
        as failed (``success`` false, ``error`` set) instead of raising.
        """
        extraction_result = self.extract_text_from_pdf(pdf_path)
        if extraction_result["success"]:
            try:
                self.save_extracted_text(extraction_result)
            except OSError as e:
                # An exception here would propagate through executor.map and
                # abort the whole batch, so record it as a per-file failure.
                extraction_result["success"] = False
                extraction_result["error"] = (
                    f"Error saving text for {extraction_result['filename']}: {e}"
                )
        if extraction_result["success"]:
            print(f"Successfully processed {extraction_result['filename']} ({extraction_result['pages']} pages)")
        else:
            print(f"Failed to process {extraction_result['filename']}: {extraction_result['error']}")
        return extraction_result

    def extract_all_pdfs(self, max_workers: int = 4) -> List[Dict]:
        """Process every PDF in ``input_dir`` and return the per-file results.

        Args:
            max_workers: Number of worker threads in the pool.

        Returns:
            One result dict per PDF, in the order of :meth:`get_pdf_files`;
            an empty list when no PDF was found.

        NOTE(review): the PyMuPDF documentation states that the library does
        not support use from multiple threads. Confirm that this is safe
        with the installed version, or call with ``max_workers=1``.
        """
        pdf_files = self.get_pdf_files()
        results = []
        if not pdf_files:
            print("No PDF files found")
            return results
        # Use thread pool for parallel processing
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Use tqdm to create a progress bar
            for result in tqdm(executor.map(self.process_single_pdf, pdf_files),
                               total=len(pdf_files),
                               desc="Processing PDF files"):
                results.append(result)
        # Count successful and failed processes
        success_count = sum(1 for r in results if r["success"])
        fail_count = len(results) - success_count
        print(f"PDF processing completed: {success_count} successful, {fail_count} failed")
        return results
# Usage example
if __name__ == "__main__":
    # Source PDFs are read from, and the generated .txt files are written to,
    # the same folder (resolved relative to the current working directory).
    INPUT_DIR = "../data"
    OUTPUT_DIR = "../data"

    # Build the extractor and run the batch on 4 worker threads
    extractor = PDFTextExtractor(INPUT_DIR, OUTPUT_DIR)
    results = extractor.extract_all_pdfs(max_workers=4)

    # Summarize the outcome
    succeeded = [r for r in results if r['success']]
    total = len(results)
    print(f"\nProcessed {total} PDF files in total")
    print(f"Successful: {len(succeeded)}")
    print(f"Failed: {total - len(succeeded)}")