Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import re | |
| import logging | |
| from enum import Enum | |
| from typing import Optional | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| import tiktoken | |
# Module-level logger for this service.
logger = logging.getLogger("promptzip")

# ── App ───────────────────────────────────────────────────────────────────────
app = FastAPI(
    title="PromptZip API",
    description="Compress large text, code, and logs to save LLM context window space.",
    version="0.1.0",
)

# CORS: any origin may call the API, but credentials are disabled so cookies
# and auth headers are never shared cross-origin.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── Tokenizer (loaded once at startup) ───────────────────────────────────────
# cl100k_base is the encoding used by GPT-4 / GPT-4o; loading it once at
# import time avoids per-request initialisation cost.
_encoder = tiktoken.get_encoding("cl100k_base")
_COST_PER_MILLION: float = 5.00  # USD — GPT-4o standard input rate
def count_tokens(text: str) -> int:
    """Return the exact number of cl100k_base tokens (GPT-4 / GPT-4o) in *text*."""
    token_ids = _encoder.encode(text)
    return len(token_ids)
def estimate_cost(token_count: int, rate_per_million: Optional[float] = None) -> float:
    """
    Return the estimated USD cost for *token_count* tokens, rounded to 6 dp.

    Args:
        token_count: Number of tokens to price.
        rate_per_million: USD price per 1M tokens. Defaults to the module-wide
            _COST_PER_MILLION ($5.00 / 1M, GPT-4o standard input rate); pass an
            explicit rate to price against a different model.
    """
    # Resolve the default lazily so the module constant stays the single
    # source of truth for the standard rate.
    rate = _COST_PER_MILLION if rate_per_million is None else rate_per_million
    return round((token_count / 1_000_000) * rate, 6)
# ── LLMlingua (optional — lazy-loaded so startup is never blocked) ────────────
_llmlingua_compressor = None          # cached PromptCompressor once loaded
_llmlingua_error: Optional[str] = None  # first load failure, remembered forever


def _get_llmlingua():
    """Return a cached PromptCompressor, or raise HTTPException if unavailable."""
    global _llmlingua_compressor, _llmlingua_error

    def _unavailable(reason) -> HTTPException:
        # 503 with a hint that the regex-based modes still work.
        return HTTPException(
            status_code=503,
            detail=f"LLMlingua failed to load: {reason}. "
            "Use mode='code' or mode='logs' for regex-based compression.",
        )

    if _llmlingua_compressor is None:
        # A previous attempt failed: don't retry, re-raise the cached reason.
        if _llmlingua_error is not None:
            raise _unavailable(_llmlingua_error)
        try:
            from llmlingua import PromptCompressor

            _llmlingua_compressor = PromptCompressor(
                model_name="microsoft/llmlingua-2-bert-base-multilingual-cased-meetingbank",
                use_llmlingua2=True,
                device_map="cpu",
            )
        except Exception as exc:
            _llmlingua_error = str(exc)
            logger.error("LLMlingua init failed: %s", exc)
            raise _unavailable(exc)
        logger.info("LLMlingua initialised successfully.")
    return _llmlingua_compressor
# ── Compression logic ─────────────────────────────────────────────────────────
# Aggression level → target retention ratio handed to LLMlingua
# (1 keeps ~80% of tokens, 3 keeps only ~40%).
_TEXT_RATIOS = {1: 0.8, 2: 0.6, 3: 0.4}
def compress_logs(text: str, aggression: int) -> str:
    """
    Regex-based log compression.

    1. Strip common timestamp patterns.
    2. Optionally strip IPv4 addresses (aggression >= 2).
    3. Collapse consecutive duplicate lines (repeating errors).
    4. Collapse runs of blank lines to a single blank.
    """
    # Known timestamp shapes: ISO-8601 / syslog / common-log-format variants.
    for ts_pattern in (
        # [2023-10-12 14:00:00.123] or 2023-10-12T14:00:00Z
        r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?(?:Z|[+-]\d{2}:\d{2})?\s*",
        # [12/Oct/2023:14:00:00 +0000]
        r"\[\d{2}/\w+/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4}\]\s*",
        # Jan 12 14:00:00
        r"\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\s*",
        # [14:00:00]
        r"\[\d{2}:\d{2}:\d{2}(?:\.\d+)?\]\s*",
    ):
        text = re.sub(ts_pattern, "", text)

    # Past gentle mode, replace host[:port] IPv4 literals with a placeholder.
    if aggression >= 2:
        text = re.sub(r"\b(?:\d{1,3}\.){3}\d{1,3}(?::\d+)?\b", "<ip>", text)

    # Collapse runs of identical non-blank lines: keep the first occurrence
    # and maintain a single counter line after it.
    collapsed: list[str] = []
    last_core = None
    run_length = 0
    for raw_line in text.splitlines():
        core = raw_line.strip()
        if core and core == last_core:  # blank lines are not deduped here
            run_length += 1
            marker = f" [repeated {run_length + 1}x ↑]"
            if run_length == 1:
                collapsed.append(marker)
            else:
                collapsed[-1] = marker
        else:
            run_length = 0
            collapsed.append(raw_line)
        last_core = core
    text = "\n".join(collapsed)

    # Squash three-or-more newlines down to a single blank line.
    text = re.sub(r"\n{3,}", "\n\n", text)

    # Harshest setting: per-line strip and no blank lines at all.
    if aggression == 3:
        text = "\n".join(line.strip() for line in text.splitlines())
        text = re.sub(r"\n{2,}", "\n", text)
    return text.strip()
def compress_code(text: str, aggression: int) -> str:
    """
    Regex-based code comment & whitespace stripping:
    - Remove /* ... */ block comments (including docblock variants /** */)
    - Remove Python/Ruby # single-line comments (shebang '#!' lines preserved)
    - Remove C++/JS // single-line comments
    - Remove Python/Java triple-quoted docstrings (aggression >= 2)
    - Remove blank / whitespace-only lines (aggression >= 2)
    - Strip trailing whitespace and over-indent (aggression 3)

    Best-effort only: the regexes do not parse string literals, so
    comment-like text inside strings may also be removed.
    """
    # --- Block comments: /* ... */ (non-greedy, dotall) ---
    text = re.sub(r"/\*.*?\*/", "", text, flags=re.DOTALL)
    # --- Triple-quoted Python docstrings (aggression >= 2) ---
    if aggression >= 2:
        text = re.sub(r'""".*?"""', "", text, flags=re.DOTALL)
        text = re.sub(r"'''.*?'''", "", text, flags=re.DOTALL)
    # --- Single-line comments ---
    # // comments; (?<!:) keeps URL schemes such as http:// intact.
    text = re.sub(r"(?m)(?<!:)//.*$", "", text)
    # '#' comments. BUG FIX: the previous pattern (?<!^#!)#.*$ looked *behind*
    # the '#', which on a shebang line is start-of-line, so the negative
    # lookbehind always succeeded and shebangs were stripped despite the
    # stated intent. Looking *ahead* for '!' correctly spares '#!' lines.
    text = re.sub(r"(?m)#(?!!).*$", "", text)
    # --- Trailing whitespace ---
    text = re.sub(r"(?m)[ \t]+$", "", text)
    # --- Blank lines (aggression >= 2) ---
    if aggression >= 2:
        text = re.sub(r"\n{2,}", "\n", text)
    # --- Aggressive: remove all indentation ---
    if aggression == 3:
        text = re.sub(r"(?m)^[ \t]+", "", text)
    return text.strip()
def compress_text(text: str, aggression: int) -> str:
    """
    Semantic compression via LLMlingua PromptCompressor.

    Propagates HTTPException(503) from _get_llmlingua() when the model
    cannot be loaded; otherwise compresses to the retention ratio mapped
    from *aggression* via _TEXT_RATIOS.
    """
    compressor = _get_llmlingua()
    target_rate = _TEXT_RATIOS[aggression]
    result = compressor.compress_prompt(
        text,
        rate=target_rate,
        force_tokens=["\n"],  # keep newline structure intact
    )
    # Defensive default: fall back to the input if the key is missing.
    return result.get("compressed_prompt", text)
| # ββ Schemas βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class Mode(str, Enum):
    """Compression strategy selector: text | code | logs."""
    # str mixin makes members serialise as their plain string value.
    text = "text"
    code = "code"
    logs = "logs"
class CompressRequest(BaseModel):
    """Input payload for compression: raw content plus strategy knobs."""

    text: str = Field(..., description="The raw text, code, or log to compress.")
    mode: Mode = Field(Mode.text, description="Compression strategy: text | code | logs.")
    # Bounded to 1..3 so it safely indexes _TEXT_RATIOS and the regex tiers.
    aggression_level: int = Field(
        2,
        ge=1,
        le=3,
        description="1 = gentle, 2 = balanced, 3 = aggressive.",
    )
class CompressResponse(BaseModel):
    """Result of one compression run, with token and cost accounting."""

    compressed_text: str
    original_tokens: int  # exact cl100k_base count of the input
    new_tokens: int  # exact cl100k_base count of the output
    tokens_saved: int  # original_tokens - new_tokens
    percent_saved: float = Field(..., description="Percentage of tokens removed.")
    dollars_saved: float = Field(..., description="Estimated API cost delta in USD.")
    mode: Mode
    aggression_level: int
| # ββ Endpoints βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# BUG FIX: the handler was never registered on the app (no route decorator),
# so the endpoint was unreachable. Path chosen by convention — confirm against
# the frontend / deployment probes.
@app.get("/health")
async def health_check():
    """Check that the API is alive and responding."""
    return {"status": "ok", "service": "promptzip-api"}
# BUG FIX: the handler was never registered on the app (no route decorator),
# so the endpoint was unreachable. Path chosen by convention — confirm callers.
@app.post("/tokenize")
async def tokenize(body: dict):
    """
    Count exact tokens and return estimated cost.

    Expects a JSON object like {"text": "..."}; a missing "text" key is
    treated as the empty string (0 tokens).
    """
    text = body.get("text", "")
    tokens = count_tokens(text)
    return {
        "token_count": tokens,
        "estimated_cost_usd": estimate_cost(tokens),
        "encoding": "cl100k_base",
        "rate_per_million_usd": _COST_PER_MILLION,
    }
# BUG FIX: the handler was never registered on the app (no route decorator),
# so the endpoint was unreachable. Path chosen by convention — confirm callers.
@app.post("/compress", response_model=CompressResponse)
async def compress(body: CompressRequest):
    """
    Compress *text* using the chosen strategy:
    - **logs** → regex strips timestamps, IPs, and repeating lines
    - **code** → regex strips comments, docstrings, blank lines
    - **text** → semantic compression via LLMlingua PromptCompressor

    Raises 400 on blank input; mode='text' may raise 503 when LLMlingua
    is unavailable.
    """
    if not body.text.strip():
        raise HTTPException(status_code=400, detail="text must not be empty.")
    # Dispatch table keeps the three strategies uniform and extensible.
    dispatch = {
        Mode.logs: compress_logs,
        Mode.code: compress_code,
        Mode.text: compress_text,
    }
    compressed = dispatch[body.mode](body.text, body.aggression_level)
    original_tokens = count_tokens(body.text)
    new_tokens = count_tokens(compressed)
    saved = original_tokens - new_tokens
    # Guard against division by zero for a (theoretically) zero-token input.
    pct = round((saved / original_tokens) * 100, 2) if original_tokens else 0.0
    return CompressResponse(
        compressed_text=compressed,
        original_tokens=original_tokens,
        new_tokens=new_tokens,
        tokens_saved=saved,
        percent_saved=pct,
        dollars_saved=round(estimate_cost(original_tokens) - estimate_cost(new_tokens), 6),
        mode=body.mode,
        aggression_level=body.aggression_level,
    )