File size: 10,823 Bytes
f871fed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
"""
Service layer for source operations, backed by the API client.
"""

from dataclasses import dataclass
from typing import Dict, List, Optional, Union

from loguru import logger

from api.client import api_client
from open_notebook.domain.notebook import Asset, Source


@dataclass
class SourceProcessingResult:
    """Result of source creation with optional async processing info.

    ``SourcesService.create_source`` returns this wrapper (with
    ``is_async=True``) when the API response carries a ``command_id``,
    ``status`` or ``processing_info`` entry. ``create_source_async`` also
    uses it, with ``is_async=False``, to wrap a plain ``Source`` when no
    such entry was present.
    """
    # The source object built from the API response.
    source: Source
    # True when the API response contained async processing details.
    is_async: bool = False
    # Value of the response's "command_id" entry, if any.
    command_id: Optional[str] = None
    # Value of the response's "status" entry, if any.
    status: Optional[str] = None
    # Value of the response's "processing_info" entry, if any.
    processing_info: Optional[Dict] = None


@dataclass
class SourceWithMetadata:
    """Pairs a ``Source`` with extra metadata reported by the API.

    Commonly used attributes of the wrapped source are mirrored as
    properties so the wrapper can be read much like the source itself.
    Only ``title`` can also be written through the wrapper; every other
    mirrored attribute is read-only here.
    """

    # The wrapped source object.
    source: Source
    # Value the API reported under "embedded_chunks" for this source.
    embedded_chunks: int

    # --- Read/write pass-through -------------------------------------

    @property
    def title(self):
        """Title of the wrapped source."""
        return self.source.title

    @title.setter
    def title(self, new_title):
        """Store ``new_title`` on the wrapped source."""
        self.source.title = new_title

    # --- Read-only pass-throughs -------------------------------------

    @property
    def id(self):
        """Identifier of the wrapped source."""
        return self.source.id

    @property
    def topics(self):
        """Topics of the wrapped source."""
        return self.source.topics

    @property
    def asset(self):
        """Asset of the wrapped source."""
        return self.source.asset

    @property
    def full_text(self):
        """Full text of the wrapped source."""
        return self.source.full_text

    @property
    def created(self):
        """Creation value of the wrapped source."""
        return self.source.created

    @property
    def updated(self):
        """Last-update value of the wrapped source."""
        return self.source.updated


class SourcesService:
    """Service layer for sources operations using API.

    Every public method delegates to the module-level ``api_client`` and
    converts the returned payload(s) into ``Source``-based objects.
    """

    # Statuses for which ``is_source_processing_complete`` reports True.
    # ``None`` indicates a legacy/sync source. Kept as a tuple (not a set)
    # so membership is tested by equality, exactly like a list would.
    _FINISHED_STATUSES = ("completed", "failed", None)

    def __init__(self):
        logger.info("Using API for sources operations")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _first_record(response: Union[Dict, List[Dict]]) -> Dict:
        """Return ``response`` itself when it is a dict, else its first element."""
        return response if isinstance(response, dict) else response[0]

    @staticmethod
    def _build_source(data: Dict, include_full_text: bool = True) -> Source:
        """Build a ``Source`` from a single API payload dict.

        Args:
            data: Payload dict. ``title``, ``id``, ``created`` and ``updated``
                are required (``KeyError`` if absent). ``topics``, ``asset``
                and ``full_text`` are optional: a missing or falsy ``topics``
                becomes ``[]``, a missing or falsy ``asset`` becomes ``None``
                and a missing ``full_text`` becomes ``None``.
            include_full_text: When False, ``full_text`` is not passed to the
                ``Source`` constructor at all (used for list payloads).

        Returns:
            The populated ``Source`` with ``id``, ``created`` and ``updated``
            assigned from the payload.
        """
        asset_data = data.get("asset")
        asset = (
            Asset(file_path=asset_data["file_path"], url=asset_data["url"])
            if asset_data
            else None
        )

        fields = {
            "title": data["title"],
            "topics": data.get("topics") or [],
            "asset": asset,
        }
        if include_full_text:
            fields["full_text"] = data.get("full_text")

        source = Source(**fields)
        # These are assigned after construction rather than passed to the
        # constructor, matching how the payload was applied before.
        source.id = data["id"]
        source.created = data["created"]
        source.updated = data["updated"]
        return source

    # ------------------------------------------------------------------
    # Read operations
    # ------------------------------------------------------------------

    def get_all_sources(self, notebook_id: Optional[str] = None) -> List[SourceWithMetadata]:
        """Get all sources with optional notebook filtering.

        Args:
            notebook_id: Forwarded to ``api_client.get_sources``.

        Returns:
            One ``SourceWithMetadata`` per payload returned by the API. The
            wrapped sources are built without ``full_text``;
            ``embedded_chunks`` defaults to 0 when the payload omits it.
        """
        sources_data = api_client.get_sources(notebook_id=notebook_id)
        return [
            SourceWithMetadata(
                source=self._build_source(item, include_full_text=False),
                embedded_chunks=item.get("embedded_chunks", 0),
            )
            for item in sources_data
        ]

    def get_source(self, source_id: str) -> SourceWithMetadata:
        """Get a specific source.

        Args:
            source_id: Forwarded to ``api_client.get_source``.

        Returns:
            The source (including ``full_text``) wrapped with its
            ``embedded_chunks`` value, which defaults to 0 when absent.
        """
        source_data = self._first_record(api_client.get_source(source_id))
        return SourceWithMetadata(
            source=self._build_source(source_data),
            embedded_chunks=source_data.get("embedded_chunks", 0),
        )

    def get_source_status(self, source_id: str) -> Dict:
        """Get processing status for a source."""
        return self._first_record(api_client.get_source_status(source_id))

    def is_source_processing_complete(self, source_id: str) -> bool:
        """
        Check if a source's async processing is complete.

        Returns True if processing is complete (success or failure),
        False if still processing or queued. A missing status is treated as
        complete, and so is any error raised while fetching the status.
        """
        try:
            status = self.get_source_status(source_id).get("status")
        except Exception as e:
            # Deliberate best-effort: a failed status lookup is logged and
            # reported as "complete" instead of propagating.
            logger.error(f"Error checking source processing status: {e}")
            return True  # Assume complete on error
        return status in self._FINISHED_STATUSES

    # ------------------------------------------------------------------
    # Write operations
    # ------------------------------------------------------------------

    def create_source(
        self,
        notebook_id: Optional[str] = None,
        source_type: str = "text",
        url: Optional[str] = None,
        file_path: Optional[str] = None,
        content: Optional[str] = None,
        title: Optional[str] = None,
        transformations: Optional[List[str]] = None,
        embed: bool = False,
        delete_source: bool = False,
        notebooks: Optional[List[str]] = None,
        async_processing: bool = False,
    ) -> Union[Source, SourceProcessingResult]:
        """
        Create a new source with support for async processing.

        Args:
            notebook_id: Single notebook ID (deprecated, use notebooks parameter)
            source_type: Type of source (link, upload, text)
            url: URL for link sources
            file_path: File path for upload sources
            content: Text content for text sources
            title: Optional source title
            transformations: List of transformation IDs to apply
            embed: Whether to embed content for vector search
            delete_source: Whether to delete uploaded file after processing
            notebooks: List of notebook IDs to add source to (preferred over notebook_id)
            async_processing: Whether to process source asynchronously

        Returns:
            Source object for sync processing (backward compatibility)
            SourceProcessingResult for async processing (contains additional metadata)
        """
        response = api_client.create_source(
            notebook_id=notebook_id,
            notebooks=notebooks,
            source_type=source_type,
            url=url,
            file_path=file_path,
            content=content,
            title=title,
            transformations=transformations,
            embed=embed,
            delete_source=delete_source,
            async_processing=async_processing,
        )
        response_data = self._first_record(response)
        source = self._build_source(response_data)

        command_id = response_data.get("command_id")
        status = response_data.get("status")
        processing_info = response_data.get("processing_info")

        # The response (not the ``async_processing`` argument) decides the
        # return type: any truthy async detail yields the enhanced result.
        if command_id or status or processing_info:
            return SourceProcessingResult(
                source=source,
                is_async=True,
                command_id=command_id,
                status=status,
                processing_info=processing_info,
            )

        # Return simple Source for backward compatibility
        return source

    def create_source_async(
        self,
        notebook_id: Optional[str] = None,
        source_type: str = "text",
        url: Optional[str] = None,
        file_path: Optional[str] = None,
        content: Optional[str] = None,
        title: Optional[str] = None,
        transformations: Optional[List[str]] = None,
        embed: bool = False,
        delete_source: bool = False,
        notebooks: Optional[List[str]] = None,
    ) -> SourceProcessingResult:
        """
        Create a new source with async processing enabled.

        This is a convenience method that always calls ``create_source`` with
        ``async_processing=True`` and always returns a SourceProcessingResult.
        If ``create_source`` hands back a plain ``Source`` (no async details
        in the response), it is wrapped with ``is_async=False``.
        """
        result = self.create_source(
            notebook_id=notebook_id,
            notebooks=notebooks,
            source_type=source_type,
            url=url,
            file_path=file_path,
            content=content,
            title=title,
            transformations=transformations,
            embed=embed,
            delete_source=delete_source,
            async_processing=True,
        )

        if isinstance(result, SourceProcessingResult):
            return result

        # Fallback: this shouldn't happen, but handle it gracefully by
        # wrapping the plain Source.
        return SourceProcessingResult(source=result, is_async=False)

    def update_source(self, source: Source) -> Source:
        """Update a source.

        Sends the source's ``title`` and ``topics`` to the API, then copies
        ``title``, ``topics`` and ``updated`` from the response back onto the
        same ``source`` object.

        Args:
            source: The source to update; its ``id`` must be truthy.

        Returns:
            The same ``source`` instance, mutated in place.

        Raises:
            ValueError: If ``source.id`` is missing or falsy.
        """
        if not source.id:
            raise ValueError("Source ID is required for update")

        response = api_client.update_source(
            source.id,
            title=source.title,
            topics=source.topics,
        )
        updated_data = self._first_record(response)

        # Update the source object with the response
        source.title = updated_data["title"]
        source.topics = updated_data["topics"]
        source.updated = updated_data["updated"]

        return source

    def delete_source(self, source_id: str) -> bool:
        """Delete a source.

        Returns:
            Always True once ``api_client.delete_source`` returns; any
            exception it raises propagates to the caller.
        """
        api_client.delete_source(source_id)
        return True


# Module-level service instance, created once when this module is imported.
sources_service = SourcesService()

# Names exported by this module.
__all__ = [
    "SourcesService",
    "SourceWithMetadata",
    "SourceProcessingResult",
    "sources_service",
]