From a68ee7a47d76f44ca13850dd0fe13d8faaae9481 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 13:42:00 +0200 Subject: [PATCH 01/15] feat(adapters): initial `uws` internal implementation --- package.json | 4 + pnpm-lock.yaml | 11 ++ src/adapters/_uws/headers.ts | 256 +++++++++++++++++++++++++ src/adapters/_uws/index.ts | 3 + src/adapters/_uws/request.ts | 216 +++++++++++++++++++++ src/adapters/_uws/response.ts | 347 ++++++++++++++++++++++++++++++++++ src/types.ts | 27 ++- 7 files changed, 863 insertions(+), 1 deletion(-) create mode 100644 src/adapters/_uws/headers.ts create mode 100644 src/adapters/_uws/index.ts create mode 100644 src/adapters/_uws/request.ts create mode 100644 src/adapters/_uws/response.ts diff --git a/package.json b/package.json index 4afe036..396da75 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "./cloudflare": "./dist/adapters/cloudflare.mjs", "./generic": "./dist/adapters/generic.mjs", "./service-worker": "./dist/adapters/service-worker.mjs", + "./uws": "./dist/adapters/uws.mjs", "./cli": "./dist/cli.mjs", "./static": "./dist/static.mjs", "./log": "./dist/log.mjs", @@ -86,6 +87,9 @@ "undici": "^7.13.0", "vitest": "^3.2.4" }, + "optionalDependencies": { + "uWebSockets.js": "uNetworking/uWebSockets.js#v20.52.0" + }, "packageManager": "pnpm@10.12.4", "engines": { "node": ">=20.16.0" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b7be11b..dc9c3b2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -93,6 +93,10 @@ importers: vitest: specifier: ^3.2.4 version: 3.2.4(@types/node@24.2.0)(jiti@2.5.1) + optionalDependencies: + uWebSockets.js: + specifier: uNetworking/uWebSockets.js#v20.52.0 + version: https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/cfc9a40d8132a34881813cec3d5f8e3a185b3ce3 examples/elysia: devDependencies: @@ -2506,6 +2510,10 @@ packages: engines: {node: '>=14.17'} hasBin: true + uWebSockets.js@https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/cfc9a40d8132a34881813cec3d5f8e3a185b3ce3: + resolution: {tarball: https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/cfc9a40d8132a34881813cec3d5f8e3a185b3ce3} + version: 20.52.0 + ufo@1.6.1: resolution: {integrity: sha512-9a4/uxlTWJ4+a5i0ooc1rU7C7YOw3wT+UGqdeNNHWnOF9qcMBgLRS+4IYUqbczewFx4mLEig6gawh7X6mFlEkA==} @@ -4921,6 +4929,9 @@ snapshots: typescript@5.9.2: {} + uWebSockets.js@https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/cfc9a40d8132a34881813cec3d5f8e3a185b3ce3: + optional: true + ufo@1.6.1: {} uint8array-extras@1.4.0: {} diff --git a/src/adapters/_uws/headers.ts b/src/adapters/_uws/headers.ts new file mode 100644 index 0000000..007f1a5 --- /dev/null +++ b/src/adapters/_uws/headers.ts @@ -0,0 +1,256 @@ +import { splitSetCookieString } from "cookie-es"; +import { kNodeInspect } from "../_node/_common.ts"; + +import type { UWSServerRequest, UWSServerResponse } from "../../types.ts"; + +export const UWSRequestHeaders: { + new (req: UWSServerRequest): globalThis.Headers; +} = /* @__PURE__ */ (() => { + const _Headers = class Headers implements globalThis.Headers { + _req: UWSServerRequest; + + constructor(req: UWSServerRequest) { + this._req = req; + } + + append(_name: string, _value: string): void { + throw new Error("UWSRequestHeaders are immutable."); + } + + delete(_name: string): void { + throw new Error("UWSRequestHeaders are immutable."); + } + + get(name: string): string | null { + const value = this._req.getHeader(validateHeader(name)); + return value === "" ? null : value; + } + + getSetCookie(): string[] { + const setCookie = this.get("set-cookie"); + if (!setCookie) { + return []; + } + return splitSetCookieString(setCookie); + } + + has(name: string): boolean { + return this.get(validateHeader(name)) !== null; + } + + set(_name: string, _value: string): void { + throw new Error("UWSRequestHeaders are immutable."); + } + + get count(): number { + // Bun-specific addon + throw new Error("Method not implemented."); + } + + getAll(name: "set-cookie" | "Set-Cookie"): string[] { + const lowerName = name.toLowerCase(); + const val = this._req.getHeader(lowerName); + if (lowerName === "set-cookie") { + return val ? splitSetCookieString(val) : []; + } + return val === "" ? [] : val.split(", "); + } + + toJSON(): Record { + const result: Record = {}; + this._req["forEach"]((key, value) => { + result[key] = value; + }); + return result; + } + + forEach( + cb: (value: string, key: string, parent: Headers) => void, + thisArg?: object, + ): void { + this._req["forEach"]((key, value) => { + cb.call(thisArg, value, key, this); + }); + } + + *entries(): HeadersIterator<[string, string]> { + const entries: [string, string][] = []; + this._req["forEach"]((k, v) => { + entries.push([k, v]); + }); + yield* entries; + } + + *keys(): HeadersIterator { + const keys: string[] = []; + this._req["forEach"]((k) => { + keys.push(k); + }); + yield* keys; + } + + *values(): HeadersIterator { + const values: string[] = []; + this._req["forEach"]((_, v) => { + values.push(v); + }); + yield* values; + } + + [Symbol.iterator](): HeadersIterator<[string, string]> { + return this.entries(); + } + + get [Symbol.toStringTag]() { + return "Headers"; + } + + [kNodeInspect]() { + return Object.fromEntries(this.entries()); + } + }; + + Object.setPrototypeOf(_Headers.prototype, globalThis.Headers.prototype); + + return _Headers; +})(); + +export const UWSResponseHeaders: { + new (res: UWSServerResponse): globalThis.Headers; +} = /* @__PURE__ */ (() => { + const _Headers = class Headers implements globalThis.Headers { + _res: UWSServerResponse; + _headers: Record = {}; + + constructor(res: UWSServerResponse) { + this._res = res; + } + + append(name: string, value: string): void { + name = validateHeader(name); + const current = this._headers[name]; + if (current) { + if (Array.isArray(current)) { + current.push(value); + } else { + this._headers[name] = [current, value]; + } + } else { + this._headers[name] = value; + } + this._apply(); + } + + delete(name: string): void { + name = validateHeader(name); + delete this._headers[name]; + this._apply(); + } + + get(name: string): string | null { + const value = this._headers[validateHeader(name)]; + if (value === undefined) { + return null; + } + return Array.isArray(value) ? value.join(", ") : value; + } + + getSetCookie(): string[] { + const setCookie = this._headers["set-cookie"]; + if (!setCookie) { + return []; + } + return Array.isArray(setCookie) ? setCookie : [setCookie]; + } + + has(name: string): boolean { + return this._headers[validateHeader(name)] !== undefined; + } + + set(name: string, value: string): void { + this._headers[validateHeader(name)] = value; + this._apply(); + } + + get count(): number { + // Bun-specific addon + throw new Error("Method not implemented."); + } + + getAll(_name: "set-cookie" | "Set-Cookie"): string[] { + // Bun-specific addon + throw new Error("Method not implemented."); + } + + _apply() { + for (const [key, value] of Object.entries(this._headers)) { + if (Array.isArray(value)) { + // uws allows multiple headers with same name + for (const v of value) { + this._res.writeHeader(key, v); + } + } else { + this._res.writeHeader(key, value); + } + } + } + + toJSON(): Record { + const result: Record = {}; + for (const key in this._headers) { + result[key] = this.get(key)!; + } + return result; + } + + forEach( + cb: (value: string, key: string, parent: Headers) => void, + thisArg?: object, + ): void { + for (const key in this._headers) { + cb.call(thisArg, this.get(key)!, key, this); + } + } + + *entries(): HeadersIterator<[string, string]> { + for (const key in this._headers) { + yield [key, this.get(key)!]; + } + } + + *keys(): HeadersIterator { + for (const key in this._headers) { + yield key; + } + } + + *values(): HeadersIterator { + for (const key in this._headers) { + yield this.get(key)!; + } + } + + [Symbol.iterator](): HeadersIterator<[string, string]> { + return this.entries(); + } + + get [Symbol.toStringTag]() { + return "Headers"; + } + + [kNodeInspect]() { + return this._headers; + } + }; + + Object.setPrototypeOf(_Headers.prototype, globalThis.Headers.prototype); + + return _Headers; +})(); + +function validateHeader(name: string): string { + if (name[0] === ":") { + throw new TypeError(`${JSON.stringify(name)} is an invalid header name.`); + } + return name.toLowerCase(); +} diff --git a/src/adapters/_uws/index.ts b/src/adapters/_uws/index.ts new file mode 100644 index 0000000..4e91f08 --- /dev/null +++ b/src/adapters/_uws/index.ts @@ -0,0 +1,3 @@ +export { UWSRequestHeaders, UWSResponseHeaders } from "./headers.ts"; +export { UWSRequest } from "./request.ts"; +export { UWSResponse } from "./response.ts"; diff --git a/src/adapters/_uws/request.ts b/src/adapters/_uws/request.ts new file mode 100644 index 0000000..2f79ca1 --- /dev/null +++ b/src/adapters/_uws/request.ts @@ -0,0 +1,216 @@ +import { kNodeInspect } from "../_node/_common.ts"; +import { UWSRequestHeaders } from "./headers.ts"; + +import type { + UWSServerRequest, + UWSServerResponse, + ServerRequest, + ServerRuntimeContext, +} from "../../types.ts"; + +export type UWSRequestContext = { + req: UWSServerRequest; + res: UWSServerResponse; +}; + +export const UWSRequest = /* @__PURE__ */ (() => { + const unsupportedGetters = [ + "cache", + "credentials", + "destination", + "integrity", + "keepalive", + "mode", + "redirect", + "referrer", + "referrerPolicy", + ] as const; + + const _Request = class Request + implements Omit + { + #headers?: InstanceType; + #bodyUsed: boolean = false; + #abortSignal?: AbortController; + #bodyBytes?: Promise>; + #blobBody?: Promise; + #formDataBody?: Promise; + #jsonBody?: Promise; + #textBody?: Promise; + #bodyStream?: undefined | ReadableStream>; + + _uws: UWSRequestContext; + runtime: ServerRuntimeContext; + + constructor(uwsCtx: UWSRequestContext) { + this._uws = uwsCtx; + this.runtime = { + name: "uws", + uws: uwsCtx, + }; + this._uws.res.onAborted(() => { + this.#abortSignal?.abort(); + }); + } + + get ip() { + return new TextDecoder().decode(this._uws.res.getRemoteAddressAsText()); + } + + get headers() { + if (!this.#headers) { + this.#headers = new UWSRequestHeaders(this._uws.req); + } + return this.#headers; + } + + clone: any = () => { + return new _Request({ ...this._uws }) as unknown as ServerRequest; + }; + + get url() { + const query = this._uws.req.getQuery(); + return ( + (this._uws.req.getHeader("x-forwarded-proto") === "https" + ? "https://" + : "http://") + + this._uws.req.getHeader("host") + + this._uws.req.getUrl() + + (query ? `?${query}` : "") + ); + } + + get method() { + return this._uws.req.getMethod().toUpperCase(); + } + + get signal() { + if (!this.#abortSignal) { + this.#abortSignal = new AbortController(); + } + return this.#abortSignal.signal; + } + + get bodyUsed() { + return this.#bodyUsed; + } + + get body(): ReadableStream> | null { + if (this.method === "GET" || this.method === "HEAD") { + return null; + } + if (!this.#bodyStream) { + this.#bodyUsed = true; + this.#bodyStream = new ReadableStream({ + start: (controller) => { + this._uws.res.onData((chunk, isLast) => { + controller.enqueue(new Uint8Array(chunk)); + if (isLast) { + controller.close(); + } + }); + }, + }); + } + return this.#bodyStream; + } + + bytes(): Promise> { + if (!this.#bodyBytes) { + const _bodyStream = this.body; + this.#bodyBytes = _bodyStream + ? _readStream(_bodyStream) + : Promise.resolve(new Uint8Array()); + } + return this.#bodyBytes; + } + + arrayBuffer(): Promise { + return this.bytes().then((buff) => buff.buffer); + } + + blob(): Promise { + if (!this.#blobBody) { + this.#blobBody = this.bytes().then((bytes) => { + return new Blob([bytes], { + type: this.headers.get("content-type") || "", + }); + }); + } + return this.#blobBody; + } + + formData(): Promise { + if (!this.#formDataBody) { + this.#formDataBody = new Response(this.body, { + headers: this.headers as unknown as Headers, + }).formData(); + } + return this.#formDataBody; + } + + text(): Promise { + if (!this.#textBody) { + this.#textBody = this.bytes().then((bytes) => { + return new TextDecoder().decode(bytes); + }); + } + return this.#textBody; + } + + json(): Promise { + if (!this.#jsonBody) { + this.#jsonBody = this.text().then((txt) => { + return JSON.parse(txt); + }); + } + return this.#jsonBody; + } + + get [Symbol.toStringTag]() { + return "Request"; + } + + [kNodeInspect]() { + return { + method: this.method, + url: this.url, + headers: this.headers, + }; + } + }; + + for (const key of unsupportedGetters) { + Object.defineProperty(_Request.prototype, key, { + enumerable: true, + configurable: false, + }); + } + + Object.setPrototypeOf(_Request.prototype, globalThis.Request.prototype); + + return _Request; +})() as unknown as { + new (uwsCtx: UWSRequestContext): ServerRequest; +}; + +async function _readStream(stream: ReadableStream) { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + chunks.push(value); + } + const buffer = new Uint8Array( + chunks.reduce((acc, chunk) => acc + chunk.length, 0), + ); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.length; + } + return buffer; +} diff --git a/src/adapters/_uws/response.ts b/src/adapters/_uws/response.ts new file mode 100644 index 0000000..7815c5e --- /dev/null +++ b/src/adapters/_uws/response.ts @@ -0,0 +1,347 @@ +import { splitSetCookieString } from "cookie-es"; +import { Buffer } from "node:buffer"; +import type { Readable as NodeReadable } from "node:stream"; + +export type UWSResponse = InstanceType; + +/** + * Fast Response for uWebSockets.js runtime + * + * It is faster because in most cases it doesn't create a full Response instance. + */ +export const UWSResponse: { + new ( + body?: BodyInit | null, + init?: ResponseInit, + ): globalThis.Response & { + readonly uwsResponse: () => { + status: number; + statusText: string; + headers: [string, string][]; + body: + | string + | Buffer + | Uint8Array + | DataView + | ReadableStream + | NodeReadable + | undefined + | null; + }; + }; +} = /* @__PURE__ */ (() => { + const CONTENT_TYPE = "content-type"; + const JSON_TYPE = "application/json"; + const JSON_HEADER = [[CONTENT_TYPE, JSON_TYPE]] as HeadersInit; + + const _Response = class Response implements globalThis.Response { + #body?: BodyInit | null; + #init?: ResponseInit; + readonly webSocket: any; + + constructor(body?: BodyInit | null, init?: ResponseInit) { + this.#body = body; + this.#init = init; + } + + static json(data: unknown, init?: ResponseInit): Response { + if (init?.headers) { + if (!(init.headers as Record)[CONTENT_TYPE]) { + const initHeaders = new Headers(init.headers); + if (!initHeaders.has(CONTENT_TYPE)) { + initHeaders.set(CONTENT_TYPE, JSON_TYPE); + } + init = { ...init, headers: initHeaders }; + } + } else { + init = init ? { ...init } : {}; + init.headers = JSON_HEADER; + } + return new _Response(JSON.stringify(data), init); + } + + static error(): globalThis.Response { + return globalThis.Response.error(); + } + + static redirect(url: string | URL, status?: number): globalThis.Response { + return globalThis.Response.redirect(url, status); + } + + /** + * Prepare uWebSockets.js response object + */ + uwsResponse() { + const status = this.#init?.status ?? 200; + const statusText = this.#init?.statusText ?? ""; + + const headers: [string, string][] = []; + + const headersInit = this.#init?.headers; + if (this.#headersObj) { + for (const [key, value] of this.#headersObj) { + if (key === "set-cookie") { + for (const setCookie of splitSetCookieString(value)) { + headers.push(["set-cookie", setCookie]); + } + } else { + headers.push([key, value]); + } + } + } else if (headersInit) { + const headerEntries = Array.isArray(headersInit) + ? headersInit + : headersInit.entries + ? (headersInit as Headers).entries() + : Object.entries(headersInit); + for (const [key, value] of headerEntries) { + if (key === "set-cookie") { + for (const setCookie of splitSetCookieString(value)) { + headers.push(["set-cookie", setCookie]); + } + } else { + headers.push([key, value]); + } + } + } + + const bodyInit = this.#body as BodyInit | null | undefined | NodeReadable; + // prettier-ignore + let body: string | Buffer | Uint8Array | DataView | ReadableStream | NodeReadable | undefined | null; + if (bodyInit) { + if (typeof bodyInit === "string") { + body = bodyInit; + } else if (bodyInit instanceof ReadableStream) { + body = bodyInit; + } else if (bodyInit instanceof ArrayBuffer) { + body = Buffer.from(bodyInit); + } else if (bodyInit instanceof Uint8Array) { + body = Buffer.from(bodyInit); + } else if (bodyInit instanceof DataView) { + body = Buffer.from(bodyInit.buffer); + } else if (bodyInit instanceof Blob) { + body = bodyInit.stream(); + if (bodyInit.type) { + headers.push(["content-type", bodyInit.type]); + } + } else if (typeof (bodyInit as NodeReadable).pipe === "function") { + body = bodyInit as NodeReadable; + } else { + const res = new globalThis.Response(bodyInit as BodyInit); + body = res.body as ReadableStream>; + for (const [key, value] of res.headers) { + headers.push([key, value]); + } + } + } + + // Free up memory + this.#body = undefined; + this.#init = undefined; + this.#headersObj = undefined; + this.#responseObj = undefined; + + return { + status, + statusText, + headers, + body, + }; + } + + // ... the rest is for interface compatibility only and usually not to be used ... + + /** Lazy initialized response instance */ + #responseObj?: globalThis.Response; + + /** Lazy initialized headers instance */ + #headersObj?: Headers; + + clone(): globalThis.Response { + if (this.#responseObj) { + return this.#responseObj.clone(); + } + if (this.#headersObj) { + return new _Response(this.#body, { + ...this.#init, + headers: this.#headersObj, + }); + } + return new _Response(this.#body, this.#init); + } + + get #response(): globalThis.Response { + if (!this.#responseObj) { + this.#responseObj = this.#headersObj + ? new globalThis.Response(this.#body, { + ...this.#init, + headers: this.#headersObj, + }) + : new globalThis.Response(this.#body, this.#init); + // Free up memory + this.#body = undefined; + this.#init = undefined; + this.#headersObj = undefined; + } + return this.#responseObj; + } + + get headers(): Headers { + if (this.#responseObj) { + return this.#responseObj.headers; // Reuse instance + } + if (!this.#headersObj) { + this.#headersObj = new Headers(this.#init?.headers); + } + return this.#headersObj; + } + + get ok(): boolean { + if (this.#responseObj) { + return this.#responseObj.ok; + } + const status = this.#init?.status ?? 200; + return status >= 200 && status < 300; + } + + get redirected(): boolean { + if (this.#responseObj) { + return this.#responseObj.redirected; + } + return false; + } + + get status(): number { + if (this.#responseObj) { + return this.#responseObj.status; + } + return this.#init?.status ?? 200; + } + + get statusText(): string { + if (this.#responseObj) { + return this.#responseObj.statusText; + } + return this.#init?.statusText ?? ""; + } + + get type(): ResponseType { + if (this.#responseObj) { + return this.#responseObj.type; + } + return "default"; + } + + get url(): string { + if (this.#responseObj) { + return this.#responseObj.url; + } + return ""; + } + + // --- body --- + + #fastBody( + as: new (...args: any[]) => T, + ): T | null | false { + const bodyInit = this.#body; + if (bodyInit === null || bodyInit === undefined) { + return null; // No body + } + if (bodyInit instanceof as) { + return bodyInit; // Fast path + } + return false; // Not supported + } + + get body(): ReadableStream> | null { + if (this.#responseObj) { + return this.#responseObj.body as ReadableStream< + Uint8Array + >; // Reuse instance + } + const fastBody = this.#fastBody(ReadableStream); + if (fastBody !== false) { + return fastBody as ReadableStream>; // Fast path + } + return this.#response.body as ReadableStream>; // Slow path + } + + get bodyUsed(): boolean { + if (this.#responseObj) { + return this.#responseObj.bodyUsed; + } + return false; + } + + arrayBuffer(): Promise { + if (this.#responseObj) { + return this.#responseObj.arrayBuffer(); // Reuse instance + } + const fastBody = this.#fastBody(ArrayBuffer); + if (fastBody !== false) { + return Promise.resolve(fastBody || new ArrayBuffer(0)); // Fast path + } + return this.#response.arrayBuffer(); // Slow path + } + + blob(): Promise { + if (this.#responseObj) { + return this.#responseObj.blob(); // Reuse instance + } + const fastBody = this.#fastBody(Blob); + if (fastBody !== false) { + return Promise.resolve(fastBody || new Blob()); // Fast path + } + return this.#response.blob(); // Slow path + } + + bytes(): Promise> { + if (this.#responseObj) { + return this.#responseObj.bytes() as Promise>; // Reuse instance + } + const fastBody = this.#fastBody(Uint8Array); + if (fastBody !== false) { + return Promise.resolve(fastBody || new Uint8Array()); // Fast path + } + return this.#response.bytes() as Promise>; // Slow path + } + + formData(): Promise { + if (this.#responseObj) { + return this.#responseObj.formData(); // Reuse instance + } + const fastBody = this.#fastBody(FormData); + if (fastBody !== false) { + // TODO: Content-Type should be one of "multipart/form-data" or "application/x-www-form-urlencoded" + return Promise.resolve(fastBody || new FormData()); // Fast path + } + return this.#response.formData(); // Slow path + } + + text(): Promise { + if (this.#responseObj) { + return this.#responseObj.text(); // Reuse instance + } + const bodyInit = this.#body; + if (bodyInit === null || bodyInit === undefined) { + return Promise.resolve(""); // No body + } + if (typeof bodyInit === "string") { + return Promise.resolve(bodyInit); // Fast path + } + return this.#response.text(); // Slow path + } + + json(): Promise { + if (this.#responseObj) { + return this.#responseObj.json(); // Reuse instance + } + return this.text().then((text) => JSON.parse(text)); + } + }; + + Object.setPrototypeOf(_Response.prototype, globalThis.Response.prototype); + + return _Response as any; +})(); diff --git a/src/types.ts b/src/types.ts index 378ec21..8354eeb 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,6 +4,7 @@ import type * as NodeHttp2 from "node:http2"; import type * as NodeNet from "node:net"; import type * as Bun from "bun"; import type * as CF from "@cloudflare/workers-types"; +import type * as uws from "uWebSockets.js"; // Utils type MaybePromise = T | Promise; @@ -185,7 +186,8 @@ export interface Server { | "bun" | "cloudflare" | "service-worker" - | "generic"; + | "generic" + | "uws"; /** * Server options @@ -274,6 +276,14 @@ export interface ServerRuntimeContext { server: Bun.Server; }; + /** + * Underlying uWebSockets.js server request context. + */ + uws?: { + req: UWSServerRequest; + res: UWSServerResponse; + }; + /** * Underlying Cloudflare request context. */ @@ -340,3 +350,18 @@ export type NodeHTTPMiddleware = ( ) => unknown | Promise; export type CloudflareFetchHandler = CF.ExportedHandlerFetchHandler; + +export type UWSServerRequest = uws.HttpRequest; + +export type UWSServerResponse = uws.HttpResponse; + +export type UWSHTTPHandler = ( + req: UWSServerRequest, + res: UWSServerResponse, +) => void | Promise; + +export type UWSHTTPMiddleware = ( + req: UWSServerRequest, + res: UWSServerResponse, + next: (error?: Error) => void, +) => unknown | Promise; From b9aaf7055b6d309851f0f5e2bcf3afaa3851992c Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 13:49:00 +0200 Subject: [PATCH 02/15] fix(adapters): missing uws AppOptions --- src/types.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/types.ts b/src/types.ts index 8354eeb..8dec72a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -159,6 +159,13 @@ export interface ServerOptions { */ deno?: Deno.ServeOptions; + /** + * uWebSockets.js server options + * + * @docs https://github.com/uNetworking/uWebSockets.js + */ + uws?: uws.AppOptions; + /** * Service worker options */ From 5ff625d97586a173600af97d7b51187ff1451bb3 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 13:55:21 +0200 Subject: [PATCH 03/15] fix(adapters): missing uws server context --- src/types.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/types.ts b/src/types.ts index 8dec72a..1e44edd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -227,6 +227,11 @@ export interface Server { */ readonly deno?: { server?: Deno.HttpServer }; + /** + * uWebSockets.js context. + */ + readonly uws?: { server?: uws.TemplatedApp }; + /** * Server fetch handler */ From 723526e0cec70666a1c0fb1be869b6c6df0c15ed Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 14:04:17 +0200 Subject: [PATCH 04/15] feat(adapters): create send utility for uws --- src/adapters/_uws/send.ts | 58 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 src/adapters/_uws/send.ts diff --git a/src/adapters/_uws/send.ts b/src/adapters/_uws/send.ts new file mode 100644 index 0000000..0223328 --- /dev/null +++ b/src/adapters/_uws/send.ts @@ -0,0 +1,58 @@ +import type { UWSServerResponse } from "../../types.ts"; +import type { UWSResponse } from "./response.ts"; + +export async function sendUWSResponse( + res: UWSServerResponse, + webRes: Response | UWSResponse, +): Promise { + if (res.aborted) { + return; + } + + if (!webRes) { + res.cork(() => { + res.writeStatus("500"); + res.end(); + }); + return; + } + + // Fast path for UWSResponse + if ((webRes as UWSResponse).uwsResponse) { + const uwsRes = (webRes as UWSResponse).uwsResponse(); + // UWSResponse body can be a stream, which is not supported by fast path. + if (!(uwsRes.body instanceof ReadableStream)) { + res.cork(() => { + res.writeStatus(`${uwsRes.status} ${uwsRes.statusText}`); + for (const [key, value] of uwsRes.headers) { + res.writeHeader(key, value); + } + if (uwsRes.body) { + res.end(uwsRes.body as string); + } else { + res.end(); + } + }); + return; + } + } + + // Slow path for standard Response or streaming UWSResponse + const body = webRes.body ? await webRes.arrayBuffer() : undefined; + + if (res.aborted) { + return; + } + + res.cork(() => { + res.writeStatus(`${webRes.status} ${webRes.statusText}`); + for (const [key, value] of webRes.headers) { + res.writeHeader(key, value); + } + if (body) { + res.end(body); + } else { + res.end(); + } + }); +} From a6cfdbc785c2d7ab1c25a4f265a9bdbf922d6630 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 14:04:55 +0200 Subject: [PATCH 05/15] up --- src/adapters/_uws/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/adapters/_uws/index.ts b/src/adapters/_uws/index.ts index 4e91f08..58fcbdc 100644 --- a/src/adapters/_uws/index.ts +++ b/src/adapters/_uws/index.ts @@ -1,3 +1,4 @@ export { UWSRequestHeaders, UWSResponseHeaders } from "./headers.ts"; export { UWSRequest } from "./request.ts"; export { UWSResponse } from "./response.ts"; +export { sendUWSResponse } from "./send.ts"; From 85f0098b4c0e08f00c8dfa6519cfea1982c24a59 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 14:04:59 +0200 Subject: [PATCH 06/15] feat(adapters): uws implementation --- src/adapters/uws.ts | 93 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 src/adapters/uws.ts diff --git a/src/adapters/uws.ts b/src/adapters/uws.ts new file mode 100644 index 0000000..4b560da --- /dev/null +++ b/src/adapters/uws.ts @@ -0,0 +1,93 @@ +import type { + FetchHandler, + Server, + ServerOptions, + ServerHandler, + UWSHTTPHandler, +} from "../types.ts"; +import { + createWaitUntil, + fmtURL, + printListening, + resolvePortAndHost, + resolveTLSOptions, +} from "../_utils.ts"; +import { wrapFetch } from "../_middleware.ts"; +import { UWSRequest } from "./_uws/request.ts"; +import { sendUWSResponse } from "./_uws/send.ts"; +import { errorPlugin } from "../_plugins.ts"; + +export { FastURL } from "../_url.ts"; +export { UWSRequest } from "./_uws/request.ts"; +export { UWSRequestHeaders, UWSResponseHeaders } from "./_uws/headers.ts"; +export { UWSResponse, UWSResponse as FastResponse } from "./_uws/response.ts"; +export { sendUWSResponse } from "./_uws/send.ts"; + +export function serve(options: ServerOptions): Server { + return new UWSServer(options); +} + +export function toUWSHandler(fetchHandler: FetchHandler): UWSHTTPHandler { + return (nodeReq, nodeRes) => { + const request = new UWSRequest({ req: nodeReq, res: nodeRes }); + const response = fetchHandler(request); + if (response instanceof Promise) { + response.then((resolved) => sendUWSResponse(nodeRes, resolved)); + } else { + sendUWSResponse(nodeRes, response); + } + }; +} + +class UWSServer implements Server { + readonly runtime = "uws"; + readonly uws: Server["uws"]; + readonly options: Server["options"]; + readonly fetch: ServerHandler; + + #listeningPromise?: Promise; + #isSecure: boolean; + #wait: ReturnType; + + constructor(options: ServerOptions) { + this.options = { ...options, middleware: [...(options.middleware || [])] }; + + for (const plugin of options.plugins || []) { + plugin(this); + } + errorPlugin(this); + + this.fetch = wrapFetch(this); + this.#wait = createWaitUntil(); + + const tls = resolveTLSOptions(this.options); + this.#isSecure = !!(tls?.cert && tls?.key); + } + + serve(): Promise { + if (this.#listeningPromise) { + return this.#listeningPromise.then(() => this); + } + const promise = new Promise((resolve) => { + // TODO: Implement uws server creation and listening + printListening(this.options, this.url); + resolve(); + }); + this.#listeningPromise = promise; + return promise.then(() => this); + } + + get url() { + const { port, hostname } = resolvePortAndHost(this.options); + return fmtURL(hostname, port, this.#isSecure); + } + + ready(): Promise { + return Promise.resolve(this.#listeningPromise).then(() => this); + } + + async close(): Promise { + await this.#wait.wait(); + this.uws?.server?.close(); + } +} From 109775c32d45be4a4b2d40bf23eb1fa6b369fc80 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 15:07:53 +0200 Subject: [PATCH 07/15] fix(adapters): toUWSHandler --- src/adapters/uws.ts | 46 ++++++++++++++++++++++++++++++++++++--------- src/types.ts | 2 +- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/src/adapters/uws.ts b/src/adapters/uws.ts index 4b560da..7772f7e 100644 --- a/src/adapters/uws.ts +++ b/src/adapters/uws.ts @@ -17,6 +17,8 @@ import { UWSRequest } from "./_uws/request.ts"; import { sendUWSResponse } from "./_uws/send.ts"; import { errorPlugin } from "../_plugins.ts"; +import type { us_listen_socket } from "uWebSockets.js"; + export { FastURL } from "../_url.ts"; export { UWSRequest } from "./_uws/request.ts"; export { UWSRequestHeaders, UWSResponseHeaders } from "./_uws/headers.ts"; @@ -28,7 +30,7 @@ export function serve(options: ServerOptions): Server { } export function toUWSHandler(fetchHandler: FetchHandler): UWSHTTPHandler { - return (nodeReq, nodeRes) => { + return (nodeRes, nodeReq) => { const request = new UWSRequest({ req: nodeReq, res: nodeRes }); const response = fetchHandler(request); if (response instanceof Promise) { @@ -41,7 +43,7 @@ export function toUWSHandler(fetchHandler: FetchHandler): UWSHTTPHandler { class UWSServer implements Server { readonly runtime = "uws"; - readonly uws: Server["uws"]; + uws: Server["uws"]; readonly options: Server["options"]; readonly fetch: ServerHandler; @@ -68,12 +70,35 @@ class UWSServer implements Server { if (this.#listeningPromise) { return this.#listeningPromise.then(() => this); } - const promise = new Promise((resolve) => { - // TODO: Implement uws server creation and listening - printListening(this.options, this.url); - resolve(); - }); - this.#listeningPromise = promise; + const promise = (async () => { + const uws = await import("uWebSockets.js").catch((error) => { + console.error( + "Please install uWebSockets.js: `npm install uWebSockets.js`", + ); + throw error; + }); + const tls = resolveTLSOptions(this.options); + const app = tls ? uws.SSLApp(tls) : uws.App(); + this.uws = { server: app }; + const handler = toUWSHandler(this.fetch); + app.any("/*", handler); + const { port, hostname } = resolvePortAndHost(this.options); + await new Promise((resolve, reject) => { + app.listen( + hostname || "0.0.0.0", + port, + (listenSocket: us_listen_socket | false) => { + if (listenSocket) { + printListening(this.options, this.url); + resolve(); + } else { + reject(new Error("Failed to listen on port " + port)); + } + }, + ); + }); + })(); + this.#listeningPromise = promise.then(() => {}); return promise.then(() => this); } @@ -88,6 +113,9 @@ class UWSServer implements Server { async close(): Promise { await this.#wait.wait(); - this.uws?.server?.close(); + if (this.uws?.server) { + const { us_listen_socket_close } = await import("uWebSockets.js"); + us_listen_socket_close(this.uws.server); + } } } diff --git a/src/types.ts b/src/types.ts index 1e44edd..f5586a6 100644 --- a/src/types.ts +++ b/src/types.ts @@ -368,8 +368,8 @@ export type UWSServerRequest = uws.HttpRequest; export type UWSServerResponse = uws.HttpResponse; export type UWSHTTPHandler = ( - req: UWSServerRequest, res: UWSServerResponse, + req: UWSServerRequest, ) => void | Promise; export type UWSHTTPMiddleware = ( From ff9530fe6fb2aa032ca67de8d5dbfb4999696707 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 16:23:30 +0200 Subject: [PATCH 08/15] fix: build --- build.config.mjs | 1 + 1 file changed, 1 insertion(+) diff --git a/build.config.mjs b/build.config.mjs index d2670f8..6025b26 100644 --- a/build.config.mjs +++ b/build.config.mjs @@ -19,6 +19,7 @@ export default defineBuildConfig({ "cloudflare", "generic", "service-worker", + "uws", ].map((adapter) => `src/adapters/${adapter}.ts`), ], rolldown: { From bbee4ea933a3322f992b7670946a88e7a7ae3bd7 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 16:40:37 +0200 Subject: [PATCH 09/15] fix(adapter): uws implementation --- src/adapters/uws.ts | 69 ++++++++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/src/adapters/uws.ts b/src/adapters/uws.ts index 7772f7e..e62c53c 100644 --- a/src/adapters/uws.ts +++ b/src/adapters/uws.ts @@ -10,7 +10,6 @@ import { fmtURL, printListening, resolvePortAndHost, - resolveTLSOptions, } from "../_utils.ts"; import { wrapFetch } from "../_middleware.ts"; import { UWSRequest } from "./_uws/request.ts"; @@ -43,13 +42,14 @@ export function toUWSHandler(fetchHandler: FetchHandler): UWSHTTPHandler { class UWSServer implements Server { readonly runtime = "uws"; - uws: Server["uws"]; + readonly uws: Server["uws"] = {}; readonly options: Server["options"]; readonly fetch: ServerHandler; - #listeningPromise?: Promise; - #isSecure: boolean; #wait: ReturnType; + #listeningPromise?: Promise; + #listeningInfo?: { hostname?: string; port: number }; + #listenSocket?: us_listen_socket; constructor(options: ServerOptions) { this.options = { ...options, middleware: [...(options.middleware || [])] }; @@ -62,33 +62,44 @@ class UWSServer implements Server { this.fetch = wrapFetch(this); this.#wait = createWaitUntil(); - const tls = resolveTLSOptions(this.options); - this.#isSecure = !!(tls?.cert && tls?.key); + if (!options.manual) { + this.serve(); + } } serve(): Promise { - if (this.#listeningPromise) { - return this.#listeningPromise.then(() => this); + if (this.uws?.server) { + return Promise.resolve(this.#listeningPromise).then(() => this); } - const promise = (async () => { + this.#listeningPromise = (async () => { const uws = await import("uWebSockets.js").catch((error) => { console.error( "Please install uWebSockets.js: `npm install uWebSockets.js`", ); throw error; }); - const tls = resolveTLSOptions(this.options); - const app = tls ? uws.SSLApp(tls) : uws.App(); - this.uws = { server: app }; + this.uws!.server = + this.options.uws && + "cert_file_name" in this.options.uws && + this.options.uws.cert_file_name && + "key_file_name" in this.options.uws && + this.options.uws.key_file_name + ? uws.SSLApp(this.options.uws) + : uws.App(this.options.uws); const handler = toUWSHandler(this.fetch); - app.any("/*", handler); - const { port, hostname } = resolvePortAndHost(this.options); + this.uws!.server.any("/*", handler); + const { port } = resolvePortAndHost(this.options); await new Promise((resolve, reject) => { - app.listen( - hostname || "0.0.0.0", + this.uws!.server!.listen( port, (listenSocket: us_listen_socket | false) => { if (listenSocket) { + this.#listenSocket = listenSocket; + const { port, hostname } = resolvePortAndHost({ + ...this.options, + port: uws.us_socket_local_port(listenSocket), + }); + this.#listeningInfo = { hostname, port }; printListening(this.options, this.url); resolve(); } else { @@ -98,13 +109,23 @@ class UWSServer implements Server { ); }); })(); - this.#listeningPromise = promise.then(() => {}); - return promise.then(() => this); + return this.#listeningPromise.then(() => this); } - get url() { - const { port, hostname } = resolvePortAndHost(this.options); - return fmtURL(hostname, port, this.#isSecure); + get url(): string | undefined { + return this.#listeningInfo + ? fmtURL( + this.#listeningInfo.hostname, + this.#listeningInfo.port, + !!( + this.options.uws && + "cert_file_name" in this.options.uws && + this.options.uws.cert_file_name && + "key_file_name" in this.options.uws && + this.options.uws.key_file_name + ), + ) + : undefined; } ready(): Promise { @@ -113,9 +134,11 @@ class UWSServer implements Server { async close(): Promise { await this.#wait.wait(); - if (this.uws?.server) { + if (this.uws?.server && this.#listenSocket) { const { us_listen_socket_close } = await import("uWebSockets.js"); - us_listen_socket_close(this.uws.server); + us_listen_socket_close(this.#listenSocket); + this.uws.server.close(); + this.#listenSocket = undefined; } } } From b4a3717d50add65ef9d13028534c065bba1ac26b Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 16:52:30 +0200 Subject: [PATCH 10/15] up --- src/adapters/uws.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/adapters/uws.ts b/src/adapters/uws.ts index e62c53c..30ae222 100644 --- a/src/adapters/uws.ts +++ b/src/adapters/uws.ts @@ -53,6 +53,7 @@ class UWSServer implements Server { constructor(options: ServerOptions) { this.options = { ...options, middleware: [...(options.middleware || [])] }; + this.options.uws ??= {}; for (const plugin of options.plugins || []) { plugin(this); From 536b859023e476e0444db89625e6b8a7f7621c73 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 16:52:41 +0200 Subject: [PATCH 11/15] chore: specify github source --- package.json | 2 +- pnpm-lock.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 396da75..e8d30a6 100644 --- a/package.json +++ b/package.json @@ -88,7 +88,7 @@ "vitest": "^3.2.4" }, "optionalDependencies": { - "uWebSockets.js": "uNetworking/uWebSockets.js#v20.52.0" + "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.52.0" }, "packageManager": "pnpm@10.12.4", "engines": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index dc9c3b2..8f74aec 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -95,7 +95,7 @@ importers: version: 3.2.4(@types/node@24.2.0)(jiti@2.5.1) optionalDependencies: uWebSockets.js: - specifier: uNetworking/uWebSockets.js#v20.52.0 + specifier: github:uNetworking/uWebSockets.js#v20.52.0 version: https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/cfc9a40d8132a34881813cec3d5f8e3a185b3ce3 examples/elysia: From 634a2c07c6b1a05f30969bf501c00d783f2efaf9 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 17:36:11 +0200 Subject: [PATCH 12/15] fix(uws): body streaming, abortion tracking and ip normalization --- src/adapters/_uws/_common.ts | 2 + src/adapters/_uws/request.ts | 42 ++++++- src/adapters/_uws/send.ts | 236 ++++++++++++++++++++++++++++++----- 3 files changed, 244 insertions(+), 36 deletions(-) create mode 100644 src/adapters/_uws/_common.ts diff --git a/src/adapters/_uws/_common.ts b/src/adapters/_uws/_common.ts new file mode 100644 index 0000000..2a98b36 --- /dev/null +++ b/src/adapters/_uws/_common.ts @@ -0,0 +1,2 @@ +export { kNodeInspect } from "../_node/_common.ts"; +export const kUWSAbort: symbol = /* @__PURE__ */ Symbol.for("srvx.uws.abort"); diff --git a/src/adapters/_uws/request.ts b/src/adapters/_uws/request.ts index 2f79ca1..172eb3b 100644 --- a/src/adapters/_uws/request.ts +++ b/src/adapters/_uws/request.ts @@ -1,4 +1,4 @@ -import { kNodeInspect } from "../_node/_common.ts"; +import { kNodeInspect, kUWSAbort } from "./_common.ts"; import { UWSRequestHeaders } from "./headers.ts"; import type { @@ -54,7 +54,10 @@ export const UWSRequest = /* @__PURE__ */ (() => { } get ip() { - return new TextDecoder().decode(this._uws.res.getRemoteAddressAsText()); + const txt = new TextDecoder().decode( + this._uws.res.getRemoteAddressAsText(), + ); + return normalizeIp(txt); } get headers() { @@ -64,9 +67,9 @@ export const UWSRequest = /* @__PURE__ */ (() => { return this.#headers; } - clone: any = () => { - return new _Request({ ...this._uws }) as unknown as ServerRequest; - }; + clone(): any { + return new _Request({ ...this._uws }); + } get url() { const query = this._uws.req.getQuery(); @@ -87,6 +90,9 @@ export const UWSRequest = /* @__PURE__ */ (() => { get signal() { if (!this.#abortSignal) { this.#abortSignal = new AbortController(); + // Allow response pipeline to notify abort + (this._uws.res as unknown as Record void>)[kUWSAbort] = + () => this.#abortSignal?.abort(); } return this.#abortSignal.signal; } @@ -214,3 +220,29 @@ async function _readStream(stream: ReadableStream) { } return buffer; } + +function normalizeIp(txt: string): string { + // Normalize common IPv6 loopback and IPv4-mapped forms + // Examples returned by uWS: "::1" or "0000:...:ffff:7f00:0001" + const lower = txt.toLowerCase(); + if (lower === "::1") return "::1"; + const ffffIdx = lower.lastIndexOf("ffff:"); + if (ffffIdx !== -1) { + // IPv4-mapped IPv6; parse the last two hextets into IPv4 + const parts = lower.split(":"); + const a = parts.at(-2); + const b = parts.at(-1); + if (a && b) { + const ah = Number.parseInt(a, 16); + const bh = Number.parseInt(b, 16); + if (Number.isFinite(ah) && Number.isFinite(bh)) { + const b1 = (ah >> 8) & 0xff; + const b2 = ah & 0xff; + const b3 = (bh >> 8) & 0xff; + const b4 = bh & 0xff; + return `${b1}.${b2}.${b3}.${b4}`; + } + } + } + return txt; +} diff --git a/src/adapters/_uws/send.ts b/src/adapters/_uws/send.ts index 0223328..37bcaff 100644 --- a/src/adapters/_uws/send.ts +++ b/src/adapters/_uws/send.ts @@ -1,5 +1,136 @@ import type { UWSServerResponse } from "../../types.ts"; import type { UWSResponse } from "./response.ts"; +import { kUWSAbort } from "./_common.ts"; + +function isReadableStream(v: unknown): v is ReadableStream { + return typeof ReadableStream !== "undefined" && v instanceof ReadableStream; +} + +function isNodeReadable(v: unknown): v is NodeJS.ReadableStream { + const obj = v as { pipe?: unknown; getReader?: unknown } | null | undefined; + return ( + !!obj && + typeof obj.pipe === "function" && + typeof obj.getReader !== "function" + ); +} + +function hasUwsResponse(v: unknown): v is UWSResponse { + const obj = v as { uwsResponse?: unknown } | null | undefined; + return !!obj && typeof obj.uwsResponse === "function"; +} + +function writeStatusAndHeaders( + res: UWSServerResponse, + status: number, + statusText: string, + headers: Iterable<[string, string]>, +) { + res.cork(() => { + res.writeStatus(`${status} ${statusText || ""}`); + for (const [key, value] of headers) { + res.writeHeader(key, value); + } + }); +} + +async function streamWebReadable( + res: UWSServerResponse, + stream: ReadableStream, +) { + let aborted = false; + res.onAborted(() => { + aborted = true; + try { + // Cancel the readable stream on abort + stream.cancel?.().catch?.(() => { + /* ignore */ + }); + } catch { + /* ignore */ + } + // Propagate to request.signal if available + try { + (res as unknown as Record void>)[kUWSAbort]?.(); + } catch { + /* ignore */ + } + }); + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done || aborted) break; + if (value && value.length > 0) { + // Best-effort backpressure handling; small chunks in tests won't saturate. + res.write(value); + } + } + } finally { + if (!aborted) { + // End only if not aborted + res.end(); + } + } +} + +async function streamNodeReadable( + res: UWSServerResponse, + nodeStream: NodeJS.ReadableStream, +) { + let aborted = false; + const onAborted = () => { + aborted = true; + try { + (nodeStream as unknown as { destroy?: () => void }).destroy?.(); + } catch { + /* ignore */ + } + try { + (res as unknown as Record void>)[kUWSAbort]?.(); + } catch { + /* ignore */ + } + }; + res.onAborted(onAborted); + + await new Promise((resolve) => { + const onData = (chunk: unknown) => { + if (aborted) return; + // Ensure Uint8Array or string per uWS API + if (typeof chunk === "string") { + res.write(chunk); + } else if (chunk instanceof Uint8Array) { + res.write(chunk); + } else if (ArrayBuffer.isView(chunk)) { + const view = chunk as ArrayBufferView; + res.write( + new Uint8Array(view.buffer, view.byteOffset, view.byteLength), + ); + } else { + // Fallback stringify + res.write(String(chunk)); + } + }; + const onEnd = () => { + nodeStream.off("data", onData); + nodeStream.off("end", onEnd); + nodeStream.off("error", onError); + if (!aborted) res.end(); + resolve(); + }; + const onError = () => { + nodeStream.off("data", onData); + nodeStream.off("end", onEnd); + nodeStream.off("error", onError); + if (!aborted) res.end(); + resolve(); + }; + nodeStream.on("data", onData); + nodeStream.once("end", onEnd); + nodeStream.once("error", onError); + }); +} export async function sendUWSResponse( res: UWSServerResponse, @@ -17,42 +148,85 @@ export async function sendUWSResponse( return; } - // Fast path for UWSResponse - if ((webRes as UWSResponse).uwsResponse) { - const uwsRes = (webRes as UWSResponse).uwsResponse(); - // UWSResponse body can be a stream, which is not supported by fast path. - if (!(uwsRes.body instanceof ReadableStream)) { - res.cork(() => { - res.writeStatus(`${uwsRes.status} ${uwsRes.statusText}`); - for (const [key, value] of uwsRes.headers) { - res.writeHeader(key, value); - } - if (uwsRes.body) { - res.end(uwsRes.body as string); - } else { - res.end(); - } - }); + // If this is a fast UWSResponse, fully handle based on the extracted data. + const maybeFast = webRes as unknown; + if (hasUwsResponse(maybeFast)) { + const fast = (maybeFast as UWSResponse).uwsResponse(); + const { status, statusText, headers } = fast; + const body = fast.body as + | string + | Uint8Array + | ArrayBuffer + | DataView + | ReadableStream + | NodeJS.ReadableStream + | null + | undefined; + + // Streaming bodies + if (isReadableStream(body)) { + writeStatusAndHeaders(res, status, statusText, headers); + await streamWebReadable(res, body); + return; + } + if (isNodeReadable(body)) { + writeStatusAndHeaders(res, status, statusText, headers); + await streamNodeReadable(res, body); return; } - } - // Slow path for standard Response or streaming UWSResponse - const body = webRes.body ? await webRes.arrayBuffer() : undefined; + // Non-streaming bodies + writeStatusAndHeaders(res, status, statusText, headers); + if (body === null || body === undefined) { + res.end(); + return; + } + if (typeof body === "string") { + res.end(body); + return; + } + if (body instanceof ArrayBuffer) { + res.end(body); + return; + } + if (body instanceof Uint8Array) { + res.end(body); + return; + } + if (body instanceof DataView) { + res.end(new Uint8Array(body.buffer, body.byteOffset, body.byteLength)); + return; + } + // Fallback + res.end(String(body)); + return; + } - if (res.aborted) { + // Standard Response + const body = (webRes as Response).body; + if (isReadableStream(body)) { + writeStatusAndHeaders( + res, + webRes.status, + webRes.statusText, + (webRes.headers as unknown as Headers).entries(), + ); + await streamWebReadable(res, body as ReadableStream); return; } - res.cork(() => { - res.writeStatus(`${webRes.status} ${webRes.statusText}`); - for (const [key, value] of webRes.headers) { - res.writeHeader(key, value); - } - if (body) { - res.end(body); - } else { - res.end(); - } - }); + // Buffer small/finite bodies + const ab = body ? await (webRes as Response).arrayBuffer() : undefined; + if (res.aborted) return; + writeStatusAndHeaders( + res, + webRes.status, + webRes.statusText, + (webRes.headers as unknown as Headers).entries(), + ); + if (ab) { + res.end(ab); + } else { + res.end(); + } } From 018b9d1e653df743e277bc69f815cbda4157d26c Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 17:36:19 +0200 Subject: [PATCH 13/15] test(uws): add tests --- test/uws.test.ts | 84 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 test/uws.test.ts diff --git a/test/uws.test.ts b/test/uws.test.ts new file mode 100644 index 0000000..46aadea --- /dev/null +++ b/test/uws.test.ts @@ -0,0 +1,84 @@ +import { describe, beforeAll, afterAll } from "vitest"; +import { fetch, Agent } from "undici"; +import type { RequestInfo, RequestInit } from "undici"; +import { addTests } from "./_tests.ts"; +import { serve, FastResponse } from "../src/adapters/uws.ts"; +import { getTLSCert } from "./_utils.ts"; +import { fixture } from "./_fixture.ts"; + +const tls = await getTLSCert(); + +const testConfigs = [ + { + name: "http", + Response: globalThis.Response, + }, + { + name: "http, FastResponse", + Response: FastResponse, + }, + { + name: "https", + Response: globalThis.Response, + serveOptions: { tls }, + }, + { + name: "https, FastResponse", + Response: FastResponse, + serveOptions: { tls }, + }, +]; + +for (const config of testConfigs) { + describe.sequential(`uws (${config.name})`, () => { + const client = getHttpClient(config.serveOptions?.tls); + let server: ReturnType | undefined; + + beforeAll(async () => { + server = serve( + fixture( + { + port: 0, + ...config.serveOptions, + }, + config.Response as unknown as typeof Response, + ), + ); + await server!.ready(); + }); + + afterAll(async () => { + await client.agent?.close(); + await server!.close(); + }); + + addTests({ + url: (path) => server!.url! + path.slice(1), + runtime: "uws", + fetch: client.fetch, + }); + }); +} + +function getHttpClient(tlsOptions?: { key: string; cert: string }) { + if (!tlsOptions) { + return { + fetch: globalThis.fetch, + agent: undefined, + }; + } + const httpsAgent = new Agent({ connect: { ...tls } }); + const fetchWithHttps = ( + input: RequestInfo, + init?: RequestInit, + ): Promise => + fetch(input, { + ...init, + dispatcher: httpsAgent, + }) as unknown as Promise; + + return { + fetch: fetchWithHttps as unknown as typeof globalThis.fetch, + agent: httpsAgent, + }; +} From 61f6ac58d63fa9fd1740875988c378bc2225819a Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 17:58:56 +0200 Subject: [PATCH 14/15] docs: update notes --- src/adapters/_uws/response.ts | 71 +++++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 25 deletions(-) diff --git a/src/adapters/_uws/response.ts b/src/adapters/_uws/response.ts index 7815c5e..598f959 100644 --- a/src/adapters/_uws/response.ts +++ b/src/adapters/_uws/response.ts @@ -149,8 +149,6 @@ export const UWSResponse: { }; } - // ... the rest is for interface compatibility only and usually not to be used ... - /** Lazy initialized response instance */ #responseObj?: globalThis.Response; @@ -246,25 +244,31 @@ export const UWSResponse: { ): T | null | false { const bodyInit = this.#body; if (bodyInit === null || bodyInit === undefined) { - return null; // No body + // No body + return null; } if (bodyInit instanceof as) { - return bodyInit; // Fast path + // Fast path + return bodyInit; } - return false; // Not supported + // Not supported + return false; } get body(): ReadableStream> | null { if (this.#responseObj) { + // Reuse instance return this.#responseObj.body as ReadableStream< Uint8Array - >; // Reuse instance + >; } const fastBody = this.#fastBody(ReadableStream); if (fastBody !== false) { - return fastBody as ReadableStream>; // Fast path + // Fast path + return fastBody as ReadableStream>; } - return this.#response.body as ReadableStream>; // Slow path + // Slow path + return this.#response.body as ReadableStream>; } get bodyUsed(): boolean { @@ -276,66 +280,83 @@ export const UWSResponse: { arrayBuffer(): Promise { if (this.#responseObj) { - return this.#responseObj.arrayBuffer(); // Reuse instance + // Reuse instance + return this.#responseObj.arrayBuffer(); } const fastBody = this.#fastBody(ArrayBuffer); if (fastBody !== false) { - return Promise.resolve(fastBody || new ArrayBuffer(0)); // Fast path + // Fast path + return Promise.resolve(fastBody || new ArrayBuffer(0)); } - return this.#response.arrayBuffer(); // Slow path + // Slow path + return this.#response.arrayBuffer(); } blob(): Promise { if (this.#responseObj) { - return this.#responseObj.blob(); // Reuse instance + // Reuse instance + return this.#responseObj.blob(); } const fastBody = this.#fastBody(Blob); if (fastBody !== false) { - return Promise.resolve(fastBody || new Blob()); // Fast path + // Fast path + return Promise.resolve(fastBody || new Blob()); } - return this.#response.blob(); // Slow path + // Slow path + return this.#response.blob(); } bytes(): Promise> { if (this.#responseObj) { - return this.#responseObj.bytes() as Promise>; // Reuse instance + // Reuse instance + return this.#responseObj.bytes() as Promise>; } const fastBody = this.#fastBody(Uint8Array); if (fastBody !== false) { - return Promise.resolve(fastBody || new Uint8Array()); // Fast path + // Fast path + return Promise.resolve(fastBody || new Uint8Array()); } - return this.#response.bytes() as Promise>; // Slow path + // Slow path + return this.#response.bytes() as Promise>; } formData(): Promise { if (this.#responseObj) { - return this.#responseObj.formData(); // Reuse instance + // Reuse instance + return this.#responseObj.formData(); } const fastBody = this.#fastBody(FormData); if (fastBody !== false) { // TODO: Content-Type should be one of "multipart/form-data" or "application/x-www-form-urlencoded" - return Promise.resolve(fastBody || new FormData()); // Fast path + // Fast path + return Promise.resolve(fastBody || new FormData()); } - return this.#response.formData(); // Slow path + // Slow path + return this.#response.formData(); } text(): Promise { if (this.#responseObj) { - return this.#responseObj.text(); // Reuse instance + // Reuse instance + return this.#responseObj.text(); } const bodyInit = this.#body; if (bodyInit === null || bodyInit === undefined) { - return Promise.resolve(""); // No body + // No body + return Promise.resolve(""); } if (typeof bodyInit === "string") { - return Promise.resolve(bodyInit); // Fast path + // Fast path + return Promise.resolve(bodyInit); } - return this.#response.text(); // Slow path + // Slow path + return this.#response.text(); } json(): Promise { if (this.#responseObj) { - return this.#responseObj.json(); // Reuse instance + // Reuse instance + return this.#responseObj.json(); } return this.text().then((text) => JSON.parse(text)); } From d4a5698b65b1f2ee93fcd4ddb3cdbb6b365e07f2 Mon Sep 17 00:00:00 2001 From: Sandro Circi Date: Fri, 22 Aug 2025 18:20:22 +0200 Subject: [PATCH 15/15] fix(uws): ip normalization --- src/adapters/_uws/request.ts | 53 ++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/src/adapters/_uws/request.ts b/src/adapters/_uws/request.ts index 172eb3b..272d99a 100644 --- a/src/adapters/_uws/request.ts +++ b/src/adapters/_uws/request.ts @@ -222,27 +222,46 @@ async function _readStream(stream: ReadableStream) { } function normalizeIp(txt: string): string { - // Normalize common IPv6 loopback and IPv4-mapped forms - // Examples returned by uWS: "::1" or "0000:...:ffff:7f00:0001" const lower = txt.toLowerCase(); - if (lower === "::1") return "::1"; - const ffffIdx = lower.lastIndexOf("ffff:"); - if (ffffIdx !== -1) { - // IPv4-mapped IPv6; parse the last two hextets into IPv4 - const parts = lower.split(":"); - const a = parts.at(-2); - const b = parts.at(-1); - if (a && b) { - const ah = Number.parseInt(a, 16); - const bh = Number.parseInt(b, 16); - if (Number.isFinite(ah) && Number.isFinite(bh)) { - const b1 = (ah >> 8) & 0xff; - const b2 = ah & 0xff; - const b3 = (bh >> 8) & 0xff; - const b4 = bh & 0xff; + + if (lower === "::1") { + return "::1"; + } + + const parts = lower.split(":"); + + if ( + parts.length === 8 && + Number.parseInt(parts[7], 16) === 1 && + parts.slice(0, 7).every((p) => Number.parseInt(p, 16) === 0) + ) { + return "::1"; + } + + const match = lower.match(/:ffff:([0-9a-f]{1,4}):([0-9a-f]{1,4})$/); + + if (match && typeof match.index === "number") { + const prefix = lower.slice(0, Math.max(0, match.index)); + const prefixHextets = prefix.split(":").filter(Boolean); + const isPrefixAllZeros = prefixHextets.every( + (p) => Number.parseInt(p, 16) === 0, + ); + + if (isPrefixAllZeros) { + const hexA = match[1]; + const hexB = match[2]; + const valA = Number.parseInt(hexA, 16); + const valB = Number.parseInt(hexB, 16); + + if (Number.isFinite(valA) && Number.isFinite(valB)) { + const b1 = (valA >> 8) & 0xff; + const b2 = valA & 0xff; + const b3 = (valB >> 8) & 0xff; + const b4 = valB & 0xff; return `${b1}.${b2}.${b3}.${b4}`; } } } + return txt; }