|
|
from .tool import Tool,Toolkit |
|
|
import os |
|
|
import PyPDF2 |
|
|
from typing import Dict, Any, List, Optional |
|
|
from ..core.logging import logger |
|
|
from ..core.module import BaseModule |
|
|
|
|
|
|
|
|
class FileBase(BaseModule):
    """
    Base class containing shared file handling logic for different file types.

    The instance keeps a table mapping a lower-case file extension (currently
    only ``.pdf``) to the callables that implement ``read``, ``write`` and
    ``append`` for that type. Each PDF handler reports its outcome through a
    result dictionary with a boolean ``success`` key; failures are logged and
    returned as ``{"success": False, "error": ..., "file_path": ...}``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Extension -> operation name -> bound handler method.
        self.file_handlers = {
            '.pdf': {
                'read': self._read_pdf,
                'write': self._write_pdf,
                'append': self._append_pdf,
            },
        }

    def get_file_handlers(self):
        """Returns file type handlers for special file formats"""
        return self.file_handlers

    def _render_text_pdf(self, target: Any, content: str) -> int:
        """
        Render ``content`` as a letter-size PDF using reportlab.

        Args:
            target: File path or writable binary file-like object handed to
                ``SimpleDocTemplate``
            content (str): Text to render; each line becomes one paragraph

        Returns:
            int: Number of non-blank lines that were rendered as paragraphs

        Raises:
            ImportError: If reportlab is not installed (callers use this to
                switch to their PyPDF2-only fallback)
        """
        from xml.sax.saxutils import escape

        from reportlab.lib.pagesizes import letter
        from reportlab.lib.styles import getSampleStyleSheet
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer

        doc = SimpleDocTemplate(target, pagesize=letter)
        normal_style = getSampleStyleSheet()['Normal']
        story = []
        paragraph_count = 0

        for para_text in content.split('\n'):
            if para_text.strip():
                try:
                    para = Paragraph(para_text, normal_style)
                except ValueError:
                    # Paragraph parses its text as reportlab's XML-like markup,
                    # so plain text such as "a < b" is rejected. Fall back to
                    # rendering the line literally instead of failing the
                    # whole document.
                    para = Paragraph(escape(para_text), normal_style)
                story.append(para)
                paragraph_count += 1
            # A spacer follows every line, blank or not, to keep line spacing.
            story.append(Spacer(1, 12))

        doc.build(story)
        return paragraph_count

    def _extend_pdf(self, file_path: str, extra_pdf: Optional[Any] = None) -> None:
        """
        Rewrite ``file_path`` with its existing pages followed by extra pages.

        Args:
            file_path (str): Path to the existing PDF file
            extra_pdf: Readable binary stream holding a PDF whose pages are
                appended; when None a single blank 612x792 page is appended
        """
        import io

        pdf_writer = PyPDF2.PdfWriter()
        merged = io.BytesIO()

        with open(file_path, 'rb') as existing_file:
            existing_reader = PyPDF2.PdfReader(existing_file)
            for page in existing_reader.pages:
                pdf_writer.add_page(page)

            if extra_pdf is None:
                pdf_writer.add_blank_page(width=612, height=792)
            else:
                extra_reader = PyPDF2.PdfReader(extra_pdf)
                for page in extra_reader.pages:
                    pdf_writer.add_page(page)

            # Serialise while the source stream is still open, and into memory
            # first, so a failure here cannot truncate the original file.
            pdf_writer.write(merged)

        with open(file_path, 'wb') as output_file:
            output_file.write(merged.getvalue())

    def _read_pdf(self, file_path: str) -> Dict[str, Any]:
        """
        Read content from a PDF file.

        Args:
            file_path (str): Path to the PDF file

        Returns:
            dict: A dictionary with the PDF content and metadata
        """
        try:
            with open(file_path, 'rb') as f:
                pdf_reader = PyPDF2.PdfReader(f)
                # Each page's text is followed by a newline. ``or ""`` guards
                # against an extractor that yields None for a page.
                text = "".join(
                    (page.extract_text() or "") + "\n"
                    for page in pdf_reader.pages
                )

                return {
                    "success": True,
                    "content": text,
                    "file_path": file_path,
                    "file_type": "pdf",
                    "pages": len(pdf_reader.pages)
                }

        except Exception as e:
            logger.error(f"Error reading PDF {file_path}: {str(e)}")
            return {"success": False, "error": str(e), "file_path": file_path}

    def _write_pdf(self, file_path: str, content: str) -> Dict[str, Any]:
        """
        Write content to a PDF file using reportlab.

        When reportlab cannot be imported, a PDF containing one blank page is
        written with PyPDF2 instead and the result carries a warning that the
        text was not added.

        Args:
            file_path (str): Path to the PDF file
            content (str): Content to write to the PDF

        Returns:
            dict: A dictionary with the operation status
        """
        try:
            paragraph_count = self._render_text_pdf(file_path, content)

            return {
                "success": True,
                "message": f"PDF created at {file_path} with text content using reportlab",
                "file_path": file_path,
                "content_length": len(content),
                "paragraphs": paragraph_count,
                "library_used": "reportlab"
            }

        except ImportError:
            # reportlab is unavailable: emit a placeholder blank page.
            pdf_writer = PyPDF2.PdfWriter()
            pdf_writer.add_blank_page(width=612, height=792)

            with open(file_path, 'wb') as f:
                pdf_writer.write(f)

            return {
                "success": True,
                "message": f"Basic PDF created at {file_path} (blank page only - reportlab not available)",
                "file_path": file_path,
                "warning": "Text content not added - reportlab library not found",
                "note": "Install reportlab for full PDF text support: pip install reportlab",
                "library_used": "PyPDF2"
            }

        except Exception as e:
            logger.error(f"Error writing PDF {file_path}: {str(e)}")
            return {"success": False, "error": str(e), "file_path": file_path}

    def _append_pdf(self, file_path: str, content: str) -> Dict[str, Any]:
        """
        Append content to a PDF file by creating a new page.

        A missing file is created via ``_write_pdf``. Otherwise the content is
        rendered with reportlab into an in-memory PDF whose pages are added
        after the existing ones. When reportlab cannot be imported, a blank
        page is appended instead.

        Args:
            file_path (str): Path to the PDF file
            content (str): Content to append to the PDF

        Returns:
            dict: A dictionary with the operation status
        """
        import io

        try:
            if not os.path.exists(file_path):
                return self._write_pdf(file_path, content)

            try:
                # Render in memory rather than to a temporary file, so there
                # is nothing left on disk to clean up if a later step fails.
                rendered = io.BytesIO()
                paragraph_count = self._render_text_pdf(rendered, content)
                rendered.seek(0)

                self._extend_pdf(file_path, rendered)

                return {
                    "success": True,
                    "message": f"Content appended as new page(s) to PDF at {file_path}",
                    "file_path": file_path,
                    "operation": "append_new_page",
                    "appended_content_length": len(content),
                    "paragraphs_added": paragraph_count,
                    "library_used": "reportlab + PyPDF2"
                }

            except ImportError:
                # reportlab is unavailable: append a placeholder blank page.
                self._extend_pdf(file_path)

                return {
                    "success": True,
                    "message": f"Blank page appended to PDF at {file_path} (reportlab not available for text)",
                    "file_path": file_path,
                    "operation": "append_blank_page",
                    "warning": "Text content not added - reportlab library not found",
                    "note": "Install reportlab for full PDF text support: pip install reportlab",
                    "library_used": "PyPDF2"
                }

        except Exception as e:
            logger.error(f"Error appending to PDF {file_path}: {str(e)}")
            return {"success": False, "error": str(e), "file_path": file_path}
|
|
|
|
|
|
|
|
class ReadFileTool(Tool):
    """Tool that reads a file, delegating to a type-specific reader when one is registered."""

    name: str = "read_file"
    description: str = "Read content from a file with special handling for different file types like PDFs"
    inputs: Dict[str, Dict[str, str]] = {
        "file_path": {
            "type": "string",
            "description": "Path to the file to read"
        }
    }
    required: Optional[List[str]] = ["file_path"]

    def __init__(self, file_base: FileBase = None):
        super().__init__()
        # Optional provider of per-extension handlers; None means plain text only.
        self.file_base = file_base

    def __call__(self, file_path: str) -> Dict[str, Any]:
        """
        Read content from a file with special handling for different file types.

        Args:
            file_path (str): Path to the file to read

        Returns:
            dict: A dictionary with the file content and metadata. On failure
                it holds ``success`` False, an ``error`` message and the
                ``file_path``.
        """
        try:
            if not os.path.exists(file_path):
                # Carries "file_path" like every other failure payload here.
                return {
                    "success": False,
                    "error": f"File not found: {file_path}",
                    "file_path": file_path
                }

            file_ext = os.path.splitext(file_path)[1].lower()

            # Use the registered reader for this extension, if there is one.
            if self.file_base:
                reader = self.file_base.get_file_handlers().get(file_ext, {}).get('read')
                if reader is not None:
                    return reader(file_path)

            # Default: treat the file as text.
            with open(file_path, 'r') as f:
                content = f.read()

            return {
                "success": True,
                "content": content,
                "file_path": file_path,
                "file_type": file_ext or "text"
            }

        except Exception as e:
            logger.error(f"Error reading file {file_path}: {str(e)}")
            return {"success": False, "error": str(e), "file_path": file_path}
|
|
|
|
|
|
|
|
class WriteFileTool(Tool):
    """Tool that writes a file, delegating to a type-specific writer when one is registered."""

    name: str = "write_file"
    description: str = "Write content to a file with special handling for different file types like PDFs"
    inputs: Dict[str, Dict[str, str]] = {
        "file_path": {
            "type": "string",
            "description": "Path to the file to write"
        },
        "content": {
            "type": "string",
            "description": "Content to write to the file"
        }
    }
    required: Optional[List[str]] = ["file_path", "content"]

    def __init__(self, file_base: FileBase = None):
        super().__init__()
        # Optional provider of per-extension handlers; None means plain text only.
        self.file_base = file_base

    def __call__(self, file_path: str, content: str) -> Dict[str, Any]:
        """
        Write content to a file, creating its parent directory when needed.

        Args:
            file_path (str): Path to the file to write
            content (str): Content to write to the file

        Returns:
            dict: A dictionary with the operation status
        """
        try:
            suffix = os.path.splitext(file_path)[1].lower()

            # Make sure the destination directory exists before writing.
            parent_dir = os.path.dirname(file_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            # Look up a dedicated writer for this extension.
            special_writer = None
            if self.file_base:
                per_type = self.file_base.get_file_handlers().get(suffix, {})
                special_writer = per_type.get('write')
            if special_writer is not None:
                return special_writer(file_path, content)

            # No dedicated writer: write the content as plain text.
            with open(file_path, 'w') as out:
                out.write(content)

            outcome = {
                "success": True,
                "message": f"Content written to {file_path}",
                "file_path": file_path
            }
            return outcome

        except Exception as e:
            logger.error(f"Error writing to file {file_path}: {str(e)}")
            return {"success": False, "error": str(e), "file_path": file_path}
|
|
|
|
|
|
|
|
class AppendFileTool(Tool):
    """Tool that appends to a file, delegating to a type-specific appender when one is registered."""

    name: str = "append_file"
    description: str = "Append content to a file with special handling for different file types like PDFs"
    inputs: Dict[str, Dict[str, str]] = {
        "file_path": {
            "type": "string",
            "description": "Path to the file to append to"
        },
        "content": {
            "type": "string",
            "description": "Content to append to the file"
        }
    }
    required: Optional[List[str]] = ["file_path", "content"]

    def __init__(self, file_base: FileBase = None):
        super().__init__()
        # Optional provider of per-extension handlers; None means plain text only.
        self.file_base = file_base

    def __call__(self, file_path: str, content: str) -> Dict[str, Any]:
        """
        Append content to a file with special handling for different file types.

        Args:
            file_path (str): Path to the file to append to
            content (str): Content to append to the file

        Returns:
            dict: A dictionary with the operation status. On failure it holds
                ``success`` False, an ``error`` message and the ``file_path``.
        """
        # Directory creation and handler dispatch sit inside the try so that a
        # failure there (e.g. an OSError from makedirs) is reported through
        # the result dictionary rather than raised to the caller.
        try:
            file_ext = os.path.splitext(file_path)[1].lower()

            # Make sure the destination directory exists before writing.
            directory = os.path.dirname(file_path)
            if directory:
                os.makedirs(directory, exist_ok=True)

            # Use the registered appender for this extension, if there is one.
            if self.file_base:
                appender = self.file_base.get_file_handlers().get(file_ext, {}).get('append')
                if appender is not None:
                    return appender(file_path, content)

            # Default: append the content as plain text.
            with open(file_path, 'a') as f:
                f.write(content)

            return {
                "success": True,
                "message": f"Content appended to {file_path}",
                "file_path": file_path
            }

        except Exception as e:
            logger.error(f"Error appending to file {file_path}: {str(e)}")
            return {"success": False, "error": str(e), "file_path": file_path}
|
|
|
|
|
|
|
|
class FileToolkit(Toolkit):
    """Toolkit bundling the read, write and append file tools around one shared FileBase."""

    def __init__(self, name: str = "FileToolkit"):
        # A single FileBase is handed to every tool so they all dispatch
        # through the same handler table.
        shared_base = FileBase()

        # Order matters to callers that index the tool list: read, write, append.
        tool_classes = (ReadFileTool, WriteFileTool, AppendFileTool)
        super().__init__(
            name=name,
            tools=[tool_cls(file_base=shared_base) for tool_cls in tool_classes],
        )

        # Kept on the toolkit as well, assigned only after the base
        # initialiser has run.
        self.file_base = shared_base
|
|
|
|
|
|
|
|
|
|
|
|