Spaces:
No application file
No application file
File size: 5,371 Bytes
8255e91 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
import os
import glob
from typing import List, Dict
import fitz
import re
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
class PDFTextExtractor:
def __init__(self, input_dir: str, output_dir: str = None):
self.input_dir = input_dir
self.output_dir = output_dir or os.path.join(input_dir, "extracted_text")
# Ensure output directory exists
os.makedirs(self.output_dir, exist_ok=True)
def get_pdf_files(self) -> List[str]:
pdf_files = glob.glob(os.path.join(self.input_dir, "*.pdf"))
pdf_files.extend(glob.glob(os.path.join(self.input_dir, "*.PDF")))
print(f"Found {len(pdf_files)} PDF files in directory {self.input_dir}")
return pdf_files
def extract_text_from_pdf(self, pdf_path: str) -> Dict:
filename = os.path.basename(pdf_path)
result = {
"filename": filename,
"path": pdf_path,
"success": False,
"text": "",
"pages": 0,
"error": None
}
try:
doc = fitz.open(pdf_path)
result["pages"] = len(doc)
full_text = ""
for page_num in range(len(doc)):
page = doc.load_page(page_num)
# Use "text" mode to extract plain text, ignoring tables and images
page_text = page.get_text("text")
full_text += page_text + "\n\n" # Add line breaks to separate pages
# Clean the text
full_text = self.clean_text(full_text)
result["text"] = full_text
result["success"] = True
# Close the document
doc.close()
except Exception as e:
error_msg = f"Error extracting {filename}: {str(e)}"
print(error_msg)
result["error"] = error_msg
return result
def clean_text(self, text: str) -> str:
# Remove consecutive empty lines
text = re.sub(r'\n{3,}', '\n\n', text)
# Remove unprintable characters, but keep Chinese, English, numbers and basic punctuation
text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9.,!?;:()\'",。!?、;:《》【】「」\s]', '', text)
# Merge multiple spaces
text = re.sub(r'\s+', ' ', text)
# Fix spacing issues between Chinese and English
text = re.sub(r'([a-zA-Z])([\u4e00-\u9fa5])', r'\1 \2', text)
text = re.sub(r'([\u4e00-\u9fa5])([a-zA-Z])', r'\1 \2', text)
return text.strip()
def save_extracted_text(self, extraction_result: Dict) -> None:
"""Save the extracted text to a file"""
if not extraction_result["success"]:
return
# Create output filename based on original filename
base_name = os.path.splitext(extraction_result["filename"])[0]
output_path = os.path.join(self.output_dir, f"{base_name}.txt")
# Write to text file
with open(output_path, 'w', encoding='utf-8') as f:
f.write(extraction_result["text"])
print(f"Saved extracted text to {output_path}")
def process_single_pdf(self, pdf_path: str) -> Dict:
"""Process a single PDF file and save results"""
extraction_result = self.extract_text_from_pdf(pdf_path)
if extraction_result["success"]:
self.save_extracted_text(extraction_result)
print(f"Successfully processed {extraction_result['filename']} ({extraction_result['pages']} pages)")
else:
print(f"Failed to process {extraction_result['filename']}: {extraction_result['error']}")
return extraction_result
def extract_all_pdfs(self, max_workers: int = 4) -> List[Dict]:
pdf_files = self.get_pdf_files()
results = []
if not pdf_files:
print("No PDF files found")
return results
# Use thread pool for parallel processing
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Use tqdm to create a progress bar
for result in tqdm(executor.map(self.process_single_pdf, pdf_files),
total=len(pdf_files),
desc="Processing PDF files"):
results.append(result)
# Count successful and failed processes
success_count = sum(1 for r in results if r["success"])
fail_count = len(results) - success_count
print(f"PDF processing completed: {success_count} successful, {fail_count} failed")
return results
# Usage example
if __name__ == "__main__":
# Configure input and output directories
INPUT_DIR = "../data"
OUTPUT_DIR = "../data"
# Create extractor instance
extractor = PDFTextExtractor(INPUT_DIR, OUTPUT_DIR)
# Execute extraction
results = extractor.extract_all_pdfs(max_workers=4) # Use 4 threads for parallel processing
# Print summary
print(f"\nProcessed {len(results)} PDF files in total")
print(f"Successful: {sum(1 for r in results if r['success'])}")
print(f"Failed: {sum(1 for r in results if not r['success'])}") |