aboutsummaryrefslogtreecommitdiff
path: root/test/send_recvmsg.c
diff options
context:
space:
mode:
Diffstat (limited to 'test/send_recvmsg.c')
-rw-r--r--test/send_recvmsg.c220
1 files changed, 165 insertions, 55 deletions
diff --git a/test/send_recvmsg.c b/test/send_recvmsg.c
index 2ff8d9d..cce6c45 100644
--- a/test/send_recvmsg.c
+++ b/test/send_recvmsg.c
@@ -17,9 +17,11 @@
static char str[] = "This is a test of sendmsg and recvmsg over io_uring!";
+static int ud;
+
#define MAX_MSG 128
-#define PORT 10200
+#define PORT 10203
#define HOST "127.0.0.1"
#define BUF_BGID 10
@@ -27,31 +29,32 @@ static char str[] = "This is a test of sendmsg and recvmsg over io_uring!";
#define MAX_IOV_COUNT 10
-static int recv_prep(struct io_uring *ring, struct iovec iov[], int iov_count,
- int bgid)
+static int no_pbuf_ring;
+
+static int recv_prep(struct io_uring *ring, int *sockfd, struct iovec iov[],
+ int iov_count, int bgid, int async)
{
struct sockaddr_in saddr;
struct msghdr msg;
struct io_uring_sqe *sqe;
- int sockfd, ret;
- int val = 1;
+ int ret, val = 1;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(PORT);
- sockfd = socket(AF_INET, SOCK_DGRAM, 0);
- if (sockfd < 0) {
+ *sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+ if (*sockfd < 0) {
perror("socket");
return 1;
}
val = 1;
- setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
- setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
+ setsockopt(*sockfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
+ setsockopt(*sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
- ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
+ ret = bind(*sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
if (ret < 0) {
perror("bind");
goto err;
@@ -63,13 +66,16 @@ static int recv_prep(struct io_uring *ring, struct iovec iov[], int iov_count,
return 1;
}
- io_uring_prep_recvmsg(sqe, sockfd, &msg, 0);
+ io_uring_prep_recvmsg(sqe, *sockfd, &msg, 0);
if (bgid) {
iov->iov_base = NULL;
sqe->flags |= IOSQE_BUFFER_SELECT;
sqe->buf_group = bgid;
iov_count = 1;
}
+ sqe->user_data = ++ud;
+ if (async)
+ sqe->flags |= IOSQE_ASYNC;
memset(&msg, 0, sizeof(msg));
msg.msg_namelen = sizeof(struct sockaddr_in);
msg.msg_iov = iov;
@@ -81,18 +87,19 @@ static int recv_prep(struct io_uring *ring, struct iovec iov[], int iov_count,
goto err;
}
- close(sockfd);
return 0;
err:
- close(sockfd);
+ close(*sockfd);
return 1;
}
struct recv_data {
pthread_mutex_t *mutex;
int buf_select;
+ int buf_ring;
int no_buf_add;
int iov_count;
+ int async;
};
static int do_recvmsg(struct io_uring *ring, char buf[MAX_MSG + 1],
@@ -107,18 +114,18 @@ static int do_recvmsg(struct io_uring *ring, char buf[MAX_MSG + 1],
goto err;
}
if (cqe->res < 0) {
- if (rd->no_buf_add && rd->buf_select)
+ if (rd->no_buf_add && (rd->buf_select || rd->buf_ring))
return 0;
fprintf(stderr, "%s: failed cqe: %d\n", __FUNCTION__, cqe->res);
goto err;
}
- if (cqe->flags) {
+ if (cqe->flags & IORING_CQE_F_BUFFER) {
int bid = cqe->flags >> 16;
if (bid != BUF_BID)
fprintf(stderr, "Buffer ID mismatch %d\n", bid);
}
- if (rd->no_buf_add && rd->buf_select) {
+ if (rd->no_buf_add && (rd->buf_ring || rd->buf_select)) {
fprintf(stderr, "Expected -ENOBUFS: %d\n", cqe->res);
goto err;
}
@@ -158,12 +165,14 @@ static void *recv_fn(void *data)
{
struct recv_data *rd = data;
pthread_mutex_t *mutex = rd->mutex;
+ struct io_uring_buf_ring *br = NULL;
char buf[MAX_MSG + 1];
struct iovec iov[MAX_IOV_COUNT];
- struct io_uring_sqe *sqe;
- struct io_uring_cqe *cqe;
struct io_uring ring;
- int ret;
+ int ret, sockfd;
+
+ if (rd->buf_ring && no_pbuf_ring)
+ goto out_no_ring;
init_iov(iov, rd->iov_count, buf);
@@ -173,34 +182,61 @@ static void *recv_fn(void *data)
goto err;
}
- if (rd->buf_select && !rd->no_buf_add) {
- sqe = io_uring_get_sqe(&ring);
- io_uring_prep_provide_buffers(sqe, buf, sizeof(buf) -1, 1,
- BUF_BGID, BUF_BID);
- ret = io_uring_submit(&ring);
- if (ret != 1) {
- fprintf(stderr, "submit ret=%d\n", ret);
- goto err;
- }
-
- ret = io_uring_wait_cqe(&ring, &cqe);
- if (ret) {
- fprintf(stderr, "wait_cqe=%d\n", ret);
- goto err;
- }
- ret = cqe->res;
- io_uring_cqe_seen(&ring, cqe);
- if (ret == -EINVAL) {
- fprintf(stdout, "PROVIDE_BUFFERS not supported, skip\n");
- goto out;
- goto err;
- } else if (ret < 0) {
- fprintf(stderr, "PROVIDER_BUFFERS %d\n", ret);
- goto err;
+ if ((rd->buf_ring || rd->buf_select) && !rd->no_buf_add) {
+ if (rd->buf_ring) {
+ struct io_uring_buf_reg reg = { };
+ void *ptr;
+
+ if (posix_memalign(&ptr, 4096, 4096))
+ goto err;
+
+ reg.ring_addr = (unsigned long) ptr;
+ reg.ring_entries = 1;
+ reg.bgid = BUF_BGID;
+ if (io_uring_register_buf_ring(&ring, &reg, 0)) {
+ no_pbuf_ring = 1;
+ goto out;
+ }
+
+ br = ptr;
+ io_uring_buf_ring_init(br);
+ io_uring_buf_ring_add(br, buf, sizeof(buf), BUF_BID,
+ io_uring_buf_ring_mask(1), 0);
+ io_uring_buf_ring_advance(br, 1);
+ } else {
+ struct io_uring_sqe *sqe;
+ struct io_uring_cqe *cqe;
+
+ sqe = io_uring_get_sqe(&ring);
+ io_uring_prep_provide_buffers(sqe, buf, sizeof(buf) -1,
+ 1, BUF_BGID, BUF_BID);
+ sqe->user_data = ++ud;
+ ret = io_uring_submit(&ring);
+ if (ret != 1) {
+ fprintf(stderr, "submit ret=%d\n", ret);
+ goto err;
+ }
+
+ ret = io_uring_wait_cqe(&ring, &cqe);
+ if (ret) {
+ fprintf(stderr, "wait_cqe=%d\n", ret);
+ goto err;
+ }
+ ret = cqe->res;
+ io_uring_cqe_seen(&ring, cqe);
+ if (ret == -EINVAL) {
+ fprintf(stdout, "PROVIDE_BUFFERS not supported, skip\n");
+ goto out;
+ } else if (ret < 0) {
+ fprintf(stderr, "PROVIDER_BUFFERS %d\n", ret);
+ goto err;
+ }
}
}
- ret = recv_prep(&ring, iov, rd->iov_count, rd->buf_select ? BUF_BGID : 0);
+ ret = recv_prep(&ring, &sockfd, iov, rd->iov_count,
+ (rd->buf_ring || rd->buf_select) ? BUF_BGID : 0,
+ rd->async);
if (ret) {
fprintf(stderr, "recv_prep failed: %d\n", ret);
goto err;
@@ -208,14 +244,19 @@ static void *recv_fn(void *data)
pthread_mutex_unlock(mutex);
ret = do_recvmsg(&ring, buf, rd);
+ close(sockfd);
io_uring_queue_exit(&ring);
-
+ if (br)
+ free(br);
err:
return (void *)(intptr_t)ret;
out:
- pthread_mutex_unlock(mutex);
io_uring_queue_exit(&ring);
+out_no_ring:
+ pthread_mutex_unlock(mutex);
+ if (br)
+ free(br);
return NULL;
}
@@ -255,8 +296,11 @@ static int do_sendmsg(void)
return 1;
}
+ usleep(10000);
+
sqe = io_uring_get_sqe(&ring);
io_uring_prep_sendmsg(sqe, sockfd, &msg, 0);
+ sqe->user_data = ++ud;
ret = io_uring_submit(&ring);
if (ret <= 0) {
@@ -277,7 +321,8 @@ err:
return 1;
}
-static int test(int buf_select, int no_buf_add, int iov_count)
+static int test(int buf_select, int buf_ring, int no_buf_add, int iov_count,
+ int async)
{
struct recv_data rd;
pthread_mutexattr_t attr;
@@ -286,6 +331,9 @@ static int test(int buf_select, int no_buf_add, int iov_count)
int ret;
void *retval;
+ if (buf_select || buf_ring)
+ assert(iov_count == 1);
+
pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr, 1);
pthread_mutex_init(&mutex, &attr);
@@ -293,8 +341,10 @@ static int test(int buf_select, int no_buf_add, int iov_count)
rd.mutex = &mutex;
rd.buf_select = buf_select;
+ rd.buf_ring = buf_ring;
rd.no_buf_add = no_buf_add;
rd.iov_count = iov_count;
+ rd.async = async;
ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
if (ret) {
pthread_mutex_unlock(&mutex);
@@ -305,7 +355,7 @@ static int test(int buf_select, int no_buf_add, int iov_count)
pthread_mutex_lock(&mutex);
do_sendmsg();
pthread_join(recv_thread, &retval);
- ret = (int)(intptr_t)retval;
+ ret = (intptr_t)retval;
return ret;
}
@@ -317,27 +367,87 @@ int main(int argc, char *argv[])
if (argc > 1)
return 0;
- ret = test(0, 0, 1);
+ ret = test(0, 0, 0, 1, 0);
if (ret) {
- fprintf(stderr, "send_recvmsg 0 failed\n");
+ fprintf(stderr, "send_recvmsg 0 0 0 1 0 failed\n");
return 1;
}
- ret = test(0, 0, 10);
+ ret = test(0, 0, 0, 10, 0);
if (ret) {
fprintf(stderr, "send_recvmsg multi iov failed\n");
return 1;
}
- ret = test(1, 0, 1);
+ ret = test(1, 0, 0, 1, 0);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg 1 0 0 1 0 failed\n");
+ return 1;
+ }
+
+ ret = test(1, 0, 1, 1, 0);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg 1 0 1 1 0 failed\n");
+ return 1;
+ }
+
+ ret = test(0, 1, 0, 1, 0);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg 0 1 0 1 0 failed\n");
+ return 1;
+ }
+
+ ret = test(1, 1, 0, 1, 0);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg 1 1 0 1 0 failed\n");
+ return 1;
+ }
+
+ ret = test(1, 1, 1, 1, 0);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg 1 1 1 1 0 failed\n");
+ return 1;
+ }
+
+ ret = test(0, 0, 0, 1, 1);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg async 0 0 0 1 1 failed\n");
+ return 1;
+ }
+
+ ret = test(0, 0, 0, 10, 1);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg async multi iov failed\n");
+ return 1;
+ }
+
+ ret = test(1, 0, 0, 1, 1);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg async 1 0 0 1 1 failed\n");
+ return 1;
+ }
+
+ ret = test(1, 0, 1, 1, 1);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg async 1 0 1 1 1 failed\n");
+ return 1;
+ }
+
+ ret = test(0, 1, 0, 1, 1);
+ if (ret) {
+ fprintf(stderr, "send_recvmsg async 0 1 0 1 1 failed\n");
+ return 1;
+ }
+
+ ret = test(1, 1, 0, 1, 1);
if (ret) {
- fprintf(stderr, "send_recvmsg 1 0 failed\n");
+ fprintf(stderr, "send_recvmsg async 1 1 0 1 1 failed\n");
return 1;
}
- ret = test(1, 1, 1);
+ ret = test(1, 1, 1, 1, 1);
if (ret) {
- fprintf(stderr, "send_recvmsg 1 1 failed\n");
+ fprintf(stderr, "send_recvmsg async 1 1 1 1 1 failed\n");
return 1;
}