Spaces:
Running
Running
File size: 12,968 Bytes
f18a082 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 | import re
path = r"c:\Users\ganxa\Downloads\My Project\Eduai\backend\server.py"
with open(path, "r", encoding="utf-8") as f:
content = f.read()
target = """ # Always attempt Chatterbox AI voice cloning with the premium voice
if True:
async def _try_chatterbox():
premium_voice_url = "https://eduai-deploy.vercel.app/suara/voice_18-05-2026_14-06-54.ogg"
try:
logger.info(f"Calling Chatterbox Gradio 6 submission endpoint")
async with httpx.AsyncClient(timeout=180.0) as client:
submit_resp = await client.post(
"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/call/clone_voice",
json={"data": [cleaned, None, premium_voice_url]}
)
submit_resp.raise_for_status()
submit_data = submit_resp.json()
event_id = submit_data.get("event_id")
if not event_id:
logger.error(f"Gradio 6 submission failed to return event_id: {submit_data}")
return None
logger.info(f"Chatterbox job submitted successfully. Event ID: {event_id}. Fetching stream...")
stream_resp = await client.get(
f"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/call/clone_voice/{event_id}"
)
stream_resp.raise_for_status()
stream_text = stream_resp.text
import re
matches = re.findall(r"data:\s*([^\r\n]+)", stream_text)
if not matches:
logger.error(f"No data events received in Gradio 6 stream: {stream_text}")
return None
for match in reversed(matches):
try:
import json
parsed = json.loads(match.strip())
if isinstance(parsed, list) and len(parsed) > 0:
audio_info = parsed[0]
audio_url = None
if isinstance(audio_info, dict):
if "url" in audio_info:
audio_url = audio_info["url"]
elif "path" in audio_info:
audio_url = f"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/file={audio_info['path']}"
elif isinstance(audio_info, str):
audio_url = audio_info
if audio_url:
if audio_url.startswith("/"):
audio_url = f"https://alstears-chatterbox-id-clone-api.hf.space{audio_url}"
elif not audio_url.startswith("http"):
audio_url = f"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/file={audio_url}"
logger.info(f"Downloading premium audio from: {audio_url}")
audio_resp = await client.get(audio_url)
audio_resp.raise_for_status()
return audio_resp.content
except Exception as parse_err:
logger.debug(f"Skipping unparseable stream data block: {parse_err}")
except Exception as e:
logger.exception(f"Chatterbox Gradio 6 processing failed: {e}")
return None
audio_content = await _try_chatterbox()
if audio_content:
with open(output_path, "wb") as f:
f.write(audio_content)
return
else:
logger.error("Chatterbox Gradio 6 failed to generate audio. Falling back to default TTS.")"""
replacement = """ # Always attempt Chatterbox AI voice cloning with the premium voice
if True:
def _chunk_text(text_to_chunk, max_sentences=5, max_chars=200):
import re
sentences = re.findall(r'[^.!?\\n]+[.!?\\n]*|\\n+', text_to_chunk)
if not sentences:
return [text_to_chunk]
chunks_list = []
current_chunk = ""
current_sentence_count = 0
for sentence in sentences:
trimmed = sentence.strip()
if not trimmed:
continue
if (current_sentence_count >= max_sentences or
(len(current_chunk) + len(trimmed) > max_chars and len(current_chunk) > 0)):
chunks_list.append(current_chunk.strip())
current_chunk = ""
current_sentence_count = 0
current_chunk += (" " if current_chunk else "") + trimmed
current_sentence_count += 1
if current_chunk.strip():
chunks_list.append(current_chunk.strip())
return chunks_list
def _concatenate_wavs(wav_contents):
if not wav_contents:
return b""
if len(wav_contents) == 1:
return wav_contents[0]
first_wav = wav_contents[0]
pcm_datas = [wav[44:] for wav in wav_contents]
total_pcm_length = sum(len(pcm) for pcm in pcm_datas)
import struct
header = bytearray(first_wav[:44])
header[4:8] = struct.pack("<I", total_pcm_length + 36)
header[40:44] = struct.pack("<I", total_pcm_length)
return bytes(header) + b"".join(pcm_datas)
async def _try_chatterbox():
premium_voice_url = "https://eduai-deploy.vercel.app/suara/voice_18-05-2026_14-06-54.ogg"
chunks = _chunk_text(cleaned, 5, 200)
logger.info(f"Split text into {len(chunks)} chunks for premium voice generation.")
wav_results = []
async with httpx.AsyncClient(timeout=180.0) as client:
for idx, chunk in enumerate(chunks):
try:
logger.info(f"Generating chunk {idx + 1}/{len(chunks)}: '{chunk}'")
submit_resp = await client.post(
"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/call/clone_voice",
json={"data": [chunk, None, premium_voice_url]}
)
submit_resp.raise_for_status()
submit_data = submit_resp.json()
event_id = submit_data.get("event_id")
if not event_id:
logger.error(f"Gradio 6 submission failed to return event_id: {submit_data}")
return None
logger.info(f"Job submitted. Event ID: {event_id}. Fetching stream...")
stream_resp = await client.get(
f"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/call/clone_voice/{event_id}"
)
stream_resp.raise_for_status()
stream_text = stream_resp.text
# Check for Gradio stream errors
error_match = re.search(r"event:\s*error\s*\r?\ndata:\s*([^\r\n]+)", stream_text)
if error_match:
try:
import json
err_obj = json.loads(error_match.group(1))
if err_obj and "error" in err_obj:
logger.error(f"Gradio returned error: {err_obj['error']}")
except Exception:
pass
matches = re.findall(r"data:\s*([^\r\n]+)", stream_text)
if not matches:
logger.error(f"No data events received in Gradio 6 stream: {stream_text}")
return None
chunk_wav = None
for match in reversed(matches):
try:
import json
parsed = json.loads(match.strip())
if isinstance(parsed, list) and len(parsed) > 0:
audio_info = parsed[0]
audio_url = None
if isinstance(audio_info, dict):
if "url" in audio_info:
audio_url = audio_info["url"]
elif "path" in audio_info:
audio_url = f"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/file={audio_info['path']}"
elif isinstance(audio_info, str):
audio_url = audio_info
if audio_url:
if audio_url.startswith("/"):
audio_url = f"https://alstears-chatterbox-id-clone-api.hf.space{audio_url}"
elif not audio_url.startswith("http"):
audio_url = f"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/file={audio_url}"
logger.info(f"Downloading audio from: {audio_url}")
audio_resp = await client.get(audio_url)
audio_resp.raise_for_status()
chunk_wav = audio_resp.content
break
except Exception as parse_err:
logger.debug(f"Skipping unparseable stream data block: {parse_err}")
if chunk_wav:
wav_results.append(chunk_wav)
else:
logger.error(f"Failed to generate chunk {idx + 1}")
return None
except Exception as e:
logger.exception(f"Failed to generate chunk {idx + 1}: {e}")
return None
if len(wav_results) == len(chunks):
logger.info(f"Successfully generated all {len(chunks)} chunks. Combining WAVs...")
return _concatenate_wavs(wav_results)
return None
audio_content = await _try_chatterbox()
if audio_content:
with open(output_path, "wb") as f:
f.write(audio_content)
return
else:
logger.error("Chatterbox Gradio 6 failed to generate audio. Falling back to default TTS.")"""
# Try line by line matching which is 100% immune to spacing differences and line endings!
target_lines = [line.strip() for line in target.splitlines() if line.strip()]
content_lines = content.splitlines()
found_idx = -1
for i in range(len(content_lines) - len(target_lines) + 1):
match = True
target_ptr = 0
content_ptr = i
while target_ptr < len(target_lines) and content_ptr < len(content_lines):
if not content_lines[content_ptr].strip():
content_ptr += 1
continue
if content_lines[content_ptr].strip() != target_lines[target_ptr]:
match = False
break
target_ptr += 1
content_ptr += 1
if match and target_ptr == len(target_lines):
found_idx = i
found_end_idx = content_ptr
break
if found_idx != -1:
content_lines[found_idx:found_end_idx] = replacement.splitlines()
with open(path, "w", encoding="utf-8") as f:
f.write("\r\n".join(content_lines) if "\r\n" in content else "\n".join(content_lines))
print("Successfully patched via advanced fuzzy line matching!")
else:
# Try exact match as fallback
if target in content:
content = content.replace(target, replacement)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
print("Successfully patched via exact match!")
else:
print("Target not found in server.py!")
|