Spaces:
Build error
Build error
File size: 19,316 Bytes
c1fa745 1457034 c1fa745 33890b1 d4f456e 14ac8e9 d4f456e 73fccdf f056e53 6d62dad c3dca60 c1fa745 73fccdf d4f456e e868a05 73fccdf 33890b1 4968b71 33890b1 4968b71 e868a05 c3dca60 4968b71 e868a05 4968b71 d4f456e 4968b71 e868a05 4968b71 e868a05 4968b71 4bea86e e868a05 4968b71 e868a05 4968b71 33890b1 4968b71 e868a05 4968b71 e868a05 73fccdf d4f456e c1fa745 73fccdf c3db886 e868a05 c3db886 73fccdf e868a05 73fccdf c1fa745 33890b1 73fccdf d4f456e 33890b1 d4f456e 73fccdf f056e53 d4f456e 73fccdf e868a05 73fccdf e868a05 73fccdf 8291d80 c1fa745 73fccdf 8291d80 73fccdf e376063 73fccdf d4f456e 4bea86e 73fccdf c3db886 73fccdf c3db886 73fccdf c3db886 73fccdf 4bea86e c3db886 4bea86e c3db886 4bea86e c3db886 4bea86e c3db886 73fccdf c1fa745 60f7bfc d4f456e 9949bd0 c9f4fb2 7b3ba19 c9f4fb2 7b3ba19 c9f4fb2 7b3ba19 c9f4fb2 7b3ba19 c9f4fb2 7b3ba19 c9f4fb2 9949bd0 d4f456e 7b3ba19 c9f4fb2 d4f456e 7b3ba19 73fccdf d4f456e c1fa745 73fccdf d4f456e c1fa745 8291d80 c1fa745 73fccdf d4f456e c1fa745 60f7bfc f46236f 60f7bfc c3db886 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 | from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
from src.langgraphagenticai.state.state import BlogState as State, Sections, Section # Import from state.py
from langchain_core.messages import SystemMessage, HumanMessage
import streamlit as st
import json
from datetime import datetime
from typing import List
from src.langgraphagenticai.logging.logging_utils import logger, log_entry_exit
import functools
import time
class BlogGenerationNode:
    """Node and edge callables for the blog-generation LangGraph workflow.

    The methods read from and return partial updates to the shared blog
    state: requirements parsing (``user_input``), planning
    (``orchestrator``), per-section writing (``llm_call``), assembly
    (``synthesizer``), feedback handling (``feedback_collector``), final
    output (``file_generator``), and the two conditional-edge helpers
    (``assign_workers`` and ``route_feedback``).
    """

    def __init__(self, model):
        """Initialize the BlogGenerationNode with an LLM.

        Args:
            model: Chat model used for section writing. It must also provide
                ``with_structured_output``, which is used to build the planner
                that returns a ``Sections`` object.
        """
        self.llm = model
        self.planner = model.with_structured_output(Sections)

    @staticmethod
    def _load_json_object(content):
        """Decode ``content`` as a JSON object.

        Returns:
            The decoded ``dict``, or ``None`` when ``content`` is not a string,
            is not valid JSON, or decodes to something other than an object.
        """
        if not isinstance(content, str):
            return None
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return None
        return data if isinstance(data, dict) else None

    @log_entry_exit
    def validate_and_standardize_structure(self, user_input: str) -> List[str]:
        """
        Extract a standardized list of blog section names from the user input.

        The first line starting with "Structure:" (case-insensitive) is split on
        commas; each name is stripped and its first letter upper-cased while
        the remaining characters are left untouched (so "AI benefits" stays
        "AI benefits").

        No LLM call is made: the previous implementation always replaced the
        LLM's answer with the user's own section names, so the call could only
        add latency and a failure path that discarded the user's structure.

        Args:
            user_input (str): The full user input from the Streamlit form (e.g., "Topic: AI\nStructure: Intro, Benefits, Summary").
        Returns:
            List[str]: A list of standardized section names (e.g., ["Intro", "Benefits", "Summary"]),
            or the default three-section structure when none can be extracted.
        """
        # Default structure if all else fails
        default_structure = ["Introduction", "Main Content", "Conclusion"]

        # If input is empty or whitespace-only, return default
        if not user_input or not user_input.strip():
            logger.info("Empty or whitespace-only input; returning default structure")
            return default_structure

        # Extract the user's structure if provided
        user_structure = None
        for line in user_input.split("\n"):
            if line.strip().lower().startswith("structure:"):
                user_structure = line.split(":", 1)[1].strip()
                break
        if not user_structure:
            logger.info("No structure provided; returning default structure")
            return default_structure

        # Keep the user's names verbatim apart from the leading capital;
        # str.capitalize() would lower-case the rest and mangle acronyms.
        sections = []
        for raw_name in user_structure.split(","):
            name = raw_name.strip()
            if name:
                sections.append(name[0].upper() + name[1:])

        if not sections:
            logger.warning("Structure field contained no section names; using default structure")
            return default_structure
        return sections

    @log_entry_exit
    def user_input(self, state: State) -> dict:
        """Handle user input, distinguishing between initial requirements and feedback.

        The latest message is treated as feedback when it is a JSON object
        containing an "approved" key; otherwise it is parsed as "Key: value"
        lines describing the blog requirements.

        Args:
            state: Current blog state; read for existing requirement values
                and for the latest entry in "messages".
        Returns:
            dict: The requirement fields plus "initial_draft" and
            "completed_sections", which are always reset to empty values.
        """
        logger.info(f"Executing user_input with state: {state}")

        # Initialize requirements with existing state values to preserve them
        requirements = {
            "topic": state.get("topic", "No topic provided"),
            "objective": state.get("objective", "Informative"),
            "target_audience": state.get("target_audience", "General Audience"),
            "tone_style": state.get("tone_style", "Casual"),
            "word_count": state.get("word_count", 1000),
            "structure": state.get("structure", "Introduction, Main Content, Conclusion"),
            "feedback": state.get("feedback", "No feedback provided yet."),
            # Always reset these values to ensure old content doesn't persist
            "initial_draft": "",
            "completed_sections": [],
        }

        # Get the latest message
        messages = state.get("messages") or []
        user_message = messages[-1].content if messages else ""
        if not user_message:
            logger.warning("No user message provided; returning existing requirements with reset content")
            return requirements
        if not isinstance(user_message, str):
            # Non-text content cannot be parsed line by line.
            logger.error(f"Unexpected error processing user message: unsupported content type {type(user_message)}")
            return requirements

        feedback_data = self._load_json_object(user_message)
        if feedback_data is not None and "approved" in feedback_data:
            # This is a feedback message, update only the feedback field
            requirements["feedback"] = feedback_data.get("comments", "No feedback provided.")
            logger.info(f"Processed feedback message: {requirements['feedback']}")
        else:
            # Plain-text requirements are not valid JSON, so a failed decode
            # must fall through to here rather than abort the parse.
            parsed = {}
            for line in user_message.split("\n"):
                key, separator, value = line.partition(":")
                key = key.strip().lower().replace(" & ", "_").replace(" ", "_")
                value = value.strip()
                if separator and key and value:
                    parsed[key] = value

            # Update requirements only for provided fields
            for field in ("topic", "objective", "target_audience", "tone_style", "structure", "feedback"):
                if field in parsed:
                    requirements[field] = parsed[field]

            # A malformed word count must not discard the other fields.
            raw_word_count = parsed.get("word_count", requirements["word_count"])
            try:
                requirements["word_count"] = int(raw_word_count)
            except (TypeError, ValueError):
                logger.warning(f"Invalid word count {raw_word_count!r}; keeping existing value")
            logger.info(f"Processed requirements input: {requirements}")

        # Standardize whichever structure is in effect. The "Structure:" prefix
        # is required by the parser; without it a stored structure would be
        # replaced by the default on every feedback round.
        structure_text = requirements["structure"] or ""
        standardized_structure = self.validate_and_standardize_structure(f"Structure: {structure_text}")
        requirements["structure"] = ", ".join(standardized_structure)

        # Log the final state that will be returned
        logger.info(f"Final parsed requirements with reset content: {requirements}")
        logger.info(f"Completed sections (should be empty): {requirements['completed_sections']}")
        logger.info(f"Initial draft (should be empty): {requirements['initial_draft']}")
        return requirements

    @log_entry_exit
    def orchestrator(self, state: State) -> dict:
        """Ask the planner LLM for a section plan matching the requirements.

        Args:
            state: Current blog state; the requirement fields, "feedback" and
                the latest message are read.
        Returns:
            dict: "sections" (the planner's sections, or an empty list when
            planning fails) plus empty "completed_sections" and
            "initial_draft".
        """
        logger.info(f"Executing orchestrator with state: {state}")
        logger.info(f"Orchestrator received completed_sections: {state.get('completed_sections', [])}")

        # Default return values; also used when the planner call fails.
        # NOTE(review): whether returning [] actually clears
        # "completed_sections" depends on the reducer declared on the state
        # type, which is not visible here - confirm.
        return_state = {
            "sections": [],
            "completed_sections": [],
            "initial_draft": "",
        }

        # A last message of {"approved": false, ...} marks a revision cycle.
        messages = state.get("messages") or []
        if messages:
            feedback_data = self._load_json_object(getattr(messages[-1], "content", None))
            if feedback_data is not None and feedback_data.get("approved") is False:
                logger.info("Orchestrator identified revision cycle: Clearing completed_sections.")

        structure_text = state.get("structure") or "Introduction, Main Content, Conclusion"
        # Drop empty names produced by stray or trailing commas.
        structure_list = [name.strip() for name in structure_text.split(",") if name.strip()]
        section_count = len(structure_list)
        feedback = state.get("feedback", "No feedback provided yet.")
        topic = state.get("topic", "No topic provided")
        objective = state.get("objective", "Informative")
        target_audience = state.get("target_audience", "General Audience")
        tone_style = state.get("tone_style", "Casual")
        word_count = state.get("word_count", 1000)

        prompt = (
            f"Create a detailed and structured plan for a blog report consisting of exactly {section_count} sections. "
            f"The content should be directly relevant to the topic: '{topic}'. "
            f"The primary objective of the blog is to {objective}, targeting an audience of {target_audience}. "
            f"Please maintain a {tone_style} tone throughout the writing. "
            f"Aim for a total word count of approximately {word_count} words. "
            f"Follow this specific structure and section names: {', '.join(structure_list)}. "
            f"Incorporate {feedback} to enhance the quality of the content. "
            f"Please refrain from adding any extra sections or altering the section names unless {feedback} is provided."
        )

        try:
            report_sections = self.planner.invoke([
                SystemMessage(content=prompt),
                HumanMessage(content=f"Topic: {topic} with feedback {feedback}"),
            ])
            return_state["sections"] = report_sections.sections
        except Exception as e:
            # Best effort: keep the empty defaults so the graph can continue.
            logger.error(f"Error generating plan with LLM: {e}")

        logger.info(f"Orchestrator returning: {return_state}")
        return return_state

    @log_entry_exit
    def llm_call(self, state: State) -> dict:
        """Worker writes a section of the report.

        Args:
            state: Worker state; "section" must provide ``name`` and
                ``description`` attributes.
        Returns:
            dict: "completed_sections" holding any sections already present
            in ``state`` followed by the newly generated section text.
        """
        section = self.llm.invoke([
            SystemMessage(content="Write a report section following the provided name and description. Include no preamble for each section. Use markdown formatting."),
            HumanMessage(content=f"Here is the section name: {state['section'].name} and description: {state['section'].description}"),
        ])
        logger.info(f"\n{'='*20}:llm_call output:{'='*20}\nGenerated section: {section.content}\n{'='*20}\n")
        logger.info(f"\n---------------------state[completed_sections]:---------------------------- \n{state.get('completed_sections', [])}")
        # "or []" guards against an explicit None stored under the key.
        previous_sections = state.get("completed_sections") or []
        return {"completed_sections": previous_sections + [section.content]}

    @log_entry_exit
    def synthesizer(self, state: State) -> dict:
        """Synthesize full report from sections and clear the sections list.

        Args:
            state: Current blog state; "completed_sections" and "sections"
                are read.
        Returns:
            dict: "initial_draft" with the section texts joined by a markdown
            horizontal rule, and an empty "completed_sections" list.
        """
        # Safely get the list, defaulting to empty if it's None or missing
        completed_sections = state.get("completed_sections") or []

        # Handle case where synthesizer might be called unexpectedly with no sections
        if not completed_sections:
            logger.warning("Synthesizer called but 'completed_sections' is empty or None.")
            # Return an empty draft and ensure the sections list is cleared in the state
            return {"initial_draft": "", "completed_sections": []}

        # Determine the expected number of sections based on the current plan
        expected_section_count = len(state.get("sections") or [])

        # If we received more sections than expected (likely due to revision state issue),
        # take only the last 'expected_section_count' sections.
        if 0 < expected_section_count < len(completed_sections):
            logger.warning(f"Synthesizer received {len(completed_sections)} sections, "
                           f"but expected {expected_section_count}. Using the last {expected_section_count}.")
            sections_to_use = completed_sections[-expected_section_count:]
        else:
            # Otherwise, use all received sections (normal first run or correct state)
            sections_to_use = completed_sections

        logger.info(f"Synthesizing report with sections: {completed_sections}")
        logger.info(f"Synthesizing report with {len(sections_to_use)} sections:")
        logger.info("SYNTHESIZER DEBUG:")
        logger.info(f"completed_sections count: {len(completed_sections)}")
        for i, section in enumerate(sections_to_use):
            # Log only the first few characters to avoid overly long logs
            logger.info(f"Section {i+1} (start): {section[:100]}...")
        logger.info(f"{'='*20}")

        # Join the selected sections to create the draft
        initial_draft = "\n\n---\n\n".join(sections_to_use)
        logger.info(f"Synthesized report draft generated (length: {len(initial_draft)}).")

        # Return the generated draft AND explicitly return an empty list
        # for completed_sections to update the state, clearing the old sections.
        return {
            "initial_draft": initial_draft,
            "completed_sections": [],  # Explicitly clear the list in the returned state update
        }

    @log_entry_exit
    def feedback_collector(self, state: State) -> dict:
        """Turn the latest human message into an approval decision.

        Args:
            state: Current blog state; the last entry of "messages" and
                "initial_draft" are read.
        Returns:
            dict: "feedback", "draft_approved" and "final_report". When the
            last message is missing, is not a ``HumanMessage``, or is not a
            JSON object, the values are "", False and "".
        """
        logger.info(f"\n\n----------------:Entered feedback_collector with state:----------------------\n\n{state}")
        messages = state.get("messages") or []
        logger.info(f"Message count: {len(messages)}")
        logger.info(f"Last message type: {type(messages[-1]) if messages else 'None'}")

        if not messages or not isinstance(messages[-1], HumanMessage):
            logger.info("No new feedback message found; returning default values")
            return {"feedback": "", "draft_approved": False, "final_report": ""}

        # Valid JSON that is not an object (e.g. "123") is rejected as well,
        # since it has no "approved"/"comments" fields to read.
        feedback_data = self._load_json_object(messages[-1].content)
        if feedback_data is None:
            logger.warning("Invalid feedback format; returning default values")
            return {"feedback": "", "draft_approved": False, "final_report": ""}

        is_approved = feedback_data.get("approved", False)
        comments = feedback_data.get("comments", "")
        logger.info(f"Parsed feedback: approved={is_approved}, comments={comments}")

        if is_approved:
            logger.info("Content approved, preparing final report")
            collector_output = {
                "feedback": comments,
                "draft_approved": True,
                "final_report": state.get("initial_draft", ""),
            }
        else:
            collector_output = {
                "feedback": comments,
                "draft_approved": False,
                "final_report": "",
            }
        logger.info(f"{'='*20}:feedback_collector output:{'='*20}\n{collector_output}")
        return collector_output

    @log_entry_exit
    def file_generator(self, state: State) -> dict:
        """Generates the final report and ends the process.

        The report is only logged; nothing is written to disk, and the
        returned path is a placeholder.

        Returns:
            dict: "final_report_path" set to "report.md".
        """
        final_report = state.get("final_report", "")
        # In a real scenario, you would save this to a file
        logger.info(f"Final Report Generated:\n{final_report}")
        return {"final_report_path": "report.md"}  # Simulate saving to a file

    @log_entry_exit  # Conditional edge function to create llm_call workers
    def assign_workers(self, state: State):
        """Assign a worker to each section in the plan.

        Returns:
            list: One ``Send("llm_call", {"section": s})`` per planned
            section; empty when the plan has no sections.
        """
        sections = state.get("sections") or []
        logger.info(f"\n{'='*10} State before assigning workers {'='*10}")
        logger.info(f" Current sections plan: {len(sections)} sections")
        # Log the completed_sections list specifically
        logger.info(f" Completed Sections before dispatch: {state.get('completed_sections', [])}")
        logger.info(f"{'='*40}\n")
        return [Send("llm_call", {"section": s}) for s in sections]

    @log_entry_exit  # Conditional edge for feedback loop
    def route_feedback(self, state: State):
        """Route based on whether draft is approved.

        Returns:
            str: "file_generator" when ``draft_approved`` is exactly ``True``,
            otherwise "orchestrator".
        """
        draft_approved = state.get("draft_approved", False)
        logger.info(f"route_feedback: draft_approved = {draft_approved}")
        if draft_approved is True:  # Strict comparison
            logger.info("Draft approved; routing to file_generator")
            return "file_generator"
        logger.info("Draft not approved; routing back to orchestrator for revision")
        return "orchestrator"
|