ohmyapi commited on
Commit
ad25dd6
·
1 Parent(s): 7db8c45

fix: Integrations 401 auth + Jobs API speed optimization

Browse files
emergent2api/backends/integrations.py CHANGED
@@ -1,9 +1,18 @@
1
- """Integrations API backend: proxy to integrations.emergentagent.com/llm."""
 
 
 
 
 
 
 
 
2
  from __future__ import annotations
3
 
4
  import asyncio
5
  import json
6
  import logging
 
7
  from typing import Any, AsyncGenerator
8
 
9
  import httpx
@@ -16,10 +25,26 @@ logger = logging.getLogger("emergent2api.integrations")
16
 
17
  class IntegrationsBackend(EmergentBackend):
18
  """Proxy to Emergent's OpenAI-compatible integrations endpoint.
19
-
20
  Requires a US-based proxy since the endpoint has IP restrictions.
 
21
  """
22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  async def chat(
24
  self,
25
  messages: list[dict[str, Any]],
@@ -28,15 +53,20 @@ class IntegrationsBackend(EmergentBackend):
28
  account: dict[str, Any],
29
  stream: bool = False,
30
  ) -> AsyncGenerator[dict[str, Any], None]:
31
- jwt = account["jwt"]
32
  url = f"{settings.integrations_base}/v1/chat/completions"
33
 
34
- body = {
35
  "model": model,
36
  "messages": messages,
37
  "stream": stream,
38
  }
39
 
 
 
 
 
 
40
  proxy = settings.proxy or None
41
  transport = httpx.AsyncHTTPTransport(proxy=proxy) if proxy else None
42
 
@@ -47,13 +77,7 @@ class IntegrationsBackend(EmergentBackend):
47
  ) as client:
48
  if stream:
49
  async with client.stream(
50
- "POST",
51
- url,
52
- json=body,
53
- headers={
54
- "Authorization": f"Bearer {jwt}",
55
- "Content-Type": "application/json",
56
- },
57
  ) as resp:
58
  if resp.status_code != 200:
59
  err = await resp.aread()
@@ -79,14 +103,7 @@ class IntegrationsBackend(EmergentBackend):
79
 
80
  yield {"type": "done", "content": full_text, "thinking": ""}
81
  else:
82
- resp = await client.post(
83
- url,
84
- json=body,
85
- headers={
86
- "Authorization": f"Bearer {jwt}",
87
- "Content-Type": "application/json",
88
- },
89
- )
90
  if resp.status_code != 200:
91
  yield {"type": "error", "content": f"Integrations API: HTTP {resp.status_code} {resp.text[:200]}"}
92
  return
 
1
+ """Integrations API backend: proxy to integrations.emergentagent.com/llm.
2
+
3
+ The Integrations endpoint is an Anthropic-compatible proxy that requires either:
4
+ 1. A per-account integration_key (provisioned by Emergent for Pro/paying users)
5
+ 2. An explicit ANTHROPIC_API_KEY env var
6
+ 3. Falls back to the Supabase JWT (which won't work for free accounts)
7
+
8
+ The key is sent via ``x-api-key`` header (Anthropic convention).
9
+ """
10
  from __future__ import annotations
11
 
12
  import asyncio
13
  import json
14
  import logging
15
+ import os
16
  from typing import Any, AsyncGenerator
17
 
18
  import httpx
 
25
 
26
  class IntegrationsBackend(EmergentBackend):
27
  """Proxy to Emergent's OpenAI-compatible integrations endpoint.
28
+
29
  Requires a US-based proxy since the endpoint has IP restrictions.
30
+ Auth: ``x-api-key`` header with the account's ``integration_key``.
31
  """
32
 
33
+ def _resolve_api_key(self, account: dict[str, Any]) -> str:
34
+ """Pick the best API key for the Integrations endpoint.
35
+
36
+ Priority:
37
+ 1. ``integration_key`` stored per-account (DB column)
38
+ 2. ``ANTHROPIC_API_KEY`` env var (global fallback)
39
+ 3. Supabase JWT (will 401 for free accounts, but tried as last resort)
40
+ """
41
+ if account.get("integration_key"):
42
+ return account["integration_key"]
43
+ env_key = os.environ.get("ANTHROPIC_API_KEY", "")
44
+ if env_key:
45
+ return env_key
46
+ return account["jwt"]
47
+
48
  async def chat(
49
  self,
50
  messages: list[dict[str, Any]],
 
53
  account: dict[str, Any],
54
  stream: bool = False,
55
  ) -> AsyncGenerator[dict[str, Any], None]:
56
+ api_key = self._resolve_api_key(account)
57
  url = f"{settings.integrations_base}/v1/chat/completions"
58
 
59
+ body: dict[str, Any] = {
60
  "model": model,
61
  "messages": messages,
62
  "stream": stream,
63
  }
64
 
65
+ headers = {
66
+ "x-api-key": api_key,
67
+ "Content-Type": "application/json",
68
+ }
69
+
70
  proxy = settings.proxy or None
71
  transport = httpx.AsyncHTTPTransport(proxy=proxy) if proxy else None
72
 
 
77
  ) as client:
78
  if stream:
79
  async with client.stream(
80
+ "POST", url, json=body, headers=headers,
 
 
 
 
 
 
81
  ) as resp:
82
  if resp.status_code != 200:
83
  err = await resp.aread()
 
103
 
104
  yield {"type": "done", "content": full_text, "thinking": ""}
105
  else:
106
+ resp = await client.post(url, json=body, headers=headers)
 
 
 
 
 
 
 
107
  if resp.status_code != 200:
108
  yield {"type": "error", "content": f"Integrations API: HTTP {resp.status_code} {resp.text[:200]}"}
109
  return
emergent2api/backends/jobs.py CHANGED
@@ -64,21 +64,21 @@ class JobsBackend(EmergentBackend):
64
  confirmed_end = False
65
  has_received_anything = False
66
  job_completed = False
67
- max_wait_cycles = 600
 
68
  start_time = time.time()
69
 
70
- await asyncio.sleep(8.0)
71
 
72
- for _ in range(max_wait_cycles):
73
  result = await asyncio.to_thread(self._poll_trajectory, jwt, ref_id)
74
  if result is None:
75
- await self._dynamic_sleep(consecutive_unchanged)
76
  consecutive_unchanged += 1
77
  continue
78
 
79
  data_items = result.get("data", [])
80
 
81
- # Check for explicit completion signal in trajectory items
82
  for item in data_items:
83
  status = (item.get("traj_payload") or {}).get("status", "")
84
  if status in ("completed", "done", "finished"):
@@ -107,6 +107,7 @@ class JobsBackend(EmergentBackend):
107
  consecutive_unchanged = 0
108
  confirmed_end = False
109
  has_received_anything = True
 
110
  if stream and delta:
111
  yield {"type": "text", "content": delta}
112
  else:
@@ -116,18 +117,25 @@ class JobsBackend(EmergentBackend):
116
  logger.info("Stream completed (job status=completed)")
117
  break
118
 
119
- # Require more stability cycles and longer confirmation for complex queries
120
- stable_threshold = 15 if len(full_text) < 500 else 10
121
- if has_received_anything and full_text and consecutive_unchanged >= stable_threshold:
 
 
 
 
 
 
122
  if not confirmed_end:
123
- logger.debug("Content stable, confirming end...")
124
  confirmed_end = True
125
- await asyncio.sleep(8.0)
 
126
  continue
127
- logger.info("Stream completed (double-confirmed stable)")
128
  break
129
 
130
- await self._dynamic_sleep(consecutive_unchanged)
131
 
132
  elapsed = time.time() - start_time
133
  logger.info(f"Job finished: {len(full_text)} chars in {elapsed:.1f}s")
@@ -142,10 +150,15 @@ class JobsBackend(EmergentBackend):
142
  }
143
 
144
  @staticmethod
145
- async def _dynamic_sleep(consecutive_unchanged: int) -> None:
146
- if consecutive_unchanged == 0:
147
- await asyncio.sleep(1.0)
 
 
 
148
  elif consecutive_unchanged < 3:
 
 
149
  await asyncio.sleep(2.0)
150
  else:
151
  await asyncio.sleep(3.0)
 
64
  confirmed_end = False
65
  has_received_anything = False
66
  job_completed = False
67
+ last_change_time = 0.0
68
+ max_wait_cycles = 900
69
  start_time = time.time()
70
 
71
+ await asyncio.sleep(3.0)
72
 
73
+ for cycle in range(max_wait_cycles):
74
  result = await asyncio.to_thread(self._poll_trajectory, jwt, ref_id)
75
  if result is None:
76
+ await self._dynamic_sleep(consecutive_unchanged, has_received_anything)
77
  consecutive_unchanged += 1
78
  continue
79
 
80
  data_items = result.get("data", [])
81
 
 
82
  for item in data_items:
83
  status = (item.get("traj_payload") or {}).get("status", "")
84
  if status in ("completed", "done", "finished"):
 
107
  consecutive_unchanged = 0
108
  confirmed_end = False
109
  has_received_anything = True
110
+ last_change_time = time.time()
111
  if stream and delta:
112
  yield {"type": "text", "content": delta}
113
  else:
 
117
  logger.info("Stream completed (job status=completed)")
118
  break
119
 
120
+ stable_secs = time.time() - last_change_time if last_change_time else 0
121
+ text_len = len(full_text)
122
+ if text_len < 50:
123
+ stable_limit = 15.0
124
+ elif text_len < 500:
125
+ stable_limit = 25.0
126
+ else:
127
+ stable_limit = 45.0
128
+ if has_received_anything and full_text and stable_secs >= stable_limit:
129
  if not confirmed_end:
130
+ logger.debug(f"Content stable for {stable_secs:.0f}s (len={text_len}), confirming...")
131
  confirmed_end = True
132
+ confirm_wait = 5.0 if text_len < 200 else 10.0
133
+ await asyncio.sleep(confirm_wait)
134
  continue
135
+ logger.info(f"Stream completed (stable {stable_secs:.0f}s, {text_len} chars)")
136
  break
137
 
138
+ await self._dynamic_sleep(consecutive_unchanged, has_received_anything)
139
 
140
  elapsed = time.time() - start_time
141
  logger.info(f"Job finished: {len(full_text)} chars in {elapsed:.1f}s")
 
150
  }
151
 
152
  @staticmethod
153
+ async def _dynamic_sleep(consecutive_unchanged: int, has_content: bool) -> None:
154
+ """Adaptive sleep: poll fast early, back off when stable."""
155
+ if not has_content:
156
+ await asyncio.sleep(0.8)
157
+ elif consecutive_unchanged == 0:
158
+ await asyncio.sleep(0.5)
159
  elif consecutive_unchanged < 3:
160
+ await asyncio.sleep(1.0)
161
+ elif consecutive_unchanged < 8:
162
  await asyncio.sleep(2.0)
163
  else:
164
  await asyncio.sleep(3.0)
emergent2api/database.py CHANGED
@@ -41,6 +41,13 @@ BEGIN
41
  ) THEN
42
  ALTER TABLE emergent_accounts ADD COLUMN use_count INTEGER DEFAULT 0;
43
  END IF;
 
 
 
 
 
 
 
44
  END $$;
45
  """
46
 
 
41
  ) THEN
42
  ALTER TABLE emergent_accounts ADD COLUMN use_count INTEGER DEFAULT 0;
43
  END IF;
44
+
45
+ IF NOT EXISTS (
46
+ SELECT 1 FROM information_schema.columns
47
+ WHERE table_name = 'emergent_accounts' AND column_name = 'integration_key'
48
+ ) THEN
49
+ ALTER TABLE emergent_accounts ADD COLUMN integration_key TEXT DEFAULT '';
50
+ END IF;
51
  END $$;
52
  """
53
 
emergent2api/routes/admin.py CHANGED
@@ -4,6 +4,7 @@ from __future__ import annotations
4
  import io
5
  import json
6
  import logging
 
7
  import secrets
8
  import zipfile
9
  from typing import Any
@@ -313,6 +314,7 @@ async def get_config(request: Request):
313
  "poll_interval": db_config.get("poll_interval", str(settings.poll_interval)),
314
  "poll_timeout": db_config.get("poll_timeout", str(settings.poll_timeout)),
315
  "admin_password": db_config.get("admin_password", settings.admin_password),
 
316
  }
317
 
318
 
@@ -322,7 +324,7 @@ async def update_config(request: Request):
322
  if err:
323
  return err
324
  body = await request.json()
325
- allowed = {"api_key", "backend", "proxy", "poll_interval", "poll_timeout", "admin_password"}
326
  saved = []
327
  for key, value in body.items():
328
  if key in allowed:
@@ -335,6 +337,8 @@ async def update_config(request: Request):
335
  settings.proxy = str(value)
336
  elif key == "admin_password":
337
  settings.admin_password = str(value)
 
 
338
  saved.append(key)
339
  return {"saved": saved}
340
 
 
4
  import io
5
  import json
6
  import logging
7
+ import os
8
  import secrets
9
  import zipfile
10
  from typing import Any
 
314
  "poll_interval": db_config.get("poll_interval", str(settings.poll_interval)),
315
  "poll_timeout": db_config.get("poll_timeout", str(settings.poll_timeout)),
316
  "admin_password": db_config.get("admin_password", settings.admin_password),
317
+ "anthropic_key": db_config.get("anthropic_key", os.environ.get("ANTHROPIC_API_KEY", "")),
318
  }
319
 
320
 
 
324
  if err:
325
  return err
326
  body = await request.json()
327
+ allowed = {"api_key", "backend", "proxy", "poll_interval", "poll_timeout", "admin_password", "anthropic_key"}
328
  saved = []
329
  for key, value in body.items():
330
  if key in allowed:
 
337
  settings.proxy = str(value)
338
  elif key == "admin_password":
339
  settings.admin_password = str(value)
340
+ elif key == "anthropic_key":
341
+ os.environ["ANTHROPIC_API_KEY"] = str(value)
342
  saved.append(key)
343
  return {"saved": saved}
344
 
emergent2api/static/admin/index.html CHANGED
@@ -234,9 +234,10 @@ tr:hover td{background:rgba(255,255,255,.02)}
234
  </div>
235
  <div class="fld">
236
  <label>Backend</label>
237
- <select id="cfg-backend"><option value="jobs">Jobs API(无 IP 限制)</option><option value="integrations">Integrations API(更快,需代理)</option></select>
238
  </div>
239
- <div class="fld"><label>Proxy</label><input id="cfg-proxy" placeholder="http://user:pass@host:port" spellcheck="false"><div class="hint">Integrations 后端必填;Jobs 可选</div></div>
 
240
  </div>
241
  <div class="card">
242
  <h3>Admin</h3>
@@ -443,9 +444,10 @@ async function loadConfig(){
443
  const r=await api('/config');if(!r)return;const d=await r.json();
444
  $('cfg-api_key').value=d.api_key||'';$('cfg-backend').value=d.backend||'jobs';
445
  $('cfg-proxy').value=d.proxy||'';$('cfg-admin_password').value=d.admin_password||'';
 
446
  }
447
  async function saveConfig(){
448
- const b={api_key:$('cfg-api_key').value,backend:$('cfg-backend').value,proxy:$('cfg-proxy').value,admin_password:$('cfg-admin_password').value};
449
  const r=await api('/config',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify(b)});
450
  if(r){const d=await r.json();toast(`已保存: ${d.saved.join(', ')}`)}
451
  }
 
234
  </div>
235
  <div class="fld">
236
  <label>Backend</label>
237
+ <select id="cfg-backend"><option value="jobs">Jobs API(无 IP 限制,推荐)</option><option value="integrations">Integrations API(需 Anthropic Key + 代理)</option></select>
238
  </div>
239
+ <div class="fld"><label>Proxy</label><input id="cfg-proxy" placeholder="http://user:pass@host:port" spellcheck="false"><div class="hint">Integrations 后端必填(需美国 IP);Jobs 可选</div></div>
240
+ <div class="fld"><label>ANTHROPIC_API_KEY</label><input id="cfg-anthropic_key" placeholder="sk-ant-... (Integrations 后端专用)" spellcheck="false"><div class="hint">仅 Integrations 后端需要;可留空使用 Jobs 后端</div></div>
241
  </div>
242
  <div class="card">
243
  <h3>Admin</h3>
 
444
  const r=await api('/config');if(!r)return;const d=await r.json();
445
  $('cfg-api_key').value=d.api_key||'';$('cfg-backend').value=d.backend||'jobs';
446
  $('cfg-proxy').value=d.proxy||'';$('cfg-admin_password').value=d.admin_password||'';
447
+ $('cfg-anthropic_key').value=d.anthropic_key||'';
448
  }
449
  async function saveConfig(){
450
+ const b={api_key:$('cfg-api_key').value,backend:$('cfg-backend').value,proxy:$('cfg-proxy').value,admin_password:$('cfg-admin_password').value,anthropic_key:$('cfg-anthropic_key').value};
451
  const r=await api('/config',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify(b)});
452
  if(r){const d=await r.json();toast(`已保存: ${d.saved.join(', ')}`)}
453
  }