File size: 33,976 Bytes
be4122e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
"""OpenAI-compatible facade that talks to Google's Cloud Code Assist backend.

This adapter lets Hermes use the ``google-gemini-cli`` provider as if it were
a standard OpenAI-shaped chat completion endpoint, while the underlying HTTP
traffic goes to ``cloudcode-pa.googleapis.com/v1internal:{generateContent,
streamGenerateContent}`` with a Bearer access token obtained via OAuth PKCE.

Architecture
------------
- ``GeminiCloudCodeClient`` exposes ``.chat.completions.create(**kwargs)``
  mirroring the subset of the OpenAI SDK that ``run_agent.py`` uses.
- Incoming OpenAI ``messages[]`` / ``tools[]`` / ``tool_choice`` are translated
  to Gemini's native ``contents[]`` / ``tools[].functionDeclarations`` /
  ``toolConfig`` / ``systemInstruction`` shape.
- The request body is wrapped ``{project, model, user_prompt_id, request}``
  per Code Assist API expectations.
- Responses (``candidates[].content.parts[]``) are converted back to
  OpenAI ``choices[0].message`` shape with ``content`` + ``tool_calls``.
- Streaming uses SSE (``?alt=sse``) and yields OpenAI-shaped delta chunks.

Attribution
-----------
Translation semantics follow jenslys/opencode-gemini-auth (MIT) and the public
Gemini API docs. Request envelope shape
(``{project, model, user_prompt_id, request}``) is documented nowhere; it is
reverse-engineered from the opencode-gemini-auth and clawdbot implementations.
"""

from __future__ import annotations

import json
import logging
import os
import time
import uuid
from types import SimpleNamespace
from typing import Any, Dict, Iterator, List, Optional

import httpx

from agent import google_oauth
from agent.gemini_schema import sanitize_gemini_tool_parameters
from agent.google_code_assist import (
    CODE_ASSIST_ENDPOINT,
    FREE_TIER_ID,
    CodeAssistError,
    ProjectContext,
    resolve_project_context,
)

logger = logging.getLogger(__name__)


# =============================================================================
# Request translation: OpenAI β†’ Gemini
# =============================================================================

_ROLE_MAP_OPENAI_TO_GEMINI = {
    "user": "user",
    "assistant": "model",
    "system": "user",   # handled separately via systemInstruction
    "tool": "user",     # functionResponse is wrapped in a user-role turn
    "function": "user",
}


def _coerce_content_to_text(content: Any) -> str:
    """OpenAI content may be str or a list of parts; reduce to plain text."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces: List[str] = []
        for p in content:
            if isinstance(p, str):
                pieces.append(p)
            elif isinstance(p, dict):
                if p.get("type") == "text" and isinstance(p.get("text"), str):
                    pieces.append(p["text"])
                # Multimodal (image_url, etc.) β€” stub for now; log and skip
                elif p.get("type") in ("image_url", "input_audio"):
                    logger.debug("Dropping multimodal part (not yet supported): %s", p.get("type"))
        return "\n".join(pieces)
    return str(content)


def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]:
    """OpenAI tool_call -> Gemini functionCall part."""
    fn = tool_call.get("function") or {}
    args_raw = fn.get("arguments", "")
    try:
        args = json.loads(args_raw) if isinstance(args_raw, str) and args_raw else {}
    except json.JSONDecodeError:
        args = {"_raw": args_raw}
    if not isinstance(args, dict):
        args = {"_value": args}
    return {
        "functionCall": {
            "name": fn.get("name") or "",
            "args": args,
        },
        # Sentinel signature β€” matches opencode-gemini-auth's approach.
        # Without this, Code Assist rejects function calls that originated
        # outside its own chain.
        "thoughtSignature": "skip_thought_signature_validator",
    }


def _translate_tool_result_to_gemini(message: Dict[str, Any]) -> Dict[str, Any]:
    """OpenAI tool-role message -> Gemini functionResponse part.

    The function name isn't in the OpenAI tool message directly; it must be
    passed via the assistant message that issued the call. For simplicity we
    look up ``name`` on the message (OpenAI SDK copies it there) or on the
    ``tool_call_id`` cross-reference.
    """
    name = str(message.get("name") or message.get("tool_call_id") or "tool")
    content = _coerce_content_to_text(message.get("content"))
    # Gemini expects the response as a dict under `response`. We wrap plain
    # text in {"output": "..."}.
    try:
        parsed = json.loads(content) if content.strip().startswith(("{", "[")) else None
    except json.JSONDecodeError:
        parsed = None
    response = parsed if isinstance(parsed, dict) else {"output": content}
    return {
        "functionResponse": {
            "name": name,
            "response": response,
        },
    }


def _build_gemini_contents(
    messages: List[Dict[str, Any]],
) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """Convert OpenAI messages[] to Gemini contents[] + systemInstruction."""
    system_text_parts: List[str] = []
    contents: List[Dict[str, Any]] = []

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = str(msg.get("role") or "user")

        if role == "system":
            system_text_parts.append(_coerce_content_to_text(msg.get("content")))
            continue

        # Tool result message β€” emit a user-role turn with functionResponse
        if role == "tool" or role == "function":
            contents.append({
                "role": "user",
                "parts": [_translate_tool_result_to_gemini(msg)],
            })
            continue

        gemini_role = _ROLE_MAP_OPENAI_TO_GEMINI.get(role, "user")
        parts: List[Dict[str, Any]] = []

        text = _coerce_content_to_text(msg.get("content"))
        if text:
            parts.append({"text": text})

        # Assistant messages can carry tool_calls
        tool_calls = msg.get("tool_calls") or []
        if isinstance(tool_calls, list):
            for tc in tool_calls:
                if isinstance(tc, dict):
                    parts.append(_translate_tool_call_to_gemini(tc))

        if not parts:
            # Gemini rejects empty parts; skip the turn entirely
            continue

        contents.append({"role": gemini_role, "parts": parts})

    system_instruction: Optional[Dict[str, Any]] = None
    joined_system = "\n".join(p for p in system_text_parts if p).strip()
    if joined_system:
        system_instruction = {
            "role": "system",
            "parts": [{"text": joined_system}],
        }

    return contents, system_instruction


def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]:
    """OpenAI tools[] -> Gemini tools[].functionDeclarations[]."""
    if not isinstance(tools, list) or not tools:
        return []
    declarations: List[Dict[str, Any]] = []
    for t in tools:
        if not isinstance(t, dict):
            continue
        fn = t.get("function") or {}
        if not isinstance(fn, dict):
            continue
        name = fn.get("name")
        if not name:
            continue
        decl = {"name": str(name)}
        if fn.get("description"):
            decl["description"] = str(fn["description"])
        params = fn.get("parameters")
        if isinstance(params, dict):
            decl["parameters"] = sanitize_gemini_tool_parameters(params)
        declarations.append(decl)
    if not declarations:
        return []
    return [{"functionDeclarations": declarations}]


def _translate_tool_choice_to_gemini(tool_choice: Any) -> Optional[Dict[str, Any]]:
    """OpenAI tool_choice -> Gemini toolConfig.functionCallingConfig."""
    if tool_choice is None:
        return None
    if isinstance(tool_choice, str):
        if tool_choice == "auto":
            return {"functionCallingConfig": {"mode": "AUTO"}}
        if tool_choice == "required":
            return {"functionCallingConfig": {"mode": "ANY"}}
        if tool_choice == "none":
            return {"functionCallingConfig": {"mode": "NONE"}}
    if isinstance(tool_choice, dict):
        fn = tool_choice.get("function") or {}
        name = fn.get("name")
        if name:
            return {
                "functionCallingConfig": {
                    "mode": "ANY",
                    "allowedFunctionNames": [str(name)],
                },
            }
    return None


def _normalize_thinking_config(config: Any) -> Optional[Dict[str, Any]]:
    """Accept thinkingBudget / thinkingLevel / includeThoughts (+ snake_case)."""
    if not isinstance(config, dict) or not config:
        return None
    budget = config.get("thinkingBudget", config.get("thinking_budget"))
    level = config.get("thinkingLevel", config.get("thinking_level"))
    include = config.get("includeThoughts", config.get("include_thoughts"))
    normalized: Dict[str, Any] = {}
    if isinstance(budget, (int, float)):
        normalized["thinkingBudget"] = int(budget)
    if isinstance(level, str) and level.strip():
        normalized["thinkingLevel"] = level.strip().lower()
    if isinstance(include, bool):
        normalized["includeThoughts"] = include
    return normalized or None


def build_gemini_request(
    *,
    messages: List[Dict[str, Any]],
    tools: Any = None,
    tool_choice: Any = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    top_p: Optional[float] = None,
    stop: Any = None,
    thinking_config: Any = None,
) -> Dict[str, Any]:
    """Build the inner Gemini request body (goes inside ``request`` wrapper)."""
    contents, system_instruction = _build_gemini_contents(messages)

    body: Dict[str, Any] = {"contents": contents}
    if system_instruction is not None:
        body["systemInstruction"] = system_instruction

    gemini_tools = _translate_tools_to_gemini(tools)
    if gemini_tools:
        body["tools"] = gemini_tools
    tool_cfg = _translate_tool_choice_to_gemini(tool_choice)
    if tool_cfg is not None:
        body["toolConfig"] = tool_cfg

    generation_config: Dict[str, Any] = {}
    if isinstance(temperature, (int, float)):
        generation_config["temperature"] = float(temperature)
    if isinstance(max_tokens, int) and max_tokens > 0:
        generation_config["maxOutputTokens"] = max_tokens
    if isinstance(top_p, (int, float)):
        generation_config["topP"] = float(top_p)
    if isinstance(stop, str) and stop:
        generation_config["stopSequences"] = [stop]
    elif isinstance(stop, list) and stop:
        generation_config["stopSequences"] = [str(s) for s in stop if s]
    normalized_thinking = _normalize_thinking_config(thinking_config)
    if normalized_thinking:
        generation_config["thinkingConfig"] = normalized_thinking
    if generation_config:
        body["generationConfig"] = generation_config

    return body


def wrap_code_assist_request(
    *,
    project_id: str,
    model: str,
    inner_request: Dict[str, Any],
    user_prompt_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Wrap the inner Gemini request in the Code Assist envelope."""
    return {
        "project": project_id,
        "model": model,
        "user_prompt_id": user_prompt_id or str(uuid.uuid4()),
        "request": inner_request,
    }


# =============================================================================
# Response translation: Gemini β†’ OpenAI
# =============================================================================

def _translate_gemini_response(
    resp: Dict[str, Any],
    model: str,
) -> SimpleNamespace:
    """Non-streaming Gemini response -> OpenAI-shaped SimpleNamespace.

    Code Assist wraps the actual Gemini response inside ``response``, so we
    unwrap it first if present.
    """
    inner = resp.get("response") if isinstance(resp.get("response"), dict) else resp

    candidates = inner.get("candidates") or []
    if not isinstance(candidates, list) or not candidates:
        return _empty_response(model)

    cand = candidates[0]
    content_obj = cand.get("content") if isinstance(cand, dict) else {}
    parts = content_obj.get("parts") if isinstance(content_obj, dict) else []

    text_pieces: List[str] = []
    reasoning_pieces: List[str] = []
    tool_calls: List[SimpleNamespace] = []

    for i, part in enumerate(parts or []):
        if not isinstance(part, dict):
            continue
        # Thought parts are model's internal reasoning β€” surface as reasoning,
        # don't mix into content.
        if part.get("thought") is True:
            if isinstance(part.get("text"), str):
                reasoning_pieces.append(part["text"])
            continue
        if isinstance(part.get("text"), str):
            text_pieces.append(part["text"])
            continue
        fc = part.get("functionCall")
        if isinstance(fc, dict) and fc.get("name"):
            try:
                args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
            except (TypeError, ValueError):
                args_str = "{}"
            tool_calls.append(SimpleNamespace(
                id=f"call_{uuid.uuid4().hex[:12]}",
                type="function",
                index=i,
                function=SimpleNamespace(name=str(fc["name"]), arguments=args_str),
            ))

    finish_reason = "tool_calls" if tool_calls else _map_gemini_finish_reason(
        str(cand.get("finishReason") or "")
    )

    usage_meta = inner.get("usageMetadata") or {}
    usage = SimpleNamespace(
        prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
        completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
        total_tokens=int(usage_meta.get("totalTokenCount") or 0),
        prompt_tokens_details=SimpleNamespace(
            cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
        ),
    )

    message = SimpleNamespace(
        role="assistant",
        content="".join(text_pieces) if text_pieces else None,
        tool_calls=tool_calls or None,
        reasoning="".join(reasoning_pieces) or None,
        reasoning_content="".join(reasoning_pieces) or None,
        reasoning_details=None,
    )
    choice = SimpleNamespace(
        index=0,
        message=message,
        finish_reason=finish_reason,
    )
    return SimpleNamespace(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=usage,
    )


def _empty_response(model: str) -> SimpleNamespace:
    message = SimpleNamespace(
        role="assistant", content="", tool_calls=None,
        reasoning=None, reasoning_content=None, reasoning_details=None,
    )
    choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
    usage = SimpleNamespace(
        prompt_tokens=0, completion_tokens=0, total_tokens=0,
        prompt_tokens_details=SimpleNamespace(cached_tokens=0),
    )
    return SimpleNamespace(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=usage,
    )


def _map_gemini_finish_reason(reason: str) -> str:
    mapping = {
        "STOP": "stop",
        "MAX_TOKENS": "length",
        "SAFETY": "content_filter",
        "RECITATION": "content_filter",
        "OTHER": "stop",
    }
    return mapping.get(reason.upper(), "stop")


# =============================================================================
# Streaming SSE iterator
# =============================================================================

class _GeminiStreamChunk(SimpleNamespace):
    """Mimics an OpenAI ChatCompletionChunk with .choices[0].delta."""
    pass


def _make_stream_chunk(
    *,
    model: str,
    content: str = "",
    tool_call_delta: Optional[Dict[str, Any]] = None,
    finish_reason: Optional[str] = None,
    reasoning: str = "",
) -> _GeminiStreamChunk:
    delta_kwargs: Dict[str, Any] = {"role": "assistant"}
    if content:
        delta_kwargs["content"] = content
    if tool_call_delta is not None:
        delta_kwargs["tool_calls"] = [SimpleNamespace(
            index=tool_call_delta.get("index", 0),
            id=tool_call_delta.get("id") or f"call_{uuid.uuid4().hex[:12]}",
            type="function",
            function=SimpleNamespace(
                name=tool_call_delta.get("name") or "",
                arguments=tool_call_delta.get("arguments") or "",
            ),
        )]
    if reasoning:
        delta_kwargs["reasoning"] = reasoning
        delta_kwargs["reasoning_content"] = reasoning
    delta = SimpleNamespace(**delta_kwargs)
    choice = SimpleNamespace(index=0, delta=delta, finish_reason=finish_reason)
    return _GeminiStreamChunk(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion.chunk",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=None,
    )


def _iter_sse_events(response: httpx.Response) -> Iterator[Dict[str, Any]]:
    """Parse Server-Sent Events from an httpx streaming response."""
    buffer = ""
    for chunk in response.iter_text():
        if not chunk:
            continue
        buffer += chunk
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            line = line.rstrip("\r")
            if not line:
                continue
            if line.startswith("data: "):
                data = line[6:]
                if data == "[DONE]":
                    return
                try:
                    yield json.loads(data)
                except json.JSONDecodeError:
                    logger.debug("Non-JSON SSE line: %s", data[:200])


def _translate_stream_event(
    event: Dict[str, Any],
    model: str,
    tool_call_counter: List[int],
) -> List[_GeminiStreamChunk]:
    """Unwrap Code Assist envelope and emit OpenAI-shaped chunk(s).

    ``tool_call_counter`` is a single-element list used as a mutable counter
    across events in the same stream. Each ``functionCall`` part gets a
    fresh, unique OpenAI ``index`` β€” keying by function name would collide
    whenever the model issues parallel calls to the same tool (e.g. reading
    three files in one turn).
    """
    inner = event.get("response") if isinstance(event.get("response"), dict) else event
    candidates = inner.get("candidates") or []
    if not candidates:
        return []
    cand = candidates[0]
    if not isinstance(cand, dict):
        return []

    chunks: List[_GeminiStreamChunk] = []

    content = cand.get("content") or {}
    parts = content.get("parts") if isinstance(content, dict) else []
    for part in parts or []:
        if not isinstance(part, dict):
            continue
        if part.get("thought") is True and isinstance(part.get("text"), str):
            chunks.append(_make_stream_chunk(
                model=model, reasoning=part["text"],
            ))
            continue
        if isinstance(part.get("text"), str) and part["text"]:
            chunks.append(_make_stream_chunk(model=model, content=part["text"]))
        fc = part.get("functionCall")
        if isinstance(fc, dict) and fc.get("name"):
            name = str(fc["name"])
            idx = tool_call_counter[0]
            tool_call_counter[0] += 1
            try:
                args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
            except (TypeError, ValueError):
                args_str = "{}"
            chunks.append(_make_stream_chunk(
                model=model,
                tool_call_delta={
                    "index": idx,
                    "name": name,
                    "arguments": args_str,
                },
            ))

    finish_reason_raw = str(cand.get("finishReason") or "")
    if finish_reason_raw:
        mapped = _map_gemini_finish_reason(finish_reason_raw)
        if tool_call_counter[0] > 0:
            mapped = "tool_calls"
        chunks.append(_make_stream_chunk(model=model, finish_reason=mapped))
    return chunks


# =============================================================================
# GeminiCloudCodeClient β€” OpenAI-compatible facade
# =============================================================================

MARKER_BASE_URL = "cloudcode-pa://google"


class _GeminiChatCompletions:
    def __init__(self, client: "GeminiCloudCodeClient"):
        self._client = client

    def create(self, **kwargs: Any) -> Any:
        return self._client._create_chat_completion(**kwargs)


class _GeminiChatNamespace:
    def __init__(self, client: "GeminiCloudCodeClient"):
        self.completions = _GeminiChatCompletions(client)


class GeminiCloudCodeClient:
    """Minimal OpenAI-SDK-compatible facade over Code Assist v1internal."""

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        default_headers: Optional[Dict[str, str]] = None,
        project_id: str = "",
        **_: Any,
    ):
        # `api_key` here is a dummy β€” real auth is the OAuth access token
        # fetched on every call via agent.google_oauth.get_valid_access_token().
        # We accept the kwarg for openai.OpenAI interface parity.
        self.api_key = api_key or "google-oauth"
        self.base_url = base_url or MARKER_BASE_URL
        self._default_headers = dict(default_headers or {})
        self._configured_project_id = project_id
        self._project_context: Optional[ProjectContext] = None
        self._project_context_lock = False  # simple single-thread guard
        self.chat = _GeminiChatNamespace(self)
        self.is_closed = False
        self._http = httpx.Client(timeout=httpx.Timeout(connect=15.0, read=600.0, write=30.0, pool=30.0))

    def close(self) -> None:
        self.is_closed = True
        try:
            self._http.close()
        except Exception:
            pass

    # Implement the OpenAI SDK's context-manager-ish closure check
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _ensure_project_context(self, access_token: str, model: str) -> ProjectContext:
        """Lazily resolve and cache the project context for this client."""
        if self._project_context is not None:
            return self._project_context

        env_project = google_oauth.resolve_project_id_from_env()
        creds = google_oauth.load_credentials()
        stored_project = creds.project_id if creds else ""

        # Prefer what's already baked into the creds
        if stored_project:
            self._project_context = ProjectContext(
                project_id=stored_project,
                managed_project_id=creds.managed_project_id if creds else "",
                tier_id="",
                source="stored",
            )
            return self._project_context

        ctx = resolve_project_context(
            access_token,
            configured_project_id=self._configured_project_id,
            env_project_id=env_project,
            user_agent_model=model,
        )
        # Persist discovered project back to the creds file so the next
        # session doesn't re-run the discovery.
        if ctx.project_id or ctx.managed_project_id:
            google_oauth.update_project_ids(
                project_id=ctx.project_id,
                managed_project_id=ctx.managed_project_id,
            )
        self._project_context = ctx
        return ctx

    def _create_chat_completion(
        self,
        *,
        model: str = "gemini-2.5-flash",
        messages: Optional[List[Dict[str, Any]]] = None,
        stream: bool = False,
        tools: Any = None,
        tool_choice: Any = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        top_p: Optional[float] = None,
        stop: Any = None,
        extra_body: Optional[Dict[str, Any]] = None,
        timeout: Any = None,
        **_: Any,
    ) -> Any:
        access_token = google_oauth.get_valid_access_token()
        ctx = self._ensure_project_context(access_token, model)

        thinking_config = None
        if isinstance(extra_body, dict):
            thinking_config = extra_body.get("thinking_config") or extra_body.get("thinkingConfig")

        inner = build_gemini_request(
            messages=messages or [],
            tools=tools,
            tool_choice=tool_choice,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stop=stop,
            thinking_config=thinking_config,
        )
        wrapped = wrap_code_assist_request(
            project_id=ctx.project_id,
            model=model,
            inner_request=inner,
        )

        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Bearer {access_token}",
            "User-Agent": "hermes-agent (gemini-cli-compat)",
            "X-Goog-Api-Client": "gl-python/hermes",
            "x-activity-request-id": str(uuid.uuid4()),
        }
        headers.update(self._default_headers)

        if stream:
            return self._stream_completion(model=model, wrapped=wrapped, headers=headers)

        url = f"{CODE_ASSIST_ENDPOINT}/v1internal:generateContent"
        response = self._http.post(url, json=wrapped, headers=headers)
        if response.status_code != 200:
            raise _gemini_http_error(response)
        try:
            payload = response.json()
        except ValueError as exc:
            raise CodeAssistError(
                f"Invalid JSON from Code Assist: {exc}",
                code="code_assist_invalid_json",
            ) from exc
        return _translate_gemini_response(payload, model=model)

    def _stream_completion(
        self,
        *,
        model: str,
        wrapped: Dict[str, Any],
        headers: Dict[str, str],
    ) -> Iterator[_GeminiStreamChunk]:
        """Generator that yields OpenAI-shaped streaming chunks."""
        url = f"{CODE_ASSIST_ENDPOINT}/v1internal:streamGenerateContent?alt=sse"
        stream_headers = dict(headers)
        stream_headers["Accept"] = "text/event-stream"

        def _generator() -> Iterator[_GeminiStreamChunk]:
            try:
                with self._http.stream("POST", url, json=wrapped, headers=stream_headers) as response:
                    if response.status_code != 200:
                        # Materialize error body for better diagnostics
                        response.read()
                        raise _gemini_http_error(response)
                    tool_call_counter: List[int] = [0]
                    for event in _iter_sse_events(response):
                        for chunk in _translate_stream_event(event, model, tool_call_counter):
                            yield chunk
            except httpx.HTTPError as exc:
                raise CodeAssistError(
                    f"Streaming request failed: {exc}",
                    code="code_assist_stream_error",
                ) from exc

        return _generator()


def _gemini_http_error(response: httpx.Response) -> CodeAssistError:
    """Translate an httpx response into a CodeAssistError with rich metadata.

    Parses Google's error envelope (``{"error": {"code", "message", "status",
    "details": [...]}}``) so the agent's error classifier can reason about
    the failure β€” ``status_code`` enables the rate_limit / auth classification
    paths, and ``response`` lets the main loop honor ``Retry-After`` just
    like it does for OpenAI SDK exceptions.

    Also lifts a few recognizable Google conditions into human-readable
    messages so the user sees something better than a 500-char JSON dump:

        MODEL_CAPACITY_EXHAUSTED β†’ "Gemini model capacity exhausted for
            <model>. This is a Google-side throttle..."
        RESOURCE_EXHAUSTED w/o reason β†’ quota-style message
        404 β†’ "Model <name> not found at cloudcode-pa..."
    """
    status = response.status_code

    # Parse the body once, surviving any weird encodings.
    body_text = ""
    body_json: Dict[str, Any] = {}
    try:
        body_text = response.text
    except Exception:
        body_text = ""
    if body_text:
        try:
            parsed = json.loads(body_text)
            if isinstance(parsed, dict):
                body_json = parsed
        except (ValueError, TypeError):
            body_json = {}

    # Dig into Google's error envelope.  Shape is:
    #   {"error": {"code": 429, "message": "...", "status": "RESOURCE_EXHAUSTED",
    #              "details": [{"@type": ".../ErrorInfo", "reason": "MODEL_CAPACITY_EXHAUSTED",
    #                           "metadata": {...}},
    #                          {"@type": ".../RetryInfo", "retryDelay": "30s"}]}}
    err_obj = body_json.get("error") if isinstance(body_json, dict) else None
    if not isinstance(err_obj, dict):
        err_obj = {}
    err_status = str(err_obj.get("status") or "").strip()
    err_message = str(err_obj.get("message") or "").strip()
    _raw_details = err_obj.get("details")
    err_details_list = _raw_details if isinstance(_raw_details, list) else []

    # Extract google.rpc.ErrorInfo reason + metadata.  There may be more
    # than one ErrorInfo (rare), so we pick the first one with a reason.
    error_reason = ""
    error_metadata: Dict[str, Any] = {}
    retry_delay_seconds: Optional[float] = None
    for detail in err_details_list:
        if not isinstance(detail, dict):
            continue
        type_url = str(detail.get("@type") or "")
        if not error_reason and type_url.endswith("/google.rpc.ErrorInfo"):
            reason = detail.get("reason")
            if isinstance(reason, str) and reason:
                error_reason = reason
            md = detail.get("metadata")
            if isinstance(md, dict):
                error_metadata = md
        elif retry_delay_seconds is None and type_url.endswith("/google.rpc.RetryInfo"):
            # retryDelay is a google.protobuf.Duration string like "30s" or "1.5s".
            delay_raw = detail.get("retryDelay")
            if isinstance(delay_raw, str) and delay_raw.endswith("s"):
                try:
                    retry_delay_seconds = float(delay_raw[:-1])
                except ValueError:
                    pass
            elif isinstance(delay_raw, (int, float)):
                retry_delay_seconds = float(delay_raw)

    # Fall back to the Retry-After header if the body didn't include RetryInfo.
    if retry_delay_seconds is None:
        try:
            header_val = response.headers.get("Retry-After") or response.headers.get("retry-after")
        except Exception:
            header_val = None
        if header_val:
            try:
                retry_delay_seconds = float(header_val)
            except (TypeError, ValueError):
                retry_delay_seconds = None

    # Classify the error code.  ``code_assist_rate_limited`` stays the default
    # for 429s; a more specific reason tag helps downstream callers (e.g. tests,
    # logs) without changing the rate_limit classification path.
    code = f"code_assist_http_{status}"
    if status == 401:
        code = "code_assist_unauthorized"
    elif status == 429:
        code = "code_assist_rate_limited"
        if error_reason == "MODEL_CAPACITY_EXHAUSTED":
            code = "code_assist_capacity_exhausted"

    # Build a human-readable message.  Keep the status + a raw-body tail for
    # debugging, but lead with a friendlier summary when we recognize the
    # Google signal.
    model_hint = ""
    if isinstance(error_metadata, dict):
        model_hint = str(error_metadata.get("model") or error_metadata.get("modelId") or "").strip()

    if status == 429 and error_reason == "MODEL_CAPACITY_EXHAUSTED":
        target = model_hint or "this Gemini model"
        message = (
            f"Gemini capacity exhausted for {target} (Google-side throttle, "
            f"not a Hermes issue). Try a different Gemini model or set a "
            f"fallback_providers entry to a non-Gemini provider."
        )
        if retry_delay_seconds is not None:
            message += f" Google suggests retrying in {retry_delay_seconds:g}s."
    elif status == 429 and err_status == "RESOURCE_EXHAUSTED":
        message = (
            f"Gemini quota exhausted ({err_message or 'RESOURCE_EXHAUSTED'}). "
            f"Check /gquota for remaining daily requests."
        )
        if retry_delay_seconds is not None:
            message += f" Retry suggested in {retry_delay_seconds:g}s."
    elif status == 404:
        # Google returns 404 when a model has been retired or renamed.
        target = model_hint or (err_message or "model")
        message = (
            f"Code Assist 404: {target} is not available at "
            f"cloudcode-pa.googleapis.com. It may have been renamed or "
            f"retired. Check hermes_cli/models.py for the current list."
        )
    elif err_message:
        # Generic fallback with the parsed message.
        message = f"Code Assist HTTP {status} ({err_status or 'error'}): {err_message}"
    else:
        # Last-ditch fallback β€” raw body snippet.
        message = f"Code Assist returned HTTP {status}: {body_text[:500]}"

    return CodeAssistError(
        message,
        code=code,
        status_code=status,
        response=response,
        retry_after=retry_delay_seconds,
        details={
            "status": err_status,
            "reason": error_reason,
            "metadata": error_metadata,
            "message": err_message,
        },
    )