File size: 15,264 Bytes
0ccf2f0
55d584b
93b8585
55d584b
 
93b8585
55d584b
93b8585
55d584b
 
 
 
 
0ccf2f0
55d584b
93b8585
 
 
 
 
 
 
 
0ccf2f0
55d584b
 
 
 
 
93b8585
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55d584b
 
0ccf2f0
55d584b
 
 
 
 
93b8585
55d584b
 
 
 
 
 
93b8585
 
 
 
 
 
 
 
55d584b
 
 
93b8585
 
 
 
 
 
 
55d584b
2133289
55d584b
 
 
 
93b8585
 
 
 
 
 
 
 
 
 
 
 
 
 
2133289
0ccf2f0
55d584b
 
0ccf2f0
 
 
 
 
55d584b
 
0ccf2f0
 
55d584b
 
 
 
2133289
0ccf2f0
55d584b
 
 
 
 
0ccf2f0
55d584b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0ccf2f0
55d584b
 
 
 
 
 
 
 
 
 
 
2133289
55d584b
 
 
 
 
 
 
 
0ccf2f0
 
 
 
 
 
 
55d584b
2133289
0ccf2f0
55d584b
 
 
 
 
 
 
 
 
 
 
0ccf2f0
 
55d584b
 
 
 
 
 
0ccf2f0
 
 
55d584b
 
2133289
55d584b
 
2133289
55d584b
 
 
 
 
 
 
ec38897
55d584b
 
 
 
 
 
 
 
 
 
 
2133289
0ccf2f0
55d584b
 
 
 
 
 
 
 
 
 
 
 
 
0ccf2f0
2133289
55d584b
 
 
 
 
 
91d1f81
2133289
55d584b
 
 
 
 
0ccf2f0
 
2133289
55d584b
 
 
 
 
2133289
0ccf2f0
55d584b
 
 
 
 
 
 
 
 
 
 
2133289
55d584b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0ccf2f0
 
55d584b
 
 
 
 
0ccf2f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55d584b
 
 
0ccf2f0
2133289
55d584b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0ccf2f0
55d584b
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
"""Load Warbler pack data into the RetrievalAPI."""

import fnmatch
import json
import logging
import os
from pathlib import Path
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)


class PackLoader:
    """Load Warbler pack data into the system."""

    def __init__(
        self,
        packs_dir: Path = None,
        include_packs: Optional[List[str]] = None,
        exclude_packs: Optional[List[str]] = None,
        max_documents: Optional[int] = None,
        max_documents_per_pack: Optional[int] = None,
    ):
        """Initialize the pack loader."""
        if packs_dir is None:
            packs_dir = Path(__file__).parent.parent / "packs"

        self.packs_dir = Path(packs_dir)
        self.documents = []
        self.include_packs = include_packs or []
        self.exclude_packs = exclude_packs or []
        self.max_documents = max_documents
        self.max_documents_per_pack = max_documents_per_pack

    @classmethod
    def from_environment(cls, packs_dir: Path = None):
        """Create a loader configured from environment variables."""
        include_packs = cls._split_csv_env("WARBLER_INCLUDE_PACKS")
        exclude_packs = cls._split_csv_env("WARBLER_EXCLUDE_PACKS")
        max_documents = cls._parse_int_env("WARBLER_MAX_DOCUMENTS")
        max_documents_per_pack = cls._parse_int_env("WARBLER_MAX_DOCUMENTS_PER_PACK")

        if cls._is_hosted_environment():
            if not exclude_packs:
                exclude_packs = ["warbler-pack-hf-tinystories"]
            if max_documents is None:
                max_documents = 25000
            if max_documents_per_pack is None:
                max_documents_per_pack = 5000

        return cls(
            packs_dir=packs_dir,
            include_packs=include_packs,
            exclude_packs=exclude_packs,
            max_documents=max_documents,
            max_documents_per_pack=max_documents_per_pack,
        )

    @staticmethod
    def _is_hosted_environment() -> bool:
        """Detect whether the loader is running in a hosted environment."""
        hosted_flag = os.getenv("WARBLER_HOSTED_MODE", "").lower()
        return hosted_flag in {"1", "true", "yes", "on"} or bool(
            os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID")
        )

    @staticmethod
    def _split_csv_env(name: str) -> List[str]:
        """Parse a comma-separated environment variable into patterns."""
        raw_value = os.getenv(name, "")
        return [part.strip() for part in raw_value.split(",") if part.strip()]

    @staticmethod
    def _parse_int_env(name: str) -> Optional[int]:
        """Parse an integer environment variable if present."""
        raw_value = os.getenv(name)
        if raw_value is None or raw_value == "":
            return None

        try:
            return int(raw_value)
        except ValueError:
            logger.warning("Ignoring invalid integer for %s: %s", name, raw_value)
            return None

    def discover_documents(self) -> List[Dict[str, Any]]:
        """Discover all documents across all packs."""
        if not self.packs_dir.exists():
            logger.warning(f"Packs directory not found: {self.packs_dir}")
            return []

        documents = []
        remaining_documents = self.max_documents

        for pack_dir in sorted(self.packs_dir.iterdir()):
            if not pack_dir.is_dir():
                continue

            pack_name = pack_dir.name
            if not self._should_load_pack(pack_name):
                logger.info("Skipping pack due to loader policy: %s", pack_name)
                continue

            if remaining_documents is not None and remaining_documents <= 0:
                logger.info("Stopping pack discovery after reaching max_documents=%s", self.max_documents)
                break

            logger.info(f"Loading pack: {pack_name}")

            pack_docs = self._load_pack(pack_dir, pack_name)
            if self.max_documents_per_pack is not None:
                pack_docs = pack_docs[: self.max_documents_per_pack]

            if remaining_documents is not None:
                pack_docs = pack_docs[:remaining_documents]
                remaining_documents -= len(pack_docs)

            documents.extend(pack_docs)
            logger.info(f"✓ Loaded {len(pack_docs)} documents from {pack_name}")

        self.documents = documents
        return documents

    def _should_load_pack(self, pack_name: str) -> bool:
        """Check whether a pack should be included under the current loader policy."""
        if self.include_packs:
            included = any(fnmatch.fnmatch(pack_name, pattern) for pattern in self.include_packs)
            if not included:
                return False

        if self.exclude_packs:
            excluded = any(fnmatch.fnmatch(pack_name, pattern) for pattern in self.exclude_packs)
            if excluded:
                return False

        return True

    def _load_pack(self, pack_dir: Path, pack_name: str) -> List[Dict[str, Any]]:
        """Load documents from a specific pack."""
        documents = []

        jsonl_file = pack_dir / f"{pack_name}.jsonl"

        # Validate this is actually a Warbler pack before loading
        if self._is_valid_warbler_pack(pack_dir, pack_name, jsonl_file):
            docs = self._load_jsonl_pack(pack_dir, pack_name)
            documents.extend(docs)
        else:
            # Fall back to structured pack format
            docs = self._load_structured_pack(pack_dir, pack_name)
            documents.extend(docs)

        return documents

    def _is_valid_warbler_pack(self, pack_dir: Path, pack_name: str, jsonl_file: Path) -> bool:
        """Validate that a directory is a valid Warbler pack.

        A valid Warbler pack must have:
        1. Either:
           - A JSONL file matching the pack name (single-file pack), OR
           - Chunk files matching the pattern (chunked pack)
           - OR structured templates that can be converted to JSONL
        2. AND either:
           - A package.json metadata file, OR
           - The pack name starts with 'warbler-pack-hf-' (HuggingFace packs)
        """
        # Check for package.json metadata first
        package_json = pack_dir / "package.json"
        has_valid_metadata = False
        is_chunked = False

        if package_json.exists():
            try:
                with open(package_json, "r", encoding="utf-8") as f:
                    metadata = json.load(f)
                    # Validate it has required fields
                    if "name" in metadata and "version" in metadata:
                        has_valid_metadata = True
                        is_chunked = metadata.get("chunked", False)
            except (json.JSONDecodeError, IOError) as e:
                logger.warning(f"Invalid package.json in {pack_dir}: {e}")

        # Allow HuggingFace packs even without package.json (for backward compatibility)
        if pack_name.startswith("warbler-pack-hf-"):
            has_valid_metadata = True

        if not has_valid_metadata:
            return False

        # Check for appropriate JSONL files based on chunked status
        if is_chunked:
            # For chunked packs, look for chunk files
            chunk_files = list(pack_dir.glob(f"{pack_name}-chunk-*.jsonl"))
            if chunk_files:
                logger.debug(f"Found {len(chunk_files)} chunk files for {pack_name}")
                return True
            else:
                logger.warning(f"Chunked pack {pack_name} has no chunk files")
                return False
        else:
            # For single-file packs, check if JSONL file exists
            if jsonl_file.exists():
                return True
            # Check for structured pack templates that can be converted
            templates_file = pack_dir / "pack" / "templates.json"
            if templates_file.exists():
                logger.debug(f"Single-file pack {pack_name} missing JSONL, but templates.json exists")
                return True
            logger.warning(f"Single-file pack {pack_name} missing JSONL file: {jsonl_file}")
            return False

    def _load_jsonl_pack(self, pack_dir: Path, pack_name: str) -> List[Dict[str, Any]]:
        """Load JSONL-based pack (supports both single-file and chunked packs)."""
        documents = []

        # Check if this is a chunked pack by reading package.json
        package_json = pack_dir / "package.json"
        is_chunked = False

        if package_json.exists():
            try:
                with open(package_json, "r", encoding="utf-8") as f:
                    metadata = json.load(f)
                    is_chunked = metadata.get("chunked", False)
            except (json.JSONDecodeError, IOError) as err:
                logger.warning(f"Could not read package.json for {pack_name}: {err}")

        if is_chunked:
            # Load chunked pack
            logger.info(f"Loading chunked pack: {pack_name}")

            # Find all chunk files matching the pattern
            # Pattern is like "warbler-pack-hf-arxiv-chunk-*.jsonl"
            # We need to find files like "warbler-pack-hf-arxiv-chunk-001.jsonl", etc.
            chunk_files = sorted(pack_dir.glob(f"{pack_name}-chunk-*.jsonl"))

            if not chunk_files:
                logger.warning(f"No chunk files found for chunked pack {pack_name}")
                return documents

            logger.info(f"Found {len(chunk_files)} chunk files for {pack_name}")

            # Load each chunk file in order
            for chunk_file in chunk_files:
                logger.debug(f"Loading chunk: {chunk_file.name}")
                chunk_docs = self._load_jsonl_file(chunk_file, pack_name)
                documents.extend(chunk_docs)

            logger.info(f"Loaded {len(documents)} total documents from {len(chunk_files)} chunks")
        else:
            # Load single-file pack (backward compatibility)
            jsonl_file = pack_dir / f"{pack_name}.jsonl"
            if not jsonl_file.exists():
                logger.warning(f"JSONL file not found: {jsonl_file}")
                return documents

            documents = self._load_jsonl_file(jsonl_file, pack_name)

        return documents

    def _load_jsonl_file(self, jsonl_file: Path, pack_name: str) -> List[Dict[str, Any]]:
        """Load a single JSONL file with robust error handling."""
        documents = []
        error_count = 0
        max_errors_to_log = 5

        try:
            with open(jsonl_file, "r", encoding="utf-8") as f:
                for line_num, line in enumerate(f, 1):
                    if not line.strip():
                        continue

                    try:
                        entry = json.loads(line)
                        doc = self._format_document(
                            entry, pack_name, f"{jsonl_file.stem}_line_{line_num}"
                        )
                        documents.append(doc)
                    except json.JSONDecodeError as e:
                        error_count += 1
                        # Only log first few errors to avoid spam
                        if error_count <= max_errors_to_log:
                            logger.warning(
                                f"Error parsing line {line_num} in {jsonl_file.name}: {e}"
                            )
                        # Continue processing other lines instead of failing
                        continue

            if error_count > 0:
                logger.info(
                    f"Loaded {len(documents)} documents from {jsonl_file.name} "
                    f"({error_count} lines skipped due to errors)"
                )
        except Exception as e:
            logger.error(f"Error loading JSONL file {jsonl_file}: {e}")

        return documents

    def _load_structured_pack(self, pack_dir: Path, pack_name: str) -> List[Dict[str, Any]]:
        """Load structured pack with templates."""
        documents = []

        templates_file = pack_dir / "pack" / "templates.json"
        if not templates_file.exists():
            logger.debug(f"No templates.json found in {pack_dir}")
            return documents

        try:
            with open(templates_file, "r", encoding="utf-8") as f:
                data = json.load(f)

            templates = data if isinstance(data, list) else data.get("templates", [])

            for template in templates:
                doc = {
                    "id": f"{pack_name}/{template.get('id', 'unknown')}",
                    "content": template.get("content", json.dumps(template)),
                    "metadata": {
                        "pack": pack_name,
                        "type": "template",
                        "template_id": template.get("id"),
                        "realm_type": self._infer_realm(pack_name),
                        "realm_label": pack_name.replace("warbler-pack-", ""),
                        "lifecycle_stage": "peak",
                        "activity_level": 0.8,
                    },
                }
                documents.append(doc)
            
            self._generate_jsonl_from_templates(pack_dir, pack_name, documents)
        except Exception as e:
            logger.error(f"Error loading templates from {pack_name}: {e}")

        return documents

    def _generate_jsonl_from_templates(
        self, pack_dir: Path, pack_name: str, documents: List[Dict[str, Any]]
    ) -> None:
        """Generate JSONL file from templates for future loads."""
        try:
            jsonl_file = pack_dir / f"{pack_name}.jsonl"
            if jsonl_file.exists():
                return
            
            with open(jsonl_file, "w", encoding="utf-8") as f:
                for doc in documents:
                    f.write(json.dumps(doc, ensure_ascii=False) + "\n")
            
            logger.info(f"Generated JSONL file for {pack_name}: {jsonl_file.name}")
        except Exception as e:
            logger.debug(f"Could not generate JSONL for {pack_name}: {e}")

    def _format_document(
        self, entry: Dict[str, Any], pack_name: str, doc_id: str
    ) -> Dict[str, Any]:
        """Format a pack entry into a document."""
        content = entry.get("content") or entry.get("text") or json.dumps(entry)

        return {
            "id": f"{pack_name}/{doc_id}",
            "content": str(content),
            "metadata": {
                "pack": pack_name,
                "type": entry.get("type", "dialogue"),
                "realm_type": self._infer_realm(pack_name),
                "realm_label": pack_name.replace("warbler-pack-", ""),
                "lifecycle_stage": "emergence",
                "activity_level": 0.7,
                **{k: v for k, v in entry.items() if k not in ["content", "text"]},
            },
        }

    def _infer_realm(self, pack_name: str) -> str:
        """Infer realm type from pack name."""
        if "wisdom" in pack_name:
            return "wisdom"
        elif "faction" in pack_name or "politics" in pack_name:
            return "faction"
        elif "dialogue" in pack_name or "npc" in pack_name:
            return "narrative"
        else:
            return "narrative"