File size: 4,510 Bytes
5fffd14 6950cd1 5fffd14 6950cd1 5fffd14 6950cd1 5fffd14 6950cd1 5fffd14 6950cd1 5fffd14 6950cd1 5fffd14 6950cd1 5fffd14 f37acfa 6950cd1 f37acfa 6950cd1 f37acfa 6950cd1 f37acfa 5fffd14 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
import os
import fitz # PyMuPDF
import pandas as pd
from typing import List
import docx
class SimpleDocumentParser:
    """Turn PDF, text, DOCX and CSV/Excel files into lists of text chunks.

    Every ``parse_*`` method is best-effort: a failure is printed and reported
    as a single ``"Error parsing ...: <reason>"`` chunk instead of being raised.
    """

    # Number of leading table rows emitted as individual chunks.
    MAX_TABLE_ROWS = 50
    # Number of distinct non-null example values listed per column.
    MAX_SAMPLE_VALUES = 3

    def __init__(self):
        """Initialize simple document parser for various file types."""

    def parse_document(self, file_path: str) -> List[str]:
        """Parse a document and return text chunks.

        The parser is chosen from the lower-cased file extension; any
        extension that is not recognised is read as plain text.

        Args:
            file_path: Path of the file to parse.

        Returns:
            The chunks produced by the selected ``parse_*`` method.
        """
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext == '.pdf':
            return self.parse_pdf(file_path)
        if file_ext == '.docx':
            return self.parse_docx(file_path)
        if file_ext in ('.csv', '.xlsx', '.xls'):
            return self.parse_tabular(file_path)
        # '.txt' and every unknown extension share the plain-text reader.
        return self.parse_text(file_path)

    @staticmethod
    def _split_paragraphs(text: str) -> List[str]:
        """Split *text* on double newlines; return stripped, non-empty parts."""
        return [part.strip() for part in text.split('\n\n') if part.strip()]

    def parse_pdf(self, file_path: str) -> List[str]:
        """Parse PDF using PyMuPDF.

        Args:
            file_path: Path of the PDF file.

        Returns:
            The paragraphs of every page in page order, or a single error
            chunk if the file cannot be opened or read.
        """
        chunks: List[str] = []
        try:
            # The context manager closes the document even when a page
            # fails part-way through, so the file handle is never leaked.
            with fitz.open(file_path) as doc:
                for page in doc:
                    chunks.extend(self._split_paragraphs(page.get_text()))
        except Exception as e:
            print(f"Error parsing PDF {file_path}: {e}")
            chunks = [f"Error parsing PDF: {str(e)}"]
        return chunks

    def parse_text(self, file_path: str) -> List[str]:
        """Parse plain text file.

        The file is decoded as UTF-8 and undecodable bytes are dropped.

        Args:
            file_path: Path of the text file.

        Returns:
            The paragraphs of the file, or a single error chunk if the file
            cannot be read.
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text = f.read()
        except Exception as e:
            print(f"Error parsing text file {file_path}: {e}")
            return [f"Error parsing text file: {str(e)}"]
        return self._split_paragraphs(text)

    def parse_docx(self, file_path: str) -> List[str]:
        """Parse DOCX using python-docx.

        Args:
            file_path: Path of the DOCX file.

        Returns:
            The stripped text of every non-empty paragraph, or a single error
            chunk if the file cannot be read.
        """
        try:
            doc = docx.Document(file_path)
            chunks = [
                para.text.strip() for para in doc.paragraphs
                if para.text.strip()
            ]
        except Exception as e:
            print(f"Error parsing DOCX {file_path}: {e}")
            chunks = [f"Error parsing DOCX: {str(e)}"]
        return chunks

    def _table_chunks(self, df: pd.DataFrame) -> List[str]:
        """Describe *df* as a summary, a column overview and per-row chunks."""
        # Column labels are not guaranteed to be strings (numeric headers are
        # possible), so convert them before joining.
        column_names = ', '.join(str(col) for col in df.columns)
        summary = (
            f"Table with {len(df)} rows and {len(df.columns)} columns. "
            f"Columns: {column_names}"
        )

        detail_lines = []
        for col, dtype in df.dtypes.to_dict().items():
            sample_values = df[col].dropna().unique()[:self.MAX_SAMPLE_VALUES]
            sample_str = ", ".join(str(v) for v in sample_values)
            detail_lines.append(
                f"- {col} (Type: {dtype}): Sample values: {sample_str}\n"
            )
        col_desc = "Column details:\n" + "".join(detail_lines)

        # Only the leading rows are emitted, to bound the number of chunks.
        row_chunks = [
            " | ".join(f"{col}: {val}" for col, val in row.items())
            for _, row in df.head(self.MAX_TABLE_ROWS).iterrows()
        ]
        return [summary, col_desc, *row_chunks]

    def parse_tabular(self, file_path: str) -> List[str]:
        """Parse CSV or Excel files using pandas.

        Args:
            file_path: Path of the table; ``.csv`` is read with
                ``pandas.read_csv``, anything else with ``pandas.read_excel``.

        Returns:
            A table summary chunk, a column-details chunk, then one chunk per
            row for the first ``MAX_TABLE_ROWS`` rows; or a single error chunk
            if the file cannot be read.
        """
        try:
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext == '.csv':
                df = pd.read_csv(file_path)
            else:  # Excel files
                df = pd.read_excel(file_path)
            chunks = self._table_chunks(df)
        except Exception as e:
            print(f"Error parsing tabular file {file_path}: {e}")
            chunks = [f"Error parsing tabular file: {str(e)}"]
        return chunks