import os import joblib import numpy as np import torch import torchaudio import yt_dlp from fastapi import FastAPI, HTTPException, UploadFile, File, Form from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, HttpUrl from typing import Optional, Union import logging import uvicorn # --- 0. CONFIGURAÇÃO DE LOGGING --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # --- 1. SETUP DO FASTAPI --- app = FastAPI( title="Audio Classifier API", description="Uma API para classificar áudios como 'REAL' ou 'IA', aceitando uploads de arquivo ou URLs." ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Monta o diretório estático para servir o index.html e outros arquivos. app.mount("/static", StaticFiles(directory="static"), name="static") # --- 2. CARREGAMENTO DO MODELO --- try: modelo_path = 'modelo_random_forest.joblib' scaler_path = 'scaler.joblib' if not os.path.exists(modelo_path) or not os.path.exists(scaler_path): raise FileNotFoundError("Arquivos do modelo ou scaler não encontrados.") model = joblib.load(modelo_path) scaler = joblib.load(scaler_path) logger.info("Modelo e scaler carregados com sucesso.") except Exception as e: logger.error(f"Erro ao carregar o modelo ou scaler: {e}") # Encerra o aplicativo se os modelos não puderem ser carregados. raise RuntimeError(f"Não foi possível carregar os artefatos do modelo: {e}") from e # --- 3. FUNÇÕES DE PROCESSAMENTO DE ÁUDIO --- def extract_features(waveform, sample_rate, n_mfcc=12): """Extrai MFCCs de um waveform.""" mfcc_transform = torchaudio.transforms.MFCC( sample_rate=sample_rate, n_mfcc=n_mfcc, melkwargs={'n_fft': 400, 'hop_length': 160, 'n_mels': 23, 'center': False} ) mfcc = mfcc_transform(waveform) return np.mean(mfcc.squeeze(0).numpy(), axis=1) def process_audio_file(file_path: str): """ Carrega um arquivo de áudio, extrai features e as escala. Retorna None se o áudio for muito curto ou inválido. """ try: waveform, sample_rate = torchaudio.load(file_path) # Garante que o áudio tenha pelo menos uma duração mínima min_duration_samples = sample_rate * 1 # 1 segundo if waveform.shape[1] < min_duration_samples: logger.warning(f"Áudio {file_path} é muito curto para análise.") return None # Garante a monocanalidade somando os canais, se houver mais de um. if waveform.shape[0] > 1: waveform = torch.mean(waveform, dim=0, keepdim=True) features = extract_features(waveform, sample_rate) scaled_features = scaler.transform([features]) return scaled_features except Exception as e: logger.error(f"Erro ao processar o arquivo de áudio {file_path}: {e}") raise ValueError(f"Não foi possível processar o arquivo de áudio: {e}") def download_audio_from_url(url: str, output_path: str = "temp_audio"): """Baixa áudio com retry, proxy e DNS fallback.""" if not os.path.exists(output_path): os.makedirs(output_path) if url: if "youtube.com" in url or "youtu.be" in url: url = url.replace('youtube.com', 'piped.video') url = url.replace('youtu.be', 'piped.video') # Configurações com fallback DNS/proxy proxies = { 'http': 'socks5://8.8.8.8:53', # SOCKS para DNS bypass 'https': 'socks5://8.8.8.8:53' } ydl_opts_base = { 'format': 'bestaudio/best', 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav', 'preferredquality': '192', }], 'outtmpl': os.path.join(output_path, '%(id)s.%(ext)s'), 'quiet': True, 'socket_timeout': 30, 'retries': 5, 'fragment_retries': 5, 'extractor_retries': 5, 'retry_sleep': 3, } strategies = [ # 1. Direto {'proxy': None}, # 2. Google DNS proxy {'proxy': 'socks5://dns.google:53'}, # 3. Cloudflare WARP {'proxy': 'socks5://1.1.1.1:53'}, # 4. IPv4 force {'source_address': '0.0.0.0'}, ] for i, strategy in enumerate(strategies): try: opts = ydl_opts_base.copy() opts.update(strategy) logger.info(f"Tentativa {i+1}: {strategy}") with yt_dlp.YoutubeDL(opts) as ydl: info = ydl.extract_info(url, download=True) filename = ydl.prepare_filename(info).replace(info['ext'], 'wav') if os.path.exists(filename): return filename base_filename = os.path.join(output_path, f"{info['id']}.wav") if os.path.exists(base_filename): return base_filename except Exception as e: logger.warning(f"Estratégia {i+1} falhou: {e}") continue raise ConnectionError("Todas as estratégias de download falharam. Verifique conectividade de rede.") # --- 4. ROTA DE CLASSIFICAÇÃO --- @app.post("/api/classify") async def classify_audio(url: Optional[str] = Form(None), file: Optional[UploadFile] = File(None)): temp_file_path = None try: if file: # Lógica para upload de arquivo temp_dir = "temp_uploads" if not os.path.exists(temp_dir): os.makedirs(temp_dir) temp_file_path = os.path.join(temp_dir, file.filename) with open(temp_file_path, "wb") as buffer: buffer.write(await file.read()) logger.info(f"Arquivo '{file.filename}' recebido.") audio_path = temp_file_path elif url: # Lógica para download de URL logger.info(f"Recebida URL para classificação: {url}") audio_path = download_audio_from_url(url) temp_file_path = audio_path # Marcar para exclusão posterior else: raise HTTPException(status_code=400, detail="Nenhum arquivo ou URL fornecido.") # Processamento e classificação do áudio scaled_features = process_audio_file(audio_path) if scaled_features is None: raise HTTPException(status_code=400, detail="O áudio é muito curto ou não pôde ser processado.") prediction = model.predict(scaled_features) probability = model.predict_proba(scaled_features) # Converter numpy types para Python nativos label_idx = int(prediction[0]) # Converte numpy.int32 para int Python label_str = 'IA' if label_idx == 1 else 'REAL' prob_value = float(probability[0][label_idx]) # Converte numpy.float64 para float Python return {"label": label_str, "probability": prob_value} except (ValueError, ConnectionError, FileNotFoundError) as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error(f"Erro inesperado durante a classificação: {e}") raise HTTPException(status_code=500, detail=f"Erro interno do servidor: {e}") finally: # Limpeza do arquivo temporário if temp_file_path and os.path.exists(temp_file_path): os.remove(temp_file_path) logger.info(f"Arquivo temporário '{temp_file_path}' removido.") # --- 5. ROTA DE SAÚDE --- @app.get("/", response_class=FileResponse) def home(): """Serve a página inicial da aplicação.""" return "./static/index.html" if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)