mm_client_common/
attachment.rs

1// Copyright 2024 Colin Marc <hi@colinmarc.com>
2//
3// SPDX-License-Identifier: MIT
4
5use std::sync::Arc;
6
7use async_mutex::Mutex as AsyncMutex;
8use futures::{channel::oneshot, future, FutureExt as _};
9use mm_protocol as protocol;
10pub use protocol::audio_channels::Channel as AudioChannel;
11use tracing::error;
12
13use crate::{
14    codec, conn, display_params, input,
15    packet::{self, PacketRing},
16    ClientError, ClientState,
17};
18
19#[derive(Debug, Clone, uniffi::Record)]
20pub struct AttachmentConfig {
21    /// The width of the video stream.
22    pub width: u32,
23    /// The height of the video stream.
24    pub height: u32,
25
26    /// The codec to use for the video stream. Leaving it empty allows the
27    /// server to decide.
28    pub video_codec: Option<codec::VideoCodec>,
29
30    /// The profile (bit depth and colorspace) to use for the video stream.
31    /// Leaving it empty allows the server to decide.
32    pub video_profile: Option<codec::VideoProfile>,
33
34    /// The quality preset, from 1-10. A None or 0 indicates the server should
35    /// decide.
36    pub quality_preset: Option<u32>,
37
38    /// The codec to use for the audio stream. Leaving it empty allows the
39    /// server to decide.
40    pub audio_codec: Option<codec::AudioCodec>,
41
42    /// The sample rate to use for the audio stream. Leaving it empty allows the
43    /// server to decide.
44    pub sample_rate: Option<u32>,
45
46    /// The channel layout to use for the audio stream. An empty vec indicates
47    /// the server should decide.
48    pub channels: Vec<AudioChannel>,
49
50    /// An offset to apply to the stream_seq of incoming video packets. The
51    /// offset is applied on the client side, and exists as a convenient way to
52    /// way to ensure sequence numbers stay monotonic, even across individual
53    /// attachment streams.
54    pub video_stream_seq_offset: u64,
55
56    /// An offset to apply to the stream_seq of incoming audio packets. The
57    /// offset is applied on the client side, and exists as a convenient way to
58    /// way to ensure sequence numbers stay monotonic, even across individual
59    /// attachment streams.
60    pub audio_stream_seq_offset: u64,
61}
62
63/// The settled video stream params, after the server has applied its defaults.
64#[derive(Debug, Clone, uniffi::Record)]
65pub struct VideoStreamParams {
66    pub width: u32,
67    pub height: u32,
68
69    pub codec: codec::VideoCodec,
70    pub profile: codec::VideoProfile,
71}
72
73/// The settled audio stream params, after the server has applied its defaults.
74#[derive(Debug, Clone, uniffi::Record)]
75pub struct AudioStreamParams {
76    pub codec: codec::AudioCodec,
77    pub sample_rate: u32,
78    pub channels: Vec<AudioChannel>,
79}
80
81/// A handle for sending messages to the server over an attachment stream.
82///
83/// An attachment is ended once the corresponding AttachmentDelegate receives
84/// the attachment_ended or parameters_changed (with reattach_required = true)
85/// callbacks. Using it past that point will silently drop events.
86#[derive(uniffi::Object)]
87pub struct Attachment {
88    sid: u64,
89
90    /// Used to un-munge the stream_seq for [Attachment::request_video_refresh].
91    video_stream_seq_offset: u64,
92
93    // We store a copy of these so that we can send messages on the attachment
94    // stream without locking the client mutex.
95    outgoing: flume::Sender<conn::OutgoingMessage>,
96    conn_waker: Arc<mio::Waker>,
97
98    detached: future::Shared<oneshot::Receiver<()>>,
99}
100
101impl Attachment {
102    pub(crate) async fn new(
103        sid: u64,
104        client: Arc<AsyncMutex<super::InnerClient>>,
105        attached: protocol::Attached,
106        delegate: Arc<dyn AttachmentDelegate>,
107        video_stream_seq_offset: u64,
108    ) -> Result<Self, ClientError> {
109        let session_id = attached.session_id;
110        let attachment_id = attached.attachment_id;
111        let (detached_tx, detached_rx) = oneshot::channel();
112
113        let state = AttachmentState {
114            session_id,
115            attachment_id,
116
117            delegate,
118            attached_msg: attached,
119            server_error: None,
120
121            video_packet_ring: PacketRing::new(),
122            video_stream_seq: None,
123            prev_video_stream_seq: None,
124            video_stream_seq_offset,
125
126            audio_packet_ring: PacketRing::new(),
127            audio_stream_seq: None,
128            prev_audio_stream_seq: None,
129            audio_stream_seq_offset: 0,
130
131            notify_detached: Some(detached_tx),
132            reattach_required: false,
133        };
134
135        let mut guard = client.lock().await;
136
137        let super::ConnHandle {
138            outgoing,
139            waker,
140            attachments,
141            ..
142        } = match &guard.state {
143            ClientState::Connected(conn) => conn,
144            ClientState::Defunct(e) => return Err(e.clone()),
145        };
146
147        let outgoing = outgoing.clone();
148        let conn_waker = waker.clone();
149
150        // Track the attachment in the client, so that the reactor thread will
151        // send us messages.
152        if attachments.send_async((sid, state)).await.is_err() {
153            match guard.close() {
154                Ok(_) => return Err(ClientError::Defunct),
155                Err(e) => return Err(e),
156            }
157        }
158
159        Ok(Self {
160            sid,
161            video_stream_seq_offset,
162            outgoing,
163            conn_waker,
164            detached: detached_rx.shared(),
165        })
166    }
167}
168
169/// Used by client implementations to handle attachment events.
170#[uniffi::export(with_foreign)]
171pub trait AttachmentDelegate: Send + Sync + std::fmt::Debug {
172    /// The video stream is starting or restarting.
173    fn video_stream_start(&self, stream_seq: u64, params: VideoStreamParams);
174
175    /// A video packet is available.
176    fn video_packet(&self, packet: Arc<packet::Packet>);
177
178    /// A video packet was lost.
179    fn dropped_video_packet(&self, dropped: packet::DroppedPacket);
180
181    /// The audio stream is starting or restarting.
182    fn audio_stream_start(&self, stream_seq: u64, params: AudioStreamParams);
183
184    /// An audio packet is available.
185    fn audio_packet(&self, packet: Arc<packet::Packet>);
186
187    // The cursor was updated.
188    fn update_cursor(
189        &self,
190        icon: input::CursorIcon,
191        image: Option<Vec<u8>>,
192        hotspot_x: u32,
193        hotspot_y: u32,
194    );
195
196    /// The pointer should be locked to the given location.
197    fn lock_pointer(&self, x: f64, y: f64);
198
199    /// The pointer should be released.
200    fn release_pointer(&self);
201
202    /// The remote session display params were changed. This usually requires
203    /// the client to reattach. If reattach_required is true, the attachment
204    /// should be considered ended. [attachment_ended] will not be called.
205    fn display_params_changed(
206        &self,
207        params: display_params::DisplayParams,
208        reattach_required: bool,
209    );
210
211    /// The client encountered an error. The attachment should be considered
212    /// ended. [attachment_ended] will not be called.
213    fn error(&self, err: ClientError);
214
215    /// The attachment was ended by the server.
216    fn attachment_ended(&self);
217}
218
219impl Attachment {
220    fn send(&self, msg: impl Into<protocol::MessageType>, fin: bool) {
221        let _ = self.outgoing.send(conn::OutgoingMessage {
222            sid: self.sid,
223            msg: msg.into(),
224            fin,
225        });
226
227        let _ = self.conn_waker.wake();
228    }
229}
230
231#[uniffi::export]
232impl Attachment {
233    /// Requests that the server generate a packet with headers and a keyframe.
234    pub fn request_video_refresh(&self, stream_seq: u64) {
235        self.send(
236            protocol::RequestVideoRefresh {
237                stream_seq: stream_seq - self.video_stream_seq_offset,
238            },
239            false,
240        )
241    }
242
243    /// Sends keyboard input to the server.
244    pub fn keyboard_input(&self, key: input::Key, state: input::KeyState, character: u32) {
245        self.send(
246            protocol::KeyboardInput {
247                key: key.into(),
248                state: state.into(),
249                char: character,
250            },
251            false,
252        )
253    }
254
255    /// Notifies the server that the pointer has entered the video area,
256    /// including if it enters a letterbox around the video.
257    pub fn pointer_entered(&self) {
258        self.send(protocol::PointerEntered {}, false)
259    }
260
261    /// Notifies the server that the pointer has left the video area. This
262    /// should consider any letterboxing part of the video area.
263    pub fn pointer_left(&self) {
264        self.send(protocol::PointerLeft {}, false)
265    }
266
267    /// Sends pointer motion to the server.
268    pub fn pointer_motion(&self, x: f64, y: f64) {
269        self.send(protocol::PointerMotion { x, y }, false)
270    }
271
272    /// Sends relative pointer motion to the server.
273    pub fn relative_pointer_motion(&self, x: f64, y: f64) {
274        self.send(protocol::RelativePointerMotion { x, y }, false)
275    }
276
277    /// Sends pointer input to the server.
278    pub fn pointer_input(&self, button: input::Button, state: input::ButtonState, x: f64, y: f64) {
279        self.send(
280            protocol::PointerInput {
281                button: button.into(),
282                state: state.into(),
283                x,
284                y,
285            },
286            false,
287        )
288    }
289
290    /// Sends pointer scroll events to the server.
291    pub fn pointer_scroll(&self, scroll_type: input::ScrollType, x: f64, y: f64) {
292        self.send(
293            protocol::PointerScroll {
294                scroll_type: scroll_type.into(),
295                x,
296                y,
297            },
298            false,
299        )
300    }
301
302    /// Sends a 'Gamepad Available' event to the server.
303    pub fn gamepad_available(&self, pad: input::Gamepad) {
304        self.send(
305            protocol::GamepadAvailable {
306                gamepad: Some(pad.into()),
307            },
308            false,
309        )
310    }
311
312    /// Sends a 'Gamepad Unavailable' event to the server.
313    pub fn gamepad_unavailable(&self, id: u64) {
314        self.send(protocol::GamepadUnavailable { id }, false)
315    }
316
317    /// Sends gamepad joystick motion to the server.
318    pub fn gamepad_motion(&self, id: u64, axis: input::GamepadAxis, value: f64) {
319        self.send(
320            protocol::GamepadMotion {
321                gamepad_id: id,
322                axis: axis.into(),
323                value,
324            },
325            false,
326        )
327    }
328
329    /// Sends gamepad button input to the server.
330    pub fn gamepad_input(
331        &self,
332        id: u64,
333        button: input::GamepadButton,
334        state: input::GamepadButtonState,
335    ) {
336        self.send(
337            protocol::GamepadInput {
338                gamepad_id: id,
339                button: button.into(),
340                state: state.into(),
341            },
342            false,
343        )
344    }
345
346    /// Ends the attachment.
347    pub async fn detach(&self) -> Result<(), ClientError> {
348        self.send(protocol::Detach {}, true);
349        Ok(self.detached.clone().await?)
350    }
351}
352
353/// Internal state for an attachment.
354pub(crate) struct AttachmentState {
355    pub(crate) session_id: u64,
356    pub(crate) attachment_id: u64,
357
358    delegate: Arc<dyn AttachmentDelegate>,
359    attached_msg: protocol::Attached,
360    reattach_required: bool,
361    server_error: Option<protocol::Error>,
362
363    video_packet_ring: PacketRing,
364    video_stream_seq: Option<u64>,
365    prev_video_stream_seq: Option<u64>,
366    video_stream_seq_offset: u64,
367
368    audio_packet_ring: PacketRing,
369    audio_stream_seq: Option<u64>,
370    prev_audio_stream_seq: Option<u64>,
371    audio_stream_seq_offset: u64,
372
373    // A future representing the end of the attachment.
374    notify_detached: Option<oneshot::Sender<()>>,
375}
376
377impl AttachmentState {
378    pub(crate) fn handle_message(&mut self, msg: protocol::MessageType) {
379        match msg {
380            protocol::MessageType::Attached(attached) => {
381                error!(
382                    "unexpected {} on already-attached stream",
383                    protocol::MessageType::Attached(attached)
384                );
385            }
386            protocol::MessageType::VideoChunk(chunk) => {
387                // We always send packets for two streams - the current one and
388                // (if there is one) the previous one.
389                if self.video_stream_seq.is_none_or(|s| s < chunk.stream_seq) {
390                    // A new stream started.
391                    self.prev_video_stream_seq = self.video_stream_seq;
392                    self.video_stream_seq = Some(chunk.stream_seq);
393
394                    let res = self.attached_msg.streaming_resolution.unwrap_or_default();
395
396                    self.delegate.video_stream_start(
397                        chunk.stream_seq + self.video_stream_seq_offset,
398                        VideoStreamParams {
399                            width: res.width,
400                            height: res.height,
401                            codec: self.attached_msg.video_codec(),
402                            profile: self.attached_msg.video_profile(),
403                        },
404                    );
405
406                    // Discard any older packets.
407                    if let Some(prev) = self.prev_video_stream_seq {
408                        self.video_packet_ring.discard(prev.saturating_sub(1));
409                    }
410                }
411
412                if let Err(err) = self.video_packet_ring.recv_chunk(chunk) {
413                    error!("error in packet ring: {:#}", err);
414                }
415
416                if let Some(prev) = self.prev_video_stream_seq {
417                    // Ignore dropped packets on the previous stream.
418                    for mut packet in self
419                        .video_packet_ring
420                        .drain_completed(prev)
421                        .flat_map(Result::ok)
422                    {
423                        packet.stream_seq += self.video_stream_seq_offset;
424                        self.delegate.video_packet(Arc::new(packet));
425                    }
426                }
427
428                if self.video_stream_seq != self.prev_video_stream_seq {
429                    for res in self
430                        .video_packet_ring
431                        .drain_completed(self.video_stream_seq.unwrap())
432                    {
433                        match res {
434                            Ok(mut packet) => {
435                                packet.stream_seq += self.video_stream_seq_offset;
436                                self.delegate.video_packet(Arc::new(packet));
437                            }
438                            Err(mut dropped) => {
439                                dropped.stream_seq += self.video_stream_seq_offset;
440                                self.delegate.dropped_video_packet(dropped);
441                            }
442                        }
443                    }
444                }
445            }
446            protocol::MessageType::AudioChunk(chunk) => {
447                // We always send packets for two streams - the current one and
448                // (if there is one) the previous one.
449                if self.audio_stream_seq.is_none_or(|s| s < chunk.stream_seq) {
450                    // A new stream started.
451                    self.prev_audio_stream_seq = self.audio_stream_seq;
452                    self.audio_stream_seq = Some(chunk.stream_seq);
453
454                    let channels = self
455                        .attached_msg
456                        .channels
457                        .as_ref()
458                        .map(|c| c.channels().collect())
459                        .unwrap_or_default();
460
461                    self.delegate.audio_stream_start(
462                        chunk.stream_seq + self.audio_stream_seq_offset,
463                        AudioStreamParams {
464                            codec: self.attached_msg.audio_codec(),
465                            sample_rate: self.attached_msg.sample_rate_hz,
466                            channels,
467                        },
468                    );
469
470                    // Discard any older packets.
471                    if let Some(prev) = self.prev_audio_stream_seq {
472                        self.audio_packet_ring.discard(prev.saturating_sub(1));
473                    }
474                }
475
476                if let Err(err) = self.audio_packet_ring.recv_chunk(chunk) {
477                    error!("error in packet ring: {:#}", err);
478                }
479
480                if let Some(prev) = self.prev_audio_stream_seq {
481                    for mut packet in self
482                        .audio_packet_ring
483                        .drain_completed(prev)
484                        .flat_map(Result::ok)
485                    {
486                        packet.stream_seq += self.audio_stream_seq_offset;
487                        self.delegate.audio_packet(Arc::new(packet));
488                    }
489                }
490
491                if self.audio_stream_seq != self.prev_audio_stream_seq {
492                    for mut packet in self
493                        .audio_packet_ring
494                        .drain_completed(self.audio_stream_seq.unwrap())
495                        .flat_map(Result::ok)
496                    {
497                        packet.stream_seq += self.audio_stream_seq_offset;
498                        self.delegate.audio_packet(Arc::new(packet));
499                    }
500                }
501            }
502            protocol::MessageType::UpdateCursor(msg) => {
503                let image = match &msg.image {
504                    v if v.is_empty() => None,
505                    v => Some(v.to_vec()),
506                };
507
508                self.delegate
509                    .update_cursor(msg.icon(), image, msg.hotspot_x, msg.hotspot_y);
510            }
511            protocol::MessageType::LockPointer(msg) => {
512                self.delegate.lock_pointer(msg.x, msg.y);
513            }
514            protocol::MessageType::ReleasePointer(_) => self.delegate.release_pointer(),
515            protocol::MessageType::SessionParametersChanged(msg) => {
516                let Some(params) = msg.display_params.and_then(|p| p.try_into().ok()) else {
517                    error!(?msg, "invalid display params from server");
518                    return;
519                };
520
521                self.delegate
522                    .display_params_changed(params, msg.reattach_required);
523
524                // Mute the attachment_ended callback once.
525                self.reattach_required = msg.reattach_required;
526            }
527            protocol::MessageType::SessionEnded(_) => {
528                // We just check for the fin on the attachment stream.
529            }
530            protocol::MessageType::Error(error) => {
531                self.server_error = Some(error.clone());
532                self.delegate.error(ClientError::ServerError(error));
533            }
534            v => error!("unexpected message on attachment stream: {}", v),
535        }
536    }
537
538    pub(crate) fn handle_close(mut self, err: Option<ClientError>) {
539        if let Some(tx) = self.notify_detached.take() {
540            let _ = tx.send(());
541        }
542
543        if self.reattach_required {
544            self.reattach_required = false;
545        } else if let Some(err) = err {
546            self.delegate.error(err);
547        } else if self.server_error.is_some() {
548            // We don't call attachment_ended because we already called error.
549        } else {
550            self.delegate.attachment_ended();
551        }
552    }
553}