File size: 5,716 Bytes
88bdcff
 
 
 
 
 
 
 
 
 
 
0699c5f
88bdcff
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0699c5f
88bdcff
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""Index builder for FDAM RAG knowledge base.

Processes markdown documents from RAG-KB/ and indexes them in ChromaDB.

Usage:
    python -m rag.index_builder [--rebuild]
"""

import argparse
from pathlib import Path

from rag.chunker import SemanticChunker
from rag.vectorstore import ChromaVectorStore


# Document configuration: filename -> (category, priority)
# category groups chunks for filtered retrieval; priority ranks how
# authoritative the source is (primary > reference-threshold > reference-narrative).
DOCUMENT_CONFIG: dict[str, tuple[str, str]] = {
    # PRIMARY - FDAM Methodology (authoritative source)
    "FDAM_v4_METHODOLOGY.md": ("methodology", "primary"),
    # REFERENCE - Threshold Tables (critical for metals clearance)
    "Metals clearance criteria-QVC.md": ("thresholds", "reference-threshold"),
    # REFERENCE - Narrative (supporting documentation)
    "air-o-cell-method-guide-atlas.md": ("lab-methods", "reference-narrative"),
    "Industrial Hygiene Lab Services Guide.md": ("lab-methods", "reference-narrative"),
    "Fire Remediation Processes and Methodologies_ A Review of Industry-Endorsed Standards.md": (
        "cleaning-procedures",
        "reference-narrative",
    ),
    "Technical Guide for Wildfire Restoration - Key Information.md": (
        "wildfire",
        "reference-narrative",
    ),
    "wildfire_soot_particulate_removal_full_text_extraction.md": (
        "wildfire",
        "reference-narrative",
    ),
}

# Files to skip (per user decision)
SKIP_FILES: set[str] = {
    "Lead Contamination in Indoor Firing_Gun Ranges _ Atlantic Environmental.pdf",
}


def get_rag_kb_path() -> Path:
    """Locate the RAG-KB document directory.

    Checks the package's parent directory first, then the current
    working directory.

    Returns:
        Path to the first RAG-KB directory that exists.

    Raises:
        FileNotFoundError: If no candidate location exists.
    """
    candidates = (
        Path(__file__).parent.parent / "RAG-KB",  # sibling of this package
        Path("RAG-KB"),                            # relative to cwd
    )
    for candidate in candidates:
        if candidate.exists():
            return candidate
    raise FileNotFoundError("Could not find RAG-KB directory")


def get_chroma_path() -> Path:
    """Return the ChromaDB persistence directory (a sibling of this package).

    The directory is not created here; ChromaVectorStore handles that.
    """
    return Path(__file__).parent.parent / "chroma_db"


def build_index(rebuild: bool = False) -> dict:
    """Build the RAG index from RAG-KB documents.

    Reads every configured markdown file in RAG-KB/, chunks it with
    SemanticChunker, and stores the chunks in ChromaDB. Re-indexing a
    file replaces its previously stored chunks (incremental update).

    Args:
        rebuild: If True, clear existing index before building

    Returns:
        Statistics about the indexing operation: documents_processed,
        documents_skipped, chunks_created, and a list of error strings.

    Raises:
        FileNotFoundError: If the RAG-KB directory cannot be located.
    """
    rag_kb_path = get_rag_kb_path()
    chroma_path = get_chroma_path()

    print(f"RAG-KB path: {rag_kb_path}")
    print(f"ChromaDB path: {chroma_path}")

    # Initialize components
    chunker = SemanticChunker()
    vectorstore = ChromaVectorStore(persist_directory=str(chroma_path))

    if rebuild:
        print("Rebuilding index - clearing existing data...")
        vectorstore.clear()

    stats = {
        "documents_processed": 0,
        "documents_skipped": 0,
        "chunks_created": 0,
        "errors": [],
    }

    # Process markdown files
    for md_file in rag_kb_path.glob("*.md"):
        filename = md_file.name

        # Skip files not in config or in skip list.
        # BUG FIX: these messages previously printed the literal text
        # "(unknown)" instead of interpolating the filename.
        if filename in SKIP_FILES:
            print(f"Skipping (excluded): {filename}")
            stats["documents_skipped"] += 1
            continue

        if filename not in DOCUMENT_CONFIG:
            print(f"Skipping (not configured): {filename}")
            stats["documents_skipped"] += 1
            continue

        category, priority = DOCUMENT_CONFIG[filename]
        print(f"Processing: {filename} ({category}, {priority})")

        try:
            # Read and chunk document
            text = md_file.read_text(encoding="utf-8")
            chunks = chunker.chunk_document(
                text=text,
                source=filename,
                category=category,
                priority=priority,
            )

            # Check if source already indexed (for incremental updates)
            existing_count = vectorstore.delete_by_source(filename)
            if existing_count > 0:
                print(f"  Replaced {existing_count} existing chunks")

            # Add to vectorstore
            added = vectorstore.add_chunks(chunks)
            print(f"  Added {added} chunks")

            stats["documents_processed"] += 1
            stats["chunks_created"] += added

        except Exception as e:
            # Best-effort: record the failure and keep indexing the
            # remaining documents rather than aborting the whole build.
            error_msg = f"Error processing {filename}: {e}"
            print(f"  ERROR: {e}")
            stats["errors"].append(error_msg)

    # Report on PDFs that need conversion
    for pdf_file in rag_kb_path.glob("*.pdf"):
        if pdf_file.name not in SKIP_FILES:
            print(f"Note: PDF needs conversion to .md: {pdf_file.name}")

    # Print summary
    print("\n" + "=" * 50)
    print("Index Build Complete")
    print("=" * 50)
    print(f"Documents processed: {stats['documents_processed']}")
    print(f"Documents skipped: {stats['documents_skipped']}")
    print(f"Total chunks created: {stats['chunks_created']}")

    if stats["errors"]:
        print(f"Errors: {len(stats['errors'])}")
        for err in stats["errors"]:
            print(f"  - {err}")

    # Print collection stats
    collection_stats = vectorstore.get_stats()
    print("\nCollection stats:")
    print(f"  Total chunks in DB: {collection_stats['total_chunks']}")
    print(f"  Categories: {collection_stats['categories']}")
    print(f"  Priorities: {collection_stats['priorities']}")

    return stats


def main():
    """CLI entry point."""
    arg_parser = argparse.ArgumentParser(
        description="Build FDAM RAG knowledge base index"
    )
    arg_parser.add_argument(
        "--rebuild",
        action="store_true",
        help="Clear existing index and rebuild from scratch",
    )
    options = arg_parser.parse_args()
    build_index(rebuild=options.rebuild)


if __name__ == "__main__":
    main()