Spaces:
Runtime error
Runtime error
File size: 6,402 Bytes
3793f68 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | import operator
from typing import Any, Dict, List, Optional
from content_core import extract_content
from content_core.common import ProcessSourceState
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from loguru import logger
from typing_extensions import Annotated, TypedDict
from open_notebook.ai.models import Model, ModelManager
from open_notebook.domain.content_settings import ContentSettings
from open_notebook.domain.notebook import Asset, Source
from open_notebook.domain.transformation import Transformation
from open_notebook.graphs.transformation import graph as transform_graph
class SourceState(TypedDict):
    """LangGraph state for the source-ingestion workflow.

    Flows through ``content_process`` -> ``save_source`` and then fans out to
    one ``transform_content`` branch per requested transformation.
    """

    # Extraction options on entry (the nodes treat it as a mapping of
    # content-core options); after ``content_process`` it holds the
    # ProcessSourceState returned by ``extract_content``.
    content_state: ProcessSourceState
    # Transformations to run on the saved source; one ``transform_content``
    # branch is dispatched per entry by ``trigger_transformations``.
    apply_transformations: List[Transformation]
    # ID of the already-existing Source record that ``save_source`` updates.
    source_id: str
    # Notebook IDs for the source; not read by any node visible in this module.
    notebook_ids: List[str]
    # The Source saved by ``save_source``; handed to each transformation branch.
    source: Source
    # Transformation results; the ``operator.add`` reducer concatenates the
    # one-element lists returned by the parallel ``transform_content`` branches.
    transformation: Annotated[list, operator.add]
    # When truthy, ``save_source`` vectorizes the saved text (if non-blank).
    embed: bool
class TransformationState(TypedDict):
    """Per-branch payload that ``trigger_transformations`` sends to ``transform_content``."""

    # The saved source whose ``full_text`` is transformed.
    source: Source
    # The single transformation applied by this branch.
    transformation: Transformation
async def content_process(state: SourceState) -> dict:
    """Extract text from the source described by ``state["content_state"]``.

    Builds the content-core options, runs ``extract_content`` and checks that
    some text came back. The options cover the processing engines, the output
    format and, when one is configured, the default speech-to-text model.

    Args:
        state: Workflow state; ``content_state`` holds the content-core input
            options as a mapping.

    Returns:
        ``{"content_state": processed_state}``, where ``processed_state`` is
        the object returned by ``extract_content``.

    Raises:
        ValueError: If extraction produced no non-whitespace text. YouTube
            URLs get a message suggesting a Speech-to-Text model.
    """
    # Settings are built with fixed values here; only the two engine fields
    # are read below.
    content_settings = ContentSettings(
        default_content_processing_engine_doc="auto",
        default_content_processing_engine_url="auto",
        default_embedding_option="ask",
        auto_delete_files="yes",
        youtube_preferred_languages=[
            "en",
            "pt",
            "es",
            "de",
            "nl",
            "en-GB",
            "fr",
            "hi",
            "ja",
        ],
    )

    # Work on a shallow copy so running the graph does not mutate the
    # caller's input mapping with engine/model keys.
    content_state: Dict[str, Any] = dict(state["content_state"])
    content_state["url_engine"] = (
        content_settings.default_content_processing_engine_url or "auto"
    )
    content_state["document_engine"] = (
        content_settings.default_content_processing_engine_doc or "auto"
    )
    content_state["output_format"] = "markdown"
    await _apply_speech_to_text_config(content_state)

    processed_state = await extract_content(content_state)
    _ensure_content_extracted(processed_state)
    return {"content_state": processed_state}


async def _apply_speech_to_text_config(content_state: Dict[str, Any]) -> None:
    """Add the default speech-to-text provider/model to ``content_state`` (best effort)."""
    try:
        model_manager = ModelManager()
        defaults = await model_manager.get_defaults()
        if defaults.default_speech_to_text_model:
            stt_model = await Model.get(defaults.default_speech_to_text_model)
            if stt_model:
                content_state["audio_provider"] = stt_model.provider
                content_state["audio_model"] = stt_model.name
                logger.debug(
                    f"Using speech-to-text model: {stt_model.provider}/{stt_model.name}"
                )
    except Exception as e:
        # Deliberately non-fatal: without these keys content-core uses its
        # own default audio model.
        logger.warning(f"Failed to retrieve speech-to-text model configuration: {e}")


def _ensure_content_extracted(processed_state: Any) -> None:
    """Raise ValueError when ``processed_state.content`` is empty or whitespace-only."""
    if processed_state.content and processed_state.content.strip():
        return
    url = processed_state.url or ""
    if "youtube.com" in url or "youtu.be" in url:
        raise ValueError(
            "Could not extract content from this YouTube video. "
            "No transcript or subtitles are available. "
            "Try configuring a Speech-to-Text model in Settings "
            "to transcribe the audio instead."
        )
    raise ValueError(
        "Could not extract any text content from this source. "
        "The content may be empty, inaccessible, or in an unsupported format."
    )
async def save_source(state: SourceState) -> dict:
    """Write the extracted content onto the existing Source and optionally embed it.

    Args:
        state: Workflow state after ``content_process``; reads
            ``content_state``, ``source_id`` and ``embed``.

    Returns:
        ``{"source": source}`` with the saved Source.

    Raises:
        ValueError: If no Source exists for ``state["source_id"]``.
    """
    extracted = state["content_state"]
    source_id = state["source_id"]

    source = await Source.get(source_id)
    if not source:
        raise ValueError(f"Source with ID {source_id} not found")

    source.asset = Asset(url=extracted.url, file_path=extracted.file_path)
    source.full_text = extracted.content

    # A title the user already set wins; only an empty title or the
    # "Processing..." placeholder is replaced by the extracted one.
    new_title = extracted.title
    if new_title and not (source.title and source.title != "Processing..."):
        source.title = new_title

    await source.save()
    # Notebook links are created up front by the API for UI responsiveness,
    # so none are added here (doing so would duplicate the edges).

    if not state["embed"]:
        return {"source": source}

    text = source.full_text
    if text and text.strip():
        logger.debug("Embedding content for vector search")
        await source.vectorize()
    else:
        logger.warning(
            f"Source {source.id} has no text content to embed, skipping vectorization"
        )
    return {"source": source}
def trigger_transformations(state: SourceState, config: RunnableConfig) -> List[Send]:
    """Fan out one ``transform_content`` branch per requested transformation.

    Args:
        state: Workflow state after ``save_source``; reads
            ``apply_transformations`` and ``source``.
        config: Runnable config passed by the graph runtime; unused here.

    Returns:
        A ``Send`` per transformation carrying the saved source and that
        transformation. An empty list is returned when there is nothing to
        apply, including when ``apply_transformations`` is missing or ``None``.
    """
    # A missing/None list is treated like an empty one instead of raising
    # KeyError/TypeError after the source has already been saved.
    to_apply = state.get("apply_transformations") or []
    if not to_apply:
        return []
    logger.debug(f"Applying transformations {to_apply}")
    source = state["source"]
    return [
        Send("transform_content", {"source": source, "transformation": t})
        for t in to_apply
    ]
async def transform_content(state: TransformationState) -> Optional[dict]:
    """Apply one transformation to the source text and store the result as an insight.

    Args:
        state: Branch payload with the saved ``source`` and the
            ``transformation`` to apply.

    Returns:
        ``None`` when the source has no text. Otherwise, a
        ``{"transformation": [entry]}`` update whose single entry carries the
        output and the transformation name. The ``operator.add`` reducer on
        ``SourceState.transformation`` merges these across branches.
    """
    source = state["source"]
    text = source.full_text
    if not text:
        return None

    transformation: Transformation = state["transformation"]
    logger.debug(f"Applying transformation {transformation.name}")
    graph_input = dict(input_text=text, transformation=transformation)
    result = await transform_graph.ainvoke(graph_input)  # type: ignore[arg-type]
    output = result["output"]

    await source.add_insight(transformation.title, output)
    entry = {
        "output": output,
        "transformation_name": transformation.name,
    }
    return {"transformation": [entry]}
# Create and compile the workflow.
# Shape: START -> content_process -> save_source -> one transform_content
# branch per transformation (dispatched by trigger_transformations) -> END.
workflow = StateGraph(SourceState)
# Add nodes
workflow.add_node("content_process", content_process)
workflow.add_node("save_source", save_source)
workflow.add_node("transform_content", transform_content)
# Define the graph edges
workflow.add_edge(START, "content_process")
workflow.add_edge("content_process", "save_source")
# trigger_transformations returns a list of Send objects. The third argument
# declares "transform_content" as the only node it can route to. When the
# list is empty, presumably no branch is scheduled and the run finishes after
# save_source.
workflow.add_conditional_edges(
    "save_source", trigger_transformations, ["transform_content"]
)
workflow.add_edge("transform_content", END)
# Compile the graph
source_graph = workflow.compile()
|