function initializeReadableStream(underlyingSource, strategy)
{
"use strict";
if (underlyingSource === @undefined)
underlyingSource = { };
if (strategy === @undefined)
strategy = { highWaterMark: 1, size: function() { return 1; } };
if (!@isObject(underlyingSource))
throw new @TypeError("ReadableStream constructor takes an object as first argument");
if (strategy !== @undefined && !@isObject(strategy))
throw new @TypeError("ReadableStream constructor takes an object as second argument, if any");
this.@underlyingSource = underlyingSource;
this.@queue = @newQueue();
this.@state = @streamReadable;
this.@started = false;
this.@closeRequested = false;
this.@pullAgain = false;
this.@pulling = false;
this.@reader = @undefined;
this.@storedError = @undefined;
this.@disturbed = false;
this.@controller = new @ReadableStreamController(this);
this.@strategy = @validateAndNormalizeQueuingStrategy(strategy.size, strategy.highWaterMark);
@promiseInvokeOrNoopNoCatch(underlyingSource, "start", [this.@controller]).@then(() => {
this.@started = true;
@requestReadableStreamPull(this);
}, (error) => {
if (this.@state === @streamReadable)
@errorReadableStream(this, error);
});
return this;
}
function cancel(reason)
{
"use strict";
if (!@isReadableStream(this))
return @Promise.@reject(new @TypeError("Function should be called on a ReadableStream"));
if (@isReadableStreamLocked(this))
return @Promise.@reject(new @TypeError("ReadableStream is locked"));
return @cancelReadableStream(this, reason);
}
function getReader()
{
"use strict";
if (!@isReadableStream(this))
throw new @TypeError("Function should be called on a ReadableStream");
return new @ReadableStreamReader(this);
}
function pipeThrough(streams, options)
{
"use strict";
const writable = streams.writable;
const readable = streams.readable;
this.pipeTo(writable, options);
return readable;
}
function pipeTo(destination, options)
{
"use strict";
const preventClose = @isObject(options) && !!options.preventClose;
const preventAbort = @isObject(options) && !!options.preventAbort;
const preventCancel = @isObject(options) && !!options.preventCancel;
const source = this;
let reader;
let lastRead;
let lastWrite;
let closedPurposefully = false;
let promiseCapability;
function doPipe() {
lastRead = reader.read();
@Promise.prototype.@then.@call(@Promise.all([lastRead, destination.ready]), function([{ value, done }]) {
if (done)
closeDestination();
else if (destination.state === "writable") {
lastWrite = destination.write(value);
doPipe();
}
}, function(e) {
throw e;
});
}
function cancelSource(reason) {
if (!preventCancel) {
reader.cancel(reason);
reader.releaseLock();
promiseCapability.@reject.@call(@undefined, reason);
} else {
@Promise.prototype.@then.@call(lastRead, function() {
reader.releaseLock();
promiseCapability.@reject.@call(@undefined, reason);
});
}
}
function closeDestination() {
reader.releaseLock();
const destinationState = destination.state;
if (!preventClose && (destinationState === "waiting" || destinationState === "writable")) {
closedPurposefully = true;
@Promise.prototype.@then.@call(destination.close(), promiseCapability.@resolve, promiseCapability.@reject);
} else if (lastWrite !== @undefined)
@Promise.prototype.@then.@call(lastWrite, promiseCapability.@resolve, promiseCapability.@reject);
else
promiseCapability.@resolve.@call();
}
function abortDestination(reason) {
reader.releaseLock();
if (!preventAbort)
destination.abort(reason);
promiseCapability.@reject.@call(@undefined, reason);
}
promiseCapability = @newPromiseCapability(@Promise);
reader = source.getReader();
@Promise.prototype.@then.@call(reader.closed, @undefined, abortDestination);
@Promise.prototype.@then.@call(destination.closed,
function() {
if (!closedPurposefully)
cancelSource(new @TypeError('destination is closing or closed and cannot be piped to anymore'));
},
cancelSource
);
doPipe();
return promiseCapability.@promise;
}
function tee()
{
"use strict";
if (!@isReadableStream(this))
throw new @TypeError("Function should be called on a ReadableStream");
return @teeReadableStream(this, false);
}
function locked()
{
"use strict";
if (!@isReadableStream(this))
throw new @TypeError("Function should be called on a ReadableStream");
return @isReadableStreamLocked(this);
}