| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import http2 from 'http2'; |
| import { log } from './config.js'; |
| import { wrapRequest, StreamingFrameParser } from './connect.js'; |
|
|
| const USE_CONNECT = process.env.GRPC_PROTOCOL === 'connect'; |
| export const _USE_CONNECT_FOR_TEST = USE_CONNECT; |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| const _sessionPool = new Map(); |
|
|
| function getSession(port) { |
| const key = `localhost:${port}`; |
| let session = _sessionPool.get(key); |
| if (session && !session.destroyed && !session.closed) return session; |
|
|
| session = http2.connect(`http://localhost:${port}`); |
| session.on('error', (err) => { |
| log.debug(`HTTP/2 session error on port ${port}: ${err.message}`); |
| if (_sessionPool.get(key) === session) _sessionPool.delete(key); |
| }); |
| session.on('close', () => { |
| if (_sessionPool.get(key) === session) _sessionPool.delete(key); |
| }); |
| |
| |
| try { session.unref(); } catch {} |
| _sessionPool.set(key, session); |
| return session; |
| } |
|
|
| |
| |
| |
| |
| |
| export function closeSessionForPort(port) { |
| const key = `localhost:${port}`; |
| const session = _sessionPool.get(key); |
| if (session) { |
| try { session.close(); } catch {} |
| _sessionPool.delete(key); |
| } |
| } |
|
|
| |
| |
| |
| |
| |
| export function grpcFrame(payload) { |
| const buf = Buffer.isBuffer(payload) ? payload : Buffer.from(payload); |
| if (USE_CONNECT) return wrapRequest(buf); |
| const frame = Buffer.alloc(5 + buf.length); |
| frame[0] = 0; |
| frame.writeUInt32BE(buf.length, 1); |
| buf.copy(frame, 5); |
| return frame; |
| } |
|
|
| |
| |
| |
| |
| export function stripGrpcFrame(buf) { |
| if (buf.length >= 5 && buf[0] === 0) { |
| const msgLen = buf.readUInt32BE(1); |
| if (buf.length >= 5 + msgLen) { |
| return buf.subarray(5, 5 + msgLen); |
| } |
| } |
| return buf; |
| } |
|
|
| |
| |
| |
| export function extractGrpcFrames(buf) { |
| const frames = []; |
| let offset = 0; |
| while (offset + 5 <= buf.length) { |
| const compressed = buf[offset]; |
| const msgLen = buf.readUInt32BE(offset + 1); |
| if (compressed !== 0 || offset + 5 + msgLen > buf.length) break; |
| frames.push(buf.subarray(offset + 5, offset + 5 + msgLen)); |
| offset += 5 + msgLen; |
| } |
| return frames; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| export function grpcUnary(port, csrfToken, path, body, timeout = 30000) { |
| return new Promise((resolve, reject) => { |
| |
| |
| |
| let settled = false; |
| const done = (fn, ...args) => { |
| if (settled) return; |
| settled = true; |
| fn(...args); |
| }; |
|
|
| const client = getSession(port); |
| const chunks = []; |
| let timer; |
|
|
| timer = setTimeout(() => { |
| try { req.close?.(http2.constants.NGHTTP2_CANCEL); } catch {} |
| done(reject, new Error('gRPC unary timeout')); |
| }, timeout); |
|
|
| const headers = USE_CONNECT ? { |
| ':method': 'POST', |
| ':path': path, |
| 'content-type': 'application/connect+proto', |
| 'connect-protocol-version': '1', |
| 'connect-accept-encoding': 'gzip', |
| 'user-agent': 'connect-es/2.0.0', |
| 'x-codeium-csrf-token': csrfToken, |
| } : { |
| ':method': 'POST', |
| ':path': path, |
| 'content-type': 'application/grpc', |
| 'te': 'trailers', |
| 'user-agent': 'grpc-node/1.108.2', |
| 'x-codeium-csrf-token': csrfToken, |
| }; |
|
|
| const req = client.request(headers); |
| req.on('data', (chunk) => chunks.push(chunk)); |
|
|
| let grpcStatus = '0', grpcMessage = ''; |
|
|
| req.on('trailers', (trailers) => { |
| grpcStatus = String(trailers['grpc-status'] ?? '0'); |
| grpcMessage = String(trailers['grpc-message'] ?? ''); |
| }); |
|
|
| req.on('end', () => { |
| clearTimeout(timer); |
| if (!USE_CONNECT && grpcStatus !== '0') { |
| const msg = grpcMessage ? decodeURIComponent(grpcMessage) : `gRPC status ${grpcStatus}`; |
| done(reject, new Error(msg)); |
| return; |
| } |
| const full = Buffer.concat(chunks); |
|
|
| if (USE_CONNECT) { |
| let parsed; |
| try { |
| const parser = new StreamingFrameParser(); |
| parser.push(full); |
| parsed = parser.drain(); |
| } catch (err) { |
| try { req.close?.(http2.constants.NGHTTP2_CANCEL); } catch {} |
| done(reject, err); |
| return; |
| } |
| const dataFrames = parsed.filter(f => !f.isEndStream); |
| const trailer = parsed.find(f => f.isEndStream); |
| if (trailer) { |
| try { |
| const t = JSON.parse(trailer.payload.toString()); |
| if (t.error) { done(reject, new Error(t.error.message || 'connect error')); return; } |
| } catch {} |
| } |
| const payload = dataFrames.length > 0 |
| ? Buffer.concat(dataFrames.map(f => f.payload)) |
| : full; |
| done(resolve, payload); |
| } else { |
| const frames = extractGrpcFrames(full); |
| const payload = frames.length > 0 ? Buffer.concat(frames) : stripGrpcFrame(full); |
| done(resolve, payload); |
| } |
| }); |
|
|
| req.on('error', (err) => { |
| clearTimeout(timer); |
| done(reject, err); |
| }); |
|
|
| req.write(body); |
| req.end(); |
| }); |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| export function grpcStream(port, csrfToken, path, body, opts = {}) { |
| const { onData, onEnd, onError, timeout = 300000 } = opts; |
|
|
| |
| |
| let settled = false; |
| const client = getSession(port); |
| let timer; |
| let pendingBuf = Buffer.alloc(0); |
|
|
| timer = setTimeout(() => { |
| if (settled) return; |
| settled = true; |
| try { req.close?.(http2.constants.NGHTTP2_CANCEL); } catch {} |
| onError?.(new Error('gRPC stream timeout')); |
| }, timeout); |
|
|
| const streamHeaders = USE_CONNECT ? { |
| ':method': 'POST', |
| ':path': path, |
| 'content-type': 'application/connect+proto', |
| 'connect-protocol-version': '1', |
| 'connect-accept-encoding': 'gzip', |
| 'user-agent': 'connect-es/2.0.0', |
| 'x-codeium-csrf-token': csrfToken, |
| } : { |
| ':method': 'POST', |
| ':path': path, |
| 'content-type': 'application/grpc', |
| 'te': 'trailers', |
| 'grpc-accept-encoding': 'identity,gzip,deflate', |
| 'user-agent': 'grpc-node/1.108.2', |
| 'x-codeium-csrf-token': csrfToken, |
| }; |
|
|
| const req = client.request(streamHeaders); |
| const connectParser = USE_CONNECT ? new StreamingFrameParser() : null; |
|
|
| req.on('data', (chunk) => { |
| if (settled) return; |
|
|
| if (USE_CONNECT) { |
| try { |
| connectParser.push(chunk); |
| for (const frame of connectParser.drain()) { |
| if (frame.isEndStream) { |
| try { |
| const t = JSON.parse(frame.payload.toString()); |
| if (t.error) { |
| settled = true; |
| clearTimeout(timer); |
| try { req.close?.(http2.constants.NGHTTP2_CANCEL); } catch {} |
| onError?.(new Error(t.error.message || 'connect stream error')); |
| return; |
| } |
| } catch {} |
| } else { |
| onData?.(frame.payload); |
| } |
| } |
| } catch (err) { |
| settled = true; |
| clearTimeout(timer); |
| try { req.close?.(http2.constants.NGHTTP2_CANCEL); } catch {} |
| onError?.(err); |
| } |
| return; |
| } |
|
|
| pendingBuf = Buffer.concat([pendingBuf, chunk]); |
| if (pendingBuf.length > 100 * 1024 * 1024) { |
| settled = true; |
| clearTimeout(timer); |
| try { req.close?.(http2.constants.NGHTTP2_CANCEL); } catch {} |
| onError?.(new Error('gRPC frame too large (>100MB)')); |
| return; |
| } |
|
|
| while (pendingBuf.length >= 5) { |
| const compressed = pendingBuf[0]; |
| const msgLen = pendingBuf.readUInt32BE(1); |
| if (pendingBuf.length < 5 + msgLen) break; |
|
|
| if (compressed === 0) { |
| const payload = pendingBuf.subarray(5, 5 + msgLen); |
| onData?.(payload); |
| } |
| pendingBuf = pendingBuf.subarray(5 + msgLen); |
| } |
| }); |
|
|
| let grpcStatus = '0', grpcMessage = ''; |
|
|
| req.on('trailers', (trailers) => { |
| grpcStatus = String(trailers['grpc-status'] ?? '0'); |
| grpcMessage = String(trailers['grpc-message'] ?? ''); |
| }); |
|
|
| req.on('end', () => { |
| clearTimeout(timer); |
| if (settled) return; |
| settled = true; |
| if (!USE_CONNECT && grpcStatus !== '0') { |
| const msg = grpcMessage ? decodeURIComponent(grpcMessage) : `gRPC status ${grpcStatus}`; |
| onError?.(new Error(msg)); |
| } else { |
| onEnd?.(); |
| } |
| }); |
|
|
| req.on('error', (err) => { |
| clearTimeout(timer); |
| if (settled) return; |
| settled = true; |
| onError?.(err); |
| }); |
|
|
| req.write(body); |
| req.end(); |
| } |
|
|