File size: 13,733 Bytes
2dfc473
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7289c0c
 
 
 
 
2dfc473
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7289c0c
 
 
2dfc473
 
7289c0c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2dfc473
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7289c0c
 
 
2dfc473
 
 
 
 
 
 
 
 
7289c0c
 
 
 
 
2dfc473
7289c0c
2dfc473
 
 
 
7289c0c
2dfc473
7289c0c
2dfc473
7289c0c
 
 
 
 
2dfc473
 
7289c0c
2dfc473
 
 
 
 
 
7289c0c
2dfc473
 
7289c0c
 
 
 
 
 
 
 
 
 
 
2dfc473
7289c0c
2dfc473
 
 
 
 
7289c0c
2dfc473
7289c0c
 
2dfc473
 
7289c0c
 
 
 
 
2dfc473
7289c0c
2dfc473
 
 
7289c0c
 
 
 
 
 
2dfc473
7289c0c
2dfc473
7289c0c
2dfc473
 
 
7289c0c
 
 
 
 
2dfc473
7289c0c
2dfc473
 
 
7289c0c
 
 
 
 
 
2dfc473
7289c0c
2dfc473
7289c0c
 
2dfc473
 
7289c0c
 
 
 
 
2dfc473
7289c0c
2dfc473
 
 
7289c0c
 
 
 
 
 
2dfc473
7289c0c
2dfc473
 
7289c0c
2dfc473
 
7289c0c
 
 
 
 
2dfc473
7289c0c
2dfc473
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7289c0c
2dfc473
 
7289c0c
 
 
 
 
 
2dfc473
7289c0c
2dfc473
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""Pipeline orchestrator that manages agent flow and data passing."""
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from agents import (
    ShowrunnerAgent,
    StoryEditorAgent,
    CulturalConsultantAgent,
    LeadWriterAgent,
    DialogueSpecialistAgent,
    ComedyWriterAgent,
    ProofreaderAgent,
)
from hf_uploader import HFUploader
from config import settings


# Module-level logger named after this module; used by the orchestrator below
# for stage progress, warnings, and failure reporting.
logger = logging.getLogger(__name__)


class PipelineValidationError(Exception):
    """Signal that a pipeline stage's output did not pass validation."""


class PipelineOrchestrator:
    """Orchestrates the multi-agent content generation pipeline.

    The agents run strictly in sequence (showrunner, story editor, cultural
    consultant, lead writer, dialogue specialist, comedy writer, proofreader).
    Each stage's output is validated, stored under
    ``pipeline_state["stages"]``, and selected fields are fed to the next
    stage. The accumulated state is written to local storage on both success
    and failure, and the final output is uploaded via ``HFUploader``.
    """

    def __init__(self):
        """Initialize the orchestrator with all agents."""
        self.showrunner = ShowrunnerAgent()
        self.story_editor = StoryEditorAgent()
        self.cultural_consultant = CulturalConsultantAgent()
        self.lead_writer = LeadWriterAgent()
        self.dialogue_specialist = DialogueSpecialistAgent()
        self.comedy_writer = ComedyWriterAgent()
        self.proofreader = ProofreaderAgent()
        self.hf_uploader = HFUploader()

        # Pipeline state: one record per orchestrator instance, keyed by run_id.
        self.run_id = str(uuid.uuid4())
        self.pipeline_state = {
            "run_id": self.run_id,
            "start_time": datetime.now().isoformat(),
            "stages": {},
        }

        # Extracted character list, passed to the stages for consistency.
        self.character_list = []

        logger.info(f"Initialized pipeline orchestrator with run_id: {self.run_id}")

    def _validate_output(self, stage_name: str, output: Dict[str, Any], required_keys: list) -> None:
        """Validate that a stage output contains required keys and is not empty.

        A required field counts as empty when it is ``None`` or a string that
        is blank after stripping whitespace. Other value types (numbers,
        lists, dicts) are accepted as-is.

        Args:
            stage_name: Name of the stage (used as a prefix in error messages)
            output: Output dictionary from the stage
            required_keys: List of required keys

        Raises:
            PipelineValidationError: If the output is empty/None, a required
                key is missing, or a required field is empty
        """
        if not output:
            raise PipelineValidationError(f"{stage_name}: Output is empty or None")

        for key in required_keys:
            if key not in output:
                raise PipelineValidationError(f"{stage_name}: Missing required key '{key}'")

            value = output[key]
            # None is treated like a blank string: a required field that is
            # present but carries no content is still a failed stage.
            if value is None or (isinstance(value, str) and not value.strip()):
                raise PipelineValidationError(
                    f"{stage_name}: Required field '{key}' is empty. "
                    f"This indicates a processing failure. Aborting pipeline."
                )

    @staticmethod
    def _parse_character_names(character_bible: str) -> list:
        """Return the unique character names found in a character bible.

        This is a line-based heuristic: a line is considered only if it
        contains one of the markers ``(``, ``:`` or ``-``; its first word
        (after any leading ``-``/``*`` bullet or emphasis markers) is taken as
        the name when it is alphabetic, longer than one character and starts
        with an uppercase letter.

        Args:
            character_bible: Character definitions; ``None`` or an empty
                string yields an empty list

        Returns:
            Names in first-seen order without duplicates
        """
        names = []
        seen = set()
        for line in (character_bible or "").split('\n'):
            # Only lines that look like definitions, e.g. "Alex (CEO)" or "- Jordan".
            if not any(marker in line for marker in ('(', ':', '-')):
                continue

            # Drop leading bullet/emphasis markers *and* the whitespace after
            # them, so "- Jordan" yields "Jordan" rather than the bare "-".
            words = line.strip().lstrip('-* \t').split()
            if not words:
                continue

            # Remaining '-'/'*' inside the token are removed as well
            # (e.g. "**Alex**" -> "Alex"; note "Mary-Jane" -> "MaryJane").
            name = words[0].replace('-', '').replace('*', '')
            if len(name) > 1 and name.isalpha() and name[0].isupper() and name not in seen:
                seen.add(name)
                names.append(name)
        return names

    @staticmethod
    def _as_text(script: Any) -> Any:
        """Serialize a dict-shaped script to a JSON string; pass anything else through.

        Non-ASCII characters are kept verbatim (``ensure_ascii=False``) so the
        text handed to the next stage is not turned into ``\\uXXXX`` escapes.

        Args:
            script: Stage output field holding a script

        Returns:
            Indented JSON text when ``script`` is a dict, otherwise ``script``
            unchanged
        """
        if isinstance(script, dict):
            return json.dumps(script, indent=2, ensure_ascii=False)
        return script

    def _extract_characters(self, character_bible: str) -> list:
        """Extract character names from character bible.

        Stores the result on ``self.character_list`` as a side effect.

        Args:
            character_bible: Character definitions

        Returns:
            List of character names
        """
        self.character_list = self._parse_character_names(character_bible)
        logger.info(f"Extracted characters: {self.character_list}")
        return self.character_list

    def execute_pipeline(
        self,
        user_brief: str,
        season_arc_document: str,
        character_bible: str,
        world_building_document: str,
        character_voice_guide: str,
        style_guide: str,
        continuity_log: str,
        hook_brief: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Execute the full content generation pipeline.

        Args:
            user_brief: Initial user brief
            season_arc_document: Season context
            character_bible: Character definitions
            world_building_document: World context
            character_voice_guide: Character voice definitions
            style_guide: Style reference
            continuity_log: Continuity tracking
            hook_brief: Optional hook brief for comedy writer; falls back to
                ``user_brief`` when not provided

        Returns:
            Dictionary with final output and metadata

        Raises:
            PipelineValidationError: If any stage output fails validation
            Exception: Any other error raised by a stage, the local save or
                the upload is re-raised after the failed state is recorded
        """
        try:
            logger.info("Starting pipeline execution")

            # Extract character list for consistency enforcement
            self._extract_characters(character_bible)

            # Stage 1: Showrunner
            logger.info("Stage 1: Showrunner - Generating directive")
            showrunner_inputs = {
                "user_brief": user_brief,
                "season_arc_document": season_arc_document,
                "character_bible": character_bible,
            }
            showrunner_output = self.showrunner.generate_directive(showrunner_inputs)
            self._validate_output(
                "Showrunner",
                showrunner_output,
                ["episode_directive", "story_premise", "tone_brief", "character_focus_notes"]
            )
            self.pipeline_state["stages"]["showrunner"] = showrunner_output
            logger.info("Stage 1 completed ✓")

            # Stage 2: Story Editor
            logger.info("Stage 2: Story Editor - Generating outline")
            story_editor_inputs = {
                "episode_directive": showrunner_output.get("episode_directive", ""),
                "series_continuity_log": continuity_log,
                "character_list": self.character_list,  # Pass character list for consistency
            }
            story_editor_output = self.story_editor.generate_outline(story_editor_inputs)
            self._validate_output(
                "Story Editor",
                story_editor_output,
                ["episode_outline", "act_structure"]
            )
            self.pipeline_state["stages"]["story_editor"] = story_editor_output
            logger.info("Stage 2 completed ✓")

            # Stage 3: Cultural Consultant (runs before the Lead Writer, which
            # consumes its notes)
            logger.info("Stage 3: Cultural Consultant - Reviewing outline")
            cultural_inputs = {
                "episode_outline": story_editor_output.get("episode_outline", ""),
                "world_building_document": world_building_document,
                "character_list": self.character_list,
            }
            cultural_output = self.cultural_consultant.review_outline(cultural_inputs)
            self._validate_output(
                "Cultural Consultant",
                cultural_output,
                ["cultural_accuracy_notes"]
            )

            # More than two flagged inaccuracies is only warned about; the
            # pipeline still proceeds.
            flagged = cultural_output.get("flagged_inaccuracies", [])
            if flagged and len(flagged) > 2:
                logger.warning(f"Cultural Consultant flagged {len(flagged)} issues - proceeding with caution")

            self.pipeline_state["stages"]["cultural_consultant"] = cultural_output
            logger.info("Stage 3 completed ✓")

            # Stage 4: Lead Writer
            logger.info("Stage 4: Lead Writer - Writing script")
            lead_writer_inputs = {
                "approved_outline": story_editor_output.get("episode_outline", ""),
                "cultural_consultant_notes": cultural_output.get("cultural_accuracy_notes", ""),
                "character_voice_guide": character_voice_guide,
                "character_list": self.character_list,  # Enforce character consistency
                "story_premise": showrunner_output.get("story_premise", ""),
            }
            lead_writer_output = self.lead_writer.write_script(lead_writer_inputs)
            self._validate_output(
                "Lead Writer",
                lead_writer_output,
                ["full_episode_first_draft"]
            )
            self.pipeline_state["stages"]["lead_writer"] = lead_writer_output
            logger.info("Stage 4 completed ✓")

            # Stage 5: Dialogue Specialist
            logger.info("Stage 5: Dialogue Specialist - Polishing dialogue")
            dialogue_inputs = {
                # A dict-shaped draft is serialized to a string first.
                "first_draft_script": self._as_text(
                    lead_writer_output.get("full_episode_first_draft", "")
                ),
                "character_voice_guide": character_voice_guide,
                "character_list": self.character_list,
                "dialect_slang_reference": "",
            }
            dialogue_output = self.dialogue_specialist.polish_dialogue(dialogue_inputs)
            self._validate_output(
                "Dialogue Specialist",
                dialogue_output,
                ["dialogue_polished_script"]
            )
            self.pipeline_state["stages"]["dialogue_specialist"] = dialogue_output
            logger.info("Stage 5 completed ✓")

            # Stage 6: Comedy Writer
            logger.info("Stage 6: Comedy Writer - Adding humor")
            comedy_inputs = {
                "dialogue_polished_script": self._as_text(
                    dialogue_output.get("dialogue_polished_script", "")
                ),
                "hook_brief_from_showrunner": hook_brief or user_brief,
                "character_list": self.character_list,
                "tone_brief": showrunner_output.get("tone_brief", ""),
            }
            comedy_output = self.comedy_writer.add_humor(comedy_inputs)
            self._validate_output(
                "Comedy Writer",
                comedy_output,
                ["comedy_sharpened_script"]
            )
            self.pipeline_state["stages"]["comedy_writer"] = comedy_output
            logger.info("Stage 6 completed ✓")

            # Stage 7: Proofreader (Final QC)
            logger.info("Stage 7: Proofreader - Final quality control")
            proofreader_inputs = {
                "comedy_sharpened_script": self._as_text(
                    comedy_output.get("comedy_sharpened_script", "")
                ),
                "style_guide": style_guide,
                "continuity_log": continuity_log,
                "character_list": self.character_list,
            }
            proofreader_output = self.proofreader.final_qc(proofreader_inputs)
            self._validate_output(
                "Proofreader",
                proofreader_output,
                ["final_locked_script"]
            )
            self.pipeline_state["stages"]["proofreader"] = proofreader_output
            logger.info("Stage 7 completed ✓")

            # Mark completion
            self.pipeline_state["end_time"] = datetime.now().isoformat()
            self.pipeline_state["status"] = "completed"

            # Save local state before uploading, so a local record exists
            # even if the upload fails.
            self._save_pipeline_state()

            # Upload to Hugging Face
            logger.info("Uploading final output to Hugging Face")
            hf_url = self.hf_uploader.upload_final_output(
                proofreader_output, self.run_id
            )
            hf_metadata_url = self.hf_uploader.upload_pipeline_metadata(
                self.pipeline_state
            )

            final_result = {
                "run_id": self.run_id,
                "status": "success",
                "final_output": proofreader_output,
                "hf_output_url": hf_url,
                "hf_metadata_url": hf_metadata_url,
                "pipeline_state": self.pipeline_state,
            }

            logger.info("✓ Pipeline execution completed successfully")
            return final_result

        except Exception as exc:
            # One handler for both failure kinds; only the log wording differs.
            failure_kind = "validation" if isinstance(exc, PipelineValidationError) else "execution"
            logger.error(f"✗ Pipeline {failure_kind} failed: {str(exc)}")
            self.pipeline_state["status"] = "failed"
            self.pipeline_state["error"] = str(exc)
            self.pipeline_state["end_time"] = datetime.now().isoformat()
            try:
                self._save_pipeline_state()
            except Exception:
                # Persisting the failure record is best-effort: it must not
                # replace the error that actually broke the pipeline.
                logger.exception("Could not save failed pipeline state")
            raise

    def _save_pipeline_state(self) -> None:
        """Save the pipeline state to local storage.

        Writes ``pipeline_<run_id>.json`` under ``settings.output_dir``,
        creating the directory if needed.

        Raises:
            TypeError: If the state contains a value ``json`` cannot serialize
            OSError: If the directory or file cannot be written
        """
        output_dir = Path(settings.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        state_file = output_dir / f"pipeline_{self.run_id}.json"
        # Serialize completely before opening the file, so a serialization
        # error cannot leave a truncated JSON document on disk.
        payload = json.dumps(self.pipeline_state, indent=2)
        state_file.write_text(payload, encoding="utf-8")

        logger.info(f"Pipeline state saved to {state_file}")