"""
Context management for conversation history
"""

import json
import logging
import os
import subprocess
import time
import zoneinfo
from datetime import datetime
from pathlib import Path
from typing import Any

import yaml
from jinja2 import Template
from litellm import Message, acompletion

| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | _hf_username_cache: str | None = None |
| |
|
| | _HF_WHOAMI_URL = "https://huggingface.co/api/whoami-v2" |
| | _HF_WHOAMI_TIMEOUT = 5 |
| |
|
| |
|
def _get_hf_username() -> str:
    """Return the HF username, cached after the first call.

    Uses subprocess + curl to avoid Python HTTP client IPv6 issues that
    cause 40+ second hangs (httpx/urllib try IPv6 first which times out
    at OS level before falling back to IPv4 — the "Happy Eyeballs" problem).

    The token is read from ``HF_TOKEN`` or, failing that,
    ``HUGGINGFACE_HUB_TOKEN``.

    Returns:
        The ``name`` field of the whoami response, or ``"unknown"`` when no
        token is configured, curl fails, or the response carries no usable
        name. Every outcome, including the fallback, is cached in
        ``_hf_username_cache`` so the lookup runs at most once per process.
    """
    global _hf_username_cache
    if _hf_username_cache is not None:
        return _hf_username_cache

    hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
    if not hf_token:
        logger.warning("No HF_TOKEN set, using 'unknown' as username")
        _hf_username_cache = "unknown"
        return _hf_username_cache

    # Always end up with a real string so the cache check above works and the
    # declared ``-> str`` return type holds on every path.
    username = "unknown"
    t0 = time.monotonic()
    try:
        # NOTE(review): the bearer token is passed on curl's command line;
        # consider feeding the header through stdin/a config file if that
        # exposure matters in the deployment environment.
        result = subprocess.run(
            [
                "curl",
                "-s",
                "-4",  # force IPv4, see docstring
                "-m",
                str(_HF_WHOAMI_TIMEOUT),
                "-H",
                f"Authorization: Bearer {hf_token}",
                _HF_WHOAMI_URL,
            ],
            capture_output=True,
            text=True,
            timeout=_HF_WHOAMI_TIMEOUT + 2,
        )
        elapsed = time.monotonic() - t0
        if result.returncode == 0 and result.stdout:
            data = json.loads(result.stdout)
            # The payload may be a non-dict, or carry ``"name": null``;
            # neither may leak into the cache as a non-string value.
            name = data.get("name") if isinstance(data, dict) else None
            if isinstance(name, str) and name:
                username = name
                logger.info(
                    "HF username resolved to '%s' in %.2fs", username, elapsed
                )
            else:
                logger.warning(
                    "HF whoami response had no usable 'name' in %.2fs", elapsed
                )
        else:
            logger.warning(
                "curl whoami failed (rc=%s) in %.2fs", result.returncode, elapsed
            )
    except Exception as e:
        # Deliberately broad: resolving the username is best-effort and must
        # not break prompt construction (curl missing, timeout, bad JSON, ...).
        logger.warning("HF whoami failed in %.2fs: %s", time.monotonic() - t0, e)

    _hf_username_cache = username
    return _hf_username_cache


class ContextManager:
    """Manages conversation context and message history for the agent.

    The history is a list of ``Message`` objects whose first entry is the
    rendered system prompt. ``context_length`` is a token count: it starts as
    a rough ``len(system_prompt) // 4`` estimate and is overwritten whenever a
    caller reports a count through :meth:`add_message`.
    """

    def __init__(
        self,
        max_context: int = 180_000,
        compact_size: float = 0.1,
        untouched_messages: int = 5,
        tool_specs: list[dict[str, Any]] | None = None,
        prompt_file_suffix: str = "system_prompt_v2.yaml",
    ):
        """Render the system prompt and start a history containing only it.

        Args:
            max_context: Token count above which :meth:`compact` acts.
            compact_size: Fraction of ``max_context`` used as the completion
                token limit of the summary produced by :meth:`compact`.
            untouched_messages: Number of trailing messages :meth:`compact`
                tries to keep verbatim.
            tool_specs: Tool descriptions passed to the prompt template.
            prompt_file_suffix: File name of the prompt YAML inside the
                ``prompts`` directory.
        """
        # Honour the caller's choice of prompt file instead of a fixed name.
        self.system_prompt = self._load_system_prompt(
            tool_specs or [],
            prompt_file_suffix=prompt_file_suffix,
        )
        self.max_context = max_context
        self.compact_size = int(max_context * compact_size)
        # Rough estimate (about four characters per token) until a real
        # count is supplied via add_message.
        self.context_length = len(self.system_prompt) // 4
        self.untouched_messages = untouched_messages
        self.items: list[Message] = [
            Message(role="system", content=self.system_prompt)
        ]

    def _load_system_prompt(
        self,
        tool_specs: list[dict[str, Any]],
        prompt_file_suffix: str = "system_prompt.yaml",
    ) -> str:
        """Load and render the system prompt from YAML file with Jinja2.

        The file is looked up as ``prompts/<prompt_file_suffix>`` one level
        above this module's directory; its ``system_prompt`` key holds the
        template text. The template receives the tool specs, their count, the
        current date/time/timezone and the HF username.

        Raises:
            OSError: If the prompt file cannot be opened.
        """
        prompt_file = Path(__file__).parent.parent / "prompts" / prompt_file_suffix

        # Explicit encoding: the platform default is locale-dependent and can
        # fail on prompts containing non-ASCII characters.
        with open(prompt_file, encoding="utf-8") as f:
            # An empty YAML document loads as None; treat it as "no prompt".
            prompt_data = yaml.safe_load(f) or {}
        template_str = prompt_data.get("system_prompt", "")

        # Date and time are always reported in the Europe/Paris timezone.
        now = datetime.now(zoneinfo.ZoneInfo("Europe/Paris"))
        current_date = now.strftime("%d-%m-%Y")
        # %f yields microseconds; dropping three digits leaves milliseconds.
        current_time = now.strftime("%H:%M:%S.%f")[:-3]
        # "+0100" -> "+01:00"
        utc_offset = now.strftime("%z")
        current_timezone = (
            f"{now.strftime('%Z')} (UTC{utc_offset[:3]}:{utc_offset[3:]})"
        )

        hf_user_info = _get_hf_username()

        return Template(template_str).render(
            tools=tool_specs,
            num_tools=len(tool_specs),
            current_date=current_date,
            current_time=current_time,
            current_timezone=current_timezone,
            hf_user_info=hf_user_info,
        )

    def add_message(self, message: Message, token_count: int | None = None) -> None:
        """Add a message to the history.

        Args:
            message: The message to append.
            token_count: When truthy, replaces the tracked ``context_length``;
                ``None`` or ``0`` leaves the current value untouched.
        """
        if token_count:
            self.context_length = token_count
        self.items.append(message)

    def get_messages(self) -> list[Message]:
        """Get all messages for sending to LLM.

        Returns the internal list itself, not a copy.
        """
        return self.items

    async def compact(self, model_name: str) -> None:
        """Summarize old messages to keep history under target size.

        Does nothing unless ``context_length`` exceeds ``max_context``.
        Otherwise the messages between the system prompt and the kept tail
        are sent to ``model_name`` for summarization and replaced by a single
        assistant message holding the summary.

        Args:
            model_name: Model identifier passed to ``acompletion``. For names
                starting with ``"huggingface/"`` the ``INFERENCE_TOKEN``
                environment variable, if set, is used as the API key.
        """
        if self.context_length <= self.max_context or not self.items:
            return

        has_system = self.items[0].role == "system"
        system_msg = self.items[0] if has_system else None
        # Index of the first message eligible for summarization; without a
        # system prompt the very first message must not be silently dropped.
        first = 1 if has_system else 0

        # Keep the last ``untouched_messages`` entries. Clamp so a short
        # history cannot produce a negative index that wraps around and
        # summarizes messages that were meant to stay untouched.
        idx = max(len(self.items) - self.untouched_messages, first)
        # Walk back so the kept tail starts on a user message; the upper
        # bound also protects against indexing past the end when nothing is
        # kept (untouched_messages == 0).
        while first < idx < len(self.items) and self.items[idx].role != "user":
            idx -= 1

        recent_messages = self.items[idx:]
        messages_to_summarize = self.items[first:idx]

        # Nothing older than the kept tail: leave the history as is.
        if not messages_to_summarize:
            return

        messages_to_summarize.append(
            Message(
                role="user",
                content="Please provide a concise summary of the conversation above, focusing on key decisions, code changes, problems solved, and important context needed for future turns.",
            )
        )

        hf_key = os.environ.get("INFERENCE_TOKEN")
        use_hf_key = bool(hf_key) and model_name.startswith("huggingface/")
        response = await acompletion(
            model=model_name,
            messages=messages_to_summarize,
            max_completion_tokens=self.compact_size,
            api_key=hf_key if use_hf_key else None,
        )
        summarized_message = Message(
            role="assistant", content=response.choices[0].message.content
        )

        # Rebuild: system prompt (if any) + summary + untouched tail.
        head = [system_msg] if system_msg is not None else []
        self.items = head + [summarized_message] + recent_messages

        # Estimate only: system prompt plus summary tokens. The kept tail is
        # not counted here; presumably the next add_message call carrying a
        # token count corrects the value (confirm against callers).
        self.context_length = (
            len(self.system_prompt) // 4 + response.usage.completion_tokens
        )