# panscience-api / data_ingetion / pre_processor.py
# Author: ashishbangwal — initial commit (f1d1d20)
from docx import Document
from pypdf import PdfReader
import datetime
from langchain_text_splitters import RecursiveCharacterTextSplitter
from openai import OpenAI
import sqlite3
import json
import os
from io import BytesIO
from typing import List
from fastapi import UploadFile
# Chunking and embedding configuration.
splitter = RecursiveCharacterTextSplitter(chunk_size=225, chunk_overlap=64)
EMBEDDING_MODEL = "togethercomputer/m2-bert-80M-2k-retrieval"

# SECURITY FIX: the previous code committed a hardcoded Together API key as a
# fallback. Secrets must never live in source control — require the env var.
api_key = os.getenv("TOGETHER_API")
if not api_key:
    raise RuntimeError(
        "TOGETHER_API environment variable is not set; "
        "export a Together API key before starting the service."
    )
ai_client = OpenAI(api_key=api_key, base_url="https://api.together.xyz/v1")

# Setup/Initiate SQLite database for metadata store.
# NOTE(review): this connection is created once at import time and shared by
# write_metadata/read_metadata. sqlite3 connections may only be used from the
# creating thread by default — confirm the FastAPI deployment touches it from
# a single thread, or add check_same_thread=False plus explicit locking.
conn = sqlite3.connect("metadata.db")
cursor = conn.cursor()
cursor.execute(
    """
    CREATE TABLE IF NOT EXISTS metadata_store (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        meta_dict TEXT NOT NULL
    )
    """
)
conn.commit()
async def full_processor(files: List["UploadFile"], user_tags: str, collection_name: str):
    """Run the full ingestion pipeline for a batch of uploads.

    Extracts text from each supported file, splits it into chunks, embeds the
    chunks via the Together API, and records batch metadata in SQLite.

    Args:
        files: uploaded files (only .pdf and .docx are processed).
        user_tags: free-form tag string attached to every chunk's metadata.
        collection_name: target collection name recorded in the metadata log.

    Returns:
        Tuple of (chunks, per-chunk metadata dicts, embedding vectors),
        all three lists index-aligned.
    """
    user_metadata = {"tags": user_tags}
    file_names = [str(file.filename) for file in files]
    # Lowercase so "Report.PDF" is treated the same as "report.pdf".
    file_types = [name.split(".")[-1].lower() for name in file_names]

    processed_docs = await processor(files, file_types)
    # BUG FIX: processor() silently skips unsupported files, so its output can
    # be shorter than file_names; pairing the full name list with the filtered
    # doc list attributed chunks to the wrong file. Keep only the names of
    # files that are actually processed so both lists stay index-aligned.
    supported_names = [
        name
        for name, ftype in zip(file_names, file_types)
        if ftype in ("pdf", "docx")
    ]
    chunks, metadata = create_chunks(processed_docs, supported_names, user_metadata)

    embeddings = []
    if chunks:  # the embeddings endpoint rejects an empty input list
        response = ai_client.embeddings.create(input=chunks, model=EMBEDDING_MODEL)
        embeddings = [item.embedding for item in response.data]

    write_metadata(file_names, file_types, user_tags, len(chunks), collection_name)
    return (chunks, metadata, embeddings)
async def processor(files: List["UploadFile"], file_types: List[str]) -> List[str]:
    """Extract plain text from each supported upload.

    Args:
        files: uploaded files, index-aligned with file_types.
        file_types: file extension for each upload (e.g. "pdf", "docx").

    Returns:
        Extracted text, one string per *supported* file. Unsupported types are
        skipped, so the result may be shorter than the inputs — callers must
        account for the index shift.
    """
    processed_docs: List[str] = []
    for idx, file_type in enumerate(file_types):
        # Normalize case so "PDF" / "Docx" extensions are not silently dropped.
        kind = file_type.lower()
        if kind == "pdf":
            processed_docs.append(await process_pdf(files[idx]))
        elif kind == "docx":
            processed_docs.append(await process_docx(files[idx]))
    return processed_docs
async def process_pdf(file: UploadFile) -> str:
    """Return the concatenated text of every page of an uploaded PDF."""
    raw = await file.read()
    reader = PdfReader(BytesIO(raw))
    # extract_text() returns None for pages without a text layer; map to "".
    pages = (page.extract_text() or "" for page in reader.pages)
    return "".join(pages)
async def process_docx(file: UploadFile) -> str:
    """Return all paragraph text of an uploaded .docx, one line per paragraph."""
    raw = await file.read()
    document = Document(BytesIO(raw))
    # Every paragraph gets a trailing newline, matching the original output.
    return "".join(para.text + "\n" for para in document.paragraphs)
def create_chunks(docs: List[str], file_names: List[str], user_metadata):
    """Split each document into chunks and build per-chunk metadata.

    Returns (all_chunks, all_metadata), index-aligned: every chunk produced
    from docs[i] carries file_names[i], a shared timestamp, and user_metadata.
    """
    # NOTE(review): "%I" is a 12-hour clock with no AM/PM marker, so this
    # timestamp is ambiguous — confirm whether 24-hour "%H" was intended.
    stamp = datetime.datetime.now().strftime("%d/%m/%y-%I")
    all_chunks: List[str] = []
    all_metadata: List[dict] = []
    for idx, doc in enumerate(docs):
        pieces = splitter.split_text(doc)
        all_chunks.extend(pieces)
        # One shared dict per source document (same aliasing as the original).
        shared = {
            "file_name": file_names[idx],
            "timestamp": stamp,
            **user_metadata,
        }
        all_metadata.extend([shared] * len(pieces))
    return all_chunks, all_metadata
def write_metadata(file_names, file_types, user_tags, total_chunks, collection_name):
    """Persist a JSON summary of one ingestion batch into metadata_store."""
    # NOTE(review): "%I" (12-hour, no AM/PM) is ambiguous — confirm intent.
    stamp = datetime.datetime.now().strftime("%d/%m/%y-%I")
    pdf_count = file_types.count("pdf")
    docx_count = file_types.count("docx")
    other_count = len(file_types) - pdf_count - docx_count
    summary = {
        "Collection": collection_name,
        "FileName": file_names,
        "TotalPDF": pdf_count,
        "TotalDocx": docx_count,
        "Unsupported": other_count,
        "CustomTag": user_tags,
        "TotalChunks": total_chunks,
        "Time": stamp,
    }
    cursor.execute(
        "INSERT INTO metadata_store (meta_dict) VALUES (?)", (json.dumps(summary),)
    )
    conn.commit()
def read_metadata():
    """Return every stored metadata record, decoded from JSON, oldest first."""
    cursor.execute("SELECT meta_dict FROM metadata_store")
    return [json.loads(row[0]) for row in cursor.fetchall()]