KPipewire

pipewireproduce.cpp
1/*
2 SPDX-FileCopyrightText: 2022 Aleix Pol Gonzalez <aleixpol@kde.org>
3
4 SPDX-License-Identifier: LGPL-2.1-only OR LGPL-3.0-only OR LicenseRef-KDE-Accepted-LGPL
5*/
6
7#include "pipewireproduce_p.h"
8
9#include <QMutex>
10#include <QPainter>
11#include <QThreadPool>
12#include <logging_record.h>
13
14#include <QDateTime>
15#include <memory>
16#include <qstringliteral.h>
17
18#include "gifencoder_p.h"
19#include "h264vaapiencoder_p.h"
20#include "libopenh264encoder_p.h"
21#include "libvpxencoder_p.h"
22#include "libvpxvp9encoder_p.h"
23#include "libwebpencoder_p.h"
24#include "libx264encoder_p.h"
25
26#include "logging_frame_statistics.h"
27
28extern "C" {
29#include <fcntl.h>
30}
31
32Q_DECLARE_METATYPE(std::optional<int>);
33Q_DECLARE_METATYPE(std::optional<std::chrono::nanoseconds>);
34
35PipeWireProduce::PipeWireProduce(PipeWireBaseEncodedStream::Encoder encoderType, uint nodeId, uint fd, const Fraction &framerate)
36 : QObject()
37 , m_nodeId(nodeId)
38 , m_encoderType(encoderType)
39 , m_fd(fd)
40 , m_frameRate(framerate)
41{
42 qRegisterMetaType<std::optional<int>>();
43 qRegisterMetaType<std::optional<std::chrono::nanoseconds>>();
44}
45
46PipeWireProduce::~PipeWireProduce()
47{
48}
49
50void PipeWireProduce::initialize()
51{
52 m_stream.reset(new PipeWireSourceStream(nullptr));
53 m_stream->setMaxFramerate(m_frameRate);
54
55 // The check in supportsHardwareEncoding() is insufficient to fully
56 // determine if we actually support hardware encoding the current stream,
57 // but to determine that we need the stream size, which we don't get until
58 // after we've created the stream, but creating the stream sets important
59 // parameters that require the correct usage hint to be set. So use the
60 // insufficient check to set the hint, assuming that we still get a working
61 // stream when we use the wrong hint with software encoding.
62 m_stream->setUsageHint(Encoder::supportsHardwareEncoding() ? PipeWireSourceStream::UsageHint::EncodeHardware
63 : PipeWireSourceStream::UsageHint::EncodeSoftware);
64
65 bool created = m_stream->createStream(m_nodeId, m_fd);
66 if (!created || !m_stream->error().isEmpty()) {
67 qCWarning(PIPEWIRERECORD_LOGGING) << "failed to set up stream for" << m_nodeId << m_stream->error();
68 m_error = m_stream->error();
69 m_stream.reset(nullptr);
70 return;
71 }
72 connect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, this, &PipeWireProduce::setupStream);
73
74 if (PIPEWIRERECORDFRAMESTATS_LOGGING().isDebugEnabled()) {
75 m_frameStatisticsTimer = std::make_unique<QTimer>();
76 m_frameStatisticsTimer->setInterval(std::chrono::seconds(1));
77 connect(m_frameStatisticsTimer.get(), &QTimer::timeout, this, [this]() {
78 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << "Processed" << m_processedFrames << "frames in the last second.";
79 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingFilterFrames << "frames pending for filter.";
80 qCDebug(PIPEWIRERECORDFRAMESTATS_LOGGING) << m_pendingEncodeFrames << "frames pending for encode.";
81 m_processedFrames = 0;
82 });
83 }
84
85 /**
86 * Kwin only sends a new frame when there's damage on screen
87 * The encoder does not flush all frames whilst a stream is active
88 * it will keep one frame in the queue waiting for more input until the stream is closed
89 *
90 * If there's no update this timer bumps the last frame through the stack again
91 * to flush the last frame.
92 */
93 m_frameRepeatTimer.reset(new QTimer);
94 m_frameRepeatTimer->setSingleShot(true);
95 m_frameRepeatTimer->setInterval(100);
96 connect(m_frameRepeatTimer.data(), &QTimer::timeout, this, [this]() {
97 auto f = m_lastFrame;
98 m_lastFrame = {};
99 aboutToEncode(f);
100 if (!m_encoder->filterFrame(f)) {
101 return;
102 }
103
104 m_pendingFilterFrames++;
105 m_passthroughCondition.notify_all();
106 });
107}
108
109Fraction PipeWireProduce::maxFramerate() const
110{
111 return m_maxFramerate;
112}
113
114void PipeWireProduce::setMaxFramerate(const Fraction &framerate)
115{
116 m_maxFramerate = framerate;
117
118 const double framesPerSecond = static_cast<double>(framerate.numerator) / framerate.denominator;
119 if (m_frameRepeatTimer) {
120 m_frameRepeatTimer->setInterval((1000 / framesPerSecond) * 2);
121 }
122 if (m_stream) {
123 m_stream->setMaxFramerate(framerate);
124 }
125}
126
127int PipeWireProduce::maxPendingFrames() const
128{
129 return m_maxPendingFrames;
130}
131
132void PipeWireProduce::setMaxPendingFrames(int newMaxBufferSize)
133{
134 if (newMaxBufferSize < 3) {
135 qCWarning(PIPEWIRERECORD_LOGGING) << "Maxmimum pending frame count of " << newMaxBufferSize << " requested. Value must be 3 or higher.";
136 newMaxBufferSize = 3;
137 }
138 m_maxPendingFrames = newMaxBufferSize;
139}
140
141void PipeWireProduce::setupStream()
142{
143 qCDebug(PIPEWIRERECORD_LOGGING) << "Setting up stream";
144 disconnect(m_stream.get(), &PipeWireSourceStream::streamParametersChanged, this, &PipeWireProduce::setupStream);
145
146 m_encoder = makeEncoder();
147 if (!m_encoder) {
148 qCWarning(PIPEWIRERECORD_LOGGING) << "No encoder could be created";
149 return;
150 }
151
152 connect(m_stream.get(), &PipeWireSourceStream::stateChanged, this, &PipeWireProduce::stateChanged);
153 if (!setupFormat()) {
154 qCWarning(PIPEWIRERECORD_LOGGING) << "Could not set up the producing thread";
155 return;
156 }
157
158 connect(m_stream.data(), &PipeWireSourceStream::frameReceived, this, &PipeWireProduce::processFrame);
159
160 m_passthroughThread = std::thread([this]() {
161 m_passthroughRunning = true;
162 while (m_passthroughRunning) {
163 std::unique_lock<std::mutex> lock(m_passthroughMutex);
164 m_passthroughCondition.wait(lock);
165
166 if (!m_passthroughRunning) {
167 break;
168 }
169
170 auto [filtered, queued] = m_encoder->encodeFrame(m_maxPendingFrames - m_pendingEncodeFrames);
171 m_pendingFilterFrames -= filtered;
172 m_pendingEncodeFrames += queued;
173
174 m_outputCondition.notify_all();
175 }
176 });
177 pthread_setname_np(m_passthroughThread.native_handle(), "PipeWireProduce::passthrough");
178
179 m_outputThread = std::thread([this]() {
180 m_outputRunning = true;
181 while (m_outputRunning) {
182 std::unique_lock<std::mutex> lock(m_outputMutex);
183 m_outputCondition.wait(lock);
184
185 if (!m_outputRunning) {
186 break;
187 }
188
189 auto received = m_encoder->receivePacket();
190 m_pendingEncodeFrames -= received;
191 m_processedFrames += received;
192
193 // Notify the produce thread that the count of processed frames has
194 // changed and it can do cleanup if needed, making sure that that
195 // handling is done on the right thread.
196 QMetaObject::invokeMethod(this, &PipeWireProduce::handleEncodedFramesChanged, Qt::QueuedConnection);
197 }
198 });
199 pthread_setname_np(m_outputThread.native_handle(), "PipeWireProduce::output");
200
201 if (m_frameStatisticsTimer) {
202 m_frameStatisticsTimer->start();
203 }
204 Q_EMIT started();
205}
206
207void PipeWireProduce::deactivate()
208{
209 m_deactivated = true;
210
211 auto streamState = PW_STREAM_STATE_PAUSED;
212 if (m_stream) {
213 streamState = m_stream->state();
214 m_stream->setActive(false);
215 }
216
217 // If we have not been initialized properly before, ensure we still run any
218 // cleanup code and exit the thread, otherwise we risk applications not closing
219 // properly.
220 if (!m_encoder || streamState != PW_STREAM_STATE_STREAMING) {
221 QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection);
222 }
223}
224
225void PipeWireProduce::destroy()
226{
227 // Ensure we cleanup the PipeWireSourceStream while in the same thread we
228 // created it in.
229 Q_ASSERT_X(QThread::currentThread() == thread(), "PipeWireProduce", "destroy() called from a different thread than PipeWireProduce's thread");
230
231 if (!m_stream) {
232 return;
233 }
234
235 m_frameRepeatTimer->stop();
236
237 m_frameStatisticsTimer = nullptr;
238
239 if (m_passthroughThread.joinable()) {
240 m_passthroughRunning = false;
241 m_passthroughCondition.notify_all();
242 m_passthroughThread.join();
243 }
244
245 if (m_outputThread.joinable()) {
246 m_outputRunning = false;
247 m_outputCondition.notify_all();
248 m_outputThread.join();
249 }
250
251 m_stream.reset();
252
253 qCDebug(PIPEWIRERECORD_LOGGING) << "finished";
254 cleanup();
255 Q_EMIT finished();
257}
258
259void PipeWireProduce::setQuality(const std::optional<quint8> &quality)
260{
261 m_quality = quality;
262 if (m_encoder) {
263 m_encoder->setQuality(quality);
264 }
265}
266
267void PipeWireProduce::setEncodingPreference(const PipeWireBaseEncodedStream::EncodingPreference &encodingPreference)
268{
269 m_encodingPreference = encodingPreference;
270
271 if (m_encoder) {
272 m_encoder->setEncodingPreference(encodingPreference);
273 }
274}
275
276void PipeWireProduce::processFrame(const PipeWireFrame &frame)
277{
278 auto f = frame;
279
280 m_lastFrame = frame;
281 m_frameRepeatTimer->start();
282
283 if (frame.cursor) {
284 m_cursor.position = frame.cursor->position;
285 m_cursor.hotspot = frame.cursor->hotspot;
286 if (!frame.cursor->texture.isNull()) {
287 m_cursor.dirty = true;
288 m_cursor.texture = frame.cursor->texture;
289 }
290 }
291
292 auto pts = framePts(frame.presentationTimestamp);
293 if (m_previousPts >= 0 && pts <= m_previousPts) {
294 return;
295 }
296
297 auto frameTime = 1000.0 / (m_maxFramerate.numerator / m_maxFramerate.denominator);
298 if ((pts - m_previousPts) < frameTime) {
299 return;
300 }
301
302 if (m_pendingFilterFrames + 1 > m_maxPendingFrames) {
303 qCWarning(PIPEWIRERECORD_LOGGING) << "Filter queue is full, dropping frame" << pts;
304 return;
305 }
306
307 aboutToEncode(f);
308 if (!m_encoder->filterFrame(f)) {
309 return;
310 }
311
312 m_pendingFilterFrames++;
313 m_previousPts = pts;
314
315 m_passthroughCondition.notify_all();
316}
317
318void PipeWireProduce::stateChanged(pw_stream_state state)
319{
320 if (state != PW_STREAM_STATE_PAUSED || !m_deactivated) {
321 return;
322 }
323 if (!m_stream) {
324 qCDebug(PIPEWIRERECORD_LOGGING) << "finished without a stream";
325 return;
326 }
327
328 disconnect(m_stream.data(), &PipeWireSourceStream::frameReceived, this, &PipeWireProduce::processFrame);
329
330 if (m_pendingFilterFrames <= 0 && m_pendingEncodeFrames <= 0) {
331 // If we have nothing pending, cleanup immediately.
332 m_encoder->finish();
333
334 // We want to clean up the source stream while in the input thread, but we
335 // need to do so while not handling any PipeWire callback as that risks
336 // crashing because we're stil executing PipeWire handling code.
337 QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection);
338 } else {
339 // If we have pending frames, wait with cleanup until all frames have been processed.
340 qCDebug(PIPEWIRERECORD_LOGGING) << "Waiting for frame queues to empty, still pending filter" << m_pendingFilterFrames << "encode"
341 << m_pendingEncodeFrames;
342 m_passthroughCondition.notify_all();
343 }
344}
345
346void PipeWireProduce::handleEncodedFramesChanged()
347{
348 if (!m_deactivated) {
349 return;
350 }
351
352 // If we're deactivating but still have frames in the queue, we want to
353 // flush everything. Since at that point we are not receiving new frames, we
354 // need a different trigger to make the filtering thread process frames.
355 // Triggering here means the filter thread runs as fast as the encode thread
356 // can process the frames.
357 m_passthroughCondition.notify_all();
358
359 if (m_pendingFilterFrames <= 0) {
360 m_encoder->finish();
361
362 if (m_pendingEncodeFrames <= 0) {
363 destroy();
364 }
365 }
366}
367
368std::unique_ptr<Encoder> PipeWireProduce::makeEncoder()
369{
370 auto forcedEncoder = qEnvironmentVariable("KPIPEWIRE_FORCE_ENCODER");
371 if (!forcedEncoder.isNull()) {
372 qCWarning(PIPEWIRERECORD_LOGGING) << "Forcing encoder to" << forcedEncoder;
373 }
374
375 auto size = m_stream->size();
376
377 switch (m_encoderType) {
378 case PipeWireBaseEncodedStream::H264Baseline:
379 case PipeWireBaseEncodedStream::H264Main: {
380 auto profile = m_encoderType == PipeWireBaseEncodedStream::H264Baseline ? Encoder::H264Profile::Baseline : Encoder::H264Profile::Main;
381
382 if (forcedEncoder.isNull() || forcedEncoder == u"h264_vaapi") {
383 auto hardwareEncoder = std::make_unique<H264VAAPIEncoder>(profile, this);
384 hardwareEncoder->setQuality(m_quality);
385 hardwareEncoder->setEncodingPreference(m_encodingPreference);
386 if (hardwareEncoder->initialize(size)) {
387 return hardwareEncoder;
388 }
389 }
390
391 if (forcedEncoder.isNull() || forcedEncoder == u"libx264") {
392 auto softwareEncoder = std::make_unique<LibX264Encoder>(profile, this);
393 softwareEncoder->setQuality(m_quality);
394 softwareEncoder->setEncodingPreference(m_encodingPreference);
395 if (softwareEncoder->initialize(size)) {
396 return softwareEncoder;
397 }
398 }
399
400 // Try libopenh264 last, it's slower and has less features.
401 if (forcedEncoder.isNull() || forcedEncoder == u"libopenh264") {
402 auto softwareEncoder = std::make_unique<LibOpenH264Encoder>(profile, this);
403 softwareEncoder->setQuality(m_quality);
404 softwareEncoder->setEncodingPreference(m_encodingPreference);
405 if (softwareEncoder->initialize(size)) {
406 return softwareEncoder;
407 }
408 }
409 break;
410 }
411 case PipeWireBaseEncodedStream::VP8: {
412 if (forcedEncoder.isNull() || forcedEncoder == u"libvpx") {
413 auto encoder = std::make_unique<LibVpxEncoder>(this);
414 encoder->setQuality(m_quality);
415 if (encoder->initialize(size)) {
416 return encoder;
417 }
418 }
419 break;
420 }
421 case PipeWireBaseEncodedStream::VP9: {
422 if (forcedEncoder.isNull() || forcedEncoder == u"libvpx-vp9") {
423 auto encoder = std::make_unique<LibVpxVp9Encoder>(this);
424 encoder->setQuality(m_quality);
425 if (encoder->initialize(size)) {
426 return encoder;
427 }
428 }
429 break;
430 }
431 case PipeWireBaseEncodedStream::Gif: {
432 if (forcedEncoder.isNull() || forcedEncoder == u"gif") {
433 auto encoder = std::make_unique<GifEncoder>(this);
434 if (encoder->initialize(size)) {
435 return encoder;
436 }
437 }
438 break;
439 }
440 case PipeWireBaseEncodedStream::WebP: {
441 if (forcedEncoder.isNull() || forcedEncoder == u"libwebp") {
442 auto encoder = std::make_unique<LibWebPEncoder>(this);
443 encoder->setQuality(m_quality);
444 if (encoder->initialize(size)) {
445 return encoder;
446 }
447 }
448 break;
449 }
450 default:
451 qCWarning(PIPEWIRERECORD_LOGGING) << "Unknown encoder type" << m_encoderType;
452 }
453
454 return nullptr;
455}
456
457#include "moc_pipewireproduce_p.cpp"
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
QueuedConnection
QFuture< typename qValueType< Iterator >::value_type > filtered(Iterator begin, Iterator end, KeepFunctor &&filterFunction)
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
void quit()
void timeout()
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri May 2 2025 12:00:27 by doxygen 1.13.2 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.