Spaces:
Running
Running
File size: 5,315 Bytes
f871fed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
import operator
from typing import Any, Dict, List, Optional
from content_core import extract_content
from content_core.common import ProcessSourceState
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from loguru import logger
from typing_extensions import Annotated, TypedDict
from open_notebook.domain.content_settings import ContentSettings
from open_notebook.domain.models import Model, ModelManager
from open_notebook.domain.notebook import Asset, Source
from open_notebook.domain.transformation import Transformation
from open_notebook.graphs.transformation import graph as transform_graph
class SourceState(TypedDict):
    """State flowing through the source-processing graph."""

    content_state: ProcessSourceState  # content_core extraction input/output
    apply_transformations: List[Transformation]  # transformations requested for this source
    source_id: str  # ID of the pre-created Source record to update
    notebook_ids: List[str]  # notebook associations (created by the API layer, not this graph)
    source: Source  # populated by the save_source node
    transformation: Annotated[list, operator.add]  # transformation outputs, accumulated across fan-out
    embed: bool  # when True, save_source vectorizes the content for search
class TransformationState(TypedDict):
    """Per-task state handed to transform_content via Send() fan-out."""

    source: Source  # saved source whose full_text is transformed
    transformation: Transformation  # the single transformation to apply
async def content_process(state: SourceState) -> dict:
    """Run content-core extraction for the incoming source.

    Configures engine selection and output format on the content state,
    best-effort wires in the default speech-to-text model, then delegates
    to ``extract_content``.

    Returns:
        dict with the processed ``content_state`` for the graph reducer.
    """
    settings = ContentSettings(
        default_content_processing_engine_doc="auto",
        default_content_processing_engine_url="auto",
        default_embedding_option="ask",
        auto_delete_files="yes",
        youtube_preferred_languages=["en", "pt", "es", "de", "nl", "en-GB", "fr", "hi", "ja"],
    )
    cc_state: Dict[str, Any] = state["content_state"]  # type: ignore[assignment]
    cc_state["url_engine"] = settings.default_content_processing_engine_url or "auto"
    cc_state["document_engine"] = settings.default_content_processing_engine_doc or "auto"
    cc_state["output_format"] = "markdown"

    # Best-effort: use the configured default speech-to-text model, if any.
    try:
        defaults = await ModelManager().get_defaults()
        stt_id = defaults.default_speech_to_text_model
        if stt_id:
            stt_model = await Model.get(stt_id)
            if stt_model:
                cc_state["audio_provider"] = stt_model.provider
                cc_state["audio_model"] = stt_model.name
                logger.debug(f"Using speech-to-text model: {stt_model.provider}/{stt_model.name}")
    except Exception as e:
        logger.warning(f"Failed to retrieve speech-to-text model configuration: {e}")
        # Fall through: content-core will pick its own default audio model.

    return {"content_state": await extract_content(cc_state)}
async def save_source(state: SourceState) -> dict:
    """Persist the extracted content onto the existing Source record.

    Raises:
        ValueError: if no Source exists for ``state["source_id"]``.
    """
    processed = state["content_state"]
    source = await Source.get(state["source_id"])
    if not source:
        raise ValueError(f"Source with ID {state['source_id']} not found")

    source.asset = Asset(url=processed.url, file_path=processed.file_path)
    source.full_text = processed.content
    if processed.title:
        # Only overwrite the title when extraction actually produced one.
        source.title = processed.title
    await source.save()

    # Notebook associations are created by the API layer up front for UI
    # responsiveness; creating them here would duplicate edges.
    if state["embed"]:
        logger.debug("Embedding content for vector search")
        await source.vectorize()
    return {"source": source}
def trigger_transformations(state: SourceState, config: RunnableConfig) -> List[Send]:
    """Fan out one transform_content task per requested transformation.

    Used as a conditional-edge router: returns a ``Send`` per
    transformation, or an empty list to skip the fan-out entirely.

    Args:
        state: Graph state; reads ``apply_transformations`` and ``source``.
        config: Runnable config supplied by langgraph (unused here, but
            required by the conditional-edge signature).
    """
    to_apply = state["apply_transformations"]
    # Idiomatic emptiness check; also avoids re-reading the state key.
    if not to_apply:
        return []
    logger.debug(f"Applying transformations {to_apply}")
    return [
        Send(
            "transform_content",
            {
                "source": state["source"],
                "transformation": t,
            },
        )
        for t in to_apply
    ]
async def transform_content(state: TransformationState) -> Optional[dict]:
    """Apply one transformation to a source and record the insight.

    Returns None (no state contribution) when the source has no text.
    """
    src = state["source"]
    text = src.full_text
    if not text:
        return None

    transformation: Transformation = state["transformation"]
    logger.debug(f"Applying transformation {transformation.name}")

    result = await transform_graph.ainvoke(
        dict(input_text=text, transformation=transformation)  # type: ignore[arg-type]
    )
    output = result["output"]
    await src.add_insight(transformation.title, output)

    # The Annotated operator.add reducer concatenates these entries
    # across parallel transformation tasks.
    return {
        "transformation": [
            {
                "output": output,
                "transformation_name": transformation.name,
            }
        ]
    }
# Create and compile the workflow:
#   START -> content_process -> save_source -> (fan-out) transform_content -> END
workflow = StateGraph(SourceState)
# Add nodes
workflow.add_node("content_process", content_process)
workflow.add_node("save_source", save_source)
workflow.add_node("transform_content", transform_content)
# Define the graph edges
workflow.add_edge(START, "content_process")
workflow.add_edge("content_process", "save_source")
# trigger_transformations returns Send() objects, so save_source may fan out
# to transform_content zero or more times (once per transformation).
workflow.add_conditional_edges(
    "save_source", trigger_transformations, ["transform_content"]
)
workflow.add_edge("transform_content", END)
# Compile the graph
source_graph = workflow.compile()
|