7#include "pipewireproduce_p.h"
12#include <logging_record.h>
16#include <qstringliteral.h>
18#include "gifencoder_p.h"
19#include "h264vaapiencoder_p.h"
20#include "libopenh264encoder_p.h"
21#include "libvpxencoder_p.h"
22#include "libvpxvp9encoder_p.h"
23#include "libwebpencoder_p.h"
24#include "libx264encoder_p.h"
26#include "logging_frame_statistics.h"
// Declare the two std::optional specialisations as Qt meta-types. The
// PipeWireProduce constructor additionally calls qRegisterMetaType() for both.
32Q_DECLARE_METATYPE(std::optional<int>);
33Q_DECLARE_METATYPE(std::optional<std::chrono::nanoseconds>);
// Constructs a producer for the given encoder type and framerate; both values
// are stored in m_encoderType and m_frameRate.
// NOTE(review): the initializers for nodeId and fd are not visible in this
// excerpt; initialize() reads m_nodeId and m_fd, so they are presumably stored
// here as well - confirm.
35PipeWireProduce::PipeWireProduce(PipeWireBaseEncodedStream::Encoder encoderType, uint nodeId, uint fd,
const Fraction &framerate)
38 , m_encoderType(encoderType)
40 , m_frameRate(framerate)
// Runtime registration of the optional types declared with Q_DECLARE_METATYPE.
42 qRegisterMetaType<std::optional<int>>();
43 qRegisterMetaType<std::optional<std::chrono::nanoseconds>>();
// Destructor.
// NOTE(review): the body is not visible in this excerpt. Stopping the timers
// and joining the worker threads is done in destroy().
46PipeWireProduce::~PipeWireProduce()
// Creates and configures the PipeWire source stream for m_nodeId / m_fd and
// sets up the helper timers used by this producer.
50void PipeWireProduce::initialize()
// The stream is constructed with nullptr as its only argument and limited to
// the framerate that was passed to the constructor.
52 m_stream.reset(
new PipeWireSourceStream(
nullptr));
53 m_stream->setMaxFramerate(m_frameRate);
// Tell the stream whether frames are headed for a hardware or a software
// encoder, depending on Encoder::supportsHardwareEncoding().
62 m_stream->setUsageHint(Encoder::supportsHardwareEncoding() ? PipeWireSourceStream::UsageHint::EncodeHardware
63 : PipeWireSourceStream::UsageHint::EncodeSoftware);
// A failed createStream() call or a non-empty stream error counts as failure:
// the error is logged, copied into m_error and the stream is dropped.
65 bool created = m_stream->createStream(m_nodeId, m_fd);
66 if (!created || !m_stream->error().isEmpty()) {
67 qCWarning(PIPEWIRERECORD_LOGGING) <<
"failed to set up stream for" << m_nodeId << m_stream->error();
68 m_error = m_stream->error();
69 m_stream.reset(
nullptr);
// setupStream() runs once the stream reports changed stream parameters.
72 connect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged,
this, &PipeWireProduce::setupStream);
// The statistics timer is only created while the frame-statistics logging
// category has debug output enabled; its interval is one second.
74 if (PIPEWIRERECORDFRAMESTATS_LOGGING().isDebugEnabled()) {
75 m_frameStatisticsTimer = std::make_unique<QTimer>();
76 m_frameStatisticsTimer->setInterval(std::chrono::seconds(1));
// NOTE(review): the statements below presumably form the statistics timer's
// timeout handler; the connect() call is not visible in this excerpt. They log
// the three frame counters and reset m_processedFrames.
78 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) <<
"Processed" << m_processedFrames <<
"frames in the last second.";
79 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingFilterFrames <<
"frames pending for filter.";
80 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingEncodeFrames <<
"frames pending for encode.";
81 m_processedFrames = 0;
// Single-shot frame repeat timer with an initial interval of 100.
// processFrame() restarts it and setMaxFramerate() adjusts its interval.
93 m_frameRepeatTimer.reset(
new QTimer);
94 m_frameRepeatTimer->setSingleShot(
true);
95 m_frameRepeatTimer->setInterval(100);
// NOTE(review): presumably the body of the repeat timer's timeout handler; the
// connect() call and the definition of `f` are not visible in this excerpt.
// A frame accepted by filterFrame() is counted as pending for the filter and
// the passthrough thread is woken.
100 if (!m_encoder->filterFrame(f)) {
104 m_pendingFilterFrames++;
105 m_passthroughCondition.notify_all();
// Returns the maximum framerate last stored by setMaxFramerate().
109Fraction PipeWireProduce::maxFramerate()
const
111 return m_maxFramerate;
114void PipeWireProduce::setMaxFramerate(
const Fraction &framerate)
116 m_maxFramerate = framerate;
118 const double framesPerSecond =
static_cast<double>(framerate.numerator) / framerate.denominator;
119 if (m_frameRepeatTimer) {
120 m_frameRepeatTimer->setInterval((1000 / framesPerSecond) * 2);
123 m_stream->setMaxFramerate(framerate);
// Returns the pending-frame limit last stored by setMaxPendingFrames().
127int PipeWireProduce::maxPendingFrames()
const
129 return m_maxPendingFrames;
132void PipeWireProduce::setMaxPendingFrames(
int newMaxBufferSize)
134 if (newMaxBufferSize < 3) {
135 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Maxmimum pending frame count of " << newMaxBufferSize <<
" requested. Value must be 3 or higher.";
136 newMaxBufferSize = 3;
138 m_maxPendingFrames = newMaxBufferSize;
// Slot connected to PipeWireSourceStream::streamParametersChanged by
// initialize(). Creates the encoder, wires up the stream signals and starts the
// passthrough and output worker threads.
141void PipeWireProduce::setupStream()
143 qCDebug(PIPEWIRERECORD_LOGGING) <<
"Setting up stream";
// Undo the connection made in initialize() so later parameter changes do not
// trigger this setup again.
144 disconnect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged,
this, &PipeWireProduce::setupStream);
146 m_encoder = makeEncoder();
// NOTE(review): presumably logged when makeEncoder() returned no encoder; the
// guarding condition is not visible in this excerpt.
148 qCWarning(PIPEWIRERECORD_LOGGING) <<
"No encoder could be created";
152 connect(m_stream.get(), &PipeWireSourceStream::stateChanged,
this, &PipeWireProduce::stateChanged);
153 if (!setupFormat()) {
154 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Could not set up the producing thread";
// Every frame received from the stream is handed to processFrame().
158 connect(m_stream.data(), &PipeWireSourceStream::frameReceived,
this, &PipeWireProduce::processFrame);
// Passthrough thread: sleeps on m_passthroughCondition and, once woken, asks
// the encoder to encode with a budget of
// (m_maxPendingFrames - m_pendingEncodeFrames), adds the queued count to
// m_pendingEncodeFrames and wakes the output thread.
// NOTE(review): wait() is called without a predicate and m_passthroughRunning
// is set to true from inside the thread. A notify_all() issued before the
// thread reaches wait() (for example from destroy()) appears to be lost -
// confirm.
160 m_passthroughThread = std::thread([
this]() {
161 m_passthroughRunning =
true;
162 while (m_passthroughRunning) {
163 std::unique_lock<std::mutex> lock(m_passthroughMutex);
164 m_passthroughCondition.wait(lock);
// Re-check the run flag after waking so a shutdown request is noticed.
166 if (!m_passthroughRunning) {
// NOTE(review): how `filtered` is used is not visible in this excerpt.
170 auto [
filtered, queued] = m_encoder->encodeFrame(m_maxPendingFrames - m_pendingEncodeFrames);
172 m_pendingEncodeFrames += queued;
174 m_outputCondition.notify_all();
177 pthread_setname_np(m_passthroughThread.native_handle(),
"PipeWireProduce::passthrough");
// Output thread: sleeps on m_outputCondition and, once woken, receives packets
// from the encoder, moving the received count from m_pendingEncodeFrames to
// m_processedFrames. The same predicate-less wait() caveat applies here.
179 m_outputThread = std::thread([
this]() {
180 m_outputRunning =
true;
181 while (m_outputRunning) {
182 std::unique_lock<std::mutex> lock(m_outputMutex);
183 m_outputCondition.wait(lock);
185 if (!m_outputRunning) {
189 auto received = m_encoder->receivePacket();
190 m_pendingEncodeFrames -= received;
191 m_processedFrames += received;
199 pthread_setname_np(m_outputThread.native_handle(),
"PipeWireProduce::output");
// The statistics timer only exists when initialize() created it.
201 if (m_frameStatisticsTimer) {
202 m_frameStatisticsTimer->start();
// Marks the producer as deactivated and deactivates the source stream via
// setActive(false).
207void PipeWireProduce::deactivate()
209 m_deactivated =
true;
// Starts out as PAUSED and is then overwritten with the stream's actual state.
// NOTE(review): any guard around the next two statements is not visible in
// this excerpt.
211 auto streamState = PW_STREAM_STATE_PAUSED;
213 streamState = m_stream->state();
214 m_stream->setActive(
false);
// Special case for a missing encoder or a stream that was not streaming.
// NOTE(review): the body of this branch is not visible in this excerpt.
220 if (!m_encoder || streamState != PW_STREAM_STATE_STREAMING) {
// Tears the producer down: stops the repeat timer, drops the statistics timer
// and stops and joins both worker threads.
// Asserts that it is called from the thread this object belongs to.
225void PipeWireProduce::destroy()
229 Q_ASSERT_X(
QThread::currentThread() == thread(),
"PipeWireProduce",
"destroy() called from a different thread than PipeWireProduce's thread");
// NOTE(review): any null check around this call is not visible in this excerpt.
235 m_frameRepeatTimer->stop();
// Resetting the unique_ptr destroys the statistics timer, if one was created.
237 m_frameStatisticsTimer =
nullptr;
// Clear the run flag first, then wake the thread so its loop can observe the
// flag and exit before join() is called.
239 if (m_passthroughThread.joinable()) {
240 m_passthroughRunning =
false;
241 m_passthroughCondition.notify_all();
242 m_passthroughThread.join();
// Same shutdown sequence for the output thread.
245 if (m_outputThread.joinable()) {
246 m_outputRunning =
false;
247 m_outputCondition.notify_all();
248 m_outputThread.join();
253 qCDebug(PIPEWIRERECORD_LOGGING) <<
"finished";
// Forwards the requested quality to the current encoder.
// NOTE(review): makeEncoder() reads m_quality, so the value is presumably also
// stored here, and the call is presumably guarded against a missing encoder;
// neither is visible in this excerpt - confirm.
259void PipeWireProduce::setQuality(
const std::optional<quint8> &quality)
263 m_encoder->setQuality(quality);
// Stores the encoding preference in m_encodingPreference, which makeEncoder()
// reads, and forwards it to the current encoder.
267void PipeWireProduce::setEncodingPreference(
const PipeWireBaseEncodedStream::EncodingPreference &encodingPreference)
269 m_encodingPreference = encodingPreference;
// NOTE(review): any guard against a missing encoder is not visible in this
// excerpt.
272 m_encoder->setEncodingPreference(encodingPreference);
// Slot for PipeWireSourceStream::frameReceived (connected in setupStream()).
// Updates the cached cursor state, applies timestamp and queue-size checks and
// passes the frame on to the encoder's filter stage.
276void PipeWireProduce::processFrame(
const PipeWireFrame &frame)
// Restart the frame repeat timer created in initialize().
281 m_frameRepeatTimer->start();
// Cache the cursor position and hotspot. The texture is only replaced, and the
// cursor flagged dirty, when the frame carries a non-null texture.
// NOTE(review): frame.cursor is dereferenced here; any check guarding it is not
// visible in this excerpt.
284 m_cursor.position = frame.cursor->position;
285 m_cursor.hotspot = frame.cursor->hotspot;
286 if (!frame.cursor->texture.isNull()) {
287 m_cursor.dirty =
true;
288 m_cursor.texture = frame.cursor->texture;
// Detect frames whose timestamp does not advance past the previous one.
292 auto pts = framePts(frame.presentationTimestamp);
293 if (m_previousPts >= 0 && pts <= m_previousPts) {
// NOTE(review): unlike setMaxFramerate(), numerator / denominator is divided
// here without a cast to double. If both members are integers the quotient is
// truncated (30000/1001 becomes 29) and is 0 for rates below 1, which makes
// frameTime infinite - confirm against the Fraction definition.
297 auto frameTime = 1000.0 / (m_maxFramerate.numerator / m_maxFramerate.denominator);
298 if ((pts - m_previousPts) < frameTime) {
// Refuse to queue more than m_maxPendingFrames frames for filtering.
302 if (m_pendingFilterFrames + 1 > m_maxPendingFrames) {
303 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Filter queue is full, dropping frame" << pts;
// NOTE(review): `f` is defined on a line that is not visible in this excerpt.
308 if (!m_encoder->filterFrame(f)) {
312 m_pendingFilterFrames++;
// Wake the passthrough thread started in setupStream().
315 m_passthroughCondition.notify_all();
// Slot for PipeWireSourceStream::stateChanged (connected in setupStream()).
// Handles the stream reaching PW_STREAM_STATE_PAUSED after deactivate() was
// called.
318void PipeWireProduce::stateChanged(pw_stream_state state)
// Any other state, or a pause that was not requested through deactivate(),
// takes this branch. NOTE(review): the branch body is not visible here;
// presumably an early return.
320 if (state != PW_STREAM_STATE_PAUSED || !m_deactivated) {
// NOTE(review): the condition guarding this message is not visible in this
// excerpt.
324 qCDebug(PIPEWIRERECORD_LOGGING) <<
"finished without a stream";
// Stop feeding new frames into processFrame().
328 disconnect(m_stream.data(), &PipeWireSourceStream::frameReceived,
this, &PipeWireProduce::processFrame);
// Both queues empty. NOTE(review): the branch body is not visible here.
330 if (m_pendingFilterFrames <= 0 && m_pendingEncodeFrames <= 0) {
// Frames are still pending: log the counters and wake the passthrough thread
// so the queues keep draining.
340 qCDebug(PIPEWIRERECORD_LOGGING) <<
"Waiting for frame queues to empty, still pending filter" << m_pendingFilterFrames <<
"encode"
341 << m_pendingEncodeFrames;
342 m_passthroughCondition.notify_all();
// Reacts to a change in the number of encoded frames.
// NOTE(review): none of the branch bodies are visible in this excerpt, so only
// the visible conditions are described.
346void PipeWireProduce::handleEncodedFramesChanged()
// Case where deactivate() has not been called; presumably an early return.
348 if (!m_deactivated) {
// Wake the passthrough thread so remaining frames keep moving.
357 m_passthroughCondition.notify_all();
// The filter queue and the encode queue are checked separately for being
// empty.
359 if (m_pendingFilterFrames <= 0) {
362 if (m_pendingEncodeFrames <= 0) {
// Creates and initializes an encoder matching m_encoderType for the current
// stream size. When the KPIPEWIRE_FORCE_ENCODER environment variable is set,
// only the candidate whose name equals its value is tried.
// NOTE(review): the return statements after most initialize() checks and the
// ends of the case blocks are not visible in this excerpt.
368std::unique_ptr<Encoder> PipeWireProduce::makeEncoder()
370 auto forcedEncoder = qEnvironmentVariable(
"KPIPEWIRE_FORCE_ENCODER");
// A non-null value means an encoder is being forced; log that fact.
371 if (!forcedEncoder.isNull()) {
372 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Forcing encoder to" << forcedEncoder;
375 auto size = m_stream->size();
377 switch (m_encoderType) {
// Both H.264 variants share the candidate list below and differ only in the
// profile passed to the encoder.
378 case PipeWireBaseEncodedStream::H264Baseline:
379 case PipeWireBaseEncodedStream::H264Main: {
380 auto profile = m_encoderType == PipeWireBaseEncodedStream::H264Baseline ? Encoder::H264Profile::Baseline : Encoder::H264Profile::Main;
// Candidates are tried in the order h264_vaapi, libx264, libopenh264. Each one
// receives the stored quality and encoding preference before initialize() is
// called, and is returned if initialize() succeeds.
382 if (forcedEncoder.isNull() || forcedEncoder == u
"h264_vaapi") {
383 auto hardwareEncoder = std::make_unique<H264VAAPIEncoder>(profile,
this);
384 hardwareEncoder->setQuality(m_quality);
385 hardwareEncoder->setEncodingPreference(m_encodingPreference);
386 if (hardwareEncoder->initialize(size)) {
387 return hardwareEncoder;
391 if (forcedEncoder.isNull() || forcedEncoder == u
"libx264") {
392 auto softwareEncoder = std::make_unique<LibX264Encoder>(profile,
this);
393 softwareEncoder->setQuality(m_quality);
394 softwareEncoder->setEncodingPreference(m_encodingPreference);
395 if (softwareEncoder->initialize(size)) {
396 return softwareEncoder;
401 if (forcedEncoder.isNull() || forcedEncoder == u
"libopenh264") {
402 auto softwareEncoder = std::make_unique<LibOpenH264Encoder>(profile,
this);
403 softwareEncoder->setQuality(m_quality);
404 softwareEncoder->setEncodingPreference(m_encodingPreference);
405 if (softwareEncoder->initialize(size)) {
406 return softwareEncoder;
// The remaining encoder types each have a single candidate. Only the stored
// quality is applied to them; m_encodingPreference is not.
411 case PipeWireBaseEncodedStream::VP8: {
412 if (forcedEncoder.isNull() || forcedEncoder == u
"libvpx") {
413 auto encoder = std::make_unique<LibVpxEncoder>(
this);
414 encoder->setQuality(m_quality);
415 if (encoder->initialize(size)) {
421 case PipeWireBaseEncodedStream::VP9: {
422 if (forcedEncoder.isNull() || forcedEncoder == u
"libvpx-vp9") {
423 auto encoder = std::make_unique<LibVpxVp9Encoder>(
this);
424 encoder->setQuality(m_quality);
425 if (encoder->initialize(size)) {
// The GIF encoder is the only candidate that does not receive m_quality.
431 case PipeWireBaseEncodedStream::Gif: {
432 if (forcedEncoder.isNull() || forcedEncoder == u
"gif") {
433 auto encoder = std::make_unique<GifEncoder>(
this);
434 if (encoder->initialize(size)) {
440 case PipeWireBaseEncodedStream::WebP: {
441 if (forcedEncoder.isNull() || forcedEncoder == u
"libwebp") {
442 auto encoder = std::make_unique<LibWebPEncoder>(
this);
443 encoder->setQuality(m_quality);
444 if (encoder->initialize(size)) {
// NOTE(review): presumably the default branch of the switch; its label is not
// visible in this excerpt.
451 qCWarning(PIPEWIRERECORD_LOGGING) <<
"Unknown encoder type" << m_encoderType;
457#include "moc_pipewireproduce_p.cpp"
QFuture< typename qValueType< Iterator >::value_type > filtered(Iterator begin, Iterator end, KeepFunctor &&filterFunction)
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()