Spaces:
Paused
Paused
File size: 7,782 Bytes
8d1819a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
import glob
import os
import hashlib
from typing import Any, Dict, Literal, TypedDict
from langchain_community.document_loaders import (
CSVLoader,
PyPDFLoader,
TextLoader,
UnstructuredHTMLLoader,
)
from python.helpers.log import LogItem
from python.helpers.print_style import PrintStyle
# Shared kwargs for text-based loaders: lets langchain detect the file
# encoding instead of assuming UTF-8 (avoids crashes on latin-1 etc. files).
text_loader_kwargs = {"autodetect_encoding": True}
class KnowledgeImport(TypedDict):
    """Index record tracking one knowledge file across imports (see load_knowledge)."""
    # Path of the source file; also used as the key in the index dict.
    file: str
    # MD5 hex digest of the file contents at the last import (change detection).
    checksum: str
    # Presumably the vector-store/memory IDs created from this file's
    # documents — not populated in this module; set by the caller. TODO confirm.
    ids: list[str]
    # Status relative to the previous index: re-loaded, unchanged, or deleted on disk.
    state: Literal["changed", "original", "removed"]
    # Loaded-and-split document objects; only (re)filled when state == "changed".
    documents: list[Any]
def calculate_checksum(file_path: str) -> str:
    """Return the hex MD5 digest of a file's contents, or "" on I/O error.

    Reads in fixed-size chunks so large knowledge files do not need to fit in
    memory at once. Returns an empty string (instead of raising) when the file
    cannot be read — the caller in load_knowledge treats a falsy checksum as
    "skip this file". MD5 is used purely for change detection, not security.
    """
    hasher = hashlib.md5()
    try:
        with open(file_path, "rb") as f:
            # Stream the file: 64 KiB chunks keep memory flat for big files.
            while chunk := f.read(65536):
                hasher.update(chunk)
    except OSError:
        # Unreadable/vanished file: signal "no checksum" rather than crash.
        return ""
    return hasher.hexdigest()
def load_knowledge(
    log_item: LogItem | None,
    knowledge_dir: str,
    index: Dict[str, KnowledgeImport],
    metadata: dict[str, Any] | None = None,
    filename_pattern: str = "**/*",
    recursive: bool = True,
) -> Dict[str, KnowledgeImport]:
    """
    Load knowledge files from a directory with change detection and metadata enhancement.

    Scans ``knowledge_dir`` for supported file types, compares each file's MD5
    checksum against ``index``, and (re)loads only files whose contents changed.
    Index entries whose backing file has disappeared are marked ``"removed"``.
    ``index`` is mutated in place and also returned.

    Args:
        log_item: Optional log sink; progress/error messages are streamed to it.
        knowledge_dir: Directory to scan; created if it does not exist.
        index: Existing import index (file path -> KnowledgeImport).
        metadata: Extra metadata merged into every loaded document. Defaults to
            an empty dict (``None`` sentinel avoids a shared mutable default).
        filename_pattern: Glob pattern applied under ``knowledge_dir``.
        recursive: Whether the glob descends into subdirectories.

    Returns:
        The updated ``index``.
    """
    # Fix: the previous signature used ``metadata={}`` — a mutable default
    # shared across calls that callers could accidentally pollute.
    if metadata is None:
        metadata = {}

    # Mapping file extensions to corresponding loader classes
    # Note: Using TextLoader for JSON and MD to avoid parsing issues with consolidation
    file_types_loaders = {
        "txt": TextLoader,
        "pdf": PyPDFLoader,
        "csv": CSVLoader,
        "html": UnstructuredHTMLLoader,
        "json": TextLoader,  # Use TextLoader for better consolidation compatibility
        "md": TextLoader,  # Use TextLoader for better consolidation compatibility
    }

    cnt_files = 0  # files actually (re)loaded this run
    cnt_docs = 0  # documents produced from those files

    # --- Validate and create the knowledge directory if needed ---------------
    if not knowledge_dir:
        if log_item:
            log_item.stream(progress="\nNo knowledge directory specified")
        PrintStyle(font_color="yellow").print("No knowledge directory specified")
        return index

    if not os.path.exists(knowledge_dir):
        try:
            os.makedirs(knowledge_dir, exist_ok=True)
            # Verify the directory was actually created and is accessible
            if not os.path.exists(knowledge_dir) or not os.access(knowledge_dir, os.R_OK):
                error_msg = f"Knowledge directory {knowledge_dir} was created but is not accessible"
                if log_item:
                    log_item.stream(progress=f"\n{error_msg}")
                PrintStyle(font_color="red").print(error_msg)
                return index
            if log_item:
                log_item.stream(progress=f"\nCreated knowledge directory: {knowledge_dir}")
            PrintStyle(font_color="green").print(f"Created knowledge directory: {knowledge_dir}")
        except (OSError, PermissionError) as e:
            error_msg = f"Failed to create knowledge directory {knowledge_dir}: {e}"
            if log_item:
                log_item.stream(progress=f"\n{error_msg}")
            PrintStyle(font_color="red").print(error_msg)
            return index

    # Final accessibility check for existing directories
    if not os.access(knowledge_dir, os.R_OK):
        error_msg = f"Knowledge directory {knowledge_dir} exists but is not readable"
        if log_item:
            log_item.stream(progress=f"\n{error_msg}")
        PrintStyle(font_color="red").print(error_msg)
        return index

    # --- Fetch all files in the directory matching the pattern ---------------
    try:
        kn_files = glob.glob(os.path.join(knowledge_dir, filename_pattern), recursive=recursive)
        # Keep only regular, non-hidden files (dotfiles are skipped).
        kn_files = [f for f in kn_files if os.path.isfile(f) and not os.path.basename(f).startswith('.')]
    except Exception as e:
        PrintStyle(font_color="red").print(f"Error scanning knowledge directory {knowledge_dir}: {e}")
        if log_item:
            log_item.stream(progress=f"\nError scanning directory: {e}")
        return index

    if kn_files:
        PrintStyle.standard(
            f"Found {len(kn_files)} knowledge files in {knowledge_dir}, processing..."
        )
        if log_item:
            log_item.stream(
                progress=f"\nFound {len(kn_files)} knowledge files in {knowledge_dir}, processing...",
            )

    # --- Process each discovered file ----------------------------------------
    for file_path in kn_files:
        try:
            # Get file extension safely (last dot-separated segment).
            file_parts = os.path.basename(file_path).split('.')
            if len(file_parts) < 2:
                continue  # Skip files without extensions
            ext = file_parts[-1].lower()
            if ext not in file_types_loaders:
                continue  # Skip unsupported file types

            checksum = calculate_checksum(file_path)
            if not checksum:
                continue  # Skip files with checksum errors

            file_key = file_path

            # Load existing data from the index or create a new entry
            file_data: KnowledgeImport = index.get(file_key, {
                "file": file_key,
                "checksum": "",
                "ids": [],
                "state": "changed",
                "documents": []
            })

            # Unchanged checksum means the file needs no re-import.
            file_data["state"] = "original" if file_data.get("checksum") == checksum else "changed"

            # Process changed files
            if file_data["state"] == "changed":
                file_data["checksum"] = checksum
                loader_cls = file_types_loaders[ext]

                try:
                    # Only plain-text style loaders accept encoding kwargs.
                    loader = loader_cls(
                        file_path,
                        **(
                            text_loader_kwargs
                            if ext in ["txt", "csv", "html", "md"]
                            else {}
                        ),
                    )
                    documents = loader.load_and_split()

                    # Enhanced metadata for better consolidation compatibility
                    enhanced_metadata = {
                        **metadata,
                        "source_file": os.path.basename(file_path),
                        "source_path": file_path,
                        "file_type": ext,
                        "knowledge_source": True,  # Flag to distinguish from conversation memories
                        "import_timestamp": None,  # Will be set when inserted into memory
                    }

                    # Apply metadata to all documents
                    for doc in documents:
                        doc.metadata = {**doc.metadata, **enhanced_metadata}

                    file_data["documents"] = documents
                    cnt_files += 1
                    cnt_docs += len(documents)
                except Exception as e:
                    PrintStyle(font_color="red").print(f"Error loading {file_path}: {e}")
                    if log_item:
                        log_item.stream(progress=f"\nError loading {os.path.basename(file_path)}: {e}")
                    continue

            # Update the index
            index[file_key] = file_data
        except Exception as e:
            PrintStyle(font_color="red").print(f"Error processing {file_path}: {e}")
            continue

    # --- Mark index entries whose backing file no longer exists --------------
    # Fix: the previous condition also required a falsy "state", which is never
    # true for well-formed entries, so deleted files were never flagged.
    current_files = set(kn_files)
    for file_key, file_data in list(index.items()):
        if file_key not in current_files and file_data.get("state") != "removed":
            index[file_key]["state"] = "removed"

    # --- Log results ----------------------------------------------------------
    if cnt_files > 0 or cnt_docs > 0:
        PrintStyle.standard(f"Processed {cnt_docs} documents from {cnt_files} files.")
        if log_item:
            log_item.stream(
                progress=f"\nProcessed {cnt_docs} documents from {cnt_files} files."
            )

    return index
|