diff options
Diffstat (limited to 'cras/client/libcras/src/cras_stream.rs')
-rw-r--r-- | cras/client/libcras/src/cras_stream.rs | 182 |
1 files changed, 123 insertions, 59 deletions
diff --git a/cras/client/libcras/src/cras_stream.rs b/cras/client/libcras/src/cras_stream.rs index f6004802..bd9520a1 100644 --- a/cras/client/libcras/src/cras_stream.rs +++ b/cras/client/libcras/src/cras_stream.rs @@ -4,13 +4,17 @@ use std::cmp::min; use std::io; use std::marker::PhantomData; +use std::mem; use std::{error, fmt}; use audio_streams::{ capture::{CaptureBuffer, CaptureBufferStream}, - BoxError, BufferDrop, PlaybackBuffer, PlaybackBufferStream, + BufferDrop, PlaybackBuffer, PlaybackBufferStream, +}; +use cras_sys::gen::{ + cras_disconnect_stream_message, cras_server_message, snd_pcm_format_t, CRAS_AUDIO_MESSAGE_ID, + CRAS_SERVER_MESSAGE_ID, CRAS_STREAM_DIRECTION, }; -use cras_sys::gen::{snd_pcm_format_t, CRAS_AUDIO_MESSAGE_ID, CRAS_STREAM_DIRECTION}; use sys_util::error; use crate::audio_socket::{AudioMessage, AudioSocket}; @@ -18,25 +22,38 @@ use crate::cras_server_socket::CrasServerSocket; use crate::cras_shm::*; #[derive(Debug)] -pub enum Error { +pub enum ErrorType { IoError(io::Error), MessageTypeError, + NoShmError, +} + +#[derive(Debug)] +pub struct Error { + error_type: ErrorType, +} + +impl Error { + fn new(error_type: ErrorType) -> Error { + Error { error_type } + } } impl error::Error for Error {} impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Error::IoError(ref err) => err.fmt(f), - Error::MessageTypeError => write!(f, "Message type error"), + match self.error_type { + ErrorType::IoError(ref err) => err.fmt(f), + ErrorType::MessageTypeError => write!(f, "Message type error"), + ErrorType::NoShmError => write!(f, "Shared memory area is not created"), } } } impl From<io::Error> for Error { fn from(io_err: io::Error) -> Error { - Error::IoError(io_err) + Error::new(ErrorType::IoError(io_err)) } } @@ -44,23 +61,31 @@ impl From<io::Error> for Error { /// interacts with server's audio thread through `AudioSocket`. pub trait CrasStreamData<'a>: Send { // Creates `CrasStreamData` with only `AudioSocket`. - fn new(audio_sock: AudioSocket, header: CrasAudioHeader<'a>) -> Self; - fn header_mut(&mut self) -> &mut CrasAudioHeader<'a>; + fn new(audio_sock: AudioSocket) -> Self; + fn set_header(&mut self, header: CrasAudioHeader<'a>); + fn header_mut(&mut self) -> &mut Option<CrasAudioHeader<'a>>; fn audio_sock_mut(&mut self) -> &mut AudioSocket; } /// `CrasStreamData` implementation for `PlaybackBufferStream`. pub struct CrasPlaybackData<'a> { audio_sock: AudioSocket, - header: CrasAudioHeader<'a>, + header: Option<CrasAudioHeader<'a>>, } impl<'a> CrasStreamData<'a> for CrasPlaybackData<'a> { - fn new(audio_sock: AudioSocket, header: CrasAudioHeader<'a>) -> Self { - Self { audio_sock, header } + fn new(audio_sock: AudioSocket) -> Self { + Self { + audio_sock, + header: None, + } + } + + fn set_header(&mut self, header: CrasAudioHeader<'a>) { + self.header = Some(header); } - fn header_mut(&mut self) -> &mut CrasAudioHeader<'a> { + fn header_mut(&mut self) -> &mut Option<CrasAudioHeader<'a>> { &mut self.header } @@ -72,8 +97,10 @@ impl<'a> CrasStreamData<'a> for CrasPlaybackData<'a> { impl<'a> BufferDrop for CrasPlaybackData<'a> { fn trigger(&mut self, nframes: usize) { let log_err = |e| error!("BufferDrop error: {}", e); - if let Err(e) = self.header.commit_written_frames(nframes as u32) { - log_err(e); + if let Some(header) = &mut self.header { + if let Err(e) = header.commit_written_frames(nframes as u32) { + log_err(e); + } } if let Err(e) = self.audio_sock.data_ready(nframes as u32) { log_err(e); @@ -84,15 +111,22 @@ impl<'a> BufferDrop for CrasPlaybackData<'a> { /// `CrasStreamData` implementation for `CaptureBufferStream`. pub struct CrasCaptureData<'a> { audio_sock: AudioSocket, - header: CrasAudioHeader<'a>, + header: Option<CrasAudioHeader<'a>>, } impl<'a> CrasStreamData<'a> for CrasCaptureData<'a> { - fn new(audio_sock: AudioSocket, header: CrasAudioHeader<'a>) -> Self { - Self { audio_sock, header } + fn new(audio_sock: AudioSocket) -> Self { + Self { + audio_sock, + header: None, + } } - fn header_mut(&mut self) -> &mut CrasAudioHeader<'a> { + fn set_header(&mut self, header: CrasAudioHeader<'a>) { + self.header = Some(header); + } + + fn header_mut(&mut self) -> &mut Option<CrasAudioHeader<'a>> { &mut self.header } @@ -104,8 +138,10 @@ impl<'a> CrasStreamData<'a> for CrasCaptureData<'a> { impl<'a> BufferDrop for CrasCaptureData<'a> { fn trigger(&mut self, nframes: usize) { let log_err = |e| error!("BufferDrop error: {}", e); - if let Err(e) = self.header.commit_read_frames(nframes as u32) { - log_err(e); + if let Some(header) = &mut self.header { + if let Err(e) = header.commit_read_frames(nframes as u32) { + log_err(e); + } } if let Err(e) = self.audio_sock.capture_ready(nframes as u32) { log_err(e); @@ -119,14 +155,14 @@ pub struct CrasStream<'a, T: CrasStreamData<'a> + BufferDrop> { server_socket: CrasServerSocket, block_size: u32, direction: CRAS_STREAM_DIRECTION, - rate: u32, + rate: usize, num_channels: usize, format: snd_pcm_format_t, /// A structure for stream to interact with server audio thread. controls: T, /// The `PhantomData` is used by `controls: T` phantom: PhantomData<CrasAudioHeader<'a>>, - audio_buffer: CrasAudioBuffer, + audio_buffer: Option<CrasAudioBuffer>, } impl<'a, T: CrasStreamData<'a> + BufferDrop> CrasStream<'a, T> { @@ -134,22 +170,17 @@ impl<'a, T: CrasStreamData<'a> + BufferDrop> CrasStream<'a, T> { /// /// # Returns /// `CrasStream` - CRAS client stream. - #[allow(clippy::too_many_arguments)] - pub fn try_new( + pub fn new( stream_id: u32, server_socket: CrasServerSocket, block_size: u32, direction: CRAS_STREAM_DIRECTION, - rate: u32, + rate: usize, num_channels: usize, format: snd_pcm_format_t, audio_sock: AudioSocket, - header_fd: CrasAudioShmHeaderFd, - samples_fd: CrasShmFd, - ) -> Result<Self, Error> { - let (header, audio_buffer) = create_header_and_buffers(header_fd, samples_fd)?; - - Ok(Self { + ) -> Self { + Self { stream_id, server_socket, block_size, @@ -157,29 +188,41 @@ impl<'a, T: CrasStreamData<'a> + BufferDrop> CrasStream<'a, T> { rate, num_channels, format, - controls: T::new(audio_sock, header), + controls: T::new(audio_sock), phantom: PhantomData, - audio_buffer, - }) + audio_buffer: None, + } + } + + /// Receives shared memory fd and initialize stream audio shared memory area + pub fn init_shm( + &mut self, + header_fd: CrasAudioShmHeaderFd, + samples_fd: CrasShmFd, + ) -> Result<(), Error> { + let (header, buffer) = create_header_and_buffers(header_fd, samples_fd)?; + self.controls.set_header(header); + self.audio_buffer = Some(buffer); + Ok(()) } fn wait_request_data(&mut self) -> Result<(), Error> { match self.controls.audio_sock_mut().read_audio_message()? { - AudioMessage::Success { - id: CRAS_AUDIO_MESSAGE_ID::AUDIO_MESSAGE_REQUEST_DATA, - .. - } => Ok(()), - _ => Err(Error::MessageTypeError), + AudioMessage::Success { id, .. } => match id { + CRAS_AUDIO_MESSAGE_ID::AUDIO_MESSAGE_REQUEST_DATA => Ok(()), + _ => Err(Error::new(ErrorType::MessageTypeError)), + }, + _ => Err(Error::new(ErrorType::MessageTypeError)), } } fn wait_data_ready(&mut self) -> Result<u32, Error> { match self.controls.audio_sock_mut().read_audio_message()? { - AudioMessage::Success { - id: CRAS_AUDIO_MESSAGE_ID::AUDIO_MESSAGE_DATA_READY, - frames, - } => Ok(frames), - _ => Err(Error::MessageTypeError), + AudioMessage::Success { id, frames } => match id { + CRAS_AUDIO_MESSAGE_ID::AUDIO_MESSAGE_DATA_READY => Ok(frames), + _ => Err(Error::new(ErrorType::MessageTypeError)), + }, + _ => Err(Error::new(ErrorType::MessageTypeError)), } } } @@ -189,36 +232,57 @@ impl<'a, T: CrasStreamData<'a> + BufferDrop> Drop for CrasStream<'a, T> { /// the return message. /// Logs an error message to stderr if the method fails. fn drop(&mut self) { - if let Err(e) = self.server_socket.disconnect_stream(self.stream_id) { + // Send stream disconnect message + let msg_header = cras_server_message { + length: mem::size_of::<cras_disconnect_stream_message>() as u32, + id: CRAS_SERVER_MESSAGE_ID::CRAS_SERVER_DISCONNECT_STREAM, + }; + let server_cmsg = cras_disconnect_stream_message { + header: msg_header, + stream_id: self.stream_id, + }; + if let Err(e) = self + .server_socket + .send_server_message_with_fds(&server_cmsg, &[]) + { error!("CrasStream::Drop error: {}", e); } } } impl<'a, T: CrasStreamData<'a> + BufferDrop> PlaybackBufferStream for CrasStream<'a, T> { - fn next_playback_buffer(&mut self) -> Result<PlaybackBuffer, BoxError> { + fn next_playback_buffer(&mut self) -> Result<PlaybackBuffer, Box<dyn error::Error>> { // Wait for request audio message self.wait_request_data()?; - let header = self.controls.header_mut(); - let frame_size = header.get_frame_size(); - let (offset, len) = header.get_write_offset_and_len()?; - let buf = &mut self.audio_buffer.get_buffer()[offset..offset + len]; - + let (frame_size, (offset, len)) = match self.controls.header_mut() { + None => return Err(Error::new(ErrorType::NoShmError).into()), + Some(header) => (header.get_frame_size(), header.get_write_offset_and_len()?), + }; + let buf = match self.audio_buffer.as_mut() { + None => return Err(Error::new(ErrorType::NoShmError).into()), + Some(audio_buffer) => &mut audio_buffer.get_buffer()[offset..offset + len], + }; PlaybackBuffer::new(frame_size, buf, &mut self.controls).map_err(Box::from) } } impl<'a, T: CrasStreamData<'a> + BufferDrop> CaptureBufferStream for CrasStream<'a, T> { - fn next_capture_buffer(&mut self) -> Result<CaptureBuffer, BoxError> { + fn next_capture_buffer(&mut self) -> Result<CaptureBuffer, Box<dyn error::Error>> { // Wait for data ready message let frames = self.wait_data_ready()?; - let header = self.controls.header_mut(); - let frame_size = header.get_frame_size(); - let shm_frames = header.get_readable_frames()?; + let (frame_size, shm_frames, offset) = match self.controls.header_mut() { + None => return Err(Error::new(ErrorType::NoShmError).into()), + Some(header) => ( + header.get_frame_size(), + header.get_readable_frames()?, + header.get_read_buffer_offset()?, + ), + }; let len = min(shm_frames, frames as usize) * frame_size; - let offset = header.get_read_buffer_offset()?; - let buf = &mut self.audio_buffer.get_buffer()[offset..offset + len]; - + let buf = match self.audio_buffer.as_mut() { + None => return Err(Error::new(ErrorType::NoShmError).into()), + Some(audio_buffer) => &mut audio_buffer.get_buffer()[offset..offset + len], + }; CaptureBuffer::new(frame_size, buf, &mut self.controls).map_err(Box::from) } } |