Spaces:
Sleeping
Sleeping
| import os | |
| import argparse | |
| from unstructured.partition.pdf import partition_pdf | |
| import logging | |
| from tqdm import tqdm | |
| import re | |
# Route INFO-and-above records from the root logger to a log file in the
# current working directory, stamping each record with time and level.
logging.basicConfig(
    level=logging.INFO,
    filename='pdf_processing.log',
    format='%(asctime)s - %(levelname)s - %(message)s',
)
def _extract_citation(elements, fallback):
    """Derive a citation string from the parsed elements of a PDF.

    Args:
        elements: Sequence of parsed elements, each exposing a ``text`` attribute.
        fallback (str): Citation to return when nothing better is found.

    Returns:
        str: The journal citation matched in the first 20 elements, else the
        first non-blank element text if it is shorter than 150 characters,
        else ``fallback``.
    """
    # Only the first 20 elements are searched for the journal citation line.
    leading_text = "\n".join((el.text or "") for el in elements[:20])
    # Raw string with single backslashes so \d and \( are regex escapes.
    sljog_pattern = (
        r"Sri Lanka Journal of Obstetrics and Gynaecology, "
        r"\d{4}; \d+ \(.*?\): \d+-\d+"
    )
    match = re.search(sljog_pattern, leading_text, re.IGNORECASE)
    if match:
        return match.group(0).replace("\n", " ").strip()

    # Fall back to the first non-blank text, treated as a title only when it
    # is reasonably short.
    first_text = next(
        (el.text for el in elements if el.text and el.text.strip()), None
    )
    if first_text and len(first_text) < 150:
        return first_text.strip()
    return fallback


def _render_markdown(elements, citation):
    """Render parsed elements as Markdown text with YAML citation frontmatter.

    Args:
        elements: Iterable of parsed elements (text, table, or image).
        citation (str): Citation written into the frontmatter.

    Returns:
        str: The Markdown document, sections separated by blank lines.
    """
    # Collapse whitespace and escape backslashes/quotes so the value stays a
    # valid single-line double-quoted YAML scalar.
    flat_citation = " ".join(citation.split())
    escaped_citation = flat_citation.replace("\\", "\\\\").replace('"', '\\"')
    parts = ["---", f'citation: "{escaped_citation}"', "---"]

    for element in elements:
        # Dispatch on the class name so no element classes need importing.
        kind = type(element).__name__
        metadata = getattr(element, "metadata", None)
        if "Table" in kind:
            table_html = getattr(metadata, "text_as_html", None)
            if table_html:
                parts.append("## Table")
                parts.append(table_html)
            elif element.text:
                # No HTML rendering available: keep the plain text rather
                # than dropping the table.
                parts.append(element.text)
        elif "Image" in kind:
            image_path = getattr(metadata, "image_path", None)
            if image_path:
                image_filename = os.path.basename(image_path)
                # The Markdown file sits next to the "images" directory, so
                # link relatively; forward slashes keep the link portable.
                parts.append(f"![{image_filename}](images/{image_filename})")
        elif element.text:
            parts.append(element.text)

    return "\n\n".join(parts)


def process_pdf_with_unstructured(pdf_path, output_dir):
    """
    Processes a PDF file using unstructured.io, extracts content, and saves it as a Markdown file.

    The output is written to ``<output_dir>/<pdf name>/<pdf name>.md`` and
    extracted images are requested in ``<output_dir>/<pdf name>/images``.

    Args:
        pdf_path (str): The path to the input PDF file.
        output_dir (str): The directory to save the output Markdown file and extracted images.

    Returns:
        bool: True if the Markdown file was written, False if the PDF is
        missing or any error occurred during processing (the error is
        printed and logged, not raised).
    """
    if not os.path.exists(pdf_path):
        logging.error(f"PDF file not found at {pdf_path}")
        print(f"Error: PDF file not found at {pdf_path}")
        return False

    # Cleaned up filename for fallback citation
    pdf_basename = os.path.splitext(os.path.basename(pdf_path))[0]
    cleaned_citation = re.sub(r'[\-_]', ' ', pdf_basename).replace('.pdf', '')

    print(f"Processing {pdf_path} with unstructured.io...")
    logging.info(f"Processing {pdf_path} with unstructured.io...")
    try:
        # Create a specific output directory for images from this PDF
        pdf_output_dir = os.path.join(output_dir, pdf_basename)
        image_output_path = os.path.join(pdf_output_dir, "images")
        # makedirs also creates the per-PDF parent directory.
        os.makedirs(image_output_path, exist_ok=True)
        print(f"Extracting images to: {image_output_path}")

        elements = partition_pdf(
            filename=pdf_path,
            strategy="hi_res",
            extract_images_in_pdf=True,
            infer_table_structure=True,
            chunking_strategy="by_title",
            max_characters=4096,
            new_after_n_chars=3800,
            combine_text_under_n_chars=2048,
            image_output_dir_path=image_output_path,
        )

        # Citation extraction is best-effort: any failure keeps the
        # filename-derived fallback instead of aborting the conversion.
        try:
            cleaned_citation = _extract_citation(elements, cleaned_citation)
        except Exception as citation_exc:
            print(f"Could not automatically extract a detailed citation, falling back to filename. Reason: {citation_exc}")
            logging.warning(f"Citation extraction failed for {pdf_path}, using fallback. Error: {citation_exc}")

        markdown_text = _render_markdown(elements, cleaned_citation)

        # Construct the output Markdown path
        output_md_path = os.path.join(pdf_output_dir, f"{pdf_basename}.md")
        print(f"Saving Markdown output to: {output_md_path}")
        with open(output_md_path, "w", encoding="utf-8") as f:
            f.write(markdown_text)

        print(f"Successfully processed {pdf_path}")
        logging.info(f"Successfully processed {pdf_path}")
        return True
    except Exception as e:
        print(f"An error occurred while processing {pdf_path}: {e}")
        # logging.exception records the traceback alongside the message.
        logging.exception(f"An error occurred while processing {pdf_path}: {e}")
        return False
def process_directory(input_dir, output_dir):
    """
    Processes all PDF files in a given directory.

    Only regular files directly inside ``input_dir`` whose name ends in
    ``.pdf`` (case-insensitive) are processed, in sorted name order. A
    summary of successes and failures is printed and logged.

    Args:
        input_dir (str): The path to the directory containing PDF files.
        output_dir (str): The directory to save the output Markdown files.

    Returns:
        None
    """
    # Match the extension case-insensitively (e.g. "SCAN.PDF") and skip
    # sub-directories that merely have a ".pdf" suffix; sort so runs are
    # reproducible regardless of os.listdir ordering.
    pdf_files = sorted(
        name
        for name in os.listdir(input_dir)
        if name.lower().endswith('.pdf')
        and os.path.isfile(os.path.join(input_dir, name))
    )
    if not pdf_files:
        print(f"No PDF files found in {input_dir}")
        return

    print(f"Found {len(pdf_files)} PDF files to process.")
    success_count = 0
    failure_count = 0
    for pdf_file in tqdm(pdf_files, desc="Processing PDFs"):
        pdf_path = os.path.join(input_dir, pdf_file)
        if process_pdf_with_unstructured(pdf_path, output_dir):
            success_count += 1
        else:
            failure_count += 1

    print("\nProcessing complete.")
    print(f"Successfully processed: {success_count} files")
    print(f"Failed to process: {failure_count} files")
    logging.info(f"Processing complete. Success: {success_count}, Failed: {failure_count}")
def main(argv=None):
    """Command-line entry point: convert one PDF or a directory of PDFs.

    Args:
        argv (list[str] | None): Argument list to parse. When None,
            argparse reads the process command line.

    Returns:
        int: 0 when a directory was processed or a single PDF was converted
        successfully; 1 when the single PDF failed or the input path is
        neither a directory nor a ``.pdf`` file. In directory mode per-file
        failures are only reported in the printed/logged summary.
    """
    parser = argparse.ArgumentParser(description="Process a PDF file or a directory of PDF files with unstructured.io to extract content as Markdown.")
    parser.add_argument("input_path", type=str, help="The path to the input PDF file or directory.")
    parser.add_argument("--output_dir", type=str, default="src/processed_markdown", help="The directory to save the output Markdown file.")
    args = parser.parse_args(argv)

    # Ensure the main output directory exists
    os.makedirs(args.output_dir, exist_ok=True)

    if os.path.isdir(args.input_path):
        process_directory(args.input_path, args.output_dir)
        return 0

    # Compare the extension case-insensitively so "REPORT.PDF" is accepted.
    if os.path.isfile(args.input_path) and args.input_path.lower().endswith('.pdf'):
        return 0 if process_pdf_with_unstructured(args.input_path, args.output_dir) else 1

    print("Error: Invalid input path. Please provide a valid PDF file or a directory.")
    logging.error(f"Invalid input path: {args.input_path}")
    return 1


if __name__ == "__main__":
    # Propagate the result as the process exit status.
    raise SystemExit(main())