| """ |
| AudioBook Forge - Backend |
| Model-agnostic TTS engine with Qwen3-TTS support. |
| Character extraction, dialogue parsing, audio stitching, file import, |
| chapter detection, segment preview, and multi-format export. |
| """ |
|
|
| import os |
| import re |
| import json |
| import hashlib |
| import tempfile |
| import zipfile |
| from pathlib import Path |
| from typing import List, Dict, Optional, Tuple, Any |
| from dataclasses import dataclass, field, asdict |
| from collections import defaultdict |
| from html.parser import HTMLParser |
| import warnings |
|
|
| import numpy as np |
| import soundfile as sf |
| from pydub import AudioSegment |
|
|
| warnings.filterwarnings("ignore") |
|
|
| |
| |
| |
| PRESET_SPEAKERS = { |
| "Ryan": {"lang": "English", "desc": "Dynamic, expressive male"}, |
| "Aiden": {"lang": "English", "desc": "Sunny, warm male"}, |
| "Serena": {"lang": "Chinese", "desc": "Young female (Chinese)"}, |
| "Vivian": {"lang": "Chinese", "desc": "Young female (Chinese)"}, |
| "Uncle_Fu": {"lang": "Chinese", "desc": "Seasoned elder male (Chinese)"}, |
| "Ono_Anna": {"lang": "Japanese", "desc": "Playful female (Japanese)"}, |
| "Sohee": {"lang": "Korean", "desc": "Warm female (Korean)"}, |
| "Dylan": {"lang": "Chinese", "desc": "Beijing dialect male"}, |
| "Eric": {"lang": "Chinese", "desc": "Sichuan dialect male"}, |
| } |
|
|
| MAX_CHUNK_CHARS = 380 |
| MIN_CHUNK_CHARS = 80 |
| CROSSFADE_MS = 80 |
| WORDS_PER_MINUTE = 150 |
|
|
| SAMPLE_STORIES = { |
| "The Velveteen Rabbit (excerpt)": """There was once a velveteen rabbit, and in the beginning he was really splendid. He was fat and bunchy, as a rabbit should be; his coat was spotted brown and white, he had real thread whiskers, and his ears were lined with pink sateen. |
| |
| On Christmas morning, when he sat wedged in the top of the Boy's stocking, with a sprig of holly between his paws, the effect was charming. |
| |
| There were other things in the stocking, nuts and oranges and a toy engine, and chocolate almonds and a clockwork mouse, but the Rabbit was quite the best of all. For at least two hours the Boy loved him, and then Aunts and Uncles came to dinner, and there was a great rustling of tissue paper and unwrapping of parcels, and in the excitement of looking at all the new presents the Velveteen Rabbit was forgotten. |
| |
| For a long time he lived in the toy cupboard or on the nursery floor, and no one thought very much about him. He was naturally shy, and being only made of velveteen, some of the more expensive toys quite snubbed him. The mechanical toys were very superior, and looked down upon every one else; they were full of modern ideas, and pretended they were real. |
| |
| The Rabbit could not claim to be a model of anything, for he didn't know that real rabbits existed; he thought they were all stuffed with sawdust like himself, and he understood that sawdust was quite out-of-date and should never be mentioned in modern circles. |
| |
| Even Timothy, the jointed wooden lion, who was made by the disabled soldiers, and should have had broader views, put on airs and pretended he was connected with Government. Between them all the poor little Rabbit was made to feel himself very insignificant and commonplace, and the only person who was kind to him at all was the Skin Horse. |
| |
| The Skin Horse had lived longer in the nursery than any of the others. He was so old that his brown coat was bald in patches and showed the seams underneath, and most of the hairs in his tail had been pulled out to string bead necklaces. |
| |
| He was wise, for he had seen a long succession of mechanical toys arrive to boast and swagger, and by-and-by break their mainsprings and pass away, and he knew that they were only toys, and would never turn into anything else. For nursery magic is very strange and wonderful, and only those playthings that are old and wise and experienced like the Skin Horse understand all about it.""", |
|
|
| "A Study in Scarlet (excerpt)": """In the year 1878 I took my degree of Doctor of Medicine of the University of London, and proceeded to Netley to go through the course prescribed for surgeons in the army. Having completed my studies there, I was duly attached to the Fifth Northumberland Fusiliers as Assistant Surgeon. |
| |
| The regiment was stationed in India at the time, and before I could join it, the second Afghan war had broken out. On landing at Bombay, I learned that my corps had advanced through the passes, and was already deep in the enemy's country. |
| |
| I followed, however, with many other officers who were in the same situation as myself, and succeeded in reaching Candahar in safety, where I found my regiment, and at once entered upon my new duties. |
| |
| The campaign brought honours and promotion to many, but for me it had nothing but misfortune and disaster. I was removed from my brigade and attached to the Berkshires, with whom I served at the fatal battle of Maiwand. |
| |
| There I was struck on the shoulder by a Jezail bullet, which shattered the bone and grazed the subclavian artery. I should have fallen into the hands of the murderous Ghazis had it not been for the devotion and courage shown by Murray, my orderly, who threw me across a pack-horse, and succeeded in bringing me safely to the British lines. |
| |
| Worn with pain, and weak from the prolonged hardships which I had undergone, I was removed, with a great train of wounded sufferers, to the base hospital at Peshawar. Here I rallied, and had already improved so far as to be able to walk about the wards, and even to bask a little upon the verandah, when I was struck down by enteric fever, that curse of our Indian possessions. |
| |
| For months my life was despaired of, and when at last I came to myself and became convalescent, I was so weak and emaciated that a medical board determined that not a day should be lost in sending me back to England. I was dispatched, accordingly, in the troopship Orontes, and landed a month later on Portsmouth jetty, with my health irretrievably ruined, but with permission from a paternal government to spend the next nine months in attempting to improve it.""", |
|
|
| "Pride and Prejudice (excerpt)": """It is a truth universally acknowledged, that a single man in possession of a good fortune, must be in want of a wife. |
| |
| However little known the feelings or views of such a man may be on his first entering a neighbourhood, this truth is so well fixed in the minds of the surrounding families, that he is considered the rightful property of some one or other of their daughters. |
| |
| \"My dear Mr. Bennet,\" said his lady to him one day, \"have you heard that Netherfield Park is let at last?\" |
| |
| Mr. Bennet replied that he had not. |
| |
| \"But it is,\" returned she; \"for Mrs. Long has just been here, and she told me all about it.\" |
| |
| Mr. Bennet made no answer. |
| |
| \"Do you not want to know who has taken it?\" cried his wife impatiently. |
| |
| \"You want to tell me, and I have no objection to hearing it.\" |
| |
| This was invitation enough. |
| |
| \"Why, my dear, you must know, Mrs. Long says that Netherfield is taken by a young man of large fortune from the north of England; that he came down on Monday in a chaise and four to see the place, and was so much delighted with it, that he agreed with Mr. Morris immediately; that he is to take possession before Michaelmas, and some of his servants are to be in the house by the end of next week.\" |
| |
| \"What is his name?\" |
| |
| \"Bingley.\" |
| |
| \"Is he married or single?\" |
| |
| \"Oh! Single, my dear, to be sure! A single man of large fortune; four or five thousand a year. What a fine thing for our girls!\" |
| |
| \"How so? How can it affect them?\" |
| |
| \"My dear Mr. Bennet,\" replied his wife, \"how can you be so tiresome! You must know that I am thinking of his marrying one of them.\" |
| |
| \"Is that his design in settling here?\" |
| |
| \"Design! Nonsense, how can you talk so! But it is very likely that he may fall in love with one of them, and therefore you must visit him as soon as he comes.\"""", |
| } |
|
|
| |
| |
| |
|
|
| @dataclass |
| class VoiceConfig: |
| name: str = "Narrator" |
| mode: str = "preset" |
| preset: Optional[str] = None |
| ref_audio: Optional[str] = None |
| ref_text: Optional[str] = None |
| design_desc: Optional[str] = None |
| instruct: str = "" |
| language: str = "English" |
| speed: float = 1.0 |
| description: str = "" |
|
|
| def to_dict(self) -> dict: |
| return asdict(self) |
|
|
| @classmethod |
| def from_dict(cls, d: dict) -> "VoiceConfig": |
| return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) |
|
|
|
|
| @dataclass |
| class TextSegment: |
| text: str |
| seg_type: str = "narration" |
| speaker: Optional[str] = None |
| emotion_hint: Optional[str] = None |
| chapter_idx: int = 0 |
|
|
|
|
| @dataclass |
| class CharacterProfile: |
| name: str |
| description: str = "" |
| voice: VoiceConfig = field(default_factory=VoiceConfig) |
| occurrences: int = 0 |
|
|
|
|
| @dataclass |
| class Chapter: |
| idx: int |
| title: str |
| text: str |
| word_count: int = 0 |
|
|
|
|
| |
| |
| |
|
|
| class EPUBTextExtractor(HTMLParser): |
| def __init__(self): |
| super().__init__() |
| self.text_parts = [] |
| self.in_script = False |
| self.in_body = False |
|
|
| def handle_starttag(self, tag, attrs): |
| if tag in ("script", "style"): |
| self.in_script = True |
| if tag == "body": |
| self.in_body = True |
| if tag in ("p", "div", "h1", "h2", "h3", "h4", "br"): |
| self.text_parts.append("\n") |
|
|
| def handle_endtag(self, tag): |
| if tag in ("script", "style"): |
| self.in_script = False |
| if tag in ("p", "div", "h1", "h2", "h3", "h4"): |
| self.text_parts.append("\n") |
|
|
| def handle_data(self, data): |
| if not self.in_script: |
| self.text_parts.append(data) |
|
|
| def get_text(self) -> str: |
| text = "".join(self.text_parts) |
| text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text) |
| text = re.sub(r"[ \t]+", " ", text) |
| return text.strip() |
|
|
|
|
| def parse_file(filepath: str) -> Tuple[str, str]: |
| """Parse uploaded file and return (text, filename).""" |
| path = Path(filepath) |
| suffix = path.suffix.lower() |
|
|
| if suffix == ".txt": |
| with open(path, "r", encoding="utf-8", errors="ignore") as f: |
| return f.read(), path.name |
|
|
| elif suffix == ".epub": |
| with zipfile.ZipFile(path, "r") as z: |
| texts = [] |
| for name in z.namelist(): |
| if name.endswith((".html", ".htm", ".xhtml", ".xml")): |
| with z.open(name) as f: |
| content = f.read().decode("utf-8", errors="ignore") |
| parser = EPUBTextExtractor() |
| parser.feed(content) |
| texts.append(parser.get_text()) |
| return "\n\n".join(texts), path.name |
|
|
| elif suffix == ".pdf": |
| try: |
| from PyPDF2 import PdfReader |
| reader = PdfReader(str(path)) |
| texts = [] |
| for page in reader.pages: |
| t = page.extract_text() |
| if t: |
| texts.append(t) |
| return "\n\n".join(texts), path.name |
| except Exception as e: |
| raise ValueError(f"PDF parsing failed: {e}") |
|
|
| elif suffix in (".html", ".htm"): |
| with open(path, "r", encoding="utf-8", errors="ignore") as f: |
| content = f.read() |
| parser = EPUBTextExtractor() |
| parser.feed(content) |
| return parser.get_text(), path.name |
|
|
| else: |
| raise ValueError(f"Unsupported file type: {suffix}") |
|
|
|
|
| |
| |
| |
|
|
| class TextProcessor: |
| DIALOGUE_RE = re.compile(r'(?:^|[.!?\n]\s+)\s*"([^"]{3,500})"') |
| SPEAKER_RE = re.compile(r'(?:^|\n)\s*([A-Z][a-zA-Z\s]{1,20})(?:\s*[:\-–])\s*"([^"]+)"') |
| NAME_RE = re.compile(r'\b([A-Z][a-z]{1,15})\b') |
| CHAPTER_RE = re.compile( |
| r'^(?:\s*(?:Chapter|CHAPTER|Part|PART|Book|BOOK|Section|SECTION)\s*(?:[IVX\d]+|[A-Z]).*)$', |
| re.MULTILINE, |
| ) |
| HEADER_RE = re.compile( |
| r'^(?:\s*\d+\s+|\s*Page\s*\d+.*|\s*www\.\S+.*|\s*Copyright.*|\s*All rights reserved.*)$', |
| re.MULTILINE | re.IGNORECASE, |
| ) |
|
|
| @staticmethod |
| def clean_text(text: str) -> str: |
| """Remove headers, page numbers, excessive whitespace.""" |
| text = TextProcessor.HEADER_RE.sub("", text) |
| text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text) |
| text = re.sub(r"[ \t]+", " ", text) |
| text = re.sub(r"^\s+", "", text, flags=re.MULTILINE) |
| return text.strip() |
|
|
| @staticmethod |
| def detect_chapters(text: str) -> List[Chapter]: |
| """Split text into chapters by chapter headings.""" |
| matches = list(TextProcessor.CHAPTER_RE.finditer(text)) |
| if len(matches) < 2: |
| |
| words = len(text.split()) |
| return [Chapter(idx=0, title="Full Text", text=text, word_count=words)] |
|
|
| chapters = [] |
| for i, match in enumerate(matches): |
| start = match.start() |
| title = match.group(0).strip() |
| end = matches[i + 1].start() if i + 1 < len(matches) else len(text) |
| ch_text = text[start:end].strip() |
| words = len(ch_text.split()) |
| chapters.append(Chapter(idx=i, title=title, text=ch_text, word_count=words)) |
| return chapters |
|
|
| @staticmethod |
| def extract_characters(text: str) -> List[CharacterProfile]: |
| profiles: Dict[str, CharacterProfile] = {} |
|
|
| for match in TextProcessor.SPEAKER_RE.finditer(text): |
| name = match.group(1).strip() |
| if len(name) > 2: |
| if name not in profiles: |
| profiles[name] = CharacterProfile(name=name) |
| profiles[name].occurrences += 1 |
|
|
| for match in TextProcessor.DIALOGUE_RE.finditer(text): |
| before = text[max(0, match.start() - 120):match.start()] |
| said_match = re.search( |
| r'([A-Z][a-z]{1,15})\s+(?:said|cried|shouted|whispered|replied|asked|answered|called|exclaimed)', |
| before, |
| ) |
| if said_match: |
| name = said_match.group(1) |
| if name not in profiles: |
| profiles[name] = CharacterProfile(name=name) |
| profiles[name].occurrences += 1 |
|
|
| all_names = TextProcessor.NAME_RE.findall(text) |
| from collections import Counter |
| common = Counter(all_names).most_common(30) |
| for name, count in common: |
| if count >= 3 and len(name) > 2 and name not in profiles: |
| if name.lower() in { |
| "the", "and", "but", "for", "are", "was", "were", "had", "have", "has", |
| "his", "her", "she", "him", "they", "them", "said", "with", "from", |
| "that", "this", "what", "when", "where", "would", "could", "should", |
| "not", "you", "all", "any", "can", "had", "her", "was", "one", "our", |
| "out", "day", "get", "has", "him", "his", "how", "its", "may", "new", |
| "now", "old", "see", "two", "who", "boy", "man", "way", "too", "upon", |
| }: |
| continue |
| profiles[name] = CharacterProfile(name=name, occurrences=count) |
|
|
| result = sorted(profiles.values(), key=lambda p: p.occurrences, reverse=True) |
| return result[:12] |
|
|
| @staticmethod |
| def segment_text(text: str, characters: List[str]) -> List[TextSegment]: |
| text = text.replace("\r\n", "\n").replace("\r", "\n") |
| paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()] |
| segments = [] |
|
|
| for para in paragraphs: |
| speaker_match = re.match( |
| r'^([A-Z][a-zA-Z\s]{1,20})[:\-–]\s*"([^"]+)"', |
| para, |
| ) |
| if speaker_match: |
| speaker = speaker_match.group(1).strip() |
| dialogue = speaker_match.group(2) |
| segments.append(TextSegment(text=dialogue, seg_type="dialogue", speaker=speaker)) |
| remainder = para[speaker_match.end():].strip() |
| if remainder: |
| segments.append(TextSegment(text=remainder, seg_type="narration")) |
| continue |
|
|
| parts = re.split(r'"([^"]{3,500})"', para) |
| for i, part in enumerate(parts): |
| part = part.strip() |
| if not part: |
| continue |
| if i % 2 == 1: |
| segments.append(TextSegment(text=part, seg_type="dialogue", speaker=None)) |
| else: |
| segments.append(TextSegment(text=part, seg_type="narration")) |
|
|
| merged = [] |
| for seg in segments: |
| if merged and seg.seg_type == "narration" and merged[-1].seg_type == "narration": |
| merged[-1].text += " " + seg.text |
| else: |
| merged.append(seg) |
| return merged |
|
|
| @staticmethod |
| def chunk_segments(segments: List[TextSegment], max_chars: int = MAX_CHUNK_CHARS) -> List[TextSegment]: |
| result = [] |
| for seg in segments: |
| if len(seg.text) <= max_chars: |
| result.append(seg) |
| continue |
| sentences = re.split(r'(?<=[.!?])\s+', seg.text) |
| current_text = "" |
| for sent in sentences: |
| if len(current_text) + len(sent) + 1 <= max_chars: |
| current_text += (" " if current_text else "") + sent |
| else: |
| if current_text: |
| result.append(TextSegment( |
| text=current_text.strip(), |
| seg_type=seg.seg_type, |
| speaker=seg.speaker, |
| )) |
| current_text = sent |
| if current_text: |
| result.append(TextSegment( |
| text=current_text.strip(), |
| seg_type=seg.seg_type, |
| speaker=seg.speaker, |
| )) |
| return result |
|
|
|
|
| |
| |
| |
|
|
| class TTSEngine: |
| def __init__(self): |
| self._custom_voice_model = None |
| self._base_model = None |
| self._design_model = None |
| self._model_ids = { |
| "custom": "Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice", |
| "base": "Qwen/Qwen3-TTS-12Hz-1.7B-Base", |
| "design": "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign", |
| } |
| self._cache_dir = Path(tempfile.gettempdir()) / "audiobook_cache" |
| self._cache_dir.mkdir(exist_ok=True) |
|
|
| def _load_custom_voice(self): |
| if self._custom_voice_model is not None: |
| return self._custom_voice_model |
| try: |
| from qwen_tts import Qwen3TTSModel |
| import torch |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| print(f"[TTS] Loading CustomVoice model on {device}...") |
| self._custom_voice_model = Qwen3TTSModel.from_pretrained( |
| self._model_ids["custom"], |
| device_map=device, |
| dtype=torch.bfloat16 if device == "cuda" else torch.float32, |
| ) |
| print("[TTS] CustomVoice ready.") |
| except Exception as e: |
| print(f"[TTS] CustomVoice load failed: {e}") |
| raise |
| return self._custom_voice_model |
|
|
| def _load_base(self): |
| if self._base_model is not None: |
| return self._base_model |
| try: |
| from qwen_tts import Qwen3TTSModel |
| import torch |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| print(f"[TTS] Loading Base (clone) model on {device}...") |
| self._base_model = Qwen3TTSModel.from_pretrained( |
| self._model_ids["base"], |
| device_map=device, |
| dtype=torch.bfloat16 if device == "cuda" else torch.float32, |
| ) |
| print("[TTS] Base ready.") |
| except Exception as e: |
| print(f"[TTS] Base load failed: {e}") |
| raise |
| return self._base_model |
|
|
| def _load_design(self): |
| if self._design_model is not None: |
| return self._design_model |
| try: |
| from qwen_tts import Qwen3TTSModel |
| import torch |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| print(f"[TTS] Loading VoiceDesign model on {device}...") |
| self._design_model = Qwen3TTSModel.from_pretrained( |
| self._model_ids["design"], |
| device_map=device, |
| dtype=torch.bfloat16 if device == "cuda" else torch.float32, |
| ) |
| print("[TTS] VoiceDesign ready.") |
| except Exception as e: |
| print(f"[TTS] VoiceDesign load failed: {e}") |
| raise |
| return self._design_model |
|
|
| def _cache_key(self, text: str, voice: VoiceConfig) -> str: |
| payload = ( |
| f"{text}|{voice.mode}|{voice.preset}|{voice.ref_audio}|" |
| f"{voice.design_desc}|{voice.instruct}|{voice.language}|{voice.speed}" |
| ) |
| return hashlib.md5(payload.encode()).hexdigest() |
|
|
| def _cached_path(self, key: str) -> Path: |
| return self._cache_dir / f"{key}.wav" |
|
|
| def synthesize( |
| self, |
| text: str, |
| voice: VoiceConfig, |
| temperature: float = 0.7, |
| seed: int = 42, |
| ) -> Tuple[np.ndarray, int]: |
| cache_key = self._cache_key(text, voice) |
| cache_path = self._cached_path(cache_key) |
| if cache_path.exists(): |
| audio, sr = sf.read(str(cache_path)) |
| return audio, sr |
|
|
| if voice.mode == "preset": |
| model = self._load_custom_voice() |
| wavs, sr = model.generate_custom_voice( |
| text=text, |
| language=voice.language, |
| speaker=voice.preset or "Ryan", |
| instruct=voice.instruct or "Narrate clearly and expressively.", |
| temperature=temperature, |
| seed=seed, |
| ) |
| elif voice.mode == "clone": |
| model = self._load_base() |
| if not voice.ref_audio or not Path(voice.ref_audio).exists(): |
| raise ValueError("Clone mode requires ref_audio path.") |
| wavs, sr = model.generate_voice_clone( |
| text=text, |
| language=voice.language, |
| ref_audio=voice.ref_audio, |
| ref_text=voice.ref_text or text[:100], |
| temperature=temperature, |
| seed=seed, |
| ) |
| elif voice.mode == "design": |
| model = self._load_design() |
| desc = voice.design_desc or "A clear, expressive narrator voice." |
| wavs, sr = model.generate_voice_design( |
| text=text, |
| language=voice.language, |
| instruct=desc, |
| temperature=temperature, |
| seed=seed, |
| ) |
| else: |
| raise ValueError(f"Unknown voice mode: {voice.mode}") |
|
|
| if isinstance(wavs, list): |
| wavs = wavs[0] |
| if wavs.ndim > 1: |
| wavs = wavs.mean(axis=1) |
|
|
| |
| if voice.speed != 1.0 and voice.speed > 0.3: |
| wavs = self._adjust_speed(wavs, sr, voice.speed) |
|
|
| sf.write(str(cache_path), wavs, sr) |
| return wavs, sr |
|
|
| @staticmethod |
| def _adjust_speed(audio: np.ndarray, sr: int, speed: float) -> np.ndarray: |
| """Adjust audio speed using pydub.""" |
| if abs(speed - 1.0) < 0.05: |
| return audio |
| |
| audio = (audio * 32767).astype(np.int16) |
| seg = AudioSegment( |
| audio.tobytes(), |
| frame_rate=sr, |
| sample_width=2, |
| channels=1 if audio.ndim == 1 else audio.shape[1], |
| ) |
| if speed > 1.0: |
| seg = seg.speedup(playback_speed=speed) |
| else: |
| |
| seg = seg._spawn(seg.raw_data, overrides={ |
| "frame_rate": int(seg.frame_rate * speed) |
| }) |
| seg = seg.set_frame_rate(sr) |
| |
| samples = np.array(seg.get_array_of_samples()) |
| return samples.astype(np.float32) / 32767.0 |
|
|
| def status(self) -> Dict[str, Any]: |
| return { |
| "custom_loaded": self._custom_voice_model is not None, |
| "base_loaded": self._base_model is not None, |
| "design_loaded": self._design_model is not None, |
| } |
|
|
|
|
| |
| |
| |
|
|
| def stitch_audio(paths: List[str], crossfade_ms: int = CROSSFADE_MS) -> AudioSegment: |
| if not paths: |
| return AudioSegment.silent(duration=0) |
| combined = AudioSegment.from_wav(paths[0]) |
| for p in paths[1:]: |
| next_seg = AudioSegment.from_wav(p) |
| if crossfade_ms > 0 and len(combined) > crossfade_ms and len(next_seg) > crossfade_ms: |
| combined = combined.append(next_seg, crossfade=crossfade_ms) |
| else: |
| combined += next_seg |
| return combined |
|
|
|
|
| def normalize_audio(audio: AudioSegment, target_dBFS: float = -1.5) -> AudioSegment: |
| change = target_dBFS - audio.max_dBFS |
| return audio.apply_gain(change) |
|
|
|
|
| def save_audiobook(segments_paths: List[str], output_path: str, title: str = "Audiobook", fmt: str = "mp3") -> str: |
| if not segments_paths: |
| return "" |
| combined = stitch_audio(segments_paths) |
| combined = normalize_audio(combined) |
| if fmt == "mp3": |
| combined.export(output_path, format="mp3", bitrate="192k", tags={"title": title, "artist": "AudioBook Forge"}) |
| elif fmt == "wav": |
| combined.export(output_path, format="wav", tags={"title": title, "artist": "AudioBook Forge"}) |
| elif fmt == "ogg": |
| combined.export(output_path, format="ogg", tags={"title": title, "artist": "AudioBook Forge"}) |
| return output_path |
|
|
|
|
| def save_segment_zip(segments_paths: List[str], output_path: str) -> str: |
| """Save individual segment WAVs as a ZIP.""" |
| with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf: |
| for i, p in enumerate(segments_paths): |
| arcname = f"segment_{i:04d}.wav" |
| zf.write(p, arcname) |
| return output_path |
|
|
|
|
| def estimate_duration(word_count: int, wpm: int = WORDS_PER_MINUTE) -> str: |
| minutes = word_count / wpm |
| if minutes < 1: |
| return f"{int(minutes * 60)} seconds" |
| elif minutes < 60: |
| return f"{minutes:.1f} minutes" |
| else: |
| hours = int(minutes // 60) |
| mins = int(minutes % 60) |
| return f"{hours}h {mins}m" |
|
|
|
|
| |
| |
| |
|
|
| def save_project( |
| text: str, |
| narrator: VoiceConfig, |
| characters: Dict[str, VoiceConfig], |
| settings: dict, |
| ) -> str: |
| """Save project to JSON string.""" |
| data = { |
| "version": "1.1", |
| "text_sample": text[:2000] + ("..." if len(text) > 2000 else ""), |
| "text_hash": hashlib.md5(text.encode()).hexdigest()[:16], |
| "narrator": narrator.to_dict(), |
| "characters": {k: v.to_dict() for k, v in characters.items()}, |
| "settings": settings, |
| } |
| return json.dumps(data, indent=2) |
|
|
|
|
| def load_project(json_str: str) -> dict: |
| """Load project from JSON string.""" |
| data = json.loads(json_str) |
| if data.get("version", "1.0").startswith("1."): |
| data["narrator"] = VoiceConfig.from_dict(data["narrator"]) |
| data["characters"] = {k: VoiceConfig.from_dict(v) for k, v in data.get("characters", {}).items()} |
| return data |
|
|
|
|
| |
| |
| |
|
|
| def ai_extract_characters(text: str, api_token: Optional[str] = None) -> List[CharacterProfile]: |
| try: |
| from huggingface_hub import InferenceClient |
| client = InferenceClient(token=api_token or os.getenv("HF_TOKEN")) |
| sample = text[:4000] + ("\n...[truncated]" if len(text) > 4000 else "") |
| prompt = ( |
| "Extract all named characters from the following story excerpt. " |
| "For each character, provide: (1) their name, (2) a brief description of their personality/role, " |
| "and (3) a vivid voice description suitable for AI text-to-speech voice design " |
| "(e.g. 'A raspy old man with a warm chuckle and slow pacing'). " |
| "Return ONLY a JSON array like: [{\"name\":\"Alice\",\"description\":\"Curious young girl\",\"voice_description\":\"Bright, enthusiastic child voice\"},...]\n\n" |
| f"STORY:\n{sample}\n\nJSON:" |
| ) |
| response = client.text_generation( |
| model="Qwen/Qwen3-1.7B", |
| prompt=prompt, |
| max_new_tokens=512, |
| temperature=0.3, |
| return_full_text=False, |
| ) |
| json_match = re.search(r'\[.*?\]', response, re.DOTALL) |
| if json_match: |
| data = json.loads(json_match.group()) |
| profiles = [] |
| for item in data: |
| name = item.get("name", "") |
| desc = item.get("description", "") |
| if name: |
| profiles.append(CharacterProfile(name=name, description=desc)) |
| return profiles |
| except Exception as e: |
| print(f"[AI Extraction] Failed: {e}") |
| return [] |
|
|
|
|
| |
| |
| |
|
|
| class AudiobookPipeline: |
| def __init__(self): |
| self.tts = TTSEngine() |
| self.processor = TextProcessor() |
| self.temp_dir = Path(tempfile.gettempdir()) / "audiobook_segments" |
| self.temp_dir.mkdir(exist_ok=True) |
|
|
| def parse_upload(self, filepath: str) -> Tuple[str, str]: |
| return parse_file(filepath) |
|
|
| def detect_chapters(self, text: str) -> List[Dict]: |
| chapters = self.processor.detect_chapters(text) |
| return [ |
| {"idx": c.idx, "title": c.title, "word_count": c.word_count} |
| for c in chapters |
| ] |
|
|
| def get_chapter_text(self, text: str, chapter_idx: int) -> str: |
| chapters = self.processor.detect_chapters(text) |
| if 0 <= chapter_idx < len(chapters): |
| return chapters[chapter_idx].text |
| return text |
|
|
| def extract_characters(self, text: str) -> List[Dict]: |
| profiles = ai_extract_characters(text) |
| if not profiles: |
| profiles = self.processor.extract_characters(text) |
| return [ |
| { |
| "name": p.name, |
| "description": p.description, |
| "occurrences": p.occurrences, |
| "voice_mode": "design", |
| "voice_description": p.description or f"A distinct voice for {p.name}.", |
| "voice_preset": "Ryan", |
| "voice_instruct": "", |
| "speed": 1.0, |
| "language": "English", |
| } |
| for p in profiles |
| ] |
|
|
| def preview_segment( |
| self, |
| text: str, |
| voice: VoiceConfig, |
| temperature: float = 0.7, |
| seed: int = 42, |
| ) -> Tuple[np.ndarray, int]: |
| return self.tts.synthesize(text, voice, temperature=temperature, seed=seed) |
|
|
| def generate( |
| self, |
| text: str, |
| narrator_config: VoiceConfig, |
| character_configs: Dict[str, VoiceConfig], |
| progress_callback=None, |
| temperature: float = 0.7, |
| seed: int = 42, |
| ) -> Tuple[str, List[str], List[Dict]]: |
| """ |
| Generate audiobook. |
| Returns (final_path, segment_paths, segment_metadata). |
| """ |
| segments = self.processor.segment_text(text, list(character_configs.keys())) |
| segments = self.processor.chunk_segments(segments) |
|
|
| segment_paths = [] |
| segment_meta = [] |
| total = len(segments) |
|
|
| for i, seg in enumerate(segments): |
| if progress_callback: |
| progress_callback(i / total, f"Segment {i+1}/{total} ({seg.seg_type})...") |
|
|
| if seg.seg_type == "dialogue" and seg.speaker and seg.speaker in character_configs: |
| voice = character_configs[seg.speaker] |
| else: |
| voice = narrator_config |
|
|
| try: |
| wav, sr = self.tts.synthesize(seg.text, voice, temperature=temperature, seed=seed) |
| seg_path = self.temp_dir / f"seg_{i:04d}_{voice.name}.wav" |
| sf.write(str(seg_path), wav, sr) |
| segment_paths.append(str(seg_path)) |
| segment_meta.append({ |
| "idx": i, |
| "type": seg.seg_type, |
| "speaker": seg.speaker or voice.name, |
| "text": seg.text[:100] + ("..." if len(seg.text) > 100 else ""), |
| "path": str(seg_path), |
| }) |
| except Exception as e: |
| print(f"[Pipeline] Segment {i} failed: {e}") |
| silent = AudioSegment.silent(duration=500) |
| seg_path = self.temp_dir / f"seg_{i:04d}_silent.wav" |
| silent.export(str(seg_path), format="wav") |
| segment_paths.append(str(seg_path)) |
| segment_meta.append({ |
| "idx": i, |
| "type": seg.seg_type, |
| "speaker": voice.name, |
| "text": seg.text[:100] + "...", |
| "path": str(seg_path), |
| "error": str(e), |
| }) |
|
|
| if progress_callback: |
| progress_callback(1.0, "Finalizing audiobook...") |
|
|
| output_path = str(self.temp_dir / "audiobook_final.mp3") |
| save_audiobook(segment_paths, output_path, title="Generated Audiobook") |
| return output_path, segment_paths, segment_meta |
|
|
| def export_segments_zip(self, segment_paths: List[str]) -> str: |
| output_path = str(self.temp_dir / "audiobook_segments.zip") |
| return save_segment_zip(segment_paths, output_path) |
|
|
| def preview_voice( |
| self, |
| voice: VoiceConfig, |
| sample_text: str = "Hello, this is a preview of my voice. I hope you enjoy the story.", |
| ) -> Tuple[np.ndarray, int]: |
| return self.tts.synthesize(sample_text, voice, temperature=0.7, seed=42) |
|
|