File size: 9,601 Bytes
b694626
 
 
e0128d9
 
 
 
 
b694626
 
 
 
 
 
 
 
9b09514
 
 
 
 
 
3a73af5
9b09514
b694626
 
 
9b09514
b694626
 
 
 
 
 
 
 
 
 
 
 
 
3a73af5
b694626
 
 
 
 
 
 
9b09514
 
b694626
 
 
 
 
 
9b09514
b694626
 
9b09514
 
 
 
b694626
9b09514
 
b694626
 
9b09514
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b694626
9b09514
b694626
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b09514
 
b694626
 
9b09514
 
b694626
9b09514
 
 
b694626
 
 
 
 
 
9b09514
 
 
 
 
 
 
 
 
 
b694626
9b09514
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b694626
 
9b09514
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b694626
9b09514
 
 
b694626
9b09514
 
 
 
b694626
9b09514
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
from agno.workflow import Workflow, RunResponse, RunEvent
from agents.transcription_agent import transcription_agent, Transcription
from agents.site_builder_agent import microsite_builder_agent
# from openinference.instrumentation.agno import AgnoInstrumentor
# from opentelemetry import trace as trace_api
# from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
# from opentelemetry.sdk.trace import TracerProvider
# from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from agents.info_extractor_agent import info_extractor
from textwrap import dedent
from agno.agent import Agent
from typing import Iterator, Union, Optional
from pathlib import Path
from agno.media import Audio
from dotenv import load_dotenv
import json
import logging
import io
from pydub import AudioSegment
from langsmith import traceable
import ast

# AgnoInstrumentor().instrument()


load_dotenv()

logger = logging.getLogger(__name__)


class MicroSiteGenerator(Workflow):
    description: str = dedent(
        """\
        An intelligent AI agent that seamlessly transforms product demo call recordings into personalized, interactive recap websites. This workflow orchestrates multiple AI agents to transcribe the demo, intelligently extract key discussion points and features, and dynamically assemble compelling, shareable microsites.
    """
    )

    transcriber: Agent = transcription_agent
    info_extractor: Agent = info_extractor
    microsite_builder: Agent = microsite_builder_agent

    # @traceable
    def run(
        self,
        audio_source: str,
        audio_format: str,
    ) -> Iterator[RunResponse]:
        logger.info("Microsite generation initiated.")

        transcription_results = self.transcribe_audio(audio_source, audio_format)

        if transcription_results:
            extracted_info: RunResponse = self.info_extractor.run(
                message=transcription_results.transcription
            )

            microsite_builder_input = {
                "extracted_info_json": extracted_info.content.model_dump_json(),
            }

            site_details: RunResponse = microsite_builder_agent.run(
                message=json.dumps(microsite_builder_input),
                stream_intermediate_steps=True,
            )

            logger.info(
                f"Microsite built and deployment details received: {site_details}"
            )

            # Parse the JSON string back to dictionary if it's a string
            deployment_result = site_details.content
            if isinstance(deployment_result, str):
                try:
                    # Try JSON first (double quotes)
                    deployment_result = json.loads(deployment_result)
                except json.JSONDecodeError:
                    try:
                        # Try Python literal eval (single quotes)
                        deployment_result = ast.literal_eval(deployment_result)
                    except (ValueError, SyntaxError):
                        logger.error(
                            f"Failed to parse deployment result: {deployment_result}"
                        )
                        deployment_result = {
                            "success": False,
                            "error": "Invalid deployment result format",
                        }

            yield RunResponse(
                content=deployment_result,
                event=RunEvent.workflow_completed,
            )
        else:
            yield RunResponse(
                content="Site was not generated",
                event=RunEvent.workflow_completed,
            )

    def _get_audio_bytes(self, source: Union[str, Path, bytes]) -> bytes:
        """
        Retrieves audio content as bytes from various sources (path, URL, or raw bytes).
        """
        if isinstance(source, bytes):
            return source
        elif isinstance(source, (str, Path)):
            str_source = str(source)
            return Path(str_source).read_bytes()
        raise ValueError("Unsupported audio source type.")

    # --- Transcription Execution Functions ---
    def _run_transcription_agent(
        self,
        audio_source_bytes: bytes,
        audio_format: str,
    ):
        """
        Executes the transcription agent with the given audio bytes.
        """
        logger.info(f"Running transcription agent for audio format: {audio_format}")
        try:
            run_response: RunResponse = self.transcriber.run(
                input="Transcribe this audio exactly as heard",
                audio=[Audio(content=audio_source_bytes, format=audio_format)],
            )
            return run_response.content
        except Exception as e:
            logger.error(f"Transcription agent failed: {str(e)}")
            return None

    def transcribe_audio(
        self,
        audio_source: Union[str, Path, bytes],
        audio_format: str = "wav",
        num_attempts: int = 3,  # This might apply per chunk or for the whole process
        chunk_duration_ms: int = 60000,  # Default to 60-second chunks
    ):
        """
        Manages the transcription process, including getting audio bytes,
        chunking the audio, and retrying the agent for each chunk.
        """
        logger.info(
            f"Initiating audio transcription process for {audio_source} (format: {audio_format})."
        )
        try:
            audio_bytes = self._get_audio_bytes(audio_source)
        except (ValueError, NotImplementedError) as e:
            logger.error(f"Failed to get audio bytes: {str(e)}")
            return None

        try:
            pydub_format = audio_format.lower()
            if pydub_format == "m4a":
                pydub_format = "mp4"
            elif pydub_format == "opus":
                pass

            sound = AudioSegment.from_file(io.BytesIO(audio_bytes), format=pydub_format)
            logger.info(
                f"Audio loaded into pydub. Duration: {len(sound) / 1000:.2f} seconds."
            )
        except Exception as e:
            logger.error(
                f"Failed to load audio with pydub: {e}. Ensure ffmpeg is installed and audio format is supported."
            )
            return None

        chunks = [
            sound[i : i + chunk_duration_ms]
            for i in range(0, len(sound), chunk_duration_ms)
        ]

        if not chunks:
            logger.error(
                "Audio was too short to be chunked or pydub failed to create chunks."
            )
            return None

        logger.info(
            f"Audio split into {len(chunks)} chunk(s) of approximately {chunk_duration_ms / 1000}s each."
        )

        all_transcription_parts = []
        successful_chunks = 0

        for i, audio_chunk_segment in enumerate(chunks):
            logger.info(f"Processing audio chunk {i + 1}/{len(chunks)}...")
            chunk_io = io.BytesIO()

            # Export chunk in the format expected by the transcription agent
            # This should ideally be a lossless format if possible, or the original format
            # if the agent handles various inputs well.
            try:
                # Use the original audio_format for exporting to the agent,
                # as pydub_format was for loading.
                audio_chunk_segment.export(chunk_io, format=audio_format)
                chunk_bytes = chunk_io.getvalue()
            except Exception as e:
                logger.error(
                    f"Failed to export audio chunk {i+1} to format {audio_format}: {e}"
                )
                all_transcription_parts.append(f"[chunk {i+1} export failed]")
                continue

            transcription_response_content = None
            for attempt in range(num_attempts):
                logger.info(f"Attempt {attempt + 1}/{num_attempts} for chunk {i + 1}.")
                transcription_response_content = self._run_transcription_agent(
                    chunk_bytes, audio_format
                )
                if transcription_response_content and hasattr(
                    transcription_response_content, "transcription"
                ):
                    logger.info(
                        f"Transcription successful for chunk {i + 1} on attempt {attempt + 1}."
                    )
                    break
                else:
                    logger.warning(
                        f"Transcription attempt {attempt + 1}/{num_attempts} for chunk {i + 1} failed or returned unexpected content."
                    )

            if transcription_response_content and hasattr(
                transcription_response_content, "transcription"
            ):
                all_transcription_parts.append(
                    transcription_response_content.transcription
                )
                successful_chunks += 1
            else:
                logger.error(
                    f"Transcription failed for chunk {i + 1} after {num_attempts} attempts."
                )
                all_transcription_parts.append(f"[chunk {i+1} transcription failed]")

        if successful_chunks == 0 and len(chunks) > 0:
            logger.error("All audio chunks failed to transcribe.")
            return None  # Or a Transcription object with an error message

        full_transcription_text = " ".join(all_transcription_parts).strip()
        logger.info(
            f"Combined transcription from {successful_chunks}/{len(chunks)} chunks generated."
        )

        return Transcription(transcription=full_transcription_text)