Spaces:
No application file
No application file
| from docx import Document | |
| from pypdf import PdfReader | |
| import datetime | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from openai import OpenAI | |
| import sqlite3 | |
| import json | |
| import os | |
| from io import BytesIO | |
| from typing import List | |
| from fastapi import UploadFile | |
# Shared text splitter: small chunks with overlap so retrieval keeps local
# context across chunk boundaries.
splitter = RecursiveCharacterTextSplitter(chunk_size=225, chunk_overlap=64)

EMBEDDING_MODEL = "togethercomputer/m2-bert-80M-2k-retrieval"

# SECURITY: a previous revision shipped a hard-coded Together API key as a
# fallback value. That credential is public in version control and must be
# rotated; the key now comes exclusively from the environment.
api_key = os.getenv("TOGETHER_API")
if not api_key:
    raise RuntimeError("TOGETHER_API environment variable is not set")

# Together exposes an OpenAI-compatible API, so the stock OpenAI client is
# pointed at Together's base URL.
ai_client = OpenAI(api_key=api_key, base_url="https://api.together.xyz/v1")

# SQLite metadata store.
# NOTE(review): this module-level connection is shared by async FastAPI
# handlers; sqlite3 connections are not thread-safe by default — confirm
# single-threaded access or create connections per request.
conn = sqlite3.connect("metadata.db")
cursor = conn.cursor()
cursor.execute(
    """
    CREATE TABLE IF NOT EXISTS metadata_store (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        meta_dict TEXT NOT NULL
    )
    """
)
conn.commit()
async def full_processor(files: List[UploadFile], user_tags: str, collection_name: str):
    """Ingest uploads end to end: extract text, chunk, embed, log metadata.

    Args:
        files: Uploaded files; only .pdf and .docx contribute text.
        user_tags: Free-form tag string attached to every chunk's metadata.
        collection_name: Target collection recorded in the metadata store.

    Returns:
        Tuple of (chunks, per-chunk metadata, embedding vectors), all the
        same length; all three are empty when no supported file yields text.
    """
    user_metadata = {"tags": user_tags}
    file_names = [str(file.filename) for file in files]
    # Extension is everything after the last dot; a dotless name yields the
    # whole name and is counted as unsupported downstream.
    file_types = [name.split(".")[-1] for name in file_names]
    processed_docs = await processor(files, file_types)
    chunks, metadata = create_chunks(processed_docs, file_names, user_metadata)
    if chunks:
        response = ai_client.embeddings.create(input=chunks, model=EMBEDDING_MODEL)
        embeddings = [item.embedding for item in response.data]
    else:
        # BUGFIX: the embeddings API rejects an empty input list, so skip
        # the call when nothing was extracted. Metadata is still recorded
        # so unsupported uploads remain visible in the store.
        embeddings = []
    write_metadata(file_names, file_types, user_tags, len(chunks), collection_name)
    return (chunks, metadata, embeddings)
async def processor(files: List[UploadFile], file_types: List[str]) -> List[str]:
    """Extract text from each upload, returning one string per input file.

    BUGFIX: unsupported types previously produced no entry at all, which
    shifted every later document against file_names in create_chunks and
    attached chunks to the wrong file name. Unsupported files now contribute
    an empty string, keeping indices aligned (an empty doc yields no chunks).

    Args:
        files: Uploads, index-aligned with file_types.
        file_types: Lower-level file extensions ("pdf", "docx", ...).

    Returns:
        Extracted text per file; "" for unsupported types.
    """
    processed_docs = []
    for idx, file_type in enumerate(file_types):
        if file_type == "pdf":
            processed_docs.append(await process_pdf(files[idx]))
        elif file_type == "docx":
            processed_docs.append(await process_docx(files[idx]))
        else:
            # Placeholder keeps docs index-aligned with file_names.
            processed_docs.append("")
    return processed_docs
async def process_pdf(file: UploadFile) -> str:
    """Return the concatenated text of every page in an uploaded PDF."""
    raw = await file.read()
    reader = PdfReader(BytesIO(raw))
    # extract_text() can return None for image-only pages; treat as empty.
    return "".join(page.extract_text() or "" for page in reader.pages)
async def process_docx(file: UploadFile) -> str:
    """Return all paragraph text of an uploaded .docx file."""
    raw = await file.read()
    document = Document(BytesIO(raw))
    # Each paragraph is terminated (not separated) by a newline, matching
    # the original accumulation of para.text + "\n".
    return "".join(paragraph.text + "\n" for paragraph in document.paragraphs)
def create_chunks(docs: List[str], file_names: List[str], user_metadata: dict):
    """Split each document into chunks and build one metadata dict per chunk.

    Args:
        docs: Extracted text, index-aligned with file_names.
        file_names: Original upload names, one per document.
        user_metadata: Extra key/value pairs merged into every chunk's metadata.

    Returns:
        (all_chunks, all_metadata) lists of equal length.
    """
    all_chunks = []
    all_metadata = []
    # NOTE(review): "%d/%m/%y-%I" is a 12-hour clock with no AM/PM marker,
    # so stored timestamps are ambiguous — kept for row compatibility.
    date_time = datetime.datetime.now().strftime("%d/%m/%y-%I")
    for idx, doc in enumerate(docs):
        chunks = splitter.split_text(doc)
        all_chunks += chunks
        # BUGFIX: previously every chunk of a document shared ONE dict
        # object ([metadata] * len(chunks)), so mutating one chunk's
        # metadata silently changed all of them. Build a fresh dict per chunk.
        for _ in chunks:
            all_metadata.append(
                {
                    "file_name": file_names[idx],
                    "timestamp": date_time,
                    **user_metadata,
                }
            )
    return all_chunks, all_metadata
def write_metadata(file_names, file_types, user_tags, total_chunks, collection_name):
    """Persist one JSON summary row for an ingestion run into SQLite."""
    timestamp = datetime.datetime.now().strftime("%d/%m/%y-%I")
    # Tally extensions; anything that is neither pdf nor docx is unsupported.
    pdf_count = sum(1 for ext in file_types if ext == "pdf")
    docx_count = sum(1 for ext in file_types if ext == "docx")
    unsupported_count = len(file_types) - pdf_count - docx_count
    summary = {
        "Collection": collection_name,
        "FileName": file_names,
        "TotalPDF": pdf_count,
        "TotalDocx": docx_count,
        "Unsupported": unsupported_count,
        "CustomTag": user_tags,
        "TotalChunks": total_chunks,
        "Time": timestamp,
    }
    cursor.execute(
        "INSERT INTO metadata_store (meta_dict) VALUES (?)", (json.dumps(summary),)
    )
    conn.commit()
def read_metadata():
    """Return every stored ingestion summary, decoded from its JSON column."""
    cursor.execute("SELECT meta_dict FROM metadata_store")
    return [json.loads(row[0]) for row in cursor.fetchall()]