wgengine: move link monitor to be owned by the engine, not the router
And make the monitor package portable with no-op implementations on unsupported operating systems. Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
parent
09fbae01a9
commit
7f5e3febe5
|
@ -2,27 +2,33 @@
|
|||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// +build linux freebsd
|
||||
|
||||
// Package monitor provides facilities for monitoring network
|
||||
// interface changes.
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
|
||||
// Message represents a message returned from a connection.
|
||||
// TODO(]|[): currently messages are being discarded, so the
|
||||
// properties of the message haven't been defined.
|
||||
type Message interface{}
|
||||
// message represents a message returned from an osMon.
|
||||
//
|
||||
// TODO: currently messages are being discarded, so the properties of
|
||||
// the message haven't been defined.
|
||||
type message interface{}
|
||||
|
||||
// Conn represents the connection that is being monitored.
|
||||
type Conn interface {
|
||||
// osMon is the interface that each operating system-specific
|
||||
// implementation of the link monitor must implement.
|
||||
type osMon interface {
|
||||
Close() error
|
||||
Receive() (Message, error)
|
||||
|
||||
// Receive returns a new network interface change message. It
|
||||
// should block until there's either something to return, or
|
||||
// until the osMon is closed. After a Close, the returned
|
||||
// error is ignored.
|
||||
Receive() (message, error)
|
||||
}
|
||||
|
||||
// ChangeFunc is a callback function that's called when
|
||||
|
@ -33,41 +39,68 @@ type ChangeFunc func()
|
|||
type Mon struct {
|
||||
logf logger.Logf
|
||||
cb ChangeFunc
|
||||
conn Conn
|
||||
om osMon // nil means not supported on this platform
|
||||
change chan struct{}
|
||||
stop chan struct{}
|
||||
|
||||
onceStart sync.Once
|
||||
started bool
|
||||
goroutines sync.WaitGroup
|
||||
}
|
||||
|
||||
// New instantiates and starts a monitoring instance. Change notifications
|
||||
// are propagated to the callback function.
|
||||
// The returned monitor is inactive until it's started by the Start method.
|
||||
func New(logf logger.Logf, callback ChangeFunc) (*Mon, error) {
|
||||
conn, err := NewConn()
|
||||
om, err := newOSMon()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := &Mon{
|
||||
return &Mon{
|
||||
logf: logf,
|
||||
cb: callback,
|
||||
conn: conn,
|
||||
om: om,
|
||||
change: make(chan struct{}, 1),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
go ret.pump()
|
||||
go ret.debounce()
|
||||
return ret, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close is used to close the underlying connection.
|
||||
// Start starts the monitor.
|
||||
// A monitor can only be started & closed once.
|
||||
func (m *Mon) Start() {
|
||||
m.onceStart.Do(func() {
|
||||
if m.om == nil {
|
||||
return
|
||||
}
|
||||
m.started = true
|
||||
m.goroutines.Add(2)
|
||||
go m.pump()
|
||||
go m.debounce()
|
||||
})
|
||||
}
|
||||
|
||||
// Close closes the monitor.
|
||||
// It may only be called once.
|
||||
func (m *Mon) Close() error {
|
||||
close(m.stop)
|
||||
return m.conn.Close()
|
||||
var err error
|
||||
if m.om != nil {
|
||||
err = m.om.Close()
|
||||
}
|
||||
// If it was previously started, wait for those goroutines to finish.
|
||||
m.onceStart.Do(func() {})
|
||||
if m.started {
|
||||
m.goroutines.Wait()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// pump continuously retrieves messages from the connection, notifying
|
||||
// the change channel of changes, and stopping when a stop is issued.
|
||||
func (m *Mon) pump() {
|
||||
defer m.goroutines.Done()
|
||||
for {
|
||||
_, err := m.conn.Receive()
|
||||
_, err := m.om.Receive()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-m.stop:
|
||||
|
@ -90,6 +123,7 @@ func (m *Mon) pump() {
|
|||
// debounce calls the callback function with a delay between events
|
||||
// and exits when a stop is issued.
|
||||
func (m *Mon) debounce() {
|
||||
defer m.goroutines.Done()
|
||||
for {
|
||||
select {
|
||||
case <-m.stop:
|
||||
|
|
|
@ -11,11 +11,12 @@ import (
|
|||
"strings"
|
||||
)
|
||||
|
||||
// devdConn implements osMon using devd(8).
|
||||
type devdConn struct {
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
func NewConn() (Conn, error) {
|
||||
func newOSMon() (osMon, error) {
|
||||
conn, err := net.Dial("unixpacket", "/var/run/devd.seqpacket.pipe")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("devd dial error: %v", err)
|
||||
|
@ -30,7 +31,7 @@ func (c *devdConn) Close() error {
|
|||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c *devdConn) Receive() (Message, error) {
|
||||
func (c *devdConn) Receive() (message, error) {
|
||||
for {
|
||||
msg, err := bufio.NewReader(c.conn).ReadString('\n')
|
||||
if err != nil {
|
||||
|
|
|
@ -12,8 +12,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
RTMGRP_IPV4_IFADDR = 0x10
|
||||
RTMGRP_IPV4_ROUTE = 0x40
|
||||
_RTMGRP_IPV4_IFADDR = 0x10
|
||||
_RTMGRP_IPV4_ROUTE = 0x40
|
||||
)
|
||||
|
||||
// nlConn wraps a *netlink.Conn and returns a monitor.Message
|
||||
|
@ -25,7 +25,7 @@ type nlConn struct {
|
|||
conn *netlink.Conn
|
||||
}
|
||||
|
||||
func NewConn() (Conn, error) {
|
||||
func newOSMon() (osMon, error) {
|
||||
conn, err := netlink.Dial(unix.NETLINK_ROUTE, &netlink.Config{
|
||||
// IPv4 address and route changes. Routes get us most of the
|
||||
// events of interest, but we need address as well to cover
|
||||
|
@ -36,7 +36,7 @@ func NewConn() (Conn, error) {
|
|||
// Why magic numbers? These aren't exposed in x/sys/unix
|
||||
// yet. The values come from rtnetlink.h, RTMGRP_IPV4_IFADDR
|
||||
// and RTMGRP_IPV4_ROUTE.
|
||||
Groups: RTMGRP_IPV4_IFADDR | RTMGRP_IPV4_ROUTE,
|
||||
Groups: _RTMGRP_IPV4_IFADDR | _RTMGRP_IPV4_ROUTE,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dialing netlink socket: %v", err)
|
||||
|
@ -48,7 +48,7 @@ func (c *nlConn) Close() error {
|
|||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c *nlConn) Receive() (Message, error) {
|
||||
func (c *nlConn) Receive() (message, error) {
|
||||
// currently ignoring the message
|
||||
_, err := c.conn.Receive()
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// +build !linux,!freebsd
|
||||
|
||||
package monitor
|
||||
|
||||
func newOSMon() (osMon, error) { return nil, nil }
|
|
@ -34,7 +34,7 @@ type bsdRouter struct {
|
|||
routes map[wgcfg.CIDR]struct{}
|
||||
}
|
||||
|
||||
func newUserspaceRouter(logf logger.Logf, _ *device.Device, tundev tun.Device, _ func()) (Router, error) {
|
||||
func newUserspaceRouter(logf logger.Logf, _ *device.Device, tundev tun.Device) (Router, error) {
|
||||
tunname, err := tundev.Name()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -14,7 +14,7 @@ type darwinRouter struct {
|
|||
tunname string
|
||||
}
|
||||
|
||||
func newUserspaceRouter(logf logger.Logf, _ *device.Device, tundev tun.Device, netChanged func()) (Router, error) {
|
||||
func newUserspaceRouter(logf logger.Logf, _ *device.Device, tundev tun.Device) (Router, error) {
|
||||
tunname, err := tundev.Name()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
|
||||
// NewFakeRouter returns a new fake Router implementation whose
|
||||
// implementation does nothing and always returns nil errors.
|
||||
func NewFakeRouter(logf logger.Logf, _ *device.Device, _ tun.Device, netChanged func()) (Router, error) {
|
||||
func NewFakeRouter(logf logger.Logf, _ *device.Device, _ tun.Device) (Router, error) {
|
||||
return fakeRouter{logf: logf}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -20,35 +20,24 @@ import (
|
|||
"github.com/tailscale/wireguard-go/wgcfg"
|
||||
"tailscale.com/atomicfile"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/wgengine/monitor"
|
||||
)
|
||||
|
||||
type linuxRouter struct {
|
||||
logf func(fmt string, args ...interface{})
|
||||
tunname string
|
||||
mon *monitor.Mon
|
||||
netChanged func()
|
||||
local wgcfg.CIDR
|
||||
routes map[wgcfg.CIDR]struct{}
|
||||
logf func(fmt string, args ...interface{})
|
||||
tunname string
|
||||
local wgcfg.CIDR
|
||||
routes map[wgcfg.CIDR]struct{}
|
||||
}
|
||||
|
||||
func newUserspaceRouter(logf logger.Logf, _ *device.Device, tunDev tun.Device, netChanged func()) (Router, error) {
|
||||
// TODO: move monitor out of Router, make it created/owned by Engine
|
||||
mon, err := monitor.New(logf, netChanged)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func newUserspaceRouter(logf logger.Logf, _ *device.Device, tunDev tun.Device) (Router, error) {
|
||||
tunname, err := tunDev.Name()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &linuxRouter{
|
||||
logf: logf,
|
||||
tunname: tunname,
|
||||
mon: mon,
|
||||
netChanged: netChanged,
|
||||
logf: logf,
|
||||
tunname: tunname,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -172,7 +161,6 @@ func (r *linuxRouter) SetRoutes(rs RouteSettings) error {
|
|||
|
||||
func (r *linuxRouter) Close() error {
|
||||
var ret error
|
||||
r.mon.Close()
|
||||
if err := r.restoreResolvConf(); err != nil {
|
||||
r.logf("failed to restore system resolv.conf: %v", err)
|
||||
if ret == nil {
|
||||
|
|
|
@ -21,7 +21,7 @@ type winRouter struct {
|
|||
routeChangeCallback *winipcfg.RouteChangeCallback
|
||||
}
|
||||
|
||||
func newUserspaceRouter(logf logger.Logf, wgdev *device.Device, tundev tun.Device, netChanged func()) (Router, error) {
|
||||
func newUserspaceRouter(logf logger.Logf, wgdev *device.Device, tundev tun.Device) (Router, error) {
|
||||
tunname, err := tundev.Name()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"tailscale.com/types/logger"
|
||||
"tailscale.com/wgengine/filter"
|
||||
"tailscale.com/wgengine/magicsock"
|
||||
"tailscale.com/wgengine/monitor"
|
||||
"tailscale.com/wgengine/packet"
|
||||
)
|
||||
|
||||
|
@ -32,6 +33,7 @@ type userspaceEngine struct {
|
|||
wgdev *device.Device
|
||||
router Router
|
||||
magicConn *magicsock.Conn
|
||||
linkMon *monitor.Mon
|
||||
|
||||
wgLock sync.Mutex // serializes all wgdev operations
|
||||
lastReconfig string
|
||||
|
@ -97,6 +99,12 @@ func newUserspaceEngineAdvanced(logf logger.Logf, tundev tun.Device, routerGen R
|
|||
tundev: tundev,
|
||||
}
|
||||
|
||||
mon, err := monitor.New(logf, func() { e.LinkChange(false) })
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.linkMon = mon
|
||||
|
||||
endpointsFn := func(endpoints []string) {
|
||||
e.mu.Lock()
|
||||
if e.endpoints != nil {
|
||||
|
@ -113,7 +121,6 @@ func newUserspaceEngineAdvanced(logf logger.Logf, tundev tun.Device, routerGen R
|
|||
// TODO(crawshaw): DERP: magicsock.DefaultDERP,
|
||||
EndpointsFunc: endpointsFn,
|
||||
}
|
||||
var err error
|
||||
e.magicConn, err = magicsock.Listen(magicsockOpts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("wgengine: %v", err)
|
||||
|
@ -162,7 +169,7 @@ func newUserspaceEngineAdvanced(logf logger.Logf, tundev tun.Device, routerGen R
|
|||
}
|
||||
}()
|
||||
|
||||
e.router, err = routerGen(logf, e.wgdev, e.tundev, func() { e.LinkChange(false) })
|
||||
e.router, err = routerGen(logf, e.wgdev, e.tundev)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -196,6 +203,7 @@ func newUserspaceEngineAdvanced(logf logger.Logf, tundev tun.Device, routerGen R
|
|||
e.wgdev.Close()
|
||||
return nil, err
|
||||
}
|
||||
e.linkMon.Start()
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
@ -455,6 +463,7 @@ func (e *userspaceEngine) RequestStatus() {
|
|||
|
||||
func (e *userspaceEngine) Close() {
|
||||
e.Reconfig(&wgcfg.Config{}, nil)
|
||||
e.linkMon.Close()
|
||||
e.router.Close()
|
||||
e.magicConn.Close()
|
||||
close(e.waitCh)
|
||||
|
|
|
@ -62,13 +62,13 @@ func (rs *RouteSettings) OnlyRelevantParts() string {
|
|||
}
|
||||
|
||||
// NewUserspaceRouter returns a new Router for the current platform, using the provided tun device.
|
||||
func NewUserspaceRouter(logf logger.Logf, wgdev *device.Device, tundev tun.Device, netChanged func()) (Router, error) {
|
||||
return newUserspaceRouter(logf, wgdev, tundev, netChanged)
|
||||
func NewUserspaceRouter(logf logger.Logf, wgdev *device.Device, tundev tun.Device) (Router, error) {
|
||||
return newUserspaceRouter(logf, wgdev, tundev)
|
||||
}
|
||||
|
||||
// RouterGen is the signature for the two funcs that create Router implementations:
|
||||
// NewUserspaceRouter (which varies by operating system) and NewFakeRouter.
|
||||
type RouterGen func(logf logger.Logf, wgdev *device.Device, tundev tun.Device, netStateChanged func()) (Router, error)
|
||||
type RouterGen func(logf logger.Logf, wgdev *device.Device, tundev tun.Device) (Router, error)
|
||||
|
||||
// Router is responsible for managing the system route table.
|
||||
//
|
||||
|
|
Loading…
Reference in New Issue