1use std::sync::Arc;
6
7use async_mutex::Mutex as AsyncMutex;
8use futures::{channel::oneshot, future, FutureExt as _};
9use mm_protocol as protocol;
10pub use protocol::audio_channels::Channel as AudioChannel;
11use tracing::error;
12
13use crate::{
14 codec, conn, display_params, input,
15 packet::{self, PacketRing},
16 ClientError, ClientState,
17};
18
19#[derive(Debug, Clone, uniffi::Record)]
20pub struct AttachmentConfig {
21 pub width: u32,
23 pub height: u32,
25
26 pub video_codec: Option<codec::VideoCodec>,
29
30 pub video_profile: Option<codec::VideoProfile>,
33
34 pub quality_preset: Option<u32>,
37
38 pub audio_codec: Option<codec::AudioCodec>,
41
42 pub sample_rate: Option<u32>,
45
46 pub channels: Vec<AudioChannel>,
49
50 pub video_stream_seq_offset: u64,
55
56 pub audio_stream_seq_offset: u64,
61}
62
63#[derive(Debug, Clone, uniffi::Record)]
65pub struct VideoStreamParams {
66 pub width: u32,
67 pub height: u32,
68
69 pub codec: codec::VideoCodec,
70 pub profile: codec::VideoProfile,
71}
72
73#[derive(Debug, Clone, uniffi::Record)]
75pub struct AudioStreamParams {
76 pub codec: codec::AudioCodec,
77 pub sample_rate: u32,
78 pub channels: Vec<AudioChannel>,
79}
80
81#[derive(uniffi::Object)]
87pub struct Attachment {
88 sid: u64,
89
90 video_stream_seq_offset: u64,
92
93 outgoing: flume::Sender<conn::OutgoingMessage>,
96 conn_waker: Arc<mio::Waker>,
97
98 detached: future::Shared<oneshot::Receiver<()>>,
99}
100
101impl Attachment {
102 pub(crate) async fn new(
103 sid: u64,
104 client: Arc<AsyncMutex<super::InnerClient>>,
105 attached: protocol::Attached,
106 delegate: Arc<dyn AttachmentDelegate>,
107 video_stream_seq_offset: u64,
108 ) -> Result<Self, ClientError> {
109 let session_id = attached.session_id;
110 let attachment_id = attached.attachment_id;
111 let (detached_tx, detached_rx) = oneshot::channel();
112
113 let state = AttachmentState {
114 session_id,
115 attachment_id,
116
117 delegate,
118 attached_msg: attached,
119 server_error: None,
120
121 video_packet_ring: PacketRing::new(),
122 video_stream_seq: None,
123 prev_video_stream_seq: None,
124 video_stream_seq_offset,
125
126 audio_packet_ring: PacketRing::new(),
127 audio_stream_seq: None,
128 prev_audio_stream_seq: None,
129 audio_stream_seq_offset: 0,
130
131 notify_detached: Some(detached_tx),
132 reattach_required: false,
133 };
134
135 let mut guard = client.lock().await;
136
137 let super::ConnHandle {
138 outgoing,
139 waker,
140 attachments,
141 ..
142 } = match &guard.state {
143 ClientState::Connected(conn) => conn,
144 ClientState::Defunct(e) => return Err(e.clone()),
145 };
146
147 let outgoing = outgoing.clone();
148 let conn_waker = waker.clone();
149
150 if attachments.send_async((sid, state)).await.is_err() {
153 match guard.close() {
154 Ok(_) => return Err(ClientError::Defunct),
155 Err(e) => return Err(e),
156 }
157 }
158
159 Ok(Self {
160 sid,
161 video_stream_seq_offset,
162 outgoing,
163 conn_waker,
164 detached: detached_rx.shared(),
165 })
166 }
167}
168
169#[uniffi::export(with_foreign)]
171pub trait AttachmentDelegate: Send + Sync + std::fmt::Debug {
172 fn video_stream_start(&self, stream_seq: u64, params: VideoStreamParams);
174
175 fn video_packet(&self, packet: Arc<packet::Packet>);
177
178 fn dropped_video_packet(&self, dropped: packet::DroppedPacket);
180
181 fn audio_stream_start(&self, stream_seq: u64, params: AudioStreamParams);
183
184 fn audio_packet(&self, packet: Arc<packet::Packet>);
186
187 fn update_cursor(
189 &self,
190 icon: input::CursorIcon,
191 image: Option<Vec<u8>>,
192 hotspot_x: u32,
193 hotspot_y: u32,
194 );
195
196 fn lock_pointer(&self, x: f64, y: f64);
198
199 fn release_pointer(&self);
201
202 fn display_params_changed(
206 &self,
207 params: display_params::DisplayParams,
208 reattach_required: bool,
209 );
210
211 fn error(&self, err: ClientError);
214
215 fn attachment_ended(&self);
217}
218
219impl Attachment {
220 fn send(&self, msg: impl Into<protocol::MessageType>, fin: bool) {
221 let _ = self.outgoing.send(conn::OutgoingMessage {
222 sid: self.sid,
223 msg: msg.into(),
224 fin,
225 });
226
227 let _ = self.conn_waker.wake();
228 }
229}
230
231#[uniffi::export]
232impl Attachment {
233 pub fn request_video_refresh(&self, stream_seq: u64) {
235 self.send(
236 protocol::RequestVideoRefresh {
237 stream_seq: stream_seq - self.video_stream_seq_offset,
238 },
239 false,
240 )
241 }
242
243 pub fn keyboard_input(&self, key: input::Key, state: input::KeyState, character: u32) {
245 self.send(
246 protocol::KeyboardInput {
247 key: key.into(),
248 state: state.into(),
249 char: character,
250 },
251 false,
252 )
253 }
254
255 pub fn pointer_entered(&self) {
258 self.send(protocol::PointerEntered {}, false)
259 }
260
261 pub fn pointer_left(&self) {
264 self.send(protocol::PointerLeft {}, false)
265 }
266
267 pub fn pointer_motion(&self, x: f64, y: f64) {
269 self.send(protocol::PointerMotion { x, y }, false)
270 }
271
272 pub fn relative_pointer_motion(&self, x: f64, y: f64) {
274 self.send(protocol::RelativePointerMotion { x, y }, false)
275 }
276
277 pub fn pointer_input(&self, button: input::Button, state: input::ButtonState, x: f64, y: f64) {
279 self.send(
280 protocol::PointerInput {
281 button: button.into(),
282 state: state.into(),
283 x,
284 y,
285 },
286 false,
287 )
288 }
289
290 pub fn pointer_scroll(&self, scroll_type: input::ScrollType, x: f64, y: f64) {
292 self.send(
293 protocol::PointerScroll {
294 scroll_type: scroll_type.into(),
295 x,
296 y,
297 },
298 false,
299 )
300 }
301
302 pub fn gamepad_available(&self, pad: input::Gamepad) {
304 self.send(
305 protocol::GamepadAvailable {
306 gamepad: Some(pad.into()),
307 },
308 false,
309 )
310 }
311
312 pub fn gamepad_unavailable(&self, id: u64) {
314 self.send(protocol::GamepadUnavailable { id }, false)
315 }
316
317 pub fn gamepad_motion(&self, id: u64, axis: input::GamepadAxis, value: f64) {
319 self.send(
320 protocol::GamepadMotion {
321 gamepad_id: id,
322 axis: axis.into(),
323 value,
324 },
325 false,
326 )
327 }
328
329 pub fn gamepad_input(
331 &self,
332 id: u64,
333 button: input::GamepadButton,
334 state: input::GamepadButtonState,
335 ) {
336 self.send(
337 protocol::GamepadInput {
338 gamepad_id: id,
339 button: button.into(),
340 state: state.into(),
341 },
342 false,
343 )
344 }
345
346 pub async fn detach(&self) -> Result<(), ClientError> {
348 self.send(protocol::Detach {}, true);
349 Ok(self.detached.clone().await?)
350 }
351}
352
353pub(crate) struct AttachmentState {
355 pub(crate) session_id: u64,
356 pub(crate) attachment_id: u64,
357
358 delegate: Arc<dyn AttachmentDelegate>,
359 attached_msg: protocol::Attached,
360 reattach_required: bool,
361 server_error: Option<protocol::Error>,
362
363 video_packet_ring: PacketRing,
364 video_stream_seq: Option<u64>,
365 prev_video_stream_seq: Option<u64>,
366 video_stream_seq_offset: u64,
367
368 audio_packet_ring: PacketRing,
369 audio_stream_seq: Option<u64>,
370 prev_audio_stream_seq: Option<u64>,
371 audio_stream_seq_offset: u64,
372
373 notify_detached: Option<oneshot::Sender<()>>,
375}
376
377impl AttachmentState {
378 pub(crate) fn handle_message(&mut self, msg: protocol::MessageType) {
379 match msg {
380 protocol::MessageType::Attached(attached) => {
381 error!(
382 "unexpected {} on already-attached stream",
383 protocol::MessageType::Attached(attached)
384 );
385 }
386 protocol::MessageType::VideoChunk(chunk) => {
387 if self.video_stream_seq.is_none_or(|s| s < chunk.stream_seq) {
390 self.prev_video_stream_seq = self.video_stream_seq;
392 self.video_stream_seq = Some(chunk.stream_seq);
393
394 let res = self.attached_msg.streaming_resolution.unwrap_or_default();
395
396 self.delegate.video_stream_start(
397 chunk.stream_seq + self.video_stream_seq_offset,
398 VideoStreamParams {
399 width: res.width,
400 height: res.height,
401 codec: self.attached_msg.video_codec(),
402 profile: self.attached_msg.video_profile(),
403 },
404 );
405
406 if let Some(prev) = self.prev_video_stream_seq {
408 self.video_packet_ring.discard(prev.saturating_sub(1));
409 }
410 }
411
412 if let Err(err) = self.video_packet_ring.recv_chunk(chunk) {
413 error!("error in packet ring: {:#}", err);
414 }
415
416 if let Some(prev) = self.prev_video_stream_seq {
417 for mut packet in self
419 .video_packet_ring
420 .drain_completed(prev)
421 .flat_map(Result::ok)
422 {
423 packet.stream_seq += self.video_stream_seq_offset;
424 self.delegate.video_packet(Arc::new(packet));
425 }
426 }
427
428 if self.video_stream_seq != self.prev_video_stream_seq {
429 for res in self
430 .video_packet_ring
431 .drain_completed(self.video_stream_seq.unwrap())
432 {
433 match res {
434 Ok(mut packet) => {
435 packet.stream_seq += self.video_stream_seq_offset;
436 self.delegate.video_packet(Arc::new(packet));
437 }
438 Err(mut dropped) => {
439 dropped.stream_seq += self.video_stream_seq_offset;
440 self.delegate.dropped_video_packet(dropped);
441 }
442 }
443 }
444 }
445 }
446 protocol::MessageType::AudioChunk(chunk) => {
447 if self.audio_stream_seq.is_none_or(|s| s < chunk.stream_seq) {
450 self.prev_audio_stream_seq = self.audio_stream_seq;
452 self.audio_stream_seq = Some(chunk.stream_seq);
453
454 let channels = self
455 .attached_msg
456 .channels
457 .as_ref()
458 .map(|c| c.channels().collect())
459 .unwrap_or_default();
460
461 self.delegate.audio_stream_start(
462 chunk.stream_seq + self.audio_stream_seq_offset,
463 AudioStreamParams {
464 codec: self.attached_msg.audio_codec(),
465 sample_rate: self.attached_msg.sample_rate_hz,
466 channels,
467 },
468 );
469
470 if let Some(prev) = self.prev_audio_stream_seq {
472 self.audio_packet_ring.discard(prev.saturating_sub(1));
473 }
474 }
475
476 if let Err(err) = self.audio_packet_ring.recv_chunk(chunk) {
477 error!("error in packet ring: {:#}", err);
478 }
479
480 if let Some(prev) = self.prev_audio_stream_seq {
481 for mut packet in self
482 .audio_packet_ring
483 .drain_completed(prev)
484 .flat_map(Result::ok)
485 {
486 packet.stream_seq += self.audio_stream_seq_offset;
487 self.delegate.audio_packet(Arc::new(packet));
488 }
489 }
490
491 if self.audio_stream_seq != self.prev_audio_stream_seq {
492 for mut packet in self
493 .audio_packet_ring
494 .drain_completed(self.audio_stream_seq.unwrap())
495 .flat_map(Result::ok)
496 {
497 packet.stream_seq += self.audio_stream_seq_offset;
498 self.delegate.audio_packet(Arc::new(packet));
499 }
500 }
501 }
502 protocol::MessageType::UpdateCursor(msg) => {
503 let image = match &msg.image {
504 v if v.is_empty() => None,
505 v => Some(v.to_vec()),
506 };
507
508 self.delegate
509 .update_cursor(msg.icon(), image, msg.hotspot_x, msg.hotspot_y);
510 }
511 protocol::MessageType::LockPointer(msg) => {
512 self.delegate.lock_pointer(msg.x, msg.y);
513 }
514 protocol::MessageType::ReleasePointer(_) => self.delegate.release_pointer(),
515 protocol::MessageType::SessionParametersChanged(msg) => {
516 let Some(params) = msg.display_params.and_then(|p| p.try_into().ok()) else {
517 error!(?msg, "invalid display params from server");
518 return;
519 };
520
521 self.delegate
522 .display_params_changed(params, msg.reattach_required);
523
524 self.reattach_required = msg.reattach_required;
526 }
527 protocol::MessageType::SessionEnded(_) => {
528 }
530 protocol::MessageType::Error(error) => {
531 self.server_error = Some(error.clone());
532 self.delegate.error(ClientError::ServerError(error));
533 }
534 v => error!("unexpected message on attachment stream: {}", v),
535 }
536 }
537
538 pub(crate) fn handle_close(mut self, err: Option<ClientError>) {
539 if let Some(tx) = self.notify_detached.take() {
540 let _ = tx.send(());
541 }
542
543 if self.reattach_required {
544 self.reattach_required = false;
545 } else if let Some(err) = err {
546 self.delegate.error(err);
547 } else if self.server_error.is_some() {
548 } else {
550 self.delegate.attachment_ended();
551 }
552 }
553}