// AppendPipeline.cpp
#include "config.h"
#include "AppendPipeline.h"
#if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
#include "AudioTrackPrivateGStreamer.h"
#include "GStreamerCommon.h"
#include "GStreamerEMEUtilities.h"
#include "GStreamerMediaDescription.h"
#include "GStreamerRegistryScannerMSE.h"
#include "MediaSampleGStreamer.h"
#include "InbandTextTrackPrivateGStreamer.h"
#include "MediaDescription.h"
#include "SourceBufferPrivateGStreamer.h"
#include "VideoTrackPrivateGStreamer.h"
#include <functional>
#include <gst/app/gstappsink.h>
#include <gst/app/gstappsrc.h>
#include <gst/gst.h>
#include <gst/pbutils/pbutils.h>
#include <gst/video/video.h>
#include <wtf/Condition.h>
#include <wtf/glib/GLibUtilities.h>
#include <wtf/glib/RunLoopSourcePriority.h>
#include <wtf/text/StringConcatenateNumbers.h>
GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
#define GST_CAT_DEFAULT webkit_mse_debug
namespace WebCore {
GType AppendPipeline::s_endOfAppendMetaType = 0;
const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr;
std::once_flag AppendPipeline::s_staticInitializationFlag;
// Private GstMeta attached to the empty marker buffer pushed by pushNewBuffer() after
// every media buffer. It carries no payload: its mere presence tells
// appsrcEndOfAppendCheckerProbe() that the append operation is complete.
struct EndOfAppendMeta {
    GstMeta base;
    // Nothing to initialize beyond the base meta.
    static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; }
    // The marker must never be copied onto another buffer; reaching this is a bug.
    static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); }
    // No owned resources to release.
    static void free(GstMeta*, GstBuffer*) { }
};
// Registers the custom end-of-append GstMeta type with GStreamer. Runs exactly once,
// via std::call_once() from the first AppendPipeline constructor.
void AppendPipeline::staticInitialization()
{
    ASSERT(isMainThread());

    // Empty tag list: the meta is not associated with any buffer property.
    const char* tags[] = { nullptr };
    s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
    s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
}
#if !LOG_DISABLED
static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
#endif
#if ENABLE(ENCRYPTED_MEDIA)
static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
#endif
static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*);
// Changes the state of |element| and asserts (in debug builds) that the transition
// completed synchronously and landed on the desired state.
static void assertedElementSetState(GstElement* element, GstState desiredState)
{
    GstState stateBeforeChange;
    gst_element_get_state(element, &stateBeforeChange, nullptr, 0);

    GstStateChangeReturn changeResult = gst_element_set_state(element, desiredState);

    GstState stateAfterChange;
    gst_element_get_state(element, &stateAfterChange, nullptr, 0);

    bool reachedDesiredState = stateAfterChange == desiredState && changeResult == GST_STATE_CHANGE_SUCCESS;
    if (!reachedDesiredState) {
        GST_ERROR("AppendPipeline state change failed (returned %d): %" GST_PTR_FORMAT " %d -> %d (expected %d)",
            static_cast<int>(changeResult), element, static_cast<int>(stateBeforeChange), static_cast<int>(stateAfterChange), static_cast<int>(desiredState));
        ASSERT_NOT_REACHED();
    }
}
// Builds the append pipeline: appsrc -> demuxer [-> optional parser] -> appsink.
// Data pushed through pushNewBuffer() is demuxed and the resulting samples are handed
// back to the SourceBufferPrivate on the main thread. The appsink (and parser) are only
// added to the bin later, when the demuxer exposes a source pad.
AppendPipeline::AppendPipeline(SourceBufferPrivateGStreamer& sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
    : m_sourceBufferPrivate(sourceBufferPrivate)
    , m_playerPrivate(&playerPrivate)
    , m_id(0)
    , m_wasBusAlreadyNotifiedOfAvailableSamples(false)
    , m_streamType(Unknown)
{
    ASSERT(isMainThread());

    // Register the end-of-append meta type once per process.
    std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);

    GST_TRACE("Creating AppendPipeline (%p)", this);

    // Give each pipeline a unique, container-type-derived name for debugging/dot dumps.
    static size_t appendPipelineCount = 0;
    String pipelineName = makeString("append-pipeline-",
        m_sourceBufferPrivate.type().containerType().replace("/", "-"), '-', appendPipelineCount++);
    m_pipeline = gst_pipeline_new(pipelineName.utf8().data());

    m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
    gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
    // Sync emission is needed so errors and context requests can be handled on the
    // posting (streaming) thread instead of waiting for the main-thread watch.
    gst_bus_enable_sync_message_emission(m_bus.get());
    g_signal_connect(m_bus.get(), "sync-message::error", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
        appendPipeline->handleErrorSyncMessage(message);
    }), this);
    g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
        appendPipeline->handleNeedContextSyncMessage(message);
    }), this);
    g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
        appendPipeline->handleStateChangeMessage(message);
    }), this);

    m_appsrc = gst_element_factory_make("appsrc", nullptr);

    // Probe on the appsrc src pad intercepts the end-of-append marker buffers so they
    // never reach the demuxer.
    GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
    gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) {
        return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo);
    }, this, nullptr);

    // Select the demuxer from the SourceBuffer container type: qtdemux for MP4/AAC,
    // matroskademux for WebM. Other types are rejected earlier by canPlayType.
    const String& type = m_sourceBufferPrivate.type().containerType();
    GST_DEBUG("SourceBuffer containerType: %s", type.utf8().data());
    if (type.endsWith("mp4") || type.endsWith("aac"))
        m_demux = gst_element_factory_make("qtdemux", nullptr);
    else if (type.endsWith("webm"))
        m_demux = gst_element_factory_make("matroskademux", nullptr);
    else
        ASSERT_NOT_REACHED();

    // The appsink only collects samples; it must not sync to the clock, delay
    // preroll, drop out-of-segment samples or retain a last-sample reference.
    m_appsink = gst_element_factory_make("appsink", nullptr);
    gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
    gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
    gst_base_sink_set_async_enabled(GST_BASE_SINK(m_appsink.get()), FALSE); gst_base_sink_set_drop_out_of_segment(GST_BASE_SINK(m_appsink.get()), FALSE);
    gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE);

    GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
    g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, AppendPipeline* appendPipeline) {
        // Caps changes normally arrive on the streaming thread; a main-thread
        // notification is only expected when the caps are being cleared.
        if (isMainThread()) {
#ifndef NDEBUG
            GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(appendPipeline->m_appsink.get(), "sink"));
            GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
            ASSERT(!caps);
#endif
            return;
        }
        // Block the streaming thread until the main thread has processed the new caps.
        appendPipeline->m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([appendPipeline]() {
            appendPipeline->appsinkCapsChanged();
            return AbortableTaskQueue::Void();
        });
    }), this);

#if !LOG_DISABLED
    GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
    m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
    m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
    m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
    m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
    m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
    m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
#endif

#if ENABLE(ENCRYPTED_MEDIA)
    // Intercepts protection events so the player can route them to the CDM machinery.
    m_appsinkPadEventProbeInformation.appendPipeline = this;
    m_appsinkPadEventProbeInformation.description = "appsink event probe";
    m_appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &m_appsinkPadEventProbeInformation, nullptr);
#endif

    // Demuxer pads appear/disappear on the streaming thread as initialization segments
    // are parsed; "no-more-pads" signals a complete initialization segment.
    g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
        appendPipeline->connectDemuxerSrcPadToAppsinkFromStreamingThread(demuxerSrcPad);
    }), this);
    g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
        appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
    }), this);
    g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
        ASSERT(!isMainThread());
        GST_DEBUG("Posting no-more-pads task to main thread");
        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
            appendPipeline->didReceiveInitializationSegment();
        });
    }), this);

    g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) -> GstFlowReturn {
        appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink);
        return GST_FLOW_OK;
    }), this);
    g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
        // Don't pile an EOS error on top of an error that was already reported.
        if (appendPipeline->m_errorReceived)
            return;
        GST_ERROR("AppendPipeline's appsink received EOS. This is usually caused by an invalid initialization segment.");
        appendPipeline->handleErrorConditionFromStreamingThread();
    }), this);

    // The appsink is linked in later, when the demuxer exposes its source pad.
    gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
    gst_element_link(m_appsrc.get(), m_demux.get());
    assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING);
}
// Tears the pipeline down. All signal handlers and probes are disconnected *before*
// the state is set to NULL so that no callback can fire on a half-destroyed object.
AppendPipeline::~AppendPipeline()
{
    GST_DEBUG_OBJECT(m_pipeline.get(), "Destructing AppendPipeline (%p)", this);
    ASSERT(isMainThread());

    // Unblock any streaming-thread task waiting on the main thread.
    m_taskQueue.startAborting();

    if (m_pipeline) {
        ASSERT(m_bus);
        g_signal_handlers_disconnect_by_data(m_bus.get(), this);
        gst_bus_disable_sync_message_emission(m_bus.get());
        gst_bus_remove_signal_watch(m_bus.get());
    }

    if (m_appsrc)
        g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);

    if (m_demux) {
#if !LOG_DISABLED
        GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
        gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
#endif
        g_signal_handlers_disconnect_by_data(m_demux.get(), this);
    }

    if (m_appsink) {
        GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
        g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
        g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
#if !LOG_DISABLED
        gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
#endif
#if ENABLE(ENCRYPTED_MEDIA)
        gst_pad_remove_probe(appsinkPad.get(), m_appsinkPadEventProbeInformation.probeId);
#endif
    }

    // Shut the pipeline down last, after every callback has been disconnected.
    if (m_pipeline)
        gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
}
// Reports a fatal demuxing/parsing error to the SourceBuffer on the main thread,
// blocking the streaming thread until the error has been processed.
void AppendPipeline::handleErrorConditionFromStreamingThread()
{
    ASSERT(!isMainThread());
    auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this]() {
        m_errorReceived = true;
        m_sourceBufferPrivate.appendParsingFailed();
        return AbortableTaskQueue::Void();
    });
    // ASSERT compiles away in release builds, so silence the unused-variable warning.
#ifdef NDEBUG
    UNUSED_VARIABLE(response);
#endif
    // NOTE(review): an empty response means the task queue was aborted — presumably
    // appendParsingFailed() triggers the abort as a side effect; confirm in
    // SourceBufferPrivateGStreamer.
    ASSERT(!response);
}
// Bus sync handler for "error" messages, invoked on the posting (streaming) thread.
// Any pipeline error during an append is treated as a parse failure.
void AppendPipeline::handleErrorSyncMessage(GstMessage* message)
{
    ASSERT(!isMainThread());
    GST_WARNING_OBJECT(m_pipeline.get(), "Demuxing error: %" GST_PTR_FORMAT, message);
    handleErrorConditionFromStreamingThread();
}
// Buffer probe on the appsrc src pad. Regular media buffers pass through untouched;
// the empty marker buffer carrying EndOfAppendMeta (pushed by pushNewBuffer()) is
// dropped here and converted into an end-of-append notification on the main thread.
GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
{
    ASSERT(!isMainThread());
    // Remember the streaming thread so handleAppsinkNewSampleFromStreamingThread()
    // can assert samples arrive on the same thread.
    m_streamingThread = &WTF::Thread::current();

    GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
    ASSERT(GST_IS_BUFFER(buffer));

    GST_TRACE_OBJECT(m_pipeline.get(), "Buffer entered appsrcEndOfAppendCheckerProbe: %" GST_PTR_FORMAT, buffer);

    EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType));
    if (!endOfAppendMeta) {
        // Regular media buffer: let it flow into the demuxer.
        return GST_PAD_PROBE_OK;
    }

    GST_TRACE_OBJECT(m_pipeline.get(), "Posting end-of-append task to the main thread");
    m_taskQueue.enqueueTask([this]() {
        handleEndOfAppend();
    });
    // The marker buffer must never reach the demuxer.
    return GST_PAD_PROBE_DROP;
}
// Bus sync handler for "need-context" messages. The player private knows how to
// provide pipeline contexts, so the message is simply delegated to it.
void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
{
    const gchar* contextType = nullptr;
    gst_message_parse_context_type(message, &contextType);
    GST_TRACE("context type: %s", contextType);
    m_playerPrivate->handleSyncMessage(message);
}
// Bus handler for "state-changed" messages. For pipeline-level transitions it dumps a
// dot graph of the pipeline topology (when GST_DEBUG_DUMP_DOT_DIR is set) for debugging.
void AppendPipeline::handleStateChangeMessage(GstMessage* message)
{
    ASSERT(isMainThread());

    // Only transitions of the pipeline itself are interesting, not those of its children.
    if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
        GstState currentState, newState;
        // Fix: "&currentState" had been corrupted into the mojibake "¤tState"
        // (HTML entity mangling of "&curren"), which does not compile.
        gst_message_parse_state_changed(message, &currentState, &newState, nullptr);
        // Sanitize the MIME type so it can be used in a file name.
        CString sourceBufferType = String(m_sourceBufferPrivate.type().raw())
            .replace("/", "_").replace(" ", "_")
            .replace("\"", "").replace("\'", "").utf8();
        CString dotFileName = makeString("webkit-append-",
            sourceBufferType.data(), '-',
            gst_element_state_get_name(currentState), '_',
            gst_element_state_get_name(newState)).utf8();
        GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.data());
    }
}
// Returns a small per-stream-type identifier for this pipeline, lazily assigned the
// first time it is requested (i.e. once the stream type is known). Ids start at 1 so
// that 0 can mean "not assigned yet".
gint AppendPipeline::id()
{
    ASSERT(isMainThread());

    // Already assigned: reuse it.
    if (m_id)
        return m_id;

    // Separate counters per stream type; process-wide, main-thread only.
    static gint s_totalAudio = 0;
    static gint s_totalVideo = 0;
    static gint s_totalText = 0;

    switch (m_streamType) {
    case Audio:
        m_id = ++s_totalAudio;
        break;
    case Video:
        m_id = ++s_totalVideo;
        break;
    case Text:
        m_id = ++s_totalText;
        break;
    case Unknown:
    case Invalid:
        // Callers must not ask for an id before the stream type has been determined.
        GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
        ASSERT_NOT_REACHED();
        break;
    }

    GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
    return m_id;
}
// Classifies the demuxer output caps into a stream type (Audio/Video/Text/Invalid) and
// captures the presentation size for video streams.
// Ownership: takes ownership of |demuxerSrcPadCaps| (it is adopted into a GRefPtr), so
// the caller must pass a reference it owns.
void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
{
    ASSERT(isMainThread());

    m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
    m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;

    const char* originalMediaType = capsMediaType(m_demuxerSrcPadCaps.get());
    auto& gstRegistryScanner = GStreamerRegistryScannerMSE::singleton();
    if (!gstRegistryScanner.isCodecSupported(originalMediaType)) {
        // Codec not playable with the installed GStreamer plugins: reject the track.
        m_presentationSize = WebCore::FloatSize();
        m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
    } else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
        // The resolution may be absent from the caps; fall back to an empty size.
        Optional<FloatSize> size = getVideoResolutionFromCaps(m_demuxerSrcPadCaps.get());
        if (size.hasValue())
            m_presentationSize = size.value();
        else
            m_presentationSize = WebCore::FloatSize();
        m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
    } else {
        m_presentationSize = WebCore::FloatSize();
        if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_AUDIO_CAPS_TYPE_PREFIX))
            m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
        else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_TEXT_CAPS_TYPE_PREFIX))
            m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
        // Anything else stays Unknown.
    }
}
// Main-thread handler for caps changes on the appsink sink pad. Samples already queued
// in the appsink were produced under the previous caps, so they are consumed first.
void AppendPipeline::appsinkCapsChanged()
{
    ASSERT(isMainThread());

    // Drain samples generated with the old caps before switching.
    consumeAppsinkAvailableSamples();

    GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
    GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));

    if (!caps)
        return;

    // Track resolution changes for video streams when the caps carry one.
    if (doCapsHaveType(caps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
        Optional<FloatSize> size = getVideoResolutionFromCaps(caps.get());
        if (size.hasValue())
            m_presentationSize = size.value();
    }

    // Notify the player only when the caps actually changed; the flag tells the player
    // whether this is the first time caps are known for this track.
    bool previousCapsWereNull = !m_appsinkCaps;
    if (m_appsinkCaps != caps) {
        m_appsinkCaps = WTFMove(caps);
        m_playerPrivate->trackDetected(*this, m_track, previousCapsWereNull);
    }
}
// Main-thread continuation of the end-of-append marker detected in
// appsrcEndOfAppendCheckerProbe(): flush remaining samples, then tell the
// SourceBufferPrivate that this append has been fully processed.
void AppendPipeline::handleEndOfAppend()
{
    ASSERT(isMainThread());
    consumeAppsinkAvailableSamples();
    GST_TRACE_OBJECT(m_pipeline.get(), "Notifying SourceBufferPrivate the append is complete");
    sourceBufferPrivate().didReceiveAllPendingSamples();
}
// Wraps one GstSample pulled from the appsink into a MediaSampleGStreamer and hands it
// to the SourceBufferPrivate. Samples without a buffer or without a valid PTS are
// silently skipped.
void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
{
    ASSERT(isMainThread());

    if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
        GST_WARNING("Received sample without buffer from appsink.");
        return;
    }

    // A sample without PTS cannot be placed on the media timeline.
    if (!GST_BUFFER_PTS_IS_VALID(gst_sample_get_buffer(sample.get()))) {
        GST_DEBUG("Ignoring sample without PTS: %" GST_PTR_FORMAT, gst_sample_get_buffer(sample.get()));
        return;
    }

    auto mediaSample = WebCore::MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId());

    GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
        mediaSample->trackID().string().utf8().data(),
        mediaSample->presentationTime().toString().utf8().data(),
        mediaSample->decodeTime().toString().utf8().data(),
        mediaSample->duration().toString().utf8().data(),
        mediaSample->presentationSize().width(), mediaSample->presentationSize().height());

    // Streams often start slightly after PTS 0 (e.g. codec priming). If the very first
    // sample starts within the first 100 ms, stretch it back to PTS 0 so the element
    // doesn't report a tiny unbuffered gap at the beginning.
    if (mediaSample->decodeTime() == MediaTime::zeroTime() && mediaSample->presentationTime() > MediaTime::zeroTime() && mediaSample->presentationTime() <= MediaTime(1, 10)) {
        GST_DEBUG("Extending first sample to make it start at PTS=0");
        mediaSample->extendToTheBeginning();
    }

    m_sourceBufferPrivate.didReceiveSample(mediaSample.get());
}
// Main-thread continuation of the demuxer's "no-more-pads" signal: builds an
// InitializationSegment describing the single track handled by this pipeline and
// reports it to the SourceBufferPrivate.
void AppendPipeline::didReceiveInitializationSegment()
{
    ASSERT(isMainThread());

    WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;

    // Fix: passing nullptr as a "%s" argument is undefined behavior in standard C;
    // use an explicit placeholder when no track has been created yet.
    GST_DEBUG("Notifying SourceBuffer for track %s", m_track ? m_track->id().string().utf8().data() : "(none)");
    initializationSegment.duration = m_initialDuration;

    switch (m_streamType) {
    case Audio: {
        WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
        info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
        info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
        initializationSegment.audioTracks.append(info);
        break;
    }
    case Video: {
        WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
        info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
        info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
        initializationSegment.videoTracks.append(info);
        break;
    }
    default:
        // Text/Unknown/Invalid streams produce an empty initialization segment.
        GST_ERROR("Unsupported stream type or codec");
        break;
    }

    m_sourceBufferPrivate.didReceiveInitializationSegment(initializationSegment);
}
// Returns the id of the track handled by this pipeline, or the null AtomString when no
// track has been created yet (i.e. before an initialization segment was parsed).
AtomString AppendPipeline::trackId()
{
    ASSERT(isMainThread());
    return m_track ? m_track->id() : AtomString();
}
// Pulls every sample currently queued in the appsink (non-blocking) and forwards each
// to appsinkNewSample(). Duration-change notifications are suppressed for the whole
// batch so the player processes at most one duration update per batch.
void AppendPipeline::consumeAppsinkAvailableSamples()
{
    ASSERT(isMainThread());

    GRefPtr<GstSample> sample;
    int batchedSampleCount = 0;
    m_playerPrivate->blockDurationChanges();
    // Timeout 0 = try-pull: returns null as soon as the appsink queue is empty.
    while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) {
        appsinkNewSample(WTFMove(sample));
        batchedSampleCount++;
    }
    m_playerPrivate->unblockDurationChanges();
    GST_TRACE_OBJECT(m_pipeline.get(), "batchedSampleCount = %d", batchedSampleCount);
}
// Implements the MSE "reset parser state" algorithm by bouncing the pipeline through
// READY, which flushes all pending data and removes any dynamically added parser.
void AppendPipeline::resetParserState()
{
    ASSERT(isMainThread());
    GST_DEBUG_OBJECT(m_pipeline.get(), "Handling resetParserState() in AppendPipeline by resetting the pipeline");

    // Abort first so no streaming-thread task can deadlock against the state change.
    m_taskQueue.startAborting();

    // READY flushes the pipeline; pad-removed handlers tear down the parser, so none
    // must remain afterwards. Then resume PLAYING for the next append.
    assertedElementSetState(m_pipeline.get(), GST_STATE_READY);
    ASSERT(!m_parser);
    assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING);

    m_taskQueue.finishAborting();

#if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
    {
        // Dump the post-reset topology for debugging, one numbered file per reset.
        static unsigned i = 0;
        WTF::String dotFileName = makeString("reset-pipeline-", ++i);
        gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
    }
#endif
}
// Feeds one append's worth of data into the pipeline: the media buffer itself, followed
// by an empty marker buffer tagged with EndOfAppendMeta. The marker is intercepted by
// appsrcEndOfAppendCheckerProbe() and signals that all preceding data has been pushed.
void AppendPipeline::pushNewBuffer(GRefPtr<GstBuffer>&& buffer)
{
    GST_TRACE_OBJECT(m_pipeline.get(), "pushing data buffer %" GST_PTR_FORMAT, buffer.get());
    // gst_app_src_push_buffer() takes ownership, hence leakRef().
    GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer.leakRef());
    if (pushDataBufferRet != GST_FLOW_OK) {
        GST_ERROR_OBJECT(m_pipeline.get(), "Failed to push data buffer into appsrc.");
        ASSERT_NOT_REACHED();
    }

    // Mark the end of the append with an empty tagged buffer.
    GstBuffer* endOfAppendBuffer = gst_buffer_new();
    gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr);

    GST_TRACE_OBJECT(m_pipeline.get(), "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer);
    GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer);
    if (pushEndOfAppendBufferRet != GST_FLOW_OK) {
        GST_ERROR_OBJECT(m_pipeline.get(), "Failed to push end-of-append buffer into appsrc.");
        ASSERT_NOT_REACHED();
    }
}
// "new-sample" signal handler, invoked on the streaming thread. The samples themselves
// are pulled later on the main thread; this handler only posts a notification, and at
// most one per batch thanks to the atomic test_and_set flag.
void AppendPipeline::handleAppsinkNewSampleFromStreamingThread(GstElement*)
{
    ASSERT(!isMainThread());
    // Samples must arrive on the same thread that ran appsrcEndOfAppendCheckerProbe();
    // otherwise the end-of-append ordering guarantees would not hold.
    if (&WTF::Thread::current() != m_streamingThread) {
        GST_ERROR_OBJECT(m_pipeline.get(), "Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run.");
        ASSERT_NOT_REACHED();
    }

    // Coalesce notifications: only post a task if one isn't already pending.
    if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
        GST_TRACE("Posting appsink-new-sample task to the main thread");
        m_taskQueue.enqueueTask([this]() {
            // Clear the flag before consuming so samples arriving during consumption
            // trigger a new notification.
            m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
            consumeAppsinkAvailableSamples();
        });
    }
}
// Some demuxer output formats need a parser between demuxer and appsink before the
// samples are usable. Returns the parser element for such formats, or nullptr when the
// format needs none (or the required parser plugin is missing).
static GRefPtr<GstElement>
createOptionalParserForFormat(GstPad* demuxerSrcPad)
{
    GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad));
    GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0);
    const char* mediaType = gst_structure_get_name(structure);

    // Name the parser after the demuxer pad so it is easy to spot in dot dumps.
    GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad));
    GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get()));

    // Shared construction path, deduplicating the previously copy-pasted branches.
    auto makeParser = [&parserName](const char* factoryName) -> GRefPtr<GstElement> {
        GstElement* parser = gst_element_factory_make(factoryName, parserName.get());
        ASSERT(parser);
        g_return_val_if_fail(parser, nullptr);
        return GRefPtr<GstElement>(parser);
    };

    if (!g_strcmp0(mediaType, "audio/x-opus"))
        return makeParser("opusparse");
    if (!g_strcmp0(mediaType, "video/x-h264"))
        return makeParser("h264parse");

    return nullptr;
}
// "pad-added" handler, invoked on the streaming thread when the demuxer exposes a new
// source pad. Synchronously asks the main thread to classify the track, then links the
// pad (optionally through a parser) to the appsink.
void AppendPipeline::connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad* demuxerSrcPad)
{
    ASSERT(!isMainThread());
    GST_DEBUG("connecting to appsink");

    // Each AppendPipeline handles exactly one stream; extra demuxer pads get a probe
    // that swallows their buffers.
    if (m_demux->numsrcpads > 1) {
        GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
        gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
        g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
        return;
    }

    GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
    ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));

    // Capture the duration declared by the initialization segment, if any.
    gint64 timeLength = 0;
    if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
        && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
        m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
    else
        m_initialDuration = MediaTime::positiveInfiniteTime();

    // Track creation and caps classification must run on the main thread; block until done.
    GST_DEBUG("Requesting demuxer-connect-to-appsink to main thread");
    auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this, demuxerSrcPad]() {
        connectDemuxerSrcPadToAppsink(demuxerSrcPad);
        return AbortableTaskQueue::Void();
    });
    // Empty response: the task queue was aborted (pipeline is being reset/destroyed).
    if (!response) {
        return;
    }

    bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
        || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
        || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);

    if (isData) {
        // Add the appsink to the bin lazily, the first time a data pad shows up.
        GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
        if (!parent)
            gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());

        // Insert an intermediate parser for formats that need one (Opus, H.264).
        GRefPtr<GstPad> currentSrcPad = demuxerSrcPad;
        m_parser = createOptionalParserForFormat(currentSrcPad.get());
        if (m_parser) {
            gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get());
            gst_element_sync_state_with_parent(m_parser.get());

            GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink"));
            GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src"));

            gst_pad_link(currentSrcPad.get(), parserSinkPad.get());
            currentSrcPad = parserSrcPad;
        }

        gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get());

        gst_element_sync_state_with_parent(m_appsink.get());

        GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link");
    }
}
// Main-thread half of the pad-added handling: classifies the pad's caps, creates the
// matching track object and reports the track to the player. The actual pad linking is
// done back on the streaming thread by the caller.
void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
{
    ASSERT(isMainThread());
    GST_DEBUG("Connecting to appsink");

    const String& type = m_sourceBufferPrivate.type().containerType();
    // matroskademux may emit segments not starting at zero; force them to, so WebM
    // sample timestamps line up with the media timeline.
    if (type.endsWith("webm"))
        gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);

    GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));

    // Only one stream per demuxer is supported.
    ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));

    GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));

#ifndef GST_DISABLE_GST_DEBUG
    {
        GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
        GST_DEBUG("%s", strcaps.get());
    }
#endif

    // parseDemuxerSrcPadCaps() adopts its argument, so hand it an extra reference.
    parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));

    switch (m_streamType) {
    case WebCore::MediaSourceStreamTypeGStreamer::Audio:
        m_track = WebCore::AudioTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
        break;
    case WebCore::MediaSourceStreamTypeGStreamer::Video:
        m_track = WebCore::VideoTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
        break;
    case WebCore::MediaSourceStreamTypeGStreamer::Text:
        m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
        break;
    case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
        // Unsupported codec: abort the append.
        GST_WARNING_OBJECT(m_pipeline.get(), "Unsupported track codec: %" GST_PTR_FORMAT, caps.get());
        m_sourceBufferPrivate.appendParsingFailed();
        return;
    default:
        GST_WARNING_OBJECT(m_pipeline.get(), "Pad has unknown track type, ignoring: %" GST_PTR_FORMAT, caps.get());
        break;
    }

    m_appsinkCaps = WTFMove(caps);
    m_playerPrivate->trackDetected(*this, m_track, true);
}
// "pad-removed" handler (may run on any thread). Tears down the optional parser that
// was inserted between demuxer and appsink; the appsink itself stays in the bin.
void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
{
    // Dump the topology before and after for debugging.
    GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
    GST_DEBUG("Disconnecting appsink");

    if (m_parser) {
        assertedElementSetState(m_parser.get(), GST_STATE_NULL);
        gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get());
        m_parser = nullptr;
    }

    GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
}
#if !LOG_DISABLED
// Logging-only buffer probe: traces each buffer flowing through the instrumented pad.
// Never alters the data flow.
static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
{
    ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
    GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
    GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
    return GST_PAD_PROBE_OK;
}
#endif
#if ENABLE(ENCRYPTED_MEDIA)
// Event probe on the appsink sink pad: intercepts protection (encryption) events and
// routes them to the player's EME machinery, dropping them from the pipeline. All other
// events pass through untouched.
static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation *padProbeInformation)
{
    ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
    GstEvent* event = gst_pad_probe_info_get_event(info);
    GST_DEBUG("Handling event %s on append pipeline appsinkPad", GST_EVENT_TYPE_NAME(event));
    WebCore::AppendPipeline* appendPipeline = padProbeInformation->appendPipeline;

    switch (GST_EVENT_TYPE(event)) {
    case GST_EVENT_PROTECTION:
        if (appendPipeline && appendPipeline->playerPrivate())
            appendPipeline->playerPrivate()->handleProtectionEvent(event);
        return GST_PAD_PROBE_DROP;
    default:
        break;
    }

    return GST_PAD_PROBE_OK;
}
#endif
// Buffer probe attached to surplus demuxer pads (only one stream per SourceBuffer is
// supported): silently discards every buffer.
static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
{
    ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
    GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
    GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
    return GST_PAD_PROBE_DROP;
}
// Event probe used on matroskademux (WebM) pads: rewrites every SEGMENT event so its
// start is zero, keeping downstream sample timestamps aligned with the media timeline.
static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*)
{
    ASSERT(info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
    GstEvent* event = static_cast<GstEvent*>(info->data);
    if (event->type == GST_EVENT_SEGMENT) {
        GstSegment segment;
        gst_event_copy_segment(event, &segment);

        segment.start = 0;

        // Replace the in-flight event with a copy whose start is forced to zero.
        GRefPtr<GstEvent> newEvent = adoptGRef(gst_event_new_segment(&segment));
        gst_event_replace(reinterpret_cast<GstEvent**>(&info->data), newEvent.get());
    }
    return GST_PAD_PROBE_OK;
}
}
#endif // USE(GSTREAMER)