# Testing / data_extraction.py
# (HuggingFace file-page header preserved as comments)
# purajith's picture
# Upload 5 files
# f5f1a85 verified
# raw | history | blame
# 6.52 kB
import pandas as pd
from docx import Document as DocxDocument # Avoids conflict with langchain's Document
import csv
import fitz # PyMuPDF for text extraction
import camelot # Table extraction
from langchain.schema import Document # Structured document format
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os
from dotenv import load_dotenv
load_dotenv()
import warnings
warnings.filterwarnings("ignore")
# Load the OpenAI API key from the environment (populated by load_dotenv above)
# and expose it under the name the OpenAI client libraries expect.
openai_key = os.getenv("openai_key")
if openai_key:
    os.environ["OPENAI_API_KEY"] = openai_key
else:
    # The original assigned None directly, which raises an opaque
    # TypeError from os.environ; warn explicitly instead.
    print("⚠️ Environment variable 'openai_key' is not set; OPENAI_API_KEY was not configured.")
# Function to read and process .docx files
def extract_text_and_tables(docx_path):
    """Pull plain text and table contents out of a .docx file.

    Returns a tuple ``(text, tables)`` where ``text`` is a newline-joined
    string of all non-empty paragraphs and ``tables`` is a list of langchain
    Document objects, each wrapping one table's cell grid as a string.
    """
    doc = DocxDocument(docx_path)  # renamed import avoids clashing with langchain's Document
    # Keep only paragraphs that contain visible content.
    paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]
    text = "\n".join(paragraphs)
    tables = []
    for tbl in doc.tables:
        grid = [[cell.text.strip() for cell in row.cells] for row in tbl.rows]
        # Stored as a Document so downstream chunking keeps the source path.
        tables.append(Document(page_content=str(grid), metadata={"source": docx_path}))
    return text, tables
# Function to read and process .xlsx (Excel) files
def read_excel(file_path):
    """Read every sheet of an Excel workbook into a flat list of strings.

    Each sheet contributes a ``"Sheet: <name>"`` marker line followed by one
    pipe-joined string per data row.
    """
    print(f"Reading Excel file: {file_path}")
    lines = []
    # sheet_name=None loads all sheets as a {name: DataFrame} mapping.
    for sheet_name, frame in pd.read_excel(file_path, sheet_name=None).items():
        lines.append(f"Sheet: {sheet_name}")
        lines.extend(" | ".join(str(cell) for cell in row) for row in frame.values)
    return lines
# Function to read and process .csv files
def read_csv(file_path):
    """Read a CSV file and return its rows as pipe-joined strings.

    Parameters
    ----------
    file_path : str
        Path to the .csv file.

    Returns
    -------
    list[str]
        One ``"a | b | c"`` string per CSV row.
    """
    print(f"Reading CSV file: {file_path}")
    rows = []
    # newline="" is required by the csv module so quoted fields containing
    # newlines are parsed correctly; explicit UTF-8 avoids locale-dependent
    # decoding of the input file.
    with open(file_path, mode='r', newline='', encoding='utf-8') as file:
        reader = csv.reader(file)
        for row in reader:
            rows.append(" | ".join(row))
    return rows
# Function to extract text from PDFs
def extract_text(pdf_path):
    """Extract page text from a PDF and return a list of Document objects.

    Each non-empty page becomes one Document whose metadata records the
    source path and the 1-based page number. Returns an empty list (after
    printing the error) if extraction fails.
    """
    documents = []
    try:
        # Context manager guarantees the underlying file handle is closed
        # even if a page raises mid-iteration (the original never closed it).
        with fitz.open(pdf_path) as doc:
            for page_num, page in enumerate(doc, start=1):
                text = page.get_text("text").strip()
                if text:
                    documents.append(Document(
                        page_content=text,
                        metadata={"source": pdf_path, "page": page_num}
                    ))
    except Exception as e:
        print(f"❌ Error extracting text: {e}")
    return documents
# Function to extract tables from PDFs
def extract_tables(pdf_path):
    """Extract tables from a PDF via Camelot, returning them as Documents.

    Falls back to a single placeholder Document when no tables are found
    (table_index 0) or when extraction raises (table_index -1), so callers
    always receive a non-empty list.
    """
    docs = []
    try:
        found = camelot.read_pdf(pdf_path, pages="all", flavor="stream")
        if found.n == 0:
            print(f"⚠️ No tables found in {pdf_path}. Adding dummy data for testing.")
            return [Document(page_content="Dummy Table: No real data found", metadata={"source": pdf_path, "table_index": 0})]
        for idx in range(found.n):
            # Render each detected table's DataFrame as plain text.
            docs.append(Document(
                page_content=found[idx].df.to_string(),
                metadata={"source": pdf_path, "table_index": idx + 1}
            ))
    except Exception as e:
        print(f"❌ Error extracting tables from {pdf_path}: {e}")
        return [Document(page_content="Dummy Table: Extraction error", metadata={"source": pdf_path, "table_index": -1})]
    return docs
# Function to chunk tables (for docx and pdf)
def chunk_table(documents, chunk_size=2):
    """Split table Documents into smaller Documents of ``chunk_size`` rows.

    Non-Document entries are silently skipped; the parent Document's
    metadata is carried onto every chunk.
    """
    chunks = []
    for source_doc in documents:
        if not isinstance(source_doc, Document):
            continue  # guard: skip anything that is not a langchain Document
        rows = source_doc.page_content.split("\n")
        for start in range(0, len(rows), chunk_size):
            piece = "\n".join(rows[start:start + chunk_size])
            chunks.append(Document(page_content=piece, metadata=source_doc.metadata))
    return chunks
# Function to process .docx, .xlsx, .csv, and PDF files
def process_files(file, text_chunk_size=1000, chunk_overlap=40, table_chunk_size=2):
    """Route one file to the matching extractor and return chunked Documents.

    Supports .docx, .xlsx/.xls, .csv and .pdf paths. Plain text is chunked
    with a RecursiveCharacterTextSplitter; tables are chunked row-wise via
    chunk_table. Returns the combined list (possibly empty).
    """
    text = []
    tables = []
    if file.endswith(".docx"):
        docx_text, docx_tables = extract_text_and_tables(file)
        text.append(docx_text)
        tables.extend(docx_tables)
    if file.endswith((".xlsx", ".xls")):
        text.extend(read_excel(file))
    if file.endswith(".csv"):
        text.extend(read_csv(file))
    if file.endswith(".pdf"):
        pdf_text_documents = extract_text(file)
        pdf_table_documents = extract_tables(file)
        text.extend(doc.page_content for doc in pdf_text_documents)
        if pdf_table_documents:
            tables.extend(pdf_table_documents)
        else:
            # extract_tables normally returns placeholder Documents, so this
            # branch only fires if it ever yields an empty list.
            print(f"⚠️ No tables found in {file}, skipping table embeddings.")
    # Row-wise chunk tables only when any exist.
    table_chunks = chunk_table(tables, chunk_size=table_chunk_size) if tables else []
    splitter = RecursiveCharacterTextSplitter(chunk_size=text_chunk_size, chunk_overlap=chunk_overlap)
    text_chunks = splitter.split_documents([Document(page_content=t) for t in text]) if text else []
    combined = text_chunks + table_chunks
    return combined if combined else []
# Function to process multiple files
# def data_processing(file_paths):
# all_combined_chunks = {}
# for file in file_paths:
# print(f"Processing file: {file.split('/')[-1]}")
# combined_chunks = process_files(file)
# all_combined_chunks[file] = combined_chunks
# return all_combined_chunks
# # Example usage
# file_paths = ["/content/Acceptable Use Policy.docx","/content/RiskAnalysisGuide.pdf"]
# all_combined_chunks = data_processing(file_paths)