Spaces:
Sleeping
Sleeping
| import os | |
| import joblib | |
| import numpy as np | |
| import torch | |
| import torchaudio | |
| import yt_dlp | |
| from fastapi import FastAPI, HTTPException, UploadFile, File, Form | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel, HttpUrl | |
| from typing import Optional, Union | |
| import logging | |
| import uvicorn | |
| # --- 0. CONFIGURAÇÃO DE LOGGING --- | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # --- 1. SETUP DO FASTAPI --- | |
| app = FastAPI( | |
| title="Audio Classifier API", | |
| description="Uma API para classificar áudios como 'REAL' ou 'IA', aceitando uploads de arquivo ou URLs." | |
| ) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Monta o diretório estático para servir o index.html e outros arquivos. | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| # --- 2. CARREGAMENTO DO MODELO --- | |
| try: | |
| modelo_path = 'modelo_random_forest.joblib' | |
| scaler_path = 'scaler.joblib' | |
| if not os.path.exists(modelo_path) or not os.path.exists(scaler_path): | |
| raise FileNotFoundError("Arquivos do modelo ou scaler não encontrados.") | |
| model = joblib.load(modelo_path) | |
| scaler = joblib.load(scaler_path) | |
| logger.info("Modelo e scaler carregados com sucesso.") | |
| except Exception as e: | |
| logger.error(f"Erro ao carregar o modelo ou scaler: {e}") | |
| # Encerra o aplicativo se os modelos não puderem ser carregados. | |
| raise RuntimeError(f"Não foi possível carregar os artefatos do modelo: {e}") from e | |
| # --- 3. FUNÇÕES DE PROCESSAMENTO DE ÁUDIO --- | |
| def extract_features(waveform, sample_rate, n_mfcc=12): | |
| """Extrai MFCCs de um waveform.""" | |
| mfcc_transform = torchaudio.transforms.MFCC( | |
| sample_rate=sample_rate, | |
| n_mfcc=n_mfcc, | |
| melkwargs={'n_fft': 400, 'hop_length': 160, 'n_mels': 23, 'center': False} | |
| ) | |
| mfcc = mfcc_transform(waveform) | |
| return np.mean(mfcc.squeeze(0).numpy(), axis=1) | |
| def process_audio_file(file_path: str): | |
| """ | |
| Carrega um arquivo de áudio, extrai features e as escala. | |
| Retorna None se o áudio for muito curto ou inválido. | |
| """ | |
| try: | |
| waveform, sample_rate = torchaudio.load(file_path) | |
| # Garante que o áudio tenha pelo menos uma duração mínima | |
| min_duration_samples = sample_rate * 1 # 1 segundo | |
| if waveform.shape[1] < min_duration_samples: | |
| logger.warning(f"Áudio {file_path} é muito curto para análise.") | |
| return None | |
| # Garante a monocanalidade somando os canais, se houver mais de um. | |
| if waveform.shape[0] > 1: | |
| waveform = torch.mean(waveform, dim=0, keepdim=True) | |
| features = extract_features(waveform, sample_rate) | |
| scaled_features = scaler.transform([features]) | |
| return scaled_features | |
| except Exception as e: | |
| logger.error(f"Erro ao processar o arquivo de áudio {file_path}: {e}") | |
| raise ValueError(f"Não foi possível processar o arquivo de áudio: {e}") | |
| def download_audio_from_url(url: str, output_path: str = "temp_audio"): | |
| """Baixa áudio com retry, proxy e DNS fallback.""" | |
| if not os.path.exists(output_path): | |
| os.makedirs(output_path) | |
| if url: | |
| if "youtube.com" in url or "youtu.be" in url: | |
| url = url.replace('youtube.com', 'piped.video') | |
| url = url.replace('youtu.be', 'piped.video') | |
| # Configurações com fallback DNS/proxy | |
| proxies = { | |
| 'http': 'socks5://8.8.8.8:53', # SOCKS para DNS bypass | |
| 'https': 'socks5://8.8.8.8:53' | |
| } | |
| ydl_opts_base = { | |
| 'format': 'bestaudio/best', | |
| 'postprocessors': [{ | |
| 'key': 'FFmpegExtractAudio', | |
| 'preferredcodec': 'wav', | |
| 'preferredquality': '192', | |
| }], | |
| 'outtmpl': os.path.join(output_path, '%(id)s.%(ext)s'), | |
| 'quiet': True, | |
| 'socket_timeout': 30, | |
| 'retries': 5, | |
| 'fragment_retries': 5, | |
| 'extractor_retries': 5, | |
| 'retry_sleep': 3, | |
| } | |
| strategies = [ | |
| # 1. Direto | |
| {'proxy': None}, | |
| # 2. Google DNS proxy | |
| {'proxy': 'socks5://dns.google:53'}, | |
| # 3. Cloudflare WARP | |
| {'proxy': 'socks5://1.1.1.1:53'}, | |
| # 4. IPv4 force | |
| {'source_address': '0.0.0.0'}, | |
| ] | |
| for i, strategy in enumerate(strategies): | |
| try: | |
| opts = ydl_opts_base.copy() | |
| opts.update(strategy) | |
| logger.info(f"Tentativa {i+1}: {strategy}") | |
| with yt_dlp.YoutubeDL(opts) as ydl: | |
| info = ydl.extract_info(url, download=True) | |
| filename = ydl.prepare_filename(info).replace(info['ext'], 'wav') | |
| if os.path.exists(filename): | |
| return filename | |
| base_filename = os.path.join(output_path, f"{info['id']}.wav") | |
| if os.path.exists(base_filename): | |
| return base_filename | |
| except Exception as e: | |
| logger.warning(f"Estratégia {i+1} falhou: {e}") | |
| continue | |
| raise ConnectionError("Todas as estratégias de download falharam. Verifique conectividade de rede.") | |
| # --- 4. ROTA DE CLASSIFICAÇÃO --- | |
| async def classify_audio(url: Optional[str] = Form(None), file: Optional[UploadFile] = File(None)): | |
| temp_file_path = None | |
| try: | |
| if file: | |
| # Lógica para upload de arquivo | |
| temp_dir = "temp_uploads" | |
| if not os.path.exists(temp_dir): | |
| os.makedirs(temp_dir) | |
| temp_file_path = os.path.join(temp_dir, file.filename) | |
| with open(temp_file_path, "wb") as buffer: | |
| buffer.write(await file.read()) | |
| logger.info(f"Arquivo '{file.filename}' recebido.") | |
| audio_path = temp_file_path | |
| elif url: | |
| # Lógica para download de URL | |
| logger.info(f"Recebida URL para classificação: {url}") | |
| audio_path = download_audio_from_url(url) | |
| temp_file_path = audio_path # Marcar para exclusão posterior | |
| else: | |
| raise HTTPException(status_code=400, detail="Nenhum arquivo ou URL fornecido.") | |
| # Processamento e classificação do áudio | |
| scaled_features = process_audio_file(audio_path) | |
| if scaled_features is None: | |
| raise HTTPException(status_code=400, detail="O áudio é muito curto ou não pôde ser processado.") | |
| prediction = model.predict(scaled_features) | |
| probability = model.predict_proba(scaled_features) | |
| # Converter numpy types para Python nativos | |
| label_idx = int(prediction[0]) # Converte numpy.int32 para int Python | |
| label_str = 'IA' if label_idx == 1 else 'REAL' | |
| prob_value = float(probability[0][label_idx]) # Converte numpy.float64 para float Python | |
| return {"label": label_str, "probability": prob_value} | |
| except (ValueError, ConnectionError, FileNotFoundError) as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| except Exception as e: | |
| logger.error(f"Erro inesperado durante a classificação: {e}") | |
| raise HTTPException(status_code=500, detail=f"Erro interno do servidor: {e}") | |
| finally: | |
| # Limpeza do arquivo temporário | |
| if temp_file_path and os.path.exists(temp_file_path): | |
| os.remove(temp_file_path) | |
| logger.info(f"Arquivo temporário '{temp_file_path}' removido.") | |
| # --- 5. ROTA DE SAÚDE --- | |
| def home(): | |
| """Serve a página inicial da aplicação.""" | |
| return "./static/index.html" | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=8000) | |