| | """
|
| | Context management for conversation history
|
| | """
|
| |
|
| | import logging
|
| | import os
|
| | import zoneinfo
|
| | from datetime import datetime
|
| | from pathlib import Path
|
| | from typing import Any
|
| |
|
| | import yaml
|
| | from jinja2 import Template
|
| | from litellm import Message, acompletion
|
| |
|
| | logger = logging.getLogger(__name__)
|
| |
|
| |
|
# Cached result of _get_hf_username(); None until the first lookup completes.
_hf_username_cache: str | None = None

# Hugging Face "who am I" endpoint used to resolve the username for a token.
_HF_WHOAMI_URL = "https://huggingface.co/api/whoami-v2"
# Timeout in seconds passed to curl's -m flag for the whoami request.
_HF_WHOAMI_TIMEOUT = 5
|
| |
|
| |
|
def _get_hf_username() -> str:
    """Return the Hugging Face username for the configured token, cached after the first call.

    Uses subprocess + curl (forced to IPv4 via ``-4``) to avoid Python HTTP
    client IPv6 issues that cause 40+ second hangs (httpx/urllib try IPv6
    first, which times out at OS level before falling back to IPv4 — the
    "Happy Eyeballs" problem).

    Returns:
        The username reported by the whoami endpoint, or ``"unknown"`` when
        no token is configured or the lookup fails for any reason.
    """
    # Local imports: this function runs at most once per process (the result
    # is cached), so there is no need to pay for these at module import time.
    import json
    import subprocess
    import time as _t

    global _hf_username_cache
    if _hf_username_cache is not None:
        return _hf_username_cache

    hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
    if not hf_token:
        logger.warning("No HF_TOKEN set, using 'unknown' as username")
        _hf_username_cache = "unknown"
        return _hf_username_cache

    t0 = _t.monotonic()
    try:
        # NOTE(security): the token travels on curl's command line, so it is
        # briefly visible in the process list (`ps`) — acceptable on
        # single-user hosts, worth revisiting on shared machines.
        result = subprocess.run(
            [
                "curl",
                "-s",  # silent: suppress the progress meter
                "-4",  # force IPv4 (see docstring)
                "-m",
                str(_HF_WHOAMI_TIMEOUT),
                "-H",
                f"Authorization: Bearer {hf_token}",
                _HF_WHOAMI_URL,
            ],
            capture_output=True,
            text=True,
            # Slightly longer than curl's own -m so curl's timeout fires first.
            timeout=_HF_WHOAMI_TIMEOUT + 2,
        )
        t1 = _t.monotonic()
        if result.returncode == 0 and result.stdout:
            data = json.loads(result.stdout)
            name = data.get("name")
            if name:
                _hf_username_cache = name
                logger.info(
                    "HF username resolved to '%s' in %.2fs", name, t1 - t0
                )
            else:
                # HTTP-level success but no "name" in the payload (e.g. an
                # error body returned for an invalid token). Previously this
                # was logged at INFO as a successful resolution to 'unknown'.
                logger.warning(
                    "HF whoami returned no username in %.2fs", t1 - t0
                )
                _hf_username_cache = "unknown"
        else:
            logger.warning(
                "curl whoami failed (rc=%s) in %.2fs", result.returncode, t1 - t0
            )
            _hf_username_cache = "unknown"
    except Exception as e:
        # Best-effort lookup: any failure (timeout, missing curl binary,
        # malformed JSON) falls back to "unknown" rather than blocking startup.
        t1 = _t.monotonic()
        logger.warning("HF whoami failed in %.2fs: %s", t1 - t0, e)
        _hf_username_cache = "unknown"

    return _hf_username_cache
|
| |
|
| |
|
class ContextManager:
    """Manages conversation context and message history for the agent.

    Holds an ordered list of ``Message`` items (system prompt first),
    tracks a rough token count, and compacts older messages into an
    LLM-generated summary once the count exceeds ``max_context``.
    """

    def __init__(
        self,
        max_context: int = 180_000,
        compact_size: float = 0.1,
        untouched_messages: int = 5,
        tool_specs: list[dict[str, Any]] | None = None,
        prompt_file_suffix: str = "system_prompt_v2.yaml",
    ):
        """Initialize the manager and render the system prompt.

        Args:
            max_context: Token budget before ``compact`` starts summarizing.
            compact_size: Fraction of ``max_context`` granted to the summary
                as ``max_completion_tokens``.
            untouched_messages: Number of most recent messages that are never
                summarized away.
            tool_specs: Tool specifications rendered into the system prompt.
            prompt_file_suffix: File name of the prompt YAML under ``prompts/``.
        """
        # BUG FIX: prompt_file_suffix was previously ignored — the call below
        # hard-coded "system_prompt_v2.yaml". The parameter is now honored;
        # the default is unchanged, so existing callers behave identically.
        self.system_prompt = self._load_system_prompt(
            tool_specs or [],
            prompt_file_suffix=prompt_file_suffix,
        )
        self.max_context = max_context
        # Token allowance for the summarization completion.
        self.compact_size = int(max_context * compact_size)
        # Rough token estimate: ~4 characters per token.
        self.context_length = len(self.system_prompt) // 4
        self.untouched_messages = untouched_messages
        self.items: list[Message] = [Message(role="system", content=self.system_prompt)]

    def _load_system_prompt(
        self,
        tool_specs: list[dict[str, Any]],
        prompt_file_suffix: str = "system_prompt.yaml",
    ) -> str:
        """Load and render the system prompt from a YAML file with Jinja2.

        Args:
            tool_specs: Tool specifications exposed to the template.
            prompt_file_suffix: File name under the sibling ``prompts/`` dir.

        Returns:
            The rendered system prompt string.
        """
        prompt_file = Path(__file__).parent.parent / "prompts" / f"{prompt_file_suffix}"

        with open(prompt_file, "r") as f:
            prompt_data = yaml.safe_load(f)
        template_str = prompt_data.get("system_prompt", "")

        # Current date/time is baked into the prompt so the model knows "now".
        # NOTE(review): timezone is hard-coded to Europe/Paris — confirm this
        # is intentional rather than "the deployer's local zone".
        tz = zoneinfo.ZoneInfo("Europe/Paris")
        now = datetime.now(tz)
        current_date = now.strftime("%d-%m-%Y")
        # Trim microseconds down to milliseconds.
        current_time = now.strftime("%H:%M:%S.%f")[:-3]
        current_timezone = f"{now.strftime('%Z')} (UTC{now.strftime('%z')[:3]}:{now.strftime('%z')[3:]})"

        hf_user_info = _get_hf_username()

        template = Template(template_str)
        return template.render(
            tools=tool_specs,
            num_tools=len(tool_specs),
            current_date=current_date,
            current_time=current_time,
            current_timezone=current_timezone,
            hf_user_info=hf_user_info,
        )

    def add_message(self, message: Message, token_count: int | None = None) -> None:
        """Append a message to the history.

        Args:
            message: The message to append.
            token_count: If provided (and non-zero), replaces the tracked
                context length with this authoritative count. A value of 0
                is ignored by the truthiness check below.
        """
        if token_count:
            self.context_length = token_count
        self.items.append(message)

    def get_messages(self) -> list[Message]:
        """Return all messages for sending to the LLM (live list, not a copy)."""
        return self.items

    async def compact(self, model_name: str) -> None:
        """Summarize old messages to keep the history under the context budget.

        No-op unless the tracked context length exceeds ``max_context``.
        Keeps the system message and the most recent messages (starting from
        a user turn), and replaces everything in between with a single
        assistant summary produced by ``model_name``.

        Args:
            model_name: LiteLLM model identifier used for the summarization.
        """
        if (self.context_length <= self.max_context) or not self.items:
            return

        system_msg = (
            self.items[0] if self.items and self.items[0].role == "system" else None
        )

        # Walk back from the cut point until the retained tail starts on a
        # user message, so the model sees a coherent conversation resumption.
        idx = len(self.items) - self.untouched_messages
        while idx > 1 and self.items[idx].role != "user":
            idx -= 1

        recent_messages = self.items[idx:]
        # Slice copies, so appending the summary request below does not
        # mutate self.items.
        messages_to_summarize = self.items[1:idx]

        if not messages_to_summarize:
            return

        messages_to_summarize.append(
            Message(
                role="user",
                content="Please provide a concise summary of the conversation above, focusing on key decisions, code changes, problems solved, and important context needed for future turns.",
            )
        )

        # Only pass the HF inference token for huggingface/ models; other
        # providers resolve credentials through litellm's own mechanisms.
        hf_key = os.environ.get("INFERENCE_TOKEN")
        response = await acompletion(
            model=model_name,
            messages=messages_to_summarize,
            max_completion_tokens=self.compact_size,
            api_key=hf_key
            if hf_key and model_name.startswith("huggingface/")
            else None,
        )
        summarized_message = Message(
            role="assistant", content=response.choices[0].message.content
        )

        if system_msg:
            self.items = [system_msg, summarized_message] + recent_messages
        else:
            self.items = [summarized_message] + recent_messages

        # Re-estimate: system prompt (~4 chars/token) plus the summary's
        # exact completion token count. The retained tail is not re-counted
        # here; the next add_message(token_count=...) will correct it.
        self.context_length = (
            len(self.system_prompt) // 4 + response.usage.completion_tokens
        )
|
| |
|