| import pandas as pd
|
| from docx import Document as DocxDocument
|
| import csv
|
| import fitz
|
| import camelot
|
| from langchain.schema import Document
|
| from langchain.text_splitter import RecursiveCharacterTextSplitter
|
| import os
|
| from dotenv import load_dotenv
|
| load_dotenv()
|
| import warnings
|
| warnings.filterwarnings("ignore")
|
|
|
openai_key = os.getenv("openai_key")

# Assigning None into os.environ raises TypeError and would crash the whole
# import — only export the key when it is actually present in the environment.
if openai_key:
    os.environ["OPENAI_API_KEY"] = openai_key
else:
    print("⚠️ 'openai_key' not found in environment/.env; OPENAI_API_KEY not set.")
|
|
|
def extract_text_and_tables(docx_path):
    """Extract paragraph text and tables from a .docx file.

    Parameters
    ----------
    docx_path : str
        Path to the Word document.

    Returns
    -------
    tuple[str, list[Document]]
        A newline-joined string of all non-empty paragraphs, and one
        Document per table whose page_content is the stringified list
        of row-cell lists (metadata records the source path).
    """
    doc = DocxDocument(docx_path)

    # Keep only paragraphs with visible content.
    paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]
    text = "\n".join(paragraphs)

    tables = []
    for table in doc.tables:
        # Flatten each table into a list of rows, each row a list of cell strings.
        rows = [[cell.text.strip() for cell in row.cells] for row in table.rows]
        tables.append(Document(page_content=str(rows), metadata={"source": docx_path}))

    return text, tables
|
|
|
|
|
def read_excel(file_path):
    """Read every sheet of an Excel workbook into pipe-joined text lines.

    Each sheet contributes a "Sheet: <name>" marker line, a header line with
    the column names, and one line per data row.

    Parameters
    ----------
    file_path : str
        Path to the .xlsx/.xls file.

    Returns
    -------
    list[str]
        Flat list of text lines across all sheets.
    """
    print(f"Reading Excel file: {file_path}")
    # sheet_name=None loads every sheet as a {name: DataFrame} mapping.
    excel_data = pd.read_excel(file_path, sheet_name=None)

    text = []
    for sheet_name, df in excel_data.items():
        text.append(f"Sheet: {sheet_name}")
        # Emit the header row first — df.values alone would silently drop the
        # column names and lose that information for downstream retrieval.
        text.append(" | ".join(str(col) for col in df.columns))
        for row in df.values:
            text.append(" | ".join(str(cell) for cell in row))

    return text
|
|
|
|
|
def read_csv(file_path):
    """Read a CSV file and return its rows as pipe-joined text lines.

    Parameters
    ----------
    file_path : str
        Path to the CSV file.

    Returns
    -------
    list[str]
        One "cell | cell | ..." string per CSV row.
    """
    print(f"Reading CSV file: {file_path}")

    text = []
    # newline='' is required by the csv module so quoted fields containing
    # newlines are parsed correctly; an explicit encoding avoids depending on
    # the platform's locale default.
    with open(file_path, mode='r', newline='', encoding='utf-8') as file:
        reader = csv.reader(file)
        for row in reader:
            text.append(" | ".join(row))

    return text
|
|
|
|
|
def extract_text(pdf_path):
    """Extracts text from a PDF file and returns it as a list of Document objects.

    One Document is produced per non-empty page, with the source path and
    1-based page number recorded in the metadata. Returns an empty list
    (after printing the error) if the PDF cannot be read.
    """
    documents = []
    try:
        # Context manager ensures the file handle is closed even on error —
        # the original left the document open (resource leak).
        with fitz.open(pdf_path) as doc:
            for page_num, page in enumerate(doc, start=1):
                text = page.get_text("text").strip()
                if text:
                    documents.append(Document(
                        page_content=text,
                        metadata={"source": pdf_path, "page": page_num}
                    ))
    except Exception as e:
        print(f"❌ Error extracting text: {e}")
    return documents
|
|
|
|
|
def extract_tables(pdf_path):
    """Extracts tables from a PDF using Camelot and returns them as Document objects.

    Falls back to a single placeholder Document when no tables are detected
    or when extraction raises, so callers always receive a non-empty list.
    """
    try:
        tables = camelot.read_pdf(pdf_path, pages="all", flavor="stream")

        if tables.n == 0:
            print(f"⚠️ No tables found in {pdf_path}. Adding dummy data for testing.")
            return [Document(page_content="Dummy Table: No real data found", metadata={"source": pdf_path, "table_index": 0})]

        # One Document per detected table; table_index is 1-based.
        return [
            Document(
                page_content=tables[idx].df.to_string(),
                metadata={"source": pdf_path, "table_index": idx + 1},
            )
            for idx in range(tables.n)
        ]
    except Exception as e:
        print(f"❌ Error extracting tables from {pdf_path}: {e}")
        return [Document(page_content="Dummy Table: Extraction error", metadata={"source": pdf_path, "table_index": -1})]
|
|
|
|
|
def chunk_table(documents, chunk_size=2):
    """Chunks table data row-wise from Document objects.

    Each input Document's page_content is split on newlines and regrouped
    into chunks of at most ``chunk_size`` lines; every chunk keeps its
    parent Document's metadata. Non-Document items are skipped.
    """
    chunks = []
    for doc in documents:
        if not isinstance(doc, Document):
            continue
        rows = doc.page_content.split("\n")
        chunks.extend(
            Document(
                page_content="\n".join(rows[start:start + chunk_size]),
                metadata=doc.metadata,
            )
            for start in range(0, len(rows), chunk_size)
        )
    return chunks
|
|
|
|
|
def process_files(file, text_chunk_size=1000, chunk_overlap=40, table_chunk_size=2):
    """Route a file to the appropriate extractor and return chunked Documents.

    Parameters
    ----------
    file : str
        Path to a .docx, .xlsx/.xls, .csv, or .pdf file (extension is
        matched case-insensitively).
    text_chunk_size : int
        Character length of each text chunk.
    chunk_overlap : int
        Character overlap between consecutive text chunks.
    table_chunk_size : int
        Number of table rows per table chunk.

    Returns
    -------
    list[Document]
        Text chunks followed by table chunks; empty if nothing was extracted.
    """
    text = []
    tables = []

    # Case-insensitive matching — the original silently ignored ".PDF"/".Docx".
    lower = file.lower()

    if lower.endswith(".docx"):
        docx_text, docx_tables = extract_text_and_tables(file)
        text.append(docx_text)
        tables.extend(docx_tables)
    elif lower.endswith((".xlsx", ".xls")):
        text.extend(read_excel(file))
    elif lower.endswith(".csv"):
        text.extend(read_csv(file))
    elif lower.endswith(".pdf"):
        pdf_text_documents = extract_text(file)
        pdf_table_documents = extract_tables(file)
        text.extend(doc.page_content for doc in pdf_text_documents)

        if pdf_table_documents:
            tables.extend(pdf_table_documents)
        else:
            print(f"⚠️ No tables found in {file}, skipping table embeddings.")

    table_chunks = chunk_table(tables, chunk_size=table_chunk_size) if tables else []

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=text_chunk_size, chunk_overlap=chunk_overlap)
    # Attach source metadata to text chunks for consistency with table chunks,
    # which already carry it.
    text_chunks = (
        text_splitter.split_documents(
            [Document(page_content=t, metadata={"source": file}) for t in text]
        )
        if text
        else []
    )

    return text_chunks + table_chunks
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|