|
|
from flask import Flask, request, jsonify, send_file |
|
|
import os |
|
|
import requests |
|
|
from bs4 import BeautifulSoup |
|
|
import ebooklib |
|
|
from ebooklib import epub |
|
|
from urllib.parse import urlparse |
|
|
import io |
|
|
from fish_audio import clone_voice_with_fish |
|
|
import uuid |
|
|
from dotenv import load_dotenv |
|
|
import tempfile |
|
|
import shutil |
|
|
from pydub import AudioSegment |
|
|
|
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
def get_text_from_url(url): |
|
|
try: |
|
|
response = requests.get(url) |
|
|
response.raise_for_status() |
|
|
|
|
|
content_type = response.headers.get('content-type') |
|
|
|
|
|
if 'epub' in content_type: |
|
|
book = epub.read_epub(io.BytesIO(response.content)) |
|
|
text_content = "" |
|
|
for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT): |
|
|
soup = BeautifulSoup(item.get_body_content(), 'html.parser') |
|
|
text_content += soup.get_text() + "\n" |
|
|
return text_content |
|
|
elif 'html' in content_type: |
|
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
return soup.get_text() |
|
|
elif 'text' in content_type: |
|
|
return response.text |
|
|
else: |
|
|
|
|
|
|
|
|
return response.text |
|
|
except requests.exceptions.RequestException as e: |
|
|
app.logger.error(f"Error fetching URL: {e}") |
|
|
return None |
|
|
|
|
|
def is_url(string): |
|
|
try: |
|
|
result = urlparse(string) |
|
|
return all([result.scheme, result.netloc]) |
|
|
except ValueError: |
|
|
return False |
|
|
|
|
|
@app.route('/api/voice-transfer', methods=['POST']) |
|
|
def voice_transfer(): |
|
|
temp_dir = tempfile.mkdtemp() |
|
|
try: |
|
|
file = None |
|
|
voice_file_path = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if 'cid' in request.form: |
|
|
cid = request.form.get('cid') |
|
|
if not cid: |
|
|
return jsonify({"error": "No cid (for voice file) provided"}), 400 |
|
|
|
|
|
file_url = f"http://ipfs.io/ipfs/{cid}" |
|
|
response = requests.get(file_url) |
|
|
if response.status_code != 200: |
|
|
return jsonify({"error": "Failed to download voice file from IPFS"}), 400 |
|
|
|
|
|
webm_path = os.path.join(temp_dir, "input.webm") |
|
|
with open(webm_path, "wb") as f: |
|
|
f.write(response.content) |
|
|
|
|
|
mp3_path = os.path.join(temp_dir, "input.mp3") |
|
|
audio = AudioSegment.from_file(webm_path, format="webm") |
|
|
audio.export(mp3_path, format="mp3") |
|
|
voice_file_path = mp3_path |
|
|
else: |
|
|
return jsonify({"error": "No voice file or voice_file_url provided"}), 400 |
|
|
|
|
|
text_input = request.form.get('text') |
|
|
app.logger.info(f"Received text input: {text_input}") |
|
|
if not text_input: |
|
|
return jsonify({"error": "No text or text_url provided"}), 400 |
|
|
|
|
|
text_content = "" |
|
|
if is_url(text_input): |
|
|
text_content = get_text_from_url(text_input) |
|
|
if text_content is None: |
|
|
return jsonify({"error": "Failed to retrieve or parse content from URL"}), 400 |
|
|
else: |
|
|
app.logger.info("Input is not a URL, using as raw text.") |
|
|
text_content = text_input |
|
|
|
|
|
|
|
|
app.logger.info("Starting voice cloning process with Fish Audio...") |
|
|
|
|
|
output_filename = f"output_cloned_{uuid.uuid4().hex}.mp3" |
|
|
output_file_path = os.path.join(temp_dir, output_filename) |
|
|
|
|
|
try: |
|
|
clone_voice_with_fish( |
|
|
text=text_content, |
|
|
reference_audio_path=voice_file_path, |
|
|
output_path=output_file_path, |
|
|
reference_text="This is a reference audio for voice cloning." |
|
|
) |
|
|
except Exception as e: |
|
|
app.logger.error(f"Error during voice cloning with Fish Audio: {e}") |
|
|
return jsonify({"error": "Failed to generate voice file."}), 500 |
|
|
|
|
|
app.logger.info(f"Successfully generated audio file at: {output_file_path}") |
|
|
|
|
|
with open(output_file_path, 'rb') as f: |
|
|
audio_buffer = io.BytesIO(f.read()) |
|
|
|
|
|
return send_file( |
|
|
audio_buffer, |
|
|
as_attachment=True, |
|
|
download_name=output_filename, |
|
|
mimetype='audio/mpeg' |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
app.logger.error(f"An unexpected error occurred: {e}") |
|
|
return jsonify({"error": "An internal server error occurred."}), 500 |
|
|
finally: |
|
|
if temp_dir and os.path.exists(temp_dir): |
|
|
shutil.rmtree(temp_dir) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|