File size: 5,314 Bytes
ef9a767 3797366 f7ed60e ef9a767 7c2d826 ef9a767 3797366 f7ed60e ef9a767 f7ed60e 5e4f596 f7ed60e 5e4f596 f7ed60e ef9a767 f7ed60e 3797366 ef9a767 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
from flask import Flask, request, jsonify, send_file
import os
import requests
from bs4 import BeautifulSoup
import ebooklib
from ebooklib import epub
from urllib.parse import urlparse
import io
from fish_audio import clone_voice_with_fish
import uuid
from dotenv import load_dotenv
import tempfile
import shutil
from pydub import AudioSegment
# Load environment variables from a .env file if one exists.
# This is particularly useful for local development.
load_dotenv()
# Flask application object; the route handlers in this module are registered
# on it through the @app.route decorator, and it supplies app.logger.
app = Flask(__name__)
def get_text_from_url(url, timeout=30):
    """Download *url* and return its content as plain text.

    The extraction strategy is chosen from the response's Content-Type
    header (matched case-insensitively):

    * contains ``epub`` -- the body is parsed as an EPUB and the text of
      every document item is concatenated, one item per line block;
    * contains ``html`` -- the markup is stripped with BeautifulSoup;
    * anything else, including a missing header -- the decoded response
      body is returned unchanged.

    Args:
        url: Address of the resource to fetch.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``
            argument, so a stalled server cannot block the caller forever.

    Returns:
        The extracted text, or ``None`` if the request failed (connection
        error, timeout, or a non-success HTTP status).

    Note:
        Errors raised while parsing an EPUB are not caught here and
        propagate to the caller.
    """
    # NOTE(security): ``url`` is supplied by the client of the API, so this
    # performs a server-side request to an arbitrary address. Consider
    # restricting allowed hosts/schemes before exposing this publicly.
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Turn 4xx/5xx answers into exceptions.
    except requests.exceptions.RequestException as e:
        app.logger.error(f"Error fetching URL: {e}")
        return None

    # The header may be absent; fall back to '' so the substring checks
    # below cannot raise TypeError on None.
    content_type = (response.headers.get('content-type') or '').lower()

    if 'epub' in content_type:
        book = epub.read_epub(io.BytesIO(response.content))
        # Join once instead of growing a string inside the loop.
        return "".join(
            BeautifulSoup(item.get_body_content(), 'html.parser').get_text() + "\n"
            for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT)
        )

    if 'html' in content_type:
        return BeautifulSoup(response.content, 'html.parser').get_text()

    # Plain text and every unrecognised content type: return the body as-is.
    return response.text
def is_url(string):
    """Tell whether *string* parses as a URL with a scheme and a host part.

    Returns ``True`` only when ``urlparse`` yields both a non-empty scheme
    and a non-empty network location; input that ``urlparse`` rejects with
    ``ValueError`` counts as "not a URL".
    """
    try:
        parts = urlparse(string)
    except ValueError:
        return False
    return bool(parts.scheme and parts.netloc)
# Upper bound, in seconds, for fetching the reference audio from the IPFS
# gateway; without it a stalled gateway would block the worker indefinitely.
_IPFS_DOWNLOAD_TIMEOUT = 60


@app.route('/api/voice-transfer', methods=['POST'])
def voice_transfer():
    """Generate speech for the submitted text in the voice of a reference clip.

    Expected form fields:
        cid:  IPFS content identifier of the reference voice recording.
              The file is downloaded from the ipfs.io gateway, read as
              WebM and converted to MP3 before cloning.
        text: Either the raw text to speak or a URL; a URL is fetched and
              converted to text with ``get_text_from_url``.

    Returns:
        On success, the generated MP3 as an attachment (``audio/mpeg``).
        On failure, a JSON body ``{"error": ...}`` with status 400 for
        missing/unusable input or a failed download, and 500 when voice
        generation or anything unexpected fails.
    """
    # The context manager removes the scratch directory on every exit path,
    # replacing the manual mkdtemp()/rmtree() pairing.
    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            # NOTE: direct upload through a 'voice_file' form part is
            # currently disabled; the reference audio must be given by CID.
            if 'cid' not in request.form:
                return jsonify({"error": "No voice file or voice_file_url provided"}), 400

            cid = request.form.get('cid')
            if not cid:
                return jsonify({"error": "No cid (for voice file) provided"}), 400

            # Fetch over HTTPS so the voice sample is not sent in clear text.
            file_url = f"https://ipfs.io/ipfs/{cid}"
            try:
                response = requests.get(file_url, timeout=_IPFS_DOWNLOAD_TIMEOUT)
            except requests.exceptions.RequestException as e:
                # Network failures are a download problem, not a server bug:
                # answer the same way as for a non-200 gateway response.
                app.logger.error(f"Error downloading voice file from IPFS: {e}")
                return jsonify({"error": "Failed to download voice file from IPFS"}), 400
            if response.status_code != 200:
                return jsonify({"error": "Failed to download voice file from IPFS"}), 400

            # Save as webm first
            webm_path = os.path.join(temp_dir, "input.webm")
            with open(webm_path, "wb") as f:
                f.write(response.content)

            # Convert to mp3
            voice_file_path = os.path.join(temp_dir, "input.mp3")
            audio = AudioSegment.from_file(webm_path, format="webm")
            audio.export(voice_file_path, format="mp3")

            text_input = request.form.get('text')
            app.logger.info(f"Received text input: {text_input}")
            if not text_input:
                return jsonify({"error": "No text or text_url provided"}), 400

            if is_url(text_input):
                text_content = get_text_from_url(text_input)
                if text_content is None:
                    return jsonify({"error": "Failed to retrieve or parse content from URL"}), 400
            else:
                app.logger.info("Input is not a URL, using as raw text.")
                text_content = text_input

            # --- Perform Voice Cloning using Fish Audio ---
            app.logger.info("Starting voice cloning process with Fish Audio...")
            # A random name keeps the download name unique per request.
            output_filename = f"output_cloned_{uuid.uuid4().hex}.mp3"
            output_file_path = os.path.join(temp_dir, output_filename)

            try:
                clone_voice_with_fish(
                    text=text_content,
                    reference_audio_path=voice_file_path,
                    output_path=output_file_path,
                    reference_text="This is a reference audio for voice cloning."
                )
            except Exception as e:
                # logger.exception records the traceback alongside the message.
                app.logger.exception(f"Error during voice cloning with Fish Audio: {e}")
                return jsonify({"error": "Failed to generate voice file."}), 500

            app.logger.info(f"Successfully generated audio file at: {output_file_path}")

            # Load the result into memory: the file on disk disappears with
            # the temporary directory before Flask streams the response.
            with open(output_file_path, 'rb') as f:
                audio_buffer = io.BytesIO(f.read())

            return send_file(
                audio_buffer,
                as_attachment=True,
                download_name=output_filename,
                mimetype='audio/mpeg'
            )
        except Exception as e:
            # Last-resort boundary so clients always receive a JSON error.
            app.logger.exception(f"An unexpected error occurred: {e}")
            return jsonify({"error": "An internal server error occurred."}), 500
# if __name__ == '__main__':
# app.run(debug=True, port=5001)
|