From 4847a89ecf27c9ff62bff739fac88d06a85624bb Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Thu, 8 Jul 2021 13:23:35 -0700 Subject: [PATCH] spit. not so much polish. --- net/uring/io_uring.c | 6 +-- net/uring/io_uring_linux.go | 91 +++++++++++++++++++++---------------- 2 files changed, 56 insertions(+), 41 deletions(-) diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c index 2a3c2aa2f..cb0622363 100644 --- a/net/uring/io_uring.c +++ b/net/uring/io_uring.c @@ -48,7 +48,7 @@ struct req { typedef struct req goreq; -static struct req *initializeReq(size_t sz, int ipVersion) { +static struct req *initializeReq(size_t sz, int ipLen) { struct req *r = malloc(sizeof(struct req)); memset(r, 0, sizeof(*r)); r->buf = malloc(sz); @@ -57,12 +57,12 @@ static struct req *initializeReq(size_t sz, int ipVersion) { r->iov.iov_len = sz; r->hdr.msg_iov = &r->iov; r->hdr.msg_iovlen = 1; - switch(ipVersion) { + switch(ipLen) { case 4: r->hdr.msg_name = &r->sa; r->hdr.msg_namelen = sizeof(r->sa); break; - case 6: + case 16: r->hdr.msg_name = &r->sa6; r->hdr.msg_namelen = sizeof(r->sa6); break; diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index f36c1dd96..c42203ed7 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -45,6 +45,11 @@ type UDPConn struct { // closed is an atomic variable that indicates whether the connection has been closed. // TODO: Make an atomic bool type that we can use here. closed uint32 + // shutdown is a sequence of funcs to be called when the UDPConn closes. + shutdown []func() + + // file is the os file underlying this connection. + file *os.File // local is the local address of this UDPConn. local net.Addr @@ -61,7 +66,8 @@ type UDPConn struct { // sendReqC is a channel containing indices into sendReqs // that are free to use (that is, not in the kernel). sendReqC chan int - is4 bool + // is4 indicates whether the conn is an IPv4 connection. + is4 bool // reads counts the number of outstanding read requests. // It is accessed atomically. reads int32 @@ -72,52 +78,64 @@ func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) { if !ok { return nil, fmt.Errorf("cannot use io_uring with conn of type %T", pconn) } - // this is dumb - local := conn.LocalAddr().String() - ip, err := netaddr.ParseIPPort(local) - if err != nil { - return nil, fmt.Errorf("failed to parse UDPConn local addr %s as IP: %w", local, err) - } - ipVersion := 6 - if ip.IP().Is4() { - ipVersion = 4 + local := conn.LocalAddr() + udpAddr, ok := local.(*net.UDPAddr) + if !ok { + return nil, fmt.Errorf("cannot use io_uring with conn.LocalAddr of type %T", local) } + // TODO: probe for system capabilities: https://unixism.net/loti/tutorial/probe_liburing.html + file, err := conn.File() if err != nil { return nil, err } // conn.File dup'd the conn's fd. We no longer need the original conn. conn.Close() - recvRing := new(C.go_uring) - sendRing := new(C.go_uring) + + u := &UDPConn{ + recvRing: new(C.go_uring), + sendRing: new(C.go_uring), + file: file, + local: local, + is4: len(udpAddr.IP) == 4, + } fd := file.Fd() - for _, r := range []*C.go_uring{recvRing, sendRing} { - ret := C.initialize(r, C.int(fd)) - if ret < 0 { - // TODO: free recvRing if sendRing initialize failed - return nil, fmt.Errorf("uring initialization failed: %d", ret) - } + u.shutdown = append(u.shutdown, func() { file.Close() }) + + if ret := C.initialize(u.recvRing, C.int(fd)); ret < 0 { + u.doShutdown() + return nil, fmt.Errorf("recvRing initialization failed: %w", syscall.Errno(-ret)) } - u := &UDPConn{ - recvRing: recvRing, - sendRing: sendRing, - local: conn.LocalAddr(), - is4: ipVersion == 4, + u.shutdown = append(u.shutdown, func() { C.io_uring_queue_exit(u.recvRing) }) + + if ret := C.initialize(u.sendRing, C.int(fd)); ret < 0 { + u.doShutdown() + return nil, fmt.Errorf("sendRing initialization failed: %w", syscall.Errno(-ret)) } + u.shutdown = append(u.shutdown, func() { C.io_uring_queue_exit(u.sendRing) }) // Initialize buffers - for _, reqs := range []*[8]*C.goreq{&u.recvReqs, &u.sendReqs} { - for i := range reqs { - reqs[i] = C.initializeReq(bufferSize, C.int(ipVersion)) - } + for i := range u.recvReqs { + u.recvReqs[i] = C.initializeReq(bufferSize, C.int(len(udpAddr.IP))) } + for i := range u.sendReqs { + u.sendReqs[i] = C.initializeReq(bufferSize, C.int(len(udpAddr.IP))) + } + u.shutdown = append(u.shutdown, func() { + for _, r := range u.recvReqs { + C.freeReq(r) + } + for _, r := range u.sendReqs { + C.freeReq(r) + } + }) // Initialize recv half. for i := range u.recvReqs { if err := u.submitRecvRequest(i); err != nil { - u.Close() // TODO: will this crash? + u.doShutdown() return nil, err } } @@ -233,20 +251,17 @@ func (u *UDPConn) Close() error { } // TODO: block until no one else uses our rings. // (Or is that unnecessary now?) - C.io_uring_queue_exit(u.recvRing) - C.io_uring_queue_exit(u.sendRing) - - // Free buffers - for _, r := range u.recvReqs { - C.freeReq(r) - } - for _, r := range u.sendReqs { - C.freeReq(r) - } + u.doShutdown() }) return nil } +func (u *UDPConn) doShutdown() { + for _, fn := range u.shutdown { + fn() + } +} + // Implement net.PacketConn, for convenience integrating with magicsock. var _ net.PacketConn = (*UDPConn)(nil)