Spaces:
Running on Zero
Running on Zero
File size: 15,264 Bytes
0ccf2f0 55d584b 93b8585 55d584b 93b8585 55d584b 93b8585 55d584b 0ccf2f0 55d584b 93b8585 0ccf2f0 55d584b 93b8585 55d584b 0ccf2f0 55d584b 93b8585 55d584b 93b8585 55d584b 93b8585 55d584b 2133289 55d584b 93b8585 2133289 0ccf2f0 55d584b 0ccf2f0 55d584b 0ccf2f0 55d584b 2133289 0ccf2f0 55d584b 0ccf2f0 55d584b 0ccf2f0 55d584b 2133289 55d584b 0ccf2f0 55d584b 2133289 0ccf2f0 55d584b 0ccf2f0 55d584b 0ccf2f0 55d584b 2133289 55d584b 2133289 55d584b ec38897 55d584b 2133289 0ccf2f0 55d584b 0ccf2f0 2133289 55d584b 91d1f81 2133289 55d584b 0ccf2f0 2133289 55d584b 2133289 0ccf2f0 55d584b 2133289 55d584b 0ccf2f0 55d584b 0ccf2f0 55d584b 0ccf2f0 2133289 55d584b 0ccf2f0 55d584b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 | """Load Warbler pack data into the RetrievalAPI."""
import fnmatch
import json
import logging
import os
from pathlib import Path
from typing import List, Dict, Any, Optional
# Module-level logger; handlers and levels are configured by the application.
logger = logging.getLogger(__name__)
class PackLoader:
    """Load Warbler pack data into the system.

    A "pack" is a subdirectory of ``packs_dir`` holding documents in one of
    three layouts:

    * a single ``<pack-name>.jsonl`` file,
    * chunked ``<pack-name>-chunk-*.jsonl`` files (marked by
      ``"chunked": true`` in the pack's ``package.json``), or
    * a structured ``pack/templates.json`` file whose templates are
      converted to documents (and cached back to JSONL for future loads).

    Loading can be restricted with fnmatch include/exclude patterns and
    capped globally (``max_documents``) or per pack
    (``max_documents_per_pack``).
    """

    def __init__(
        self,
        packs_dir: Optional[Path] = None,
        include_packs: Optional[List[str]] = None,
        exclude_packs: Optional[List[str]] = None,
        max_documents: Optional[int] = None,
        max_documents_per_pack: Optional[int] = None,
    ):
        """Initialize the pack loader.

        Args:
            packs_dir: Directory containing pack subdirectories. Defaults to
                the ``packs`` directory two levels above this file.
            include_packs: fnmatch patterns; when non-empty, only packs
                matching at least one pattern are loaded.
            exclude_packs: fnmatch patterns; matching packs are skipped
                (checked after ``include_packs``).
            max_documents: Global document cap across all packs
                (``None`` = unlimited).
            max_documents_per_pack: Cap applied to each pack individually
                (``None`` = unlimited).
        """
        if packs_dir is None:
            packs_dir = Path(__file__).parent.parent / "packs"
        self.packs_dir = Path(packs_dir)
        # Populated by discover_documents(); kept for later inspection.
        self.documents: List[Dict[str, Any]] = []
        self.include_packs = include_packs or []
        self.exclude_packs = exclude_packs or []
        self.max_documents = max_documents
        self.max_documents_per_pack = max_documents_per_pack

    @classmethod
    def from_environment(cls, packs_dir: Optional[Path] = None) -> "PackLoader":
        """Create a loader configured from environment variables.

        Reads ``WARBLER_INCLUDE_PACKS`` / ``WARBLER_EXCLUDE_PACKS`` (CSV of
        fnmatch patterns) and ``WARBLER_MAX_DOCUMENTS`` /
        ``WARBLER_MAX_DOCUMENTS_PER_PACK`` (integers). When running in a
        hosted environment, conservative defaults are applied for any value
        the user has not set explicitly.
        """
        include_packs = cls._split_csv_env("WARBLER_INCLUDE_PACKS")
        exclude_packs = cls._split_csv_env("WARBLER_EXCLUDE_PACKS")
        max_documents = cls._parse_int_env("WARBLER_MAX_DOCUMENTS")
        max_documents_per_pack = cls._parse_int_env("WARBLER_MAX_DOCUMENTS_PER_PACK")
        if cls._is_hosted_environment():
            # Hosted defaults only fill gaps; explicit env settings win.
            if not exclude_packs:
                exclude_packs = ["warbler-pack-hf-tinystories"]
            if max_documents is None:
                max_documents = 25000
            if max_documents_per_pack is None:
                max_documents_per_pack = 5000
        return cls(
            packs_dir=packs_dir,
            include_packs=include_packs,
            exclude_packs=exclude_packs,
            max_documents=max_documents,
            max_documents_per_pack=max_documents_per_pack,
        )

    @staticmethod
    def _is_hosted_environment() -> bool:
        """Detect whether the loader is running in a hosted environment.

        True when ``WARBLER_HOSTED_MODE`` is set to a truthy value, or when
        Space identifiers (``SPACE_ID`` / ``HF_SPACE_ID``) are present.
        """
        hosted_flag = os.getenv("WARBLER_HOSTED_MODE", "").lower()
        return hosted_flag in {"1", "true", "yes", "on"} or bool(
            os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID")
        )

    @staticmethod
    def _split_csv_env(name: str) -> List[str]:
        """Parse a comma-separated environment variable into patterns.

        Blank entries are dropped; returns ``[]`` when the variable is unset.
        """
        raw_value = os.getenv(name, "")
        return [part.strip() for part in raw_value.split(",") if part.strip()]

    @staticmethod
    def _parse_int_env(name: str) -> Optional[int]:
        """Parse an integer environment variable if present.

        Returns ``None`` for unset/empty variables; logs a warning and
        returns ``None`` for values that are not valid integers.
        """
        raw_value = os.getenv(name)
        if raw_value is None or raw_value == "":
            return None
        try:
            return int(raw_value)
        except ValueError:
            logger.warning("Ignoring invalid integer for %s: %s", name, raw_value)
            return None

    def discover_documents(self) -> List[Dict[str, Any]]:
        """Discover all documents across all packs.

        Iterates pack directories in sorted (deterministic) order, applying
        include/exclude filters, the per-pack cap, and the global cap.
        The result is also stored on ``self.documents``.
        """
        if not self.packs_dir.exists():
            logger.warning(f"Packs directory not found: {self.packs_dir}")
            return []
        documents: List[Dict[str, Any]] = []
        # None means "no global cap"; otherwise counts down as packs load.
        remaining_documents = self.max_documents
        for pack_dir in sorted(self.packs_dir.iterdir()):
            if not pack_dir.is_dir():
                continue
            pack_name = pack_dir.name
            if not self._should_load_pack(pack_name):
                logger.info("Skipping pack due to loader policy: %s", pack_name)
                continue
            if remaining_documents is not None and remaining_documents <= 0:
                logger.info("Stopping pack discovery after reaching max_documents=%s", self.max_documents)
                break
            logger.info(f"Loading pack: {pack_name}")
            pack_docs = self._load_pack(pack_dir, pack_name)
            # Apply the per-pack cap first, then the global cap.
            if self.max_documents_per_pack is not None:
                pack_docs = pack_docs[: self.max_documents_per_pack]
            if remaining_documents is not None:
                pack_docs = pack_docs[:remaining_documents]
                remaining_documents -= len(pack_docs)
            documents.extend(pack_docs)
            logger.info(f"✓ Loaded {len(pack_docs)} documents from {pack_name}")
        self.documents = documents
        return documents

    def _should_load_pack(self, pack_name: str) -> bool:
        """Check whether a pack should be included under the current loader policy.

        Include patterns (when present) act as an allow-list; exclude
        patterns are then applied on top of it.
        """
        if self.include_packs:
            included = any(fnmatch.fnmatch(pack_name, pattern) for pattern in self.include_packs)
            if not included:
                return False
        if self.exclude_packs:
            excluded = any(fnmatch.fnmatch(pack_name, pattern) for pattern in self.exclude_packs)
            if excluded:
                return False
        return True

    def _load_pack(self, pack_dir: Path, pack_name: str) -> List[Dict[str, Any]]:
        """Load documents from a specific pack.

        Valid Warbler packs are loaded from their JSONL files; anything
        else falls back to the structured (``pack/templates.json``) format.

        Bug fix: ``_is_valid_warbler_pack`` accepts a pack that has only
        templates (no JSONL yet), but the JSONL loader cannot read such a
        pack and previously returned zero documents. When that happens we
        now route the pack to the structured loader, which converts the
        templates (and caches a JSONL for future loads).
        """
        documents: List[Dict[str, Any]] = []
        jsonl_file = pack_dir / f"{pack_name}.jsonl"
        # Validate this is actually a Warbler pack before loading
        if self._is_valid_warbler_pack(pack_dir, pack_name, jsonl_file):
            docs = self._load_jsonl_pack(pack_dir, pack_name)
            if not docs and (pack_dir / "pack" / "templates.json").exists():
                # Validation passed on the strength of templates.json alone;
                # load through the structured path instead.
                docs = self._load_structured_pack(pack_dir, pack_name)
            documents.extend(docs)
        else:
            # Fall back to structured pack format
            docs = self._load_structured_pack(pack_dir, pack_name)
            documents.extend(docs)
        return documents

    def _is_valid_warbler_pack(self, pack_dir: Path, pack_name: str, jsonl_file: Path) -> bool:
        """Validate that a directory is a valid Warbler pack.

        A valid Warbler pack must have:
        1. Either:
           - A JSONL file matching the pack name (single-file pack), OR
           - Chunk files matching the pattern (chunked pack)
           - OR structured templates that can be converted to JSONL
        2. AND either:
           - A package.json metadata file, OR
           - The pack name starts with 'warbler-pack-hf-' (HuggingFace packs)
        """
        # Check for package.json metadata first
        package_json = pack_dir / "package.json"
        has_valid_metadata = False
        is_chunked = False
        if package_json.exists():
            try:
                with open(package_json, "r", encoding="utf-8") as f:
                    metadata = json.load(f)
                # Validate it has required fields
                if "name" in metadata and "version" in metadata:
                    has_valid_metadata = True
                    is_chunked = metadata.get("chunked", False)
            except (json.JSONDecodeError, IOError) as e:
                logger.warning(f"Invalid package.json in {pack_dir}: {e}")
        # Allow HuggingFace packs even without package.json (for backward compatibility)
        if pack_name.startswith("warbler-pack-hf-"):
            has_valid_metadata = True
        if not has_valid_metadata:
            return False
        # Check for appropriate JSONL files based on chunked status
        if is_chunked:
            # For chunked packs, look for chunk files
            chunk_files = list(pack_dir.glob(f"{pack_name}-chunk-*.jsonl"))
            if chunk_files:
                logger.debug(f"Found {len(chunk_files)} chunk files for {pack_name}")
                return True
            else:
                logger.warning(f"Chunked pack {pack_name} has no chunk files")
                return False
        else:
            # For single-file packs, check if JSONL file exists
            if jsonl_file.exists():
                return True
            # Check for structured pack templates that can be converted
            templates_file = pack_dir / "pack" / "templates.json"
            if templates_file.exists():
                logger.debug(f"Single-file pack {pack_name} missing JSONL, but templates.json exists")
                return True
            logger.warning(f"Single-file pack {pack_name} missing JSONL file: {jsonl_file}")
            return False

    def _load_jsonl_pack(self, pack_dir: Path, pack_name: str) -> List[Dict[str, Any]]:
        """Load JSONL-based pack (supports both single-file and chunked packs).

        The ``chunked`` flag in ``package.json`` selects between loading
        ``<pack-name>-chunk-*.jsonl`` files (in sorted order) and the
        single ``<pack-name>.jsonl`` file.
        """
        documents: List[Dict[str, Any]] = []
        # Check if this is a chunked pack by reading package.json
        package_json = pack_dir / "package.json"
        is_chunked = False
        if package_json.exists():
            try:
                with open(package_json, "r", encoding="utf-8") as f:
                    metadata = json.load(f)
                is_chunked = metadata.get("chunked", False)
            except (json.JSONDecodeError, IOError) as err:
                logger.warning(f"Could not read package.json for {pack_name}: {err}")
        if is_chunked:
            # Load chunked pack
            logger.info(f"Loading chunked pack: {pack_name}")
            # Chunk files look like "warbler-pack-hf-arxiv-chunk-001.jsonl";
            # sorting keeps their load order deterministic.
            chunk_files = sorted(pack_dir.glob(f"{pack_name}-chunk-*.jsonl"))
            if not chunk_files:
                logger.warning(f"No chunk files found for chunked pack {pack_name}")
                return documents
            logger.info(f"Found {len(chunk_files)} chunk files for {pack_name}")
            # Load each chunk file in order
            for chunk_file in chunk_files:
                logger.debug(f"Loading chunk: {chunk_file.name}")
                chunk_docs = self._load_jsonl_file(chunk_file, pack_name)
                documents.extend(chunk_docs)
            logger.info(f"Loaded {len(documents)} total documents from {len(chunk_files)} chunks")
        else:
            # Load single-file pack (backward compatibility)
            jsonl_file = pack_dir / f"{pack_name}.jsonl"
            if not jsonl_file.exists():
                logger.warning(f"JSONL file not found: {jsonl_file}")
                return documents
            documents = self._load_jsonl_file(jsonl_file, pack_name)
        return documents

    def _load_jsonl_file(self, jsonl_file: Path, pack_name: str) -> List[Dict[str, Any]]:
        """Load a single JSONL file with robust error handling.

        Malformed lines are skipped (only the first few are logged to avoid
        spam); blank lines are ignored. Returns whatever parsed cleanly.
        """
        documents: List[Dict[str, Any]] = []
        error_count = 0
        max_errors_to_log = 5
        try:
            with open(jsonl_file, "r", encoding="utf-8") as f:
                for line_num, line in enumerate(f, 1):
                    if not line.strip():
                        continue
                    try:
                        entry = json.loads(line)
                        doc = self._format_document(
                            entry, pack_name, f"{jsonl_file.stem}_line_{line_num}"
                        )
                        documents.append(doc)
                    except json.JSONDecodeError as e:
                        error_count += 1
                        # Only log first few errors to avoid spam
                        if error_count <= max_errors_to_log:
                            logger.warning(
                                f"Error parsing line {line_num} in {jsonl_file.name}: {e}"
                            )
                        # Continue processing other lines instead of failing
                        continue
            if error_count > 0:
                logger.info(
                    f"Loaded {len(documents)} documents from {jsonl_file.name} "
                    f"({error_count} lines skipped due to errors)"
                )
        except Exception as e:
            logger.error(f"Error loading JSONL file {jsonl_file}: {e}")
        return documents

    def _load_structured_pack(self, pack_dir: Path, pack_name: str) -> List[Dict[str, Any]]:
        """Load structured pack with templates.

        Reads ``pack/templates.json`` (either a bare list or an object with
        a ``"templates"`` key), formats each template as a document, and
        caches the result as a JSONL file for faster future loads.
        """
        documents: List[Dict[str, Any]] = []
        templates_file = pack_dir / "pack" / "templates.json"
        if not templates_file.exists():
            logger.debug(f"No templates.json found in {pack_dir}")
            return documents
        try:
            with open(templates_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            templates = data if isinstance(data, list) else data.get("templates", [])
            for template in templates:
                doc = {
                    "id": f"{pack_name}/{template.get('id', 'unknown')}",
                    "content": template.get("content", json.dumps(template)),
                    "metadata": {
                        "pack": pack_name,
                        "type": "template",
                        "template_id": template.get("id"),
                        "realm_type": self._infer_realm(pack_name),
                        "realm_label": pack_name.replace("warbler-pack-", ""),
                        "lifecycle_stage": "peak",
                        "activity_level": 0.8,
                    },
                }
                documents.append(doc)
            self._generate_jsonl_from_templates(pack_dir, pack_name, documents)
        except Exception as e:
            logger.error(f"Error loading templates from {pack_name}: {e}")
        return documents

    def _generate_jsonl_from_templates(
        self, pack_dir: Path, pack_name: str, documents: List[Dict[str, Any]]
    ) -> None:
        """Generate JSONL file from templates for future loads.

        Best-effort cache: never overwrites an existing JSONL, and failures
        are only logged at debug level since the in-memory documents are
        already available.
        """
        try:
            jsonl_file = pack_dir / f"{pack_name}.jsonl"
            if jsonl_file.exists():
                return
            with open(jsonl_file, "w", encoding="utf-8") as f:
                for doc in documents:
                    f.write(json.dumps(doc, ensure_ascii=False) + "\n")
            logger.info(f"Generated JSONL file for {pack_name}: {jsonl_file.name}")
        except Exception as e:
            logger.debug(f"Could not generate JSONL for {pack_name}: {e}")

    def _format_document(
        self, entry: Dict[str, Any], pack_name: str, doc_id: str
    ) -> Dict[str, Any]:
        """Format a pack entry into a document.

        ``content`` is taken from the entry's ``content`` or ``text`` field
        (falling back to the raw JSON). All other entry fields are merged
        into the metadata, so entry values can override the defaults set
        here (e.g. ``type``).
        """
        content = entry.get("content") or entry.get("text") or json.dumps(entry)
        return {
            "id": f"{pack_name}/{doc_id}",
            "content": str(content),
            "metadata": {
                "pack": pack_name,
                "type": entry.get("type", "dialogue"),
                "realm_type": self._infer_realm(pack_name),
                "realm_label": pack_name.replace("warbler-pack-", ""),
                "lifecycle_stage": "emergence",
                "activity_level": 0.7,
                **{k: v for k, v in entry.items() if k not in ["content", "text"]},
            },
        }

    def _infer_realm(self, pack_name: str) -> str:
        """Infer realm type from pack name.

        Substring heuristics over the pack name; "narrative" is both the
        dialogue/npc realm and the catch-all default.
        """
        if "wisdom" in pack_name:
            return "wisdom"
        elif "faction" in pack_name or "politics" in pack_name:
            return "faction"
        elif "dialogue" in pack_name or "npc" in pack_name:
            return "narrative"
        else:
            return "narrative"
|