/* * Copyright (C) 2020 Apple Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ // @internal function isTransformStream(stream) { "use strict"; return @isObject(stream) && !!@getByIdDirectPrivate(stream, "readable"); } function isTransformStreamDefaultController(controller) { "use strict"; return @isObject(controller) && !!@getByIdDirectPrivate(controller, "transformAlgorithm"); } function createTransformStream(startAlgorithm, transformAlgorithm, flushAlgorithm, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm) { if (writableHighWaterMark === @undefined) writableHighWaterMark = 1; if (writableSizeAlgorithm === @undefined) writableSizeAlgorithm = () => 1; if (readableHighWaterMark === @undefined) readableHighWaterMark = 0; if (readableSizeAlgorithm === @undefined) readableSizeAlgorithm = () => 1; @assert(writableHighWaterMark >= 0); @assert(readableHighWaterMark >= 0); const transform = {}; @putByIdDirectPrivate(transform, "TransformStream", true); const stream = new @TransformStream(transform); const startPromiseCapability = @newPromiseCapability(@Promise); @initializeTransformStream(stream, startPromiseCapability.@promise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm); const controller = new @TransformStreamDefaultController(); @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); startAlgorithm().@then(() => { startPromiseCapability.@resolve.@call(); }, (error) => { startPromiseCapability.@reject.@call(@undefined, error); }); return stream; } function initializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm) { "use strict"; const startAlgorithm = () => { return startPromise; }; const writeAlgorithm = (chunk) => { return @transformStreamDefaultSinkWriteAlgorithm(stream, chunk); } const abortAlgorithm = (reason) => { return @transformStreamDefaultSinkAbortAlgorithm(stream, reason); } const closeAlgorithm = () => { return @transformStreamDefaultSinkCloseAlgorithm(stream); } const writable = @createWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm); const pullAlgorithm = () => { return @transformStreamDefaultSourcePullAlgorithm(stream); }; const cancelAlgorithm = (reason) => { @transformStreamErrorWritableAndUnblockWrite(stream, reason); return @Promise.@resolve(); }; const underlyingSource = { }; @putByIdDirectPrivate(underlyingSource, "start", startAlgorithm); @putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm); @putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm); const options = { }; @putByIdDirectPrivate(options, "size", readableSizeAlgorithm); @putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark); const readable = new @ReadableStream(underlyingSource, options); @putByIdDirectPrivate(stream, "writable", writable); @putByIdDirectPrivate(stream, "readable", readable); @putByIdDirectPrivate(stream, "backpressure", @undefined); @putByIdDirectPrivate(stream, "backpressureChangePromise", @undefined); @transformStreamSetBackpressure(stream, true); @putByIdDirectPrivate(stream, "controller", @undefined); } function transformStreamError(stream, e) { "use strict"; const readable = @getByIdDirectPrivate(stream, "readable"); const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); @readableStreamDefaultControllerError(readableController, e); @transformStreamErrorWritableAndUnblockWrite(stream, e); } function transformStreamErrorWritableAndUnblockWrite(stream, e) { "use strict"; @transformStreamDefaultControllerClearAlgorithms(@getByIdDirectPrivate(stream, "controller")); const writable = @getByIdDirectPrivate(stream, "writable"); @writableStreamDefaultControllerErrorIfNeeded(@getByIdDirectPrivate(writable, "controller"), e); if (@getByIdDirectPrivate(stream, "backpressure")) @transformStreamSetBackpressure(stream, false); } function transformStreamSetBackpressure(stream, backpressure) { "use strict"; @assert(@getByIdDirectPrivate(stream, "backpressure") !== backpressure); const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise"); if (backpressureChangePromise !== @undefined) backpressureChangePromise.@resolve.@call(); @putByIdDirectPrivate(stream, "backpressureChangePromise", @newPromiseCapability(@Promise)); @putByIdDirectPrivate(stream, "backpressure", backpressure); } function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) { "use strict"; @assert(@isTransformStream(stream)); @assert(@getByIdDirectPrivate(stream, "controller") === @undefined); @putByIdDirectPrivate(controller, "stream", stream); @putByIdDirectPrivate(stream, "controller", controller); @putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm); @putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm); } function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) { "use strict"; const controller = new @TransformStreamDefaultController(); let transformAlgorithm = (chunk) => { try { @transformStreamDefaultControllerEnqueue(controller, chunk); } catch (e) { return @Promise.@reject(e); } return @Promise.@resolve(); }; let flushAlgorithm = () => { return @Promise.@resolve(); }; if ("transform" in transformerDict) transformAlgorithm = (chunk) => { return @promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]); }; if ("flush" in transformerDict) { flushAlgorithm = () => { return @promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]); }; } @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); } function transformStreamDefaultControllerClearAlgorithms(controller) { "use strict"; // We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check. @putByIdDirectPrivate(controller, "transformAlgorithm", true); @putByIdDirectPrivate(controller, "flushAlgorithm", @undefined); } function transformStreamDefaultControllerEnqueue(controller, chunk) { "use strict"; const stream = @getByIdDirectPrivate(controller, "stream"); const readable = @getByIdDirectPrivate(stream, "readable"); const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); @assert(readableController !== @undefined); if (!@readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) @throwTypeError("TransformStream.readable cannot close or enqueue"); try { @readableStreamDefaultControllerEnqueue(readableController, chunk); } catch (e) { @transformStreamErrorWritableAndUnblockWrite(stream, e); throw @getByIdDirectPrivate(readable, "storedError"); } const backpressure = !@readableStreamDefaultControllerShouldCallPull(readableController); if (backpressure !== @getByIdDirectPrivate(stream, "backpressure")) { @assert(backpressure); @transformStreamSetBackpressure(stream, true); } } function transformStreamDefaultControllerError(controller, e) { "use strict"; @transformStreamError(@getByIdDirectPrivate(controller, "stream"), e); } function transformStreamDefaultControllerPerformTransform(controller, chunk) { "use strict"; const promiseCapability = @newPromiseCapability(@Promise); const transformPromise = @getByIdDirectPrivate(controller, "transformAlgorithm").@call(@undefined, chunk); transformPromise.@then(() => { promiseCapability.@resolve(); }, (r) => { @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r); promiseCapability.@reject.@call(@undefined, r); }); return promiseCapability.@promise; } function transformStreamDefaultControllerTerminate(controller) { "use strict"; const stream = @getByIdDirectPrivate(controller, "stream"); const readable = @getByIdDirectPrivate(stream, "readable"); const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); // FIXME: Update readableStreamDefaultControllerClose to make this check. if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) @readableStreamDefaultControllerClose(readableController); const error = @makeTypeError("the stream has been terminated"); @transformStreamErrorWritableAndUnblockWrite(stream, error); } function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { "use strict"; const writable = @getByIdDirectPrivate(stream, "writable"); @assert(@getByIdDirectPrivate(writable, "state") === "writable"); const controller = @getByIdDirectPrivate(stream, "controller"); if (@getByIdDirectPrivate(stream, "backpressure")) { const promiseCapability = @newPromiseCapability(@Promise); const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise"); @assert(backpressureChangePromise !== @undefined); backpressureChangePromise.@promise.@then(() => { const state = @getByIdDirectPrivate(writable, "state"); if (state === "erroring") { promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(writable, "storedError")); return; } @assert(state === "writable"); @transformStreamDefaultControllerPerformTransform(controller, chunk).@then(() => { promiseCapability.@resolve(); }, (e) => { promiseCapability.@reject.@call(@undefined, e); }); }, (e) => { promiseCapability.@reject.@call(@undefined, e); }); return promiseCapability.@promise; } return @transformStreamDefaultControllerPerformTransform(controller, chunk); } function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { "use strict"; @transformStreamError(stream, reason); return @Promise.@resolve(); } function transformStreamDefaultSinkCloseAlgorithm(stream) { "use strict"; const readable = @getByIdDirectPrivate(stream, "readable"); const controller = @getByIdDirectPrivate(stream, "controller"); const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); const flushAlgorithm = @getByIdDirectPrivate(controller, "flushAlgorithm"); @assert(flushAlgorithm !== @undefined); const flushPromise = @getByIdDirectPrivate(controller, "flushAlgorithm").@call(); @transformStreamDefaultControllerClearAlgorithms(controller); const promiseCapability = @newPromiseCapability(@Promise); flushPromise.@then(() => { if (@getByIdDirectPrivate(readable, "state") === @streamErrored) { promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError")); return; } // FIXME: Update readableStreamDefaultControllerClose to make this check. if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) @readableStreamDefaultControllerClose(readableController); promiseCapability.@resolve(); }, (r) => { @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r); promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError")); }); return promiseCapability.@promise; } function transformStreamDefaultSourcePullAlgorithm(stream) { "use strict"; @assert(@getByIdDirectPrivate(stream, "backpressure")); @assert(@getByIdDirectPrivate(stream, "backpressureChangePromise") !== @undefined); @transformStreamSetBackpressure(stream, false); return @getByIdDirectPrivate(stream, "backpressureChangePromise").@promise; }