akira / modules /api_integrations /manus_providers.py
akra35567's picture
Upload 6 files
2b11c4c verified
"""
ManusProviders - Integração com a API do Manus AI (v2 oficial)
==============================================================
Documentação oficial: https://api.manus.im
CORREÇÕES APLICADAS:
1. Base URL corrigida: api.manus.im (não .ai)
2. Endpoint de criação: /api/task.create (com prefixo /api/)
3. Payload simplificado: {"prompt": "..."} (sem agent_profile nem message.content)
4. Polling via /api/task.listMessages com params correctos
5. Parsing correcto: data.data.messages (não data.data.list)
6. Verificação de data.ok para erros explícitos da API
"""
import time
import requests
from typing import Dict, Any, Optional
from modules.config import MANUS_API_KEY, MANUS_BASE_URL, logger
class ManusProviders:
"""
Provedor para a API do Manus AI v2.
Suporta criação de tarefas de pesquisa e polling de resultados.
"""
@staticmethod
def create_research_task(prompt: str) -> Dict[str, Any]:
"""
Cria uma tarefa de pesquisa no Manus AI v2.
Endpoint: POST /api/task.create
Payload: {"prompt": "<texto>"}
Resposta: {"ok": true, "data": {"task_id": "..."}}
"""
if not MANUS_API_KEY:
return {"sucesso": False, "erro": "MANUS_API_KEY não configurada"}
base = MANUS_BASE_URL.rstrip('/')
if base.endswith('/v2'):
url = f"{base}/task.create"
else:
url = f"{base}/v2/task.create"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {MANUS_API_KEY}",
"x-manus-api-key": MANUS_API_KEY
}
# Payload conforme API v2
payload = {
"title": prompt[:50],
"message": {
"content": prompt
}
}
try:
logger.info(f"🧠 [MANUS] Criando tarefa: {prompt[:80]}...")
response = requests.post(url, headers=headers, json=payload, timeout=20)
# Erro de autenticação
if response.status_code == 401:
logger.error("🔴 [MANUS] Erro 401 — Verifique a MANUS_API_KEY no .env")
return {"sucesso": False, "erro": "MANUS_API_KEY inválida ou expirada (401)"}
# Erro de payload (400 ou 422)
if response.status_code in [400, 422]:
logger.error(f"🔴 [MANUS] Erro {response.status_code} — Payload rejeitado: {response.text[:500]}")
return {"sucesso": False, "erro": f"Payload inválido ({response.status_code}): {response.text[:500]}"}
response.raise_for_status()
data = response.json()
# Verifica campo "ok" da API v2
if not data.get("ok", True): # se "ok" existir e for False
err = data.get("error", {})
return {"sucesso": False, "erro": f"Manus API error: {err.get('message', 'desconhecido')} [{err.get('code', '')}]"}
# Extrai task_id da resposta v2: {"ok": true, "data": {"task_id": "..."}}
task_id = (
data.get("data", {}).get("task_id")
or data.get("task_id")
or data.get("data", {}).get("id")
or data.get("id")
)
if task_id:
logger.info(f"✅ [MANUS] Tarefa criada: {task_id}")
return {"sucesso": True, "task_id": task_id, "status": "running"}
logger.error(f"🔴 [MANUS] task_id ausente na resposta: {data}")
return {"sucesso": False, "erro": f"task_id não retornado: {data}"}
except requests.exceptions.ConnectionError:
logger.error("🔴 [MANUS] Erro de conexão — verifique MANUS_BASE_URL e conectividade")
return {"sucesso": False, "erro": "Erro de conexão com api.manus.im"}
except Exception as e:
logger.error(f"🔴 [MANUS] Erro ao criar tarefa: {e}")
return {"sucesso": False, "erro": str(e)}
@staticmethod
def get_task_status(task_id: str) -> Dict[str, Any]:
"""
Obtém o status actual de uma tarefa via /v2/task.detail.
Resposta: {"ok": true, "data": {"status": "running|stopped|error|waiting"}}
"""
if not MANUS_API_KEY:
return {"sucesso": False, "erro": "MANUS_API_KEY não configurada"}
base = MANUS_BASE_URL.rstrip('/')
if base.endswith('/v2'):
url = f"{base}/task.detail"
else:
url = f"{base}/v2/task.detail"
headers = {
"Authorization": f"Bearer {MANUS_API_KEY}",
"x-manus-api-key": MANUS_API_KEY
}
params = {"task_id": task_id}
try:
response = requests.get(url, headers=headers, params=params, timeout=12)
if response.status_code == 404:
return {"sucesso": False, "erro": f"Tarefa {task_id} não encontrada (404)"}
response.raise_for_status()
data = response.json()
if not data.get("ok", True):
err = data.get("error", {})
return {"sucesso": False, "erro": err.get("message", "Erro desconhecido")}
task_data = data.get("data", data)
status = task_data.get("status") or task_data.get("task_status", "running")
return {"sucesso": True, "status": status}
except Exception as e:
logger.debug(f"[MANUS] get_task_status erro: {e}")
return {"sucesso": False, "erro": str(e)}
@staticmethod
def get_task_result(task_id: str) -> Dict[str, Any]:
"""
Obtém mensagens/resultado de uma tarefa via /v2/task.listMessages.
Endpoint: GET /v2/task.listMessages
Params: task_id=<id>&limit=50&order=desc
Resposta: {"ok": true, "data": {"messages": [{"role": "assistant", "content": "..."}]}}
"""
if not MANUS_API_KEY:
return {"sucesso": False, "erro": "MANUS_API_KEY não configurada"}
base = MANUS_BASE_URL.rstrip('/')
if base.endswith('/v2'):
url = f"{base}/task.listMessages"
else:
url = f"{base}/v2/task.listMessages"
headers = {
"Authorization": f"Bearer {MANUS_API_KEY}",
"x-manus-api-key": MANUS_API_KEY
}
params = {
"task_id": task_id,
"limit": 50,
"order": "desc" # mais recentes primeiro
}
try:
logger.debug(f"[MANUS] Polling task: {task_id}")
response = requests.get(url, headers=headers, params=params, timeout=15)
if response.status_code == 404:
return {"sucesso": False, "erro": f"task_id não encontrado: {task_id}"}
if response.status_code == 401:
return {"sucesso": False, "erro": "MANUS_API_KEY inválida (401)"}
response.raise_for_status()
data = response.json()
# Verifica campo "ok" da API v2
if not data.get("ok", True):
err = data.get("error", {})
return {"sucesso": False, "erro": err.get("message", "API retornou ok=false")}
# Extrai mensagens: data.data.messages (não data.data.list!)
messages_data = data.get("data", {})
messages = (
messages_data.get("messages") # formato v2 principal
or messages_data.get("list") # fallback v1 legado
or []
)
# Filtra mensagens do agente (role = "assistant")
agent_messages = [
m for m in messages
if m.get("role") in ("assistant", "agent")
and m.get("content")
and m.get("type", "message") == "message"
]
if agent_messages:
# Pega o conteúdo mais recente (order=desc → primeiro é mais recente)
last_msg = agent_messages[0]
content = last_msg.get("content", "")
# Pode ser string ou lista de partes
if isinstance(content, list):
text_result = "".join(
part.get("text", "") for part in content
if isinstance(part, dict) and part.get("type") == "text"
)
else:
text_result = str(content)
if text_result.strip():
logger.info(f"✅ [MANUS] Resultado obtido ({len(text_result)} chars)")
return {
"sucesso": True,
"status": "completed",
"resultado": text_result.strip()
}
# Sem mensagens do agente ainda → ainda a processar
return {"sucesso": True, "status": "running", "msg": "Processando..."}
except Exception as e:
logger.error(f"🔴 [MANUS] get_task_result erro: {e}")
return {"sucesso": False, "erro": str(e)}
@staticmethod
def research_sync(prompt: str, max_wait_seconds: int = 90) -> Dict[str, Any]:
"""
Executa pesquisa no Manus de forma síncrona com polling inteligente.
Args:
prompt: Descrição da tarefa de pesquisa
max_wait_seconds: Timeout máximo (padrão 90s — Manus pode demorar até 60s)
Returns:
{"sucesso": True, "resultado": "..."} ou {"sucesso": False, "erro": "..."}
"""
# 1. Criar tarefa
create_res = ManusProviders.create_research_task(prompt)
if not create_res.get("sucesso"):
logger.warning(f"⚠️ [MANUS] Falha ao criar tarefa: {create_res.get('erro')}")
return create_res
task_id = create_res["task_id"]
start_time = time.time()
poll_interval = 8 # segundos entre polls (não sobrecarregar a API)
poll_count = 0
logger.info(f"🔄 [MANUS] Aguardando resultado da tarefa {task_id}...")
while time.time() - start_time < max_wait_seconds:
time.sleep(poll_interval)
poll_count += 1
elapsed = int(time.time() - start_time)
# Verifica resultado via listMessages
result = ManusProviders.get_task_result(task_id)
if not result.get("sucesso"):
logger.warning(f"⚠️ [MANUS] Erro no poll #{poll_count}: {result.get('erro')}")
# Não abortar imediatamente em erros de poll — pode ser transitório
if poll_count >= 3:
return result
continue
if result.get("status") == "completed":
logger.info(f"✅ [MANUS] Tarefa concluída em ~{elapsed}s após {poll_count} polls")
return result
# Verifica status da tarefa (stopped/error)
status_res = ManusProviders.get_task_status(task_id)
if status_res.get("sucesso"):
task_status = status_res.get("status", "running")
if task_status in ("stopped", "error", "failed"):
logger.warning(f"⚠️ [MANUS] Tarefa encerrada com status: {task_status}")
# Tenta obter resultado mesmo assim (pode ter output parcial)
final = ManusProviders.get_task_result(task_id)
if final.get("status") == "completed":
return final
return {"sucesso": False, "erro": f"Tarefa encerrada com status: {task_status}"}
logger.debug(f"[MANUS] Poll #{poll_count} — elapsed: {elapsed}s — ainda a processar...")
logger.error(f"⏰ [MANUS] Timeout após {max_wait_seconds}s ({poll_count} polls)")
return {"sucesso": False, "erro": f"Timeout aguardando o Manus AI finalizar (>{max_wait_seconds}s)"}