Spaces:
Sleeping
Sleeping
| from langchain_core.documents import Document | |
| from pathlib import Path | |
| from typing import Optional, List | |
| from datetime import datetime, date | |
| import uuid | |
| import yaml | |
| from app.services.text_splitter import TextSplitter | |
| import json | |
# Value types that clean_metadata() passes through unchanged. date/datetime
# values are converted to ISO strings there; everything else is str()-ed.
ALLOWED = (str, int, float, bool, list, type(None))
def get_references_v2(docs, threshold: float):
    """
    Turn scored retrieval hits into reference records plus a prompt context string.

    Each item in ``docs`` must expose ``.document`` (with ``.metadata`` and
    ``.page_content``) and ``.fused_score``. Hits whose fused score is below
    ``threshold`` are skipped; the rest are kept in their incoming order.

    Returns a dict with:
      - "documents": list of dicts (title, chunk_index, source, page_content, similarity)
      - "context": the kept hits rendered as text, one after another
    """
    references = []
    context_parts = []

    for hit in docs:
        score = hit.fused_score
        if score < threshold:
            continue

        inner = hit.document
        meta = inner.metadata

        # Title falls back title -> name -> topic -> "untitled"; only a
        # *missing* key triggers the fallback (a stored None is kept as-is).
        topic_or_default = meta.get("topic", "untitled")
        name_or_topic = meta.get("name", topic_or_default)
        title = meta.get("title", name_or_topic)

        # Source prefers the originating file name over the generic source field.
        source = meta.get("source_file", meta.get("source", "untitled"))

        references.append({
            "title": title,
            "chunk_index": meta.get("chunk_index"),
            "source": source,
            "page_content": inner.page_content,
            "similarity": score
        })
        context_parts.append(
            f"{title} page_content: {inner.page_content}, from source: {source}.\n\n"
        )

    return {
        "documents": references,
        "context": "".join(context_parts)
    }
def get_references(docs, threshold: float):
    """
    Turn (document, distance) pairs into reference records plus a context string.

    Each item in ``docs`` is indexed as ``item[0]`` (an object with ``.metadata``
    and ``.page_content``) and ``item[1]`` (a number). Similarity is computed as
    ``1 - item[1]``; pairs whose similarity is below ``threshold`` are skipped.

    Returns a dict with:
      - "documents": list of dicts (title, chunk_index, source, page_content, similarity)
      - "context": the kept pairs rendered as text, one after another
    """
    references = []
    context_parts = []

    for pair in docs:
        similarity = 1 - pair[1]
        if similarity < threshold:
            continue

        hit = pair[0]
        meta = hit.metadata

        # Title falls back title -> name -> topic -> "untitled"; only a
        # *missing* key triggers the fallback (a stored None is kept as-is).
        topic_or_default = meta.get("topic", "untitled")
        name_or_topic = meta.get("name", topic_or_default)

        record = {
            "title": meta.get("title", name_or_topic),
            "chunk_index": meta.get("chunk_index"),
            "source": meta.get("source_file", meta.get("source", "untitled")),
            "page_content": hit.page_content,
            "similarity": similarity
        }
        references.append(record)

        # The literal's own line breaks are part of the emitted context text.
        context_parts.append(f"""
page_content: {record['page_content']}, from source: {record['source']}.
""")

    return {
        "documents": references,
        "context": "".join(context_parts)
    }
def create_documents(
    chunks: List[str],
    filePath: Optional[Path] = None,
    built_in_metadata: Optional[dict] = None,
    title: Optional[str] = None
) -> "List[Document]":
    """
    Create Document objects from text chunks with standard metadata (UUIDs, timestamps, indices).
    Works for both files (filePath provided) and raw text (filePath=None).

    Args:
        chunks: Text pieces, one Document is produced per piece, in order.
        filePath: Optional originating file. When it exists on disk, its
            stat() timestamps, name and stem supply the dates, source and
            default title. Note that ``st_ctime`` is the creation time only
            on Windows; on Unix it is the last metadata change.
        built_in_metadata: Optional extra metadata merged over the generated
            fields (so it may override source, title, dates and doc_id).
            ``None`` is treated as "no extra metadata".
        title: Optional explicit title; wins over the file stem and over a
            "title" entry used as the raw-text fallback.

    Returns:
        A list of Documents whose metadata always carries ``chunk_index``
        equal to the chunk's position in ``chunks``.
    """
    # A None default (instead of a shared `{}`) also makes an explicit
    # built_in_metadata=None safe, which the Optional annotation promises.
    extra_metadata = built_in_metadata or {}

    if filePath and filePath.exists():
        file_stat = filePath.stat()  # one syscall for both timestamps
        created_date = datetime.fromtimestamp(file_stat.st_ctime).isoformat()
        modified_date = datetime.fromtimestamp(file_stat.st_mtime).isoformat()
        source = filePath.name
        given_title = title or filePath.stem
    else:
        now = datetime.now().isoformat()
        created_date = now
        modified_date = now
        # Use existing source from metadata if available, else the (missing)
        # file's name, else empty.
        source = extra_metadata.get("source", "")
        if not source and filePath:
            source = filePath.name
        given_title = title or extra_metadata.get("title", "Untitled")

    docs = []
    for i, chunk in enumerate(chunks):
        metadata = {
            "doc_id": str(uuid.uuid4()),  # unique chunk id
            "source": source,
            "title": given_title,
            "created_date": created_date,
            "modified_date": modified_date,
            "chunk_index": i,
        }
        # Caller-supplied metadata takes precedence over the generated fields,
        # including doc_id: a supplied doc_id is shared by every chunk.
        metadata.update(extra_metadata)
        # chunk_index is positional and must never be overridden by the merge.
        metadata["chunk_index"] = i
        docs.append(Document(page_content=chunk, metadata=metadata))
    return docs
def create_document(
    text: str,
    metadata: dict
):
    """
    Wrap a single text and its metadata in a Document.

    No splitting or metadata enrichment happens here; ``metadata`` is handed
    to the Document constructor as given.
    """
    document = Document(page_content=text, metadata=metadata)
    return document
def clean_metadata(metadata: dict):
    """
    Return a copy of ``metadata`` whose values are restricted to simple types.

    date/datetime values become ISO-8601 strings, values whose type is listed
    in ALLOWED are kept untouched, and anything else is converted with str().
    Keys and their order are preserved; the input dict is not modified.
    """
    def _normalise(value):
        # Dates are checked first so they are never passed through as objects.
        if isinstance(value, (datetime, date)):
            return value.isoformat()
        return value if isinstance(value, ALLOWED) else str(value)

    return {key: _normalise(value) for key, value in metadata.items()}
def read_text_file(filePath: Path):
    """
    Return the full contents of ``filePath`` decoded as UTF-8 text.

    Accepts a Path or a plain string path.
    """
    return Path(filePath).read_text(encoding="utf-8")
def read_json_file(filePath: Path):
    """
    Parse the JSON file at ``filePath`` and return the decoded value.

    The file is always read as UTF-8 (the same encoding read_text_file uses)
    rather than the platform's locale encoding, so non-ASCII content decodes
    identically on every OS.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(filePath, "r", encoding="utf-8") as file:
        return json.load(file)
def build_metadata(filePath: Optional[Path] = None, content: Optional[str] = None):
    """
    Split text into YAML front matter metadata and body content.

    The text comes from ``filePath`` when given (read via read_text_file),
    otherwise from ``content``; with neither, it is treated as empty text.

    Front matter is recognised only when the text *begins* with a ``---``
    delimited block (leading whitespace / a BOM is tolerated) and that block
    parses to a YAML mapping. ``---`` separators that merely appear later in
    the body are left alone, so leading text is never discarded.

    Returns:
        {"metadata": dict, "content": str} where content is stripped.
        With a file, metadata["source"] is always the file name. Without a
        file, "source" is "" when front matter exists but lacks one, and is
        absent entirely when there is no front matter (so external metadata
        can supply it).

    Raises:
        yaml.YAMLError: if a leading front matter block is not valid YAML.
    """
    if filePath:
        content = read_text_file(filePath)
    elif content is None:
        content = ""

    parts = content.split("---", 2)
    # Only text that opens with the delimiter can carry front matter; anything
    # substantive before the first `---` means these are body separators.
    opens_with_delimiter = len(parts) >= 3 and not parts[0].lstrip("\ufeff").strip()

    if opens_with_delimiter:
        frontmatter = yaml.safe_load(parts[1]) or {}
        # A scalar or list here is ordinary text between separators, not
        # metadata; fall through and keep the whole text as content.
        if isinstance(frontmatter, dict):
            frontmatter = clean_metadata(frontmatter)
            # add file name as source always
            if filePath:
                frontmatter["source"] = filePath.name
            elif "source" not in frontmatter:
                frontmatter["source"] = ""
            return {
                "metadata": frontmatter,
                "content": parts[2].strip()
            }

    # Don't enforce empty source if not provided, allows external metadata to stick
    meta = {}
    if filePath:
        meta["source"] = filePath.name
    return {
        "metadata": meta,
        "content": content.strip()
    }
def create_documents_from_text(text: str, metadata: Optional[dict] = None):
    """
    Create documents from raw text with automatic splitting and metadata enrichment.

    Args:
        text: Raw text, optionally starting with YAML front matter.
        metadata: Optional caller-supplied metadata. Every key except
            "source" overrides the front matter value; "source" is used only
            when the front matter does not provide a non-empty one.

    Returns:
        The list of Documents produced by create_documents for the split text.
    """
    # None default instead of a shared mutable `{}`; None means "nothing extra".
    provided = metadata or {}

    text = text.strip()
    data = build_metadata(content=text)

    # 1. Smart Metadata Merge
    final_metadata = data["metadata"].copy()
    # Text without front matter yields no "source" key at all, so test for a
    # missing-or-empty source rather than only the empty string; otherwise the
    # caller's source would be dropped for plain text.
    if not final_metadata.get("source") and provided.get("source"):
        final_metadata["source"] = provided["source"]
    # Merge regular keys ("source" was handled above)
    final_metadata.update({k: v for k, v in provided.items() if k != "source"})

    text = data["content"]

    # 2. Split text into chunks (strings)
    # Use section-aware splitter if text contains markdown section delimiters
    if "\n---\n" in text or text.startswith("---\n"):
        splitter = TextSplitter.for_markdown_with_sections()
    else:
        splitter = TextSplitter()
    chunks = splitter.split_text(text)

    # 3. Create documents using standard helper (adds IDs, indices, dates)
    return create_documents(
        chunks=chunks,
        filePath=None,
        built_in_metadata=final_metadata
    )
def load_json(filePath: Path):
    """
    Load a JSON knowledge file and return one Document per non-empty chunk.

    The JSON object must provide "id", "source" and a "content" mapping of
    topic -> text; the title is taken from "name", then "title", else
    "Untitled". Each topic's text is split with TextSplitter, blank chunks are
    skipped, and every kept chunk is prefixed with "<topic>: ".

    Each Document's metadata holds the shared file-level fields plus "topic"
    and "chunk_index" (the chunk's position within its own topic, counted
    before blank chunks are dropped).
    """
    data = read_json_file(filePath=filePath)
    source_path = Path(filePath)

    base_metadata = {
        "id": data["id"],
        "title": data.get("name", data.get("title", "Untitled")),
        "source": data["source"],
        "source_file": source_path.name or "Untitled",
        "created_date": datetime.now().isoformat()
    }

    splitter = TextSplitter()
    documents = []
    for topic, body in data["content"].items():
        pieces = splitter.split_text(body.strip())
        for position, piece in enumerate(pieces):
            trimmed = piece.strip()
            if trimmed == "":
                continue
            documents.append(
                Document(
                    page_content=f"{topic}: {trimmed}",
                    metadata={**base_metadata, "topic": topic, "chunk_index": position},
                )
            )
    return documents