from __future__ import annotations import base64 import datetime import hashlib import json import os import secrets import sys from typing import Any, Dict, List, Optional, Tuple import requests from .config import CLIENT_ID_DEFAULT, OAUTH_TOKEN_URL def eprint(*args, **kwargs) -> None: print(*args, file=sys.stderr, **kwargs) def get_home_dir() -> str: home = os.getenv("CHATGPT_LOCAL_HOME") or os.getenv("CODEX_HOME") if not home: home = os.path.expanduser("~/.chatgpt-local") return home def read_auth_file() -> Dict[str, Any] | None: for base in [ os.getenv("CHATGPT_LOCAL_HOME"), os.getenv("CODEX_HOME"), os.path.expanduser("~/.chatgpt-local"), os.path.expanduser("~/.codex"), ]: if not base: continue path = os.path.join(base, "auth.json") try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except FileNotFoundError: continue except Exception: continue return None def write_auth_file(auth: Dict[str, Any]) -> bool: home = get_home_dir() try: os.makedirs(home, exist_ok=True) except Exception as exc: eprint(f"ERROR: unable to create auth home directory {home}: {exc}") return False path = os.path.join(home, "auth.json") try: with open(path, "w", encoding="utf-8") as fp: if hasattr(os, "fchmod"): os.fchmod(fp.fileno(), 0o600) json.dump(auth, fp, indent=2) return True except Exception as exc: eprint(f"ERROR: unable to write auth file: {exc}") return False def parse_jwt_claims(token: str) -> Dict[str, Any] | None: if not token or token.count(".") != 2: return None try: _, payload, _ = token.split(".") padded = payload + "=" * (-len(payload) % 4) data = base64.urlsafe_b64decode(padded.encode()) return json.loads(data.decode()) except Exception: return None def generate_pkce() -> "PkceCodes": from .models import PkceCodes code_verifier = secrets.token_hex(64) digest = hashlib.sha256(code_verifier.encode()).digest() code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode() return PkceCodes(code_verifier=code_verifier, code_challenge=code_challenge) def convert_chat_messages_to_responses_input(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def _normalize_image_data_url(url: str) -> str: try: if not isinstance(url, str): return url if not url.startswith("data:image/"): return url if ";base64," not in url: return url header, data = url.split(",", 1) try: from urllib.parse import unquote data = unquote(data) except Exception: pass data = data.strip().replace("\n", "").replace("\r", "") data = data.replace("-", "+").replace("_", "/") pad = (-len(data)) % 4 if pad: data = data + ("=" * pad) try: base64.b64decode(data, validate=True) except Exception: return url return f"{header},{data}" except Exception: return url input_items: List[Dict[str, Any]] = [] for message in messages: role = message.get("role") if role == "system": continue if role == "tool": call_id = message.get("tool_call_id") or message.get("id") if isinstance(call_id, str) and call_id: content = message.get("content", "") if isinstance(content, list): texts = [] for part in content: if isinstance(part, dict): t = part.get("text") or part.get("content") if isinstance(t, str) and t: texts.append(t) content = "\n".join(texts) if isinstance(content, str): input_items.append( { "type": "function_call_output", "call_id": call_id, "output": content, } ) continue if role == "assistant" and isinstance(message.get("tool_calls"), list): for tc in message.get("tool_calls") or []: if not isinstance(tc, dict): continue tc_type = tc.get("type", "function") if tc_type != "function": continue call_id = tc.get("id") or tc.get("call_id") fn = tc.get("function") if isinstance(tc.get("function"), dict) else {} name = fn.get("name") if isinstance(fn, dict) else None args = fn.get("arguments") if isinstance(fn, dict) else None if isinstance(call_id, str) and isinstance(name, str) and isinstance(args, str): input_items.append( { "type": "function_call", "name": name, "arguments": args, "call_id": call_id, } ) content = message.get("content", "") content_items: List[Dict[str, Any]] = [] if isinstance(content, list): for part in content: if not isinstance(part, dict): continue ptype = part.get("type") if ptype == "text": text = part.get("text") or part.get("content") or "" if isinstance(text, str) and text: kind = "output_text" if role == "assistant" else "input_text" content_items.append({"type": kind, "text": text}) elif ptype == "image_url": image = part.get("image_url") url = image.get("url") if isinstance(image, dict) else image if isinstance(url, str) and url: content_items.append({"type": "input_image", "image_url": _normalize_image_data_url(url)}) elif isinstance(content, str) and content: kind = "output_text" if role == "assistant" else "input_text" content_items.append({"type": kind, "text": content}) if not content_items: continue role_out = "assistant" if role == "assistant" else "user" input_items.append({"type": "message", "role": role_out, "content": content_items}) return input_items def convert_tools_chat_to_responses(tools: Any) -> List[Dict[str, Any]]: out: List[Dict[str, Any]] = [] if not isinstance(tools, list): return out for t in tools: if not isinstance(t, dict): continue if t.get("type") != "function": continue fn = t.get("function") if isinstance(t.get("function"), dict) else {} name = fn.get("name") if isinstance(fn, dict) else None if not isinstance(name, str) or not name: continue desc = fn.get("description") if isinstance(fn, dict) else None params = fn.get("parameters") if isinstance(fn, dict) else None if not isinstance(params, dict): params = {"type": "object", "properties": {}} out.append( { "type": "function", "name": name, "description": desc or "", "strict": False, "parameters": params, } ) return out def load_chatgpt_tokens(ensure_fresh: bool = True) -> tuple[str | None, str | None, str | None]: auth = read_auth_file() if not isinstance(auth, dict): return None, None, None tokens = auth.get("tokens") if isinstance(auth.get("tokens"), dict) else {} access_token: Optional[str] = tokens.get("access_token") account_id: Optional[str] = tokens.get("account_id") id_token: Optional[str] = tokens.get("id_token") refresh_token: Optional[str] = tokens.get("refresh_token") last_refresh = auth.get("last_refresh") if ensure_fresh and isinstance(refresh_token, str) and refresh_token and CLIENT_ID_DEFAULT: needs_refresh = _should_refresh_access_token(access_token, last_refresh) if needs_refresh or not (isinstance(access_token, str) and access_token): refreshed = _refresh_chatgpt_tokens(refresh_token, CLIENT_ID_DEFAULT) if refreshed: access_token = refreshed.get("access_token") or access_token id_token = refreshed.get("id_token") or id_token refresh_token = refreshed.get("refresh_token") or refresh_token account_id = refreshed.get("account_id") or account_id updated_tokens = dict(tokens) if isinstance(access_token, str) and access_token: updated_tokens["access_token"] = access_token if isinstance(id_token, str) and id_token: updated_tokens["id_token"] = id_token if isinstance(refresh_token, str) and refresh_token: updated_tokens["refresh_token"] = refresh_token if isinstance(account_id, str) and account_id: updated_tokens["account_id"] = account_id persisted = _persist_refreshed_auth(auth, updated_tokens) if persisted is not None: auth, tokens = persisted else: tokens = updated_tokens if not isinstance(account_id, str) or not account_id: account_id = _derive_account_id(id_token) access_token = access_token if isinstance(access_token, str) and access_token else None id_token = id_token if isinstance(id_token, str) and id_token else None account_id = account_id if isinstance(account_id, str) and account_id else None return access_token, account_id, id_token def _should_refresh_access_token(access_token: Optional[str], last_refresh: Any) -> bool: if not isinstance(access_token, str) or not access_token: return True claims = parse_jwt_claims(access_token) or {} exp = claims.get("exp") if isinstance(claims, dict) else None now = datetime.datetime.now(datetime.timezone.utc) if isinstance(exp, (int, float)): try: expiry = datetime.datetime.fromtimestamp(float(exp), datetime.timezone.utc) except (OverflowError, OSError, ValueError): expiry = None if expiry is not None: return expiry <= now + datetime.timedelta(minutes=5) if isinstance(last_refresh, str): refreshed_at = _parse_iso8601(last_refresh) if refreshed_at is not None: return refreshed_at <= now - datetime.timedelta(minutes=55) return False def _refresh_chatgpt_tokens(refresh_token: str, client_id: str) -> Optional[Dict[str, Optional[str]]]: payload = { "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": client_id, "scope": "openid profile email offline_access", } try: resp = requests.post(OAUTH_TOKEN_URL, json=payload, timeout=30) except requests.RequestException as exc: eprint(f"ERROR: failed to refresh ChatGPT token: {exc}") return None if resp.status_code >= 400: eprint(f"ERROR: refresh token request returned status {resp.status_code}") return None try: data = resp.json() except ValueError as exc: eprint(f"ERROR: unable to parse refresh token response: {exc}") return None id_token = data.get("id_token") access_token = data.get("access_token") new_refresh_token = data.get("refresh_token") or refresh_token if not isinstance(id_token, str) or not isinstance(access_token, str): eprint("ERROR: refresh token response missing expected tokens") return None account_id = _derive_account_id(id_token) new_refresh_token = new_refresh_token if isinstance(new_refresh_token, str) and new_refresh_token else refresh_token return { "id_token": id_token, "access_token": access_token, "refresh_token": new_refresh_token, "account_id": account_id, } def _persist_refreshed_auth(auth: Dict[str, Any], updated_tokens: Dict[str, Any]) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]: updated_auth = dict(auth) updated_auth["tokens"] = updated_tokens updated_auth["last_refresh"] = _now_iso8601() if write_auth_file(updated_auth): return updated_auth, updated_tokens eprint("ERROR: unable to persist refreshed auth tokens") return None def _derive_account_id(id_token: Optional[str]) -> Optional[str]: if not isinstance(id_token, str) or not id_token: return None claims = parse_jwt_claims(id_token) or {} auth_claims = claims.get("https://api.openai.com/auth") if isinstance(claims, dict) else None if isinstance(auth_claims, dict): account_id = auth_claims.get("chatgpt_account_id") if isinstance(account_id, str) and account_id: return account_id return None def _parse_iso8601(value: str) -> Optional[datetime.datetime]: try: if value.endswith("Z"): value = value[:-1] + "+00:00" dt = datetime.datetime.fromisoformat(value) if dt.tzinfo is None: dt = dt.replace(tzinfo=datetime.timezone.utc) return dt.astimezone(datetime.timezone.utc) except Exception: return None def _now_iso8601() -> str: return datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z") def get_effective_chatgpt_auth() -> tuple[str | None, str | None]: access_token, account_id, id_token = load_chatgpt_tokens() if not account_id: account_id = _derive_account_id(id_token) return access_token, account_id def sse_translate_chat( upstream, model: str, created: int, verbose: bool = False, vlog=None, reasoning_compat: str = "think-tags", *, include_usage: bool = False, ): response_id = "chatcmpl-stream" compat = (reasoning_compat or "think-tags").strip().lower() think_open = False think_closed = False saw_output = False sent_stop_chunk = False saw_any_summary = False pending_summary_paragraph = False upstream_usage = None ws_state: dict[str, Any] = {} ws_index: dict[str, int] = {} ws_next_index: int = 0 def _serialize_tool_args(eff_args: Any) -> str: """ Serialize tool call arguments with proper JSON handling. Args: eff_args: Arguments to serialize (dict, list, str, or other) Returns: JSON string representation of the arguments """ if isinstance(eff_args, (dict, list)): return json.dumps(eff_args) elif isinstance(eff_args, str): try: parsed = json.loads(eff_args) if isinstance(parsed, (dict, list)): return json.dumps(parsed) else: return json.dumps({"query": eff_args}) except (json.JSONDecodeError, ValueError): return json.dumps({"query": eff_args}) else: return "{}" def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None: try: usage = (evt.get("response") or {}).get("usage") if not isinstance(usage, dict): return None pt = int(usage.get("input_tokens") or 0) ct = int(usage.get("output_tokens") or 0) tt = int(usage.get("total_tokens") or (pt + ct)) return {"prompt_tokens": pt, "completion_tokens": ct, "total_tokens": tt} except Exception: return None try: try: line_iterator = upstream.iter_lines(decode_unicode=False) except requests.exceptions.ChunkedEncodingError as e: if verbose and vlog: vlog(f"Failed to start stream: {e}") yield b"data: [DONE]\n\n" return for raw in line_iterator: try: if not raw: continue line = ( raw.decode("utf-8", errors="ignore") if isinstance(raw, (bytes, bytearray)) else raw ) if verbose and vlog: vlog(line) if not line.startswith("data: "): continue data = line[len("data: ") :].strip() if not data: continue if data == "[DONE]": break try: evt = json.loads(data) except (json.JSONDecodeError, UnicodeDecodeError): continue except ( requests.exceptions.ChunkedEncodingError, ConnectionError, BrokenPipeError, ) as e: # Connection interrupted mid-stream - end gracefully if verbose and vlog: vlog(f"Stream interrupted: {e}") yield b"data: [DONE]\n\n" return kind = evt.get("type") if isinstance(evt.get("response"), dict) and isinstance(evt["response"].get("id"), str): response_id = evt["response"].get("id") or response_id if isinstance(kind, str) and ("web_search_call" in kind): try: call_id = evt.get("item_id") or "ws_call" if verbose and vlog: try: vlog(f"CM_TOOLS {kind} id={call_id} -> tool_calls(web_search)") except Exception: pass item = evt.get('item') if isinstance(evt.get('item'), dict) else {} params_dict = ws_state.setdefault(call_id, {}) if isinstance(ws_state.get(call_id), dict) else {} def _merge_from(src): if not isinstance(src, dict): return for whole in ('parameters','args','arguments','input'): if isinstance(src.get(whole), dict): params_dict.update(src.get(whole)) if isinstance(src.get('query'), str): params_dict.setdefault('query', src.get('query')) if isinstance(src.get('q'), str): params_dict.setdefault('query', src.get('q')) for rk in ('recency','time_range','days'): if src.get(rk) is not None and rk not in params_dict: params_dict[rk] = src.get(rk) for dk in ('domains','include_domains','include'): if isinstance(src.get(dk), list) and 'domains' not in params_dict: params_dict['domains'] = src.get(dk) for mk in ('max_results','topn','limit'): if src.get(mk) is not None and 'max_results' not in params_dict: params_dict['max_results'] = src.get(mk) _merge_from(item) _merge_from(evt if isinstance(evt, dict) else None) params = params_dict if params_dict else None if isinstance(params, dict): try: ws_state.setdefault(call_id, {}).update(params) except Exception: pass eff_params = ws_state.get(call_id, params if isinstance(params, (dict, list, str)) else {}) args_str = _serialize_tool_args(eff_params) if call_id not in ws_index: ws_index[call_id] = ws_next_index ws_next_index += 1 _idx = ws_index.get(call_id, 0) delta_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": { "tool_calls": [ { "index": _idx, "id": call_id, "type": "function", "function": {"name": "web_search", "arguments": args_str}, } ] }, "finish_reason": None, } ], } yield f"data: {json.dumps(delta_chunk)}\n\n".encode("utf-8") if kind.endswith(".completed") or kind.endswith(".done"): finish_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ {"index": 0, "delta": {}, "finish_reason": "tool_calls"} ], } yield f"data: {json.dumps(finish_chunk)}\n\n".encode("utf-8") except Exception: pass if kind == "response.output_text.delta": delta = evt.get("delta") or "" if compat == "think-tags" and think_open and not think_closed: close_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": None}], } yield f"data: {json.dumps(close_chunk)}\n\n".encode("utf-8") think_open = False think_closed = True saw_output = True chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {"content": delta}, "finish_reason": None}], } yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") elif kind == "response.output_item.done": item = evt.get("item") or {} if isinstance(item, dict) and (item.get("type") == "function_call" or item.get("type") == "web_search_call"): call_id = item.get("call_id") or item.get("id") or "" name = item.get("name") or ("web_search" if item.get("type") == "web_search_call" else "") raw_args = item.get("arguments") or item.get("parameters") if isinstance(raw_args, dict): try: ws_state.setdefault(call_id, {}).update(raw_args) except Exception: pass eff_args = ws_state.get(call_id, raw_args if isinstance(raw_args, (dict, list, str)) else {}) try: args = _serialize_tool_args(eff_args) except Exception: args = "{}" if item.get("type") == "web_search_call" and verbose and vlog: try: vlog(f"CM_TOOLS response.output_item.done web_search_call id={call_id} has_args={bool(args)}") except Exception: pass if call_id not in ws_index: ws_index[call_id] = ws_next_index ws_next_index += 1 _idx = ws_index.get(call_id, 0) if isinstance(call_id, str) and isinstance(name, str) and isinstance(args, str): delta_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": { "tool_calls": [ { "index": _idx, "id": call_id, "type": "function", "function": {"name": name, "arguments": args}, } ] }, "finish_reason": None, } ], } yield f"data: {json.dumps(delta_chunk)}\n\n".encode("utf-8") finish_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {}, "finish_reason": "tool_calls"}], } yield f"data: {json.dumps(finish_chunk)}\n\n".encode("utf-8") elif kind == "response.reasoning_summary_part.added": if compat in ("think-tags", "o3"): if saw_any_summary: pending_summary_paragraph = True else: saw_any_summary = True elif kind in ("response.reasoning_summary_text.delta", "response.reasoning_text.delta"): delta_txt = evt.get("delta") or "" if compat == "o3": if kind == "response.reasoning_summary_text.delta" and pending_summary_paragraph: nl_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": {"reasoning": {"content": [{"type": "text", "text": "\n"}]}}, "finish_reason": None, } ], } yield f"data: {json.dumps(nl_chunk)}\n\n".encode("utf-8") pending_summary_paragraph = False chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": {"reasoning": {"content": [{"type": "text", "text": delta_txt}]}}, "finish_reason": None, } ], } yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") elif compat == "think-tags": if not think_open and not think_closed: open_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": None}], } yield f"data: {json.dumps(open_chunk)}\n\n".encode("utf-8") think_open = True if think_open and not think_closed: if kind == "response.reasoning_summary_text.delta" and pending_summary_paragraph: nl_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {"content": "\n"}, "finish_reason": None}], } yield f"data: {json.dumps(nl_chunk)}\n\n".encode("utf-8") pending_summary_paragraph = False content_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {"content": delta_txt}, "finish_reason": None}], } yield f"data: {json.dumps(content_chunk)}\n\n".encode("utf-8") else: if kind == "response.reasoning_summary_text.delta": chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": {"reasoning_summary": delta_txt, "reasoning": delta_txt}, "finish_reason": None, } ], } yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") else: chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ {"index": 0, "delta": {"reasoning": delta_txt}, "finish_reason": None} ], } yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") elif isinstance(kind, str) and kind.endswith(".done"): pass elif kind == "response.output_text.done": chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], } yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") sent_stop_chunk = True elif kind == "response.failed": err = evt.get("response", {}).get("error", {}).get("message", "response.failed") chunk = {"error": {"message": err}} yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") elif kind == "response.completed": m = _extract_usage(evt) if m: upstream_usage = m if compat == "think-tags" and think_open and not think_closed: close_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": None}], } yield f"data: {json.dumps(close_chunk)}\n\n".encode("utf-8") think_open = False think_closed = True if not sent_stop_chunk: chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], } yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") sent_stop_chunk = True if include_usage and upstream_usage: try: usage_chunk = { "id": response_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {}, "finish_reason": None}], "usage": upstream_usage, } yield f"data: {json.dumps(usage_chunk)}\n\n".encode("utf-8") except Exception: pass yield b"data: [DONE]\n\n" break finally: upstream.close() def sse_translate_text(upstream, model: str, created: int, verbose: bool = False, vlog=None, *, include_usage: bool = False): response_id = "cmpl-stream" upstream_usage = None def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None: try: usage = (evt.get("response") or {}).get("usage") if not isinstance(usage, dict): return None pt = int(usage.get("input_tokens") or 0) ct = int(usage.get("output_tokens") or 0) tt = int(usage.get("total_tokens") or (pt + ct)) return {"prompt_tokens": pt, "completion_tokens": ct, "total_tokens": tt} except Exception: return None try: for raw_line in upstream.iter_lines(decode_unicode=False): if not raw_line: continue line = raw_line.decode("utf-8", errors="ignore") if isinstance(raw_line, (bytes, bytearray)) else raw_line if verbose and vlog: vlog(line) if not line.startswith("data: "): continue data = line[len("data: "):].strip() if not data or data == "[DONE]": if data == "[DONE]": chunk = { "id": response_id, "object": "text_completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "text": "", "finish_reason": "stop"}], } yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") continue try: evt = json.loads(data) except Exception: continue kind = evt.get("type") if isinstance(evt.get("response"), dict) and isinstance(evt["response"].get("id"), str): response_id = evt["response"].get("id") or response_id if kind == "response.output_text.delta": delta_text = evt.get("delta") or "" chunk = { "id": response_id, "object": "text_completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "text": delta_text, "finish_reason": None}], } yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") elif kind == "response.output_text.done": chunk = { "id": response_id, "object": "text_completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "text": "", "finish_reason": "stop"}], } yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8") elif kind == "response.completed": m = _extract_usage(evt) if m: upstream_usage = m if include_usage and upstream_usage: try: usage_chunk = { "id": response_id, "object": "text_completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "text": "", "finish_reason": None}], "usage": upstream_usage, } yield f"data: {json.dumps(usage_chunk)}\n\n".encode("utf-8") except Exception: pass yield b"data: [DONE]\n\n" break finally: upstream.close()