From ce52a5631bd14ae2872abf504cec537701eca266 Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Thu, 8 Jul 2021 18:26:19 -0700 Subject: [PATCH] Revert "eliminate prefetch" this breaks graceful shutdown. details TBD. This reverts commit a474e79bf8967a573f05d07cee0b1abdbee4608a. --- net/uring/file_linux.go | 2 ++ net/uring/io_uring_linux.c | 8 +++-- net/uring/io_uring_linux.go | 67 ++++++++++++++++++++++++++----------- net/uring/udp_linux.go | 2 ++ syncs/syncs.go | 4 +-- 5 files changed, 60 insertions(+), 23 deletions(-) diff --git a/net/uring/file_linux.go b/net/uring/file_linux.go index 484098f61..5e56dee50 100644 --- a/net/uring/file_linux.go +++ b/net/uring/file_linux.go @@ -166,6 +166,8 @@ func (u *file) Write(buf []byte) (int, error) { rbuf := sliceOf(r.buf, len(buf)) copy(rbuf, buf) C.submit_writev_request(u.writeRing.ring, r, C.int(len(buf))) + // Get an extra buffer, if available. + u.writeRing.prefetch() return len(buf), nil } diff --git a/net/uring/io_uring_linux.c b/net/uring/io_uring_linux.c index 192c477a0..71d82d506 100644 --- a/net/uring/io_uring_linux.c +++ b/net/uring/io_uring_linux.c @@ -136,13 +136,17 @@ struct completion_result { typedef struct completion_result go_completion_result; -static go_completion_result completion(struct io_uring *ring) { +static go_completion_result completion(struct io_uring *ring, int block) { struct io_uring_cqe *cqe; struct completion_result res; res.err = 0; res.n = 0; res.idx = 0; - res.err = io_uring_wait_cqe(ring, &cqe); + if (block) { + res.err = io_uring_wait_cqe(ring, &cqe); + } else { + res.err = io_uring_peek_cqe(ring, &cqe); + } if (res.err < 0) { return res; } diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index df4c81d98..e591b0ebf 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -9,8 +9,6 @@ import ( "reflect" "syscall" "unsafe" - - "tailscale.com/syncs" ) // A writeRing is an io_uring usable for sendmsg or pwritev calls. @@ -21,11 +19,9 @@ type writeRing struct { // We dispatch them to the kernel as writes are requested. // The array length is tied to the size of the uring. reqs [8]*C.goreq - // free is the lowed unused index into reqs. - // It is only useful for the first len(reqs) requests. - // After that, we retrieve a completion result from - // the kernel for new writes/sends. - free syncs.AtomicInt32 + // reqC is a channel containing indices into reqs + // that are free to use (that is, not in the kernel). + reqC chan int } // initReqs initializes r's reqs so that they can be used for writes/sends. @@ -33,26 +29,46 @@ func (r *writeRing) initReqs(ipLen int) { for i := range &r.reqs { r.reqs[i] = C.initializeReq(bufferSize, C.size_t(i), C.int(ipLen)) } + r.reqC = make(chan int, len(r.reqs)) + for i := range r.reqs { + r.reqC <- i + } } // getReq gets a req usable for a write/send. // It blocks until such a req is available. func (r *writeRing) getReq() (req *C.goreq, err error) { - if idx := r.free.Add(1) - 1; idx < int32(len(r.reqs)) { - return r.reqs[idx], nil - } - // Get a req from the kernel. - n, idx, err := waitCompletion(r.ring) - if err != nil { - return nil, fmt.Errorf("Write io_uring call failed: %w", err) - } - if n < 0 { - // Past syscall failed. - return nil, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n)) + var idx int + select { + case idx = <-r.reqC: + default: + // No request available. Get one from the kernel. + n, idx, err := waitCompletion(r.ring) + if err != nil { + return nil, fmt.Errorf("Write io_uring call failed: %w", err) + } + if n < 0 { + // Past syscall failed. + r.reqC <- idx // don't leak idx + return nil, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n)) + } } return r.reqs[idx], nil } +// prefetch attempts to fetch a req for use by future writes. +// It does not block. +// TODO: does this actually buy us anything? +// TODO: return errors encountered here, rather than delaying them until later? +func (r *writeRing) prefetch() { + idx, ok := peekCompletion(r.ring) + if ok { + // Put the request buffer back in the usable queue. + // Should never block, by construction. + r.reqC <- idx + } +} + // freeReqs frees the reqs allocated by initReqs. func (r *writeRing) freeReqs() { for _, req := range r.reqs { @@ -60,12 +76,17 @@ func (r *writeRing) freeReqs() { } } +const ( + noBlockForCompletion = 0 + blockForCompletion = 1 +) + // waitCompletion blocks until a completion on ring succeeds, or until *fd == 0. // If *fd == 0, that indicates that the ring is no loner valid, in which case waitCompletion returns net.ErrClosed. // Reads of *fd are atomic. func waitCompletion(ring *C.go_uring) (n, idx int, err error) { for { - r := C.completion(ring) + r := C.completion(ring, blockForCompletion) if syscall.Errno(-r.err) == syscall.EAGAIN || syscall.Errno(-r.err) == syscall.EINTR { continue } @@ -77,6 +98,14 @@ func waitCompletion(ring *C.go_uring) (n, idx int, err error) { } } +func peekCompletion(ring *C.go_uring) (idx int, ok bool) { + r := C.completion(ring, noBlockForCompletion) + if r.err < 0 { + return 0, false + } + return int(r.idx), true +} + // sliceOf returns ptr[:n] as a byte slice. // TODO: replace with unsafe.Slice once we are using Go 1.17. func sliceOf(ptr *C.char, n int) []byte { diff --git a/net/uring/udp_linux.go b/net/uring/udp_linux.go index 3934fdc98..1efb28507 100644 --- a/net/uring/udp_linux.go +++ b/net/uring/udp_linux.go @@ -262,6 +262,8 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { r.sa6.sin6_family = C.AF_INET6 } C.submit_sendmsg_request(u.sendRing.ring, r, C.int(len(p))) + // Get an extra buffer, if available. + u.sendRing.prefetch() return len(p), nil } diff --git a/syncs/syncs.go b/syncs/syncs.go index 68515dc88..ed04fa06c 100644 --- a/syncs/syncs.go +++ b/syncs/syncs.go @@ -105,8 +105,8 @@ func (b *AtomicInt32) Get() int32 { return atomic.LoadInt32((*int32)(b)) } -func (b *AtomicInt32) Add(v int32) int32 { - return atomic.AddInt32((*int32)(b), v) +func (b *AtomicInt32) Add(v int32) { + atomic.AddInt32((*int32)(b), v) } // Semaphore is a counting semaphore.