aboutsummaryrefslogtreecommitdiff
path: root/src/raw.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/raw.rs')
-rw-r--r--src/raw.rs126
1 files changed, 93 insertions, 33 deletions
diff --git a/src/raw.rs b/src/raw.rs
index bb031da..50109ab 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -1,21 +1,34 @@
use alloc::alloc::Layout as StdLayout;
use core::cell::UnsafeCell;
use core::future::Future;
+use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
-use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+#[cfg(not(feature = "portable-atomic"))]
+use core::sync::atomic::AtomicUsize;
+use core::sync::atomic::Ordering;
+#[cfg(feature = "portable-atomic")]
+use portable_atomic::AtomicUsize;
+
use crate::header::Header;
+use crate::runnable::{Schedule, ScheduleInfo};
use crate::state::*;
use crate::utils::{abort, abort_on_panic, max, Layout};
use crate::Runnable;
+#[cfg(feature = "std")]
+pub(crate) type Panic = alloc::boxed::Box<dyn core::any::Any + Send + 'static>;
+
+#[cfg(not(feature = "std"))]
+pub(crate) type Panic = core::convert::Infallible;
+
/// The vtable for a task.
pub(crate) struct TaskVTable {
/// Schedules the task.
- pub(crate) schedule: unsafe fn(*const ()),
+ pub(crate) schedule: unsafe fn(*const (), ScheduleInfo),
/// Drops the future inside the task.
pub(crate) drop_future: unsafe fn(*const ()),
@@ -64,9 +77,9 @@ pub(crate) struct TaskLayout {
}
/// Raw pointers to the fields inside a task.
-pub(crate) struct RawTask<F, T, S> {
+pub(crate) struct RawTask<F, T, S, M> {
/// The task header.
- pub(crate) header: *const Header,
+ pub(crate) header: *const Header<M>,
/// The schedule function.
pub(crate) schedule: *const S,
@@ -75,28 +88,28 @@ pub(crate) struct RawTask<F, T, S> {
pub(crate) future: *mut F,
/// The output of the future.
- pub(crate) output: *mut T,
+ pub(crate) output: *mut Result<T, Panic>,
}
-impl<F, T, S> Copy for RawTask<F, T, S> {}
+impl<F, T, S, M> Copy for RawTask<F, T, S, M> {}
-impl<F, T, S> Clone for RawTask<F, T, S> {
+impl<F, T, S, M> Clone for RawTask<F, T, S, M> {
fn clone(&self) -> Self {
*self
}
}
-impl<F, T, S> RawTask<F, T, S> {
+impl<F, T, S, M> RawTask<F, T, S, M> {
const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout();
/// Computes the memory layout for a task.
#[inline]
const fn eval_task_layout() -> Option<TaskLayout> {
// Compute the layouts for `Header`, `S`, `F`, and `T`.
- let layout_header = Layout::new::<Header>();
+ let layout_header = Layout::new::<Header<M>>();
let layout_s = Layout::new::<S>();
let layout_f = Layout::new::<F>();
- let layout_r = Layout::new::<T>();
+ let layout_r = Layout::new::<Result<T, Panic>>();
// Compute the layout for `union { F, T }`.
let size_union = max(layout_f.size(), layout_r.size());
@@ -119,10 +132,10 @@ impl<F, T, S> RawTask<F, T, S> {
}
}
-impl<F, T, S> RawTask<F, T, S>
+impl<F, T, S, M> RawTask<F, T, S, M>
where
F: Future<Output = T>,
- S: Fn(Runnable),
+ S: Schedule<M>,
{
const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
Self::clone_waker,
@@ -134,7 +147,15 @@ where
/// Allocates a task with the given `future` and `schedule` function.
///
/// It is assumed that initially only the `Runnable` and the `Task` exist.
- pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
+ pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>(
+ future: Gen,
+ schedule: S,
+ builder: crate::Builder<M>,
+ ) -> NonNull<()>
+ where
+ F: 'a,
+ M: 'a,
+ {
// Compute the layout of the task for allocation. Abort if the computation fails.
//
// n.b. notgull: task_layout now automatically aborts instead of panicking
@@ -149,8 +170,14 @@ where
let raw = Self::from_ptr(ptr.as_ptr());
+ let crate::Builder {
+ metadata,
+ #[cfg(feature = "std")]
+ propagate_panic,
+ } = builder;
+
// Write the header as the first field of the task.
- (raw.header as *mut Header).write(Header {
+ (raw.header as *mut Header<M>).write(Header {
state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
awaiter: UnsafeCell::new(None),
vtable: &TaskVTable {
@@ -163,11 +190,17 @@ where
clone_waker: Self::clone_waker,
layout_info: &Self::TASK_LAYOUT,
},
+ metadata,
+ #[cfg(feature = "std")]
+ propagate_panic,
});
// Write the schedule function as the third field of the task.
(raw.schedule as *mut S).write(schedule);
+ // Generate the future, now that the metadata has been pinned in place.
+ let future = abort_on_panic(|| future(&(*raw.header).metadata));
+
// Write the future as the fourth field of the task.
raw.future.write(future);
@@ -183,10 +216,10 @@ where
unsafe {
Self {
- header: p as *const Header,
+ header: p as *const Header<M>,
schedule: p.add(task_layout.offset_s) as *const S,
future: p.add(task_layout.offset_f) as *mut F,
- output: p.add(task_layout.offset_r) as *mut T,
+ output: p.add(task_layout.offset_r) as *mut Result<T, Panic>,
}
}
}
@@ -252,7 +285,7 @@ where
// time to schedule it.
if state & RUNNING == 0 {
// Schedule the task.
- Self::schedule(ptr);
+ Self::schedule(ptr, ScheduleInfo::new(false));
} else {
// Drop the waker.
Self::drop_waker(ptr);
@@ -310,7 +343,7 @@ where
// If the task is not running, now is the time to schedule.
if state & RUNNING == 0 {
// If the reference count overflowed, abort.
- if state > isize::max_value() as usize {
+ if state > isize::MAX as usize {
abort();
}
@@ -319,8 +352,9 @@ where
// still alive.
let task = Runnable {
ptr: NonNull::new_unchecked(ptr as *mut ()),
+ _marker: PhantomData,
};
- (*raw.schedule)(task);
+ (*raw.schedule).schedule(task, ScheduleInfo::new(false));
}
break;
@@ -340,7 +374,7 @@ where
let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
// If the reference count overflowed, abort.
- if state > isize::max_value() as usize {
+ if state > isize::MAX as usize {
abort();
}
@@ -368,7 +402,7 @@ where
(*raw.header)
.state
.store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
- Self::schedule(ptr);
+ Self::schedule(ptr, ScheduleInfo::new(false));
} else {
// Otherwise, destroy the task right away.
Self::destroy(ptr);
@@ -398,7 +432,7 @@ where
///
/// This function doesn't modify the state of the task. It only passes the task reference to
/// its schedule function.
- unsafe fn schedule(ptr: *const ()) {
+ unsafe fn schedule(ptr: *const (), info: ScheduleInfo) {
let raw = Self::from_ptr(ptr);
// If the schedule function has captured variables, create a temporary waker that prevents
@@ -410,8 +444,9 @@ where
let task = Runnable {
ptr: NonNull::new_unchecked(ptr as *mut ()),
+ _marker: PhantomData,
};
- (*raw.schedule)(task);
+ (*raw.schedule).schedule(task, info);
}
/// Drops the future inside a task.
@@ -442,6 +477,9 @@ where
// We need a safeguard against panics because destructors can panic.
abort_on_panic(|| {
+ // Drop the header along with the metadata.
+ (raw.header as *mut Header<M>).drop_in_place();
+
// Drop the schedule function.
(raw.schedule as *mut S).drop_in_place();
});
@@ -507,8 +545,30 @@ where
// Poll the inner future, but surround it with a guard that closes the task in case polling
// panics.
+ // If available, we should also try to catch the panic so that it is propagated correctly.
let guard = Guard(raw);
- let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
+
+ // Panic propagation is not available for no_std.
+ #[cfg(not(feature = "std"))]
+ let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok);
+
+ #[cfg(feature = "std")]
+ let poll = {
+ // Check if we should propagate panics.
+ if (*raw.header).propagate_panic {
+ // Use catch_unwind to catch the panic.
+ match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+ <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx)
+ })) {
+ Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
+ Ok(Poll::Pending) => Poll::Pending,
+ Err(e) => Poll::Ready(Err(e)),
+ }
+ } else {
+ <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok)
+ }
+ };
+
mem::forget(guard);
match poll {
@@ -608,7 +668,7 @@ where
} else if state & SCHEDULED != 0 {
// The thread that woke the task up didn't reschedule it because
// it was running so now it's our responsibility to do so.
- Self::schedule(ptr);
+ Self::schedule(ptr, ScheduleInfo::new(true));
return true;
} else {
// Drop the task reference.
@@ -625,15 +685,15 @@ where
return false;
/// A guard that closes the task if polling its future panics.
- struct Guard<F, T, S>(RawTask<F, T, S>)
+ struct Guard<F, T, S, M>(RawTask<F, T, S, M>)
where
F: Future<Output = T>,
- S: Fn(Runnable);
+ S: Schedule<M>;
- impl<F, T, S> Drop for Guard<F, T, S>
+ impl<F, T, S, M> Drop for Guard<F, T, S, M>
where
F: Future<Output = T>,
- S: Fn(Runnable),
+ S: Schedule<M>,
{
fn drop(&mut self) {
let raw = self.0;
@@ -648,7 +708,7 @@ where
if state & CLOSED != 0 {
// The thread that closed the task didn't drop the future because it
// was running so now it's our responsibility to do so.
- RawTask::<F, T, S>::drop_future(ptr);
+ RawTask::<F, T, S, M>::drop_future(ptr);
// Mark the task as not running and not scheduled.
(*raw.header)
@@ -662,7 +722,7 @@ where
}
// Drop the task reference.
- RawTask::<F, T, S>::drop_ref(ptr);
+ RawTask::<F, T, S, M>::drop_ref(ptr);
// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
@@ -680,7 +740,7 @@ where
) {
Ok(state) => {
// Drop the future because the task is now closed.
- RawTask::<F, T, S>::drop_future(ptr);
+ RawTask::<F, T, S, M>::drop_future(ptr);
// Take the awaiter out.
let mut awaiter = None;
@@ -689,7 +749,7 @@ where
}
// Drop the task reference.
- RawTask::<F, T, S>::drop_ref(ptr);
+ RawTask::<F, T, S, M>::drop_ref(ptr);
// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {