Revert "eliminate prefetch"
this breaks graceful shutdown. details TBD. This reverts commit a474e79bf8967a573f05d07cee0b1abdbee4608a.
This commit is contained in:
parent
e33d6a049b
commit
ce52a5631b
|
@ -166,6 +166,8 @@ func (u *file) Write(buf []byte) (int, error) {
|
||||||
rbuf := sliceOf(r.buf, len(buf))
|
rbuf := sliceOf(r.buf, len(buf))
|
||||||
copy(rbuf, buf)
|
copy(rbuf, buf)
|
||||||
C.submit_writev_request(u.writeRing.ring, r, C.int(len(buf)))
|
C.submit_writev_request(u.writeRing.ring, r, C.int(len(buf)))
|
||||||
|
// Get an extra buffer, if available.
|
||||||
|
u.writeRing.prefetch()
|
||||||
return len(buf), nil
|
return len(buf), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -136,13 +136,17 @@ struct completion_result {
|
||||||
|
|
||||||
typedef struct completion_result go_completion_result;
|
typedef struct completion_result go_completion_result;
|
||||||
|
|
||||||
static go_completion_result completion(struct io_uring *ring) {
|
static go_completion_result completion(struct io_uring *ring, int block) {
|
||||||
struct io_uring_cqe *cqe;
|
struct io_uring_cqe *cqe;
|
||||||
struct completion_result res;
|
struct completion_result res;
|
||||||
res.err = 0;
|
res.err = 0;
|
||||||
res.n = 0;
|
res.n = 0;
|
||||||
res.idx = 0;
|
res.idx = 0;
|
||||||
res.err = io_uring_wait_cqe(ring, &cqe);
|
if (block) {
|
||||||
|
res.err = io_uring_wait_cqe(ring, &cqe);
|
||||||
|
} else {
|
||||||
|
res.err = io_uring_peek_cqe(ring, &cqe);
|
||||||
|
}
|
||||||
if (res.err < 0) {
|
if (res.err < 0) {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,6 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"syscall"
|
"syscall"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"tailscale.com/syncs"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// A writeRing is an io_uring usable for sendmsg or pwritev calls.
|
// A writeRing is an io_uring usable for sendmsg or pwritev calls.
|
||||||
|
@ -21,11 +19,9 @@ type writeRing struct {
|
||||||
// We dispatch them to the kernel as writes are requested.
|
// We dispatch them to the kernel as writes are requested.
|
||||||
// The array length is tied to the size of the uring.
|
// The array length is tied to the size of the uring.
|
||||||
reqs [8]*C.goreq
|
reqs [8]*C.goreq
|
||||||
// free is the lowed unused index into reqs.
|
// reqC is a channel containing indices into reqs
|
||||||
// It is only useful for the first len(reqs) requests.
|
// that are free to use (that is, not in the kernel).
|
||||||
// After that, we retrieve a completion result from
|
reqC chan int
|
||||||
// the kernel for new writes/sends.
|
|
||||||
free syncs.AtomicInt32
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// initReqs initializes r's reqs so that they can be used for writes/sends.
|
// initReqs initializes r's reqs so that they can be used for writes/sends.
|
||||||
|
@ -33,26 +29,46 @@ func (r *writeRing) initReqs(ipLen int) {
|
||||||
for i := range &r.reqs {
|
for i := range &r.reqs {
|
||||||
r.reqs[i] = C.initializeReq(bufferSize, C.size_t(i), C.int(ipLen))
|
r.reqs[i] = C.initializeReq(bufferSize, C.size_t(i), C.int(ipLen))
|
||||||
}
|
}
|
||||||
|
r.reqC = make(chan int, len(r.reqs))
|
||||||
|
for i := range r.reqs {
|
||||||
|
r.reqC <- i
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getReq gets a req usable for a write/send.
|
// getReq gets a req usable for a write/send.
|
||||||
// It blocks until such a req is available.
|
// It blocks until such a req is available.
|
||||||
func (r *writeRing) getReq() (req *C.goreq, err error) {
|
func (r *writeRing) getReq() (req *C.goreq, err error) {
|
||||||
if idx := r.free.Add(1) - 1; idx < int32(len(r.reqs)) {
|
var idx int
|
||||||
return r.reqs[idx], nil
|
select {
|
||||||
}
|
case idx = <-r.reqC:
|
||||||
// Get a req from the kernel.
|
default:
|
||||||
n, idx, err := waitCompletion(r.ring)
|
// No request available. Get one from the kernel.
|
||||||
if err != nil {
|
n, idx, err := waitCompletion(r.ring)
|
||||||
return nil, fmt.Errorf("Write io_uring call failed: %w", err)
|
if err != nil {
|
||||||
}
|
return nil, fmt.Errorf("Write io_uring call failed: %w", err)
|
||||||
if n < 0 {
|
}
|
||||||
// Past syscall failed.
|
if n < 0 {
|
||||||
return nil, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n))
|
// Past syscall failed.
|
||||||
|
r.reqC <- idx // don't leak idx
|
||||||
|
return nil, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return r.reqs[idx], nil
|
return r.reqs[idx], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prefetch attempts to fetch a req for use by future writes.
|
||||||
|
// It does not block.
|
||||||
|
// TODO: does this actually buy us anything?
|
||||||
|
// TODO: return errors encountered here, rather than delaying them until later?
|
||||||
|
func (r *writeRing) prefetch() {
|
||||||
|
idx, ok := peekCompletion(r.ring)
|
||||||
|
if ok {
|
||||||
|
// Put the request buffer back in the usable queue.
|
||||||
|
// Should never block, by construction.
|
||||||
|
r.reqC <- idx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// freeReqs frees the reqs allocated by initReqs.
|
// freeReqs frees the reqs allocated by initReqs.
|
||||||
func (r *writeRing) freeReqs() {
|
func (r *writeRing) freeReqs() {
|
||||||
for _, req := range r.reqs {
|
for _, req := range r.reqs {
|
||||||
|
@ -60,12 +76,17 @@ func (r *writeRing) freeReqs() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
noBlockForCompletion = 0
|
||||||
|
blockForCompletion = 1
|
||||||
|
)
|
||||||
|
|
||||||
// waitCompletion blocks until a completion on ring succeeds, or until *fd == 0.
|
// waitCompletion blocks until a completion on ring succeeds, or until *fd == 0.
|
||||||
// If *fd == 0, that indicates that the ring is no loner valid, in which case waitCompletion returns net.ErrClosed.
|
// If *fd == 0, that indicates that the ring is no loner valid, in which case waitCompletion returns net.ErrClosed.
|
||||||
// Reads of *fd are atomic.
|
// Reads of *fd are atomic.
|
||||||
func waitCompletion(ring *C.go_uring) (n, idx int, err error) {
|
func waitCompletion(ring *C.go_uring) (n, idx int, err error) {
|
||||||
for {
|
for {
|
||||||
r := C.completion(ring)
|
r := C.completion(ring, blockForCompletion)
|
||||||
if syscall.Errno(-r.err) == syscall.EAGAIN || syscall.Errno(-r.err) == syscall.EINTR {
|
if syscall.Errno(-r.err) == syscall.EAGAIN || syscall.Errno(-r.err) == syscall.EINTR {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -77,6 +98,14 @@ func waitCompletion(ring *C.go_uring) (n, idx int, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func peekCompletion(ring *C.go_uring) (idx int, ok bool) {
|
||||||
|
r := C.completion(ring, noBlockForCompletion)
|
||||||
|
if r.err < 0 {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return int(r.idx), true
|
||||||
|
}
|
||||||
|
|
||||||
// sliceOf returns ptr[:n] as a byte slice.
|
// sliceOf returns ptr[:n] as a byte slice.
|
||||||
// TODO: replace with unsafe.Slice once we are using Go 1.17.
|
// TODO: replace with unsafe.Slice once we are using Go 1.17.
|
||||||
func sliceOf(ptr *C.char, n int) []byte {
|
func sliceOf(ptr *C.char, n int) []byte {
|
||||||
|
|
|
@ -262,6 +262,8 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
||||||
r.sa6.sin6_family = C.AF_INET6
|
r.sa6.sin6_family = C.AF_INET6
|
||||||
}
|
}
|
||||||
C.submit_sendmsg_request(u.sendRing.ring, r, C.int(len(p)))
|
C.submit_sendmsg_request(u.sendRing.ring, r, C.int(len(p)))
|
||||||
|
// Get an extra buffer, if available.
|
||||||
|
u.sendRing.prefetch()
|
||||||
return len(p), nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -105,8 +105,8 @@ func (b *AtomicInt32) Get() int32 {
|
||||||
return atomic.LoadInt32((*int32)(b))
|
return atomic.LoadInt32((*int32)(b))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *AtomicInt32) Add(v int32) int32 {
|
func (b *AtomicInt32) Add(v int32) {
|
||||||
return atomic.AddInt32((*int32)(b), v)
|
atomic.AddInt32((*int32)(b), v)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Semaphore is a counting semaphore.
|
// Semaphore is a counting semaphore.
|
||||||
|
|
Loading…
Reference in New Issue