Spaces:
No application file
No application file
| import os | |
| import glob | |
| from typing import List, Dict | |
| import fitz | |
| import re | |
| from concurrent.futures import ThreadPoolExecutor | |
| from tqdm import tqdm | |
| class PDFTextExtractor: | |
| def __init__(self, input_dir: str, output_dir: str = None): | |
| self.input_dir = input_dir | |
| self.output_dir = output_dir or os.path.join(input_dir, "extracted_text") | |
| # Ensure output directory exists | |
| os.makedirs(self.output_dir, exist_ok=True) | |
| def get_pdf_files(self) -> List[str]: | |
| pdf_files = glob.glob(os.path.join(self.input_dir, "*.pdf")) | |
| pdf_files.extend(glob.glob(os.path.join(self.input_dir, "*.PDF"))) | |
| print(f"Found {len(pdf_files)} PDF files in directory {self.input_dir}") | |
| return pdf_files | |
| def extract_text_from_pdf(self, pdf_path: str) -> Dict: | |
| filename = os.path.basename(pdf_path) | |
| result = { | |
| "filename": filename, | |
| "path": pdf_path, | |
| "success": False, | |
| "text": "", | |
| "pages": 0, | |
| "error": None | |
| } | |
| try: | |
| doc = fitz.open(pdf_path) | |
| result["pages"] = len(doc) | |
| full_text = "" | |
| for page_num in range(len(doc)): | |
| page = doc.load_page(page_num) | |
| # Use "text" mode to extract plain text, ignoring tables and images | |
| page_text = page.get_text("text") | |
| full_text += page_text + "\n\n" # Add line breaks to separate pages | |
| # Clean the text | |
| full_text = self.clean_text(full_text) | |
| result["text"] = full_text | |
| result["success"] = True | |
| # Close the document | |
| doc.close() | |
| except Exception as e: | |
| error_msg = f"Error extracting {filename}: {str(e)}" | |
| print(error_msg) | |
| result["error"] = error_msg | |
| return result | |
| def clean_text(self, text: str) -> str: | |
| # Remove consecutive empty lines | |
| text = re.sub(r'\n{3,}', '\n\n', text) | |
| # Remove unprintable characters, but keep Chinese, English, numbers and basic punctuation | |
| text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9.,!?;:()\'"οΌγοΌοΌγοΌοΌγγγγγγ\s]', '', text) | |
| # Merge multiple spaces | |
| text = re.sub(r'\s+', ' ', text) | |
| # Fix spacing issues between Chinese and English | |
| text = re.sub(r'([a-zA-Z])([\u4e00-\u9fa5])', r'\1 \2', text) | |
| text = re.sub(r'([\u4e00-\u9fa5])([a-zA-Z])', r'\1 \2', text) | |
| return text.strip() | |
| def save_extracted_text(self, extraction_result: Dict) -> None: | |
| """Save the extracted text to a file""" | |
| if not extraction_result["success"]: | |
| return | |
| # Create output filename based on original filename | |
| base_name = os.path.splitext(extraction_result["filename"])[0] | |
| output_path = os.path.join(self.output_dir, f"{base_name}.txt") | |
| # Write to text file | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| f.write(extraction_result["text"]) | |
| print(f"Saved extracted text to {output_path}") | |
| def process_single_pdf(self, pdf_path: str) -> Dict: | |
| """Process a single PDF file and save results""" | |
| extraction_result = self.extract_text_from_pdf(pdf_path) | |
| if extraction_result["success"]: | |
| self.save_extracted_text(extraction_result) | |
| print(f"Successfully processed {extraction_result['filename']} ({extraction_result['pages']} pages)") | |
| else: | |
| print(f"Failed to process {extraction_result['filename']}: {extraction_result['error']}") | |
| return extraction_result | |
| def extract_all_pdfs(self, max_workers: int = 4) -> List[Dict]: | |
| pdf_files = self.get_pdf_files() | |
| results = [] | |
| if not pdf_files: | |
| print("No PDF files found") | |
| return results | |
| # Use thread pool for parallel processing | |
| with ThreadPoolExecutor(max_workers=max_workers) as executor: | |
| # Use tqdm to create a progress bar | |
| for result in tqdm(executor.map(self.process_single_pdf, pdf_files), | |
| total=len(pdf_files), | |
| desc="Processing PDF files"): | |
| results.append(result) | |
| # Count successful and failed processes | |
| success_count = sum(1 for r in results if r["success"]) | |
| fail_count = len(results) - success_count | |
| print(f"PDF processing completed: {success_count} successful, {fail_count} failed") | |
| return results | |
| # Usage example | |
| if __name__ == "__main__": | |
| # Configure input and output directories | |
| INPUT_DIR = "../data" | |
| OUTPUT_DIR = "../data" | |
| # Create extractor instance | |
| extractor = PDFTextExtractor(INPUT_DIR, OUTPUT_DIR) | |
| # Execute extraction | |
| results = extractor.extract_all_pdfs(max_workers=4) # Use 4 threads for parallel processing | |
| # Print summary | |
| print(f"\nProcessed {len(results)} PDF files in total") | |
| print(f"Successful: {sum(1 for r in results if r['success'])}") | |
| print(f"Failed: {sum(1 for r in results if not r['success'])}") |