aboutsummaryrefslogtreecommitdiffstats
path: root/io_uring
diff options
authorLinus Torvalds <torvalds@linux-foundation.org>2026-06-16 12:53:59 +0530
committerLinus Torvalds <torvalds@linux-foundation.org>2026-06-16 12:53:59 +0530
commit9b40ba14edcdf70240af8114092a76f75f070774 (patch)
tree7fd6958aa2c7ab93d86d2afed222b37c67e1c21b /io_uring
parentd29fd593e6836c96c6fd6df2b0cc6a47dda21b74 (diff)
parentd9b710f683dc68b5c0b7dd0c6c64aeb5d27a1ac4 (diff)
downloadath-9b40ba14edcdf70240af8114092a76f75f070774.tar.gz
Merge tag 'for-7.2/io_uring-20260615' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux
Pull io_uring updates from Jens Axboe: - Rework the task_work infrastructure. Both the local (DEFER_TASKRUN) and the normal (tctx) task_work lists were llist based, which is LIFO ordered, and hence each run had to do an O(n) list reversal pass first to restore queue order. Additionally, to cap the amount of task_work run, each method needed a retry list as well. Add a lockless MPCS FIFO queue (based on Dmitry Vyukov's intrusive MPSC algorithm) and switch both task_work lists to it. It performs better than llists and we can then also ditch the retry lists as well as entries are popped one-at-the-time. On top of those changes, run the tctx fallback task_work directly and remove the now-unused per-ctx fallback machinery entirely. - zcrx user notifications. Add a mechanism for zcrx to communicate conditions back to userspace via a dedicated CQE, with the initial users being notification on running out of buffers and on a frag copy fallback, plus shared-memory notification statistics. Alongside that, a series of zcrx reliability and cleanup fixes: more reliable scrubbing, poisoning pointers on unregistration, dropping an extra ifq close, adding a ctx back-pointer, reordering fd allocation in the export path, and killing a dead 'sock' member. - Allow using io_uring registered buffers for plain SEND and RECV, not just for the zero-copy send path. This enables targets like ublk's NBD backend to push/pull IO data directly to/from a registered buffer over a plain send/recv on a TCP socket. - Registered buffer improvements: account huge pages correctly, bump the io_mapped_ubuf length field to size_t, and raise the previous 1GB registered buffer size limit. - Restrict the ctx access exposed to io_uring BPF struct_ops programs by handing them an opaque type rather than the full io_ring_ctx, and add a separate MAINTAINERS entry for the bpf-ops code. - Allow opcode filtering on IORING_OP_CONNECT. - Validate ring-provided buffer addresses with access_ok(), and align the legacy buffer add limit with MAX_BIDS_PER_BGID. - Various other cleanups and minor fixes, including avoiding msghdr async data on connect/bind, dropping async_size for OP_LISTEN, making the POLL_FIRST receive side checks consistent, re-checking IO_WQ_BIT_EXIT for each linked work item, and using trace_call__##name() at guarded tracepoint call sites. * tag 'for-7.2/io_uring-20260615' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux: (31 commits) io_uring/bpf-ops: add a separate maintainer entry io_uring/net: make POLL_FIRST receive side checks consistent io_uring: remove the per-ctx fallback task_work machinery io_uring: run the tctx task_work fallback directly io_uring: switch normal task_work to a mpscq io_uring: switch local task_work to a mpscq io_uring/mpscq: add lockless multi-producer, single-consumer FIFO queue io_uring: grab RCU read lock marking task run io_uring/zcrx: kill dead 'sock' member in struct io_zcrx_args io_uring/kbuf: validate ring provided buffer addresses with access_ok() io_uring/net: support registered buffer for plain send and recv io_uring/nop: Drop a wrong comment in struct io_nop io_uring/net: Remove async_size for OP_LISTEN io_uring/net: Avoid msghdr on op_connect/op_bind async data io_uring/bpf-ops: restrict ctx access to BPF io_uring/io-wq: re-check IO_WQ_BIT_EXIT for each linked work item io_uring/kbuf: align legacy buffer add limit with MAX_BIDS_PER_BGID io_uring/zcrx: add shared-memory notification statistics io_uring/zcrx: notify user on frag copy fallback io_uring/zcrx: notify user when out of buffers ...
Diffstat (limited to 'io_uring')
-rw-r--r--io_uring/bpf-ops.c9
-rw-r--r--io_uring/bpf-ops.h2
-rw-r--r--io_uring/cancel.c2
-rw-r--r--io_uring/fdinfo.c2
-rw-r--r--io_uring/io-wq.c2
-rw-r--r--io_uring/io_uring.c14
-rw-r--r--io_uring/io_uring.h3
-rw-r--r--io_uring/kbuf.c18
-rw-r--r--io_uring/loop.c2
-rw-r--r--io_uring/loop.h10
-rw-r--r--io_uring/mpscq.h125
-rw-r--r--io_uring/net.c137
-rw-r--r--io_uring/net.h7
-rw-r--r--io_uring/nop.c1
-rw-r--r--io_uring/opdef.c7
-rw-r--r--io_uring/query.c16
-rw-r--r--io_uring/rsrc.c267
-rw-r--r--io_uring/rsrc.h7
-rw-r--r--io_uring/sqpoll.c30
-rw-r--r--io_uring/tctx.c3
-rw-r--r--io_uring/tw.c315
-rw-r--r--io_uring/tw.h11
-rw-r--r--io_uring/wait.c2
-rw-r--r--io_uring/wait.h12
-rw-r--r--io_uring/zcrx.c229
-rw-r--r--io_uring/zcrx.h11
26 files changed, 894 insertions, 350 deletions
diff --git a/io_uring/bpf-ops.c b/io_uring/bpf-ops.c
index 937e48bef40bc..5a50f0675fe58 100644
--- a/io_uring/bpf-ops.c
+++ b/io_uring/bpf-ops.c
@@ -14,15 +14,18 @@ static const struct btf_type *loop_params_type;
__bpf_kfunc_start_defs();
-__bpf_kfunc int bpf_io_uring_submit_sqes(struct io_ring_ctx *ctx, u32 nr)
+__bpf_kfunc int bpf_io_uring_submit_sqes(struct iou_ctx *loop_ctx, u32 nr)
{
+ struct io_ring_ctx *ctx = io_loop_demangle_ctx(loop_ctx);
+
return io_submit_sqes(ctx, nr);
}
__bpf_kfunc
-__u8 *bpf_io_uring_get_region(struct io_ring_ctx *ctx, __u32 region_id,
+__u8 *bpf_io_uring_get_region(struct iou_ctx *loop_ctx, __u32 region_id,
const size_t rdwr_buf_size)
{
+ struct io_ring_ctx *ctx = io_loop_demangle_ctx(loop_ctx);
struct io_mapped_region *r;
lockdep_assert_held(&ctx->uring_lock);
@@ -58,7 +61,7 @@ static const struct btf_kfunc_id_set bpf_io_uring_kfunc_set = {
.set = &io_uring_kfunc_set,
};
-static int io_bpf_ops__loop_step(struct io_ring_ctx *ctx,
+static int io_bpf_ops__loop_step(struct iou_ctx *ctx,
struct iou_loop_params *lp)
{
return IOU_LOOP_STOP;
diff --git a/io_uring/bpf-ops.h b/io_uring/bpf-ops.h
index b39b3fd3acdab..0b6d7894915e1 100644
--- a/io_uring/bpf-ops.h
+++ b/io_uring/bpf-ops.h
@@ -11,7 +11,7 @@ enum {
};
struct io_uring_bpf_ops {
- int (*loop_step)(struct io_ring_ctx *ctx, struct iou_loop_params *lp);
+ int (*loop_step)(struct iou_ctx *, struct iou_loop_params *lp);
__u32 ring_fd;
void *priv;
diff --git a/io_uring/cancel.c b/io_uring/cancel.c
index 4aa3103ba9c39..8c6fa6f367e4f 100644
--- a/io_uring/cancel.c
+++ b/io_uring/cancel.c
@@ -565,8 +565,6 @@ __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
mutex_unlock(&ctx->uring_lock);
if (tctx)
ret |= io_run_task_work() > 0;
- else
- ret |= flush_delayed_work(&ctx->fallback_work);
return ret;
}
diff --git a/io_uring/fdinfo.c b/io_uring/fdinfo.c
index 001fb542dc11a..3ae4804765b94 100644
--- a/io_uring/fdinfo.c
+++ b/io_uring/fdinfo.c
@@ -224,7 +224,7 @@ static void __io_uring_show_fdinfo(struct io_ring_ctx *ctx, struct seq_file *m)
if (ctx->buf_table.nodes[i])
buf = ctx->buf_table.nodes[i]->buf;
if (buf)
- seq_printf(m, "%5u: 0x%llx/%u\n", i, buf->ubuf, buf->len);
+ seq_printf(m, "%5u: 0x%llx/%zu\n", i, buf->ubuf, buf->len);
else
seq_printf(m, "%5u: <none>\n", i);
}
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 8cc7b47d30894..2e14880eef92a 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -602,7 +602,6 @@ static void io_worker_handle_work(struct io_wq_acct *acct,
struct io_wq *wq = worker->wq;
do {
- bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
struct io_wq_work *work;
/*
@@ -638,6 +637,7 @@ static void io_worker_handle_work(struct io_wq_acct *acct,
/* handle a whole dependent link */
do {
+ bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
struct io_wq_work *next_hashed, *linked;
unsigned int work_flags = atomic_read(&work->flags);
unsigned int hash = __io_wq_is_hashed(work_flags)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index bf847ca823f70..1ea2fca34a36f 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -160,7 +160,7 @@ static void io_poison_cached_req(struct io_kiocb *req)
req->apoll = IO_URING_PTR_POISON;
}
-static void io_poison_req(struct io_kiocb *req)
+void io_poison_req(struct io_kiocb *req)
{
io_poison_cached_req(req);
req->async_data = IO_URING_PTR_POISON;
@@ -233,6 +233,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
return NULL;
xa_init(&ctx->io_bl_xa);
+ xa_init(&ctx->hpage_acct);
/*
* Use 5 bits less than the max cq entries, that should give us around
@@ -279,7 +280,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
INIT_LIST_HEAD(&ctx->ltimeout_list);
- init_llist_head(&ctx->work_llist);
+ mpscq_init(&ctx->work_list, &ctx->work_head);
INIT_LIST_HEAD(&ctx->tctx_list);
mutex_init(&ctx->tctx_lock);
ctx->submit_state.free_list.next = NULL;
@@ -288,7 +289,6 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
#ifdef CONFIG_FUTEX
INIT_HLIST_HEAD(&ctx->futex_list);
#endif
- INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
io_napi_init(ctx);
@@ -302,6 +302,7 @@ err:
io_free_alloc_caches(ctx);
kvfree(ctx->cancel_table.hbs);
xa_destroy(&ctx->io_bl_xa);
+ xa_destroy(&ctx->hpage_acct);
kfree(ctx);
return NULL;
}
@@ -1197,7 +1198,7 @@ __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
mutex_unlock(&ctx->uring_lock);
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
- io_move_task_work_from_local(ctx);
+ io_cancel_local_task_work(ctx);
}
static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned int min_events)
@@ -2209,6 +2210,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_napi_free(ctx);
kvfree(ctx->cancel_table.hbs);
xa_destroy(&ctx->io_bl_xa);
+ xa_destroy(&ctx->hpage_acct);
kfree(ctx);
}
@@ -2342,7 +2344,7 @@ static __cold void io_ring_exit_work(struct work_struct *work)
/* The SQPOLL thread never reaches this path */
do {
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
- io_move_task_work_from_local(ctx);
+ io_cancel_local_task_work(ctx);
cond_resched();
} while (io_uring_try_cancel_requests(ctx, NULL, true, false));
@@ -2428,8 +2430,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
io_unregister_personality(ctx, index);
mutex_unlock(&ctx->uring_lock);
- flush_delayed_work(&ctx->fallback_work);
-
INIT_WORK(&ctx->exit_work, io_ring_exit_work);
/*
* Use system_dfl_wq to avoid spawning tons of event kworkers
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index e612a66ee80e9..cb736b8154224 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -213,6 +213,7 @@ bool __io_alloc_req_refill(struct io_ring_ctx *ctx);
void io_activate_pollwq(struct io_ring_ctx *ctx);
void io_restriction_clone(struct io_restriction *dst, struct io_restriction *src);
+void io_poison_req(struct io_kiocb *req);
static inline void io_lockdep_assert_cq_locked(struct io_ring_ctx *ctx)
{
@@ -312,7 +313,7 @@ static __always_inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
}
if (trace_io_uring_complete_enabled())
- trace_io_uring_complete(req->ctx, req, cqe);
+ trace_call__io_uring_complete(req->ctx, req, cqe);
return true;
}
diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c
index 926254b6898f9..3cd29477fff2d 100644
--- a/io_uring/kbuf.c
+++ b/io_uring/kbuf.c
@@ -21,7 +21,7 @@
#define MAX_BIDS_PER_BGID (1 << 16)
/* Mapped buffer ring, return io_uring_buf from head */
-#define io_ring_head_to_buf(br, head, mask) &(br)->bufs[(head) & (mask)]
+#define io_ring_head_to_buf(br, head, mask) (&(br)->bufs[(head) & (mask)])
struct io_provide_buf {
struct file *file;
@@ -210,10 +210,14 @@ static struct io_br_sel io_ring_buffer_select(struct io_kiocb *req, size_t *len,
buf_len = READ_ONCE(buf->len);
if (*len == 0 || *len > buf_len)
*len = buf_len;
+ sel.addr = u64_to_user_ptr(READ_ONCE(buf->addr));
+ if (unlikely(!access_ok(sel.addr, *len))) {
+ sel.addr = NULL;
+ return sel;
+ }
req->flags |= REQ_F_BUFFER_RING | REQ_F_BUFFERS_COMMIT;
req->buf_index = READ_ONCE(buf->bid);
sel.buf_list = bl;
- sel.addr = u64_to_user_ptr(READ_ONCE(buf->addr));
if (io_should_commit(req, issue_flags)) {
if (!io_kbuf_commit(req, sel.buf_list, *len, 1))
@@ -250,6 +254,7 @@ static int io_ring_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg,
struct io_buffer_list *bl)
{
struct io_uring_buf_ring *br = bl->buf_ring;
+ struct iovec *org_iovs = arg->iovs;
struct iovec *iov = arg->iovs;
int nr_iovs = arg->nr_iovs;
__u16 nr_avail, tail, head;
@@ -310,6 +315,11 @@ static int io_ring_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg,
iov->iov_base = u64_to_user_ptr(READ_ONCE(buf->addr));
iov->iov_len = len;
+ if (unlikely(!access_ok(iov->iov_base, len))) {
+ if (arg->iovs != org_iovs)
+ kfree(arg->iovs);
+ return -EFAULT;
+ }
iov++;
arg->out_len += len;
@@ -540,11 +550,11 @@ static int io_add_buffers(struct io_ring_ctx *ctx, struct io_provide_buf *pbuf,
for (i = 0; i < pbuf->nbufs; i++) {
/*
- * Nonsensical to have more than sizeof(bid) buffers in a
+ * Nonsensical to have more than MAX_BIDS_PER_BGID buffers in a
* buffer list, as the application then has no way of knowing
* which duplicate bid refers to what buffer.
*/
- if (bl->nbufs == USHRT_MAX) {
+ if (bl->nbufs == MAX_BIDS_PER_BGID) {
ret = -EOVERFLOW;
break;
}
diff --git a/io_uring/loop.c b/io_uring/loop.c
index 31843cc3e4510..bbbb6ef14e6a8 100644
--- a/io_uring/loop.c
+++ b/io_uring/loop.c
@@ -49,7 +49,7 @@ static int __io_run_loop(struct io_ring_ctx *ctx)
if (unlikely(!ctx->loop_step))
return -EFAULT;
- step_res = ctx->loop_step(ctx, &lp);
+ step_res = ctx->loop_step(io_loop_mangle_ctx(ctx), &lp);
if (step_res == IOU_LOOP_STOP)
break;
if (step_res != IOU_LOOP_CONTINUE)
diff --git a/io_uring/loop.h b/io_uring/loop.h
index d7718b9ce61ee..4dd4fb3aefefc 100644
--- a/io_uring/loop.h
+++ b/io_uring/loop.h
@@ -24,4 +24,14 @@ static inline bool io_has_loop_ops(struct io_ring_ctx *ctx)
int io_run_loop(struct io_ring_ctx *ctx);
+static inline struct iou_ctx *io_loop_mangle_ctx(struct io_ring_ctx *ctx)
+{
+ return (struct iou_ctx *)ctx;
+}
+
+static inline struct io_ring_ctx *io_loop_demangle_ctx(struct iou_ctx *ctx)
+{
+ return (struct io_ring_ctx *)ctx;
+}
+
#endif
diff --git a/io_uring/mpscq.h b/io_uring/mpscq.h
new file mode 100644
index 0000000000000..c801384c6a0aa
--- /dev/null
+++ b/io_uring/mpscq.h
@@ -0,0 +1,125 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef IOU_MPSCQ_H
+#define IOU_MPSCQ_H
+
+#include <linux/io_uring_types.h>
+
+/*
+ * mpscq - lockless multi-producer, single-consumer FIFO queue
+ *
+ * Unlike llist, which is LIFO ordered and hence needs an O(n)
+ * llist_reverse_order() pass before entries can be processed in queue order,
+ * this queue hands out nodes in the order they were pushed.
+ *
+ * The consumer cursor is held by the caller rather than in the queue struct
+ * (see below), and with the stub reinsertion done as a single cmpxchg attempt
+ * instead of an unconditional push, keeping tail == stub a reliable empty test
+ * while a producer is in the middle of a push.
+ *
+ * Producers may run in any context (task, softirq, hardirq) and are wait-free:
+ * a push is one xchg() plus one store, with no retry loops. FIFO order between
+ * producers is the order in which the xchg() on ->tail serializes them.
+ *
+ * The price for linked-list FIFO is that a push publishes the node in two
+ * steps: the xchg() makes it the new tail, and the subsequent store links it to
+ * its predecessor. In between, the tail end of the queue is not yet reachable
+ * from the head. mpscq_pop() detects this and returns NULL, while mpscq_empty()
+ * reports false. The consumer must not treat such a NULL as "queue empty" - it
+ * should retry later. The window is two instructions wide, but a producer can
+ * be preempted inside it, so the consumer must not spin on it while holding
+ * resources the producer might need to make progress.
+ *
+ * The consumer side only supports a single consumer at a time, callers must
+ * provide their own serialization for it. The stub node is what allows the
+ * consumer to detach the final node without racing with the link stores of
+ * producers. This scheme also guarantees that the previous tail observed by
+ * mpscq_push() cannot be freed by the consumer until the push has linked it,
+ * which is what makes the deferred link store safe.
+ *
+ * The queue struct only holds the producer side. The consumer keeps its cursor
+ * (the oldest not yet handed out node) externally and passes it to mpscq_pop(),
+ * so that it can be placed on a different cacheline: the cursor is written for
+ * every pop, and having it share a line with ->tail would have the consumer
+ * invalidating the line that producers need for every push.
+ */
+static inline void mpscq_init(struct mpscq *q, struct llist_node **headp)
+{
+ q->tail = *headp = &q->stub;
+ q->stub.next = NULL;
+}
+
+/*
+ * Returns true if the queue holds no entries that mpscq_pop() hasn't handed out
+ * yet. May be called from any context. Note that !empty doesn't guarantee that
+ * mpscq_pop() will return an entry yet, see the in-flight producer window
+ * above.
+ */
+static inline bool mpscq_empty(struct mpscq *q)
+{
+ return READ_ONCE(q->tail) == &q->stub;
+}
+
+/*
+ * Push a node onto the queue. Safe against concurrent pushes from any context,
+ * and against the (single) consumer. Returns true if the queue was empty
+ * before this push.
+ */
+static inline bool mpscq_push(struct mpscq *q, struct llist_node *node)
+{
+ struct llist_node *prev;
+
+ node->next = NULL;
+ /*
+ * xchg() implies a full barrier, so the initialization of the
+ * entry (including ->next above) is visible before the node can
+ * be reached, either via ->tail or via ->next chasing from the
+ * head once the store below has linked it.
+ */
+ prev = xchg(&q->tail, node);
+ WRITE_ONCE(prev->next, node);
+ return prev == &q->stub;
+}
+
+/*
+ * Pop the oldest node off the queue, or return NULL if no node is available.
+ * NULL is returned both when the queue is empty and when a producer has
+ * published a node via ->tail but hasn't linked it yet; use mpscq_empty() to
+ * tell the two apart. Single consumer only, with headp being the consumer
+ * cursor that mpscq_init() set up.
+ */
+static inline struct llist_node *mpscq_pop(struct mpscq *q,
+ struct llist_node **headp)
+{
+ struct llist_node *head = *headp, *next;
+
+ if (head == &q->stub) {
+ head = READ_ONCE(head->next);
+ if (!head)
+ return NULL;
+ /*
+ * The stub is now detached and stays quiescent until the
+ * consumer reinserts it as the tail, so reset its ->next here,
+ * ready for that.
+ */
+ q->stub.next = NULL;
+ *headp = head;
+ }
+ next = READ_ONCE(head->next);
+ if (next) {
+ *headp = next;
+ return head;
+ }
+ /*
+ * 'head' is the last linked node, it can only be handed out once the
+ * stub has taken its place as the tail. If the cmpxchg fails, a
+ * producer has made a new node the tail but hasn't linked 'head' to
+ * it yet - bail and let the caller retry.
+ */
+ if (try_cmpxchg(&q->tail, &head, &q->stub)) {
+ *headp = &q->stub;
+ return head;
+ }
+ return NULL;
+}
+
+#endif /* IOU_MPSCQ_H */
diff --git a/io_uring/net.c b/io_uring/net.c
index 34ab1f97cc5a3..00a7df803b99b 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -418,7 +418,8 @@ static int io_sendmsg_setup(struct io_kiocb *req, const struct io_uring_sqe *sqe
return io_net_import_vec(req, kmsg, msg.msg_iov, msg.msg_iovlen, ITER_SOURCE);
}
-#define SENDMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECVSEND_BUNDLE | IORING_SEND_VECTORIZED)
+#define SENDMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECVSEND_BUNDLE | \
+ IORING_SEND_VECTORIZED | IORING_RECVSEND_FIXED_BUF)
int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
@@ -431,6 +432,14 @@ int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
sr->flags = READ_ONCE(sqe->ioprio);
if (sr->flags & ~SENDMSG_FLAGS)
return -EINVAL;
+ if (sr->flags & IORING_RECVSEND_FIXED_BUF) {
+ /* registered buffer send only supported for plain IORING_OP_SEND */
+ if (req->opcode != IORING_OP_SEND ||
+ (req->flags & REQ_F_BUFFER_SELECT) ||
+ (sr->flags & (IORING_RECVSEND_BUNDLE|IORING_SEND_VECTORIZED)))
+ return -EINVAL;
+ req->buf_index = READ_ONCE(sqe->buf_index);
+ }
sr->msg_flags = READ_ONCE(sqe->msg_flags) | MSG_NOSIGNAL;
if (sr->msg_flags & MSG_DONTWAIT)
req->flags |= REQ_F_NOWAIT;
@@ -662,6 +671,15 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
(sr->flags & IORING_RECVSEND_POLL_FIRST))
return -EAGAIN;
+ if (req->flags & REQ_F_IMPORT_BUFFER) {
+ ret = io_import_reg_buf(req, &kmsg->msg.msg_iter,
+ (u64)(uintptr_t)sr->buf, sr->len,
+ ITER_SOURCE, issue_flags);
+ if (unlikely(ret))
+ return ret;
+ req->flags &= ~REQ_F_IMPORT_BUFFER;
+ }
+
flags = sr->msg_flags;
if (issue_flags & IO_URING_F_NONBLOCK)
flags |= MSG_DONTWAIT;
@@ -776,6 +794,10 @@ static int io_recvmsg_prep_setup(struct io_kiocb *req)
if (req->flags & REQ_F_BUFFER_SELECT)
return 0;
+ if (sr->flags & IORING_RECVSEND_FIXED_BUF) {
+ req->flags |= REQ_F_IMPORT_BUFFER;
+ return 0;
+ }
return import_ubuf(ITER_DEST, sr->buf, sr->len,
&kmsg->msg.msg_iter);
}
@@ -784,7 +806,7 @@ static int io_recvmsg_prep_setup(struct io_kiocb *req)
}
#define RECVMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT | \
- IORING_RECVSEND_BUNDLE)
+ IORING_RECVSEND_BUNDLE | IORING_RECVSEND_FIXED_BUF)
int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
@@ -802,6 +824,14 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
sr->flags = READ_ONCE(sqe->ioprio);
if (sr->flags & ~RECVMSG_FLAGS)
return -EINVAL;
+ if (sr->flags & IORING_RECVSEND_FIXED_BUF) {
+ /* registered buffer recv only for plain IORING_OP_RECV */
+ if (req->opcode != IORING_OP_RECV ||
+ (req->flags & REQ_F_BUFFER_SELECT) ||
+ (sr->flags & (IORING_RECV_MULTISHOT | IORING_RECVSEND_BUNDLE)))
+ return -EINVAL;
+ req->buf_index = READ_ONCE(sqe->buf_index);
+ }
sr->msg_flags = READ_ONCE(sqe->msg_flags);
if (sr->msg_flags & MSG_DONTWAIT)
req->flags |= REQ_F_NOWAIT;
@@ -1187,18 +1217,30 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
bool mshot_finished;
- if (!(req->flags & REQ_F_POLLED) &&
- (sr->flags & IORING_RECVSEND_POLL_FIRST))
- return -EAGAIN;
-
sock = sock_from_file(req->file);
if (unlikely(!sock))
return -ENOTSOCK;
+ if (!(req->flags & REQ_F_POLLED) &&
+ (sr->flags & IORING_RECVSEND_POLL_FIRST))
+ return -EAGAIN;
+
flags = sr->msg_flags;
if (force_nonblock)
flags |= MSG_DONTWAIT;
+ if (req->flags & REQ_F_IMPORT_BUFFER) {
+ ret = io_import_reg_buf(req, &kmsg->msg.msg_iter,
+ (u64)(uintptr_t)sr->buf, sr->len,
+ ITER_DEST, issue_flags);
+ if (unlikely(ret)) {
+ kmsg->msg.msg_inq = -1;
+ sel.buf_list = NULL;
+ goto out_free;
+ }
+ req->flags &= ~REQ_F_IMPORT_BUFFER;
+ }
+
retry_multishot:
sel.buf_list = NULL;
if (io_do_buffer_select(req)) {
@@ -1287,14 +1329,14 @@ int io_recvzc(struct io_kiocb *req, unsigned int issue_flags)
unsigned int len;
int ret;
- if (!(req->flags & REQ_F_POLLED) &&
- (zc->flags & IORING_RECVSEND_POLL_FIRST))
- return -EAGAIN;
-
sock = sock_from_file(req->file);
if (unlikely(!sock))
return -ENOTSOCK;
+ if (!(req->flags & REQ_F_POLLED) &&
+ (zc->flags & IORING_RECVSEND_POLL_FIRST))
+ return -EAGAIN;
+
len = zc->len;
ret = io_zcrx_recv(req, zc->ifq, sock, 0, issue_flags, &zc->len);
if (len && zc->len == 0) {
@@ -1675,6 +1717,46 @@ void io_socket_bpf_populate(struct io_uring_bpf_ctx *bctx, struct io_kiocb *req)
bctx->socket.protocol = sock->protocol;
}
+void io_connect_bpf_populate(struct io_uring_bpf_ctx *bctx, struct io_kiocb *req)
+{
+ struct io_connect *conn = io_kiocb_to_cmd(req, struct io_connect);
+ struct sockaddr_storage *ss = req->async_data;
+
+ /*
+ * move_addr_to_kernel() skips the copy for addr_len == 0, so
+ * iomsg->addr may hold stale data from a prior CONNECT. Bail
+ * unless addr_len covers the family discriminator.
+ */
+ if (conn->addr_len < (int)sizeof(sa_family_t))
+ return;
+
+ bctx->connect.family = ss->ss_family;
+ switch (ss->ss_family) {
+ case AF_INET: {
+ struct sockaddr_in *sin = (struct sockaddr_in *)ss;
+
+ if (conn->addr_len < (int)sizeof(*sin))
+ break;
+ bctx->connect.port = sin->sin_port;
+ bctx->connect.v4_addr = sin->sin_addr.s_addr;
+ break;
+ }
+ case AF_INET6: {
+ struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)ss;
+
+ if (conn->addr_len < (int)sizeof(*sin6))
+ break;
+ bctx->connect.port = sin6->sin6_port;
+ memcpy(bctx->connect.v6_addr, &sin6->sin6_addr,
+ sizeof(bctx->connect.v6_addr));
+ break;
+ }
+ default:
+ /* family is set; per-family fields stay zero - family-only filtering */
+ break;
+ }
+}
+
int io_socket_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_socket *sock = io_kiocb_to_cmd(req, struct io_socket);
@@ -1732,7 +1814,7 @@ int io_socket(struct io_kiocb *req, unsigned int issue_flags)
int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_connect *conn = io_kiocb_to_cmd(req, struct io_connect);
- struct io_async_msghdr *io;
+ struct sockaddr_storage *addr;
if (sqe->len || sqe->buf_index || sqe->rw_flags || sqe->splice_fd_in)
return -EINVAL;
@@ -1741,17 +1823,17 @@ int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
conn->addr_len = READ_ONCE(sqe->addr2);
conn->in_progress = conn->seen_econnaborted = false;
- io = io_msg_alloc_async(req);
- if (unlikely(!io))
+ addr = io_uring_alloc_async_data(NULL, req);
+ if (unlikely(!addr))
return -ENOMEM;
- return move_addr_to_kernel(conn->addr, conn->addr_len, &io->addr);
+ return move_addr_to_kernel(conn->addr, conn->addr_len, addr);
}
int io_connect(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_connect *connect = io_kiocb_to_cmd(req, struct io_connect);
- struct io_async_msghdr *io = req->async_data;
+ struct sockaddr_storage *addr = req->async_data;
unsigned file_flags;
int ret;
bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
@@ -1765,8 +1847,7 @@ int io_connect(struct io_kiocb *req, unsigned int issue_flags)
file_flags = force_nonblock ? O_NONBLOCK : 0;
- ret = __sys_connect_file(req->file, &io->addr, connect->addr_len,
- file_flags);
+ ret = __sys_connect_file(req->file, addr, connect->addr_len, file_flags);
if ((ret == -EAGAIN || ret == -EINPROGRESS || ret == -ECONNABORTED)
&& force_nonblock) {
if (ret == -EINPROGRESS) {
@@ -1795,7 +1876,6 @@ get_sock_err:
out:
if (ret < 0)
req_set_fail(req);
- io_req_msg_cleanup(req, issue_flags);
io_req_set_res(req, ret, 0);
return IOU_COMPLETE;
}
@@ -1805,15 +1885,15 @@ out:
* which in turn end up in mnt_want_write() which will grab the fs
* percpu start write sem. This can trigger a lockdep warning.
*/
-static int io_bind_file_create(const struct io_async_msghdr *io, int addr_len)
+static int io_bind_file_create(const struct sockaddr_storage *addr, int addr_len)
{
const struct sockaddr_un *sun;
- if (io->addr.ss_family != AF_UNIX)
+ if (addr->ss_family != AF_UNIX)
return 0;
if (addr_len <= offsetof(struct sockaddr_un, sun_path))
return 0;
- sun = (const struct sockaddr_un *) &io->addr;
+ sun = (const struct sockaddr_un *) addr;
return sun->sun_path[0] != '\0';
}
@@ -1821,7 +1901,7 @@ int io_bind_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_bind *bind = io_kiocb_to_cmd(req, struct io_bind);
struct sockaddr __user *uaddr;
- struct io_async_msghdr *io;
+ struct sockaddr_storage *addr;
int ret;
if (sqe->len || sqe->buf_index || sqe->rw_flags || sqe->splice_fd_in)
@@ -1830,21 +1910,22 @@ int io_bind_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
uaddr = u64_to_user_ptr(READ_ONCE(sqe->addr));
bind->addr_len = READ_ONCE(sqe->addr2);
- io = io_msg_alloc_async(req);
- if (unlikely(!io))
+ addr = io_uring_alloc_async_data(NULL, req);
+ if (unlikely(!addr))
return -ENOMEM;
- ret = move_addr_to_kernel(uaddr, bind->addr_len, &io->addr);
+ ret = move_addr_to_kernel(uaddr, bind->addr_len, addr);
if (unlikely(ret))
return ret;
- if (io_bind_file_create(io, bind->addr_len))
+ if (io_bind_file_create(addr, bind->addr_len))
req->flags |= REQ_F_FORCE_ASYNC;
return 0;
}
+
int io_bind(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_bind *bind = io_kiocb_to_cmd(req, struct io_bind);
- struct io_async_msghdr *io = req->async_data;
+ struct sockaddr_storage *addr = req->async_data;
struct socket *sock;
int ret;
@@ -1852,7 +1933,7 @@ int io_bind(struct io_kiocb *req, unsigned int issue_flags)
if (unlikely(!sock))
return -ENOTSOCK;
- ret = __sys_bind_socket(sock, &io->addr, bind->addr_len);
+ ret = __sys_bind_socket(sock, addr, bind->addr_len);
if (ret < 0)
req_set_fail(req);
io_req_set_res(req, ret, 0);
diff --git a/io_uring/net.h b/io_uring/net.h
index d4d1ddce50e3a..51fda715d3c0b 100644
--- a/io_uring/net.h
+++ b/io_uring/net.h
@@ -46,6 +46,7 @@ int io_accept(struct io_kiocb *req, unsigned int issue_flags);
int io_socket_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_socket(struct io_kiocb *req, unsigned int issue_flags);
void io_socket_bpf_populate(struct io_uring_bpf_ctx *bctx, struct io_kiocb *req);
+void io_connect_bpf_populate(struct io_uring_bpf_ctx *bctx, struct io_kiocb *req);
int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_connect(struct io_kiocb *req, unsigned int issue_flags);
@@ -69,4 +70,10 @@ static inline void io_socket_bpf_populate(struct io_uring_bpf_ctx *bctx,
struct io_kiocb *req)
{
}
+
+static inline void io_connect_bpf_populate(struct io_uring_bpf_ctx *bctx,
+ struct io_kiocb *req)
+{
+}
+
#endif
diff --git a/io_uring/nop.c b/io_uring/nop.c
index f5c9969e7f64a..91ae0b2e7e556 100644
--- a/io_uring/nop.c
+++ b/io_uring/nop.c
@@ -12,7 +12,6 @@
#include "nop.h"
struct io_nop {
- /* NOTE: kiocb has the file as the first member, so don't do it here */
struct file *file;
int result;
int fd;
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index c3ef52b708113..88a45c7d897f2 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -203,9 +203,11 @@ const struct io_issue_def io_issue_defs[] = {
.unbound_nonreg_file = 1,
.pollout = 1,
#if defined(CONFIG_NET)
- .async_size = sizeof(struct io_async_msghdr),
+ .filter_pdu_size = sizeof_field(struct io_uring_bpf_ctx, connect),
+ .async_size = sizeof(struct sockaddr_storage),
.prep = io_connect_prep,
.issue = io_connect,
+ .filter_populate = io_connect_bpf_populate,
#else
.prep = io_eopnotsupp_prep,
#endif
@@ -503,7 +505,7 @@ const struct io_issue_def io_issue_defs[] = {
.needs_file = 1,
.prep = io_bind_prep,
.issue = io_bind,
- .async_size = sizeof(struct io_async_msghdr),
+ .async_size = sizeof(struct sockaddr_storage),
#else
.prep = io_eopnotsupp_prep,
#endif
@@ -513,7 +515,6 @@ const struct io_issue_def io_issue_defs[] = {
.needs_file = 1,
.prep = io_listen_prep,
.issue = io_listen,
- .async_size = sizeof(struct io_async_msghdr),
#else
.prep = io_eopnotsupp_prep,
#endif
diff --git a/io_uring/query.c b/io_uring/query.c
index c1704d088374c..d529d94aa8f4a 100644
--- a/io_uring/query.c
+++ b/io_uring/query.c
@@ -9,6 +9,7 @@
union io_query_data {
struct io_uring_query_opcode opcodes;
struct io_uring_query_zcrx zcrx;
+ struct io_uring_query_zcrx_notif zcrx_notif;
struct io_uring_query_scq scq;
};
@@ -44,6 +45,18 @@ static ssize_t io_query_zcrx(union io_query_data *data)
return sizeof(*e);
}
+static ssize_t io_query_zcrx_notif(union io_query_data *data)
+{
+ struct io_uring_query_zcrx_notif *e = &data->zcrx_notif;
+
+ e->notif_flags = ZCRX_NOTIF_TYPE_MASK;
+ e->notif_stats_size = sizeof(struct zcrx_notif_stats);
+ e->notif_stats_off_alignment = __alignof__(struct zcrx_notif_stats);
+ e->__resv1 = 0;
+ memset(&e->__resv2, 0, sizeof(e->__resv2));
+ return sizeof(*e);
+}
+
static ssize_t io_query_scq(union io_query_data *data)
{
struct io_uring_query_scq *e = &data->scq;
@@ -83,6 +96,9 @@ static int io_handle_query_entry(union io_query_data *data, void __user *uhdr,
case IO_URING_QUERY_ZCRX:
ret = io_query_zcrx(data);
break;
+ case IO_URING_QUERY_ZCRX_NOTIF:
+ ret = io_query_zcrx_notif(data);
+ break;
case IO_URING_QUERY_SCQ:
ret = io_query_scq(data);
break;
diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c
index 650303626be6e..7f553d115e365 100644
--- a/io_uring/rsrc.c
+++ b/io_uring/rsrc.c
@@ -28,7 +28,52 @@ struct io_rsrc_update {
};
static struct io_rsrc_node *io_sqe_buffer_register(struct io_ring_ctx *ctx,
- struct iovec *iov, struct page **last_hpage);
+ struct iovec *iov);
+
+static int hpage_acct_ref(struct io_ring_ctx *ctx, struct page *hpage,
+ bool *acct_new)
+{
+ unsigned long key = (unsigned long) hpage;
+ unsigned long count;
+ void *entry;
+ int ret;
+
+ lockdep_assert_held(&ctx->uring_lock);
+
+ entry = xa_load(&ctx->hpage_acct, key);
+ if (entry) {
+ *acct_new = false;
+ count = xa_to_value(entry) + 1;
+ } else {
+ ret = xa_reserve(&ctx->hpage_acct, key, GFP_KERNEL_ACCOUNT);
+ if (ret)
+ return ret;
+ *acct_new = true;
+ count = 1;
+ }
+ xa_store(&ctx->hpage_acct, key, xa_mk_value(count), GFP_KERNEL_ACCOUNT);
+ return 0;
+}
+
+static bool hpage_acct_unref(struct io_ring_ctx *ctx, struct page *hpage)
+{
+ unsigned long key = (unsigned long) hpage;
+ unsigned long count;
+ void *entry;
+
+ lockdep_assert_held(&ctx->uring_lock);
+
+ entry = xa_load(&ctx->hpage_acct, key);
+ if (WARN_ON_ONCE(!entry))
+ return false;
+ count = xa_to_value(entry);
+ if (count == 1) {
+ xa_erase(&ctx->hpage_acct, key);
+ return true;
+ }
+ xa_store(&ctx->hpage_acct, key, xa_mk_value(count - 1), GFP_KERNEL_ACCOUNT);
+ return false;
+}
/* only define max */
#define IORING_MAX_FIXED_FILES (1U << 20)
@@ -88,9 +133,14 @@ int io_validate_user_buf_range(u64 uaddr, u64 ulen)
unsigned long tmp, base = (unsigned long)uaddr;
unsigned long acct_len = (unsigned long)PAGE_ALIGN(ulen);
- /* arbitrary limit, but we need something */
- if (ulen > SZ_1G || !ulen)
+ if (!ulen)
+ return -EFAULT;
+ /* 32-bit sanity checking */
+ if (ulen > ULONG_MAX || uaddr > ULONG_MAX)
return -EFAULT;
+ /* cap to 1TB for 64-bit */
+ if (ulen > SZ_1T)
+ return -EINVAL;
if (check_add_overflow(base, acct_len, &tmp))
return -EOVERFLOW;
return 0;
@@ -124,15 +174,53 @@ static void io_free_imu(struct io_ring_ctx *ctx, struct io_mapped_ubuf *imu)
kvfree(imu);
}
+static unsigned long io_buffer_unaccount_pages(struct io_ring_ctx *ctx,
+ struct io_mapped_ubuf *imu)
+{
+ struct page *seen = NULL;
+ unsigned long acct = 0;
+ int i;
+
+ if (imu->flags & IO_REGBUF_F_KBUF || !ctx->user)
+ return 0;
+
+ for (i = 0; i < imu->nr_bvecs; i++) {
+ struct page *page = imu->bvec[i].bv_page;
+ struct page *hpage;
+
+ if (!PageCompound(page)) {
+ acct++;
+ continue;
+ }
+
+ hpage = compound_head(page);
+ if (hpage == seen)
+ continue;
+ seen = hpage;
+
+ /* Unaccount on last reference */
+ if (hpage_acct_unref(ctx, hpage))
+ acct += page_size(hpage) >> PAGE_SHIFT;
+ cond_resched();
+ }
+
+ return acct;
+}
+
static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_mapped_ubuf *imu)
{
+ unsigned long acct_pages = 0;
+
+ /* Always decrement, so it works for cloned buffers too */
+ acct_pages = io_buffer_unaccount_pages(ctx, imu);
+
if (unlikely(refcount_read(&imu->refs) > 1)) {
if (!refcount_dec_and_test(&imu->refs))
return;
}
- if (imu->acct_pages)
- io_unaccount_mem(ctx->user, ctx->mm_account, imu->acct_pages);
+ if (acct_pages)
+ io_unaccount_mem(ctx->user, ctx->mm_account, acct_pages);
imu->release(imu->priv);
io_free_imu(ctx, imu);
}
@@ -282,7 +370,6 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
{
u64 __user *tags = u64_to_user_ptr(up->tags);
struct iovec fast_iov, *iov;
- struct page *last_hpage = NULL;
struct iovec __user *uvec;
u64 user_data = up->data;
__u32 done;
@@ -307,7 +394,7 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
err = -EFAULT;
break;
}
- node = io_sqe_buffer_register(ctx, iov, &last_hpage);
+ node = io_sqe_buffer_register(ctx, iov);
if (IS_ERR(node)) {
err = PTR_ERR(node);
break;
@@ -605,76 +692,79 @@ int io_sqe_buffers_unregister(struct io_ring_ctx *ctx)
}
/*
- * Not super efficient, but this is just a registration time. And we do cache
- * the last compound head, so generally we'll only do a full search if we don't
- * match that one.
- *
- * We check if the given compound head page has already been accounted, to
- * avoid double accounting it. This allows us to account the full size of the
- * page, not just the constituent pages of a huge page.
+ * Undo hpage_acct_ref() calls made during io_buffer_account_pin() on failure.
+ * This operates on the pages array since imu->bvec isn't populated yet.
*/
-static bool headpage_already_acct(struct io_ring_ctx *ctx, struct page **pages,
- int nr_pages, struct page *hpage)
+static void io_buffer_unaccount_hpages(struct io_ring_ctx *ctx,
+ struct page **pages, int nr_pages)
{
- int i, j;
+ struct page *seen = NULL;
+ int i;
+
+ if (!ctx->user)
+ return;
- /* check current page array */
for (i = 0; i < nr_pages; i++) {
+ struct page *hpage;
+
if (!PageCompound(pages[i]))
continue;
- if (compound_head(pages[i]) == hpage)
- return true;
- }
- /* check previously registered pages */
- for (i = 0; i < ctx->buf_table.nr; i++) {
- struct io_rsrc_node *node = ctx->buf_table.nodes[i];
- struct io_mapped_ubuf *imu;
-
- if (!node)
+ hpage = compound_head(pages[i]);
+ if (hpage == seen)
continue;
- imu = node->buf;
- for (j = 0; j < imu->nr_bvecs; j++) {
- if (!PageCompound(imu->bvec[j].bv_page))
- continue;
- if (compound_head(imu->bvec[j].bv_page) == hpage)
- return true;
- }
- }
+ seen = hpage;
- return false;
+ hpage_acct_unref(ctx, hpage);
+ cond_resched();
+ }
}
static int io_buffer_account_pin(struct io_ring_ctx *ctx, struct page **pages,
- int nr_pages, struct io_mapped_ubuf *imu,
- struct page **last_hpage)
+ int nr_pages)
{
+ unsigned long acct_pages = 0;
+ struct page *seen = NULL;
int i, ret;
- imu->acct_pages = 0;
+ if (!ctx->user)
+ return 0;
+
for (i = 0; i < nr_pages; i++) {
+ struct page *hpage;
+ bool acct_new;
+
if (!PageCompound(pages[i])) {
- imu->acct_pages++;
- } else {
- struct page *hpage;
+ acct_pages++;
+ continue;
+ }
- hpage = compound_head(pages[i]);
- if (hpage == *last_hpage)
- continue;
- *last_hpage = hpage;
- if (headpage_already_acct(ctx, pages, i, hpage))
- continue;
- imu->acct_pages += page_size(hpage) >> PAGE_SHIFT;
+ hpage = compound_head(pages[i]);
+ if (hpage == seen)
+ continue;
+ seen = hpage;
+
+ ret = hpage_acct_ref(ctx, hpage, &acct_new);
+ if (ret) {
+ io_buffer_unaccount_hpages(ctx, pages, i);
+ return ret;
}
+ if (acct_new)
+ acct_pages += page_size(hpage) >> PAGE_SHIFT;
+ cond_resched();
}
- if (!imu->acct_pages)
- return 0;
+ /* Try to account the memory */
+ if (acct_pages) {
+ ret = io_account_mem(ctx->user, ctx->mm_account, acct_pages);
+ if (ret) {
+ /* Undo the refs we just added */
+ io_buffer_unaccount_hpages(ctx, pages, nr_pages);
+ return ret;
+ }
+ }
- ret = io_account_mem(ctx->user, ctx->mm_account, imu->acct_pages);
- if (ret)
- imu->acct_pages = 0;
- return ret;
+ return 0;
}
static bool io_coalesce_buffer(struct page ***pages, int *nr_pages,
@@ -763,8 +853,7 @@ bool io_check_coalesce_buffer(struct page **page_array, int nr_pages,
}
static struct io_rsrc_node *io_sqe_buffer_register(struct io_ring_ctx *ctx,
- struct iovec *iov,
- struct page **last_hpage)
+ struct iovec *iov)
{
struct io_mapped_ubuf *imu = NULL;
struct page **pages = NULL;
@@ -811,7 +900,7 @@ static struct io_rsrc_node *io_sqe_buffer_register(struct io_ring_ctx *ctx,
goto done;
imu->nr_bvecs = nr_pages;
- ret = io_buffer_account_pin(ctx, pages, nr_pages, imu, last_hpage);
+ ret = io_buffer_account_pin(ctx, pages, nr_pages);
if (ret)
goto done;
@@ -861,7 +950,6 @@ done:
int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
unsigned int nr_args, u64 __user *tags)
{
- struct page *last_hpage = NULL;
struct io_rsrc_data data;
struct iovec fast_iov, *iov = &fast_iov;
const struct iovec __user *uvec;
@@ -904,7 +992,7 @@ int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
}
}
- node = io_sqe_buffer_register(ctx, iov, &last_hpage);
+ node = io_sqe_buffer_register(ctx, iov);
if (IS_ERR(node)) {
ret = PTR_ERR(node);
break;
@@ -971,7 +1059,6 @@ int io_buffer_register_bvec(struct io_uring_cmd *cmd, struct request *rq,
imu->ubuf = 0;
imu->len = blk_rq_bytes(rq);
- imu->acct_pages = 0;
imu->folio_shift = PAGE_SHIFT;
refcount_set(&imu->refs, 1);
imu->release = release;
@@ -1136,6 +1223,56 @@ int io_import_reg_buf(struct io_kiocb *req, struct iov_iter *iter,
return io_import_fixed(ddir, iter, node->buf, buf_addr, len);
}
+static int io_buffer_acct_cloned_hpages(struct io_ring_ctx *ctx,
+ struct io_mapped_ubuf *imu)
+{
+ struct page *seen = NULL;
+ int i, ret = 0;
+
+ if (imu->flags & IO_REGBUF_F_KBUF || !ctx->user)
+ return 0;
+
+ for (i = 0; i < imu->nr_bvecs; i++) {
+ struct page *page = imu->bvec[i].bv_page;
+ struct page *hpage;
+ bool acct_new;
+
+ if (!PageCompound(page))
+ continue;
+
+ hpage = compound_head(page);
+ if (hpage == seen)
+ continue;
+ seen = hpage;
+
+ /* Atomically add reference for cloned buffer */
+ ret = hpage_acct_ref(ctx, hpage, &acct_new);
+ if (ret)
+ break;
+
+ cond_resched();
+ }
+
+ if (!ret)
+ return 0;
+
+ /* Undo refs we added for bvecs [0..i) */
+ seen = NULL;
+ for (int j = 0; j < i; j++) {
+ struct page *p = imu->bvec[j].bv_page;
+ struct page *hp;
+
+ if (!PageCompound(p))
+ continue;
+ hp = compound_head(p);
+ if (hp == seen)
+ continue;
+ seen = hp;
+ hpage_acct_unref(ctx, hp);
+ }
+ return ret;
+}
+
/* Lock two rings at once. The rings must be different! */
static void lock_two_rings(struct io_ring_ctx *ctx1, struct io_ring_ctx *ctx2)
{
@@ -1218,6 +1355,14 @@ static int io_clone_buffers(struct io_ring_ctx *ctx, struct io_ring_ctx *src_ctx
refcount_inc(&src_node->buf->refs);
dst_node->buf = src_node->buf;
+ /* track compound references to clones */
+ ret = io_buffer_acct_cloned_hpages(ctx, src_node->buf);
+ if (ret) {
+ refcount_dec(&src_node->buf->refs);
+ io_cache_free(&ctx->node_cache, dst_node);
+ io_rsrc_data_free(ctx, &data);
+ return ret;
+ }
}
data.nodes[off++] = dst_node;
i++;
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index 44e3386f7c1ca..98ae8ef51009d 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -34,15 +34,14 @@ enum {
struct io_mapped_ubuf {
u64 ubuf;
- unsigned int len;
+ size_t len;
unsigned int nr_bvecs;
unsigned int folio_shift;
refcount_t refs;
- unsigned long acct_pages;
- void (*release)(void *);
- void *priv;
u8 flags;
u8 dir;
+ void (*release)(void *);
+ void *priv;
struct bio_vec bvec[] __counted_by(nr_bvecs);
};
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index 46c12afec73ec..2460bd6052665 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -260,39 +260,29 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
}
/*
- * Run task_work, processing the retry_list first. The retry_list holds
- * entries that we passed on in the previous run, if we had more task_work
- * than we were asked to process. Newly queued task_work isn't run until the
- * retry list has been fully processed.
+ * Run task_work, processing no more than max_entries at a time. If more
+ * than that is pending, it simply stays on the queue for the next run.
*/
-static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(int max_entries)
{
struct io_uring_task *tctx = current->io_uring;
unsigned int count = 0;
- if (*retry_list) {
- *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
- if (count >= max_entries)
- goto out;
- max_entries -= count;
- }
- *retry_list = tctx_task_work_run(tctx, max_entries, &count);
-out:
+ tctx_task_work_run(tctx, max_entries, &count);
if (task_work_pending(current))
task_work_run();
return count;
}
-static bool io_sq_tw_pending(struct llist_node *retry_list)
+static bool io_sq_tw_pending(void)
{
struct io_uring_task *tctx = current->io_uring;
- return retry_list || !llist_empty(&tctx->task_list);
+ return !mpscq_empty(&tctx->task_list);
}
static int io_sq_thread(void *data)
{
- struct llist_node *retry_list = NULL;
struct io_sq_data *sqd = data;
struct io_ring_ctx *ctx;
unsigned long timeout = 0;
@@ -347,7 +337,7 @@ static int io_sq_thread(void *data)
if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
sqt_spin = true;
}
- if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
+ if (io_sq_tw(IORING_TW_CAP_ENTRIES_VALUE))
sqt_spin = true;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -372,7 +362,7 @@ static int io_sq_thread(void *data)
}
prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
- if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
+ if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending()) {
bool needs_sched = true;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -411,8 +401,8 @@ static int io_sq_thread(void *data)
timeout = jiffies + sqd->sq_thread_idle;
}
- if (retry_list)
- io_sq_tw(&retry_list, UINT_MAX);
+ if (io_sq_tw_pending())
+ io_sq_tw(UINT_MAX);
io_uring_cancel_generic(true, sqd);
rcu_assign_pointer(sqd->thread, NULL);
diff --git a/io_uring/tctx.c b/io_uring/tctx.c
index 42b219b34aa8f..cc3bf2b3bdbc9 100644
--- a/io_uring/tctx.c
+++ b/io_uring/tctx.c
@@ -103,7 +103,8 @@ __cold struct io_uring_task *io_uring_alloc_task_context(struct task_struct *tas
init_waitqueue_head(&tctx->wait);
atomic_set(&tctx->in_cancel, 0);
atomic_set(&tctx->inflight_tracked, 0);
- init_llist_head(&tctx->task_list);
+ mpscq_init(&tctx->task_list, &tctx->task_head);
+ INIT_WORK(&tctx->fallback_work, io_tctx_fallback_work);
init_task_work(&tctx->task_work, tctx_task_work);
return tctx;
}
diff --git a/io_uring/tw.c b/io_uring/tw.c
index 023d5e6bc491a..e74372233f40b 100644
--- a/io_uring/tw.c
+++ b/io_uring/tw.c
@@ -14,24 +14,7 @@
#include "rw.h"
#include "eventfd.h"
#include "wait.h"
-
-void io_fallback_req_func(struct work_struct *work)
-{
- struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
- fallback_work.work);
- struct llist_node *node = llist_del_all(&ctx->fallback_llist);
- struct io_kiocb *req, *tmp;
- struct io_tw_state ts = {};
-
- percpu_ref_get(&ctx->refs);
- mutex_lock(&ctx->uring_lock);
- ts.cancel = io_should_terminate_tw(ctx);
- llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
- req->io_task_work.func((struct io_tw_req){req}, ts);
- io_submit_flush_completions(ctx);
- mutex_unlock(&ctx->uring_lock);
- percpu_ref_put(&ctx->refs);
-}
+#include "mpscq.h"
static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
@@ -45,23 +28,68 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
percpu_ref_put(&ctx->refs);
}
+void io_tctx_fallback_work(struct work_struct *work)
+{
+ struct io_uring_task *tctx = container_of(work, struct io_uring_task,
+ fallback_work);
+ unsigned int count = 0;
+
+ /* see tctx_task_work() - a set bit must always have a run coming */
+ clear_bit(0, &tctx->tw_pending);
+ smp_mb__after_atomic();
+
+ /*
+ * Run the entries directly. We're in PF_KTHRED context, hence
+ * io_should_terminate_tw() is true and they will be marked as
+ * canceled.
+ */
+ tctx_task_work_run(tctx, UINT_MAX, &count);
+ put_task_struct(tctx->task);
+}
+
+static void io_fallback_tw(struct io_uring_task *tctx)
+{
+ /*
+ * The task ref both keeps ->task valid and, as __io_uring_free() is
+ * only called when the task itself is freed, ensures the tctx (and
+ * the queued work) stay around until the drain has run.
+ */
+ get_task_struct(tctx->task);
+ if (!queue_work(system_unbound_wq, &tctx->fallback_work))
+ put_task_struct(tctx->task);
+}
+
/*
- * Run queued task_work, returning the number of entries processed in *count.
- * If more entries than max_entries are available, stop processing once this
- * is reached and return the rest of the list.
+ * Run queued task_work, processing no more than max_entries, with the number
+ * of entries processed added to *count. If more entries than max_entries are
+ * available, the remainder simply stay on the queue for the next run.
*/
-struct llist_node *io_handle_tw_list(struct llist_node *node,
- unsigned int *count,
- unsigned int max_entries)
+void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries,
+ unsigned int *count)
{
struct io_ring_ctx *ctx = NULL;
struct io_tw_state ts = { };
- do {
- struct llist_node *next = node->next;
- struct io_kiocb *req = container_of(node, struct io_kiocb,
- io_task_work.node);
+ while (*count < max_entries) {
+ struct llist_node *node = mpscq_pop(&tctx->task_list,
+ &tctx->task_head);
+ struct io_kiocb *req;
+ if (!node) {
+ if (mpscq_empty(&tctx->task_list))
+ break;
+ /*
+ * A producer has published a node but hasn't
+ * linked it into the queue yet (see mpscq_pop()).
+ * Give it a chance to finish rather than spinning,
+ * and don't sit on the ctx lock while doing so.
+ */
+ ctx_flush_and_put(ctx, ts);
+ ctx = NULL;
+ cond_resched();
+ continue;
+ }
+ req = container_of(node, struct io_kiocb, io_task_work.node);
if (req->ctx != ctx) {
ctx_flush_and_put(ctx, ts);
ctx = req->ctx;
@@ -72,84 +100,36 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
(struct io_tw_req){req}, ts);
- node = next;
(*count)++;
if (unlikely(need_resched())) {
ctx_flush_and_put(ctx, ts);
ctx = NULL;
cond_resched();
}
- } while (node && *count < max_entries);
-
- ctx_flush_and_put(ctx, ts);
- return node;
-}
-
-static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
-{
- struct io_ring_ctx *last_ctx = NULL;
- struct io_kiocb *req;
-
- while (node) {
- req = container_of(node, struct io_kiocb, io_task_work.node);
- node = node->next;
- if (last_ctx != req->ctx) {
- if (last_ctx) {
- if (sync)
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
- last_ctx = req->ctx;
- percpu_ref_get(&last_ctx->refs);
- }
- if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
- schedule_delayed_work(&last_ctx->fallback_work, 1);
- }
-
- if (last_ctx) {
- if (sync)
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
-}
-
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
-{
- struct llist_node *node = llist_del_all(&tctx->task_list);
-
- __io_fallback_tw(node, sync);
-}
-
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries,
- unsigned int *count)
-{
- struct llist_node *node;
-
- node = llist_del_all(&tctx->task_list);
- if (node) {
- node = llist_reverse_order(node);
- node = io_handle_tw_list(node, count, max_entries);
}
+ ctx_flush_and_put(ctx, ts);
- /* relaxed read is enough as only the task itself sets ->in_cancel */
- if (unlikely(atomic_read(&tctx->in_cancel)))
+ /*
+ * Relaxed read is enough as only the task itself sets ->in_cancel.
+ * The tctx may also be drained by io_tctx_fallback_work(), in which
+ * case current is a kworker that has no tctx refs to drop.
+ */
+ if (unlikely(atomic_read(&tctx->in_cancel)) &&
+ current->io_uring == tctx)
io_uring_drop_tctx_refs(current);
trace_io_uring_task_work_run(tctx, *count);
- return node;
}
void tctx_task_work(struct callback_head *cb)
{
struct io_uring_task *tctx;
- struct llist_node *ret;
unsigned int count = 0;
tctx = container_of(cb, struct io_uring_task, task_work);
- ret = tctx_task_work_run(tctx, UINT_MAX, &count);
- /* can't happen */
- WARN_ON_ONCE(ret);
+ clear_bit(0, &tctx->tw_pending);
+ smp_mb__after_atomic();
+ tctx_task_work_run(tctx, UINT_MAX, &count);
}
/*
@@ -158,11 +138,11 @@ void tctx_task_work(struct callback_head *cb)
*/
static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx)
{
- lockdep_assert_in_rcu_read_lock();
-
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) {
- struct io_rings *rings = rcu_dereference(ctx->rings_rcu);
+ struct io_rings *rings;
+ guard(rcu)();
+ rings = rcu_dereference(ctx->rings_rcu);
atomic_or(IORING_SQ_TASKRUN, &rings->sq_flags);
}
}
@@ -170,11 +150,7 @@ static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx)
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
struct io_ring_ctx *ctx = req->ctx;
- unsigned nr_wait, nr_tw, nr_tw_prev;
- struct llist_node *head;
-
- /* See comment above IO_CQ_WAKE_INIT */
- BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
+ int nr_wait;
/*
* We don't know how many requests there are in the link and whether
@@ -183,56 +159,45 @@ void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
if (req->flags & IO_REQ_LINK_FLAGS)
flags &= ~IOU_F_TWQ_LAZY_WAKE;
- guard(rcu)();
-
- head = READ_ONCE(ctx->work_llist.first);
- do {
- nr_tw_prev = 0;
- if (head) {
- struct io_kiocb *first_req = container_of(head,
- struct io_kiocb,
- io_task_work.node);
- /*
- * Might be executed at any moment, rely on
- * SLAB_TYPESAFE_BY_RCU to keep it alive.
- */
- nr_tw_prev = READ_ONCE(first_req->nr_tw);
- }
-
- /*
- * Theoretically, it can overflow, but that's fine as one of
- * previous adds should've tried to wake the task.
- */
- nr_tw = nr_tw_prev + 1;
- if (!(flags & IOU_F_TWQ_LAZY_WAKE))
- nr_tw = IO_CQ_WAKE_FORCE;
-
- req->nr_tw = nr_tw;
- req->io_task_work.node.next = head;
- } while (!try_cmpxchg(&ctx->work_llist.first, &head,
- &req->io_task_work.node));
-
/*
- * cmpxchg implies a full barrier, which pairs with the barrier
- * in set_current_state() on the io_cqring_wait() side. It's used
- * to ensure that either we see updated ->cq_wait_nr, or waiters
- * going to sleep will observe the work added to the list, which
- * is similar to the wait/wawke task state sync.
+ * The xchg() in mpscq_push() implies a full barrier, which pairs with
+ * the barrier in set_current_state() on the io_cqring_wait() side. This
+ * ensures that either we see the updated ->cq_wait_nr, or waiters going
+ * to sleep will observe the work added to the list, which is similar to
+ * the wait/wake task state sync.
*/
-
- if (!head) {
+ if (mpscq_push(&ctx->work_list, &req->io_task_work.node)) {
io_ctx_mark_taskrun(ctx);
if (data_race(ctx->int_flags) & IO_RING_F_HAS_EVFD)
io_eventfd_signal(ctx, false);
}
+ /*
+ * No one is waiting (IO_CQ_WAKE_INIT), or this cycle's wake up has
+ * already been issued (zero or negative, see below).
+ */
nr_wait = atomic_read(&ctx->cq_wait_nr);
- /* not enough or no one is waiting */
- if (nr_tw < nr_wait)
+ if (nr_wait <= 0)
return;
- /* the previous add has already woken it up */
- if (nr_tw_prev >= nr_wait)
+ if (flags & IOU_F_TWQ_LAZY_WAKE) {
+ /*
+ * ->cq_wait_nr counts down the number of lazy adds, once it
+ * hits zero we're good to wake the waiter. A producer that
+ * gets delayed between pushing its entry and getting here
+ * may count down a later wait cycle. That's OK, it'll be an
+ * early wake, not a lost one.
+ */
+ if (!atomic_dec_and_test(&ctx->cq_wait_nr))
+ return;
+ } else if (atomic_xchg(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT) <= 0) {
+ /*
+ * Potentially raced with lazy add, claim the wake. A value
+ * <= 0 means a lazy add hit zero or another forced add
+ * claimed IO_CQ_WAKE_INIT. Either way, the wake up for this
+ * wait cycle has already been done.
+ */
return;
+ }
wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}
@@ -242,7 +207,7 @@ void io_req_normal_work_add(struct io_kiocb *req)
struct io_ring_ctx *ctx = req->ctx;
/* task_work already pending, we're done */
- if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+ if (!mpscq_push(&tctx->task_list, &req->io_task_work.node))
return;
/*
@@ -258,10 +223,14 @@ void io_req_normal_work_add(struct io_kiocb *req)
return;
}
+ /* task_work must only be added once */
+ if (test_and_set_bit(0, &tctx->tw_pending))
+ return;
+
if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
return;
- io_fallback_tw(tctx, false);
+ io_fallback_tw(tctx);
}
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
@@ -271,23 +240,34 @@ void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
__io_req_task_work_add(req, flags);
}
-void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
+void __cold io_cancel_local_task_work(struct io_ring_ctx *ctx)
{
+ struct io_tw_state ts = { .cancel = true };
struct llist_node *node;
/*
- * Running the work items may utilize ->retry_llist as a means
- * for capping the number of task_work entries run at the same
- * time. But that list can potentially race with moving the work
- * from here, if the task is exiting. As any normal task_work
- * running holds ->uring_lock already, just guard this slow path
- * with ->uring_lock to avoid racing on ->retry_llist.
+ * The work list consumer side is serialized by ->uring_lock, see
+ * __io_run_local_work(). Grab it to guard against racing with normal
+ * task_work running, as the task may be exiting. The ring is going
+ * away, run the entries in cancel mode right here - the callers
+ * provide the same process context the per-ctx fallback work that
+ * they were previously punted to ran in.
*/
guard(mutex)(&ctx->uring_lock);
- node = llist_del_all(&ctx->work_llist);
- __io_fallback_tw(node, false);
- node = llist_del_all(&ctx->retry_llist);
- __io_fallback_tw(node, false);
+
+ while (!mpscq_empty(&ctx->work_list)) {
+ struct io_kiocb *req;
+
+ node = mpscq_pop(&ctx->work_list, &ctx->work_head);
+ if (!node) {
+ /* a producer is mid-push, wait for it to link */
+ cond_resched();
+ continue;
+ }
+ req = container_of(node, struct io_kiocb, io_task_work.node);
+ req->io_task_work.func((struct io_tw_req){req}, ts);
+ }
+ io_submit_flush_completions(ctx);
}
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -302,22 +282,23 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
return false;
}
-static int __io_run_local_work_loop(struct llist_node **node,
+static int __io_run_local_work_loop(struct io_ring_ctx *ctx,
io_tw_token_t tw,
int events)
{
int ret = 0;
- while (*node) {
- struct llist_node *next = (*node)->next;
- struct io_kiocb *req = container_of(*node, struct io_kiocb,
- io_task_work.node);
+ while (ret < events) {
+ struct llist_node *node = mpscq_pop(&ctx->work_list, &ctx->work_head);
+ struct io_kiocb *req;
+
+ if (!node)
+ break;
+ req = container_of(node, struct io_kiocb, io_task_work.node);
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
(struct io_tw_req){req}, tw);
- *node = next;
- if (++ret >= events)
- break;
+ ret++;
}
return ret;
@@ -326,7 +307,6 @@ static int __io_run_local_work_loop(struct llist_node **node,
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
int min_events, int max_events)
{
- struct llist_node *node;
unsigned int loops = 0;
int ret = 0;
@@ -335,24 +315,21 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
- tw.cancel = io_should_terminate_tw(ctx);
- min_events -= ret;
- ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
- if (ctx->retry_llist.first)
- goto retry_done;
-
/*
- * llists are in reverse order, flip it back the right way before
- * running the pending items.
+ * If the last loop made no progress while work is still pending,
+ * a producer has published a node but hasn't linked it into the
+ * queue yet (see mpscq_pop()). Give it a chance to finish rather
+ * than spinning on the queue.
*/
- node = llist_reverse_order(llist_del_all(&ctx->work_llist));
- ret += __io_run_local_work_loop(&node, tw, max_events - ret);
- ctx->retry_llist.first = node;
+ if (unlikely(loops && !ret))
+ cond_resched();
+ tw.cancel = io_should_terminate_tw(ctx);
+ min_events -= ret;
+ ret = __io_run_local_work_loop(ctx, tw, max_events);
loops++;
if (io_run_local_work_continue(ctx, ret, min_events))
goto again;
-retry_done:
io_submit_flush_completions(ctx);
if (io_run_local_work_continue(ctx, ret, min_events))
goto again;
diff --git a/io_uring/tw.h b/io_uring/tw.h
index 415e330fabdeb..3ade5ad577fda 100644
--- a/io_uring/tw.h
+++ b/io_uring/tw.h
@@ -6,6 +6,8 @@
#include <linux/percpu-refcount.h>
#include <linux/io_uring_types.h>
+#include "mpscq.h"
+
#define IO_LOCAL_TW_DEFAULT_MAX 20
/*
@@ -23,18 +25,17 @@ static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx)
}
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags);
-struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
void tctx_task_work(struct callback_head *cb);
+void io_tctx_fallback_work(struct work_struct *work);
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events);
int io_run_task_work_sig(struct io_ring_ctx *ctx);
-__cold void io_fallback_req_func(struct work_struct *work);
-__cold void io_move_task_work_from_local(struct io_ring_ctx *ctx);
+__cold void io_cancel_local_task_work(struct io_ring_ctx *ctx);
int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events);
void io_req_local_work_add(struct io_kiocb *req, unsigned flags);
void io_req_normal_work_add(struct io_kiocb *req);
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
+void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
static inline void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
{
@@ -89,7 +90,7 @@ static inline int io_run_task_work(void)
static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
{
- return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
+ return !mpscq_empty(&ctx->work_list);
}
static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
diff --git a/io_uring/wait.c b/io_uring/wait.c
index d005ea17b35f4..a48523c2f356b 100644
--- a/io_uring/wait.c
+++ b/io_uring/wait.c
@@ -98,7 +98,7 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
atomic_set(&ctx->cq_wait_nr, 1);
smp_mb();
- if (!llist_empty(&ctx->work_llist))
+ if (io_local_work_pending(ctx))
goto out_wake;
}
diff --git a/io_uring/wait.h b/io_uring/wait.h
index a4274b137f817..b7b9c46b1b013 100644
--- a/io_uring/wait.h
+++ b/io_uring/wait.h
@@ -5,12 +5,14 @@
#include <linux/io_uring_types.h>
/*
- * No waiters. It's larger than any valid value of the tw counter
- * so that tests against ->cq_wait_nr would fail and skip wake_up().
+ * ->cq_wait_nr is armed with the number of lazy task_work adds the waiter
+ * still needs, and counted down by the add side, with the add reaching zero
+ * issuing the (single) wake up for this wait cycle. Zero and below means no
+ * wake up is to be issued: IO_CQ_WAKE_INIT when no task is waiting (also
+ * what a forced wake up resets it to when claiming one), zero once the
+ * countdown has fired.
*/
-#define IO_CQ_WAKE_INIT (-1U)
-/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
-#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1)
+#define IO_CQ_WAKE_INIT (-1)
struct ext_arg {
size_t argsz;
diff --git a/io_uring/zcrx.c b/io_uring/zcrx.c
index 19837e0b5e918..49163f9c39df3 100644
--- a/io_uring/zcrx.c
+++ b/io_uring/zcrx.c
@@ -44,6 +44,17 @@ static inline struct io_zcrx_area *io_zcrx_iov_to_area(const struct net_iov *nio
return container_of(owner, struct io_zcrx_area, nia);
}
+static bool zcrx_set_ring_ctx(struct io_zcrx_ifq *zcrx,
+ struct io_ring_ctx *ctx)
+{
+ guard(spinlock_bh)(&zcrx->ctx_lock);
+ if (zcrx->master_ctx)
+ return false;
+ percpu_ref_get(&ctx->refs);
+ zcrx->master_ctx = ctx;
+ return true;
+}
+
static inline struct page *io_zcrx_iov_page(const struct net_iov *niov)
{
struct io_zcrx_area *area = io_zcrx_iov_to_area(niov);
@@ -245,14 +256,13 @@ static void io_release_area_mem(struct io_zcrx_mem *mem)
{
if (mem->is_dmabuf) {
io_release_dmabuf(mem);
- return;
- }
- if (mem->pages) {
+ } else if (mem->pages) {
unpin_user_pages(mem->pages, mem->nr_folios);
sg_free_table(mem->sgt);
- mem->sgt = NULL;
kvfree(mem->pages);
}
+ mem->pages = IO_URING_PTR_POISON;
+ mem->sgt = IO_URING_PTR_POISON;
}
static int io_import_area(struct io_zcrx_ifq *ifq,
@@ -329,7 +339,6 @@ static void zcrx_sync_for_device(struct page_pool *pp, struct io_zcrx_ifq *zcrx,
struct io_zcrx_args {
struct io_kiocb *req;
struct io_zcrx_ifq *ifq;
- struct socket *sock;
unsigned nr_skbs;
};
@@ -403,8 +412,9 @@ static int io_allocate_rbuf_ring(struct io_ring_ctx *ctx,
static void io_free_rbuf_ring(struct io_zcrx_ifq *ifq)
{
io_free_region(ifq->user, &ifq->rq_region);
- ifq->rq.ring = NULL;
- ifq->rq.rqes = NULL;
+ ifq->rq.ring = IO_URING_PTR_POISON;
+ ifq->rq.rqes = IO_URING_PTR_POISON;
+ ifq->notif_stats = IO_URING_PTR_POISON;
}
static void io_zcrx_free_area(struct io_zcrx_ifq *ifq,
@@ -530,6 +540,7 @@ static struct io_zcrx_ifq *io_zcrx_ifq_alloc(struct io_ring_ctx *ctx)
return NULL;
ifq->if_rxq = -1;
+ spin_lock_init(&ifq->ctx_lock);
spin_lock_init(&ifq->rq.lock);
mutex_init(&ifq->pp_lock);
refcount_set(&ifq->refs, 1);
@@ -575,7 +586,12 @@ static void io_close_queue(struct io_zcrx_ifq *ifq)
static void io_zcrx_ifq_free(struct io_zcrx_ifq *ifq)
{
- io_close_queue(ifq);
+ if (WARN_ON_ONCE(ifq->if_rxq != -1))
+ return;
+ if (WARN_ON_ONCE(ifq->netdev != NULL))
+ return;
+ if (WARN_ON_ONCE(ifq->master_ctx))
+ return;
if (ifq->area)
io_zcrx_free_area(ifq, ifq->area);
@@ -652,17 +668,24 @@ static void io_zcrx_scrub(struct io_zcrx_ifq *ifq)
}
}
-static void zcrx_unregister_user(struct io_zcrx_ifq *ifq)
+static void zcrx_unregister_user(struct io_zcrx_ifq *ifq, struct io_ring_ctx *ctx)
{
+ scoped_guard(spinlock_bh, &ifq->ctx_lock) {
+ if (ctx && ifq->master_ctx == ctx) {
+ ifq->master_ctx = NULL;
+ percpu_ref_put(&ctx->refs);
+ }
+ }
+
if (refcount_dec_and_test(&ifq->user_refs)) {
io_close_queue(ifq);
io_zcrx_scrub(ifq);
}
}
-static void zcrx_unregister(struct io_zcrx_ifq *ifq)
+static void zcrx_unregister(struct io_zcrx_ifq *ifq, struct io_ring_ctx *ctx)
{
- zcrx_unregister_user(ifq);
+ zcrx_unregister_user(ifq, ctx);
io_put_zcrx_ifq(ifq);
}
@@ -682,7 +705,7 @@ static int zcrx_box_release(struct inode *inode, struct file *file)
if (WARN_ON_ONCE(!ifq))
return -EFAULT;
- zcrx_unregister(ifq);
+ zcrx_unregister(ifq, NULL);
return 0;
}
@@ -696,19 +719,10 @@ static int zcrx_export(struct io_ring_ctx *ctx, struct io_zcrx_ifq *ifq,
{
struct zcrx_ctrl_export *ce = &ctrl->zc_export;
struct file *file;
- int fd = -1;
+ int fd;
if (!mem_is_zero(ce, sizeof(*ce)))
return -EINVAL;
- fd = get_unused_fd_flags(O_CLOEXEC);
- if (fd < 0)
- return fd;
-
- ce->zcrx_fd = fd;
- if (copy_to_user(arg, ctrl, sizeof(*ctrl))) {
- put_unused_fd(fd);
- return -EFAULT;
- }
refcount_inc(&ifq->refs);
refcount_inc(&ifq->user_refs);
@@ -716,11 +730,23 @@ static int zcrx_export(struct io_ring_ctx *ctx, struct io_zcrx_ifq *ifq,
file = anon_inode_create_getfile("[zcrx]", &zcrx_box_fops,
ifq, O_CLOEXEC, NULL);
if (IS_ERR(file)) {
- put_unused_fd(fd);
- zcrx_unregister(ifq);
+ zcrx_unregister(ifq, NULL);
return PTR_ERR(file);
}
+ fd = get_unused_fd_flags(O_CLOEXEC);
+ if (fd < 0) {
+ fput(file);
+ return fd;
+ }
+
+ ce->zcrx_fd = fd;
+ if (copy_to_user(arg, ctrl, sizeof(*ctrl))) {
+ fput(file);
+ put_unused_fd(fd);
+ return -EFAULT;
+ }
+
fd_install(fd, file);
return 0;
}
@@ -740,6 +766,8 @@ static int import_zcrx(struct io_ring_ctx *ctx,
return -EINVAL;
if (reg->if_rxq || reg->rq_entries || reg->area_ptr || reg->region_ptr)
return -EINVAL;
+ if (reg->notif_desc)
+ return -EINVAL;
if (reg->flags & ~ZCRX_REG_IMPORT)
return -EINVAL;
@@ -780,7 +808,7 @@ err_xa_erase:
scoped_guard(mutex, &ctx->mmap_lock)
xa_erase(&ctx->zcrx_ctxs, id);
err:
- zcrx_unregister(ifq);
+ zcrx_unregister(ifq, ctx);
return ret;
}
@@ -825,9 +853,37 @@ netdev_put_unlock:
return ret;
}
+static int zcrx_validate_notif_stats(struct io_zcrx_ifq *ifq,
+ const struct io_uring_zcrx_ifq_reg *reg,
+ const struct zcrx_notification_desc *notif)
+{
+ size_t stats_off = notif->stats_offset;
+ size_t used, end;
+
+ used = reg->offsets.rqes +
+ sizeof(struct io_uring_zcrx_rqe) * reg->rq_entries;
+
+ if (!IS_ALIGNED(stats_off, __alignof__(struct zcrx_notif_stats)))
+ return -EINVAL;
+ if (stats_off < used)
+ return -ERANGE;
+ if (check_add_overflow(stats_off,
+ sizeof(struct zcrx_notif_stats),
+ &end))
+ return -ERANGE;
+ if (end > io_region_size(&ifq->rq_region))
+ return -ERANGE;
+
+ ifq->notif_stats = io_region_get_ptr(&ifq->rq_region) + stats_off;
+ memset(ifq->notif_stats, 0, sizeof(*ifq->notif_stats));
+
+ return 0;
+}
+
int io_register_zcrx(struct io_ring_ctx *ctx,
struct io_uring_zcrx_ifq_reg __user *arg)
{
+ struct zcrx_notification_desc notif;
struct io_uring_zcrx_area_reg area;
struct io_uring_zcrx_ifq_reg reg;
struct io_uring_region_desc rd;
@@ -871,10 +927,28 @@ int io_register_zcrx(struct io_ring_ctx *ctx,
if (copy_from_user(&area, u64_to_user_ptr(reg.area_ptr), sizeof(area)))
return -EFAULT;
+ memset(&notif, 0, sizeof(notif));
+ if (reg.notif_desc && copy_from_user(&notif, u64_to_user_ptr(reg.notif_desc),
+ sizeof(notif)))
+ return -EFAULT;
+ if (notif.type_mask & ~ZCRX_NOTIF_TYPE_MASK)
+ return -EINVAL;
+ if (notif.flags & ~ZCRX_NOTIF_DESC_FLAG_STATS)
+ return -EINVAL;
+ if (!(notif.flags & ZCRX_NOTIF_DESC_FLAG_STATS)) {
+ if (notif.stats_offset)
+ return -EINVAL;
+ }
+ if (!mem_is_zero(&notif.__resv2, sizeof(notif.__resv2)))
+ return -EINVAL;
+
ifq = io_zcrx_ifq_alloc(ctx);
if (!ifq)
return -ENOMEM;
+ ifq->notif_data = notif.user_data;
+ ifq->allowed_notif_mask = notif.type_mask;
+
if (ctx->user) {
get_uid(ctx->user);
ifq->user = ctx->user;
@@ -896,6 +970,12 @@ int io_register_zcrx(struct io_ring_ctx *ctx,
if (ret)
goto err;
+ if (notif.flags & ZCRX_NOTIF_DESC_FLAG_STATS) {
+ ret = zcrx_validate_notif_stats(ifq, &reg, &notif);
+ if (ret)
+ goto err;
+ }
+
ifq->kern_readable = !(area.flags & IORING_ZCRX_AREA_DMABUF);
if (!(reg.flags & ZCRX_REG_NODEV)) {
@@ -925,12 +1005,15 @@ int io_register_zcrx(struct io_ring_ctx *ctx,
ret = -EFAULT;
goto err;
}
+
+ if (notif.type_mask)
+ zcrx_set_ring_ctx(ifq, ctx);
return 0;
err:
scoped_guard(mutex, &ctx->mmap_lock)
xa_erase(&ctx->zcrx_ctxs, id);
ifq_free:
- zcrx_unregister(ifq);
+ zcrx_unregister(ifq, ctx);
return ret;
}
@@ -960,7 +1043,7 @@ void io_terminate_zcrx(struct io_ring_ctx *ctx)
break;
set_zcrx_entry_mark(ctx, id);
id++;
- zcrx_unregister_user(ifq);
+ zcrx_unregister_user(ifq, ctx);
}
}
@@ -985,6 +1068,12 @@ void io_unregister_zcrx(struct io_ring_ctx *ctx)
}
if (!ifq)
break;
+ /*
+ * io_uring can run requests and return buffers to the user
+ * after termination, scrub it again.
+ */
+ if (refcount_read(&ifq->user_refs) == 0)
+ io_zcrx_scrub(ifq);
io_put_zcrx_ifq(ifq);
}
@@ -1091,6 +1180,53 @@ static unsigned io_zcrx_refill_slow(struct page_pool *pp, struct io_zcrx_ifq *if
return allocated;
}
+static void zcrx_notif_tw(struct io_tw_req tw_req, io_tw_token_t tw)
+{
+ struct io_kiocb *req = tw_req.req;
+ struct io_ring_ctx *ctx = req->ctx;
+
+ io_post_aux_cqe(ctx, req->cqe.user_data, req->cqe.res, 0);
+ percpu_ref_put(&ctx->refs);
+ io_poison_req(req);
+ kmem_cache_free(req_cachep, req);
+}
+
+static void zcrx_stat_add(__u64 *p, s64 v)
+{
+ WRITE_ONCE(*p, READ_ONCE(*p) + v);
+}
+
+static void zcrx_send_notif(struct io_zcrx_ifq *ifq, unsigned type)
+{
+ gfp_t gfp = GFP_ATOMIC | __GFP_NOWARN | __GFP_ZERO;
+ u32 type_mask = 1 << type;
+ struct io_kiocb *req;
+
+ if (!(type_mask & ifq->allowed_notif_mask))
+ return;
+
+ guard(spinlock_bh)(&ifq->ctx_lock);
+ if (!ifq->master_ctx)
+ return;
+ if (type_mask & ifq->fired_notifs)
+ return;
+
+ req = kmem_cache_alloc(req_cachep, gfp);
+ if (unlikely(!req))
+ return;
+
+ ifq->fired_notifs |= type_mask;
+
+ req->opcode = IORING_OP_NOP;
+ req->cqe.user_data = ifq->notif_data;
+ req->cqe.res = type;
+ req->ctx = ifq->master_ctx;
+ percpu_ref_get(&req->ctx->refs);
+ req->tctx = NULL;
+ req->io_task_work.func = zcrx_notif_tw;
+ io_req_task_work_add(req);
+}
+
static netmem_ref io_pp_zc_alloc_netmems(struct page_pool *pp, gfp_t gfp)
{
struct io_zcrx_ifq *ifq = io_pp_to_ifq(pp);
@@ -1107,8 +1243,10 @@ static netmem_ref io_pp_zc_alloc_netmems(struct page_pool *pp, gfp_t gfp)
goto out_return;
allocated = io_zcrx_refill_slow(pp, ifq, netmems, to_alloc);
- if (!allocated)
+ if (!allocated) {
+ zcrx_send_notif(ifq, ZCRX_NOTIF_NO_BUFFERS);
return 0;
+ }
out_return:
zcrx_sync_for_device(pp, ifq, netmems, allocated);
allocated--;
@@ -1257,12 +1395,32 @@ static int zcrx_flush_rq(struct io_ring_ctx *ctx, struct io_zcrx_ifq *zcrx,
return 0;
}
+static int zcrx_arm_notif(struct io_ring_ctx *ctx, struct io_zcrx_ifq *zcrx,
+ struct zcrx_ctrl *ctrl)
+{
+ const struct zcrx_ctrl_arm_notif *an = &ctrl->zc_arm_notif;
+ unsigned type_mask;
+
+ if (an->notif_type >= __ZCRX_NOTIF_TYPE_LAST)
+ return -EINVAL;
+ if (!mem_is_zero(&an->__resv, sizeof(an->__resv)))
+ return -EINVAL;
+
+ guard(spinlock_bh)(&zcrx->ctx_lock);
+ type_mask = 1U << an->notif_type;
+ if (type_mask & ~zcrx->fired_notifs)
+ return -EINVAL;
+ zcrx->fired_notifs &= ~type_mask;
+ return 0;
+}
+
int io_zcrx_ctrl(struct io_ring_ctx *ctx, void __user *arg, unsigned nr_args)
{
struct zcrx_ctrl ctrl;
struct io_zcrx_ifq *zcrx;
BUILD_BUG_ON(sizeof(ctrl.zc_export) != sizeof(ctrl.zc_flush));
+ BUILD_BUG_ON(sizeof(ctrl.zc_export) != sizeof(ctrl.zc_arm_notif));
if (nr_args)
return -EINVAL;
@@ -1280,6 +1438,8 @@ int io_zcrx_ctrl(struct io_ring_ctx *ctx, void __user *arg, unsigned nr_args)
return zcrx_flush_rq(ctx, zcrx, &ctrl);
case ZCRX_CTRL_EXPORT:
return zcrx_export(ctx, zcrx, &ctrl, arg);
+ case ZCRX_CTRL_ARM_NOTIFICATION:
+ return zcrx_arm_notif(ctx, zcrx, &ctrl);
}
return -EOPNOTSUPP;
@@ -1416,8 +1576,18 @@ static int io_zcrx_copy_frag(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
const skb_frag_t *frag, int off, int len)
{
struct page *page = skb_frag_page(frag);
+ int ret;
- return io_zcrx_copy_chunk(req, ifq, page, off + skb_frag_off(frag), len);
+ ret = io_zcrx_copy_chunk(req, ifq, page, off + skb_frag_off(frag), len);
+ if (ret > 0) {
+ if (ifq->notif_stats) {
+ zcrx_stat_add(&ifq->notif_stats->copy_count, 1);
+ zcrx_stat_add(&ifq->notif_stats->copy_bytes, ret);
+ }
+ zcrx_send_notif(ifq, ZCRX_NOTIF_COPY);
+ }
+
+ return ret;
}
static int io_zcrx_recv_frag(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
@@ -1562,7 +1732,6 @@ static int io_zcrx_tcp_recvmsg(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
struct io_zcrx_args args = {
.req = req,
.ifq = ifq,
- .sock = sk->sk_socket,
};
read_descriptor_t rd_desc = {
.count = len ? len : UINT_MAX,
diff --git a/io_uring/zcrx.h b/io_uring/zcrx.h
index 75e0a4e6ef6e4..fa00900e479e7 100644
--- a/io_uring/zcrx.h
+++ b/io_uring/zcrx.h
@@ -9,7 +9,9 @@
#include <net/net_trackers.h>
#define ZCRX_SUPPORTED_REG_FLAGS (ZCRX_REG_IMPORT | ZCRX_REG_NODEV)
-#define ZCRX_FEATURES (ZCRX_FEATURE_RX_PAGE_SIZE)
+#define ZCRX_FEATURES (ZCRX_FEATURE_RX_PAGE_SIZE |\
+ ZCRX_FEATURE_NOTIFICATION)
+#define ZCRX_NOTIF_TYPE_MASK ((1U << ZCRX_NOTIF_NO_BUFFERS) | (1U << ZCRX_NOTIF_COPY))
struct io_zcrx_mem {
unsigned long size;
@@ -72,6 +74,13 @@ struct io_zcrx_ifq {
*/
struct mutex pp_lock;
struct io_mapped_region rq_region;
+
+ spinlock_t ctx_lock;
+ struct io_ring_ctx *master_ctx;
+ u32 allowed_notif_mask;
+ u32 fired_notifs;
+ u64 notif_data;
+ struct zcrx_notif_stats *notif_stats;
};
#if defined(CONFIG_IO_URING_ZCRX)