7#include "pipewireproduce_p.h" 
   12#include <logging_record.h> 
   16#include <qstringliteral.h> 
   18#include "gifencoder_p.h" 
   19#include "h264vaapiencoder_p.h" 
   20#include "libopenh264encoder_p.h" 
   21#include "libvpxencoder_p.h" 
   22#include "libvpxvp9encoder_p.h" 
   23#include "libwebpencoder_p.h" 
   24#include "libx264encoder_p.h" 
   26#include "logging_frame_statistics.h" 
   // Declare the std::optional instantiations as Qt meta-types; the constructor
   // below registers the same two types at runtime with qRegisterMetaType().
   32Q_DECLARE_METATYPE(std::optional<int>);
 
   33Q_DECLARE_METATYPE(std::optional<std::chrono::nanoseconds>);
 
   // Constructor: stores the requested encoder type and frame rate in the member
   // initializer list and registers the std::optional meta-types with Qt.
   // NOTE(review): this listing omits some initializer lines (numbers 36-37, 39, 41);
   // nodeId and fd are presumably stored in m_nodeId / m_fd, which initialize()
   // passes to createStream() — confirm against the complete file.
   35PipeWireProduce::PipeWireProduce(PipeWireBaseEncodedStream::Encoder encoderType, uint nodeId, uint fd, 
const Fraction &framerate)
 
   38    , m_encoderType(encoderType)
 
   40    , m_frameRate(framerate)
 
   // Runtime registration of the types declared with Q_DECLARE_METATYPE above.
   42    qRegisterMetaType<std::optional<int>>();
 
   43    qRegisterMetaType<std::optional<std::chrono::nanoseconds>>();
 
   46PipeWireProduce::~PipeWireProduce()
 
   // Creates and configures the PipeWire source stream and the helper timers.
   // NOTE(review): this listing drops brace-only lines and some statements (for
   // example the code between numbers 53-62 and 81-93), so the control flow
   // between the fragments below is partly inferred — confirm against the full file.
   50void PipeWireProduce::initialize()
 
   // A fresh stream is created with a null constructor argument (presumably "no
   // QObject parent" — TODO confirm) and limited to the frame rate given at construction.
   52    m_stream.reset(
new PipeWireSourceStream(
nullptr));
 
   53    m_stream->setMaxFramerate(m_frameRate);
 
   // Tell the stream whether frames are headed for a hardware or a software encoder.
   62    m_stream->setUsageHint(Encoder::supportsHardwareEncoding() ? PipeWireSourceStream::UsageHint::EncodeHardware
 
   63                                                               : PipeWireSourceStream::UsageHint::EncodeSoftware);
 
   // Connect to the PipeWire node. On failure: log, keep the error text in
   // m_error and drop the stream (an early return presumably follows on a line
   // not shown here).
   65    bool created = m_stream->createStream(m_nodeId, m_fd);
 
   66    if (!created || !m_stream->error().isEmpty()) {
 
   67        qCWarning(PIPEWIRERECORD_LOGGING) << 
"failed to set up stream for" << m_nodeId << m_stream->error();
 
   68        m_error = m_stream->error();
 
   69        m_stream.reset(
nullptr);
 
   // setupStream() runs once the stream reports its parameters; setupStream()
   // itself disconnects this signal again.
   72    connect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, 
this, &PipeWireProduce::setupStream);
 
   // Frame statistics are only collected when the debug category is enabled:
   // a one-second timer logs the counters and resets m_processedFrames.
   // (The timer is started at the end of setupStream().)
   74    if (PIPEWIRERECORDFRAMESTATS_LOGGING().isDebugEnabled()) {
 
   75        m_frameStatisticsTimer = std::make_unique<QTimer>();
 
   76        m_frameStatisticsTimer->setInterval(std::chrono::seconds(1));
 
   78            qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << 
"Processed" << m_processedFrames << 
"frames in the last second.";
 
   79            qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingFilterFrames << 
"frames pending for filter.";
 
   80            qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingEncodeFrames << 
"frames pending for encode.";
 
   81            m_processedFrames = 0;
 
   // Single-shot timer restarted by processFrame() on every frame. The initial
   // interval is 100 (milliseconds per QTimer::setInterval(int)); setMaxFramerate()
   // replaces it with twice the frame duration.
   93    m_frameRepeatTimer.reset(
new QTimer);
 
   94    m_frameRepeatTimer->setSingleShot(
true);
 
   95    m_frameRepeatTimer->setInterval(100);
 
   // Presumably the body of the repeat timer's timeout handler (its connect line
   // is not shown): a frame `f` is handed to the encoder's filter stage and, on
   // success, counted as pending before the passthrough thread is woken.
  100        if (!m_encoder->filterFrame(f)) {
 
  104        m_pendingFilterFrames++;
 
  105        m_passthroughCondition.notify_all();
 
  109Fraction PipeWireProduce::maxFramerate()
 const 
  111    return m_maxFramerate;
 
  // Stores the new maximum frame rate, retunes the frame-repeat timer and
  // forwards the limit to the stream.
  // NOTE(review): the listing omits line 122; the deeper indentation of line 123
  // suggests it is guarded (presumably by a null check on m_stream) — confirm.
  114void PipeWireProduce::setMaxFramerate(
const Fraction &framerate)
 
  116    m_maxFramerate = framerate;
 
  // The numerator is converted to double before dividing, so fractional rates
  // are preserved here.
  // NOTE(review): a zero denominator gives an infinite/NaN rate and a zero
  // numerator makes `1000 / framesPerSecond` infinite before the implicit
  // conversion to the timer's integer interval — verify that callers never pass
  // such a Fraction, or add a guard.
  118    const double framesPerSecond = 
static_cast<double>(framerate.numerator) / framerate.denominator;
 
  // The repeat interval is twice the duration of one frame (in milliseconds).
  119    if (m_frameRepeatTimer) {
 
  120        m_frameRepeatTimer->setInterval((1000 / framesPerSecond) * 2);
 
  123        m_stream->setMaxFramerate(framerate);
 
  127int PipeWireProduce::maxPendingFrames()
 const 
  129    return m_maxPendingFrames;
 
  132void PipeWireProduce::setMaxPendingFrames(
int newMaxBufferSize)
 
  134    if (newMaxBufferSize < 3) {
 
  135        qCWarning(PIPEWIRERECORD_LOGGING) << 
"Maxmimum pending frame count of " << newMaxBufferSize << 
" requested. Value must be 3 or higher.";
 
  136        newMaxBufferSize = 3;
 
  138    m_maxPendingFrames = newMaxBufferSize;
 
  // Slot for PipeWireSourceStream::streamParametersChanged: creates the encoder,
  // wires up the stream signals and starts the passthrough and output threads.
  // NOTE(review): brace-only lines and several statements (early returns, loop
  // exits, some counter updates) are missing from this listing — confirm the
  // exact control flow against the complete file.
  141void PipeWireProduce::setupStream()
 
  143    qCDebug(PIPEWIRERECORD_LOGGING) << 
"Setting up stream";
 
  // Make this a one-shot: the signal that triggered the call is disconnected.
  144    disconnect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, 
this, &PipeWireProduce::setupStream);
 
  146    m_encoder = makeEncoder();
 
  148        qCWarning(PIPEWIRERECORD_LOGGING) << 
"No encoder could be created";
 
  152    connect(m_stream.get(), &PipeWireSourceStream::stateChanged, 
this, &PipeWireProduce::stateChanged);
 
  153    if (!setupFormat()) {
 
  154        qCWarning(PIPEWIRERECORD_LOGGING) << 
"Could not set up the producing thread";
 
  // From here on every received frame is fed through processFrame().
  158    connect(m_stream.data(), &PipeWireSourceStream::frameReceived, 
this, &PipeWireProduce::processFrame);
 
  // Passthrough thread: sleeps on m_passthroughCondition and, when woken, asks
  // the encoder to encode up to the remaining pending-frame budget.
  // NOTE(review): the running flag is set to true inside the thread body; if
  // destroy() clears it before this thread gets to run, the thread would set it
  // back to true and then wait — verify this cannot happen.
  160    m_passthroughThread = std::thread([
this]() {
 
  161        m_passthroughRunning = 
true;
 
  162        while (m_passthroughRunning) {
 
  163            std::unique_lock<std::mutex> lock(m_passthroughMutex);
 
  // NOTE(review): wait() without a predicate can return spuriously and cannot
  // observe a notify_all() issued while this thread was not waiting; only the
  // running flag is re-checked after waking.
  164            m_passthroughCondition.wait(lock);
 
  166            if (!m_passthroughRunning) {
 
  // encodeFrame() reports how many frames left the filter stage and how many
  // were queued for encoding; the queued count is added to the pending total.
  // NOTE(review): m_pendingEncodeFrames is also modified by the output thread
  // below — presumably an atomic type; confirm in the header.
  170            auto [
filtered, queued] = m_encoder->encodeFrame(m_maxPendingFrames - m_pendingEncodeFrames);
 
  172            m_pendingEncodeFrames += queued;
 
  174            m_outputCondition.notify_all();
 
  // NOTE(review): on Linux/glibc thread names are limited to 16 bytes including
  // the terminator; this 28-character name would be rejected with ERANGE and the
  // return value is not checked — confirm on the target platform.
  177    pthread_setname_np(m_passthroughThread.native_handle(), 
"PipeWireProduce::passthrough");
 
  // Output thread: sleeps on m_outputCondition and, when woken, drains encoded
  // packets, moving the received count from "pending encode" to "processed".
  // The notes above about the running flag and the predicate-less wait apply
  // here as well.
  179    m_outputThread = std::thread([
this]() {
 
  180        m_outputRunning = 
true;
 
  181        while (m_outputRunning) {
 
  182            std::unique_lock<std::mutex> lock(m_outputMutex);
 
  183            m_outputCondition.wait(lock);
 
  185            if (!m_outputRunning) {
 
  189            auto received = m_encoder->receivePacket();
 
  190            m_pendingEncodeFrames -= received;
 
  191            m_processedFrames += received;
 
  // NOTE(review): this 23-character name also exceeds the Linux 16-byte limit.
  199    pthread_setname_np(m_outputThread.native_handle(), 
"PipeWireProduce::output");
 
  // The statistics timer only exists when initialize() found the debug
  // logging category enabled.
  201    if (m_frameStatisticsTimer) {
 
  202        m_frameStatisticsTimer->start();
 
  // Marks the producer as deactivated and stops the stream from delivering frames.
  // NOTE(review): the listing omits several lines (guards and the bodies of the
  // conditionals); what happens after the final check is not visible here.
  207void PipeWireProduce::deactivate()
 
  // stateChanged() and handleEncodedFramesChanged() both test this flag.
  209    m_deactivated = 
true;
 
  // Defaults to PAUSED; the two indented statements below overwrite it with the
  // real state and deactivate the stream (presumably only when m_stream is
  // non-null — the guarding line is not shown).
  211    auto streamState = PW_STREAM_STATE_PAUSED;
 
  213        streamState = m_stream->state();
 
  214        m_stream->setActive(
false);
 
  // Special case: there is no encoder, or the stream was not streaming.
  220    if (!m_encoder || streamState != PW_STREAM_STATE_STREAMING) {
 
  // Tears the producer down: stops the timers, then stops and joins both worker
  // threads. Must be called on the thread this object lives in (asserted below).
  // NOTE(review): lines 230-234 and 249-252 are missing from this listing.
  225void PipeWireProduce::destroy()
 
  229    Q_ASSERT_X(
QThread::currentThread() == thread(), 
"PipeWireProduce", 
"destroy() called from a different thread than PipeWireProduce's thread");
 
  // NOTE(review): dereferenced without a null check here, whereas
  // setMaxFramerate() checks m_frameRepeatTimer first — verify the timer always
  // exists at this point (e.g. when initialize() bailed out early).
  235    m_frameRepeatTimer->stop();
 
  237    m_frameStatisticsTimer = 
nullptr;
 
  // For each worker: clear the running flag, wake the thread so it can observe
  // the flag, then wait for it to finish.
  239    if (m_passthroughThread.joinable()) {
 
  240        m_passthroughRunning = 
false;
 
  241        m_passthroughCondition.notify_all();
 
  242        m_passthroughThread.join();
 
  245    if (m_outputThread.joinable()) {
 
  246        m_outputRunning = 
false;
 
  247        m_outputCondition.notify_all();
 
  248        m_outputThread.join();
 
  253    qCDebug(PIPEWIRERECORD_LOGGING) << 
"finished";
 
  // Forwards the optional quality value to the encoder.
  // NOTE(review): lines 260-262 are missing from this listing; presumably the
  // value is also stored in m_quality (which makeEncoder() reads) and the call
  // below is guarded by an encoder null check — confirm.
  259void PipeWireProduce::setQuality(
const std::optional<quint8> &quality)
 
  263        m_encoder->setQuality(quality);
 
  // Remembers the encoding preference (makeEncoder() applies it to H.264
  // encoders it creates) and forwards it to the encoder.
  // NOTE(review): line 271 is missing; the indentation of the forwarding call
  // suggests a guard, presumably an encoder null check — confirm.
  267void PipeWireProduce::setEncodingPreference(
const PipeWireBaseEncodedStream::EncodingPreference &encodingPreference)
 
  269    m_encodingPreference = encodingPreference;
 
  272        m_encoder->setEncodingPreference(encodingPreference);
 
  // Slot for PipeWireSourceStream::frameReceived: updates the cursor state,
  // applies timestamp and rate checks and queue-size limits, and hands accepted
  // frames to the encoder's filter stage.
  // NOTE(review): several lines (the local `f`, the early returns, the cursor
  // guard and the m_previousPts update) are missing from this listing.
  276void PipeWireProduce::processFrame(
const PipeWireFrame &frame)
 
  // Every incoming frame restarts the single-shot repeat timer.
  281    m_frameRepeatTimer->start();
 
  // Cursor position and hotspot are always copied; the texture only when the
  // frame carries a non-null one, in which case the cursor is marked dirty.
  284        m_cursor.position = frame.cursor->position;
 
  285        m_cursor.hotspot = frame.cursor->hotspot;
 
  286        if (!frame.cursor->texture.isNull()) {
 
  287            m_cursor.dirty = 
true;
 
  288            m_cursor.texture = frame.cursor->texture;
 
  // Frames whose timestamp does not advance past the previous one are singled
  // out (presumably dropped — the body is not shown).
  292    auto pts = framePts(frame.presentationTimestamp);
 
  293    if (m_previousPts >= 0 && pts <= m_previousPts) {
 
  // NOTE(review): unlike setMaxFramerate(), numerator / denominator is divided
  // BEFORE the conversion to floating point. If the Fraction members are
  // integers this truncates (e.g. 30000/1001 -> 29) and a rate below 1 yields a
  // division by zero / infinite frameTime — consider
  // 1000.0 * denominator / numerator after confirming the member types.
  297    auto frameTime = 1000.0 / (m_maxFramerate.numerator / m_maxFramerate.denominator);
 
  298    if ((pts - m_previousPts) < frameTime) {
 
  // Back-pressure: never exceed the configured number of pending frames.
  302    if (m_pendingFilterFrames + 1 > m_maxPendingFrames) {
 
  303        qCWarning(PIPEWIRERECORD_LOGGING) << 
"Filter queue is full, dropping frame" << pts;
 
  308    if (!m_encoder->filterFrame(f)) {
 
  312    m_pendingFilterFrames++;
 
  // Wake the passthrough thread so it can pick up the newly filtered frame.
  315    m_passthroughCondition.notify_all();
 
  // Slot for PipeWireSourceStream::stateChanged; drives the shutdown sequence
  // after deactivate().
  // NOTE(review): the bodies of the conditionals are missing from this listing.
  318void PipeWireProduce::stateChanged(pw_stream_state state)
 
  // Anything other than "paused after deactivate()" takes this branch
  // (presumably an early return — the body is not shown).
  320    if (state != PW_STREAM_STATE_PAUSED || !m_deactivated) {
 
  324        qCDebug(PIPEWIRERECORD_LOGGING) << 
"finished without a stream";
 
  // Stop accepting new frames.
  328    disconnect(m_stream.data(), &PipeWireSourceStream::frameReceived, 
this, &PipeWireProduce::processFrame);
 
  // Checks whether both queues are already empty; the debug message and the
  // wake-up below apply when frames are still in flight (presumably the else
  // branch — the connecting line is not shown).
  330    if (m_pendingFilterFrames <= 0 && m_pendingEncodeFrames <= 0) {
 
  340        qCDebug(PIPEWIRERECORD_LOGGING) << 
"Waiting for frame queues to empty, still pending filter" << m_pendingFilterFrames << 
"encode" 
  341                                        << m_pendingEncodeFrames;
 
  342        m_passthroughCondition.notify_all();
 
  // Re-evaluates the queues while shutting down.
  // NOTE(review): the bodies of all three conditionals are missing from this
  // listing; the summary below is limited to the visible conditions.
  346void PipeWireProduce::handleEncodedFramesChanged()
 
  // Special-cases the producer not having been deactivated (presumably an
  // early return).
  348    if (!m_deactivated) {
 
  // Keep the passthrough thread running so the remaining frames get encoded.
  357    m_passthroughCondition.notify_all();
 
  // Checks, in order, that the filter queue and then the encode queue are empty.
  359    if (m_pendingFilterFrames <= 0) {
 
  362        if (m_pendingEncodeFrames <= 0) {
 
  // Creates an encoder for m_encoderType, sized to the current stream.
  // If the KPIPEWIRE_FORCE_ENCODER environment variable is set, only the
  // candidate whose name matches it is tried; otherwise every candidate for the
  // requested type is tried in the order written below.
  // NOTE(review): closing braces, `break`/`return` lines and the final return
  // are missing from this listing; setupStream() logs "No encoder could be
  // created" after calling this, so failure presumably yields a null pointer.
  368std::unique_ptr<Encoder> PipeWireProduce::makeEncoder()
 
  370    auto forcedEncoder = qEnvironmentVariable(
"KPIPEWIRE_FORCE_ENCODER");
 
  // A non-null string means an override was requested; it is logged.
  371    if (!forcedEncoder.isNull()) {
 
  372        qCWarning(PIPEWIRERECORD_LOGGING) << 
"Forcing encoder to" << forcedEncoder;
 
  375    auto size = m_stream->size();
 
  377    switch (m_encoderType) {
 
  // H.264: both profiles share one code path; candidates are tried in the order
  // VAAPI, libx264, libopenh264. Each gets quality and encoding preference
  // applied and is returned only if initialize(size) succeeds.
  378    case PipeWireBaseEncodedStream::H264Baseline:
 
  379    case PipeWireBaseEncodedStream::H264Main: {
 
  380        auto profile = m_encoderType == PipeWireBaseEncodedStream::H264Baseline ? Encoder::H264Profile::Baseline : Encoder::H264Profile::Main;
 
  382        if (forcedEncoder.isNull() || forcedEncoder == u
"h264_vaapi") {
 
  383            auto hardwareEncoder = std::make_unique<H264VAAPIEncoder>(profile, 
this);
 
  384            hardwareEncoder->setQuality(m_quality);
 
  385            hardwareEncoder->setEncodingPreference(m_encodingPreference);
 
  386            if (hardwareEncoder->initialize(size)) {
 
  387                return hardwareEncoder;
 
  391        if (forcedEncoder.isNull() || forcedEncoder == u
"libx264") {
 
  392            auto softwareEncoder = std::make_unique<LibX264Encoder>(profile, 
this);
 
  393            softwareEncoder->setQuality(m_quality);
 
  394            softwareEncoder->setEncodingPreference(m_encodingPreference);
 
  395            if (softwareEncoder->initialize(size)) {
 
  396                return softwareEncoder;
 
  401        if (forcedEncoder.isNull() || forcedEncoder == u
"libopenh264") {
 
  402            auto softwareEncoder = std::make_unique<LibOpenH264Encoder>(profile, 
this);
 
  403            softwareEncoder->setQuality(m_quality);
 
  404            softwareEncoder->setEncodingPreference(m_encodingPreference);
 
  405            if (softwareEncoder->initialize(size)) {
 
  406                return softwareEncoder;
 
  // The remaining types each have a single candidate. In the visible lines none
  // of them receives the encoding preference.
  411    case PipeWireBaseEncodedStream::VP8: {
 
  412        if (forcedEncoder.isNull() || forcedEncoder == u
"libvpx") {
 
  413            auto encoder = std::make_unique<LibVpxEncoder>(
this);
 
  414            encoder->setQuality(m_quality);
 
  415            if (encoder->initialize(size)) {
 
  421    case PipeWireBaseEncodedStream::VP9: {
 
  422        if (forcedEncoder.isNull() || forcedEncoder == u
"libvpx-vp9") {
 
  423            auto encoder = std::make_unique<LibVpxVp9Encoder>(
this);
 
  424            encoder->setQuality(m_quality);
 
  425            if (encoder->initialize(size)) {
 
  // NOTE(review): the GIF encoder is the only one that gets no setQuality()
  // call in the visible lines — confirm this is intentional.
  431    case PipeWireBaseEncodedStream::Gif: {
 
  432        if (forcedEncoder.isNull() || forcedEncoder == u
"gif") {
 
  433            auto encoder = std::make_unique<GifEncoder>(
this);
 
  434            if (encoder->initialize(size)) {
 
  440    case PipeWireBaseEncodedStream::WebP: {
 
  441        if (forcedEncoder.isNull() || forcedEncoder == u
"libwebp") {
 
  442            auto encoder = std::make_unique<LibWebPEncoder>(
this);
 
  443            encoder->setQuality(m_quality);
 
  444            if (encoder->initialize(size)) {
 
  // Presumably the default case of the switch (its label is not shown).
  451        qCWarning(PIPEWIRERECORD_LOGGING) << 
"Unknown encoder type" << m_encoderType;
 
  457#include "moc_pipewireproduce_p.cpp" 
QFuture< typename qValueType< Iterator >::value_type > filtered(Iterator begin, Iterator end, KeepFunctor &&filterFunction)
 
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
 
QThread * currentThread()