| """
|
| docs.py - Universal Document Processing Module
|
| A comprehensive interface for processing, creating, editing, and converting
|
| virtually all document types across different domains.
|
| """
|
|
|
| import os
|
| import json
|
| import csv
|
| import sqlite3
|
| import zipfile
|
| import tarfile
|
| import shutil
|
| import subprocess
|
| import tempfile
|
| from pathlib import Path
|
| from typing import Union, List, Dict, Any, Optional, BinaryIO
|
| from datetime import datetime
|
| import hashlib
|
| import io
|
|
|
|
|
| import pandas as pd
|
| import numpy as np
|
|
|
|
|
| import chardet
|
|
|
|
|
| try:
|
| from docx import Document
|
| from docx.shared import Inches, Pt, RGBColor
|
| from docx.enum.text import WD_ALIGN_PARAGRAPH
|
| DOCX_AVAILABLE = True
|
| except ImportError:
|
| DOCX_AVAILABLE = False
|
|
|
|
|
| try:
|
| import pypdf
|
| import pdfplumber
|
| from reportlab.lib.pagesizes import letter, A4
|
| from reportlab.pdfgen import canvas
|
| from reportlab.lib.utils import ImageReader
|
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
|
| from reportlab.lib.styles import getSampleStyleSheet
|
| PDF_AVAILABLE = True
|
| except ImportError:
|
| PDF_AVAILABLE = False
|
|
|
|
|
| try:
|
| import openpyxl
|
| from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
|
| from openpyxl.chart import BarChart, Reference
|
| OPENPYXL_AVAILABLE = True
|
| except ImportError:
|
| OPENPYXL_AVAILABLE = False
|
|
|
| try:
|
| import xlrd
|
| import xlwt
|
| XLS_AVAILABLE = True
|
| except ImportError:
|
| XLS_AVAILABLE = False
|
|
|
|
|
| try:
|
| from pptx import Presentation
|
| from pptx.util import Inches as PptxInches
|
| from pptx.enum.text import PP_ALIGN
|
| from pptx.dml.color import RGBColor as PptxRGBColor
|
| PPTX_AVAILABLE = True
|
| except ImportError:
|
| PPTX_AVAILABLE = False
|
|
|
|
|
| try:
|
| from PIL import Image, ImageDraw, ImageFont, ImageFilter
|
| PILLOW_AVAILABLE = True
|
|
|
| try:
|
| import easyocr
|
| EASYOCR_AVAILABLE = True
|
|
|
| _easyocr_reader = None
|
| print("✅ EasyOCR available for image text extraction")
|
| except ImportError:
|
| EASYOCR_AVAILABLE = False
|
| print("⚠️ EasyOCR not installed. Install with: pip install easyocr")
|
| except ImportError:
|
| PILLOW_AVAILABLE = False
|
| EASYOCR_AVAILABLE = False
|
| print("⚠️ Pillow not installed. Install with: pip install Pillow")
|
|
|
|
|
| try:
|
| from bs4 import BeautifulSoup
|
| import lxml
|
| import markdown
|
| BEAUTIFULSOUP_AVAILABLE = True
|
| except ImportError:
|
| BEAUTIFULSOUP_AVAILABLE = False
|
|
|
|
|
| try:
|
| import yaml
|
| import toml
|
| YAML_AVAILABLE = True
|
| TOML_AVAILABLE = True
|
| except ImportError:
|
| YAML_AVAILABLE = False
|
| TOML_AVAILABLE = False
|
|
|
|
|
| try:
|
| import py7zr
|
| SEVENZ_AVAILABLE = True
|
| except ImportError:
|
| SEVENZ_AVAILABLE = False
|
|
|
|
|
| try:
|
| import h5py
|
| H5PY_AVAILABLE = True
|
| except ImportError:
|
| H5PY_AVAILABLE = False
|
|
|
|
|
| try:
|
| from sqlalchemy import create_engine, text
|
| SQLALCHEMY_AVAILABLE = True
|
| except ImportError:
|
| SQLALCHEMY_AVAILABLE = False
|
|
|
|
|
| try:
|
| from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips
|
| MOVIEPY_AVAILABLE = True
|
| except ImportError:
|
| MOVIEPY_AVAILABLE = False
|
|
|
| try:
|
| from pydub import AudioSegment
|
| PYDUB_AVAILABLE = True
|
| except ImportError:
|
| PYDUB_AVAILABLE = False
|
|
|
| try:
|
| import mutagen
|
| from mutagen.mp3 import MP3
|
| from mutagen.flac import FLAC
|
| MUTAGEN_AVAILABLE = True
|
| except ImportError:
|
| MUTAGEN_AVAILABLE = False
|
|
|
|
|
| def get_easyocr_reader(lang='en'):
|
| """Lazy initialization of EasyOCR reader"""
|
| global _easyocr_reader
|
| if _easyocr_reader is None and EASYOCR_AVAILABLE:
|
| try:
|
| import easyocr
|
|
|
| _easyocr_reader = easyocr.Reader([lang, 'en'], gpu=False)
|
| print("✅ EasyOCR reader initialized")
|
| except Exception as e:
|
| print(f"⚠️ Failed to initialize EasyOCR: {e}")
|
| return None
|
| return _easyocr_reader
|
|
|
|
|
| class DocumentProcessor:
|
| """
|
| Universal Document Processor - Handles creation, reading, editing, conversion
|
| of virtually all document types.
|
| """
|
|
|
| def __init__(self, output_dir: str = "processed_docs"):
|
| """
|
| Initialize the document processor.
|
|
|
| Args:
|
| output_dir: Directory for output files
|
| """
|
| self.output_dir = Path(output_dir)
|
| self.output_dir.mkdir(parents=True, exist_ok=True)
|
| self.temp_dir = Path(tempfile.mkdtemp())
|
| self.supported_formats = self._get_supported_formats()
|
|
|
|
|
| print(f"📁 Document Processor initialized")
|
| print(f" Output directory: {self.output_dir}")
|
| print(f" Available: Word={DOCX_AVAILABLE}, PDF={PDF_AVAILABLE}, Excel={OPENPYXL_AVAILABLE}")
|
| print(f" Available: PPT={PPTX_AVAILABLE}, Images={PILLOW_AVAILABLE}, OCR={EASYOCR_AVAILABLE}")
|
|
|
| def _get_supported_formats(self) -> Dict[str, List[str]]:
|
| """Return dictionary of supported formats by category."""
|
| return {
|
| "text": [".txt", ".text", ".asc", ".log"],
|
| "word": [".docx", ".doc", ".odt", ".rtf"],
|
| "pdf": [".pdf", ".xps"],
|
| "spreadsheet": [".xlsx", ".xls", ".csv", ".tsv", ".ods"],
|
| "presentation": [".pptx", ".ppt", ".odp", ".key"],
|
| "image": [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp", ".svg"],
|
| "markup": [".html", ".htm", ".xml", ".md", ".rst"],
|
| "data": [".json", ".yaml", ".yml", ".toml", ".ini"],
|
| "archive": [".zip", ".tar", ".gz", ".7z", ".rar"],
|
| "ebook": [".epub", ".mobi", ".pdf"],
|
| "database": [".db", ".sqlite", ".sqlite3"],
|
| "audio": [".mp3", ".wav", ".flac", ".aac", ".ogg"],
|
| "video": [".mp4", ".avi", ".mov", ".mkv", ".webm"],
|
| "cad": [".stl", ".obj", ".dxf"],
|
| "scientific": [".h5", ".hdf5", ".nc", ".mat"],
|
| "business": [".qfx", ".ofx"],
|
| "calendar": [".ics", ".ical", ".vcf"],
|
| "security": [".pem", ".crt", ".key", ".p12"],
|
| }
|
|
|
|
|
|
|
| def create_text_file(self, content: str, output_path: str, encoding: str = 'utf-8') -> str:
|
| """Create a plain text file."""
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| with open(output_file, 'w', encoding=encoding) as f:
|
| f.write(content)
|
|
|
| return str(output_file)
|
|
|
| def read_text_file(self, file_path: str, detect_encoding: bool = True) -> str:
|
| """Read a text file with optional encoding detection."""
|
| path = Path(file_path)
|
|
|
| if detect_encoding:
|
| with open(path, 'rb') as f:
|
| raw_data = f.read()
|
| result = chardet.detect(raw_data)
|
| encoding = result['encoding'] or 'utf-8'
|
| else:
|
| encoding = 'utf-8'
|
|
|
| with open(path, 'r', encoding=encoding, errors='replace') as f:
|
| return f.read()
|
|
|
| def append_to_text_file(self, file_path: str, content: str) -> None:
|
| """Append content to an existing text file."""
|
| with open(file_path, 'a', encoding='utf-8') as f:
|
| f.write(content)
|
|
|
|
|
|
|
| def create_word_document(self, output_path: str, content: List[Dict[str, Any]]) -> str:
|
| """
|
| Create a Word document.
|
|
|
| Args:
|
| output_path: Path for the output file
|
| content: List of dicts with 'type' ('paragraph', 'heading', 'table') and data
|
| """
|
| if not DOCX_AVAILABLE:
|
| raise ImportError("python-docx is not installed")
|
|
|
| doc = Document()
|
|
|
| for item in content:
|
| if item['type'] == 'paragraph':
|
| p = doc.add_paragraph(item['text'])
|
| if 'style' in item:
|
| p.style = item['style']
|
| elif item['type'] == 'heading':
|
| doc.add_heading(item['text'], level=item.get('level', 1))
|
| elif item['type'] == 'table':
|
| data = item['data']
|
| table = doc.add_table(rows=len(data), cols=len(data[0]))
|
| table.style = 'Table Grid'
|
| for i, row in enumerate(data):
|
| for j, cell_value in enumerate(row):
|
| table.cell(i, j).text = str(cell_value)
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
| doc.save(str(output_file))
|
|
|
| return str(output_file)
|
|
|
| def read_word_document(self, file_path: str) -> Dict[str, Any]:
|
| """Extract text and structure from a Word document."""
|
| if not DOCX_AVAILABLE:
|
| raise ImportError("python-docx is not installed")
|
|
|
| doc = Document(file_path)
|
|
|
| result = {
|
| 'paragraphs': [],
|
| 'tables': [],
|
| 'metadata': {
|
| 'core_properties': {},
|
| 'paragraph_count': len(doc.paragraphs),
|
| 'table_count': len(doc.tables)
|
| }
|
| }
|
|
|
|
|
| for para in doc.paragraphs:
|
| result['paragraphs'].append(para.text)
|
|
|
|
|
| for table in doc.tables:
|
| table_data = []
|
| for row in table.rows:
|
| row_data = [cell.text for cell in row.cells]
|
| table_data.append(row_data)
|
| result['tables'].append(table_data)
|
|
|
| return result
|
|
|
|
|
|
|
| def create_pdf(self, output_path: str, content: List[Dict[str, Any]],
|
| page_size: str = 'letter') -> str:
|
| """Create a PDF document."""
|
| if not PDF_AVAILABLE:
|
| raise ImportError("ReportLab is not installed")
|
|
|
| page_sizes = {'letter': letter, 'A4': A4}
|
| size = page_sizes.get(page_size, letter)
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| doc = SimpleDocTemplate(str(output_file), pagesize=size)
|
| story = []
|
| styles = getSampleStyleSheet()
|
|
|
| for item in content:
|
| if item['type'] == 'paragraph':
|
| story.append(Paragraph(item['text'], styles['Normal']))
|
| story.append(Spacer(1, 12))
|
| elif item['type'] == 'heading':
|
| story.append(Paragraph(item['text'], styles['Heading1']))
|
| story.append(Spacer(1, 12))
|
| elif item['type'] == 'spacer':
|
| story.append(Spacer(1, item.get('height', 12)))
|
|
|
| doc.build(story)
|
| return str(output_file)
|
|
|
| def read_pdf(self, file_path: str, extract_tables: bool = True) -> Dict[str, Any]:
|
| """Extract text, tables, and metadata from a PDF."""
|
| result = {
|
| 'pages': [],
|
| 'metadata': {},
|
| 'tables': []
|
| }
|
|
|
|
|
| with open(file_path, 'rb') as f:
|
| reader = pypdf.PdfReader(f)
|
| result['metadata'] = reader.metadata
|
| result['page_count'] = len(reader.pages)
|
|
|
| for i, page in enumerate(reader.pages):
|
| result['pages'].append({
|
| 'page_number': i + 1,
|
| 'text': page.extract_text()
|
| })
|
|
|
|
|
| if extract_tables and PDF_AVAILABLE:
|
| try:
|
| with pdfplumber.open(file_path) as pdf:
|
| for i, page in enumerate(pdf.pages):
|
| tables = page.extract_tables()
|
| for table in tables:
|
| result['tables'].append({
|
| 'page': i + 1,
|
| 'data': table
|
| })
|
| except:
|
| pass
|
|
|
| return result
|
|
|
| def merge_pdfs(self, pdf_paths: List[str], output_path: str) -> str:
|
| """Merge multiple PDF files into one."""
|
| if not PDF_AVAILABLE:
|
| raise ImportError("pypdf is not installed")
|
|
|
| merger = pypdf.PdfMerger()
|
|
|
| for pdf_path in pdf_paths:
|
| merger.append(pdf_path)
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
| merger.write(str(output_file))
|
| merger.close()
|
|
|
| return str(output_file)
|
|
|
| def split_pdf(self, pdf_path: str, output_dir: str, pages_per_file: int = 1) -> List[str]:
|
| """Split a PDF into multiple files."""
|
| if not PDF_AVAILABLE:
|
| raise ImportError("pypdf is not installed")
|
|
|
| output_files = []
|
| output_path = Path(output_dir)
|
| output_path.mkdir(parents=True, exist_ok=True)
|
|
|
| with open(pdf_path, 'rb') as f:
|
| reader = pypdf.PdfReader(f)
|
| total_pages = len(reader.pages)
|
|
|
| for start_page in range(0, total_pages, pages_per_file):
|
| writer = pypdf.PdfWriter()
|
| end_page = min(start_page + pages_per_file, total_pages)
|
|
|
| for page_num in range(start_page, end_page):
|
| writer.add_page(reader.pages[page_num])
|
|
|
| output_file = output_path / f"part_{start_page // pages_per_file + 1}.pdf"
|
| with open(output_file, 'wb') as out_f:
|
| writer.write(out_f)
|
| output_files.append(str(output_file))
|
|
|
| return output_files
|
|
|
|
|
|
|
| def create_excel(self, output_path: str, sheets: Dict[str, List[List[Any]]]) -> str:
|
| """Create an Excel file with multiple sheets."""
|
| if not OPENPYXL_AVAILABLE:
|
| raise ImportError("openpyxl is not installed")
|
|
|
| wb = openpyxl.Workbook()
|
|
|
| for i, (sheet_name, data) in enumerate(sheets.items()):
|
| if i == 0:
|
| ws = wb.active
|
| ws.title = sheet_name[:31]
|
| else:
|
| ws = wb.create_sheet(title=sheet_name[:31])
|
|
|
| for row_idx, row in enumerate(data, 1):
|
| for col_idx, value in enumerate(row, 1):
|
| ws.cell(row=row_idx, column=col_idx, value=value)
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
| wb.save(str(output_file))
|
|
|
| return str(output_file)
|
|
|
| def read_excel(self, file_path: str, sheet_name: Optional[str] = None) -> Dict[str, pd.DataFrame]:
|
| """Read Excel file using pandas."""
|
| if sheet_name:
|
| df = pd.read_excel(file_path, sheet_name=sheet_name)
|
| return {sheet_name: df}
|
| else:
|
| return pd.read_excel(file_path, sheet_name=None)
|
|
|
| def read_csv(self, file_path: str, **kwargs) -> pd.DataFrame:
|
| """Read CSV file with flexible options."""
|
| return pd.read_csv(file_path, **kwargs)
|
|
|
| def create_csv(self, data: List[Dict[str, Any]], output_path: str) -> str:
|
| """Create CSV file from list of dictionaries."""
|
| if not data:
|
| raise ValueError("No data provided")
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| with open(output_file, 'w', newline='', encoding='utf-8') as f:
|
| writer = csv.DictWriter(f, fieldnames=data[0].keys())
|
| writer.writeheader()
|
| writer.writerows(data)
|
|
|
| return str(output_file)
|
|
|
|
|
|
|
| def create_presentation(self, output_path: str, slides: List[Dict[str, Any]]) -> str:
|
| """Create a PowerPoint presentation."""
|
| if not PPTX_AVAILABLE:
|
| raise ImportError("python-pptx is not installed")
|
|
|
| prs = Presentation()
|
|
|
| for slide_data in slides:
|
| slide_layout = prs.slide_layouts[slide_data.get('layout', 0)]
|
| slide = prs.slides.add_slide(slide_layout)
|
|
|
|
|
| if 'title' in slide_data and slide.shapes.title:
|
| slide.shapes.title.text = slide_data['title']
|
|
|
|
|
| if 'content' in slide_data and len(slide.placeholders) > 1:
|
| content_placeholder = slide.placeholders[1]
|
| content_placeholder.text = slide_data['content']
|
|
|
|
|
| if 'images' in slide_data:
|
| for img_data in slide_data['images']:
|
| left = PptxInches(img_data.get('left', 1))
|
| top = PptxInches(img_data.get('top', 2))
|
| slide.shapes.add_picture(img_data['path'], left, top,
|
| width=PptxInches(img_data.get('width', 3)))
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
| prs.save(str(output_file))
|
|
|
| return str(output_file)
|
|
|
| def read_presentation(self, file_path: str) -> Dict[str, Any]:
|
| """Extract content from a PowerPoint presentation."""
|
| if not PPTX_AVAILABLE:
|
| raise ImportError("python-pptx is not installed")
|
|
|
| prs = Presentation(file_path)
|
|
|
| result = {
|
| 'slides': [],
|
| 'slide_count': len(prs.slides)
|
| }
|
|
|
| for slide in prs.slides:
|
| slide_data = {
|
| 'shapes': [],
|
| 'notes': None,
|
| 'text_content': ''
|
| }
|
|
|
|
|
| text_parts = []
|
| for shape in slide.shapes:
|
| if hasattr(shape, "text") and shape.text:
|
| text_parts.append(shape.text)
|
| slide_data['shapes'].append({
|
| 'type': type(shape).__name__,
|
| 'text': shape.text
|
| })
|
|
|
| slide_data['text_content'] = '\n'.join(text_parts)
|
|
|
|
|
| if slide.has_notes_slide:
|
| notes_slide = slide.notes_slide
|
| if notes_slide.notes_text_frame:
|
| slide_data['notes'] = notes_slide.notes_text_frame.text
|
|
|
| result['slides'].append(slide_data)
|
|
|
| return result
|
|
|
|
|
|
|
| def convert_image(self, input_path: str, output_path: str, format: str = 'PNG',
|
| quality: int = 95) -> str:
|
| """Convert image between formats with optional quality setting."""
|
| if not PILLOW_AVAILABLE:
|
| raise ImportError("Pillow is not installed")
|
|
|
| img = Image.open(input_path)
|
|
|
|
|
| if format.upper() == 'JPEG' and img.mode == 'RGBA':
|
| rgb_img = Image.new('RGB', img.size, (255, 255, 255))
|
| rgb_img.paste(img, mask=img.split()[3])
|
| img = rgb_img
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| if format.upper() in ['JPEG', 'JPG']:
|
| img.save(str(output_file), format=format.upper(), quality=quality)
|
| else:
|
| img.save(str(output_file), format=format.upper())
|
|
|
| return str(output_file)
|
|
|
| def resize_image(self, input_path: str, output_path: str,
|
| width: int = None, height: int = None,
|
| maintain_aspect: bool = True) -> str:
|
| """Resize an image."""
|
| if not PILLOW_AVAILABLE:
|
| raise ImportError("Pillow is not installed")
|
|
|
| img = Image.open(input_path)
|
|
|
| if maintain_aspect and width and height:
|
| ratio = min(width / img.width, height / img.height)
|
| new_width = int(img.width * ratio)
|
| new_height = int(img.height * ratio)
|
| elif width:
|
| ratio = width / img.width
|
| new_width = width
|
| new_height = int(img.height * ratio)
|
| elif height:
|
| ratio = height / img.height
|
| new_height = height
|
| new_width = int(img.width * ratio)
|
| else:
|
| new_width, new_height = img.width, img.height
|
|
|
| resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
| resized_img.save(str(output_file))
|
|
|
| return str(output_file)
|
|
|
| def extract_text_from_image(self, image_path: str, lang: str = 'en') -> str:
|
| """Extract text from image using EasyOCR."""
|
| if not EASYOCR_AVAILABLE:
|
| return "[OCR not available. Please install EasyOCR: pip install easyocr]"
|
|
|
| try:
|
|
|
| reader = get_easyocr_reader(lang)
|
| if reader is None:
|
| return "[Failed to initialize EasyOCR]"
|
|
|
|
|
| result = reader.readtext(image_path, detail=0)
|
|
|
| if result:
|
|
|
| extracted_text = '\n'.join(result)
|
| return extracted_text
|
| else:
|
| return "[No text detected in image]"
|
|
|
| except Exception as e:
|
| return f"[OCR error: {str(e)}]"
|
|
|
| def create_image_from_text(self, text: str, output_path: str,
|
| width: int = 800, height: int = 600,
|
| bg_color: str = 'white', text_color: str = 'black') -> str:
|
| """Create an image with text."""
|
| if not PILLOW_AVAILABLE:
|
| raise ImportError("Pillow is not installed")
|
|
|
| img = Image.new('RGB', (width, height), color=bg_color)
|
| draw = ImageDraw.Draw(img)
|
|
|
|
|
| try:
|
| font = ImageFont.truetype("arial.ttf", 20)
|
| except:
|
| font = ImageFont.load_default()
|
|
|
|
|
| margin = 50
|
| max_width = width - 2 * margin
|
| words = text.split()
|
| lines = []
|
| current_line = []
|
|
|
| for word in words:
|
| test_line = ' '.join(current_line + [word])
|
| bbox = draw.textbbox((0, 0), test_line, font=font)
|
| text_width = bbox[2] - bbox[0]
|
| if text_width <= max_width:
|
| current_line.append(word)
|
| else:
|
| if current_line:
|
| lines.append(' '.join(current_line))
|
| current_line = [word]
|
|
|
| if current_line:
|
| lines.append(' '.join(current_line))
|
|
|
|
|
| y = margin
|
| for line in lines:
|
| draw.text((margin, y), line, fill=text_color, font=font)
|
| y += 30
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
| img.save(str(output_file))
|
|
|
| return str(output_file)
|
|
|
|
|
|
|
| def parse_html(self, html_content: str) -> BeautifulSoup:
|
| """Parse HTML content."""
|
| if not BEAUTIFULSOUP_AVAILABLE:
|
| raise ImportError("BeautifulSoup4 is not installed")
|
|
|
| return BeautifulSoup(html_content, 'lxml')
|
|
|
| def create_html(self, title: str, body_content: str, output_path: str) -> str:
|
| """Create an HTML document."""
|
| html_template = f"""<!DOCTYPE html>
|
| <html lang="en">
|
| <head>
|
| <meta charset="UTF-8">
|
| <meta name="viewport" content="width=device-width, initial-scale=1.0">
|
| <title>{title}</title>
|
| <style>
|
| body {{ font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; }}
|
| h1 {{ color: #333; }}
|
| .container {{ max-width: 800px; margin: auto; }}
|
| </style>
|
| </head>
|
| <body>
|
| <div class="container">
|
| {body_content}
|
| </div>
|
| </body>
|
| </html>"""
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| with open(output_file, 'w', encoding='utf-8') as f:
|
| f.write(html_template)
|
|
|
| return str(output_file)
|
|
|
| def convert_markdown_to_html(self, markdown_content: str, output_path: str) -> str:
|
| """Convert Markdown to HTML."""
|
| if not BEAUTIFULSOUP_AVAILABLE:
|
| raise ImportError("markdown library is not installed")
|
|
|
| html_content = markdown.markdown(markdown_content)
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| with open(output_file, 'w', encoding='utf-8') as f:
|
| f.write(html_content)
|
|
|
| return str(output_file)
|
|
|
|
|
|
|
| def read_json(self, file_path: str) -> Dict[str, Any]:
|
| """Read JSON file."""
|
| with open(file_path, 'r', encoding='utf-8') as f:
|
| return json.load(f)
|
|
|
| def write_json(self, data: Dict[str, Any], output_path: str, indent: int = 2) -> str:
|
| """Write JSON file."""
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| with open(output_file, 'w', encoding='utf-8') as f:
|
| json.dump(data, f, indent=indent)
|
|
|
| return str(output_file)
|
|
|
| def read_yaml(self, file_path: str) -> Dict[str, Any]:
|
| """Read YAML file."""
|
| if not YAML_AVAILABLE:
|
| raise ImportError("PyYAML is not installed")
|
|
|
| with open(file_path, 'r', encoding='utf-8') as f:
|
| return yaml.safe_load(f)
|
|
|
| def write_yaml(self, data: Dict[str, Any], output_path: str) -> str:
|
| """Write YAML file."""
|
| if not YAML_AVAILABLE:
|
| raise ImportError("PyYAML is not installed")
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| with open(output_file, 'w', encoding='utf-8') as f:
|
| yaml.dump(data, f, default_flow_style=False)
|
|
|
| return str(output_file)
|
|
|
| def read_toml(self, file_path: str) -> Dict[str, Any]:
|
| """Read TOML file."""
|
| if not TOML_AVAILABLE:
|
| raise ImportError("toml library is not installed")
|
|
|
| with open(file_path, 'r', encoding='utf-8') as f:
|
| return toml.load(f)
|
|
|
|
|
|
|
| def create_zip_archive(self, files: List[str], output_path: str) -> str:
|
| """Create a ZIP archive from files."""
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| with zipfile.ZipFile(output_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
| for file in files:
|
| file_path = Path(file)
|
| if file_path.exists():
|
| zipf.write(file_path, file_path.name)
|
|
|
| return str(output_file)
|
|
|
| def extract_archive(self, archive_path: str, extract_to: str = None) -> str:
|
| """Extract various archive formats."""
|
| extract_path = Path(extract_to) if extract_to else self.output_dir / "extracted"
|
| extract_path.mkdir(parents=True, exist_ok=True)
|
|
|
| archive_path = Path(archive_path)
|
|
|
| if archive_path.suffix == '.zip':
|
| with zipfile.ZipFile(archive_path, 'r') as zipf:
|
| zipf.extractall(extract_path)
|
|
|
| elif archive_path.suffix in ['.tar', '.gz', '.bz2']:
|
| with tarfile.open(archive_path, 'r:*') as tarf:
|
| tarf.extractall(extract_path)
|
|
|
| elif archive_path.suffix == '.7z' and SEVENZ_AVAILABLE:
|
| with py7zr.SevenZipFile(archive_path, mode='r') as szf:
|
| szf.extractall(extract_path)
|
|
|
| return str(extract_path)
|
|
|
|
|
|
|
| def execute_sql_query(self, db_path: str, query: str) -> List[tuple]:
|
| """Execute SQL query on SQLite database."""
|
| conn = sqlite3.connect(db_path)
|
| try:
|
| cursor = conn.cursor()
|
| cursor.execute(query)
|
| results = cursor.fetchall()
|
| conn.commit()
|
| return results
|
| finally:
|
| conn.close()
|
|
|
| def create_sqlite_db(self, output_path: str, tables: Dict[str, List[Dict[str, Any]]]) -> str:
|
| """Create SQLite database with tables."""
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| conn = sqlite3.connect(str(output_file))
|
| cursor = conn.cursor()
|
|
|
| for table_name, rows in tables.items():
|
| if rows:
|
| columns = rows[0].keys()
|
| create_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns)})"
|
| cursor.execute(create_sql)
|
|
|
| for row in rows:
|
| placeholders = ', '.join(['?' for _ in columns])
|
| insert_sql = f"INSERT INTO {table_name} VALUES ({placeholders})"
|
| cursor.execute(insert_sql, list(row.values()))
|
|
|
| conn.commit()
|
| conn.close()
|
|
|
| return str(output_file)
|
|
|
|
|
|
|
| def convert_audio(self, input_path: str, output_path: str, format: str = 'mp3') -> str:
|
| """Convert audio between formats."""
|
| if not PYDUB_AVAILABLE:
|
| raise ImportError("pydub is not installed")
|
|
|
| audio = AudioSegment.from_file(input_path)
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| audio.export(str(output_file), format=format)
|
|
|
| return str(output_file)
|
|
|
| def get_audio_metadata(self, audio_path: str) -> Dict[str, Any]:
|
| """Extract metadata from audio files."""
|
| if not MUTAGEN_AVAILABLE:
|
| raise ImportError("mutagen is not installed")
|
|
|
| audio_path = Path(audio_path)
|
| metadata = {}
|
|
|
| try:
|
| if audio_path.suffix == '.mp3':
|
| audio = MP3(audio_path)
|
| elif audio_path.suffix == '.flac':
|
| audio = FLAC(audio_path)
|
| else:
|
| audio = mutagen.File(audio_path)
|
|
|
| if audio:
|
| metadata['length'] = audio.info.length
|
| metadata['bitrate'] = getattr(audio.info, 'bitrate', None)
|
| metadata['sample_rate'] = getattr(audio.info, 'sample_rate', None)
|
| metadata['channels'] = getattr(audio.info, 'channels', None)
|
|
|
|
|
| if hasattr(audio, 'tags'):
|
| metadata['tags'] = dict(audio.tags)
|
|
|
| except Exception as e:
|
| metadata['error'] = str(e)
|
|
|
| return metadata
|
|
|
|
|
|
|
| def extract_video_info(self, video_path: str) -> Dict[str, Any]:
|
| """Extract information from video file."""
|
| if not MOVIEPY_AVAILABLE:
|
|
|
| try:
|
| import subprocess
|
| result = subprocess.run(['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', video_path],
|
| capture_output=True, text=True)
|
| if result.returncode == 0:
|
| import json
|
| data = json.loads(result.stdout)
|
| info = {
|
| 'duration': float(data.get('format', {}).get('duration', 0)),
|
| 'size': int(data.get('format', {}).get('size', 0)),
|
| 'filename': Path(video_path).name
|
| }
|
|
|
| for stream in data.get('streams', []):
|
| if stream.get('codec_type') == 'video':
|
| info['width'] = stream.get('width')
|
| info['height'] = stream.get('height')
|
| info['fps'] = stream.get('r_frame_rate')
|
| break
|
| return info
|
| except:
|
| pass
|
| return {
|
| 'duration': 0,
|
| 'size': 0,
|
| 'filename': Path(video_path).name,
|
| 'error': 'MoviePy not installed, video analysis limited'
|
| }
|
|
|
| try:
|
| clip = VideoFileClip(video_path)
|
|
|
| info = {
|
| 'duration': clip.duration,
|
| 'size': clip.size,
|
| 'fps': clip.fps,
|
| 'filename': Path(video_path).name
|
| }
|
|
|
| clip.close()
|
| return info
|
| except Exception as e:
|
| return {
|
| 'filename': Path(video_path).name,
|
| 'error': str(e)
|
| }
|
|
|
| def extract_audio_from_video(self, video_path: str, output_path: str) -> str:
|
| """Extract audio track from video."""
|
| if not MOVIEPY_AVAILABLE:
|
| raise ImportError("moviepy is not installed")
|
|
|
| clip = VideoFileClip(video_path)
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
| clip.audio.write_audiofile(str(output_file))
|
| clip.close()
|
|
|
| return str(output_file)
|
|
|
|
|
|
|
| def convert_document(self, input_path: str, output_path: str,
|
| input_format: str = None, output_format: str = None) -> str:
|
| """
|
| Convert documents between formats.
|
| Supports: text, PDF, HTML, Markdown, images, etc.
|
| """
|
| input_path = Path(input_path)
|
| output_path = Path(output_path)
|
|
|
|
|
| if not input_format:
|
| input_format = input_path.suffix.lower()
|
| if not output_format:
|
| output_format = output_path.suffix.lower()
|
|
|
| output_file = self.output_dir / output_path
|
| output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
| if input_format in ['.txt', '.text'] and output_format == '.pdf':
|
| content = self.read_text_file(str(input_path))
|
| return self.create_pdf(str(output_file),
|
| [{'type': 'paragraph', 'text': content}])
|
|
|
|
|
| elif input_format == '.md' and output_format == '.pdf':
|
| content = self.read_text_file(str(input_path))
|
| html_content = markdown.markdown(content)
|
| temp_html = self.temp_dir / 'temp.html'
|
| with open(temp_html, 'w') as f:
|
| f.write(html_content)
|
|
|
| try:
|
| subprocess.run(['wkhtmltopdf', str(temp_html), str(output_file)], check=True)
|
| except:
|
|
|
| return self.create_pdf(str(output_file),
|
| [{'type': 'paragraph', 'text': content}])
|
|
|
|
|
| elif input_format in ['.docx', '.doc'] and output_format == '.pdf':
|
|
|
| if DOCX_AVAILABLE:
|
| doc_content = self.read_word_document(str(input_path))
|
| combined_text = '\n\n'.join(doc_content['paragraphs'])
|
| return self.create_pdf(str(output_file),
|
| [{'type': 'paragraph', 'text': combined_text}])
|
|
|
|
|
| elif input_format in ['.xlsx', '.csv', '.xls'] and output_format in ['.csv', '.xlsx']:
|
| df = pd.read_excel(str(input_path)) if input_format != '.csv' else pd.read_csv(str(input_path))
|
| if output_format == '.csv':
|
| df.to_csv(str(output_file), index=False)
|
| else:
|
| df.to_excel(str(output_file), index=False)
|
| return str(output_file)
|
|
|
|
|
| elif input_format in ['.jpg', '.jpeg', '.png', '.gif', '.bmp'] and output_format in ['.jpg', '.png', '.webp']:
|
| return self.convert_image(str(input_path), str(output_file), output_format[1:])
|
|
|
| else:
|
| raise NotImplementedError(f"Conversion from {input_format} to {output_format} not implemented")
|
|
|
| return str(output_file)
|
|
|
|
|
|
|
| def batch_process(self, input_dir: str, output_dir: str,
|
| operation: str, **kwargs) -> List[str]:
|
| """
|
| Batch process multiple files in a directory.
|
|
|
| Args:
|
| input_dir: Directory containing input files
|
| output_dir: Directory for output files
|
| operation: Operation to perform ('convert', 'extract_text', 'resize', etc.)
|
| **kwargs: Additional parameters for the operation
|
| """
|
| input_path = Path(input_dir)
|
| output_path = Path(output_dir)
|
| output_path.mkdir(parents=True, exist_ok=True)
|
|
|
| results = []
|
|
|
| for file_path in input_path.iterdir():
|
| if file_path.is_file():
|
| try:
|
| if operation == 'convert':
|
| output_format = kwargs.get('output_format', '.txt')
|
| output_file = output_path / f"{file_path.stem}{output_format}"
|
| result = self.convert_document(str(file_path), str(output_file))
|
| results.append(result)
|
|
|
| elif operation == 'extract_text':
|
| text = self.extract_text_from_file(str(file_path))
|
| output_file = output_path / f"{file_path.stem}.txt"
|
| with open(output_file, 'w', encoding='utf-8') as f:
|
| f.write(text)
|
| results.append(str(output_file))
|
|
|
| elif operation == 'resize_images' and file_path.suffix.lower() in ['.jpg', '.png']:
|
| output_file = output_path / file_path.name
|
| result = self.resize_image(str(file_path), str(output_file),
|
| width=kwargs.get('width'),
|
| height=kwargs.get('height'))
|
| results.append(result)
|
|
|
| except Exception as e:
|
| print(f"Error processing {file_path}: {e}")
|
|
|
| return results
|
|
|
| def extract_text_from_file(self, file_path: str) -> str:
|
| """Universal text extraction from any supported document type."""
|
| file_path = Path(file_path)
|
| suffix = file_path.suffix.lower()
|
|
|
| if suffix in ['.txt', '.text', '.log', '.asc']:
|
| return self.read_text_file(str(file_path))
|
|
|
| elif suffix in ['.docx', '.doc']:
|
| if DOCX_AVAILABLE:
|
| result = self.read_word_document(str(file_path))
|
| return '\n'.join(result['paragraphs'])
|
|
|
| elif suffix == '.pdf':
|
| result = self.read_pdf(str(file_path), extract_tables=False)
|
| return '\n'.join([page['text'] for page in result['pages']])
|
|
|
| elif suffix in ['.xlsx', '.xls', '.csv', '.tsv']:
|
| df = pd.read_excel(str(file_path)) if suffix != '.csv' else pd.read_csv(str(file_path))
|
| return df.to_string()
|
|
|
| elif suffix in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']:
|
| if EASYOCR_AVAILABLE:
|
| return self.extract_text_from_image(str(file_path))
|
| else:
|
| return "[OCR not available. Install EasyOCR: pip install easyocr]"
|
|
|
| elif suffix in ['.html', '.htm']:
|
| if BEAUTIFULSOUP_AVAILABLE:
|
| with open(file_path, 'r', encoding='utf-8') as f:
|
| soup = BeautifulSoup(f.read(), 'lxml')
|
| return soup.get_text()
|
|
|
| elif suffix == '.json':
|
| data = self.read_json(str(file_path))
|
| return json.dumps(data, indent=2)
|
|
|
| else:
|
| return f"Unsupported format: {suffix}"
|
|
|
| def cleanup_temp(self):
|
| """Remove temporary files."""
|
| shutil.rmtree(self.temp_dir, ignore_errors=True)
|
|
|
| def __del__(self):
|
| """Cleanup on destruction."""
|
| self.cleanup_temp()
|
|
|
|
|
|
|
|
|
| def create_document_processor(output_dir: str = "processed_docs") -> DocumentProcessor:
|
| """Factory function to create a DocumentProcessor instance."""
|
| return DocumentProcessor(output_dir)
|
|
|
|
|
|
|
|
|
| if __name__ == "__main__":
|
|
|
| processor = create_document_processor("example_output")
|
|
|
| print("Document Processor initialized. Available formats:",
|
| list(processor.supported_formats.keys()))
|
|
|
|
|
| processor.create_text_file("Hello, World!\nThis is a test.", "test.txt")
|
| print("Created test.txt")
|
|
|
|
|
| data = {"name": "Test", "version": 1.0, "data": [1, 2, 3, 4, 5]}
|
| processor.write_json(data, "test.json")
|
| print("Created test.json")
|
|
|
|
|
| read_data = processor.read_json("example_output/test.json")
|
| print("Read JSON:", read_data)
|
|
|
|
|
| csv_data = [
|
| {"name": "Alice", "age": 30, "city": "New York"},
|
| {"name": "Bob", "age": 25, "city": "Los Angeles"},
|
| {"name": "Charlie", "age": 35, "city": "Chicago"}
|
| ]
|
| processor.create_csv(csv_data, "test.csv")
|
| print("Created test.csv")
|
|
|
| print("\nAll examples completed successfully!") |