ReadableByteStreamInternals.js [plain text]
function privateInitializeReadableByteStreamController(stream, underlyingByteSource, highWaterMark)
{
"use strict";
if (!@isReadableStream(stream))
@throwTypeError("ReadableByteStreamController needs a ReadableStream");
if (stream.@readableStreamController !== null)
@throwTypeError("ReadableStream already has a controller");
this.@controlledReadableStream = stream;
this.@underlyingByteSource = underlyingByteSource;
this.@pullAgain = false;
this.@pulling = false;
@readableByteStreamControllerClearPendingPullIntos(this);
this.@queue = @newQueue();
this.@totalQueuedBytes = 0;
this.@started = false;
this.@closeRequested = false;
let hwm = @Number(highWaterMark);
if (@isNaN(hwm) || hwm < 0)
@throwRangeError("highWaterMark value is negative or not a number");
this.@strategyHWM = hwm;
let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
if (autoAllocateChunkSize !== @undefined) {
autoAllocateChunkSize = @Number(autoAllocateChunkSize);
if (autoAllocateChunkSize <= 0 || autoAllocateChunkSize === @Number.POSITIVE_INFINITY || autoAllocateChunkSize === @Number.NEGATIVE_INFINITY)
@throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity");
}
this.@autoAllocateChunkSize = autoAllocateChunkSize;
this.@pendingPullIntos = @newQueue();
const controller = this;
const startResult = @promiseInvokeOrNoopNoCatch(underlyingByteSource, "start", [this]).@then(() => {
controller.@started = true;
@assert(!controller.@pulling);
@assert(!controller.@pullAgain);
}, (error) => {
});
this.@cancel = @readableByteStreamControllerCancel;
return this;
}
function isReadableByteStreamController(controller)
{
"use strict";
return @isObject(controller) && !!controller.@underlyingByteSource;
}
function readableByteStreamControllerCancel(controller, reason)
{
"use strict";
if (controller.@pendingPullIntos.content.length > 0)
controller.@pendingPullIntos[0].bytesFilled = 0;
controller.@queue = @newQueue();
controller.@totalQueuedBytes = 0;
return @promiseInvokeOrNoop(controller.@underlyingByteSource, "cancel", [reason]);
}
function readableByteStreamControllerError(controller, e)
{
"use strict";
@assert(controller.@controlledReadableStream.@state === @streamReadable);
@readableByteStreamControllerClearPendingPullIntos(controller);
controller.@queue = @newQueue();
@readableStreamError(controller.@controlledReadableStream, e);
}
function readableByteStreamControllerClose(controller)
{
"use strict";
@assert(!controller.@closeRequested);
@assert(controller.@controlledReadableStream.@state === @streamReadable);
if (controller.@totalQueuedBytes > 0) {
controller.@closeRequested = true;
return;
}
if (controller.@pendingPullIntos.content.length > 0) {
if (controller.@pendingPullIntos[0].bytesFilled > 0) {
const e = new @TypeError("Close requested while there remain pending bytes");
@readableByteStreamControllerError(controller, e);
throw e;
}
}
@readableStreamClose(controller.@controlledReadableStream);
}
function readableByteStreamControllerClearPendingPullIntos(controller)
{
"use strict";
}
function readableByteStreamControllerGetDesiredSize(controller)
{
"use strict";
return controller.@strategyHWM - controller.@totalQueuedBytes;
}