WritableStreamInternals.js [plain text]
function isWritableStream(stream)
{
"use strict";
return @isObject(stream) && !!stream.@underlyingSink;
}
function syncWritableStreamStateWithQueue(stream)
{
"use strict";
if (stream.@state === @streamClosing)
return;
@assert(stream.@state === @streamWritable || stream.@state === @streamWaiting);
const shouldApplyBackpressure = stream.@queue.size > stream.@strategy.highWaterMark;
if (shouldApplyBackpressure && stream.@state === @streamWritable) {
stream.@state = @streamWaiting;
stream.@readyPromiseCapability = @newPromiseCapability(@Promise);
}
if (!shouldApplyBackpressure && stream.@state === @streamWaiting) {
stream.@state = @streamWritable;
stream.@readyPromiseCapability.@resolve.@call();
}
}
function errorWritableStream(stream, e)
{
"use strict";
if (stream.@state === @streamClosed || stream.@state === @streamErrored)
return;
while (stream.@queue.content.length > 0) {
const writeRecord = @dequeueValue(stream.@queue);
if (writeRecord !== "close")
writeRecord.promiseCapability.@reject.@call(@undefined, e);
}
stream.@storedError = e;
if (stream.@state === @streamWaiting)
stream.@readyPromiseCapability.@resolve.@call();
stream.@closedPromiseCapability.@reject.@call(@undefined, e);
stream.@state = @streamErrored;
}
function callOrScheduleWritableStreamAdvanceQueue(stream)
{
"use strict";
if (!stream.@started)
stream.@startedPromise.@then(function() { @writableStreamAdvanceQueue(stream); });
else
@writableStreamAdvanceQueue(stream);
}
function writableStreamAdvanceQueue(stream)
{
"use strict";
if (stream.@queue.content.length === 0 || stream.@writing)
return;
const writeRecord = @peekQueueValue(stream.@queue);
if (writeRecord === "close") {
@assert(stream.@state === @streamClosing);
@dequeueValue(stream.@queue);
@assert(stream.@queue.content.length === 0);
@closeWritableStream(stream);
return;
}
stream.@writing = true;
@promiseInvokeOrNoop(stream.@underlyingSink, "write", [writeRecord.chunk]).@then(
function() {
if (stream.@state === @streamErrored)
return;
stream.@writing = false;
writeRecord.promiseCapability.@resolve.@call();
@dequeueValue(stream.@queue);
@syncWritableStreamStateWithQueue(stream);
@writableStreamAdvanceQueue(stream);
},
function(r) {
@errorWritableStream(stream, r);
}
);
}
function closeWritableStream(stream)
{
"use strict";
@assert(stream.@state === @streamClosing);
@promiseInvokeOrNoop(stream.@underlyingSink, "close").@then(
function() {
if (stream.@state === @streamErrored)
return;
@assert(stream.@state === @streamClosing);
stream.@closedPromiseCapability.@resolve.@call();
stream.@state = @streamClosed;
},
function(r) {
@errorWritableStream(stream, r);
}
);
}