AgentiAI / src /langgraphagenticai /nodes /blog_generation_node.py
kamaleswar Mohanta
Refactor logging implementation and enhance SDLC state management
14ac8e9
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
from src.langgraphagenticai.state.state import BlogState as State, Sections, Section # Import from state.py
from langchain_core.messages import SystemMessage, HumanMessage
import streamlit as st
import json
from datetime import datetime
from typing import List
from src.langgraphagenticai.logging.logging_utils import logger, log_entry_exit
import functools
import time
class BlogGenerationNode:
def __init__(self, model):
"""Initialize the BlogGenerationNode with an LLM."""
self.llm = model
self.planner = model.with_structured_output(Sections)
@log_entry_exit
def validate_and_standardize_structure(self, user_input: str) -> List[str]:
"""
Uses an LLM to interpret user input and generate a standardized list of blog section names.
Ensures the user's specified structure is respected if provided.
Args:
user_input (str): The full user input from the Streamlit form (e.g., "Topic: AI\nStructure: Intro, Benefits, Summary").
Returns:
List[str]: A list of standardized section names (e.g., ["Intro", "Benefits", "Summary"]).
"""
# Default structure if all else fails
default_structure = ["Introduction", "Main Content", "Conclusion"]
# If input is empty or whitespace-only, return default
if not user_input or not user_input.strip():
logger.info("Empty or whitespace-only input; returning default structure")
return default_structure
# Extract the user's structure if provided
user_structure = None
for line in user_input.split("\n"):
if line.lower().startswith("structure:"):
user_structure = line.split(":", 1)[1].strip()
break
if not user_structure:
logger.info("No structure provided; returning default structure")
return default_structure
# Define the prompt for the LLM
system_prompt = (
"You are an expert blog planner. Your task is to analyze the user's input and extract or infer a clear, concise structure "
"for a blog post as a list of section names. The input may explicitly list sections (e.g., 'Structure: Intro, Benefits, Summary') "
"or describe them implicitly (e.g., 'I want an intro, some benefits, and a conclusion'). "
"If the user provides a 'Structure' field (e.g., 'Structure: Intro, Benefits, Summary'), you MUST use those exact section names "
"without modification, except for capitalizing the first letter of each section. "
"If no structure is provided or it's unclear, propose a logical default structure based on the topic or context. "
"Return the result as a JSON object with a single key 'sections' containing the list of section names. "
"Capitalize each section name and avoid adding unnecessary sections beyond what’s indicated."
)
# Prepare messages for the LLM
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=f"User input: {user_input}")
]
try:
# Invoke the LLM and expect a JSON response
response = self.llm.invoke(messages)
response_content = response.content if hasattr(response, "content") else str(response)
logger.info(f"LLM response for structure: {response_content}")
# Parse the JSON response
result = json.loads(response_content)
sections = result.get("sections", default_structure)
# Validate and standardize the output
if not isinstance(sections, list) or not sections:
logger.warning("LLM returned invalid sections; using default structure")
return default_structure
# Clean up section names: strip whitespace, capitalize, remove empty strings
cleaned_sections = [s.strip().capitalize() for s in sections if s.strip()]
# If user provided a structure, enforce it
if user_structure:
user_sections = [s.strip().capitalize() for s in user_structure.split(",") if s.strip()]
if len(cleaned_sections) == len(user_sections):
# Override LLM sections with user sections if lengths match
cleaned_sections = user_sections
else:
logger.warning(f"LLM section count ({len(cleaned_sections)}) doesn't match user section count ({len(user_sections)}); using user structure")
cleaned_sections = user_sections
return cleaned_sections if cleaned_sections else default_structure
except Exception as e:
logger.error(f"Error in LLM structure generation: {e}")
return default_structure
@log_entry_exit
def user_input(self, state: State) -> dict:
"""Handle user input, distinguishing between initial requirements and feedback."""
logger.info(f"Executing user_input with state: {state}")
# Initialize requirements with existing state values to preserve them
requirements = {
"topic": state.get("topic", "No topic provided"),
"objective": state.get("objective", "Informative"),
"target_audience": state.get("target_audience", "General Audience"),
"tone_style": state.get("tone_style", "Casual"),
"word_count": state.get("word_count", 1000),
"structure": state.get("structure", "Introduction, Main Content, Conclusion"),
"feedback": state.get("feedback", "No feedback provided yet."),
# Always reset these values to ensure old content doesn't persist
"initial_draft": "",
"completed_sections": []
}
# Get the latest message
user_message = state["messages"][-1].content if state["messages"] else ""
if not user_message:
logger.warning("No user message provided; returning existing requirements with reset content")
return requirements
# Flag to track if the message is feedback
is_feedback = False
try:
# Check if the message is feedback (JSON format)
feedback_data = json.loads(user_message)
if isinstance(feedback_data, dict) and "approved" in feedback_data:
# This is a feedback message, update only the feedback field
requirements["feedback"] = feedback_data.get("comments", "No feedback provided.")
is_feedback = True
logger.info(f"Processed feedback message: {requirements['feedback']}")
# For feedback, we definitely want to ensure content reset
requirements["initial_draft"] = ""
requirements["completed_sections"] = []
else:
# Treat as requirements input
temp_requirements = {}
for line in user_message.split("\n"):
if ": " in line:
key, value = line.split(": ", 1)
temp_requirements[key.lower().replace(" & ", "_").replace(" ", "_")] = value
# Update requirements only for provided fields
requirements.update({
"topic": temp_requirements.get("topic", requirements["topic"]),
"objective": temp_requirements.get("objective", requirements["objective"]),
"target_audience": temp_requirements.get("target_audience", requirements["target_audience"]),
"tone_style": temp_requirements.get("tone_style", requirements["tone_style"]),
"word_count": int(temp_requirements.get("word_count", requirements["word_count"])),
"structure": temp_requirements.get("structure", requirements["structure"]),
"feedback": temp_requirements.get("feedback", requirements["feedback"]),
# Always reset content for new requirements
"initial_draft": "",
"completed_sections": []
})
logger.info(f"Processed requirements input: {requirements}")
except Exception as e:
logger.error(f"Unexpected error processing user message: {e}")
# Return existing requirements to avoid crashing, but still clear content
requirements["initial_draft"] = ""
requirements["completed_sections"] = []
return requirements
structure_input = requirements["structure"] if is_feedback else user_message
standardized_structure = self.validate_and_standardize_structure(structure_input)
requirements["structure"] = ", ".join(standardized_structure)
# Log the final state that will be returned
logger.info(f"Final parsed requirements with reset content: {requirements}")
logger.info(f"Completed sections (should be empty): {requirements['completed_sections']}")
logger.info(f"Initial draft (should be empty): {requirements['initial_draft']}")
return requirements
@log_entry_exit
def orchestrator(self, state: State) -> dict:
logger.info(f"Executing orchestrator with state: {state}")
needs_revision = False
logger.info(f"Orchestrator received completed_sections: {state.get('completed_sections', [])}")
# Initialize default return values in case of early return or exception
return_state = {
"sections": [],
"completed_sections": [],
"initial_draft": ""
}
needs_revision=False
if state.get("messages"):
last_message_content = state["messages"][-1].content
try:
feedback_data = json.loads(last_message_content)
if isinstance(feedback_data, dict) and feedback_data.get("approved") is False:
needs_revision = True
except json.JSONDecodeError:
pass
except Exception as e:
logger.warning(f"Error checking last message for revision trigger: {e}")
if needs_revision:
logger.info("Orchestrator identified revision cycle: Clearing completed_sections.")
# Don't modify state directly, include this in return dictionary instead
return_state["completed_sections"] = []
structure_list = [s.strip() for s in state["structure"].split(",")]
section_count = len(structure_list)
feedback = state.get("feedback", "No feedback provided yet.")
prompt = (
f"Create a detailed and structured plan for a blog report consisting of exactly {section_count} sections. "
f"The content should be directly relevant to the topic: '{state['topic']}'. "
f"The primary objective of the blog is to {state['objective']}, targeting an audience of {state['target_audience']}. "
f"Please maintain a {state['tone_style']} tone throughout the writing. "
f"Aim for a total word count of approximately {state['word_count']} words. "
f"Follow this specific structure and section names: {', '.join(structure_list)}. "
f"Incorporate {feedback} to enhance the quality of the content. "
f"Please refrain from adding any extra sections or altering the section names unless {feedback} is provided."
)
try:
report_sections = self.planner.invoke([
SystemMessage(content=prompt),
HumanMessage(content=f"Topic: {state['topic']} with feedback {feedback}")
])
return_state["sections"] = report_sections.sections
except Exception as e:
logger.error(f"Error generating plan with LLM: {e}")
# Keep the default empty values in return_state
logger.info(f"Orchestrator returning: {return_state}")
return return_state
@log_entry_exit
def llm_call(self, state: State) -> dict:
"""Worker writes a section of the report."""
section = self.llm.invoke([
SystemMessage(content="Write a report section following the provided name and description. Include no preamble for each section. Use markdown formatting."),
HumanMessage(content=f"Here is the section name: {state['section'].name} and description: {state['section'].description}")
])
logger.info(f"\n{'='*20}:llm_call output:{'='*20}\nGenerated section: {section.content}\n{'='*20}\n")
logger.info(f"\n---------------------state[completed_sections]:---------------------------- \n{state.get('completed_sections', [])}")
return {"completed_sections": state.get("completed_sections", []) + [section.content]}
@log_entry_exit
def synthesizer(self, state: State) -> dict:
"""Synthesize full report from sections and clear the sections list."""
# Safely get the list, defaulting to empty if it's None or missing
completed_sections = state.get("completed_sections", [])
# Handle case where synthesizer might be called unexpectedly with no sections
if not completed_sections:
logger.warning("Synthesizer called but 'completed_sections' is empty or None.")
# Return an empty draft and ensure the sections list is cleared in the state
return {"initial_draft": "", "completed_sections": []}
# Determine the expected number of sections based on the current plan
expected_section_count = len(state.get("sections", []))
# If we received more sections than expected (likely due to revision state issue),
# take only the last 'expected_section_count' sections.
if expected_section_count > 0 and len(completed_sections) > expected_section_count:
logger.warning(f"Synthesizer received {len(completed_sections)} sections, "
f"but expected {expected_section_count}. Using the last {expected_section_count}.")
sections_to_use = completed_sections[-expected_section_count:]
else:
# Otherwise, use all received sections (normal first run or correct state)
sections_to_use = completed_sections
logger.info(f"Synthesizing report with sections: {completed_sections}")
logger.info(f"Synthesizing report with {len(sections_to_use)} sections:")
logger.info("SYNTHESIZER DEBUG:")
logger.info(f"completed_sections count: {len(completed_sections)}")
for i, section in enumerate(sections_to_use):
# Log only the first few characters to avoid overly long logs
logger.info(f"Section {i+1} (start): {section[:100]}...")
logger.info(f"{'='*20}")
# Join the selected sections to create the draft
initial_draft = "\n\n---\n\n".join(sections_to_use)
logger.info(f"Synthesized report draft generated (length: {len(initial_draft)}).")
# Return the generated draft AND explicitly return an empty list
# for completed_sections to update the state, clearing the old sections.
return {
"initial_draft": initial_draft,
"completed_sections": [] # Explicitly clear the list in the returned state update
}
@log_entry_exit
def feedback_collector(self, state: State) -> dict:
logger.info(f"\n\n----------------:Entered feedback_collector with state:----------------------\n\n{state}")
logger.info(f"Message count: {len(state.get('messages', []))}")
logger.info(f"Last message type: {type(state['messages'][-1]) if state.get('messages') else 'None'}")
if state.get("messages") and len(state["messages"]) > 0 and isinstance(state["messages"][-1], HumanMessage):
try:
feedback_data = json.loads(state["messages"][-1].content)
is_approved = feedback_data.get("approved", False)
comments = feedback_data.get("comments", "")
logger.info(f"Parsed feedback: approved={is_approved}, comments={comments}")
if is_approved:
logger.info("Content approved, preparing final report")
final_report = state.get("initial_draft", "")
collector_output = {
"feedback": comments,
"draft_approved": True,
"final_report": final_report
}
else:
collector_output = {
"feedback": comments,
"draft_approved": False,
"final_report": ""
}
logger.info(f"{'='*20}:feedback_collector output:{'='*20}\n{collector_output}") # Add this log
return collector_output
except json.JSONDecodeError:
logger.warning("Invalid feedback format; returning default values")
return {"feedback": "", "draft_approved": False, "final_report": ""}
logger.info("No new feedback message found; returning default values")
return {"feedback": "", "draft_approved": False, "final_report": ""}
@log_entry_exit
def file_generator(self, state: State) -> dict:
"""Generates the final report and ends the process."""
final_report = state["final_report"]
# In a real scenario, you would save this to a file
logger.info(f"Final Report Generated:\n{final_report}")
return {"final_report_path": "report.md"} # Simulate saving to a file
@log_entry_exit # Conditional edge function to create llm_call workers
def assign_workers(self, state: State):
"""Assign a worker to each section in the plan."""
logger.info(f"\n{'='*10} State before assigning workers {'='*10}")
logger.info(f" Current sections plan: {len(state.get('sections', []))} sections")
# Log the completed_sections list specifically
logger.info(f" Completed Sections before dispatch: {state.get('completed_sections', [])}")
logger.info(f"{'='*40}\n")
return [Send("llm_call", {"section": s}) for s in state["sections"]]
@log_entry_exit# Conditional edge for feedback loop
def route_feedback(self, state: State):
"""Route based on whether draft is approved."""
draft_approved = state.get('draft_approved', False)
logger.info(f"route_feedback: draft_approved = {draft_approved}")
if draft_approved is True: # Strict comparison
logger.info("Draft approved; routing to file_generator")
return "file_generator"
else:
logger.info("Draft not approved; routing back to orchestrator for revision")
return "orchestrator"