Diffstat (limited to 'src/queue.c')
-rw-r--r--   src/queue.c   225
1 file changed, 116 insertions(+), 109 deletions(-)
diff --git a/src/queue.c b/src/queue.c
index 2f0f19b..ce0ecf6 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -1,20 +1,12 @@
 /* SPDX-License-Identifier: MIT */
 #define _POSIX_C_SOURCE 200112L
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <sys/mman.h>
-#include <unistd.h>
-#include <errno.h>
-#include <string.h>
-#include <stdbool.h>
-
+#include "lib.h"
+#include "syscall.h"
+#include "liburing.h"
+#include "int_flags.h"
 #include "liburing/compat.h"
 #include "liburing/io_uring.h"
-#include "liburing.h"
-#include "liburing/barrier.h"
-
-#include "syscall.h"
 
 /*
  * Returns true if we're not using SQ thread (thus nobody submits but us)
@@ -26,6 +18,12 @@ static inline bool sq_ring_needs_enter(struct io_uring *ring, unsigned *flags)
 	if (!(ring->flags & IORING_SETUP_SQPOLL))
 		return true;
 
+	/*
+	 * Ensure the kernel can see the store to the SQ tail before we read
+	 * the flags.
+	 */
+	io_uring_smp_mb();
+
 	if (uring_unlikely(IO_URING_READ_ONCE(*ring->sq.kflags) &
 			   IORING_SQ_NEED_WAKEUP)) {
 		*flags |= IORING_ENTER_SQ_WAKEUP;
@@ -37,44 +35,13 @@ static inline bool sq_ring_needs_enter(struct io_uring *ring, unsigned *flags)
 
 static inline bool cq_ring_needs_flush(struct io_uring *ring)
 {
-	return IO_URING_READ_ONCE(*ring->sq.kflags) & IORING_SQ_CQ_OVERFLOW;
+	return IO_URING_READ_ONCE(*ring->sq.kflags) &
+				 (IORING_SQ_CQ_OVERFLOW | IORING_SQ_TASKRUN);
 }
 
-static int __io_uring_peek_cqe(struct io_uring *ring,
-			       struct io_uring_cqe **cqe_ptr,
-			       unsigned *nr_available)
+static inline bool cq_ring_needs_enter(struct io_uring *ring)
 {
-	struct io_uring_cqe *cqe;
-	int err = 0;
-	unsigned available;
-	unsigned mask = *ring->cq.kring_mask;
-
-	do {
-		unsigned tail = io_uring_smp_load_acquire(ring->cq.ktail);
-		unsigned head = *ring->cq.khead;
-
-		cqe = NULL;
-		available = tail - head;
-		if (!available)
-			break;
-
-		cqe = &ring->cq.cqes[head & mask];
-		if (!(ring->features & IORING_FEAT_EXT_ARG) &&
-		    cqe->user_data == LIBURING_UDATA_TIMEOUT) {
-			if (cqe->res < 0)
-				err = cqe->res;
-			io_uring_cq_advance(ring, 1);
-			if (!err)
-				continue;
-			cqe = NULL;
-		}
-
-		break;
-	} while (1);
-
-	*cqe_ptr = cqe;
-	*nr_available = available;
-	return err;
+	return (ring->flags & IORING_SETUP_IOPOLL) || cq_ring_needs_flush(ring);
 }
 
 struct get_data {
@@ -85,15 +52,16 @@ struct get_data {
 	void *arg;
 };
 
-static int _io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
+static int _io_uring_get_cqe(struct io_uring *ring,
+			     struct io_uring_cqe **cqe_ptr,
 			     struct get_data *data)
 {
 	struct io_uring_cqe *cqe = NULL;
+	bool looped = false;
 	int err;
 
 	do {
 		bool need_enter = false;
-		bool cq_overflow_flush = false;
 		unsigned flags = 0;
 		unsigned nr_available;
 		int ret;
@@ -102,34 +70,40 @@ static int _io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_pt
 		if (err)
 			break;
 		if (!cqe && !data->wait_nr && !data->submit) {
-			if (!cq_ring_needs_flush(ring)) {
+			/*
+			 * If we already looped once, we already entered
+			 * the kernel. Since there's nothing to submit or
+			 * wait for, don't keep retrying.
+			 */
+			if (looped || !cq_ring_needs_enter(ring)) {
 				err = -EAGAIN;
 				break;
 			}
-			cq_overflow_flush = true;
+			need_enter = true;
 		}
-		if (data->wait_nr > nr_available || cq_overflow_flush) {
+		if (data->wait_nr > nr_available || need_enter) {
 			flags = IORING_ENTER_GETEVENTS | data->get_flags;
 			need_enter = true;
 		}
-		if (data->submit) {
-			sq_ring_needs_enter(ring, &flags);
+		if (data->submit && sq_ring_needs_enter(ring, &flags))
 			need_enter = true;
-		}
 		if (!need_enter)
 			break;
 
-		ret = __sys_io_uring_enter2(ring->ring_fd, data->submit,
-					    data->wait_nr, flags, data->arg,
-					    data->sz);
+		if (ring->int_flags & INT_FLAG_REG_RING)
+			flags |= IORING_ENTER_REGISTERED_RING;
+		ret = ____sys_io_uring_enter2(ring->enter_ring_fd, data->submit,
					      data->wait_nr, flags, data->arg,
					      data->sz);
 		if (ret < 0) {
-			err = -errno;
+			err = ret;
 			break;
 		}
 
 		data->submit -= ret;
 		if (cqe)
 			break;
+		looped = true;
 	} while (1);
 
 	*cqe_ptr = cqe;
@@ -159,6 +133,10 @@ unsigned io_uring_peek_batch_cqe(struct io_uring *ring,
 {
 	unsigned ready;
 	bool overflow_checked = false;
+	int shift = 0;
+
+	if (ring->flags & IORING_SETUP_CQE32)
+		shift = 1;
 
 again:
 	ready = io_uring_cq_ready(ring);
@@ -171,7 +149,7 @@ again:
 		count = count > ready ? ready : count;
 		last = head + count;
 		for (;head != last; head++, i++)
-			cqes[i] = &ring->cq.cqes[head & mask];
+			cqes[i] = &ring->cq.cqes[(head & mask) << shift];
 
 		return count;
 	}
@@ -180,8 +158,11 @@ again:
 		goto done;
 
 	if (cq_ring_needs_flush(ring)) {
-		__sys_io_uring_enter(ring->ring_fd, 0, 0,
-				     IORING_ENTER_GETEVENTS, NULL);
+		int flags = IORING_ENTER_GETEVENTS;
+
+		if (ring->int_flags & INT_FLAG_REG_RING)
+			flags |= IORING_ENTER_REGISTERED_RING;
+		____sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
 		overflow_checked = true;
 		goto again;
 	}
@@ -239,7 +220,8 @@ out:
  */
 static int io_uring_wait_cqes_new(struct io_uring *ring,
 				  struct io_uring_cqe **cqe_ptr,
-				  unsigned wait_nr, struct __kernel_timespec *ts,
+				  unsigned wait_nr,
+				  struct __kernel_timespec *ts,
 				  sigset_t *sigmask)
 {
 	struct io_uring_getevents_arg arg = {
@@ -248,7 +230,6 @@ static int io_uring_wait_cqes_new(struct io_uring *ring,
 		.ts = (unsigned long) ts
 	};
 	struct get_data data = {
-		.submit = __io_uring_flush_sq(ring),
 		.wait_nr = wait_nr,
 		.get_flags = IORING_ENTER_EXT_ARG,
 		.sz = sizeof(arg),
@@ -275,36 +256,77 @@ static int io_uring_wait_cqes_new(struct io_uring *ring,
  * hence this function is safe to use for applications that split SQ and CQ
 * handling between two threads.
 */
+static int __io_uring_submit_timeout(struct io_uring *ring, unsigned wait_nr,
+				     struct __kernel_timespec *ts)
+{
+	struct io_uring_sqe *sqe;
+	int ret;
+
+	/*
+	 * If the SQ ring is full, we may need to submit IO first
+	 */
+	sqe = io_uring_get_sqe(ring);
+	if (!sqe) {
+		ret = io_uring_submit(ring);
+		if (ret < 0)
+			return ret;
+		sqe = io_uring_get_sqe(ring);
+		if (!sqe)
+			return -EAGAIN;
+	}
+	io_uring_prep_timeout(sqe, ts, wait_nr, 0);
+	sqe->user_data = LIBURING_UDATA_TIMEOUT;
+	return __io_uring_flush_sq(ring);
+}
+
 int io_uring_wait_cqes(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
 		       unsigned wait_nr, struct __kernel_timespec *ts,
 		       sigset_t *sigmask)
 {
-	unsigned to_submit = 0;
+	int to_submit = 0;
 
 	if (ts) {
-		struct io_uring_sqe *sqe;
-		int ret;
-
 		if (ring->features & IORING_FEAT_EXT_ARG)
 			return io_uring_wait_cqes_new(ring, cqe_ptr, wait_nr,
 						      ts, sigmask);
+		to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
+		if (to_submit < 0)
+			return to_submit;
+	}
 
-		/*
-		 * If the SQ ring is full, we may need to submit IO first
-		 */
-		sqe = io_uring_get_sqe(ring);
-		if (!sqe) {
-			ret = io_uring_submit(ring);
-			if (ret < 0)
-				return ret;
-			sqe = io_uring_get_sqe(ring);
-			if (!sqe)
-				return -EAGAIN;
+	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
+}
+
+int io_uring_submit_and_wait_timeout(struct io_uring *ring,
+				     struct io_uring_cqe **cqe_ptr,
+				     unsigned wait_nr,
+				     struct __kernel_timespec *ts,
+				     sigset_t *sigmask)
+{
+	int to_submit;
+
+	if (ts) {
+		if (ring->features & IORING_FEAT_EXT_ARG) {
+			struct io_uring_getevents_arg arg = {
+				.sigmask = (unsigned long) sigmask,
+				.sigmask_sz = _NSIG / 8,
+				.ts = (unsigned long) ts
+			};
+			struct get_data data = {
+				.submit = __io_uring_flush_sq(ring),
+				.wait_nr = wait_nr,
+				.get_flags = IORING_ENTER_EXT_ARG,
+				.sz = sizeof(arg),
+				.arg = &arg
+			};
+
+			return _io_uring_get_cqe(ring, cqe_ptr, &data);
 		}
-		io_uring_prep_timeout(sqe, ts, wait_nr, 0);
-		sqe->user_data = LIBURING_UDATA_TIMEOUT;
+		to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
+		if (to_submit < 0)
+			return to_submit;
+	} else
 		to_submit = __io_uring_flush_sq(ring);
-	}
 
 	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
 }
@@ -335,11 +357,11 @@ static int __io_uring_submit(struct io_uring *ring, unsigned submitted,
 	if (sq_ring_needs_enter(ring, &flags) || wait_nr) {
 		if (wait_nr || (ring->flags & IORING_SETUP_IOPOLL))
 			flags |= IORING_ENTER_GETEVENTS;
+		if (ring->int_flags & INT_FLAG_REG_RING)
+			flags |= IORING_ENTER_REGISTERED_RING;
 
-		ret = __sys_io_uring_enter(ring->ring_fd, submitted, wait_nr,
-					   flags, NULL);
-		if (ret < 0)
-			return -errno;
+		ret = ____sys_io_uring_enter(ring->enter_ring_fd, submitted,
					     wait_nr, flags, NULL);
 	} else
 		ret = submitted;
 
@@ -371,34 +393,19 @@ int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
 	return __io_uring_submit_and_wait(ring, wait_nr);
 }
 
-/*
- * Return an sqe to fill. Application must later call io_uring_submit()
- * when it's ready to tell the kernel about it. The caller may call this
- * function multiple times before calling io_uring_submit().
- *
- * Returns a vacant sqe, or NULL if we're full.
- */
+#ifdef LIBURING_INTERNAL
 struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
 {
-	struct io_uring_sq *sq = &ring->sq;
-	unsigned int head = io_uring_smp_load_acquire(sq->khead);
-	unsigned int next = sq->sqe_tail + 1;
-	struct io_uring_sqe *sqe = NULL;
-
-	if (next - head <= *sq->kring_entries) {
-		sqe = &sq->sqes[sq->sqe_tail & *sq->kring_mask];
-		sq->sqe_tail = next;
-	}
-	return sqe;
+	return _io_uring_get_sqe(ring);
 }
+#endif
 
 int __io_uring_sqring_wait(struct io_uring *ring)
 {
-	int ret;
+	int flags = IORING_ENTER_SQ_WAIT;
 
-	ret = __sys_io_uring_enter(ring->ring_fd, 0, 0, IORING_ENTER_SQ_WAIT,
-				   NULL);
-	if (ret < 0)
-		ret = -errno;
-	return ret;
+	if (ring->int_flags & INT_FLAG_REG_RING)
+		flags |= IORING_ENTER_REGISTERED_RING;
+
+	return ____sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
 }
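Notes on the more subtle changes above. The io_uring_smp_mb() added to sq_ring_needs_enter() fixes a store/load ordering problem under IORING_SETUP_SQPOLL: the submitter's store of the new SQ tail must be visible before it reads *ring->sq.kflags, otherwise it can sample a stale flags word, miss IORING_SQ_NEED_WAKEUP, and leave the SQPOLL thread parked while entries sit unseen in the ring. A minimal C11 sketch of the ordering requirement (illustrative names only, not liburing code):

#include <stdatomic.h>

/* Models the submitter side of the SQPOLL wakeup handshake. */
static _Bool needs_wakeup(atomic_uint *ktail, atomic_uint *kflags,
			  unsigned new_tail, unsigned need_wakeup_bit)
{
	/* publish new SQEs: release store to the shared tail */
	atomic_store_explicit(ktail, new_tail, memory_order_release);
	/* full fence: order the tail store before the flags load; a
	 * release store alone does not order subsequent loads */
	atomic_thread_fence(memory_order_seq_cst);
	return atomic_load_explicit(kflags, memory_order_relaxed) &
	       need_wakeup_bit;
}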
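Every enter path now uses ring->enter_ring_fd and ORs in IORING_ENTER_REGISTERED_RING when INT_FLAG_REG_RING is set, so a ring whose fd was registered via io_uring_register_ring_fd() avoids the per-syscall file-table lookup. A hedged usage sketch, assuming a liburing and kernel with ring-fd registration support:

#include <stdio.h>
#include <liburing.h>

int setup_registered_ring(struct io_uring *ring)
{
	int ret = io_uring_queue_init(64, ring, 0);
	if (ret < 0)
		return ret;
	/* On success liburing sets INT_FLAG_REG_RING internally, and the
	 * enter paths in queue.c use enter_ring_fd with
	 * IORING_ENTER_REGISTERED_RING. A negative return likely means the
	 * kernel lacks support; the ring still works via the normal fd. */
	ret = io_uring_register_ring_fd(ring);
	if (ret < 0)
		fprintf(stderr, "ring fd registration unavailable: %d\n", ret);
	return 0;
}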
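In io_uring_peek_batch_cqe(), the index becomes (head & mask) << shift because a ring created with IORING_SETUP_CQE32 stores 32-byte completions, i.e. each completion spans two struct io_uring_cqe slots. The same index math restated as a standalone helper (illustrative, not part of the liburing API):

#include <stdbool.h>
#include <liburing.h>

static inline struct io_uring_cqe *cqe_slot(struct io_uring_cqe *cqes,
					    unsigned head, unsigned mask,
					    bool cqe32)
{
	/* 16-byte CQEs: one slot per completion (shift 0);
	 * 32-byte CQEs: two slots per completion (shift 1). */
	unsigned shift = cqe32 ? 1 : 0;
	return &cqes[(head & mask) << shift];
}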
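io_uring_submit_and_wait_timeout() is new: it flushes the SQ ring and waits for completions in one call, taking the IORING_FEAT_EXT_ARG path when the kernel offers it and otherwise falling back to the internal timeout SQE posted by __io_uring_submit_timeout(). A hedged usage sketch; the -ETIME convention on timeout expiry is assumed from the other wait helpers:

#include <liburing.h>

int submit_and_wait_100ms(struct io_uring *ring)
{
	struct io_uring_cqe *cqe;
	struct __kernel_timespec ts = { .tv_sec = 0, .tv_nsec = 100000000 };
	int ret;

	/* submit anything queued, wait up to 100ms for one completion */
	ret = io_uring_submit_and_wait_timeout(ring, &cqe, 1, &ts, NULL);
	if (ret < 0)
		return ret;	/* e.g. -ETIME if nothing completed in time */
	/* process cqe->res / cqe->user_data here */
	io_uring_cqe_seen(ring, cqe);
	return 0;
}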