// ONYX / main.go
// smysle
// initial commit
// 3ecfe58
package main
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"strings"
"sync/atomic"
"time"
)
// Proxy-wide settings.
const (
// onyxBase is the upstream origin that chat requests are posted to.
onyxBase = "https://cloud.onyx.app"
// defaultPersona is the persona_id used when a request does not carry one.
defaultPersona = 1
// maxRetries is the number of retries after the first upstream attempt
// (doOnyxRequest loops maxRetries+1 times).
maxRetries = 3
// ver is the version string reported by the root and health endpoints.
ver = "0.5.0"
)
// Package-level state shared by all handlers.
var (
// retryBackoff holds the wait before each retry, indexed by the zero-based
// attempt that just failed (clamped to the last entry).
retryBackoff = [3]time.Duration{2 * time.Second, 5 * time.Second, 10 * time.Second}
// retryStatus lists the upstream HTTP statuses that doOnyxRequest retries.
retryStatus = map[int]bool{502: true, 503: true, 504: true, 429: true}
// httpClient is the single client used for every upstream request.
httpClient = &http.Client{
Timeout: 5 * time.Minute,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 200,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
// keys are the upstream Onyx API keys parsed from ONYX_KEYS in main.
keys []string
// clientKeys are the keys accepted from callers (CLIENT_API_KEYS or
// CLIENT_API_KEY); when empty, checkClientAuth accepts every request.
clientKeys map[string]struct{}
// keyIdx is the round-robin counter advanced atomically by nextKey.
keyIdx uint64
)
// ── Key rotation ────────────────────────────────────────────────────────
// nextKey hands out the configured upstream keys in round-robin order.
// It returns "" when no keys are configured; the shared counter is advanced
// atomically so concurrent callers each get a distinct position.
func nextKey() string {
	total := uint64(len(keys))
	if total == 0 {
		return ""
	}
	pos := atomic.AddUint64(&keyIdx, 1) % total
	return keys[pos]
}
// extractToken normalises a credential header value: surrounding whitespace
// is removed and a leading "Bearer " scheme (matched case-insensitively) is
// stripped. Values without the scheme are returned trimmed but otherwise
// untouched.
func extractToken(s string) string {
	const scheme = "bearer "
	trimmed := strings.TrimSpace(s)
	if len(trimmed) >= len(scheme) && strings.EqualFold(trimmed[:len(scheme)], scheme) {
		return strings.TrimSpace(trimmed[len(scheme):])
	}
	return trimmed
}
// resolveAuth picks the token sent upstream. Configured keys always win and
// are rotated via nextKey; otherwise the first non-empty header value is
// used, with any "Bearer " prefix removed. It returns "" when neither source
// yields a token.
func resolveAuth(headers ...string) string {
	if len(keys) != 0 {
		return nextKey()
	}
	for i := range headers {
		if headers[i] == "" {
			continue
		}
		return extractToken(headers[i])
	}
	return ""
}
// checkClientAuth reports whether the caller presented an accepted client
// key. With no client keys configured every request is allowed. The key is
// read from "x-api-key" first and from "Authorization" as a fallback.
func checkClientAuth(r *http.Request) bool {
	if len(clientKeys) == 0 {
		return true
	}
	candidate := strings.TrimSpace(r.Header.Get("x-api-key"))
	if candidate == "" {
		candidate = extractToken(r.Header.Get("Authorization"))
	}
	switch candidate {
	case "":
		return false
	default:
		_, known := clientKeys[candidate]
		return known
	}
}
// ── Model mapping ───────────────────────────────────────────────────────
// pm pairs an upstream provider name with the model version sent to it.
type pm struct{ provider, version string }
// modelMap translates the model names accepted by this proxy into the
// provider/version pair placed in the upstream llm_override. Names missing
// from the table are handled by buildLLMOverride.
var modelMap = map[string]pm{
"claude-opus-4-6": {"Anthropic", "claude-opus-4-6"},
"claude-opus-4-5": {"Anthropic", "claude-opus-4-5"},
"claude-sonnet-4-6": {"Anthropic", "claude-sonnet-4-6"},
"claude-sonnet-4-5": {"Anthropic", "claude-sonnet-4-5"},
"claude-sonnet-4": {"Anthropic", "claude-sonnet-4-20250514"},
"claude-3-5-sonnet": {"Anthropic", "claude-3-5-sonnet-20241022"},
"claude-3-5-haiku": {"Anthropic", "claude-3-5-haiku-20241022"},
"claude-3-opus": {"Anthropic", "claude-3-opus-20240229"},
"claude-haiku-4-5": {"Anthropic", "claude-haiku-4-5"},
"gpt-4o": {"OpenAI", "gpt-4o"},
"gpt-4o-mini": {"OpenAI", "gpt-4o-mini"},
"gpt-4-turbo": {"OpenAI", "gpt-4-turbo"},
"gpt-4": {"OpenAI", "gpt-4"},
"o1": {"OpenAI", "o1"},
"o1-mini": {"OpenAI", "o1-mini"},
"o3-mini": {"OpenAI", "o3-mini"},
"gemini-2.0-flash": {"Google", "gemini-2.0-flash"},
"gemini-2.5-pro": {"Google", "gemini-2.5-pro-preview-05-06"},
}
// buildLLMOverride produces the llm_override object for an upstream request.
// Known names come from modelMap. An unknown name made of exactly three
// "__"-separated fields uses the first field as provider and the third as
// version; any other unknown name is passed through as an Anthropic model.
func buildLLMOverride(model string, temp float64) map[string]any {
	target, known := modelMap[model]
	if !known {
		target = pm{provider: "Anthropic", version: model}
		if fields := strings.SplitN(model, "__", 3); len(fields) == 3 {
			target = pm{provider: fields[0], version: fields[2]}
		}
	}
	override := make(map[string]any, 3)
	override["model_provider"] = target.provider
	override["model_version"] = target.version
	override["temperature"] = temp
	return override
}
// ── Message conversion ──────────────────────────────────────────────────
// textContent flattens a message "content" (or Anthropic "system") value into
// plain text.
//
//   - nil yields "" (an absent field must not turn into the literal "<nil>");
//   - a string is returned unchanged;
//   - a list of content blocks yields the "text" of every block whose "type"
//     is "text", joined with newlines; other blocks are ignored;
//   - anything else is formatted with fmt.Sprint.
func textContent(raw any) string {
	switch v := raw.(type) {
	case nil:
		return ""
	case string:
		return v
	case []any:
		var parts []string
		for _, item := range v {
			m, ok := item.(map[string]any)
			if !ok || m["type"] != "text" {
				continue
			}
			if s, ok := m["text"].(string); ok {
				parts = append(parts, s)
			}
		}
		return strings.Join(parts, "\n")
	default:
		return fmt.Sprint(v)
	}
}
// chatMsg is one entry of the "messages" array accepted by both the OpenAI
// and the Anthropic endpoints.
type chatMsg struct {
// Role selects how messagesToOnyx renders the entry ("system", "user",
// "assistant" or "tool"); other roles are ignored there.
Role string `json:"role"`
// Content is either a plain string or a list of content blocks; it is
// flattened with textContent.
Content any `json:"content"`
// ToolCallID labels the rendered result of a "tool" message.
ToolCallID string `json:"tool_call_id,omitempty"`
}
// messagesToOnyx flattens a chat history into the single "message" string
// sent upstream.
//
// system is an initial system prompt; "system" messages in msgs are appended
// to it, newline-separated. Messages render as "User: …", "Assistant: …" and
// "Tool result (<id>): …"; other roles are skipped.
//
// Two shortcuts avoid the transcript framing when the conversation is a
// single user turn: without a system prompt the user text is returned as is,
// and with one it is returned behind a "[System: …]" header. The second
// shortcut applies only when the single entry really is a user message;
// a lone assistant or tool message goes through the general path so its
// content is not dropped.
func messagesToOnyx(system string, msgs []chatMsg) string {
	var (
		conv      []string
		lastUser  string
		userTurns int
	)
	for _, m := range msgs {
		c := textContent(m.Content)
		switch m.Role {
		case "system":
			if system == "" {
				system = c
			} else {
				system += "\n" + c
			}
		case "user":
			lastUser = c
			userTurns++
			conv = append(conv, "User: "+c)
		case "assistant":
			conv = append(conv, "Assistant: "+c)
		case "tool":
			tid := m.ToolCallID
			if tid == "" {
				tid = "unknown"
			}
			conv = append(conv, fmt.Sprintf("Tool result (%s): %s", tid, c))
		}
	}

	// True when the rendered conversation consists of exactly one user turn.
	soleUser := len(conv) == 1 && userTurns == 1
	if system == "" && soleUser && len(msgs) == 1 {
		return lastUser
	}
	if system != "" && soleUser {
		return "[System: " + system + "]\n\n" + lastUser
	}

	var b strings.Builder
	if system != "" {
		b.WriteString("[System: " + system + "]\n\n")
	}
	b.WriteString(strings.Join(conv, "\n"))
	return b.String()
}
// ── Helpers ─────────────────────────────────────────────────────────────
// genID returns prefix followed by 29 random hexadecimal characters. If the
// system random source fails, the current Unix time in nanoseconds is used
// as the suffix instead.
func genID(prefix string) string {
	var raw [15]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return fmt.Sprintf("%s%d", prefix, time.Now().UnixNano())
	}
	encoded := hex.EncodeToString(raw[:])
	return prefix + encoded[:29]
}
// toStrSlice extracts the string elements of a decoded JSON array, keeping
// their order and dropping everything else. It returns nil when v is not an
// array or contains no strings.
func toStrSlice(v any) []string {
	items, isList := v.([]any)
	if !isList {
		return nil
	}
	var strs []string
	for i := range items {
		switch s := items[i].(type) {
		case string:
			strs = append(strs, s)
		}
	}
	return strs
}
// writeJSON sends v as a JSON response with the given status code.
//
// The status line has already been written by the time encoding runs, so an
// encoding or write failure cannot be reported to the client; it is logged
// rather than silently discarded.
func writeJSON(w http.ResponseWriter, status int, v any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if err := json.NewEncoder(w).Encode(v); err != nil {
		log.Printf("writeJSON: encode response: %v", err)
	}
}
// sleepOrDone blocks for d or until ctx is done, whichever happens first.
// It reports true when the full duration elapsed and false when the context
// ended the wait.
func sleepOrDone(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}
// ── Core: send to Onyx with retry ──────────────────────────────────────
// onyxError describes a non-200 answer from the upstream API.
type onyxError struct {
	// Status is the upstream HTTP status code.
	Status int
	// Body is the leading part of the upstream response body.
	Body string
}

// Error implements the error interface.
func (oe *onyxError) Error() string {
	status, body := oe.Status, oe.Body
	return fmt.Sprintf("Onyx HTTP %d: %s", status, body)
}
// doOnyxRequest posts one chat message upstream and returns the streaming
// response.
//
// The request is attempted up to maxRetries+1 times. Transport errors and
// the statuses listed in retryStatus are retried after the matching
// retryBackoff delay; when upstream keys are configured, every retry rotates
// to the next key. Context cancellation or expiry stops immediately.
//
// On success the caller owns resp.Body and must close it. A non-200 answer
// that is not retried (or that exhausts the retries) is returned as
// *onyxError carrying up to 1024 bytes of the upstream body. When every
// attempt fails at the transport level, the last transport error is wrapped
// (with %w) so callers can inspect it with errors.Is / errors.As.
func doOnyxRequest(ctx context.Context, token, model string, msgs []chatMsg, system string, temp float64, persona int) (*http.Response, error) {
	bodyBytes, err := json.Marshal(map[string]any{
		"message":           messagesToOnyx(system, msgs),
		"chat_session_info": map[string]any{"persona_id": persona},
		"llm_override":      buildLLMOverride(model, temp),
		"stream":            true,
		"file_descriptors":  []any{},
		"deep_research":     false,
		"origin":            "api",
	})
	if err != nil {
		return nil, fmt.Errorf("marshal onyx request: %w", err)
	}

	var lastErr error
	for attempt := range maxRetries + 1 {
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		// The first attempt uses the caller's token; retries rotate keys
		// when a pool is configured.
		cur := token
		if attempt > 0 && len(keys) > 0 {
			cur = nextKey()
			log.Printf("Retry %d/%d, switched key", attempt, maxRetries)
		}

		// The body reader is rebuilt per attempt because a reader cannot be
		// replayed once consumed.
		req, err := http.NewRequestWithContext(
			ctx,
			http.MethodPost,
			onyxBase+"/api/chat/send-chat-message",
			bytes.NewReader(bodyBytes),
		)
		if err != nil {
			return nil, fmt.Errorf("create onyx request: %w", err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", "Bearer "+cur)

		resp, err := httpClient.Do(req)
		if err != nil {
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return nil, err
			}
			lastErr = err
			if attempt < maxRetries {
				wait := retryBackoff[min(attempt, len(retryBackoff)-1)]
				log.Printf("Attempt %d failed (%v), retry in %v", attempt+1, err, wait)
				if !sleepOrDone(ctx, wait) {
					return nil, ctx.Err()
				}
				continue
			}
			break
		}

		if resp.StatusCode != 200 {
			// Keep only a bounded prefix of the error body for diagnostics.
			body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
			resp.Body.Close()
			log.Printf("Onyx HTTP %d for model=%s: %s", resp.StatusCode, model, string(body))
			if retryStatus[resp.StatusCode] && attempt < maxRetries {
				wait := retryBackoff[min(attempt, len(retryBackoff)-1)]
				log.Printf("Attempt %d failed (HTTP %d), retry in %v", attempt+1, resp.StatusCode, wait)
				if !sleepOrDone(ctx, wait) {
					return nil, ctx.Err()
				}
				continue
			}
			return nil, &onyxError{Status: resp.StatusCode, Body: string(body)}
		}
		return resp, nil
	}
	return nil, fmt.Errorf("all retries failed: %w", lastErr)
}
// ── Onyx NDJSON scanner ────────────────────────────────────────────────
// onyxScanner reads the newline-delimited JSON stream returned by the
// upstream chat endpoint and yields one onyxEvent per relevant line.
type onyxScanner struct {
	sc *bufio.Scanner
	// errSent records that a read failure has already been reported, so it
	// is surfaced exactly once.
	errSent bool
}

// onyxEvent is one decoded stream packet.
type onyxEvent struct {
	// Type is the packet's obj.type, or "error" for stream-level errors.
	Type string
	// Obj is the packet's "obj" payload; nil for stream-level errors.
	Obj map[string]any
	// Err is non-empty only for stream-level errors.
	Err string
}

// newOnyxScanner wraps r in a line scanner that accepts lines of up to 1 MiB.
func newOnyxScanner(r io.Reader) *onyxScanner {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	return &onyxScanner{sc: sc}
}

// Next returns the next event and true, or a zero event and false at the end
// of the stream.
//
// Blank lines, lines that are not JSON objects, packets carrying
// "user_message_id", and packets without an "obj" object are skipped. A
// packet with a top-level "error" and no "obj" is returned as an error
// event. If reading stops because of a scanner error (for example a line
// longer than the buffer limit), that failure is returned once as an error
// event instead of looking like a clean end of stream.
func (s *onyxScanner) Next() (onyxEvent, bool) {
	for s.sc.Scan() {
		line := strings.TrimSpace(s.sc.Text())
		if line == "" {
			continue
		}
		var raw map[string]any
		if json.Unmarshal([]byte(line), &raw) != nil {
			continue
		}
		if raw["user_message_id"] != nil {
			continue
		}
		if raw["error"] != nil && raw["obj"] == nil {
			return onyxEvent{Type: "error", Err: fmt.Sprint(raw["error"])}, true
		}
		obj, _ := raw["obj"].(map[string]any)
		if obj == nil {
			continue
		}
		t, _ := obj["type"].(string)
		return onyxEvent{Type: t, Obj: obj}, true
	}
	if err := s.sc.Err(); err != nil && !s.errSent {
		s.errSent = true
		return onyxEvent{Type: "error", Err: "upstream stream read failed: " + err.Error()}, true
	}
	return onyxEvent{}, false
}
// ══════════════════════════════════════════════════════════════════════════
// OpenAI FORMAT
// ══════════════════════════════════════════════════════════════════════════
// sse frames v as one server-sent event: "data: <json>" followed by a blank
// line. A marshalling failure is ignored and yields an empty payload.
func sse(v any) []byte {
	const prefix = "data: "
	payload, _ := json.Marshal(v)
	frame := make([]byte, 0, len(prefix)+len(payload)+2)
	frame = append(frame, prefix...)
	frame = append(frame, payload...)
	return append(frame, "\n\n"...)
}
// sseDone is the terminator frame written at the end of an OpenAI-format
// stream.
var sseDone = []byte("data: [DONE]\n\n")
// makeChunk assembles one "chat.completion.chunk" object holding a single
// choice at index 0 with the given delta. finish is stored as passed, so a
// nil pointer serialises as a null finish_reason.
func makeChunk(id string, created int64, model string, delta map[string]any, finish *string) map[string]any {
	choice := map[string]any{
		"index":         0,
		"delta":         delta,
		"finish_reason": finish,
	}
	chunk := map[string]any{
		"id":      id,
		"object":  "chat.completion.chunk",
		"created": created,
		"model":   model,
	}
	chunk["choices"] = []any{choice}
	return chunk
}
// streamOpenAI translates the upstream event stream read from body into
// OpenAI "chat.completion.chunk" server-sent events written to w, calling
// flush after each frame.
//
// Answer text is forwarded as content deltas and reasoning as
// "reasoning_content" deltas. Tool activity (search, URL fetch, code
// interpreter, image generation, custom tools, file reads, deep research) is
// rendered as bracketed text inside the content stream. The assistant role
// is attached to the first content delta, whichever event produces it.
//
// Every stream ends with a chunk carrying finish_reason "stop" followed by
// the "[DONE]" frame. That holds on a "stop" event, on an error, and when
// the upstream closes without sending "stop", so clients never see a stream
// that terminates without a finish reason.
func streamOpenAI(w io.Writer, flush func(), body io.Reader, model, rid string) {
	created := time.Now().Unix()
	sentRole := false
	toolActive := false // a tool section is open and needs a trailing newline
	stop := "stop"
	scan := newOnyxScanner(body)

	emit := func(delta map[string]any, fin *string) {
		w.Write(sse(makeChunk(rid, created, model, delta, fin)))
		flush()
	}
	// text emits a content delta, adding the role if no chunk carried it yet.
	text := func(s string, fin *string) {
		d := map[string]any{"content": s}
		if !sentRole {
			d["role"] = "assistant"
			sentRole = true
		}
		emit(d, fin)
	}
	// toolText emits the banner that opens a tool section.
	toolText := func(s string) {
		toolActive = true
		text(s, nil)
	}
	done := func() {
		w.Write(sseDone)
		flush()
	}

	for {
		ev, ok := scan.Next()
		if !ok {
			break
		}
		if ev.Err != "" {
			text("\n\n[Error: "+ev.Err+"]", &stop)
			done()
			return
		}
		obj := ev.Obj
		switch ev.Type {
		case "reasoning_delta":
			if s, _ := obj["reasoning"].(string); s != "" {
				emit(map[string]any{"reasoning_content": s}, nil)
			}
		case "message_start":
			if !sentRole {
				text("", nil)
			}
		case "message_delta":
			if c, _ := obj["content"].(string); c != "" {
				text(c, nil)
			}
		case "search_tool_start":
			label := "Internal Search"
			if v, _ := obj["is_internet_search"].(bool); v {
				label = "Web Search"
			}
			toolText("\n[" + label + "] ")
		case "search_tool_queries_delta":
			if qs := toStrSlice(obj["queries"]); len(qs) > 0 {
				quoted := make([]string, 0, len(qs))
				for _, s := range qs {
					quoted = append(quoted, "\u201c"+s+"\u201d")
				}
				text("Searching: "+strings.Join(quoted, ", ")+"\n", nil)
			}
		case "search_tool_documents_delta":
			if docs, ok := obj["documents"].([]any); ok && len(docs) > 0 {
				text(fmt.Sprintf("Found %d results.\n", len(docs)), nil)
			}
		case "open_url_start":
			toolText("\n[Opening URL] ")
		case "open_url_urls":
			if urls := toStrSlice(obj["urls"]); len(urls) > 0 {
				text(strings.Join(urls, ", ")+"\n", nil)
			}
		case "open_url_documents":
			if docs, ok := obj["documents"].([]any); ok && len(docs) > 0 {
				text(fmt.Sprintf("Loaded %d pages.\n", len(docs)), nil)
			}
		case "image_generation_start":
			toolText("\n[Generating Image...]\n")
		case "image_generation_final":
			images, _ := obj["images"].([]any)
			for _, img := range images {
				m, ok := img.(map[string]any)
				if !ok {
					continue
				}
				u, _ := m["url"].(string)
				p, _ := m["revised_prompt"].(string)
				if u != "" {
					text(fmt.Sprintf("![%s](%s)\n", p, u), nil)
				}
			}
		case "python_tool_start":
			banner := "\n[Code Interpreter]\n"
			if code, _ := obj["code"].(string); code != "" {
				banner += "```python\n" + code + "\n```\n"
			}
			toolText(banner)
		case "python_tool_delta":
			var parts []string
			if s, _ := obj["stdout"].(string); s != "" {
				parts = append(parts, "Output: "+s)
			}
			if s, _ := obj["stderr"].(string); s != "" {
				parts = append(parts, "Error: "+s)
			}
			if len(parts) > 0 {
				text(strings.Join(parts, "\n")+"\n", nil)
			}
		case "custom_tool_start":
			tn, _ := obj["tool_name"].(string)
			if tn == "" {
				tn = "custom_tool"
			}
			toolText("\n[Tool: " + tn + "]\n")
		case "custom_tool_delta":
			if td := obj["data"]; td != nil {
				var out string
				if m, ok := td.(map[string]any); ok {
					// Objects are shown as JSON; an unencodable value
					// yields no output.
					b, _ := json.Marshal(m)
					out = string(b)
				} else {
					out = fmt.Sprint(td)
				}
				if out != "" {
					text(out+"\n", nil)
				}
			}
		case "file_reader_start":
			toolText("\n[Reading File...]\n")
		case "file_reader_result":
			if fn, _ := obj["file_name"].(string); fn != "" {
				text("Read: "+fn+"\n", nil)
			}
		case "deep_research_plan_start":
			// Not a tool section: no trailing newline on section_end.
			text("\n[Research Plan]\n", nil)
		case "deep_research_plan_delta", "intermediate_report_start", "intermediate_report_delta":
			if c, _ := obj["content"].(string); c != "" {
				text(c, nil)
			}
		case "research_agent_start":
			task, _ := obj["research_task"].(string)
			text("\n[Researching: "+task+"]\n", nil)
		case "reasoning_start", "reasoning_done", "image_generation_heartbeat",
			"memory_tool_start", "memory_tool_delta", "memory_tool_no_access",
			"citation_info", "top_level_branching", "intermediate_report_cited_docs", "tool_call_debug":
			// Recognised, but nothing to forward.
		case "section_end":
			if toolActive {
				text("\n", nil)
				toolActive = false
			}
		case "stop":
			emit(map[string]any{}, &stop)
			done()
			return
		case "error":
			msg, _ := obj["error"].(string)
			if msg == "" {
				msg = "Unknown error"
			}
			text("\n[Error: "+msg+"]", &stop)
			done()
			return
		default:
			if ev.Type != "" {
				log.Printf("Unknown SSE event: %s", ev.Type)
			}
		}
	}

	// Upstream closed without a "stop" event: still finish the choice.
	emit(map[string]any{}, &stop)
	done()
}
// collectOpenAI drains the upstream event stream and returns the complete
// assistant text for a non-streaming OpenAI response.
//
// Tool activity is gathered into a context section placed before the answer
// text, separated from it by a blank line. Rendering follows the streaming
// path: an empty tool name becomes "custom_tool", object-valued tool data is
// shown as JSON, a code fence is emitted only when code is present, and an
// error event without a message reads "Unknown error". Collection stops at
// the "stop" event, at a stream-level error, or at the end of the stream.
func collectOpenAI(body io.Reader) string {
	var parts, toolCtx []string
	scan := newOnyxScanner(body)

loop:
	for {
		ev, ok := scan.Next()
		if !ok {
			break
		}
		if ev.Err != "" {
			parts = append(parts, "\n[Error: "+ev.Err+"]")
			break
		}
		obj := ev.Obj
		switch ev.Type {
		case "message_delta":
			if c, _ := obj["content"].(string); c != "" {
				parts = append(parts, c)
			}
		case "search_tool_start":
			label := "Internal Search"
			if v, _ := obj["is_internet_search"].(bool); v {
				label = "Web Search"
			}
			toolCtx = append(toolCtx, "["+label+"]")
		case "search_tool_queries_delta":
			if qs := toStrSlice(obj["queries"]); len(qs) > 0 {
				toolCtx = append(toolCtx, "Searching: "+strings.Join(qs, ", "))
			}
		case "search_tool_documents_delta":
			if docs, ok := obj["documents"].([]any); ok && len(docs) > 0 {
				toolCtx = append(toolCtx, fmt.Sprintf("Found %d results.", len(docs)))
			}
		case "open_url_start":
			toolCtx = append(toolCtx, "[Opening URL]")
		case "open_url_urls":
			if urls := toStrSlice(obj["urls"]); len(urls) > 0 {
				toolCtx = append(toolCtx, strings.Join(urls, ", "))
			}
		case "python_tool_start":
			entry := "[Code Interpreter]"
			if code, _ := obj["code"].(string); code != "" {
				entry += "\n```python\n" + code + "\n```"
			}
			toolCtx = append(toolCtx, entry)
		case "python_tool_delta":
			if s, _ := obj["stdout"].(string); s != "" {
				toolCtx = append(toolCtx, "Output: "+s)
			}
			if s, _ := obj["stderr"].(string); s != "" {
				toolCtx = append(toolCtx, "Error: "+s)
			}
		case "image_generation_final":
			images, _ := obj["images"].([]any)
			for _, img := range images {
				m, ok := img.(map[string]any)
				if !ok {
					continue
				}
				u, _ := m["url"].(string)
				p, _ := m["revised_prompt"].(string)
				if u != "" {
					toolCtx = append(toolCtx, fmt.Sprintf("![%s](%s)", p, u))
				}
			}
		case "custom_tool_start":
			tn, _ := obj["tool_name"].(string)
			if tn == "" {
				tn = "custom_tool"
			}
			toolCtx = append(toolCtx, "[Tool: "+tn+"]")
		case "custom_tool_delta":
			if td := obj["data"]; td != nil {
				var out string
				if m, ok := td.(map[string]any); ok {
					// Objects are shown as JSON rather than Go's map syntax.
					b, _ := json.Marshal(m)
					out = string(b)
				} else {
					out = fmt.Sprint(td)
				}
				if out != "" {
					toolCtx = append(toolCtx, out)
				}
			}
		case "error":
			msg, _ := obj["error"].(string)
			if msg == "" {
				msg = "Unknown error"
			}
			parts = append(parts, "\n[Error: "+msg+"]")
		case "stop":
			break loop
		}
	}

	var b strings.Builder
	if len(toolCtx) > 0 {
		b.WriteString(strings.Join(toolCtx, "\n") + "\n\n")
	}
	for _, p := range parts {
		b.WriteString(p)
	}
	return b.String()
}
// ══════════════════════════════════════════════════════════════════════════
// Anthropic FORMAT
// ══════════════════════════════════════════════════════════════════════════
// anthropicSSE frames data as a named server-sent event: an "event:" line,
// a "data:" line with the JSON encoding of data, and a terminating blank
// line. A marshalling failure is ignored and yields an empty data payload.
func anthropicSSE(event string, data any) []byte {
	payload, _ := json.Marshal(data)
	frame := make([]byte, 0, len(event)+len(payload)+16)
	frame = append(frame, "event: "...)
	frame = append(frame, event...)
	frame = append(frame, "\ndata: "...)
	frame = append(frame, payload...)
	return append(frame, "\n\n"...)
}
// streamAnthropic translates the upstream event stream read from body into
// Anthropic Messages server-sent events written to w, calling flush after
// each frame.
//
// The output is message_start, then a sequence of content blocks (each
// content_block_start, its deltas, content_block_stop), then message_delta
// with stop_reason "end_turn" and message_stop. Reasoning is emitted in
// "thinking" blocks and answer text in "text" blocks. Tool activity is
// rendered as bracketed text inside a text block.
//
// At most one block is open at a time. Every delta first makes sure a block
// of the right type is open, closing a block of the other type if needed,
// so a delta is never sent for an unopened index or into a block of the
// wrong type, and two blocks never share an index.
func streamAnthropic(w io.Writer, flush func(), body io.Reader, model, rid string) {
	scan := newOnyxScanner(body)
	blockIdx := 0
	open := "" // type of the block currently open: "", "thinking" or "text"

	send := func(event string, data map[string]any) {
		w.Write(anthropicSSE(event, data))
		flush()
	}

	send("message_start", map[string]any{
		"type": "message_start",
		"message": map[string]any{
			"id": rid, "type": "message", "role": "assistant", "model": model,
			"content": []any{}, "stop_reason": nil,
			"usage": map[string]any{"input_tokens": 0, "output_tokens": 0},
		},
	})

	// closeBlock ends the open block, if any, and advances the block index.
	closeBlock := func() {
		if open == "" {
			return
		}
		send("content_block_stop", map[string]any{
			"type": "content_block_stop", "index": blockIdx,
		})
		blockIdx++
		open = ""
	}
	// ensureBlock makes btype the open block, closing any other one first.
	ensureBlock := func(btype string) {
		if open == btype {
			return
		}
		closeBlock()
		block := map[string]any{"type": btype}
		if btype == "thinking" {
			block["thinking"] = ""
		} else {
			block["text"] = ""
		}
		send("content_block_start", map[string]any{
			"type": "content_block_start", "index": blockIdx, "content_block": block,
		})
		open = btype
	}
	// textDelta appends s to the text block, opening one if necessary.
	textDelta := func(s string) {
		ensureBlock("text")
		send("content_block_delta", map[string]any{
			"type": "content_block_delta", "index": blockIdx,
			"delta": map[string]any{"type": "text_delta", "text": s},
		})
	}
	// thinkingDelta appends s to the thinking block, opening one if necessary.
	thinkingDelta := func(s string) {
		ensureBlock("thinking")
		send("content_block_delta", map[string]any{
			"type": "content_block_delta", "index": blockIdx,
			"delta": map[string]any{"type": "thinking_delta", "thinking": s},
		})
	}
	// finishMsg closes the open block and ends the message.
	finishMsg := func(reason string) {
		closeBlock()
		w.Write(anthropicSSE("message_delta", map[string]any{
			"type":  "message_delta",
			"delta": map[string]any{"stop_reason": reason},
			"usage": map[string]any{"output_tokens": 0},
		}))
		w.Write(anthropicSSE("message_stop", map[string]any{"type": "message_stop"}))
		flush()
	}

	for {
		ev, ok := scan.Next()
		if !ok {
			break
		}
		if ev.Err != "" {
			textDelta("\n[Error: " + ev.Err + "]")
			finishMsg("end_turn")
			return
		}
		obj := ev.Obj
		switch ev.Type {
		case "reasoning_start":
			ensureBlock("thinking")
		case "reasoning_delta":
			if s, _ := obj["reasoning"].(string); s != "" {
				thinkingDelta(s)
			}
		case "reasoning_done":
			if open == "thinking" {
				closeBlock()
			}
		case "message_start":
			ensureBlock("text")
		case "message_delta":
			if c, _ := obj["content"].(string); c != "" {
				textDelta(c)
			}

		// ── Tool events → text deltas ──
		case "search_tool_start":
			label := "Internal Search"
			if v, _ := obj["is_internet_search"].(bool); v {
				label = "Web Search"
			}
			textDelta("\n[" + label + "] ")
		case "search_tool_queries_delta":
			if qs := toStrSlice(obj["queries"]); len(qs) > 0 {
				quoted := make([]string, 0, len(qs))
				for _, s := range qs {
					quoted = append(quoted, "\u201c"+s+"\u201d")
				}
				textDelta("Searching: " + strings.Join(quoted, ", ") + "\n")
			}
		case "search_tool_documents_delta":
			if docs, ok := obj["documents"].([]any); ok && len(docs) > 0 {
				textDelta(fmt.Sprintf("Found %d results.\n", len(docs)))
			}
		case "open_url_start":
			textDelta("\n[Opening URL] ")
		case "python_tool_start":
			banner := "\n[Code Interpreter]\n"
			if code, _ := obj["code"].(string); code != "" {
				banner += "```python\n" + code + "\n```\n"
			}
			textDelta(banner)
		case "custom_tool_start":
			tn, _ := obj["tool_name"].(string)
			if tn == "" {
				tn = "custom_tool"
			}
			textDelta("\n[Tool: " + tn + "]\n")
		case "image_generation_start":
			textDelta("\n[Generating Image...]\n")
		case "file_reader_start":
			textDelta("\n[Reading File...]\n")
		case "deep_research_plan_start":
			textDelta("\n[Research Plan]\n")
		case "open_url_urls":
			if urls := toStrSlice(obj["urls"]); len(urls) > 0 {
				textDelta(strings.Join(urls, ", ") + "\n")
			}
		case "open_url_documents":
			if docs, ok := obj["documents"].([]any); ok && len(docs) > 0 {
				textDelta(fmt.Sprintf("Loaded %d pages.\n", len(docs)))
			}
		case "python_tool_delta":
			var parts []string
			if s, _ := obj["stdout"].(string); s != "" {
				parts = append(parts, "Output: "+s)
			}
			if s, _ := obj["stderr"].(string); s != "" {
				parts = append(parts, "Error: "+s)
			}
			if len(parts) > 0 {
				textDelta(strings.Join(parts, "\n") + "\n")
			}
		case "custom_tool_delta":
			if td := obj["data"]; td != nil {
				var out string
				if m, ok := td.(map[string]any); ok {
					// Objects are shown as JSON; an unencodable value
					// yields no output.
					b, _ := json.Marshal(m)
					out = string(b)
				} else {
					out = fmt.Sprint(td)
				}
				if out != "" {
					textDelta(out + "\n")
				}
			}
		case "image_generation_final":
			images, _ := obj["images"].([]any)
			for _, img := range images {
				m, ok := img.(map[string]any)
				if !ok {
					continue
				}
				u, _ := m["url"].(string)
				p, _ := m["revised_prompt"].(string)
				if u != "" {
					textDelta(fmt.Sprintf("![%s](%s)\n", p, u))
				}
			}
		case "file_reader_result":
			if fn, _ := obj["file_name"].(string); fn != "" {
				textDelta("Read: " + fn + "\n")
			}
		case "deep_research_plan_delta", "intermediate_report_start",
			"intermediate_report_delta":
			if c, _ := obj["content"].(string); c != "" {
				textDelta(c)
			}
		case "research_agent_start":
			task, _ := obj["research_task"].(string)
			textDelta("\n[Researching: " + task + "]\n")
		case "image_generation_heartbeat", "memory_tool_start", "memory_tool_delta",
			"memory_tool_no_access", "citation_info", "top_level_branching",
			"intermediate_report_cited_docs", "tool_call_debug", "section_end":
			// Recognised, but nothing to forward.
		case "stop":
			finishMsg("end_turn")
			return
		case "error":
			msg, _ := obj["error"].(string)
			if msg == "" {
				msg = "Unknown error"
			}
			textDelta("\n[Error: " + msg + "]")
			finishMsg("end_turn")
			return
		default:
			if ev.Type != "" {
				log.Printf("Unknown SSE event: %s", ev.Type)
			}
		}
	}

	// Upstream closed without a "stop" event: still finish the message.
	finishMsg("end_turn")
}
// collectAnthropic drains the upstream event stream for a non-streaming
// Anthropic response and returns the answer text and the concatenated
// reasoning.
//
// Tool activity is gathered into a context section placed before the answer
// text, separated from it by a blank line. Rendering follows the streaming
// path: an empty tool name becomes "custom_tool", object-valued tool data is
// shown as JSON, a code fence is emitted only when code is present, and an
// error event without a message reads "Unknown error". Collection stops at
// the "stop" event, at a stream-level error, or at the end of the stream.
func collectAnthropic(body io.Reader) (text, thinking string) {
	var textParts, thinkParts, toolCtx []string
	scan := newOnyxScanner(body)

loop:
	for {
		ev, ok := scan.Next()
		if !ok {
			break
		}
		if ev.Err != "" {
			textParts = append(textParts, "\n[Error: "+ev.Err+"]")
			break
		}
		obj := ev.Obj
		switch ev.Type {
		case "reasoning_delta":
			if s, _ := obj["reasoning"].(string); s != "" {
				thinkParts = append(thinkParts, s)
			}
		case "message_delta":
			if c, _ := obj["content"].(string); c != "" {
				textParts = append(textParts, c)
			}
		case "search_tool_start":
			label := "Internal Search"
			if v, _ := obj["is_internet_search"].(bool); v {
				label = "Web Search"
			}
			toolCtx = append(toolCtx, "["+label+"]")
		case "search_tool_queries_delta":
			if qs := toStrSlice(obj["queries"]); len(qs) > 0 {
				toolCtx = append(toolCtx, "Searching: "+strings.Join(qs, ", "))
			}
		case "search_tool_documents_delta":
			if docs, ok := obj["documents"].([]any); ok && len(docs) > 0 {
				toolCtx = append(toolCtx, fmt.Sprintf("Found %d results.", len(docs)))
			}
		case "open_url_start":
			toolCtx = append(toolCtx, "[Opening URL]")
		case "open_url_urls":
			if urls := toStrSlice(obj["urls"]); len(urls) > 0 {
				toolCtx = append(toolCtx, strings.Join(urls, ", "))
			}
		case "python_tool_start":
			entry := "[Code Interpreter]"
			if code, _ := obj["code"].(string); code != "" {
				entry += "\n```python\n" + code + "\n```"
			}
			toolCtx = append(toolCtx, entry)
		case "python_tool_delta":
			if s, _ := obj["stdout"].(string); s != "" {
				toolCtx = append(toolCtx, "Output: "+s)
			}
			if s, _ := obj["stderr"].(string); s != "" {
				toolCtx = append(toolCtx, "Error: "+s)
			}
		case "image_generation_final":
			images, _ := obj["images"].([]any)
			for _, img := range images {
				m, ok := img.(map[string]any)
				if !ok {
					continue
				}
				u, _ := m["url"].(string)
				p, _ := m["revised_prompt"].(string)
				if u != "" {
					toolCtx = append(toolCtx, fmt.Sprintf("![%s](%s)", p, u))
				}
			}
		case "custom_tool_start":
			tn, _ := obj["tool_name"].(string)
			if tn == "" {
				tn = "custom_tool"
			}
			toolCtx = append(toolCtx, "[Tool: "+tn+"]")
		case "custom_tool_delta":
			if td := obj["data"]; td != nil {
				var out string
				if m, ok := td.(map[string]any); ok {
					// Objects are shown as JSON rather than Go's map syntax.
					b, _ := json.Marshal(m)
					out = string(b)
				} else {
					out = fmt.Sprint(td)
				}
				if out != "" {
					toolCtx = append(toolCtx, out)
				}
			}
		case "error":
			msg, _ := obj["error"].(string)
			if msg == "" {
				msg = "Unknown error"
			}
			textParts = append(textParts, "\n[Error: "+msg+"]")
		case "stop":
			break loop
		}
	}

	var tb strings.Builder
	if len(toolCtx) > 0 {
		tb.WriteString(strings.Join(toolCtx, "\n") + "\n\n")
	}
	for _, p := range textParts {
		tb.WriteString(p)
	}
	return tb.String(), strings.Join(thinkParts, "")
}
// ══════════════════════════════════════════════════════════════════════════
// HTTP HANDLERS
// ══════════════════════════════════════════════════════════════════════════
// handleModels serves GET /v1/models with a fixed, OpenAI-style list of the
// model identifiers this proxy advertises.
func handleModels(w http.ResponseWriter, r *http.Request) {
	ids := [...]string{
		"claude-opus-4-6", "claude-opus-4-5",
		"claude-sonnet-4-6", "claude-sonnet-4-5", "claude-sonnet-4",
		"claude-3-5-sonnet", "claude-3-5-haiku", "claude-haiku-4-5",
		"gpt-4o", "gpt-4o-mini", "o1", "o3-mini",
		"gemini-2.0-flash", "gemini-2.5-pro",
	}
	entries := make([]any, 0, len(ids))
	for _, id := range ids {
		entries = append(entries, map[string]any{
			"id":       id,
			"object":   "model",
			"created":  1700000000,
			"owned_by": "onyx",
		})
	}
	listing := map[string]any{"object": "list", "data": entries}
	writeJSON(w, 200, listing)
}
func handleHealth(w http.ResponseWriter, r *http.Request) {
writeJSON(w, 200, map[string]any{"status": "ok", "version": ver, "keys": len(keys)})
}
func handleRoot(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
writeJSON(w, 200, map[string]any{
"name": "onyx2api",
"version": ver,
"endpoints": []string{"/v1/chat/completions", "/v1/messages", "/v1/models", "/health"},
})
}
// ── OpenAI: POST /v1/chat/completions ──
// openaiReq is the subset of the OpenAI chat-completions request body that
// handleOpenAI reads.
type openaiReq struct {
// Model is the requested model name; handleOpenAI substitutes a default
// when it is empty.
Model string `json:"model"`
// Messages is the conversation; it must not be empty.
Messages []chatMsg `json:"messages"`
// Stream selects a server-sent-event response instead of a single JSON body.
Stream bool `json:"stream"`
// Temperature is optional; nil means "use the handler's default".
Temperature *float64 `json:"temperature,omitempty"`
// PersonaID is a proxy-specific extension overriding defaultPersona.
PersonaID *int `json:"persona_id,omitempty"`
}
// handleOpenAI serves POST /v1/chat/completions in the OpenAI wire format.
//
// It validates the method, the client key and the JSON body, resolves the
// upstream token, forwards the conversation with doOnyxRequest and answers
// either with a "chat.completion" object or, when "stream" is true, with
// "chat.completion.chunk" server-sent events.
//
// Upstream HTTP failures are relayed with the upstream status code; any
// other upstream failure is reported as 502. Token usage is not tracked and
// is always reported as zero.
func handleOpenAI(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSON(w, 405, map[string]any{"error": "method not allowed"})
		return
	}
	if !checkClientAuth(r) {
		writeJSON(w, 401, map[string]any{"error": "invalid api key"})
		return
	}
	var req openaiReq
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeJSON(w, 400, map[string]any{"error": "invalid JSON"})
		return
	}
	token := resolveAuth(r.Header.Get("Authorization"))
	if token == "" {
		writeJSON(w, 401, map[string]any{"error": "No auth. Set ONYX_KEYS env var."})
		return
	}
	if len(req.Messages) == 0 {
		writeJSON(w, 400, map[string]any{"error": "messages is required"})
		return
	}

	model := req.Model
	if model == "" {
		model = "claude-opus-4-6"
	}
	temp := 0.5
	if req.Temperature != nil {
		temp = *req.Temperature
	}
	persona := defaultPersona
	if req.PersonaID != nil {
		persona = *req.PersonaID
	}
	rid := genID("chatcmpl-")

	resp, err := doOnyxRequest(r.Context(), token, model, req.Messages, "", temp, persona)
	if err != nil {
		// errors.As also matches an *onyxError that has been wrapped.
		var oe *onyxError
		if errors.As(err, &oe) {
			writeJSON(w, oe.Status, map[string]any{"error": oe.Error(), "detail": oe.Body})
		} else {
			writeJSON(w, 502, map[string]any{"error": err.Error()})
		}
		return
	}
	defer resp.Body.Close()

	if !req.Stream {
		content := collectOpenAI(resp.Body)
		writeJSON(w, 200, map[string]any{
			"id": rid, "object": "chat.completion", "created": time.Now().Unix(), "model": model,
			"choices": []any{map[string]any{
				"index": 0, "message": map[string]any{"role": "assistant", "content": content},
				"finish_reason": "stop",
			}},
			"usage": map[string]any{"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
		})
		return
	}

	// Check streaming support before setting any SSE header, so the JSON
	// error below is not sent with event-stream headers attached.
	flusher, ok := w.(http.Flusher)
	if !ok {
		writeJSON(w, 500, map[string]any{"error": "streaming unsupported"})
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	streamOpenAI(w, flusher.Flush, resp.Body, model, rid)
}
// ── Anthropic: POST /v1/messages ──
// anthropicReq is the subset of the Anthropic Messages request body that
// handleAnthropic reads.
type anthropicReq struct {
// Model is the requested model name; handleAnthropic substitutes a default
// when it is empty.
Model string `json:"model"`
// Messages is the conversation; it must not be empty.
Messages []chatMsg `json:"messages"`
// System is the optional system prompt, flattened with textContent.
System any `json:"system,omitempty"` // string or []content_block
// Stream selects a server-sent-event response instead of a single JSON body.
Stream bool `json:"stream"`
// MaxTokens is decoded but not read by handleAnthropic.
MaxTokens int `json:"max_tokens,omitempty"`
// Temperature is optional; nil means "use the handler's default".
Temperature *float64 `json:"temperature,omitempty"`
// PersonaID is a proxy-specific extension overriding defaultPersona.
PersonaID *int `json:"persona_id,omitempty"`
}
// handleAnthropic serves POST /v1/messages in the Anthropic wire format.
//
// It validates the method, the client key and the JSON body, resolves the
// upstream token ("x-api-key" first, then "Authorization"), forwards the
// conversation with doOnyxRequest and answers either with a "message"
// object or, when "stream" is true, with Anthropic server-sent events.
//
// Every error, including 405, uses the same envelope:
// {"type":"error","error":{"type":…,"message":…}}. Upstream HTTP failures
// are relayed with the upstream status code; any other upstream failure is
// reported as 502. Token usage is not tracked and is always zero.
func handleAnthropic(w http.ResponseWriter, r *http.Request) {
	fail := func(status int, kind, msg string) {
		writeJSON(w, status, map[string]any{
			"type":  "error",
			"error": map[string]any{"type": kind, "message": msg},
		})
	}

	if r.Method != http.MethodPost {
		fail(405, "invalid_request_error", "method not allowed")
		return
	}
	if !checkClientAuth(r) {
		fail(401, "authentication_error", "invalid api key")
		return
	}
	var req anthropicReq
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		fail(400, "invalid_request_error", "invalid JSON")
		return
	}
	token := resolveAuth(r.Header.Get("x-api-key"), r.Header.Get("Authorization"))
	if token == "" {
		fail(401, "authentication_error", "No auth. Set ONYX_KEYS env var.")
		return
	}
	if len(req.Messages) == 0 {
		fail(400, "invalid_request_error", "messages is required")
		return
	}

	model := req.Model
	if model == "" {
		model = "claude-opus-4-6"
	}
	temp := 0.5
	if req.Temperature != nil {
		temp = *req.Temperature
	}
	persona := defaultPersona
	if req.PersonaID != nil {
		persona = *req.PersonaID
	}
	// An absent or null "system" means "no system prompt"; only a present
	// value is flattened.
	system := ""
	if req.System != nil {
		system = textContent(req.System)
	}
	rid := genID("msg_")

	resp, err := doOnyxRequest(r.Context(), token, model, req.Messages, system, temp, persona)
	if err != nil {
		// errors.As also matches an *onyxError that has been wrapped.
		var oe *onyxError
		if errors.As(err, &oe) {
			fail(oe.Status, "api_error", oe.Error())
		} else {
			fail(502, "api_error", err.Error())
		}
		return
	}
	defer resp.Body.Close()

	if !req.Stream {
		text, thinking := collectAnthropic(resp.Body)
		var content []any
		if thinking != "" {
			content = append(content, map[string]any{"type": "thinking", "thinking": thinking})
		}
		content = append(content, map[string]any{"type": "text", "text": text})
		writeJSON(w, 200, map[string]any{
			"id": rid, "type": "message", "role": "assistant", "model": model,
			"content":     content,
			"stop_reason": "end_turn",
			"usage":       map[string]any{"input_tokens": 0, "output_tokens": 0},
		})
		return
	}

	// Check streaming support before setting any SSE header, so the JSON
	// error below is not sent with event-stream headers attached.
	flusher, ok := w.(http.Flusher)
	if !ok {
		fail(500, "api_error", "streaming unsupported")
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	streamAnthropic(w, flusher.Flush, resp.Body, model, rid)
}
// ── Main ────────────────────────────────────────────────────────────────
// main loads configuration from the environment, registers the HTTP routes
// and serves until the listener fails.
//
// ONYX_KEYS is a comma-separated list of upstream keys. CLIENT_API_KEYS (or
// CLIENT_API_KEY) is a comma-separated list of keys callers must present;
// configuring client keys without upstream keys is a fatal error.
func main() {
	if raw := os.Getenv("ONYX_KEYS"); raw != "" {
		for _, part := range strings.Split(raw, ",") {
			part = strings.TrimSpace(part)
			if part == "" {
				continue
			}
			keys = append(keys, part)
		}
	}

	if raw := firstNonEmpty(os.Getenv("CLIENT_API_KEYS"), os.Getenv("CLIENT_API_KEY")); raw != "" {
		clientKeys = make(map[string]struct{})
		for _, part := range strings.Split(raw, ",") {
			part = strings.TrimSpace(part)
			if part == "" {
				continue
			}
			clientKeys[part] = struct{}{}
		}
	}

	if len(keys) == 0 && len(clientKeys) > 0 {
		log.Fatal("CLIENT_API_KEYS/CLIENT_API_KEY requires ONYX_KEYS to be set")
	}

	listenAddr := resolveListenAddr()

	routes := []struct {
		pattern string
		handler http.HandlerFunc
	}{
		{"/", handleRoot},
		{"/v1/chat/completions", handleOpenAI},
		{"/v1/messages", handleAnthropic},
		{"/v1/models", handleModels},
		{"/health", handleHealth},
	}
	mux := http.NewServeMux()
	for _, rt := range routes {
		mux.HandleFunc(rt.pattern, rt.handler)
	}

	// No write timeout is set on the server.
	server := &http.Server{
		Addr:              listenAddr,
		Handler:           mux,
		ReadHeaderTimeout: 10 * time.Second,
		ReadTimeout:       60 * time.Second,
		IdleTimeout:       120 * time.Second,
	}

	log.Printf("onyx2api v%s | onyx_keys=%d | client_keys=%d | listen=%s", ver, len(keys), len(clientKeys), listenAddr)
	err := server.ListenAndServe()
	log.Fatal(err)
}
// firstNonEmpty returns the first argument that contains something other
// than whitespace. The value is returned exactly as given (not trimmed);
// "" is returned when no argument qualifies.
func firstNonEmpty(vals ...string) string {
	for i := range vals {
		if len(strings.TrimSpace(vals[i])) > 0 {
			return vals[i]
		}
	}
	return ""
}
// resolveListenAddr determines the address the server binds to.
// LISTEN_ADDR, when set, is used verbatim (trimmed). Otherwise PORT is used:
// a value containing ":" is treated as a full address, a bare port gets a
// leading ":", and an unset PORT falls back to ":9898".
func resolveListenAddr() string {
	if explicit := strings.TrimSpace(os.Getenv("LISTEN_ADDR")); explicit != "" {
		return explicit
	}
	switch port := strings.TrimSpace(os.Getenv("PORT")); {
	case port == "":
		return ":9898"
	case strings.Contains(port, ":"):
		return port
	default:
		return ":" + port
	}
}