devesh1011 commited on
Commit
9b09514
·
1 Parent(s): 9b373ae

Refactor the audio transcription process to split long audio into fixed-size chunks (via pydub) and improve error logging

Browse files
Files changed (1) hide show
  1. workflow.py +140 -271
workflow.py CHANGED
@@ -1,23 +1,31 @@
1
  from agno.workflow import Workflow, RunResponse, RunEvent
2
  from agents.transcription_agent import transcription_agent, Transcription
3
  from agents.site_builder_agent import microsite_builder_agent
 
 
 
 
 
4
  from agents.info_extractor_agent import info_extractor
5
- from utils.netlify_deployment import deploy_html_file_with_digest
6
  from textwrap import dedent
7
  from agno.agent import Agent
8
  from typing import Iterator, Union, Optional
9
- from logging import Logger
10
  from pathlib import Path
11
  from agno.media import Audio
12
  from dotenv import load_dotenv
13
- import requests
14
  import json
15
- from datetime import datetime
 
 
 
 
 
 
 
16
 
17
  load_dotenv()
18
 
19
- # It's good practice to get a logger instance here, though `logging` module needs configuration
20
- logger = Logger(__name__)
21
 
22
 
23
  class MicroSiteGenerator(Workflow):
@@ -31,86 +39,55 @@ class MicroSiteGenerator(Workflow):
31
  info_extractor: Agent = info_extractor
32
  microsite_builder: Agent = microsite_builder_agent
33
 
34
- def save_html_to_file(self, html_content: str) -> str:
35
- """
36
- Manually save HTML content to the microsites directory.
37
-
38
- Args:
39
- html_content: The HTML content to save
40
-
41
- Returns:
42
- str: The full path to the saved HTML file
43
- """
44
- # Create microsites directory if it doesn't exist
45
- microsites_dir = Path(__file__).parent.parent / "microsites"
46
- microsites_dir.mkdir(exist_ok=True)
47
-
48
- # Generate filename with timestamp
49
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
50
- filename = f"demo_{timestamp}.html"
51
- file_path = microsites_dir / filename
52
-
53
- try:
54
- # Write HTML content to file
55
- with open(file_path, "w", encoding="utf-8") as f:
56
- f.write(html_content)
57
-
58
- logger.info(f"HTML saved successfully to: {file_path}")
59
- return str(file_path)
60
-
61
- except Exception as e:
62
- logger.error(f"Failed to save HTML file: {e}")
63
- raise Exception(f"Could not save HTML file: {e}")
64
-
65
  def run(
66
  self,
67
  audio_source: str,
68
  audio_format: str,
69
- use_transcription_cache: bool = True,
70
  ) -> Iterator[RunResponse]:
71
  logger.info("Microsite generation initiated.")
72
 
73
- transcription_results: Optional[Transcription] = None
74
- if use_transcription_cache:
75
- transcription_results = self.get_cached_transcription(audio_source)
76
- if transcription_results:
77
- logger.info(f"Using cached transcription for {audio_source}")
78
- else:
79
- logger.info(
80
- f"No cached transcription found for {audio_source}, transcribing now."
81
- )
82
- transcription_results = self.transcribe_audio(
83
- audio_source, audio_format
84
- )
85
  if transcription_results:
86
- self._add_transcription_to_cache(audio_source, transcription_results)
87
  extracted_info: RunResponse = self.info_extractor.run(
88
  message=transcription_results.transcription
89
  )
90
- extracted_info = self.remove_markdown_json_wrapper(extracted_info.content)
91
- print(extracted_info)
92
 
93
  microsite_builder_input = {
94
- "extracted_info_json": extracted_info,
95
- "raw_transcription": transcription_results.transcription,
96
  }
97
- site_html: RunResponse = microsite_builder_agent.run(
98
- json.dumps(microsite_builder_input)
99
- )
100
-
101
- # Save HTML to filesystem using manual function
102
- html_file_path = self.save_html_to_file(site_html.content.content)
103
- logger.info(f"HTML saved to: {html_file_path}")
104
 
105
- product_name = json.loads(extracted_info)["product_name"]
 
 
 
106
 
107
- site_details = deploy_html_file_with_digest(
108
- title=product_name,
109
- html_file_path=html_file_path,
110
  )
111
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
112
  yield RunResponse(
113
- content=site_details,
114
  event=RunEvent.workflow_completed,
115
  )
116
  else:
@@ -119,142 +96,6 @@ class MicroSiteGenerator(Workflow):
119
  event=RunEvent.workflow_completed,
120
  )
121
 
122
- # transcription_results = self.transcribe_audio(audio_source, audio_format)
123
- # if transcription_results:
124
- # yield RunResponse(
125
- # content=transcription_results.transcription, # The transcription text
126
- # event=RunEvent.workflow_completed,
127
- # )
128
- # else:
129
- # yield RunResponse(
130
- # content="Transcription failed.", event=RunEvent.workflow_completed
131
- # )
132
- # extracted_info: RunResponse = self.info_extractor.run(
133
- # message=transcription_results.transcription
134
- # )
135
- # print(self.remove_markdown_json_wrapper(extracted_info.content))
136
-
137
- def get_cached_transcription(
138
- self, audio_source: Union[str, Path, bytes]
139
- ) -> Optional[Transcription]:
140
- """
141
- Retrieves a cached transcription result for a given audio source.
142
- """
143
- # For caching, audio_source needs to be hashable. If it's bytes, convert to a string key.
144
- cache_key = (
145
- str(audio_source)
146
- if isinstance(audio_source, (str, Path))
147
- else f"bytes_hash_{hash(audio_source)}"
148
- )
149
- logger.info(f"Checking if cached transcription exists for {cache_key}.")
150
- transcription_result = self.session_state.get("transcription_cache", {}).get(
151
- cache_key
152
- )
153
- # Use model_validate to convert dict from cache back to Transcription object
154
- return (
155
- Transcription.model_validate(transcription_result)
156
- if transcription_result and isinstance(transcription_result, dict)
157
- else None
158
- )
159
-
160
- def _add_transcription_to_cache(
161
- self, audio_source: Union[str, Path, bytes], transcription_result: Transcription
162
- ):
163
- """
164
- Adds a transcription result to the session cache.
165
- """
166
- cache_key = (
167
- str(audio_source)
168
- if isinstance(audio_source, (str, Path))
169
- else f"bytes_hash_{hash(audio_source)}"
170
- )
171
- logger.info(f"Saving transcription results for audio source: {cache_key}")
172
- self.session_state.setdefault("transcription_cache", {})
173
- # Store the Pydantic model as a dictionary
174
- self.session_state["transcription_cache"][
175
- cache_key
176
- ] = transcription_result.model_dump()
177
-
178
- def remove_markdown_json_wrapper(self, json_string_with_markdown: str) -> str:
179
- """
180
- Removes the '```json' prefix and '```' suffix from a string,
181
- assuming the JSON content is wrapped in a Markdown code block.
182
-
183
- Args:
184
- json_string_with_markdown: The string containing the JSON wrapped in markdown.
185
- Expected format: ```json\n{...json content...}\n```
186
-
187
- Returns:
188
- The cleaned JSON string without the markdown wrapper.
189
- """
190
- cleaned_string = json_string_with_markdown
191
-
192
- # Remove '```json\n' from the start
193
- if cleaned_string.startswith("```json\n"):
194
- cleaned_string = cleaned_string[len("```json\n") :]
195
-
196
- # Remove '\n```' from the end
197
- if cleaned_string.endswith("\n```"):
198
- cleaned_string = cleaned_string[: -len("\n```")]
199
-
200
- return cleaned_string
201
-
202
- # --- Caching Functions ---
203
- def get_cached_transcription(
204
- self, audio_source: Union[str, Path, bytes]
205
- ) -> Optional[Transcription]:
206
- """
207
- Retrieves a cached transcription result for a given audio source.
208
- """
209
- # For caching, audio_source needs to be hashable. If it's bytes, convert to a string key.
210
- cache_key = (
211
- str(audio_source)
212
- if isinstance(audio_source, (str, Path))
213
- else f"bytes_hash_{hash(audio_source)}"
214
- )
215
- logger.info(f"Checking if cached transcription exists for {cache_key}.")
216
- transcription_result = self.session_state.get("transcription_cache", {}).get(
217
- cache_key
218
- )
219
- # Use model_validate to convert dict from cache back to Transcription object
220
- return (
221
- Transcription.model_validate(transcription_result)
222
- if transcription_result and isinstance(transcription_result, dict)
223
- else None
224
- )
225
-
226
- def _add_transcription_to_cache(
227
- self, audio_source: Union[str, Path, bytes], transcription_result: Transcription
228
- ):
229
- """
230
- Adds a transcription result to the session cache.
231
- """
232
- cache_key = (
233
- str(audio_source)
234
- if isinstance(audio_source, (str, Path))
235
- else f"bytes_hash_{hash(audio_source)}"
236
- )
237
- logger.info(f"Saving transcription results for audio source: {cache_key}")
238
- self.session_state.setdefault("transcription_cache", {})
239
- # Store the Pydantic model as a dictionary
240
- self.session_state["transcription_cache"][
241
- cache_key
242
- ] = transcription_result.model_dump()
243
-
244
- # --- Audio Handling Function ---
245
- def _download_audio(self, url: str) -> bytes:
246
- """
247
- Downloads audio from a given URL.
248
- """
249
- logger.info(f"Attempting to download audio from URL: {url}")
250
- try:
251
- response = requests.get(url, stream=True)
252
- response.raise_for_status() # Raise an exception for HTTP errors
253
- return response.content
254
- except requests.exceptions.RequestException as e:
255
- logger.error(f"Failed to download audio from {url}: {e}")
256
- raise ValueError(f"Could not download audio from URL: {e}")
257
-
258
  def _get_audio_bytes(self, source: Union[str, Path, bytes]) -> bytes:
259
  """
260
  Retrieves audio content as bytes from various sources (path, URL, or raw bytes).
@@ -263,8 +104,6 @@ class MicroSiteGenerator(Workflow):
263
  return source
264
  elif isinstance(source, (str, Path)):
265
  str_source = str(source)
266
- if str_source.startswith(("http://", "https://")):
267
- return self._download_audio(str_source)
268
  return Path(str_source).read_bytes()
269
  raise ValueError("Unsupported audio source type.")
270
 
@@ -292,84 +131,114 @@ class MicroSiteGenerator(Workflow):
292
  self,
293
  audio_source: Union[str, Path, bytes],
294
  audio_format: str = "wav",
295
- num_attempts: int = 3,
 
296
  ):
297
  """
298
- Manages the transcription process, including getting audio bytes and retrying the agent.
 
299
  """
300
- logger.info("Initiating audio transcription process.")
 
 
301
  try:
302
  audio_bytes = self._get_audio_bytes(audio_source)
303
  except (ValueError, NotImplementedError) as e:
304
  logger.error(f"Failed to get audio bytes: {str(e)}")
305
  return None
306
 
307
- for attempt in range(num_attempts):
308
- transcription_response = self._run_transcription_agent(
309
- audio_bytes, audio_format
 
 
 
 
 
 
 
310
  )
311
- if transcription_response:
312
- logger.info(f"Transcription successful after {attempt + 1} attempt(s).")
313
- return transcription_response
314
- else:
315
- logger.warning(
316
- f"Transcription attempt {attempt + 1}/{num_attempts} failed."
317
- )
318
- logger.error(
319
- f"Transcription failed after {num_attempts} attempts for {audio_source}."
 
 
 
 
 
 
 
 
 
 
320
  )
321
- return None
322
 
323
- # # --- Transcription Phase ---
324
- # transcription_results: Optional[Transcription] = None
325
- # if use_transcription_cache:
326
- # transcription_results = self.get_cached_transcription(audio_source)
327
- # if transcription_results:
328
- # logger.info(f"Using cached transcription for {audio_source}")
329
- # # Yield cached transcription as RunResponse
330
- # yield RunResponse(
331
- # content=f"Using cached transcription: {transcription_results.transcription}",
332
- # event=RunEvent.workflow_completed,
333
- # )
334
- # return
335
- # else:
336
- # logger.info(
337
- # f"No cached transcription found for {audio_source}, transcribing now."
338
- # )
339
- # transcription_results = self.transcribe_audio(
340
- # audio_source, audio_format
341
- # )
342
- # if transcription_results:
343
- # self._add_transcription_to_cache(
344
- # audio_source, transcription_results
345
- # )
346
- # else:
347
- # logger.info(
348
- # f"Transcription cache disabled, transcribing {audio_source} now."
349
- # )
350
- # transcription_results = self.transcribe_audio(audio_source, audio_format)
351
- # if transcription_results:
352
- # self._add_transcription_to_cache(audio_source, transcription_results)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
353
 
354
- # if transcription_results is None:
355
- # logger.error("Transcription was not successful. Workflow halted.")
356
- # yield RunResponse(
357
- # content="Transcription failed. Workflow halted.",
358
- # event=RunEvent.workflow_completed,
359
- # )
360
- # return
361
 
362
- # # --- Information Extraction Phase ---
363
- # logger.info("Transcription successful. Proceeding to information extraction.")
364
- # # Run the info_extractor agent and yield its response
365
- # yield from self.info_extractor.run(
366
- # input=transcription_results.transcription, # Pass the raw string transcription to the extractor
367
- # stream=True,
368
- # )
369
 
370
- # # Cache the final result
371
- # if (
372
- # self.info_extractor.run_response
373
- # and self.info_extractor.run_response.content
374
- # ):
375
- # logger.info("Information extraction successful. Workflow completed.")
 
1
# --- Module prologue: imports, tracing instrumentation, environment, logging ---
from agno.workflow import Workflow, RunResponse, RunEvent
from agents.transcription_agent import transcription_agent, Transcription
from agents.site_builder_agent import microsite_builder_agent
# OpenInference/OpenTelemetry tracing for agno agents.
# NOTE(review): trace_api, OTLPSpanExporter, TracerProvider and
# SimpleSpanProcessor are imported but not used in the visible code —
# presumably wired up elsewhere or left for a pending tracing setup; confirm.
from openinference.instrumentation.agno import AgnoInstrumentor
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from agents.info_extractor_agent import info_extractor
from textwrap import dedent
from agno.agent import Agent
from typing import Iterator, Union, Optional
from pathlib import Path
from agno.media import Audio
from dotenv import load_dotenv
import json
import logging
import io
from pydub import AudioSegment  # audio chunking; requires ffmpeg at runtime
from langsmith import traceable
import ast

# Instrument agno agents at import time so all agent runs emit spans.
AgnoInstrumentor().instrument()


load_dotenv()

# Module-level logger; handler/level configuration is left to the application.
logger = logging.getLogger(__name__)
29
 
30
 
31
  class MicroSiteGenerator(Workflow):
 
39
  info_extractor: Agent = info_extractor
40
  microsite_builder: Agent = microsite_builder_agent
41
 
42
+ @traceable
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  def run(
44
  self,
45
  audio_source: str,
46
  audio_format: str,
 
47
  ) -> Iterator[RunResponse]:
48
  logger.info("Microsite generation initiated.")
49
 
50
+ transcription_results = self.transcribe_audio(audio_source, audio_format)
51
+
 
 
 
 
 
 
 
 
 
 
52
  if transcription_results:
 
53
  extracted_info: RunResponse = self.info_extractor.run(
54
  message=transcription_results.transcription
55
  )
 
 
56
 
57
  microsite_builder_input = {
58
+ "extracted_info_json": extracted_info.content.model_dump_json(),
 
59
  }
 
 
 
 
 
 
 
60
 
61
+ site_details: RunResponse = microsite_builder_agent.run(
62
+ message=json.dumps(microsite_builder_input),
63
+ stream_intermediate_steps=True,
64
+ )
65
 
66
+ logger.info(
67
+ f"Microsite built and deployment details received: {site_details}"
 
68
  )
69
 
70
+ # Parse the JSON string back to dictionary if it's a string
71
+ deployment_result = site_details.content
72
+ if isinstance(deployment_result, str):
73
+ try:
74
+ # Try JSON first (double quotes)
75
+ deployment_result = json.loads(deployment_result)
76
+ except json.JSONDecodeError:
77
+ try:
78
+ # Try Python literal eval (single quotes)
79
+ deployment_result = ast.literal_eval(deployment_result)
80
+ except (ValueError, SyntaxError):
81
+ logger.error(
82
+ f"Failed to parse deployment result: {deployment_result}"
83
+ )
84
+ deployment_result = {
85
+ "success": False,
86
+ "error": "Invalid deployment result format",
87
+ }
88
+
89
  yield RunResponse(
90
+ content=deployment_result,
91
  event=RunEvent.workflow_completed,
92
  )
93
  else:
 
96
  event=RunEvent.workflow_completed,
97
  )
98
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
  def _get_audio_bytes(self, source: Union[str, Path, bytes]) -> bytes:
100
  """
101
  Retrieves audio content as bytes from various sources (path, URL, or raw bytes).
 
104
  return source
105
  elif isinstance(source, (str, Path)):
106
  str_source = str(source)
 
 
107
  return Path(str_source).read_bytes()
108
  raise ValueError("Unsupported audio source type.")
109
 
 
131
  self,
132
  audio_source: Union[str, Path, bytes],
133
  audio_format: str = "wav",
134
+ num_attempts: int = 3, # This might apply per chunk or for the whole process
135
+ chunk_duration_ms: int = 60000, # Default to 60-second chunks
136
  ):
137
  """
138
+ Manages the transcription process, including getting audio bytes,
139
+ chunking the audio, and retrying the agent for each chunk.
140
  """
141
+ logger.info(
142
+ f"Initiating audio transcription process for {audio_source} (format: {audio_format})."
143
+ )
144
  try:
145
  audio_bytes = self._get_audio_bytes(audio_source)
146
  except (ValueError, NotImplementedError) as e:
147
  logger.error(f"Failed to get audio bytes: {str(e)}")
148
  return None
149
 
150
+ try:
151
+ pydub_format = audio_format.lower()
152
+ if pydub_format == "m4a":
153
+ pydub_format = "mp4"
154
+ elif pydub_format == "opus":
155
+ pass
156
+
157
+ sound = AudioSegment.from_file(io.BytesIO(audio_bytes), format=pydub_format)
158
+ logger.info(
159
+ f"Audio loaded into pydub. Duration: {len(sound) / 1000:.2f} seconds."
160
  )
161
+ except Exception as e:
162
+ logger.error(
163
+ f"Failed to load audio with pydub: {e}. Ensure ffmpeg is installed and audio format is supported."
164
+ )
165
+ return None
166
+
167
+ chunks = [
168
+ sound[i : i + chunk_duration_ms]
169
+ for i in range(0, len(sound), chunk_duration_ms)
170
+ ]
171
+
172
+ if not chunks:
173
+ logger.error(
174
+ "Audio was too short to be chunked or pydub failed to create chunks."
175
+ )
176
+ return None
177
+
178
+ logger.info(
179
+ f"Audio split into {len(chunks)} chunk(s) of approximately {chunk_duration_ms / 1000}s each."
180
  )
 
181
 
182
+ all_transcription_parts = []
183
+ successful_chunks = 0
184
+
185
+ for i, audio_chunk_segment in enumerate(chunks):
186
+ logger.info(f"Processing audio chunk {i + 1}/{len(chunks)}...")
187
+ chunk_io = io.BytesIO()
188
+
189
+ # Export chunk in the format expected by the transcription agent
190
+ # This should ideally be a lossless format if possible, or the original format
191
+ # if the agent handles various inputs well.
192
+ try:
193
+ # Use the original audio_format for exporting to the agent,
194
+ # as pydub_format was for loading.
195
+ audio_chunk_segment.export(chunk_io, format=audio_format)
196
+ chunk_bytes = chunk_io.getvalue()
197
+ except Exception as e:
198
+ logger.error(
199
+ f"Failed to export audio chunk {i+1} to format {audio_format}: {e}"
200
+ )
201
+ all_transcription_parts.append(f"[chunk {i+1} export failed]")
202
+ continue
203
+
204
+ transcription_response_content = None
205
+ for attempt in range(num_attempts):
206
+ logger.info(f"Attempt {attempt + 1}/{num_attempts} for chunk {i + 1}.")
207
+ transcription_response_content = self._run_transcription_agent(
208
+ chunk_bytes, audio_format
209
+ )
210
+ if transcription_response_content and hasattr(
211
+ transcription_response_content, "transcription"
212
+ ):
213
+ logger.info(
214
+ f"Transcription successful for chunk {i + 1} on attempt {attempt + 1}."
215
+ )
216
+ break
217
+ else:
218
+ logger.warning(
219
+ f"Transcription attempt {attempt + 1}/{num_attempts} for chunk {i + 1} failed or returned unexpected content."
220
+ )
221
+
222
+ if transcription_response_content and hasattr(
223
+ transcription_response_content, "transcription"
224
+ ):
225
+ all_transcription_parts.append(
226
+ transcription_response_content.transcription
227
+ )
228
+ successful_chunks += 1
229
+ else:
230
+ logger.error(
231
+ f"Transcription failed for chunk {i + 1} after {num_attempts} attempts."
232
+ )
233
+ all_transcription_parts.append(f"[chunk {i+1} transcription failed]")
234
 
235
+ if successful_chunks == 0 and len(chunks) > 0:
236
+ logger.error("All audio chunks failed to transcribe.")
237
+ return None # Or a Transcription object with an error message
 
 
 
 
238
 
239
+ full_transcription_text = " ".join(all_transcription_parts).strip()
240
+ logger.info(
241
+ f"Combined transcription from {successful_chunks}/{len(chunks)} chunks generated."
242
+ )
 
 
 
243
 
244
+ return Transcription(transcription=full_transcription_text)