WritableStreamInternals.js   [plain text]


/*
 * Copyright (C) 2015 Canon Inc.
 * Copyright (C) 2015 Igalia
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

// @internal

function isWritableStream(stream)
{
    "use strict";

    return @isObject(stream) && !!@getByIdDirectPrivate(stream, "underlyingSink");
}

function isWritableStreamDefaultWriter(writer)
{
    "use strict";

    return @isObject(writer) && !!@getByIdDirectPrivate(writer, "closedPromise");
}

function acquireWritableStreamDefaultWriter(stream)
{
    return new @WritableStreamDefaultWriter(stream);
}

function isWritableStreamLocked(stream)
{
    return @getByIdDirectPrivate(stream, "writer") !== @undefined;
}

function setUpWritableStreamDefaultWriter(writer, stream)
{
    if (@isWritableStreamLocked(stream))
        @throwTypeError("WritableStream is locked");

    @putByIdDirectPrivate(writer, "stream", stream);
    @putByIdDirectPrivate(stream, "writer", writer);

    const readyPromiseCapability = @newPromiseCapability(@Promise);
    const closedPromiseCapability = @newPromiseCapability(@Promise);
    @putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
    @putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);

    const state = @getByIdDirectPrivate(stream, "state");
    if (state === "writable") {
        if (@writableStreamCloseQueuedOrInFlight(stream) || !@getByIdDirectPrivate(stream, "backpressure"))
            readyPromiseCapability.@resolve.@call();
    } else if (state === "erroring") {
        readyPromiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(stream, "storedError"));
        @markPromiseAsHandled(readyPromiseCapability.@promise);
    } else if (state === "closed") {
        readyPromiseCapability.@resolve.@call();
        closedPromiseCapability.@resolve.@call();
    } else {
        @assert(state === "errored");
        const storedError = @getByIdDirectPrivate(stream, "storedError");
        readyPromiseCapability.@reject.@call(@undefined, storedError);
        @markPromiseAsHandled(readyPromiseCapability.@promise);
        closedPromiseCapability.@reject.@call(@undefined, storedError);
        @markPromiseAsHandled(closedPromiseCapability.@promise);
    }
}

function writableStreamAbort(stream, reason)
{
    const state = @getByIdDirectPrivate(stream, "state");
    if (state === "closed" || state === "errored")
        return @Promise.@resolve();

    const pendingAbortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
    if (pendingAbortRequest !== @undefined)
        return pendingAbortRequest.promise.@promise;

    @assert(state === "writable" || state === "erroring");
    let wasAlreadyErroring = false;
    if (state === "erroring") {
        wasAlreadyErroring = true;
        reason = @undefined;
    }

    const abortPromiseCapability = @newPromiseCapability(@Promise);
    @putByIdDirectPrivate(stream, "pendingAbortRequest", { promise : abortPromiseCapability, reason : reason, wasAlreadyErroring : wasAlreadyErroring });

    if (!wasAlreadyErroring)
        @writableStreamStartErroring(stream, reason);
    return abortPromiseCapability.@promise;
}

function writableStreamClose(stream)
{
    const state = @getByIdDirectPrivate(stream, "state");
    if (state === "closed" || state === "errored")
        return @Promise.@reject(@makeTypeError("Cannot close a writable stream that is closed or errored"));

    @assert(state === "writable" || state === "erroring");
    @assert(!@writableStreamCloseQueuedOrInFlight(stream));

    const closePromiseCapability = @newPromiseCapability(@Promise);
    @putByIdDirectPrivate(stream, "closeRequest", closePromiseCapability);

    const writer = @getByIdDirectPrivate(stream, "writer");
    if (writer !== @undefined && @getByIdDirectPrivate(stream, "backpressure") && state === "writable")
        @getByIdDirectPrivate(writer, "readyPromise").@resolve.@call();
        
    @writableStreamDefaultControllerClose(@getByIdDirectPrivate(stream, "controller"));

    return closePromiseCapability.@promise;
}

function writableStreamAddWriteRequest(stream)
{
    @assert(@isWritableStreamLocked(stream))
    @assert(@getByIdDirectPrivate(stream, "state") === "writable");

    const writePromiseCapability = @newPromiseCapability(@Promise);
    const writeRequests = @getByIdDirectPrivate(stream, "writeRequests");
    writeRequests.@push(writePromiseCapability);
    return writePromiseCapability.@promise;
}

function writableStreamCloseQueuedOrInFlight(stream)
{
    return @getByIdDirectPrivate(stream, "closeRequest") !== @undefined || @getByIdDirectPrivate(stream, "inFlightCloseRequest") !== @undefined;
}

function writableStreamDealWithRejection(stream, error)
{
    const state = @getByIdDirectPrivate(stream, "state");
    if (state === "writable") {
        @writableStreamStartErroring(stream, error);
        return;
    }

    @assert(state === "erroring");
    @writableStreamFinishErroring(stream);
}

function writableStreamFinishErroring(stream)
{
    @assert(@getByIdDirectPrivate(stream, "state") === "erroring");
    @assert(!@writableStreamHasOperationMarkedInFlight(stream));

    @putByIdDirectPrivate(stream, "state", "errored");

    const controller = @getByIdDirectPrivate(stream, "controller");
    @getByIdDirectPrivate(controller, "errorSteps").@call();

    const storedError = @getByIdDirectPrivate(stream, "storedError");
    const requests = @getByIdDirectPrivate(stream, "writeRequests");
    for (let index = 0, length = requests.length; index < length; ++index)
        requests[index].@reject.@call(@undefined, storedError);

    @putByIdDirectPrivate(stream, "writeRequests", []);

    const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
    if (abortRequest === @undefined) {
        @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
        return;
    }

    @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
    if (abortRequest.wasAlreadyErroring) {
        abortRequest.promise.@reject.@call(@undefined, storedError);
        @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
        return;
    }

    @getByIdDirectPrivate(controller, "abortSteps").@call(@undefined, abortRequest.reason).@then(() => {
        abortRequest.promise.@resolve.@call();
        @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
    }, (reason) => {
        abortRequest.promise.@reject.@call(@undefined, reason);
        @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
    });
}

function writableStreamFinishInFlightClose(stream)
{
    const inFlightCloseRequest = @getByIdDirectPrivate(stream, "inFlightCloseRequest");
    inFlightCloseRequest.@resolve.@call();

    @putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined);

    const state = @getByIdDirectPrivate(stream, "state");
    @assert(state === "writable" || state === "erroring");

    if (state === "erroring") {
        @putByIdDirectPrivate(stream, "storedError", @undefined);
        const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
        if (abortRequest !== @undefined) {
            abortRequest.promise.@resolve.@call();
            @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
        }
    }

    @putByIdDirectPrivate(stream, "state", "closed");

    const writer = @getByIdDirectPrivate(stream, "writer");
    if (writer !== @undefined)
        @getByIdDirectPrivate(writer, "closedPromise").@resolve.@call();

    @assert(@getByIdDirectPrivate(stream, "pendingAbortRequest") === @undefined);
    @assert(@getByIdDirectPrivate(stream, "storedError") === @undefined);
}

function writableStreamFinishInFlightCloseWithError(stream, error)
{
    const inFlightCloseRequest = @getByIdDirectPrivate(stream, "inFlightCloseRequest");
    @assert(inFlightCloseRequest !== @undefined);
    inFlightCloseRequest.@reject.@call(@undefined, error);

    @putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined);

    const state = @getByIdDirectPrivate(stream, "state");
    @assert(state === "writable" || state === "erroring");

    const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
    if (abortRequest !== @undefined) {
        abortRequest.promise.@reject.@call(@undefined, error);
        @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
    }

    @writableStreamDealWithRejection(stream, error);
}

function writableStreamFinishInFlightWrite(stream)
{
    const inFlightWriteRequest = @getByIdDirectPrivate(stream, "inFlightWriteRequest");
    @assert(inFlightWriteRequest !== @undefined);
    inFlightWriteRequest.@resolve.@call();

    @putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined);
}

function writableStreamFinishInFlightWriteWithError(stream, error)
{
    const inFlightWriteRequest = @getByIdDirectPrivate(stream, "inFlightWriteRequest");
    @assert(inFlightWriteRequest !== @undefined);
    inFlightWriteRequest.@reject.@call(@undefined, error);

    @putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined);

    const state = @getByIdDirectPrivate(stream, "state");
    @assert(state === "writable" || state === "erroring");

    @writableStreamDealWithRejection(stream, error);
}

function writableStreamHasOperationMarkedInFlight(stream)
{
    return @getByIdDirectPrivate(stream, "inFlightWriteRequest") !== @undefined || @getByIdDirectPrivate(stream, "inFlightCloseRequest") !== @undefined;
}

function writableStreamMarkCloseRequestInFlight(stream)
{
    const closeRequest = @getByIdDirectPrivate(stream, "closeRequest");
    @assert(@getByIdDirectPrivate(stream, "inFlightCloseRequest") === @undefined);
    @assert(closeRequest !== @undefined);

    @putByIdDirectPrivate(stream, "inFlightCloseRequest", closeRequest);
    @putByIdDirectPrivate(stream, "closeRequest", @undefined);
}

function writableStreamMarkFirstWriteRequestInFlight(stream)
{
    const writeRequests = @getByIdDirectPrivate(stream, "writeRequests");
    @assert(@getByIdDirectPrivate(stream, "inFlightWriteRequest") === @undefined);
    @assert(writeRequests.length > 0);

    const writeRequest = writeRequests.@shift();
    @putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest);
}

function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream)
{
    @assert(@getByIdDirectPrivate(stream, "state") === "errored");

    const storedError = @getByIdDirectPrivate(stream, "storedError");

    const closeRequest = @getByIdDirectPrivate(stream, "closeRequest");
    if (closeRequest !== @undefined) {
        @assert(@getByIdDirectPrivate(stream, "inFlightCloseRequest") === @undefined);
        closeRequest.@reject.@call(@undefined, storedError);
        @putByIdDirectPrivate(stream, "closeRequest", @undefined);
    }

    const writer = @getByIdDirectPrivate(stream, "writer");
    if (writer !== @undefined) {
        const closedPromise = @getByIdDirectPrivate(writer, "closedPromise");
        closedPromise.@reject.@call(@undefined, storedError);
        @markPromiseAsHandled(closedPromise.@promise);
    }
}

function writableStreamStartErroring(stream, reason)
{
    @assert(@getByIdDirectPrivate(stream, "storedError") === @undefined);
    @assert(@getByIdDirectPrivate(stream, "state") === "writable");
 
    const controller = @getByIdDirectPrivate(stream, "controller");
    @assert(controller !== @undefined);

    @putByIdDirectPrivate(stream, "state", "erroring");
    @putByIdDirectPrivate(stream, "storedError", reason);

    const writer = @getByIdDirectPrivate(stream, "writer");
    if (writer !== @undefined)
        @writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);

    if (!@writableStreamHasOperationMarkedInFlight(stream) && @getByIdDirectPrivate(controller, "started"))
        @writableStreamFinishErroring(stream);
}

function writableStreamUpdateBackpressure(stream, backpressure)
{
    @assert(@getByIdDirectPrivate(stream, "state") === "writable");
    @assert(!@writableStreamCloseQueuedOrInFlight(stream));

    const writer = @getByIdDirectPrivate(stream, "writer");
    if (writer !== @undefined && backpressure !== @getByIdDirectPrivate(stream, "backpressure")) {
        if (backpressure)
           @putByIdDirectPrivate(writer, "readyPromise", @newPromiseCapability(@Promise));
        else
            @getByIdDirectPrivate(writer, "readyPromise").@resolve.@call();
    }
    @putByIdDirectPrivate(stream, "backpressure", backpressure);
}

function writableStreamDefaultWriterAbort(writer, reason)
{
    const stream = @getByIdDirectPrivate(writer, "stream");
    @assert(stream !== @undefined);
    return @writableStreamAbort(stream, reason);
}

function writableStreamDefaultWriterClose(writer)
{
    const stream = @getByIdDirectPrivate(writer, "stream");
    @assert(stream !== @undefined);
    return @writableStreamClose(stream);
}

function writableStreamDefaultWriterCloseWithErrorPropagation(writer)
{
    const stream = @getByIdDirectPrivate(writer, "stream");
    @assert(stream !== @undefined);

    const state = @getByIdDirectPrivate(stream, "state");

    if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
        return @Promise.@resolve();

    if (state === "errored")
        return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));

    @assert(state === "writable" || state === "erroring");
    return @writableStreamDefaultWriterClose(writer);
}

function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error)
{
    let closedPromiseCapability = @getByIdDirectPrivate(writer, "closedPromise");
    let closedPromise = closedPromiseCapability.@promise;

    if ((@getPromiseInternalField(closedPromise, @promiseFieldFlags) & @promiseStateMask) !== @promiseStatePending) {
        closedPromiseCapability = @newPromiseCapability(@Promise);
        closedPromise = closedPromiseCapability.@promise;
        @putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);
    }

    closedPromiseCapability.@reject.@call(@undefined, error);
    @markPromiseAsHandled(closedPromise);
}

function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error)
{
    let readyPromiseCapability = @getByIdDirectPrivate(writer, "readyPromise");
    let readyPromise = readyPromiseCapability.@promise;

    if ((@getPromiseInternalField(readyPromise, @promiseFieldFlags) & @promiseStateMask) !== @promiseStatePending) {
        readyPromiseCapability = @newPromiseCapability(@Promise);
        readyPromise = readyPromiseCapability.@promise;
        @putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
    }

    readyPromiseCapability.@reject.@call(@undefined, error);
    @markPromiseAsHandled(readyPromise);
}

function writableStreamDefaultWriterGetDesiredSize(writer)
{
    const stream = @getByIdDirectPrivate(writer, "stream");
    @assert(stream !== @undefined);

    const state = @getByIdDirectPrivate(stream, "state");

    if (state === "errored" || state === "erroring")
        return null;

    if (state === "closed")
        return 0;

    return @writableStreamDefaultControllerGetDesiredSize(@getByIdDirectPrivate(stream, "controller"));
}

function writableStreamDefaultWriterRelease(writer)
{
    const stream = @getByIdDirectPrivate(writer, "stream");
    @assert(stream !== @undefined);
    @assert(@getByIdDirectPrivate(stream, "writer") === writer);

    const releasedError = @makeTypeError("writableStreamDefaultWriterRelease");

    @writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
    @writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);

    @putByIdDirectPrivate(stream, "writer", @undefined);
    @putByIdDirectPrivate(writer, "stream", @undefined);
}

function writableStreamDefaultWriterWrite(writer, chunk)
{
    const stream = @getByIdDirectPrivate(writer, "stream");
    @assert(stream !== @undefined);

    const controller = @getByIdDirectPrivate(stream, "controller");
    @assert(controller !== @undefined);
    const chunkSize = @writableStreamDefaultControllerGetChunkSize(controller, chunk);

    if (stream !== @getByIdDirectPrivate(writer, "stream"))
        return @Promise.@reject(@makeTypeError("writer is not stream's writer"));

    const state = @getByIdDirectPrivate(stream, "state");
    if (state === "errored")
        return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));

    if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
        return @Promise.@reject(@makeTypeError("stream is closing or closed"));

    if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
        return @Promise.@reject(@makeTypeError("stream is closing or closed"));

    if (state === "erroring")
        return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));

    @assert(state === "writable");

    const promise = @writableStreamAddWriteRequest(stream);
    @writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
    return promise;
}

function setUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)
{
    @assert(@isWritableStream(stream));
    @assert(@getByIdDirectPrivate(stream, "controller") === @undefined);

    @putByIdDirectPrivate(controller, "stream", stream);
    @putByIdDirectPrivate(stream, "controller", controller);

    @resetQueue(@getByIdDirectPrivate(controller, "queue"));

    @putByIdDirectPrivate(controller, "started", false);
    @putByIdDirectPrivate(controller, "strategySizeAlgorithm", sizeAlgorithm);
    @putByIdDirectPrivate(controller, "strategyHWM", highWaterMark);
    @putByIdDirectPrivate(controller, "writeAlgorithm", writeAlgorithm);
    @putByIdDirectPrivate(controller, "closeAlgorithm", closeAlgorithm);
    @putByIdDirectPrivate(controller, "abortAlgorithm", abortAlgorithm);

    const backpressure = @writableStreamDefaultControllerGetBackpressure(controller);
    @writableStreamUpdateBackpressure(stream, backpressure);

    @Promise.@resolve(startAlgorithm.@call()).@then(() => {
        const state = @getByIdDirectPrivate(stream, "state");
        @assert(state === "writable" || state === "erroring");
        @putByIdDirectPrivate(controller, "started", true);
        @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
    }, (error) => {
        const state = @getByIdDirectPrivate(stream, "state");
        @assert(state === "writable" || state === "erroring");
        @putByIdDirectPrivate(controller, "started", true);
        @writableStreamDealWithRejection(stream, error);
    });
}

function setUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm)
{
    const controller = new @WritableStreamDefaultController();

    let startAlgorithm = () => { };
    let writeAlgorithm = () => { return @Promise.@resolve(); };
    let closeAlgorithm = () => { return @Promise.@resolve(); };
    let abortAlgorithm = () => { return @Promise.@resolve(); };

    if ("start" in underlyingSinkDict) {
        const startMethod = underlyingSinkDict["start"];
        startAlgorithm = () => @promiseInvokeOrNoopMethodNoCatch(underlyingSink, startMethod, [controller]);
    }
    if ("write" in underlyingSinkDict) {
        const writeMethod = underlyingSinkDict["write"];
        writeAlgorithm = (chunk) => @promiseInvokeOrNoopMethod(underlyingSink, writeMethod, [chunk, controller]);
    }
    if ("close" in underlyingSinkDict) {
        const closeMethod = underlyingSinkDict["close"];
        closeAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSink, closeMethod, []);
    }
    if ("abort" in underlyingSinkDict) {
        const abortMethod = underlyingSinkDict["abort"];
        abortAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSink, abortMethod, [reason]);
    }

    @setUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm);
}

function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller)
{
    const stream = @getByIdDirectPrivate(controller, "stream");

    if (!@getByIdDirectPrivate(controller, "started"))
        return;

    @assert(stream !== @undefined);
    if (@getByIdDirectPrivate(stream, "inFlightWriteRequest") !== @undefined)
        return;

    const state = @getByIdDirectPrivate(stream, "state");
    @assert(state !== "closed" || state !== "errored");
    if (state === "erroring") {
        @writableStreamFinishErroring(stream);
        return;
    }

    if (@getByIdDirectPrivate(controller, "queue").content.length === 0)
        return;

    const value = @peekQueueValue(@getByIdDirectPrivate(controller, "queue"));
    if (value === @isCloseSentinel)
        @writableStreamDefaultControllerProcessClose(controller);
    else
        @writableStreamDefaultControllerProcessWrite(controller, value);
}

function isCloseSentinel()
{
}

function writableStreamDefaultControllerClearAlgorithms(controller)
{
    @putByIdDirectPrivate(controller, "writeAlgorithm", @undefined);
    @putByIdDirectPrivate(controller, "closeAlgorithm", @undefined);
    @putByIdDirectPrivate(controller, "abortAlgorithm", @undefined);
    @putByIdDirectPrivate(controller, "strategySizeAlgorithm", @undefined);
}

function writableStreamDefaultControllerClose(controller)
{
    @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), @isCloseSentinel, 0);
    @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}

function writableStreamDefaultControllerError(controller, error)
{
    const stream = @getByIdDirectPrivate(controller, "stream");
    @assert(stream !== @undefined);
    @assert(@getByIdDirectPrivate(stream, "state") === "writable");

    @writableStreamDefaultControllerClearAlgorithms(controller);
    @writableStreamStartErroring(stream, error);
}

function writableStreamDefaultControllerErrorIfNeeded(controller, error)
{
    const stream = @getByIdDirectPrivate(controller, "stream");
    if (@getByIdDirectPrivate(stream, "state") === "writable")
        @writableStreamDefaultControllerError(controller, error);
}

function writableStreamDefaultControllerGetBackpressure(controller)
{
    const desiredSize = @writableStreamDefaultControllerGetDesiredSize(controller);
    return desiredSize <= 0;
}

function writableStreamDefaultControllerGetChunkSize(controller, chunk)
{
    try {
        return @getByIdDirectPrivate(controller, "strategySizeAlgorithm").@call(@undefined, chunk);
    } catch (e) {
        @writableStreamDefaultControllerErrorIfNeeded(controller, e);
        return 1;
    }
}

function writableStreamDefaultControllerGetDesiredSize(controller)
{
    return @getByIdDirectPrivate(controller, "strategyHWM") - @getByIdDirectPrivate(controller, "queue").size;
}

function writableStreamDefaultControllerProcessClose(controller)
{
    const stream = @getByIdDirectPrivate(controller, "stream");

    @writableStreamMarkCloseRequestInFlight(stream);
    @dequeueValue(@getByIdDirectPrivate(controller, "queue"));

    @assert(@getByIdDirectPrivate(controller, "queue").content.length === 0);

    const sinkClosePromise = @getByIdDirectPrivate(controller, "closeAlgorithm").@call();
    @writableStreamDefaultControllerClearAlgorithms(controller);

    sinkClosePromise.@then(() => {
        @writableStreamFinishInFlightClose(stream);
    }, (reason) => {
        @writableStreamFinishInFlightCloseWithError(stream, reason);
    });
}

function writableStreamDefaultControllerProcessWrite(controller, chunk)
{
    const stream = @getByIdDirectPrivate(controller, "stream");

    @writableStreamMarkFirstWriteRequestInFlight(stream);

    const sinkWritePromise = @getByIdDirectPrivate(controller, "writeAlgorithm").@call(@undefined, chunk);

    sinkWritePromise.@then(() => {
        @writableStreamFinishInFlightWrite(stream);
        const state = @getByIdDirectPrivate(stream, "state");
        @assert(state === "writable" || state === "erroring");

        @dequeueValue(@getByIdDirectPrivate(controller, "queue"));
        if (!@writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
            const backpressure = @writableStreamDefaultControllerGetBackpressure(controller);
            @writableStreamUpdateBackpressure(stream, backpressure);
        }
        @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
    }, (reason) => {
        const state = @getByIdDirectPrivate(stream, "state");
        if (state === "writable")
            @writableStreamDefaultControllerClearAlgorithms(controller);

        @writableStreamFinishInFlightWriteWithError(stream, reason);
    });
}

function writableStreamDefaultControllerWrite(controller, chunk, chunkSize)
{
    try {
        @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);

        const stream = @getByIdDirectPrivate(controller, "stream");

        const state = @getByIdDirectPrivate(stream, "state");
        if (!@writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
            const backpressure = @writableStreamDefaultControllerGetBackpressure(controller);
            @writableStreamUpdateBackpressure(stream, backpressure);
        }
        @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
    } catch (e) {
        @writableStreamDefaultControllerErrorIfNeeded(controller, e);
    }
}