| from langchain_community.document_loaders import PyMuPDFLoader, TextLoader |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from langchain_qdrant import QdrantVectorStore, FastEmbedSparse |
| from langchain_openai import OpenAIEmbeddings |
| from src import config |
| import hashlib |
| import os |
|
|
def _file_md5(file_path: str) -> str:
    """Return the MD5 hex digest of the file at *file_path*, hashed in chunks.

    Chunked reading bounds memory use for large files (the original read the
    whole file into memory at once). MD5 is used only for duplicate detection,
    not security.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _extract_metadata(text: str) -> dict:
    """Scan *text* line by line for known "Label: value" markers.

    Returns a dict with any of the keys ``trial_id``, ``title``, ``date``
    that were found. Markers are checked in priority order per line, matching
    the original elif chain (first marker found wins for that line).
    """
    # Marker -> metadata key, in match-priority order.
    field_map = {
        "Clinical Trial ID:": "trial_id",
        "Study Title:": "title",
        "Date:": "date",
    }
    metadata = {}
    for line in text.split("\n"):
        for marker, key in field_map.items():
            if marker in line:
                # Split on the FIRST colon only, so values that themselves
                # contain colons (e.g. "Study Title: Foo: a trial of Bar")
                # are kept intact. The previous split(":")[-1] truncated
                # such values to the segment after the last colon.
                metadata[key] = line.split(":", 1)[1].strip()
                break
    return metadata


def _is_already_ingested(file_hash: str, hash_record_path) -> bool:
    """Return True if *file_hash* is recorded in the hash-tracking file."""
    if not hash_record_path.exists():
        return False
    known_hashes = hash_record_path.read_text(encoding="utf-8").splitlines()
    return file_hash in known_hashes


def ingest_file(file_path: str):
    """
    Document ingestion pipeline for GeneSeek.
    Handles loading, chunking, embedding and upserting documents into the
    Qdrant vector store. Duplicate ingestion is prevented via MD5 hash
    tracking in ``config.DATA_DIR / "ingested_hashes.txt"``.

    Args:
        file_path: Path to a ``.pdf`` (PyMuPDF) or plain-text file.

    Returns:
        False if the file was already ingested (skipped), True on success.
    """
    file_hash = _file_md5(file_path)
    hash_record_path = config.DATA_DIR / "ingested_hashes.txt"

    if _is_already_ingested(file_hash, hash_record_path):
        print("File already ingested. Skipping.")
        return False

    print(f"Loading file: {file_path}...")
    # Case-insensitive extension check so ".PDF" is not misrouted to TextLoader.
    if file_path.lower().endswith(".pdf"):
        loader = PyMuPDFLoader(file_path)
    else:
        loader = TextLoader(file_path)
    docs = loader.load()

    # Enrich each document with any trial metadata found in its text.
    for doc in docs:
        doc.metadata.update(_extract_metadata(doc.page_content))

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        separators=["\n\n", "\n", ".", " "],
    )
    splits = text_splitter.split_documents(docs)
    print(f"Split into {len(splits)} chunks.")

    dense_embeddings = OpenAIEmbeddings(
        model=config.EMBEDDING_MODEL,
        openai_api_key=config.OPENAI_API_KEY,
    )
    sparse_embeddings = FastEmbedSparse(model_name=config.SPARSE_MODEL)

    existing = [c.name for c in config.qdrant_client.get_collections().collections]

    if config.COLLECTION_NAME not in existing:
        print("Collection not found. Creating new collection...")
        # NOTE(review): this branch connects via QDRANT_URL/QDRANT_API_KEY env
        # vars while the append branch reuses config.qdrant_client — confirm
        # both point at the same cluster.
        QdrantVectorStore.from_documents(
            documents=splits,
            embedding=dense_embeddings,
            sparse_embedding=sparse_embeddings,
            url=os.getenv("QDRANT_URL"),
            api_key=os.getenv("QDRANT_API_KEY"),
            collection_name=config.COLLECTION_NAME,
            retrieval_mode="hybrid",
        )
    else:
        print("Collection exists. Appending documents...")
        vector_store = QdrantVectorStore(
            client=config.qdrant_client,
            collection_name=config.COLLECTION_NAME,
            embedding=dense_embeddings,
            sparse_embedding=sparse_embeddings,
            retrieval_mode="hybrid",
        )
        vector_store.add_documents(splits)

    # Record the hash only AFTER a successful upsert, so a failed run can be
    # retried without being skipped as a duplicate.
    with open(hash_record_path, "a", encoding="utf-8") as f:
        f.write(file_hash + "\n")

    print("Ingestion complete.")
    return True