summaryrefslogtreecommitdiff
path: root/cras/client/libcras/src/cras_stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cras/client/libcras/src/cras_stream.rs')
-rw-r--r--cras/client/libcras/src/cras_stream.rs182
1 files changed, 123 insertions, 59 deletions
diff --git a/cras/client/libcras/src/cras_stream.rs b/cras/client/libcras/src/cras_stream.rs
index f6004802..bd9520a1 100644
--- a/cras/client/libcras/src/cras_stream.rs
+++ b/cras/client/libcras/src/cras_stream.rs
@@ -4,13 +4,17 @@
use std::cmp::min;
use std::io;
use std::marker::PhantomData;
+use std::mem;
use std::{error, fmt};
use audio_streams::{
capture::{CaptureBuffer, CaptureBufferStream},
- BoxError, BufferDrop, PlaybackBuffer, PlaybackBufferStream,
+ BufferDrop, PlaybackBuffer, PlaybackBufferStream,
+};
+use cras_sys::gen::{
+ cras_disconnect_stream_message, cras_server_message, snd_pcm_format_t, CRAS_AUDIO_MESSAGE_ID,
+ CRAS_SERVER_MESSAGE_ID, CRAS_STREAM_DIRECTION,
};
-use cras_sys::gen::{snd_pcm_format_t, CRAS_AUDIO_MESSAGE_ID, CRAS_STREAM_DIRECTION};
use sys_util::error;
use crate::audio_socket::{AudioMessage, AudioSocket};
@@ -18,25 +22,38 @@ use crate::cras_server_socket::CrasServerSocket;
use crate::cras_shm::*;
#[derive(Debug)]
-pub enum Error {
+pub enum ErrorType {
IoError(io::Error),
MessageTypeError,
+ NoShmError,
+}
+
+#[derive(Debug)]
+pub struct Error {
+ error_type: ErrorType,
+}
+
+impl Error {
+ fn new(error_type: ErrorType) -> Error {
+ Error { error_type }
+ }
}
impl error::Error for Error {}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- Error::IoError(ref err) => err.fmt(f),
- Error::MessageTypeError => write!(f, "Message type error"),
+ match self.error_type {
+ ErrorType::IoError(ref err) => err.fmt(f),
+ ErrorType::MessageTypeError => write!(f, "Message type error"),
+ ErrorType::NoShmError => write!(f, "Shared memory area is not created"),
}
}
}
impl From<io::Error> for Error {
fn from(io_err: io::Error) -> Error {
- Error::IoError(io_err)
+ Error::new(ErrorType::IoError(io_err))
}
}
@@ -44,23 +61,31 @@ impl From<io::Error> for Error {
/// interacts with server's audio thread through `AudioSocket`.
pub trait CrasStreamData<'a>: Send {
// Creates `CrasStreamData` with only `AudioSocket`.
- fn new(audio_sock: AudioSocket, header: CrasAudioHeader<'a>) -> Self;
- fn header_mut(&mut self) -> &mut CrasAudioHeader<'a>;
+ fn new(audio_sock: AudioSocket) -> Self;
+ fn set_header(&mut self, header: CrasAudioHeader<'a>);
+ fn header_mut(&mut self) -> &mut Option<CrasAudioHeader<'a>>;
fn audio_sock_mut(&mut self) -> &mut AudioSocket;
}
/// `CrasStreamData` implementation for `PlaybackBufferStream`.
pub struct CrasPlaybackData<'a> {
audio_sock: AudioSocket,
- header: CrasAudioHeader<'a>,
+ header: Option<CrasAudioHeader<'a>>,
}
impl<'a> CrasStreamData<'a> for CrasPlaybackData<'a> {
- fn new(audio_sock: AudioSocket, header: CrasAudioHeader<'a>) -> Self {
- Self { audio_sock, header }
+ fn new(audio_sock: AudioSocket) -> Self {
+ Self {
+ audio_sock,
+ header: None,
+ }
+ }
+
+ fn set_header(&mut self, header: CrasAudioHeader<'a>) {
+ self.header = Some(header);
}
- fn header_mut(&mut self) -> &mut CrasAudioHeader<'a> {
+ fn header_mut(&mut self) -> &mut Option<CrasAudioHeader<'a>> {
&mut self.header
}
@@ -72,8 +97,10 @@ impl<'a> CrasStreamData<'a> for CrasPlaybackData<'a> {
impl<'a> BufferDrop for CrasPlaybackData<'a> {
fn trigger(&mut self, nframes: usize) {
let log_err = |e| error!("BufferDrop error: {}", e);
- if let Err(e) = self.header.commit_written_frames(nframes as u32) {
- log_err(e);
+ if let Some(header) = &mut self.header {
+ if let Err(e) = header.commit_written_frames(nframes as u32) {
+ log_err(e);
+ }
}
if let Err(e) = self.audio_sock.data_ready(nframes as u32) {
log_err(e);
@@ -84,15 +111,22 @@ impl<'a> BufferDrop for CrasPlaybackData<'a> {
/// `CrasStreamData` implementation for `CaptureBufferStream`.
pub struct CrasCaptureData<'a> {
audio_sock: AudioSocket,
- header: CrasAudioHeader<'a>,
+ header: Option<CrasAudioHeader<'a>>,
}
impl<'a> CrasStreamData<'a> for CrasCaptureData<'a> {
- fn new(audio_sock: AudioSocket, header: CrasAudioHeader<'a>) -> Self {
- Self { audio_sock, header }
+ fn new(audio_sock: AudioSocket) -> Self {
+ Self {
+ audio_sock,
+ header: None,
+ }
}
- fn header_mut(&mut self) -> &mut CrasAudioHeader<'a> {
+ fn set_header(&mut self, header: CrasAudioHeader<'a>) {
+ self.header = Some(header);
+ }
+
+ fn header_mut(&mut self) -> &mut Option<CrasAudioHeader<'a>> {
&mut self.header
}
@@ -104,8 +138,10 @@ impl<'a> CrasStreamData<'a> for CrasCaptureData<'a> {
impl<'a> BufferDrop for CrasCaptureData<'a> {
fn trigger(&mut self, nframes: usize) {
let log_err = |e| error!("BufferDrop error: {}", e);
- if let Err(e) = self.header.commit_read_frames(nframes as u32) {
- log_err(e);
+ if let Some(header) = &mut self.header {
+ if let Err(e) = header.commit_read_frames(nframes as u32) {
+ log_err(e);
+ }
}
if let Err(e) = self.audio_sock.capture_ready(nframes as u32) {
log_err(e);
@@ -119,14 +155,14 @@ pub struct CrasStream<'a, T: CrasStreamData<'a> + BufferDrop> {
server_socket: CrasServerSocket,
block_size: u32,
direction: CRAS_STREAM_DIRECTION,
- rate: u32,
+ rate: usize,
num_channels: usize,
format: snd_pcm_format_t,
/// A structure for stream to interact with server audio thread.
controls: T,
/// The `PhantomData` is used by `controls: T`
phantom: PhantomData<CrasAudioHeader<'a>>,
- audio_buffer: CrasAudioBuffer,
+ audio_buffer: Option<CrasAudioBuffer>,
}
impl<'a, T: CrasStreamData<'a> + BufferDrop> CrasStream<'a, T> {
@@ -134,22 +170,17 @@ impl<'a, T: CrasStreamData<'a> + BufferDrop> CrasStream<'a, T> {
///
/// # Returns
/// `CrasStream` - CRAS client stream.
- #[allow(clippy::too_many_arguments)]
- pub fn try_new(
+ pub fn new(
stream_id: u32,
server_socket: CrasServerSocket,
block_size: u32,
direction: CRAS_STREAM_DIRECTION,
- rate: u32,
+ rate: usize,
num_channels: usize,
format: snd_pcm_format_t,
audio_sock: AudioSocket,
- header_fd: CrasAudioShmHeaderFd,
- samples_fd: CrasShmFd,
- ) -> Result<Self, Error> {
- let (header, audio_buffer) = create_header_and_buffers(header_fd, samples_fd)?;
-
- Ok(Self {
+ ) -> Self {
+ Self {
stream_id,
server_socket,
block_size,
@@ -157,29 +188,41 @@ impl<'a, T: CrasStreamData<'a> + BufferDrop> CrasStream<'a, T> {
rate,
num_channels,
format,
- controls: T::new(audio_sock, header),
+ controls: T::new(audio_sock),
phantom: PhantomData,
- audio_buffer,
- })
+ audio_buffer: None,
+ }
+ }
+
+ /// Receives shared memory fd and initialize stream audio shared memory area
+ pub fn init_shm(
+ &mut self,
+ header_fd: CrasAudioShmHeaderFd,
+ samples_fd: CrasShmFd,
+ ) -> Result<(), Error> {
+ let (header, buffer) = create_header_and_buffers(header_fd, samples_fd)?;
+ self.controls.set_header(header);
+ self.audio_buffer = Some(buffer);
+ Ok(())
}
fn wait_request_data(&mut self) -> Result<(), Error> {
match self.controls.audio_sock_mut().read_audio_message()? {
- AudioMessage::Success {
- id: CRAS_AUDIO_MESSAGE_ID::AUDIO_MESSAGE_REQUEST_DATA,
- ..
- } => Ok(()),
- _ => Err(Error::MessageTypeError),
+ AudioMessage::Success { id, .. } => match id {
+ CRAS_AUDIO_MESSAGE_ID::AUDIO_MESSAGE_REQUEST_DATA => Ok(()),
+ _ => Err(Error::new(ErrorType::MessageTypeError)),
+ },
+ _ => Err(Error::new(ErrorType::MessageTypeError)),
}
}
fn wait_data_ready(&mut self) -> Result<u32, Error> {
match self.controls.audio_sock_mut().read_audio_message()? {
- AudioMessage::Success {
- id: CRAS_AUDIO_MESSAGE_ID::AUDIO_MESSAGE_DATA_READY,
- frames,
- } => Ok(frames),
- _ => Err(Error::MessageTypeError),
+ AudioMessage::Success { id, frames } => match id {
+ CRAS_AUDIO_MESSAGE_ID::AUDIO_MESSAGE_DATA_READY => Ok(frames),
+ _ => Err(Error::new(ErrorType::MessageTypeError)),
+ },
+ _ => Err(Error::new(ErrorType::MessageTypeError)),
}
}
}
@@ -189,36 +232,57 @@ impl<'a, T: CrasStreamData<'a> + BufferDrop> Drop for CrasStream<'a, T> {
/// the return message.
/// Logs an error message to stderr if the method fails.
fn drop(&mut self) {
- if let Err(e) = self.server_socket.disconnect_stream(self.stream_id) {
+ // Send stream disconnect message
+ let msg_header = cras_server_message {
+ length: mem::size_of::<cras_disconnect_stream_message>() as u32,
+ id: CRAS_SERVER_MESSAGE_ID::CRAS_SERVER_DISCONNECT_STREAM,
+ };
+ let server_cmsg = cras_disconnect_stream_message {
+ header: msg_header,
+ stream_id: self.stream_id,
+ };
+ if let Err(e) = self
+ .server_socket
+ .send_server_message_with_fds(&server_cmsg, &[])
+ {
error!("CrasStream::Drop error: {}", e);
}
}
}
impl<'a, T: CrasStreamData<'a> + BufferDrop> PlaybackBufferStream for CrasStream<'a, T> {
- fn next_playback_buffer(&mut self) -> Result<PlaybackBuffer, BoxError> {
+ fn next_playback_buffer(&mut self) -> Result<PlaybackBuffer, Box<dyn error::Error>> {
// Wait for request audio message
self.wait_request_data()?;
- let header = self.controls.header_mut();
- let frame_size = header.get_frame_size();
- let (offset, len) = header.get_write_offset_and_len()?;
- let buf = &mut self.audio_buffer.get_buffer()[offset..offset + len];
-
+ let (frame_size, (offset, len)) = match self.controls.header_mut() {
+ None => return Err(Error::new(ErrorType::NoShmError).into()),
+ Some(header) => (header.get_frame_size(), header.get_write_offset_and_len()?),
+ };
+ let buf = match self.audio_buffer.as_mut() {
+ None => return Err(Error::new(ErrorType::NoShmError).into()),
+ Some(audio_buffer) => &mut audio_buffer.get_buffer()[offset..offset + len],
+ };
PlaybackBuffer::new(frame_size, buf, &mut self.controls).map_err(Box::from)
}
}
impl<'a, T: CrasStreamData<'a> + BufferDrop> CaptureBufferStream for CrasStream<'a, T> {
- fn next_capture_buffer(&mut self) -> Result<CaptureBuffer, BoxError> {
+ fn next_capture_buffer(&mut self) -> Result<CaptureBuffer, Box<dyn error::Error>> {
// Wait for data ready message
let frames = self.wait_data_ready()?;
- let header = self.controls.header_mut();
- let frame_size = header.get_frame_size();
- let shm_frames = header.get_readable_frames()?;
+ let (frame_size, shm_frames, offset) = match self.controls.header_mut() {
+ None => return Err(Error::new(ErrorType::NoShmError).into()),
+ Some(header) => (
+ header.get_frame_size(),
+ header.get_readable_frames()?,
+ header.get_read_buffer_offset()?,
+ ),
+ };
let len = min(shm_frames, frames as usize) * frame_size;
- let offset = header.get_read_buffer_offset()?;
- let buf = &mut self.audio_buffer.get_buffer()[offset..offset + len];
-
+ let buf = match self.audio_buffer.as_mut() {
+ None => return Err(Error::new(ErrorType::NoShmError).into()),
+ Some(audio_buffer) => &mut audio_buffer.get_buffer()[offset..offset + len],
+ };
CaptureBuffer::new(frame_size, buf, &mut self.controls).map_err(Box::from)
}
}