File size: 19,316 Bytes
c1fa745
 
1457034
c1fa745
33890b1
 
d4f456e
 
 
14ac8e9
d4f456e
73fccdf
 
 
f056e53
 
6d62dad
c3dca60
c1fa745
 
73fccdf
d4f456e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e868a05
73fccdf
33890b1
4968b71
33890b1
4968b71
 
 
 
 
 
 
 
 
 
e868a05
 
 
c3dca60
4968b71
 
 
 
e868a05
4968b71
d4f456e
4968b71
 
 
 
 
 
 
 
 
 
 
e868a05
 
 
 
4968b71
 
 
 
 
 
 
e868a05
4968b71
 
 
 
 
 
 
 
4bea86e
e868a05
 
 
4968b71
 
 
 
e868a05
 
 
4968b71
33890b1
4968b71
 
 
 
e868a05
 
 
 
 
4968b71
e868a05
73fccdf
 
d4f456e
c1fa745
73fccdf
c3db886
 
e868a05
 
 
 
 
 
 
c3db886
73fccdf
 
 
 
 
 
 
 
 
 
 
 
 
 
e868a05
 
73fccdf
c1fa745
33890b1
73fccdf
d4f456e
33890b1
d4f456e
 
 
 
 
 
73fccdf
 
f056e53
d4f456e
73fccdf
 
 
 
 
e868a05
 
73fccdf
 
e868a05
 
 
 
73fccdf
8291d80
c1fa745
 
 
 
 
73fccdf
8291d80
 
73fccdf
e376063
73fccdf
d4f456e
4bea86e
 
 
 
 
 
 
 
 
73fccdf
c3db886
 
 
 
 
 
 
 
 
 
 
 
73fccdf
 
 
c3db886
73fccdf
 
c3db886
 
 
 
73fccdf
4bea86e
c3db886
 
 
4bea86e
c3db886
4bea86e
 
c3db886
4bea86e
 
c3db886
73fccdf
c1fa745
60f7bfc
 
 
 
d4f456e
9949bd0
 
 
 
 
c9f4fb2
7b3ba19
 
 
c9f4fb2
7b3ba19
c9f4fb2
 
7b3ba19
 
c9f4fb2
7b3ba19
 
c9f4fb2
7b3ba19
c9f4fb2
 
 
9949bd0
d4f456e
7b3ba19
c9f4fb2
d4f456e
7b3ba19
73fccdf
d4f456e
 
 
 
 
 
c1fa745
73fccdf
d4f456e
c1fa745
8291d80
 
 
 
 
c1fa745
 
73fccdf
d4f456e
c1fa745
60f7bfc
 
 
 
f46236f
60f7bfc
 
 
 
c3db886
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
from src.langgraphagenticai.state.state import BlogState as State, Sections, Section  # Import from state.py
from langchain_core.messages import SystemMessage, HumanMessage
import streamlit as st
import json
from datetime import datetime
from typing import List

from src.langgraphagenticai.logging.logging_utils import logger, log_entry_exit

import functools
import time

class BlogGenerationNode:
    def __init__(self, model):
        """Keep the chat model and derive the section planner from it.

        Args:
            model: Chat model object. It must expose ``invoke`` (used by the
                node methods) and ``with_structured_output`` (used here).
        """
        # The planner is the same model constrained to return a ``Sections`` object.
        self.planner = model.with_structured_output(Sections)
        self.llm = model

    @log_entry_exit
    def validate_and_standardize_structure(self, user_input: str) -> List[str]:
        """
        Extract the blog section names from the user's ``Structure:`` line.

        The first line that starts with ``structure:`` (case-insensitive,
        surrounding whitespace ignored) is split on commas. Each name is
        trimmed, blank names are dropped, and only the first character is
        upper-cased, so multi-word names such as "Main Content" keep their
        casing.

        An explicit structure is authoritative, so the LLM is not consulted:
        its answer could never change the result, while a malformed reply
        would replace the user's sections with the default.

        Args:
            user_input (str): The full user input from the Streamlit form (e.g., "Topic: AI\nStructure: Intro, Benefits, Summary").

        Returns:
            List[str]: The section names in the order given (e.g., ["Intro", "Benefits", "Summary"]),
            or ["Introduction", "Main Content", "Conclusion"] when the input is empty,
            has no structure line, or the structure line names no sections.
        """
        # Fallback used whenever no usable structure can be extracted
        default_structure = ["Introduction", "Main Content", "Conclusion"]

        if not user_input or not user_input.strip():
            logger.info("Empty or whitespace-only input; returning default structure")
            return default_structure

        # Only the first "Structure:" line counts; later ones are ignored
        user_structure = None
        for line in user_input.split("\n"):
            candidate = line.strip()
            if candidate.lower().startswith("structure:"):
                user_structure = candidate.split(":", 1)[1].strip()
                break

        if not user_structure:
            logger.info("No structure provided; returning default structure")
            return default_structure

        sections = []
        for raw_name in user_structure.split(","):
            name = raw_name.strip()
            if name:
                # Upper-case the first character only; str.capitalize() would
                # lower-case the rest and mangle names like "Main Content".
                sections.append(name[0].upper() + name[1:])

        if not sections:
            logger.warning("Structure field contained no section names; using default structure")
            return default_structure

        return sections
    
    @log_entry_exit
    def user_input(self, state: State) -> dict:
        """Build the requirements update from the latest user message.

        The last message is interpreted in one of two ways:

        * A JSON object containing an ``approved`` key is reviewer feedback;
          only the ``feedback`` field is updated from its ``comments`` value.
        * Anything else is treated as a requirements form made of
          ``Field: value`` lines. Only fields present in the message override
          the values already held in ``state``.

        In both cases the structure is passed through
        ``validate_and_standardize_structure`` and ``initial_draft`` /
        ``completed_sections`` are returned empty so stale content is not
        carried into the next generation pass.

        Args:
            state: Current graph state (read with dict-style access).

        Returns:
            dict: The requirement fields plus the reset content fields.
        """
        logger.info(f"Executing user_input with state: {state}")

        # Start from the values already in the state so unspecified fields survive
        requirements = {
            "topic": state.get("topic", "No topic provided"),
            "objective": state.get("objective", "Informative"),
            "target_audience": state.get("target_audience", "General Audience"),
            "tone_style": state.get("tone_style", "Casual"),
            "word_count": state.get("word_count", 1000),
            "structure": state.get("structure", "Introduction, Main Content, Conclusion"),
            "feedback": state.get("feedback", "No feedback provided yet."),
            # Always reset these values to ensure old content doesn't persist
            "initial_draft": "",
            "completed_sections": [],
        }

        messages = state.get("messages") or []
        user_message = messages[-1].content if messages else ""
        if not user_message:
            logger.warning("No user message provided; returning existing requirements with reset content")
            return requirements
        if not isinstance(user_message, str):
            # Only plain-text content can be parsed as feedback or requirements
            logger.error(
                f"Unexpected error processing user message: unsupported content type {type(user_message).__name__}"
            )
            return requirements

        # A requirements form is plain text, so failing to parse as JSON is the
        # normal case for it and must not be treated as an error.
        try:
            parsed_message = json.loads(user_message)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            parsed_message = None

        if isinstance(parsed_message, dict) and "approved" in parsed_message:
            # Feedback message: update only the feedback field
            requirements["feedback"] = parsed_message.get("comments", "No feedback provided.")
            logger.info(f"Processed feedback message: {requirements['feedback']}")
        else:
            # Requirements form: collect "Field: value" lines keyed by a
            # normalized field name ("Tone & Style" -> "tone_style")
            provided = {}
            for line in user_message.split("\n"):
                key, separator, value = line.partition(":")
                value = value.strip()
                if separator and value:
                    field_name = key.strip().lower().replace(" & ", "_").replace(" ", "_")
                    provided[field_name] = value

            for field_name in ("topic", "objective", "target_audience", "tone_style", "structure", "feedback"):
                if field_name in provided:
                    requirements[field_name] = provided[field_name]

            if "word_count" in provided:
                try:
                    requirements["word_count"] = int(provided["word_count"])
                except ValueError:
                    # A malformed count should not throw away the other fields
                    logger.warning(
                        f"Ignoring non-numeric word count {provided['word_count']!r}; "
                        f"keeping {requirements['word_count']}"
                    )
            logger.info(f"Processed requirements input: {requirements}")

        # The validator only recognises a line starting with "Structure:", so
        # the bare structure string is re-labelled before validation; passing
        # it unlabelled would reset the structure to the default.
        structure_text = requirements["structure"] or ""
        standardized_structure = self.validate_and_standardize_structure(f"Structure: {structure_text}")
        requirements["structure"] = ", ".join(standardized_structure)

        # Log the final state that will be returned
        logger.info(f"Final parsed requirements with reset content: {requirements}")
        logger.info(f"Completed sections (should be empty): {requirements['completed_sections']}")
        logger.info(f"Initial draft (should be empty): {requirements['initial_draft']}")

        return requirements

    
    @log_entry_exit
    def orchestrator(self, state: State) -> dict:
        """Ask the structured-output planner for a section plan.

        Builds a planning prompt from the requirement fields in ``state``
        (``topic``, ``objective``, ``target_audience``, ``tone_style``,
        ``word_count``, ``structure``, ``feedback``) and invokes
        ``self.planner`` with it.

        Args:
            state: Current graph state. ``structure`` is a comma-separated
                list of section names; blank entries are ignored.

        Returns:
            dict: ``sections`` holds the planner's sections (an empty list if
            the planner call fails); ``completed_sections`` and
            ``initial_draft`` are always returned empty.
        """
        logger.info(f"Executing orchestrator with state: {state}")
        logger.info(f"Orchestrator received completed_sections: {state.get('completed_sections', [])}")

        # Defaults that are returned unchanged if planning fails
        return_state = {
            "sections": [],
            "completed_sections": [],
            "initial_draft": ""
        }

        # A last message of the form {"approved": false, ...} marks a revision cycle
        needs_revision = False
        if state.get("messages"):
            last_message_content = state["messages"][-1].content
            feedback_data = None
            try:
                feedback_data = json.loads(last_message_content)
            except json.JSONDecodeError:
                # Plain-text messages are expected here and are not feedback
                pass
            except Exception as e:
                logger.warning(f"Error checking last message for revision trigger: {e}")
            needs_revision = isinstance(feedback_data, dict) and feedback_data.get("approved") is False

        if needs_revision:
            # return_state already carries an empty completed_sections list
            logger.info("Orchestrator identified revision cycle: Clearing completed_sections.")

        # Drop blank names so a stray comma does not inflate the section count
        structure_list = [
            name for name in (part.strip() for part in state["structure"].split(",")) if name
        ]
        section_count = len(structure_list)
        feedback = state.get("feedback", "No feedback provided yet.")

        prompt = (
            f"Create a detailed and structured plan for a blog report consisting of exactly {section_count} sections. "
            f"The content should be directly relevant to the topic: '{state['topic']}'. "
            f"The primary objective of the blog is to {state['objective']}, targeting an audience of {state['target_audience']}. "
            f"Please maintain a {state['tone_style']} tone throughout the writing. "
            f"Aim for a total word count of approximately {state['word_count']} words. "
            f"Follow this specific structure and section names: {', '.join(structure_list)}. "
            f"Incorporate {feedback} to enhance the quality of the content. "
            f"Please refrain from adding any extra sections or altering the section names unless {feedback} is provided."
        )

        try:
            report_sections = self.planner.invoke([
                SystemMessage(content=prompt),
                HumanMessage(content=f"Topic: {state['topic']} with feedback {feedback}")
            ])
            return_state["sections"] = report_sections.sections
        except Exception as e:
            # Best effort: keep the empty defaults rather than crash the graph
            logger.error(f"Error generating plan with LLM: {e}")

        logger.info(f"Orchestrator returning: {return_state}")
        return return_state
    @log_entry_exit
    def llm_call(self, state: State) -> dict:
        """Write one report section with the LLM.

        Reads ``state['section']`` (its ``name`` and ``description``), asks
        ``self.llm`` for the section text, and returns ``completed_sections``
        as whatever the state already holds under that key plus the new text.
        """
        instructions = SystemMessage(content="Write a report section following the provided name and description. Include no preamble for each section. Use markdown formatting.")
        request = HumanMessage(content=f"Here is the section name: {state['section'].name} and description: {state['section'].description}")
        section = self.llm.invoke([instructions, request])

        banner = '=' * 20
        logger.info(f"\n{banner}:llm_call output:{banner}\nGenerated section: {section.content}\n{banner}\n")

        already_done = state.get("completed_sections", [])
        logger.info(f"\n---------------------state[completed_sections]:---------------------------- \n{already_done}")

        updated_sections = already_done + [section.content]
        return {"completed_sections": updated_sections}
    
    @log_entry_exit
    def synthesizer(self, state: State) -> dict:
        """Join the completed sections into a single draft.

        If more sections were received than the current plan
        (``state['sections']``) contains, only the most recent ones, as many
        as the plan expects, are used. The returned update always carries an
        empty ``completed_sections`` list alongside the draft.
        """
        completed_sections = state.get("completed_sections", [])

        # Nothing to join (missing, None or empty): hand back an empty draft
        if not completed_sections:
            logger.warning("Synthesizer called but 'completed_sections' is empty or None.")
            return {"initial_draft": "", "completed_sections": []}

        # Size of the current plan; zero means there is nothing to compare against
        expected_section_count = len(state.get("sections", []))
        has_surplus = expected_section_count > 0 and len(completed_sections) > expected_section_count

        if not has_surplus:
            # Normal case: every received section belongs to this draft
            sections_to_use = completed_sections
        else:
            # Keep only the trailing sections, as many as the plan expects
            logger.warning(f"Synthesizer received {len(completed_sections)} sections, "
                           f"but expected {expected_section_count}. Using the last {expected_section_count}.")
            sections_to_use = completed_sections[-expected_section_count:]

        logger.info(f"Synthesizing report with sections: {completed_sections}")
        logger.info(f"Synthesizing report with {len(sections_to_use)} sections:")
        logger.info("SYNTHESIZER DEBUG:")
        logger.info(f"completed_sections count: {len(completed_sections)}")

        divider = '=' * 20
        for position, text in enumerate(sections_to_use, start=1):
            # Only a short prefix is logged to keep the log readable
            logger.info(f"Section {position} (start): {text[:100]}...")
            logger.info(f"{divider}")

        # Sections are separated by a markdown horizontal rule
        initial_draft = "\n\n---\n\n".join(sections_to_use)
        logger.info(f"Synthesized report draft generated (length: {len(initial_draft)}).")

        state_update = {"initial_draft": initial_draft}
        state_update["completed_sections"] = []  # returned empty alongside the draft
        return state_update
    
    @log_entry_exit
    def feedback_collector(self, state: State) -> dict:
        """Turn the latest human message into an approval decision.

        The last message is expected to be a ``HumanMessage`` whose content is
        a JSON object with ``approved`` and ``comments`` keys. Anything else
        (no messages, a non-human message, non-JSON text, or JSON that is not
        an object) yields the default "not approved" result instead of raising.

        Args:
            state: Current graph state; ``messages`` and ``initial_draft`` are read.

        Returns:
            dict: ``feedback`` (the comments), ``draft_approved`` (bool) and
            ``final_report`` (the current draft when approved, otherwise "").
        """
        messages = state.get("messages") or []
        logger.info(f"\n\n----------------:Entered feedback_collector with state:----------------------\n\n{state}")
        logger.info(f"Message count: {len(messages)}")
        logger.info(f"Last message type: {type(messages[-1]) if messages else 'None'}")

        default_output = {"feedback": "", "draft_approved": False, "final_report": ""}

        if not messages or not isinstance(messages[-1], HumanMessage):
            logger.info("No new feedback message found; returning default values")
            return default_output

        try:
            feedback_data = json.loads(messages[-1].content)
        except (TypeError, ValueError):
            # TypeError: content is not text; ValueError covers JSONDecodeError
            feedback_data = None

        # Valid JSON that is not an object (e.g. a number or list) has no
        # .get() and is just as unusable as malformed JSON.
        if not isinstance(feedback_data, dict):
            logger.warning("Invalid feedback format; returning default values")
            return default_output

        is_approved = feedback_data.get("approved", False)
        comments = feedback_data.get("comments", "")
        logger.info(f"Parsed feedback: approved={is_approved}, comments={comments}")

        if is_approved:
            logger.info("Content approved, preparing final report")
            collector_output = {
                "feedback": comments,
                "draft_approved": True,
                "final_report": state.get("initial_draft", ""),
            }
        else:
            collector_output = {
                "feedback": comments,
                "draft_approved": False,
                "final_report": "",
            }

        logger.info(f"{'='*20}:feedback_collector output:{'='*20}\n{collector_output}")
        return collector_output
    @log_entry_exit
    def file_generator(self, state: State) -> dict:
        """Log the final report and return a placeholder output path.

        Nothing is written to disk here; the returned path only simulates a
        saved file.
        """
        report_text = state["final_report"]
        logger.info(f"Final Report Generated:\n{report_text}")

        simulated_result = {"final_report_path": "report.md"}
        return simulated_result

    @log_entry_exit  # Conditional edge function to create llm_call workers
    def assign_workers(self, state: State):
        """Create one ``Send`` to the ``llm_call`` node per planned section.

        Each ``Send`` carries a payload of the form ``{"section": <section>}``.
        """
        planned_count = len(state.get('sections', []))
        logger.info(f"\n{'='*10} State before assigning workers {'='*10}")
        logger.info(f"  Current sections plan: {planned_count} sections")
        # Log what has already been completed before dispatching
        logger.info(f"  Completed Sections before dispatch: {state.get('completed_sections', [])}")
        logger.info(f"{'='*40}\n")

        dispatches = []
        for planned_section in state["sections"]:
            dispatches.append(Send("llm_call", {"section": planned_section}))
        return dispatches

    @log_entry_exit  # Conditional edge for feedback loop
    def route_feedback(self, state: State):
        """Pick the next node name from the ``draft_approved`` flag.

        Returns ``"file_generator"`` only when the flag is exactly ``True``;
        any other value (including a missing flag) returns ``"orchestrator"``.
        """
        draft_approved = state.get('draft_approved', False)
        logger.info(f"route_feedback: draft_approved = {draft_approved}")

        # Identity check on purpose: truthy non-bool values do not count as approval
        if draft_approved is not True:
            logger.info("Draft not approved; routing back to orchestrator for revision")
            return "orchestrator"

        logger.info("Draft approved; routing to file_generator")
        return "file_generator"