fredcaixeta
url
a492299
import os
import joblib
import numpy as np
import torch
import torchaudio
import yt_dlp
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl
from typing import Optional, Union
import logging
import uvicorn
# --- 0. CONFIGURAÇÃO DE LOGGING ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- 1. SETUP DO FASTAPI ---
app = FastAPI(
title="Audio Classifier API",
description="Uma API para classificar áudios como 'REAL' ou 'IA', aceitando uploads de arquivo ou URLs."
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Monta o diretório estático para servir o index.html e outros arquivos.
app.mount("/static", StaticFiles(directory="static"), name="static")
# --- 2. CARREGAMENTO DO MODELO ---
try:
modelo_path = 'modelo_random_forest.joblib'
scaler_path = 'scaler.joblib'
if not os.path.exists(modelo_path) or not os.path.exists(scaler_path):
raise FileNotFoundError("Arquivos do modelo ou scaler não encontrados.")
model = joblib.load(modelo_path)
scaler = joblib.load(scaler_path)
logger.info("Modelo e scaler carregados com sucesso.")
except Exception as e:
logger.error(f"Erro ao carregar o modelo ou scaler: {e}")
# Encerra o aplicativo se os modelos não puderem ser carregados.
raise RuntimeError(f"Não foi possível carregar os artefatos do modelo: {e}") from e
# --- 3. FUNÇÕES DE PROCESSAMENTO DE ÁUDIO ---
def extract_features(waveform, sample_rate, n_mfcc=12):
"""Extrai MFCCs de um waveform."""
mfcc_transform = torchaudio.transforms.MFCC(
sample_rate=sample_rate,
n_mfcc=n_mfcc,
melkwargs={'n_fft': 400, 'hop_length': 160, 'n_mels': 23, 'center': False}
)
mfcc = mfcc_transform(waveform)
return np.mean(mfcc.squeeze(0).numpy(), axis=1)
def process_audio_file(file_path: str):
"""
Carrega um arquivo de áudio, extrai features e as escala.
Retorna None se o áudio for muito curto ou inválido.
"""
try:
waveform, sample_rate = torchaudio.load(file_path)
# Garante que o áudio tenha pelo menos uma duração mínima
min_duration_samples = sample_rate * 1 # 1 segundo
if waveform.shape[1] < min_duration_samples:
logger.warning(f"Áudio {file_path} é muito curto para análise.")
return None
# Garante a monocanalidade somando os canais, se houver mais de um.
if waveform.shape[0] > 1:
waveform = torch.mean(waveform, dim=0, keepdim=True)
features = extract_features(waveform, sample_rate)
scaled_features = scaler.transform([features])
return scaled_features
except Exception as e:
logger.error(f"Erro ao processar o arquivo de áudio {file_path}: {e}")
raise ValueError(f"Não foi possível processar o arquivo de áudio: {e}")
def download_audio_from_url(url: str, output_path: str = "temp_audio"):
"""Baixa áudio com retry, proxy e DNS fallback."""
if not os.path.exists(output_path):
os.makedirs(output_path)
if url:
if "youtube.com" in url or "youtu.be" in url:
url = url.replace('youtube.com', 'piped.video')
url = url.replace('youtu.be', 'piped.video')
# Configurações com fallback DNS/proxy
proxies = {
'http': 'socks5://8.8.8.8:53', # SOCKS para DNS bypass
'https': 'socks5://8.8.8.8:53'
}
ydl_opts_base = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'wav',
'preferredquality': '192',
}],
'outtmpl': os.path.join(output_path, '%(id)s.%(ext)s'),
'quiet': True,
'socket_timeout': 30,
'retries': 5,
'fragment_retries': 5,
'extractor_retries': 5,
'retry_sleep': 3,
}
strategies = [
# 1. Direto
{'proxy': None},
# 2. Google DNS proxy
{'proxy': 'socks5://dns.google:53'},
# 3. Cloudflare WARP
{'proxy': 'socks5://1.1.1.1:53'},
# 4. IPv4 force
{'source_address': '0.0.0.0'},
]
for i, strategy in enumerate(strategies):
try:
opts = ydl_opts_base.copy()
opts.update(strategy)
logger.info(f"Tentativa {i+1}: {strategy}")
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(url, download=True)
filename = ydl.prepare_filename(info).replace(info['ext'], 'wav')
if os.path.exists(filename):
return filename
base_filename = os.path.join(output_path, f"{info['id']}.wav")
if os.path.exists(base_filename):
return base_filename
except Exception as e:
logger.warning(f"Estratégia {i+1} falhou: {e}")
continue
raise ConnectionError("Todas as estratégias de download falharam. Verifique conectividade de rede.")
# --- 4. ROTA DE CLASSIFICAÇÃO ---
@app.post("/api/classify")
async def classify_audio(url: Optional[str] = Form(None), file: Optional[UploadFile] = File(None)):
temp_file_path = None
try:
if file:
# Lógica para upload de arquivo
temp_dir = "temp_uploads"
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
temp_file_path = os.path.join(temp_dir, file.filename)
with open(temp_file_path, "wb") as buffer:
buffer.write(await file.read())
logger.info(f"Arquivo '{file.filename}' recebido.")
audio_path = temp_file_path
elif url:
# Lógica para download de URL
logger.info(f"Recebida URL para classificação: {url}")
audio_path = download_audio_from_url(url)
temp_file_path = audio_path # Marcar para exclusão posterior
else:
raise HTTPException(status_code=400, detail="Nenhum arquivo ou URL fornecido.")
# Processamento e classificação do áudio
scaled_features = process_audio_file(audio_path)
if scaled_features is None:
raise HTTPException(status_code=400, detail="O áudio é muito curto ou não pôde ser processado.")
prediction = model.predict(scaled_features)
probability = model.predict_proba(scaled_features)
# Converter numpy types para Python nativos
label_idx = int(prediction[0]) # Converte numpy.int32 para int Python
label_str = 'IA' if label_idx == 1 else 'REAL'
prob_value = float(probability[0][label_idx]) # Converte numpy.float64 para float Python
return {"label": label_str, "probability": prob_value}
except (ValueError, ConnectionError, FileNotFoundError) as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Erro inesperado durante a classificação: {e}")
raise HTTPException(status_code=500, detail=f"Erro interno do servidor: {e}")
finally:
# Limpeza do arquivo temporário
if temp_file_path and os.path.exists(temp_file_path):
os.remove(temp_file_path)
logger.info(f"Arquivo temporário '{temp_file_path}' removido.")
# --- 5. ROTA DE SAÚDE ---
@app.get("/", response_class=FileResponse)
def home():
"""Serve a página inicial da aplicação."""
return "./static/index.html"
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)