File size: 4,873 Bytes
ebeba12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3

import argparse
import logging
from pathlib import Path

from private_gpt.di import global_injector
from private_gpt.server.ingest.ingest_service import IngestService
from private_gpt.server.ingest.ingest_watcher import IngestWatcher
from private_gpt.settings.settings import Settings

logger = logging.getLogger(__name__)


class LocalIngestWorker:
    def __init__(self, ingest_service: IngestService, setting: Settings) -> None:
        self.ingest_service = ingest_service

        self.total_documents = 0
        self.current_document_count = 0

        self._files_under_root_folder: list[Path] = []

        self.is_local_ingestion_enabled = setting.data.local_ingestion.enabled
        self.allowed_local_folders = setting.data.local_ingestion.allow_ingest_from

    def _validate_folder(self, folder_path: Path) -> None:
        if not self.is_local_ingestion_enabled:
            raise ValueError(
                "Local ingestion is disabled."
                "You can enable it in settings `ingestion.enabled`"
            )

        # Allow all folders if wildcard is present
        if "*" in self.allowed_local_folders:
            return

        for allowed_folder in self.allowed_local_folders:
            if not folder_path.is_relative_to(allowed_folder):
                raise ValueError(f"Folder {folder_path} is not allowed for ingestion")

    def _find_all_files_in_folder(self, root_path: Path, ignored: list[str]) -> None:
        """Search all files under the root folder recursively.

        Count them at the same time
        """
        for file_path in root_path.iterdir():
            if file_path.is_file() and file_path.name not in ignored:
                self.total_documents += 1
                self._validate_folder(file_path)
                self._files_under_root_folder.append(file_path)
            elif file_path.is_dir() and file_path.name not in ignored:
                self._find_all_files_in_folder(file_path, ignored)

    def ingest_folder(self, folder_path: Path, ignored: list[str]) -> None:
        # Count total documents before ingestion
        self._find_all_files_in_folder(folder_path, ignored)
        self._ingest_all(self._files_under_root_folder)

    def _ingest_all(self, files_to_ingest: list[Path]) -> None:
        logger.info("Ingesting files=%s", [f.name for f in files_to_ingest])
        self.ingest_service.bulk_ingest([(str(p.name), p) for p in files_to_ingest])

    def ingest_on_watch(self, changed_path: Path) -> None:
        logger.info("Detected change in at path=%s, ingesting", changed_path)
        self._do_ingest_one(changed_path)

    def _do_ingest_one(self, changed_path: Path) -> None:
        try:
            if changed_path.exists():
                logger.info(f"Started ingesting file={changed_path}")
                self.ingest_service.ingest_file(changed_path.name, changed_path)
                logger.info(f"Completed ingesting file={changed_path}")
        except Exception:
            logger.exception(
                f"Failed to ingest document: {changed_path}, find the exception attached"
            )


parser = argparse.ArgumentParser(prog="ingest_folder.py")
parser.add_argument("folder", help="Folder to ingest")
parser.add_argument(
    "--watch",
    help="Watch for changes",
    action=argparse.BooleanOptionalAction,
    default=False,
)
parser.add_argument(
    "--ignored",
    nargs="*",
    help="List of files/directories to ignore",
    default=[],
)
parser.add_argument(
    "--log-file",
    help="Optional path to a log file. If provided, logs will be written to this file.",
    type=str,
    default=None,
)

args = parser.parse_args()

# Set up logging to a file if a path is provided
if args.log_file:
    file_handler = logging.FileHandler(args.log_file, mode="a")
    file_handler.setFormatter(
        logging.Formatter(
            "[%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )
    logger.addHandler(file_handler)

if __name__ == "__main__":
    root_path = Path(args.folder)
    if not root_path.exists():
        raise ValueError(f"Path {args.folder} does not exist")

    ingest_service = global_injector.get(IngestService)
    settings = global_injector.get(Settings)
    worker = LocalIngestWorker(ingest_service, settings)
    worker.ingest_folder(root_path, args.ignored)

    if args.ignored:
        logger.info(f"Skipping following files and directories: {args.ignored}")

    if args.watch:
        logger.info(f"Watching {args.folder} for changes, press Ctrl+C to stop...")
        directories_to_watch = [
            dir
            for dir in root_path.iterdir()
            if dir.is_dir() and dir.name not in args.ignored
        ]
        watcher = IngestWatcher(args.folder, worker.ingest_on_watch)
        watcher.start()