""" ManusProviders - Integração com a API do Manus AI (v2 oficial) ============================================================== Documentação oficial: https://api.manus.im CORREÇÕES APLICADAS: 1. Base URL corrigida: api.manus.im (não .ai) 2. Endpoint de criação: /api/task.create (com prefixo /api/) 3. Payload simplificado: {"prompt": "..."} (sem agent_profile nem message.content) 4. Polling via /api/task.listMessages com params correctos 5. Parsing correcto: data.data.messages (não data.data.list) 6. Verificação de data.ok para erros explícitos da API """ import time import requests from typing import Dict, Any, Optional from modules.config import MANUS_API_KEY, MANUS_BASE_URL, logger class ManusProviders: """ Provedor para a API do Manus AI v2. Suporta criação de tarefas de pesquisa e polling de resultados. """ @staticmethod def create_research_task(prompt: str) -> Dict[str, Any]: """ Cria uma tarefa de pesquisa no Manus AI v2. Endpoint: POST /api/task.create Payload: {"prompt": ""} Resposta: {"ok": true, "data": {"task_id": "..."}} """ if not MANUS_API_KEY: return {"sucesso": False, "erro": "MANUS_API_KEY não configurada"} base = MANUS_BASE_URL.rstrip('/') if base.endswith('/v2'): url = f"{base}/task.create" else: url = f"{base}/v2/task.create" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {MANUS_API_KEY}", "x-manus-api-key": MANUS_API_KEY } # Payload conforme API v2 payload = { "title": prompt[:50], "message": { "content": prompt } } try: logger.info(f"🧠 [MANUS] Criando tarefa: {prompt[:80]}...") response = requests.post(url, headers=headers, json=payload, timeout=20) # Erro de autenticação if response.status_code == 401: logger.error("🔴 [MANUS] Erro 401 — Verifique a MANUS_API_KEY no .env") return {"sucesso": False, "erro": "MANUS_API_KEY inválida ou expirada (401)"} # Erro de payload (400 ou 422) if response.status_code in [400, 422]: logger.error(f"🔴 [MANUS] Erro {response.status_code} — Payload rejeitado: {response.text[:500]}") return {"sucesso": False, "erro": f"Payload inválido ({response.status_code}): {response.text[:500]}"} response.raise_for_status() data = response.json() # Verifica campo "ok" da API v2 if not data.get("ok", True): # se "ok" existir e for False err = data.get("error", {}) return {"sucesso": False, "erro": f"Manus API error: {err.get('message', 'desconhecido')} [{err.get('code', '')}]"} # Extrai task_id da resposta v2: {"ok": true, "data": {"task_id": "..."}} task_id = ( data.get("data", {}).get("task_id") or data.get("task_id") or data.get("data", {}).get("id") or data.get("id") ) if task_id: logger.info(f"✅ [MANUS] Tarefa criada: {task_id}") return {"sucesso": True, "task_id": task_id, "status": "running"} logger.error(f"🔴 [MANUS] task_id ausente na resposta: {data}") return {"sucesso": False, "erro": f"task_id não retornado: {data}"} except requests.exceptions.ConnectionError: logger.error("🔴 [MANUS] Erro de conexão — verifique MANUS_BASE_URL e conectividade") return {"sucesso": False, "erro": "Erro de conexão com api.manus.im"} except Exception as e: logger.error(f"🔴 [MANUS] Erro ao criar tarefa: {e}") return {"sucesso": False, "erro": str(e)} @staticmethod def get_task_status(task_id: str) -> Dict[str, Any]: """ Obtém o status actual de uma tarefa via /v2/task.detail. Resposta: {"ok": true, "data": {"status": "running|stopped|error|waiting"}} """ if not MANUS_API_KEY: return {"sucesso": False, "erro": "MANUS_API_KEY não configurada"} base = MANUS_BASE_URL.rstrip('/') if base.endswith('/v2'): url = f"{base}/task.detail" else: url = f"{base}/v2/task.detail" headers = { "Authorization": f"Bearer {MANUS_API_KEY}", "x-manus-api-key": MANUS_API_KEY } params = {"task_id": task_id} try: response = requests.get(url, headers=headers, params=params, timeout=12) if response.status_code == 404: return {"sucesso": False, "erro": f"Tarefa {task_id} não encontrada (404)"} response.raise_for_status() data = response.json() if not data.get("ok", True): err = data.get("error", {}) return {"sucesso": False, "erro": err.get("message", "Erro desconhecido")} task_data = data.get("data", data) status = task_data.get("status") or task_data.get("task_status", "running") return {"sucesso": True, "status": status} except Exception as e: logger.debug(f"[MANUS] get_task_status erro: {e}") return {"sucesso": False, "erro": str(e)} @staticmethod def get_task_result(task_id: str) -> Dict[str, Any]: """ Obtém mensagens/resultado de uma tarefa via /v2/task.listMessages. Endpoint: GET /v2/task.listMessages Params: task_id=&limit=50&order=desc Resposta: {"ok": true, "data": {"messages": [{"role": "assistant", "content": "..."}]}} """ if not MANUS_API_KEY: return {"sucesso": False, "erro": "MANUS_API_KEY não configurada"} base = MANUS_BASE_URL.rstrip('/') if base.endswith('/v2'): url = f"{base}/task.listMessages" else: url = f"{base}/v2/task.listMessages" headers = { "Authorization": f"Bearer {MANUS_API_KEY}", "x-manus-api-key": MANUS_API_KEY } params = { "task_id": task_id, "limit": 50, "order": "desc" # mais recentes primeiro } try: logger.debug(f"[MANUS] Polling task: {task_id}") response = requests.get(url, headers=headers, params=params, timeout=15) if response.status_code == 404: return {"sucesso": False, "erro": f"task_id não encontrado: {task_id}"} if response.status_code == 401: return {"sucesso": False, "erro": "MANUS_API_KEY inválida (401)"} response.raise_for_status() data = response.json() # Verifica campo "ok" da API v2 if not data.get("ok", True): err = data.get("error", {}) return {"sucesso": False, "erro": err.get("message", "API retornou ok=false")} # Extrai mensagens: data.data.messages (não data.data.list!) messages_data = data.get("data", {}) messages = ( messages_data.get("messages") # formato v2 principal or messages_data.get("list") # fallback v1 legado or [] ) # Filtra mensagens do agente (role = "assistant") agent_messages = [ m for m in messages if m.get("role") in ("assistant", "agent") and m.get("content") and m.get("type", "message") == "message" ] if agent_messages: # Pega o conteúdo mais recente (order=desc → primeiro é mais recente) last_msg = agent_messages[0] content = last_msg.get("content", "") # Pode ser string ou lista de partes if isinstance(content, list): text_result = "".join( part.get("text", "") for part in content if isinstance(part, dict) and part.get("type") == "text" ) else: text_result = str(content) if text_result.strip(): logger.info(f"✅ [MANUS] Resultado obtido ({len(text_result)} chars)") return { "sucesso": True, "status": "completed", "resultado": text_result.strip() } # Sem mensagens do agente ainda → ainda a processar return {"sucesso": True, "status": "running", "msg": "Processando..."} except Exception as e: logger.error(f"🔴 [MANUS] get_task_result erro: {e}") return {"sucesso": False, "erro": str(e)} @staticmethod def research_sync(prompt: str, max_wait_seconds: int = 90) -> Dict[str, Any]: """ Executa pesquisa no Manus de forma síncrona com polling inteligente. Args: prompt: Descrição da tarefa de pesquisa max_wait_seconds: Timeout máximo (padrão 90s — Manus pode demorar até 60s) Returns: {"sucesso": True, "resultado": "..."} ou {"sucesso": False, "erro": "..."} """ # 1. Criar tarefa create_res = ManusProviders.create_research_task(prompt) if not create_res.get("sucesso"): logger.warning(f"⚠️ [MANUS] Falha ao criar tarefa: {create_res.get('erro')}") return create_res task_id = create_res["task_id"] start_time = time.time() poll_interval = 8 # segundos entre polls (não sobrecarregar a API) poll_count = 0 logger.info(f"🔄 [MANUS] Aguardando resultado da tarefa {task_id}...") while time.time() - start_time < max_wait_seconds: time.sleep(poll_interval) poll_count += 1 elapsed = int(time.time() - start_time) # Verifica resultado via listMessages result = ManusProviders.get_task_result(task_id) if not result.get("sucesso"): logger.warning(f"⚠️ [MANUS] Erro no poll #{poll_count}: {result.get('erro')}") # Não abortar imediatamente em erros de poll — pode ser transitório if poll_count >= 3: return result continue if result.get("status") == "completed": logger.info(f"✅ [MANUS] Tarefa concluída em ~{elapsed}s após {poll_count} polls") return result # Verifica status da tarefa (stopped/error) status_res = ManusProviders.get_task_status(task_id) if status_res.get("sucesso"): task_status = status_res.get("status", "running") if task_status in ("stopped", "error", "failed"): logger.warning(f"⚠️ [MANUS] Tarefa encerrada com status: {task_status}") # Tenta obter resultado mesmo assim (pode ter output parcial) final = ManusProviders.get_task_result(task_id) if final.get("status") == "completed": return final return {"sucesso": False, "erro": f"Tarefa encerrada com status: {task_status}"} logger.debug(f"[MANUS] Poll #{poll_count} — elapsed: {elapsed}s — ainda a processar...") logger.error(f"⏰ [MANUS] Timeout após {max_wait_seconds}s ({poll_count} polls)") return {"sucesso": False, "erro": f"Timeout aguardando o Manus AI finalizar (>{max_wait_seconds}s)"}