WritableStreamInternals.js [plain text]
function isWritableStream(stream)
{
"use strict";
return @isObject(stream) && !!@getByIdDirectPrivate(stream, "underlyingSink");
}
function syncWritableStreamStateWithQueue(stream)
{
"use strict";
const state = @getByIdDirectPrivate(stream, "state");
if (state === @streamClosing)
return;
@assert(state === @streamWritable || state === @streamWaiting);
const shouldApplyBackpressure = @getByIdDirectPrivate(stream, "queue").size > @getByIdDirectPrivate(stream, "strategy").highWaterMark;
if (shouldApplyBackpressure && state === @streamWritable) {
@putByIdDirectPrivate(stream, "state", @streamWaiting);
@putByIdDirectPrivate(stream, "readyPromiseCapability", @newPromiseCapability(@Promise));
}
if (!shouldApplyBackpressure && state === @streamWaiting) {
@putByIdDirectPrivate(stream, "state", @streamWritable);
@getByIdDirectPrivate(stream, "readyPromiseCapability").@resolve.@call();
}
}
function errorWritableStream(stream, e)
{
"use strict";
const state = @getByIdDirectPrivate(stream, "state");
if (state === @streamClosed || state === @streamErrored)
return;
while (@getByIdDirectPrivate(stream, "queue").content.length > 0) {
const writeRecord = @dequeueValue(@getByIdDirectPrivate(stream, "queue"));
if (writeRecord !== "close")
writeRecord.promiseCapability.@reject.@call(@undefined, e);
}
@putByIdDirectPrivate(stream, "storedError", e);
if (state === @streamWaiting)
@getByIdDirectPrivate(stream, "readyPromiseCapability").@resolve.@call();
@getByIdDirectPrivate(stream, "closedPromiseCapability").@reject.@call(@undefined, e);
@putByIdDirectPrivate(stream, "state", @streamErrored);
}
function callOrScheduleWritableStreamAdvanceQueue(stream)
{
"use strict";
if (!@getByIdDirectPrivate(stream, "started"))
@getByIdDirectPrivate(stream, "startedPromise").@then(function() { @writableStreamAdvanceQueue(stream); });
else
@writableStreamAdvanceQueue(stream);
}
function writableStreamAdvanceQueue(stream)
{
"use strict";
if (@getByIdDirectPrivate(stream, "queue").content.length === 0 || @getByIdDirectPrivate(stream, "writing"))
return;
const writeRecord = @peekQueueValue(@getByIdDirectPrivate(stream, "queue"));
if (writeRecord === "close") {
@assert(@getByIdDirectPrivate(stream, "state") === @streamClosing);
@dequeueValue(@getByIdDirectPrivate(stream, "queue"));
@assert(@getByIdDirectPrivate(stream, "queue").content.length === 0);
@closeWritableStream(stream);
return;
}
@putByIdDirectPrivate(stream, "writing", true);
@promiseInvokeOrNoop(@getByIdDirectPrivate(stream, "underlyingSink"), "write", [writeRecord.chunk]).@then(
function() {
if (@getByIdDirectPrivate(stream, "state") === @streamErrored)
return;
@putByIdDirectPrivate(stream, "writing", false);
writeRecord.promiseCapability.@resolve.@call();
@dequeueValue(@getByIdDirectPrivate(stream, "queue"));
@syncWritableStreamStateWithQueue(stream);
@writableStreamAdvanceQueue(stream);
},
function(r) {
@errorWritableStream(stream, r);
}
);
}
function closeWritableStream(stream)
{
"use strict";
@assert(@getByIdDirectPrivate(stream, "state") === @streamClosing);
@promiseInvokeOrNoop(@getByIdDirectPrivate(stream, "underlyingSink"), "close").@then(
function() {
if (@getByIdDirectPrivate(stream, "state") === @streamErrored)
return;
@assert(@getByIdDirectPrivate(stream, "state") === @streamClosing);
@getByIdDirectPrivate(stream, "closedPromiseCapability").@resolve.@call();
@putByIdDirectPrivate(stream, "state", @streamClosed);
},
function(r) {
@errorWritableStream(stream, r);
}
);
}