Spaces:
Running
Running
| import operator | |
| from typing import Any, Dict, List, Optional | |
| from content_core import extract_content | |
| from content_core.common import ProcessSourceState | |
| from langchain_core.runnables import RunnableConfig | |
| from langgraph.graph import END, START, StateGraph | |
| from langgraph.types import Send | |
| from loguru import logger | |
| from typing_extensions import Annotated, TypedDict | |
| from open_notebook.domain.content_settings import ContentSettings | |
| from open_notebook.domain.models import Model, ModelManager | |
| from open_notebook.domain.notebook import Asset, Source | |
| from open_notebook.domain.transformation import Transformation | |
| from open_notebook.graphs.transformation import graph as transform_graph | |
class SourceState(TypedDict):
    """Workflow state for the source-ingestion graph."""

    # Content payload passed to and returned by content-core extraction.
    content_state: ProcessSourceState
    # Transformations to fan out over once the source is saved.
    apply_transformations: List[Transformation]
    # ID of an existing Source record to update (created earlier by the API).
    source_id: str
    # Notebooks this source belongs to; associations are created by the API,
    # not by this graph.
    notebook_ids: List[str]
    # Populated by save_source after persisting the processed content.
    source: Source
    # Accumulates results from parallel transform_content runs (list concat).
    transformation: Annotated[list, operator.add]
    # Whether to vectorize the saved content for semantic search.
    embed: bool
class TransformationState(TypedDict):
    """Per-task state sent to the transform_content node via Send()."""

    # Source whose full text will be transformed.
    source: Source
    # Transformation definition to apply to that text.
    transformation: Transformation
async def content_process(state: SourceState) -> dict:
    """Run content-core extraction over the incoming content state.

    Configures the URL/document processing engines and markdown output,
    attaches the default speech-to-text model (best effort), then delegates
    to ``extract_content``.

    Returns:
        dict with the processed state under ``content_state``.
    """
    settings = ContentSettings(
        default_content_processing_engine_doc="auto",
        default_content_processing_engine_url="auto",
        default_embedding_option="ask",
        auto_delete_files="yes",
        youtube_preferred_languages=["en", "pt", "es", "de", "nl", "en-GB", "fr", "hi", "ja"],
    )

    payload: Dict[str, Any] = state["content_state"]  # type: ignore[assignment]
    payload["url_engine"] = settings.default_content_processing_engine_url or "auto"
    payload["document_engine"] = settings.default_content_processing_engine_doc or "auto"
    payload["output_format"] = "markdown"

    # Best effort: attach the configured speech-to-text model from Default
    # Models. On any failure, content-core falls back to its own default.
    try:
        defaults = await ModelManager().get_defaults()
        stt_id = defaults.default_speech_to_text_model
        if stt_id:
            stt = await Model.get(stt_id)
            if stt:
                payload["audio_provider"] = stt.provider
                payload["audio_model"] = stt.name
                logger.debug(f"Using speech-to-text model: {stt.provider}/{stt.name}")
    except Exception as e:
        logger.warning(f"Failed to retrieve speech-to-text model configuration: {e}")

    processed = await extract_content(payload)
    return {"content_state": processed}
async def save_source(state: SourceState) -> dict:
    """Persist processed content onto the pre-existing Source record.

    Raises:
        ValueError: if no Source exists for ``state["source_id"]``.

    Returns:
        dict with the updated ``source``.
    """
    processed = state["content_state"]

    # The Source row was created up front by the API; we only update it here.
    source = await Source.get(state["source_id"])
    if not source:
        raise ValueError(f"Source with ID {state['source_id']} not found")

    source.asset = Asset(url=processed.url, file_path=processed.file_path)
    source.full_text = processed.content
    # Keep the existing title when extraction produced none.
    if processed.title:
        source.title = processed.title
    await source.save()

    # Notebook associations are created by the API immediately for UI
    # responsiveness; recreating them here would produce duplicate edges.

    if state["embed"]:
        logger.debug("Embedding content for vector search")
        await source.vectorize()

    return {"source": source}
def trigger_transformations(state: SourceState, config: RunnableConfig) -> List[Send]:
    """Fan out one transform_content task per requested transformation.

    Returns an empty list (no fan-out) when no transformations were
    requested; otherwise a Send per transformation.
    """
    pending = state["apply_transformations"]
    if not pending:
        return []

    logger.debug(f"Applying transformations {pending}")
    return [
        Send("transform_content", {"source": state["source"], "transformation": item})
        for item in pending
    ]
async def transform_content(state: TransformationState) -> Optional[dict]:
    """Apply one transformation to a source and record it as an insight.

    Returns:
        None when the source has no extracted text (nothing to transform);
        otherwise a dict whose ``transformation`` list entry is accumulated
        into SourceState via operator.add.
    """
    source = state["source"]
    text = source.full_text
    if not text:
        return None

    transformation: Transformation = state["transformation"]
    logger.debug(f"Applying transformation {transformation.name}")

    result = await transform_graph.ainvoke(
        dict(input_text=text, transformation=transformation)  # type: ignore[arg-type]
    )
    output = result["output"]
    await source.add_insight(transformation.title, output)

    return {
        "transformation": [
            {
                "output": output,
                "transformation_name": transformation.name,
            }
        ]
    }
# Create and compile the workflow:
#   START -> content_process -> save_source -> (fan-out) transform_content -> END
workflow = StateGraph(SourceState)

# Add nodes
workflow.add_node("content_process", content_process)
workflow.add_node("save_source", save_source)
workflow.add_node("transform_content", transform_content)

# Define the graph edges
workflow.add_edge(START, "content_process")
workflow.add_edge("content_process", "save_source")
# trigger_transformations returns Send objects, spawning one parallel
# transform_content task per requested transformation (possibly none).
workflow.add_conditional_edges(
    "save_source", trigger_transformations, ["transform_content"]
)
workflow.add_edge("transform_content", END)

# Compile the graph
source_graph = workflow.compile()