diff options
Diffstat (limited to 'src/raw.rs')
-rw-r--r-- | src/raw.rs | 126 |
1 files changed, 93 insertions, 33 deletions
@@ -1,21 +1,34 @@ use alloc::alloc::Layout as StdLayout; use core::cell::UnsafeCell; use core::future::Future; +use core::marker::PhantomData; use core::mem::{self, ManuallyDrop}; use core::pin::Pin; use core::ptr::NonNull; -use core::sync::atomic::{AtomicUsize, Ordering}; use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +#[cfg(not(feature = "portable-atomic"))] +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering; +#[cfg(feature = "portable-atomic")] +use portable_atomic::AtomicUsize; + use crate::header::Header; +use crate::runnable::{Schedule, ScheduleInfo}; use crate::state::*; use crate::utils::{abort, abort_on_panic, max, Layout}; use crate::Runnable; +#[cfg(feature = "std")] +pub(crate) type Panic = alloc::boxed::Box<dyn core::any::Any + Send + 'static>; + +#[cfg(not(feature = "std"))] +pub(crate) type Panic = core::convert::Infallible; + /// The vtable for a task. pub(crate) struct TaskVTable { /// Schedules the task. - pub(crate) schedule: unsafe fn(*const ()), + pub(crate) schedule: unsafe fn(*const (), ScheduleInfo), /// Drops the future inside the task. pub(crate) drop_future: unsafe fn(*const ()), @@ -64,9 +77,9 @@ pub(crate) struct TaskLayout { } /// Raw pointers to the fields inside a task. -pub(crate) struct RawTask<F, T, S> { +pub(crate) struct RawTask<F, T, S, M> { /// The task header. - pub(crate) header: *const Header, + pub(crate) header: *const Header<M>, /// The schedule function. pub(crate) schedule: *const S, @@ -75,28 +88,28 @@ pub(crate) struct RawTask<F, T, S> { pub(crate) future: *mut F, /// The output of the future. - pub(crate) output: *mut T, + pub(crate) output: *mut Result<T, Panic>, } -impl<F, T, S> Copy for RawTask<F, T, S> {} +impl<F, T, S, M> Copy for RawTask<F, T, S, M> {} -impl<F, T, S> Clone for RawTask<F, T, S> { +impl<F, T, S, M> Clone for RawTask<F, T, S, M> { fn clone(&self) -> Self { *self } } -impl<F, T, S> RawTask<F, T, S> { +impl<F, T, S, M> RawTask<F, T, S, M> { const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout(); /// Computes the memory layout for a task. #[inline] const fn eval_task_layout() -> Option<TaskLayout> { // Compute the layouts for `Header`, `S`, `F`, and `T`. - let layout_header = Layout::new::<Header>(); + let layout_header = Layout::new::<Header<M>>(); let layout_s = Layout::new::<S>(); let layout_f = Layout::new::<F>(); - let layout_r = Layout::new::<T>(); + let layout_r = Layout::new::<Result<T, Panic>>(); // Compute the layout for `union { F, T }`. let size_union = max(layout_f.size(), layout_r.size()); @@ -119,10 +132,10 @@ impl<F, T, S> RawTask<F, T, S> { } } -impl<F, T, S> RawTask<F, T, S> +impl<F, T, S, M> RawTask<F, T, S, M> where F: Future<Output = T>, - S: Fn(Runnable), + S: Schedule<M>, { const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( Self::clone_waker, @@ -134,7 +147,15 @@ where /// Allocates a task with the given `future` and `schedule` function. /// /// It is assumed that initially only the `Runnable` and the `Task` exist. - pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { + pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>( + future: Gen, + schedule: S, + builder: crate::Builder<M>, + ) -> NonNull<()> + where + F: 'a, + M: 'a, + { // Compute the layout of the task for allocation. Abort if the computation fails. // // n.b. notgull: task_layout now automatically aborts instead of panicking @@ -149,8 +170,14 @@ where let raw = Self::from_ptr(ptr.as_ptr()); + let crate::Builder { + metadata, + #[cfg(feature = "std")] + propagate_panic, + } = builder; + // Write the header as the first field of the task. - (raw.header as *mut Header).write(Header { + (raw.header as *mut Header<M>).write(Header { state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), awaiter: UnsafeCell::new(None), vtable: &TaskVTable { @@ -163,11 +190,17 @@ where clone_waker: Self::clone_waker, layout_info: &Self::TASK_LAYOUT, }, + metadata, + #[cfg(feature = "std")] + propagate_panic, }); // Write the schedule function as the third field of the task. (raw.schedule as *mut S).write(schedule); + // Generate the future, now that the metadata has been pinned in place. + let future = abort_on_panic(|| future(&(*raw.header).metadata)); + // Write the future as the fourth field of the task. raw.future.write(future); @@ -183,10 +216,10 @@ where unsafe { Self { - header: p as *const Header, + header: p as *const Header<M>, schedule: p.add(task_layout.offset_s) as *const S, future: p.add(task_layout.offset_f) as *mut F, - output: p.add(task_layout.offset_r) as *mut T, + output: p.add(task_layout.offset_r) as *mut Result<T, Panic>, } } } @@ -252,7 +285,7 @@ where // time to schedule it. if state & RUNNING == 0 { // Schedule the task. - Self::schedule(ptr); + Self::schedule(ptr, ScheduleInfo::new(false)); } else { // Drop the waker. Self::drop_waker(ptr); @@ -310,7 +343,7 @@ where // If the task is not running, now is the time to schedule. if state & RUNNING == 0 { // If the reference count overflowed, abort. - if state > isize::max_value() as usize { + if state > isize::MAX as usize { abort(); } @@ -319,8 +352,9 @@ where // still alive. let task = Runnable { ptr: NonNull::new_unchecked(ptr as *mut ()), + _marker: PhantomData, }; - (*raw.schedule)(task); + (*raw.schedule).schedule(task, ScheduleInfo::new(false)); } break; @@ -340,7 +374,7 @@ where let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); // If the reference count overflowed, abort. - if state > isize::max_value() as usize { + if state > isize::MAX as usize { abort(); } @@ -368,7 +402,7 @@ where (*raw.header) .state .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); - Self::schedule(ptr); + Self::schedule(ptr, ScheduleInfo::new(false)); } else { // Otherwise, destroy the task right away. Self::destroy(ptr); @@ -398,7 +432,7 @@ where /// /// This function doesn't modify the state of the task. It only passes the task reference to /// its schedule function. - unsafe fn schedule(ptr: *const ()) { + unsafe fn schedule(ptr: *const (), info: ScheduleInfo) { let raw = Self::from_ptr(ptr); // If the schedule function has captured variables, create a temporary waker that prevents @@ -410,8 +444,9 @@ where let task = Runnable { ptr: NonNull::new_unchecked(ptr as *mut ()), + _marker: PhantomData, }; - (*raw.schedule)(task); + (*raw.schedule).schedule(task, info); } /// Drops the future inside a task. @@ -442,6 +477,9 @@ where // We need a safeguard against panics because destructors can panic. abort_on_panic(|| { + // Drop the header along with the metadata. + (raw.header as *mut Header<M>).drop_in_place(); + // Drop the schedule function. (raw.schedule as *mut S).drop_in_place(); }); @@ -507,8 +545,30 @@ where // Poll the inner future, but surround it with a guard that closes the task in case polling // panics. + // If available, we should also try to catch the panic so that it is propagated correctly. let guard = Guard(raw); - let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx); + + // Panic propagation is not available for no_std. + #[cfg(not(feature = "std"))] + let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok); + + #[cfg(feature = "std")] + let poll = { + // Check if we should propagate panics. + if (*raw.header).propagate_panic { + // Use catch_unwind to catch the panic. + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx) + })) { + Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)), + Ok(Poll::Pending) => Poll::Pending, + Err(e) => Poll::Ready(Err(e)), + } + } else { + <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok) + } + }; + mem::forget(guard); match poll { @@ -608,7 +668,7 @@ where } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. - Self::schedule(ptr); + Self::schedule(ptr, ScheduleInfo::new(true)); return true; } else { // Drop the task reference. @@ -625,15 +685,15 @@ where return false; /// A guard that closes the task if polling its future panics. - struct Guard<F, T, S>(RawTask<F, T, S>) + struct Guard<F, T, S, M>(RawTask<F, T, S, M>) where F: Future<Output = T>, - S: Fn(Runnable); + S: Schedule<M>; - impl<F, T, S> Drop for Guard<F, T, S> + impl<F, T, S, M> Drop for Guard<F, T, S, M> where F: Future<Output = T>, - S: Fn(Runnable), + S: Schedule<M>, { fn drop(&mut self) { let raw = self.0; @@ -648,7 +708,7 @@ where if state & CLOSED != 0 { // The thread that closed the task didn't drop the future because it // was running so now it's our responsibility to do so. - RawTask::<F, T, S>::drop_future(ptr); + RawTask::<F, T, S, M>::drop_future(ptr); // Mark the task as not running and not scheduled. (*raw.header) @@ -662,7 +722,7 @@ where } // Drop the task reference. - RawTask::<F, T, S>::drop_ref(ptr); + RawTask::<F, T, S, M>::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { @@ -680,7 +740,7 @@ where ) { Ok(state) => { // Drop the future because the task is now closed. - RawTask::<F, T, S>::drop_future(ptr); + RawTask::<F, T, S, M>::drop_future(ptr); // Take the awaiter out. let mut awaiter = None; @@ -689,7 +749,7 @@ where } // Drop the task reference. - RawTask::<F, T, S>::drop_ref(ptr); + RawTask::<F, T, S, M>::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { |