diff options
Diffstat (limited to 'audio_streams/src/shm_streams.rs')
-rw-r--r-- | audio_streams/src/shm_streams.rs | 568 |
1 files changed, 0 insertions, 568 deletions
diff --git a/audio_streams/src/shm_streams.rs b/audio_streams/src/shm_streams.rs deleted file mode 100644 index b11626fd..00000000 --- a/audio_streams/src/shm_streams.rs +++ /dev/null @@ -1,568 +0,0 @@ -// Copyright 2019 The Chromium OS Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. -use std::error; -use std::fmt; -use std::os::unix::io::RawFd; -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use sync::{Condvar, Mutex}; -use sys_util::SharedMemory; - -use crate::{BoxError, SampleFormat, StreamDirection, StreamEffect}; - -type GenericResult<T> = std::result::Result<T, BoxError>; - -/// `BufferSet` is used as a callback mechanism for `ServerRequest` objects. -/// It is meant to be implemented by the audio stream, allowing arbitrary code -/// to be run after a buffer offset and length is set. -pub trait BufferSet { - /// Called when the client sets a buffer offset and length. - /// - /// `offset` is the offset within shared memory of the buffer and `frames` - /// indicates the number of audio frames that can be read from or written to - /// the buffer. - fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>; - - /// Called when the client ignores a request from the server. - fn ignore(&mut self) -> GenericResult<()>; -} - -#[derive(Debug)] -pub enum Error { - TooManyFrames(usize, usize), -} - -impl error::Error for Error {} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Error::TooManyFrames(provided, requested) => write!( - f, - "Provided number of frames {} exceeds requested number of frames {}", - provided, requested - ), - } - } -} - -/// `ServerRequest` represents an active request from the server for the client -/// to provide a buffer in shared memory to playback from or capture to. -pub struct ServerRequest<'a> { - requested_frames: usize, - buffer_set: &'a mut dyn BufferSet, -} - -impl<'a> ServerRequest<'a> { - /// Create a new ServerRequest object - /// - /// Create a ServerRequest object representing a request from the server - /// for a buffer `requested_frames` in size. - /// - /// When the client responds to this request by calling - /// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames), - /// BufferSet::callback will be called on `buffer_set`. - /// - /// # Arguments - /// * `requested_frames` - The requested buffer size in frames. - /// * `buffer_set` - The object implementing the callback for when a buffer is provided. - pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self { - Self { - requested_frames, - buffer_set, - } - } - - /// Get the number of frames of audio data requested by the server. - /// - /// The returned value should never be greater than the `buffer_size` - /// given in [`new_stream`](ShmStreamSource::new_stream). - pub fn requested_frames(&self) -> usize { - self.requested_frames - } - - /// Sets the buffer offset and length for the requested buffer. - /// - /// Sets the buffer offset and length of the buffer that fulfills this - /// server request to `offset` and `length`, respectively. This means that - /// `length` bytes of audio samples may be read from/written to that - /// location in `client_shm` for a playback/capture stream, respectively. - /// This function may only be called once for a `ServerRequest`, at which - /// point the ServerRequest is dropped and no further calls are possible. - /// - /// # Arguments - /// - /// * `offset` - The value to use as the new buffer offset for the next buffer. - /// * `frames` - The length of the next buffer in frames. - /// - /// # Errors - /// - /// * If `frames` is greater than `requested_frames`. - pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> { - if frames > self.requested_frames { - return Err(Box::new(Error::TooManyFrames( - frames, - self.requested_frames, - ))); - } - - self.buffer_set.callback(offset, frames) - } - - /// Ignore this request - /// - /// If the client does not intend to respond to this ServerRequest with a - /// buffer, they should call this function. The stream will be notified that - /// the request has been ignored and will handle it properly. - pub fn ignore_request(self) -> GenericResult<()> { - self.buffer_set.ignore() - } -} - -/// `ShmStream` allows a client to interact with an active CRAS stream. -pub trait ShmStream: Send { - /// Get the size of a frame of audio data for this stream. - fn frame_size(&self) -> usize; - - /// Get the number of channels of audio data for this stream. - fn num_channels(&self) -> usize; - - /// Get the frame rate of audio data for this stream. - fn frame_rate(&self) -> u32; - - /// Waits until the next server message indicating action is required. - /// - /// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning - /// that we must set the buffer offset to the next location where playback - /// data can be found. - /// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning - /// that we must set the buffer offset to the next location where captured - /// data can be written to. - /// Will return early if `timeout` elapses before a message is received. - /// - /// # Arguments - /// - /// * `timeout` - The amount of time to wait until a message is received. - /// - /// # Return value - /// - /// Returns `Some(request)` where `request` is an object that implements the - /// [`ServerRequest`](ServerRequest) trait and which can be used to get the - /// number of bytes requested for playback streams or that have already been - /// written to shm for capture streams. - /// - /// If the timeout occurs before a message is received, returns `None`. - /// - /// # Errors - /// - /// * If an invalid message type is received for the stream. - fn wait_for_next_action_with_timeout( - &mut self, - timeout: Duration, - ) -> GenericResult<Option<ServerRequest>>; -} - -/// `ShmStreamSource` creates streams for playback or capture of audio. -pub trait ShmStreamSource: Send { - /// Creates a new [`ShmStream`](ShmStream) - /// - /// Creates a new `ShmStream` object, which allows: - /// * Waiting until the server has communicated that data is ready or - /// requested that we make more data available. - /// * Setting the location and length of buffers for reading/writing audio data. - /// - /// # Arguments - /// - /// * `direction` - The direction of the stream, either `Playback` or `Capture`. - /// * `num_channels` - The number of audio channels for the stream. - /// * `format` - The audio format to use for audio samples. - /// * `frame_rate` - The stream's frame rate in Hz. - /// * `buffer_size` - The maximum size of an audio buffer. This will be the - /// size used for transfers of audio data between client - /// and server. - /// * `effects` - Audio effects to use for the stream, such as echo-cancellation. - /// * `client_shm` - The shared memory area that will contain samples. - /// * `buffer_offsets` - The two initial values to use as buffer offsets - /// for streams. This way, the server will not write - /// audio data to an arbitrary offset in `client_shm` - /// if the client fails to update offsets in time. - /// - /// # Errors - /// - /// * If sending the connect stream message to the server fails. - #[allow(clippy::too_many_arguments)] - fn new_stream( - &mut self, - direction: StreamDirection, - num_channels: usize, - format: SampleFormat, - frame_rate: u32, - buffer_size: usize, - effects: &[StreamEffect], - client_shm: &SharedMemory, - buffer_offsets: [u64; 2], - ) -> GenericResult<Box<dyn ShmStream>>; - - /// Get a list of file descriptors used by the implementation. - /// - /// Returns any open file descriptors needed by the implementation. - /// This list helps users of the ShmStreamSource enter Linux jails without - /// closing needed file descriptors. - fn keep_fds(&self) -> Vec<RawFd> { - Vec::new() - } -} - -/// Class that implements ShmStream trait but does nothing with the samples -pub struct NullShmStream { - num_channels: usize, - frame_rate: u32, - buffer_size: usize, - frame_size: usize, - interval: Duration, - next_frame: Duration, - start_time: Instant, -} - -impl NullShmStream { - /// Attempt to create a new NullShmStream with the given number of channels, - /// format, frame_rate, and buffer_size. - pub fn new( - buffer_size: usize, - num_channels: usize, - format: SampleFormat, - frame_rate: u32, - ) -> Self { - let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); - Self { - num_channels, - frame_rate, - buffer_size, - frame_size: format.sample_bytes() * num_channels, - interval, - next_frame: interval, - start_time: Instant::now(), - } - } -} - -impl BufferSet for NullShmStream { - fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { - Ok(()) - } - - fn ignore(&mut self) -> GenericResult<()> { - Ok(()) - } -} - -impl ShmStream for NullShmStream { - fn frame_size(&self) -> usize { - self.frame_size - } - - fn num_channels(&self) -> usize { - self.num_channels - } - - fn frame_rate(&self) -> u32 { - self.frame_rate - } - - fn wait_for_next_action_with_timeout( - &mut self, - timeout: Duration, - ) -> GenericResult<Option<ServerRequest>> { - let elapsed = self.start_time.elapsed(); - if elapsed < self.next_frame { - if timeout < self.next_frame - elapsed { - std::thread::sleep(timeout); - return Ok(None); - } else { - std::thread::sleep(self.next_frame - elapsed); - } - } - self.next_frame += self.interval; - Ok(Some(ServerRequest::new(self.buffer_size, self))) - } -} - -/// Source of `NullShmStream` objects. -#[derive(Default)] -pub struct NullShmStreamSource; - -impl NullShmStreamSource { - pub fn new() -> Self { - Self::default() - } -} - -impl ShmStreamSource for NullShmStreamSource { - fn new_stream( - &mut self, - _direction: StreamDirection, - num_channels: usize, - format: SampleFormat, - frame_rate: u32, - buffer_size: usize, - _effects: &[StreamEffect], - _client_shm: &SharedMemory, - _buffer_offsets: [u64; 2], - ) -> GenericResult<Box<dyn ShmStream>> { - let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate); - Ok(Box::new(new_stream)) - } -} - -#[derive(Clone)] -pub struct MockShmStream { - num_channels: usize, - frame_rate: u32, - request_size: usize, - frame_size: usize, - request_notifier: Arc<(Mutex<bool>, Condvar)>, -} - -impl MockShmStream { - /// Attempt to create a new MockShmStream with the given number of - /// channels, frame_rate, format, and buffer_size. - pub fn new( - num_channels: usize, - frame_rate: u32, - format: SampleFormat, - buffer_size: usize, - ) -> Self { - Self { - num_channels, - frame_rate, - request_size: buffer_size, - frame_size: format.sample_bytes() * num_channels, - request_notifier: Arc::new((Mutex::new(false), Condvar::new())), - } - } - - /// Call to request data from the stream, causing it to return from - /// `wait_for_next_action_with_timeout`. Will block until - /// `set_buffer_offset_and_frames` is called on the ServerRequest returned - /// from `wait_for_next_action_with_timeout`, or until `timeout` elapses. - /// Returns true if a response was successfully received. - pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool { - let &(ref lock, ref cvar) = &*self.request_notifier; - let mut requested = lock.lock(); - *requested = true; - cvar.notify_one(); - let start_time = Instant::now(); - while *requested { - requested = cvar.wait_timeout(requested, timeout).0; - if start_time.elapsed() > timeout { - // We failed to get a callback in time, mark this as false. - *requested = false; - return false; - } - } - - true - } - - fn notify_request(&mut self) { - let &(ref lock, ref cvar) = &*self.request_notifier; - let mut requested = lock.lock(); - *requested = false; - cvar.notify_one(); - } -} - -impl BufferSet for MockShmStream { - fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { - self.notify_request(); - Ok(()) - } - - fn ignore(&mut self) -> GenericResult<()> { - self.notify_request(); - Ok(()) - } -} - -impl ShmStream for MockShmStream { - fn frame_size(&self) -> usize { - self.frame_size - } - - fn num_channels(&self) -> usize { - self.num_channels - } - - fn frame_rate(&self) -> u32 { - self.frame_rate - } - - fn wait_for_next_action_with_timeout( - &mut self, - timeout: Duration, - ) -> GenericResult<Option<ServerRequest>> { - { - let start_time = Instant::now(); - let &(ref lock, ref cvar) = &*self.request_notifier; - let mut requested = lock.lock(); - while !*requested { - requested = cvar.wait_timeout(requested, timeout).0; - if start_time.elapsed() > timeout { - return Ok(None); - } - } - } - - Ok(Some(ServerRequest::new(self.request_size, self))) - } -} - -/// Source of `MockShmStream` objects. -#[derive(Clone, Default)] -pub struct MockShmStreamSource { - last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>, -} - -impl MockShmStreamSource { - pub fn new() -> Self { - Default::default() - } - - /// Get the last stream that has been created from this source. If no stream - /// has been created, block until one has. - pub fn get_last_stream(&self) -> MockShmStream { - let &(ref last_stream, ref cvar) = &*self.last_stream; - let mut stream = last_stream.lock(); - loop { - match &*stream { - None => stream = cvar.wait(stream), - Some(ref s) => return s.clone(), - }; - } - } -} - -impl ShmStreamSource for MockShmStreamSource { - fn new_stream( - &mut self, - _direction: StreamDirection, - num_channels: usize, - format: SampleFormat, - frame_rate: u32, - buffer_size: usize, - _effects: &[StreamEffect], - _client_shm: &SharedMemory, - _buffer_offsets: [u64; 2], - ) -> GenericResult<Box<dyn ShmStream>> { - let &(ref last_stream, ref cvar) = &*self.last_stream; - let mut stream = last_stream.lock(); - - let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size); - *stream = Some(new_stream.clone()); - cvar.notify_one(); - Ok(Box::new(new_stream)) - } -} - -#[cfg(test)] -pub mod tests { - use super::*; - - #[test] - fn mock_trigger_callback() { - let stream_source = MockShmStreamSource::new(); - let mut thread_stream_source = stream_source.clone(); - - let buffer_size = 480; - let num_channels = 2; - let format = SampleFormat::S24LE; - let shm = SharedMemory::anon().expect("Failed to create shm"); - - let handle = std::thread::spawn(move || { - let mut stream = thread_stream_source - .new_stream( - StreamDirection::Playback, - num_channels, - format, - 44100, - buffer_size, - &[], - &shm, - [400, 8000], - ) - .expect("Failed to create stream"); - - let request = stream - .wait_for_next_action_with_timeout(Duration::from_secs(5)) - .expect("Failed to wait for next action"); - match request { - Some(r) => { - let requested = r.requested_frames(); - r.set_buffer_offset_and_frames(872, requested) - .expect("Failed to set buffer offset and frames"); - requested - } - None => 0, - } - }); - - let mut stream = stream_source.get_last_stream(); - assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1))); - - let requested_frames = handle.join().expect("Failed to join thread"); - assert_eq!(requested_frames, buffer_size); - } - - #[test] - fn null_consumption_rate() { - let frame_rate = 44100; - let buffer_size = 480; - let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); - - let shm = SharedMemory::anon().expect("Failed to create shm"); - - let start = Instant::now(); - - let mut stream_source = NullShmStreamSource::new(); - let mut stream = stream_source - .new_stream( - StreamDirection::Playback, - 2, - SampleFormat::S24LE, - frame_rate, - buffer_size, - &[], - &shm, - [400, 8000], - ) - .expect("Failed to create stream"); - - let timeout = Duration::from_secs(5); - let request = stream - .wait_for_next_action_with_timeout(timeout) - .expect("Failed to wait for first request") - .expect("First request should not have timed out"); - request - .set_buffer_offset_and_frames(276, 480) - .expect("Failed to set buffer offset and length"); - - // The second call should block until the first buffer is consumed. - let _request = stream - .wait_for_next_action_with_timeout(timeout) - .expect("Failed to wait for second request"); - let elapsed = start.elapsed(); - assert!( - elapsed > interval, - "wait_for_next_action_with_timeout didn't block long enough: {:?}", - elapsed - ); - - assert!( - elapsed < timeout, - "wait_for_next_action_with_timeout blocked for too long: {:?}", - elapsed - ); - } -} |