diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index 720eb278b..6d2ad6cfc 100644 --- a/cmd/containerboot/main.go +++ b/cmd/containerboot/main.go @@ -158,6 +158,7 @@ func main() { PodIP: defaultEnv("POD_IP", ""), EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false), HealthCheckAddrPort: defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""), + EgressSvcsCfgPath: defaultEnv("TS_EGRESS_SERVICES_CONFIG_PATH", ""), } if err := cfg.validate(); err != nil { @@ -275,10 +276,8 @@ authLoop: switch *n.State { case ipn.NeedsLogin: if isOneStepConfig(cfg) { - // This could happen if this is the - // first time tailscaled was run for - // this device and the auth key was not - // passed via the configfile. + // This could happen if this is the first time tailscaled was run for this + // device and the auth key was not passed via the configfile. log.Fatalf("invalid state: tailscaled daemon started with a config file, but tailscale is not logged in: ensure you pass a valid auth key in the config file.") } if err := authTailscale(); err != nil { @@ -376,6 +375,9 @@ authLoop: } }) ) + // egressSvcsErrorChan will get an error sent to it if this containerboot instance is configured to expose 1+ + // egress services in HA mode and errored. + var egressSvcsErrorChan = make(chan error) defer t.Stop() // resetTimer resets timer for when to next attempt to resolve the DNS // name for the proxy configured with TS_EXPERIMENTAL_DEST_DNS_NAME. The @@ -401,6 +403,7 @@ authLoop: failedResolveAttempts++ } + var egressSvcsNotify chan ipn.Notify notifyChan := make(chan ipn.Notify) errChan := make(chan error) go func() { @@ -575,31 +578,50 @@ runLoop: h.Unlock() healthzRunner() } + if egressSvcsNotify != nil { + egressSvcsNotify <- n + } } if !startupTasksDone { - // For containerboot instances that act as TCP - // proxies (proxying traffic to an endpoint - // passed via one of the env vars that - // containerbot reads) and store state in a - // Kubernetes Secret, we consider startup tasks - // done at the point when device info has been - // successfully stored to state Secret. - // For all other containerboot instances, if we - // just get to this point the startup tasks can - // be considered done. + // For containerboot instances that act as TCP proxies (proxying traffic to an endpoint + // passed via one of the env vars that containerboot reads) and store state in a + // Kubernetes Secret, we consider startup tasks done at the point when device info has + // been successfully stored to state Secret. For all other containerboot instances, if + // we just get to this point the startup tasks can be considered done. if !isL3Proxy(cfg) || !hasKubeStateStore(cfg) || (currentDeviceEndpoints != deephash.Sum{} && currentDeviceID != deephash.Sum{}) { // This log message is used in tests to detect when all // post-auth configuration is done. log.Println("Startup complete, waiting for shutdown signal") startupTasksDone = true - // Wait on tailscaled process. It won't - // be cleaned up by default when the - // container exits as it is not PID1. - // TODO (irbekrm): perhaps we can - // replace the reaper by a running - // cmd.Wait in a goroutine immediately - // after starting tailscaled? + // Configure egress proxy. Egress proxy will set up firewall rules to proxy + // traffic to tailnet targets configured in the provided configuration file. It + // will then continuously monitor the config file and netmap updates and + // reconfigure the firewall rules as needed. 
If any of its operations fail, it + // will crash this node. + if cfg.EgressSvcsCfgPath != "" { + log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressSvcsCfgPath) + egressSvcsNotify = make(chan ipn.Notify) + ep := egressProxy{ + cfgPath: cfg.EgressSvcsCfgPath, + nfr: nfr, + kc: kc, + stateSecret: cfg.KubeSecret, + netmapChan: egressSvcsNotify, + podIP: cfg.PodIP, + tailnetAddrs: addrs, + } + go func() { + if err := ep.run(ctx, n); err != nil { + egressSvcsErrorChan <- err + } + }() + } + + // Wait on tailscaled process. It won't be cleaned up by default when the + // container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the + // reaper by a running cmd.Wait in a goroutine immediately after starting + // tailscaled? reaper := func() { defer wg.Done() for { @@ -637,6 +659,8 @@ runLoop: } backendAddrs = newBackendAddrs resetTimer(false) + case e := <-egressSvcsErrorChan: + log.Fatalf("egress proxy failed: %v", e) } } wg.Wait() diff --git a/cmd/containerboot/services.go b/cmd/containerboot/services.go new file mode 100644 index 000000000..a3d7cdad2 --- /dev/null +++ b/cmd/containerboot/services.go @@ -0,0 +1,570 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build linux + +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "net/netip" + "os" + "path/filepath" + "reflect" + "strings" + "time" + + "github.com/fsnotify/fsnotify" + "tailscale.com/ipn" + "tailscale.com/kube/egressservices" + "tailscale.com/kube/kubeclient" + "tailscale.com/tailcfg" + "tailscale.com/util/linuxfw" + "tailscale.com/util/mak" +) + +const tailscaleTunInterface = "tailscale0" + +// This file contains functionality to run containerboot as a proxy that can +// route cluster traffic to one or more tailnet targets, based on portmapping +// rules read from a configfile. Currently (9/2024) this is only used for the +// Kubernetes operator egress proxies. + +// egressProxy knows how to configure firewall rules to route cluster traffic to +// one or more tailnet services. +type egressProxy struct { + cfgPath string // path to egress service config file + + nfr linuxfw.NetfilterRunner // never nil + + kc kubeclient.Client // never nil + stateSecret string // name of the kube state Secret + + netmapChan chan ipn.Notify // chan to receive netmap updates on + + podIP string // never empty string + + // tailnetFQDNs is the egress service FQDN to tailnet IP mappings that + // were last used to configure firewall rules for this proxy. + // TODO(irbekrm): target addresses are also stored in the state Secret. + // Evaluate whether we should retrieve them from there and not store in + // memory at all. + targetFQDNs map[string][]netip.Prefix + + // used to configure firewall rules. + tailnetAddrs []netip.Prefix +} + +// run configures egress proxy firewall rules and ensures that the firewall rules are reconfigured when: +// - the mounted egress config has changed +// - the proxy's tailnet IP addresses have changed +// - tailnet IPs have changed for any backend targets specified by tailnet FQDN +func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error { + var tickChan <-chan time.Time + var eventChan <-chan fsnotify.Event + // TODO (irbekrm): take a look if this can be pulled into a single func + // shared with serve config loader. 
+ if w, err := fsnotify.NewWatcher(); err != nil { + log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + tickChan = ticker.C + } else { + defer w.Close() + if err := w.Add(filepath.Dir(ep.cfgPath)); err != nil { + return fmt.Errorf("failed to add fsnotify watch: %w", err) + } + eventChan = w.Events + } + + if err := ep.sync(ctx, n); err != nil { + return err + } + for { + var err error + select { + case <-ctx.Done(): + return nil + case <-tickChan: + err = ep.sync(ctx, n) + case <-eventChan: + log.Printf("config file change detected, ensuring firewall config is up to date...") + err = ep.sync(ctx, n) + case n = <-ep.netmapChan: + shouldResync := ep.shouldResync(n) + if shouldResync { + log.Printf("netmap change detected, ensuring firewall config is up to date...") + err = ep.sync(ctx, n) + } + } + if err != nil { + return fmt.Errorf("error syncing egress service config: %w", err) + } + } +} + +// sync triggers an egress proxy config resync. The resync calculates the diff between config and status to determine if +// any firewall rules need to be updated. Currently using status in state Secret as a reference for what is the current +// firewall configuration is good enough because - the status is keyed by the Pod IP - we crash the Pod on errors such +// as failed firewall update +func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error { + cfgs, err := ep.getConfigs() + if err != nil { + return fmt.Errorf("error retrieving egress service configs: %w", err) + } + status, err := ep.getStatus(ctx) + if err != nil { + return fmt.Errorf("error retrieving current egress proxy status: %w", err) + } + newStatus, err := ep.syncEgressConfigs(cfgs, status, n) + if err != nil { + return fmt.Errorf("error syncing egress service configs: %w", err) + } + if !servicesStatusIsEqual(newStatus, status) { + if err := ep.setStatus(ctx, newStatus, n); err != nil { + return fmt.Errorf("error setting egress proxy status: %w", err) + } + } + return nil +} + +// addrsHaveChanged returns true if the provided netmap update contains tailnet address change for this proxy node. +// Netmap must not be nil. +func (ep *egressProxy) addrsHaveChanged(n ipn.Notify) bool { + return !reflect.DeepEqual(ep.tailnetAddrs, n.NetMap.SelfNode.Addresses()) +} + +// syncEgressConfigs adds and deletes firewall rules to match the desired +// configuration. It uses the provided status to determine what is currently +// applied and updates the status after a successful sync. +func (ep *egressProxy) syncEgressConfigs(cfgs *egressservices.Configs, status *egressservices.Status, n ipn.Notify) (*egressservices.Status, error) { + if !(wantsServicesConfigured(cfgs) || hasServicesConfigured(status)) { + return nil, nil + } + + // Delete unnecessary services. + if err := ep.deleteUnnecessaryServices(cfgs, status); err != nil { + return nil, fmt.Errorf("error deleting services: %w", err) + + } + newStatus := &egressservices.Status{} + if !wantsServicesConfigured(cfgs) { + return newStatus, nil + } + + // Add new services, update rules for any that have changed. 
+ rulesPerSvcToAdd := make(map[string][]rule, 0) + rulesPerSvcToDelete := make(map[string][]rule, 0) + for svcName, cfg := range *cfgs { + tailnetTargetIPs, err := ep.tailnetTargetIPsForSvc(cfg, n) + if err != nil { + return nil, fmt.Errorf("error determining tailnet target IPs: %w", err) + } + rulesToAdd, rulesToDelete, err := updatesForCfg(svcName, cfg, status, tailnetTargetIPs) + if err != nil { + return nil, fmt.Errorf("error validating service changes: %v", err) + } + log.Printf("syncegressservices: looking at svc %s rulesToAdd %d rulesToDelete %d", svcName, len(rulesToAdd), len(rulesToDelete)) + if len(rulesToAdd) != 0 { + mak.Set(&rulesPerSvcToAdd, svcName, rulesToAdd) + } + if len(rulesToDelete) != 0 { + mak.Set(&rulesPerSvcToDelete, svcName, rulesToDelete) + } + if len(rulesToAdd) != 0 || ep.addrsHaveChanged(n) { + // For each tailnet target, set up SNAT from the local tailnet device address of the matching + // family. + for _, t := range tailnetTargetIPs { + if t.Is6() && !ep.nfr.HasIPV6NAT() { + continue + } + var local netip.Addr + for _, pfx := range n.NetMap.SelfNode.Addresses().All() { + if !pfx.IsSingleIP() { + continue + } + if pfx.Addr().Is4() != t.Is4() { + continue + } + local = pfx.Addr() + break + } + if !local.IsValid() { + return nil, fmt.Errorf("no valid local IP: %v", local) + } + // TODO(irbekrm): only create the SNAT rule if it does not already exist. + if err := ep.nfr.AddSNATRuleForDst(local, t); err != nil { + return nil, fmt.Errorf("error setting up SNAT rule: %w", err) + } + } + } + // Update the status. Status will be written back to the state Secret by the caller. + mak.Set(&newStatus.Services, svcName, &egressservices.ServiceStatus{TailnetTargetIPs: tailnetTargetIPs, TailnetTarget: cfg.TailnetTarget, Ports: cfg.Ports}) + } + + // Actually apply the firewall rules. + if err := ensureRulesAdded(rulesPerSvcToAdd, ep.nfr); err != nil { + return nil, fmt.Errorf("error adding rules: %w", err) + } + if err := ensureRulesDeleted(rulesPerSvcToDelete, ep.nfr); err != nil { + return nil, fmt.Errorf("error deleting rules: %w", err) + } + + return newStatus, nil +} + +// updatesForCfg calculates any rules that need to be added or deleted for an individucal egress service config. +func updatesForCfg(svcName string, cfg egressservices.Config, status *egressservices.Status, tailnetTargetIPs []netip.Addr) ([]rule, []rule, error) { + rulesToAdd := make([]rule, 0) + rulesToDelete := make([]rule, 0) + currentConfig, ok := lookupCurrentConfig(svcName, status) + + // If no rules for service are present yet, add them all. + if !ok { + for _, t := range tailnetTargetIPs { + for ports := range cfg.Ports { + log.Printf("syncegressservices: svc %s adding port %v", svcName, ports) + rulesToAdd = append(rulesToAdd, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: t}) + } + } + return rulesToAdd, rulesToDelete, nil + } + + // If there are no backend targets available, delete any currently configured rules. 
+	if len(tailnetTargetIPs) == 0 {
+		log.Printf("tailnet target for egress service %s does not have any backend addresses, deleting all rules", svcName)
+		for _, ip := range currentConfig.TailnetTargetIPs {
+			for ports := range currentConfig.Ports {
+				rulesToDelete = append(rulesToDelete, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
+			}
+		}
+		return rulesToAdd, rulesToDelete, nil
+	}
+
+	// If there are rules present for backend targets that no longer match, delete them.
+	for _, ip := range currentConfig.TailnetTargetIPs {
+		var found bool
+		for _, wantsIP := range tailnetTargetIPs {
+			if reflect.DeepEqual(ip, wantsIP) {
+				found = true
+				break
+			}
+		}
+		if !found {
+			for ports := range currentConfig.Ports {
+				rulesToDelete = append(rulesToDelete, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
+			}
+		}
+	}
+
+	// Sync rules for the currently wanted backend targets.
+	for _, ip := range tailnetTargetIPs {
+
+		// If the backend target is not yet present in status, add all rules.
+		var found bool
+		for _, gotIP := range currentConfig.TailnetTargetIPs {
+			if reflect.DeepEqual(ip, gotIP) {
+				found = true
+				break
+			}
+		}
+		if !found {
+			for ports := range cfg.Ports {
+				rulesToAdd = append(rulesToAdd, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
+			}
+			continue
+		}
+
+		// If the backend target is present in status, check that the
+		// currently applied rules are up to date.
+
+		// Delete any current portmappings that are no longer present in config.
+		for port := range currentConfig.Ports {
+			if _, ok := cfg.Ports[port]; ok {
+				continue
+			}
+			rulesToDelete = append(rulesToDelete, rule{tailnetPort: port.TargetPort, containerPort: port.MatchPort, protocol: port.Protocol, tailnetIP: ip})
+		}
+
+		// Add any new portmappings.
+		for port := range cfg.Ports {
+			if _, ok := currentConfig.Ports[port]; ok {
+				continue
+			}
+			rulesToAdd = append(rulesToAdd, rule{tailnetPort: port.TargetPort, containerPort: port.MatchPort, protocol: port.Protocol, tailnetIP: ip})
+		}
+	}
+	return rulesToAdd, rulesToDelete, nil
+}
+
+// deleteUnnecessaryServices ensures that any services found in status, but not
+// present in config, are deleted.
+func (ep *egressProxy) deleteUnnecessaryServices(cfgs *egressservices.Configs, status *egressservices.Status) error {
+	if !hasServicesConfigured(status) {
+		return nil
+	}
+	if !wantsServicesConfigured(cfgs) {
+		for svcName, svc := range status.Services {
+			log.Printf("service %s is no longer required, deleting", svcName)
+			if err := ensureServiceDeleted(svcName, svc, ep.nfr); err != nil {
+				return fmt.Errorf("error deleting service %s: %w", svcName, err)
+			}
+		}
+		return nil
+	}
+
+	for svcName, svc := range status.Services {
+		if _, ok := (*cfgs)[svcName]; !ok {
+			log.Printf("service %s is no longer required, deleting", svcName)
+			if err := ensureServiceDeleted(svcName, svc, ep.nfr); err != nil {
+				return fmt.Errorf("error deleting service %s: %w", svcName, err)
+			}
+			// TODO (irbekrm): also delete the SNAT rule here
+		}
+	}
+	return nil
+}
+
+// getConfigs gets the mounted egress service configuration.
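The getConfigs implementation that follows reads a file whose contents unmarshal into egressservices.Configs (defined later in this diff). A minimal, self-contained sketch of that file format, assuming the kube/egressservices package from this change; the service name, FQDN and ports are illustrative:

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"tailscale.com/kube/egressservices"
)

func main() {
	// Example of the JSON the proxy expects to find at the path given in
	// TS_EGRESS_SERVICES_CONFIG_PATH (all names and values are illustrative).
	raw := []byte(`{
		"svc:default-dns": {
			"tailnetTarget": {"ip": "", "fqdn": "dns.tailnet-xyz.ts.net"},
			"ports": {"udp:4004:53": {}, "tcp:4004:53": {}}
		}
	}`)

	var cfgs egressservices.Configs
	if err := json.Unmarshal(raw, &cfgs); err != nil {
		log.Fatalf("parsing egress services config: %v", err)
	}
	for name, cfg := range cfgs {
		fmt.Printf("%s -> %s (%d port mappings)\n", name, cfg.TailnetTarget.FQDN, len(cfg.Ports))
	}
}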
+func (ep *egressProxy) getConfigs() (*egressservices.Configs, error) { + j, err := os.ReadFile(ep.cfgPath) + if os.IsNotExist(err) { + return nil, nil + } + if err != nil { + return nil, err + } + if len(j) == 0 || string(j) == "" { + return nil, nil + } + cfg := &egressservices.Configs{} + if err := json.Unmarshal(j, &cfg); err != nil { + return nil, err + } + return cfg, nil +} + +// getStatus gets the current status of the configured firewall. The current +// status is stored in state Secret. Returns nil status if no status that +// applies to the current proxy Pod was found. Uses the Pod IP to determine if a +// status found in the state Secret applies to this proxy Pod. +func (ep *egressProxy) getStatus(ctx context.Context) (*egressservices.Status, error) { + secret, err := ep.kc.GetSecret(ctx, ep.stateSecret) + if err != nil { + return nil, fmt.Errorf("error retrieving state secret: %w", err) + } + status := &egressservices.Status{} + raw, ok := secret.Data[egressservices.KeyEgressServices] + if !ok { + return nil, nil + } + if err := json.Unmarshal([]byte(raw), status); err != nil { + return nil, fmt.Errorf("error unmarshalling previous config: %w", err) + } + if reflect.DeepEqual(status.PodIP, ep.podIP) { + return status, nil + } + return nil, nil +} + +// setStatus writes egress proxy's currently configured firewall to the state +// Secret and updates proxy's tailnet addresses. +func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, n ipn.Notify) error { + // Pod IP is used to determine if a stored status applies to THIS proxy Pod. + status.PodIP = ep.podIP + secret, err := ep.kc.GetSecret(ctx, ep.stateSecret) + if err != nil { + return fmt.Errorf("error retrieving state Secret: %w", err) + } + bs, err := json.Marshal(status) + if err != nil { + return fmt.Errorf("error marshalling service config: %w", err) + } + secret.Data[egressservices.KeyEgressServices] = bs + patch := kubeclient.JSONPatch{ + Op: "replace", + Path: fmt.Sprintf("/data/%s", egressservices.KeyEgressServices), + Value: bs, + } + if err := ep.kc.JSONPatchSecret(ctx, ep.stateSecret, []kubeclient.JSONPatch{patch}); err != nil { + return fmt.Errorf("error patching state Secret: %w", err) + } + ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice() + return nil +} + +// tailnetTargetIPsForSvc returns the tailnet IPs to which traffic for this +// egress service should be proxied. The egress service can be configured by IP +// or by FQDN. If it's configured by IP, just return that. If it's configured by +// FQDN, resolve the FQDN and return the resolved IPs. 
+func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.Notify) (addrs []netip.Addr, err error) { + if svc.TailnetTarget.IP != "" { + addr, err := netip.ParseAddr(svc.TailnetTarget.IP) + if err != nil { + return nil, fmt.Errorf("error parsing tailnet target IP: %w", err) + } + return []netip.Addr{addr}, nil + } + + if svc.TailnetTarget.FQDN == "" { + return nil, errors.New("unexpected egress service config- neither tailnet target IP nor FQDN is set") + } + if n.NetMap == nil { + log.Printf("netmap is not available, unable to determine backend addresses for %s", svc.TailnetTarget.FQDN) + return addrs, nil + } + var ( + node tailcfg.NodeView + nodeFound bool + ) + for _, nn := range n.NetMap.Peers { + if equalFQDNs(nn.Name(), svc.TailnetTarget.FQDN) { + node = nn + nodeFound = true + break + } + } + if nodeFound { + for _, addr := range node.Addresses().AsSlice() { + addrs = append(addrs, addr.Addr()) + } + // Egress target endpoints configured via FQDN are stored, so + // that we can determine if a netmap update should trigger a + // resync. + mak.Set(&ep.targetFQDNs, svc.TailnetTarget.FQDN, node.Addresses().AsSlice()) + } + return addrs, nil +} + +// shouldResync parses netmap update and returns true if the update contains +// changes for which the egress proxy's firewall should be reconfigured. +func (ep *egressProxy) shouldResync(n ipn.Notify) bool { + if n.NetMap == nil { + return false + } + + // If proxy's tailnet addresses have changed, resync. + if !reflect.DeepEqual(n.NetMap.SelfNode.Addresses().AsSlice(), ep.tailnetAddrs) { + log.Printf("node addresses have changed, trigger egress config resync") + ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice() + return true + } + + // If the IPs for any of the egress services configured via FQDN have + // changed, resync. + for fqdn, ips := range ep.targetFQDNs { + for _, nn := range n.NetMap.Peers { + if equalFQDNs(nn.Name(), fqdn) { + if !reflect.DeepEqual(ips, nn.Addresses().AsSlice()) { + log.Printf("backend addresses for egress target %q have changed old IPs %v, new IPs %v trigger egress config resync", nn.Name(), ips, nn.Addresses().AsSlice()) + } + return true + } + } + } + return false +} + +// ensureServiceDeleted ensures that any rules for an egress service are removed +// from the firewall configuration. +func ensureServiceDeleted(svcName string, svc *egressservices.ServiceStatus, nfr linuxfw.NetfilterRunner) error { + + // Note that the portmap is needed for iptables based firewall only. + // Nftables group rules for a service in a chain, so there is no need to + // specify individual portmapping based rules. + pms := make([]linuxfw.PortMap, 0) + for pm := range svc.Ports { + pms = append(pms, linuxfw.PortMap{MatchPort: pm.MatchPort, TargetPort: pm.TargetPort, Protocol: pm.Protocol}) + } + + if err := nfr.DeleteSvc(svcName, tailscaleTunInterface, svc.TailnetTargetIPs, pms); err != nil { + return fmt.Errorf("error deleting service %s: %w", svcName, err) + } + return nil +} + +// ensureRulesAdded ensures that all portmapping rules are added to the firewall +// configuration. For any rules that already exist, calling this function is a +// no-op. In case of nftables, a service consists of one or two (one per IP +// family) chains that conain the portmapping rules for the service and the +// chains as needed when this function is called. 
+func ensureRulesAdded(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error { + for svc, rules := range rulesPerSvc { + for _, rule := range rules { + if rule.tailnetIP.Is6() && !nfr.HasIPV6NAT() { + log.Printf("host does not support IPv6 NAT; skipping IPv6 target %s", rule.tailnetIP) + continue + } + log.Printf("ensureRulesAdded svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol) + if err := nfr.EnsurePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil { + return fmt.Errorf("error ensuring rule: %w", err) + } + } + } + return nil +} + +// ensureRulesDeleted ensures that the given rules are deleted from the firewall +// configuration. For any rules that do not exist, calling this funcion is a +// no-op. +func ensureRulesDeleted(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error { + for svc, rules := range rulesPerSvc { + for _, rule := range rules { + if rule.tailnetIP.Is6() && !nfr.HasIPV6NAT() { + log.Printf("host does not support IPv6 NAT; skipping IPv6 target %s", rule.tailnetIP) + continue + } + log.Printf("ensureRulesDeleted svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol) + if err := nfr.DeletePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil { + return fmt.Errorf("error deleting rule: %w", err) + } + } + } + return nil +} + +func lookupCurrentConfig(svcName string, status *egressservices.Status) (*egressservices.ServiceStatus, bool) { + if status == nil || len(status.Services) == 0 { + return nil, false + } + c, ok := status.Services[svcName] + return c, ok +} + +func equalFQDNs(s, s1 string) bool { + s, _ = strings.CutSuffix(s, ".") + s1, _ = strings.CutSuffix(s1, ".") + return strings.EqualFold(s, s1) +} + +// rule contains configuration for an egress proxy firewall rule. 
+type rule struct { + containerPort uint16 // port to match incoming traffic + tailnetPort uint16 // tailnet service port + tailnetIP netip.Addr // tailnet service IP + protocol string +} + +func wantsServicesConfigured(cfgs *egressservices.Configs) bool { + return cfgs != nil && len(*cfgs) != 0 +} + +func hasServicesConfigured(status *egressservices.Status) bool { + return status != nil && len(status.Services) != 0 +} + +func servicesStatusIsEqual(st, st1 *egressservices.Status) bool { + if st == nil && st1 == nil { + return true + } + if st == nil || st1 == nil { + return false + } + st.PodIP = "" + st1.PodIP = "" + return reflect.DeepEqual(*st, *st1) +} diff --git a/cmd/containerboot/services_test.go b/cmd/containerboot/services_test.go new file mode 100644 index 000000000..46f6db1cf --- /dev/null +++ b/cmd/containerboot/services_test.go @@ -0,0 +1,175 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build linux + +package main + +import ( + "net/netip" + "reflect" + "testing" + + "tailscale.com/kube/egressservices" +) + +func Test_updatesForSvc(t *testing.T) { + tailnetIPv4, tailnetIPv6 := netip.MustParseAddr("100.99.99.99"), netip.MustParseAddr("fd7a:115c:a1e0::701:b62a") + tailnetIPv4_1, tailnetIPv6_1 := netip.MustParseAddr("100.88.88.88"), netip.MustParseAddr("fd7a:115c:a1e0::4101:512f") + ports := map[egressservices.PortMap]struct{}{{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}: {}} + ports1 := map[egressservices.PortMap]struct{}{{Protocol: "udp", MatchPort: 4004, TargetPort: 53}: {}} + ports2 := map[egressservices.PortMap]struct{}{{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}: {}, + {Protocol: "tcp", MatchPort: 4005, TargetPort: 443}: {}} + fqdnSpec := egressservices.Config{ + TailnetTarget: egressservices.TailnetTarget{FQDN: "test"}, + Ports: ports, + } + fqdnSpec1 := egressservices.Config{ + TailnetTarget: egressservices.TailnetTarget{FQDN: "test"}, + Ports: ports1, + } + fqdnSpec2 := egressservices.Config{ + TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()}, + Ports: ports, + } + fqdnSpec3 := egressservices.Config{ + TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()}, + Ports: ports2, + } + r := rule{containerPort: 4003, tailnetPort: 80, protocol: "tcp", tailnetIP: tailnetIPv4} + r1 := rule{containerPort: 4003, tailnetPort: 80, protocol: "tcp", tailnetIP: tailnetIPv6} + r2 := rule{tailnetPort: 53, containerPort: 4004, protocol: "udp", tailnetIP: tailnetIPv4} + r3 := rule{tailnetPort: 53, containerPort: 4004, protocol: "udp", tailnetIP: tailnetIPv6} + r4 := rule{containerPort: 4003, tailnetPort: 80, protocol: "tcp", tailnetIP: tailnetIPv4_1} + r5 := rule{containerPort: 4003, tailnetPort: 80, protocol: "tcp", tailnetIP: tailnetIPv6_1} + r6 := rule{containerPort: 4005, tailnetPort: 443, protocol: "tcp", tailnetIP: tailnetIPv4} + + tests := []struct { + name string + svcName string + tailnetTargetIPs []netip.Addr + podIP string + spec egressservices.Config + status *egressservices.Status + wantRulesToAdd []rule + wantRulesToDelete []rule + }{ + { + name: "add_fqdn_svc_that_does_not_yet_exist", + svcName: "test", + tailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6}, + spec: fqdnSpec, + status: &egressservices.Status{}, + wantRulesToAdd: []rule{r, r1}, + wantRulesToDelete: []rule{}, + }, + { + name: "fqdn_svc_already_exists", + svcName: "test", + tailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6}, + spec: fqdnSpec, + status: &egressservices.Status{ + Services: 
map[string]*egressservices.ServiceStatus{"test": { + TailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6}, + TailnetTarget: egressservices.TailnetTarget{FQDN: "test"}, + Ports: ports, + }}}, + wantRulesToAdd: []rule{}, + wantRulesToDelete: []rule{}, + }, + { + name: "fqdn_svc_already_exists_add_port_remove_port", + svcName: "test", + tailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6}, + spec: fqdnSpec1, + status: &egressservices.Status{ + Services: map[string]*egressservices.ServiceStatus{"test": { + TailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6}, + TailnetTarget: egressservices.TailnetTarget{FQDN: "test"}, + Ports: ports, + }}}, + wantRulesToAdd: []rule{r2, r3}, + wantRulesToDelete: []rule{r, r1}, + }, + { + name: "fqdn_svc_already_exists_change_fqdn_backend_ips", + svcName: "test", + tailnetTargetIPs: []netip.Addr{tailnetIPv4_1, tailnetIPv6_1}, + spec: fqdnSpec, + status: &egressservices.Status{ + Services: map[string]*egressservices.ServiceStatus{"test": { + TailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6}, + TailnetTarget: egressservices.TailnetTarget{FQDN: "test"}, + Ports: ports, + }}}, + wantRulesToAdd: []rule{r4, r5}, + wantRulesToDelete: []rule{r, r1}, + }, + { + name: "add_ip_service", + svcName: "test", + tailnetTargetIPs: []netip.Addr{tailnetIPv4}, + spec: fqdnSpec2, + status: &egressservices.Status{}, + wantRulesToAdd: []rule{r}, + wantRulesToDelete: []rule{}, + }, + { + name: "add_ip_service_already_exists", + svcName: "test", + tailnetTargetIPs: []netip.Addr{tailnetIPv4}, + spec: fqdnSpec2, + status: &egressservices.Status{ + Services: map[string]*egressservices.ServiceStatus{"test": { + TailnetTargetIPs: []netip.Addr{tailnetIPv4}, + TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()}, + Ports: ports, + }}}, + wantRulesToAdd: []rule{}, + wantRulesToDelete: []rule{}, + }, + { + name: "ip_service_add_port", + svcName: "test", + tailnetTargetIPs: []netip.Addr{tailnetIPv4}, + spec: fqdnSpec3, + status: &egressservices.Status{ + Services: map[string]*egressservices.ServiceStatus{"test": { + TailnetTargetIPs: []netip.Addr{tailnetIPv4}, + TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()}, + Ports: ports, + }}}, + wantRulesToAdd: []rule{r6}, + wantRulesToDelete: []rule{}, + }, + { + name: "ip_service_delete_port", + svcName: "test", + tailnetTargetIPs: []netip.Addr{tailnetIPv4}, + spec: fqdnSpec, + status: &egressservices.Status{ + Services: map[string]*egressservices.ServiceStatus{"test": { + TailnetTargetIPs: []netip.Addr{tailnetIPv4}, + TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()}, + Ports: ports2, + }}}, + wantRulesToAdd: []rule{}, + wantRulesToDelete: []rule{r6}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotRulesToAdd, gotRulesToDelete, err := updatesForCfg(tt.svcName, tt.spec, tt.status, tt.tailnetTargetIPs) + if err != nil { + t.Errorf("updatesForSvc() unexpected error %v", err) + return + } + if !reflect.DeepEqual(gotRulesToAdd, tt.wantRulesToAdd) { + t.Errorf("updatesForSvc() got rulesToAdd = \n%v\n want rulesToAdd \n%v", gotRulesToAdd, tt.wantRulesToAdd) + } + if !reflect.DeepEqual(gotRulesToDelete, tt.wantRulesToDelete) { + t.Errorf("updatesForSvc() got rulesToDelete = \n%v\n want rulesToDelete \n%v", gotRulesToDelete, tt.wantRulesToDelete) + } + }) + } +} diff --git a/cmd/containerboot/settings.go b/cmd/containerboot/settings.go index c61996949..d72aefbdf 100644 --- a/cmd/containerboot/settings.go +++ b/cmd/containerboot/settings.go @@ -64,6 
+64,7 @@ type settings struct { // target. PodIP string HealthCheckAddrPort string + EgressSvcsCfgPath string } func (s *settings) validate() error { @@ -198,7 +199,7 @@ func isOneStepConfig(cfg *settings) bool { // as an L3 proxy, proxying to an endpoint provided via one of the config env // vars. func isL3Proxy(cfg *settings) bool { - return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress + return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressSvcsCfgPath != "" } // hasKubeStateStore returns true if the state must be stored in a Kubernetes diff --git a/kube/egressservices/egressservices.go b/kube/egressservices/egressservices.go new file mode 100644 index 000000000..32a75b987 --- /dev/null +++ b/kube/egressservices/egressservices.go @@ -0,0 +1,103 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// Package egressservices contains shared types for exposing tailnet services to +// cluster workloads. +// These are split into a separate package for consumption of +// non-Kubernetes shared libraries and binaries. Be mindful of not increasing +// dependency size for those consumers when adding anything new here. +package egressservices + +import ( + "encoding" + "fmt" + "net/netip" + "strconv" + "strings" +) + +// KeyEgressServices is name of the proxy state Secret field that contains the +// currently applied egress proxy config. +const KeyEgressServices = "egress-services" + +// Configs contains the desired configuration for egress services keyed by +// service name. +type Configs map[string]Config + +// Config is an egress service configuration. +// TODO(irbekrm): version this? +type Config struct { + // TailnetTarget is the target to which cluster traffic for this service + // should be proxied. + TailnetTarget TailnetTarget `json:"tailnetTarget"` + // Ports contains mappings for ports that can be accessed on the tailnet target. + Ports map[PortMap]struct{} `json:"ports"` +} + +// TailnetTarget is the tailnet target to which traffic for the egress service +// should be proxied. Exactly one of IP or FQDN should be set. +type TailnetTarget struct { + // IP is the tailnet IP of the target. + IP string `json:"ip"` + // FQDN is the full tailnet FQDN of the target. + FQDN string `json:"fqdn"` +} + +// PorMap is a mapping between match port on which proxy receives cluster +// traffic and target port where traffic received on match port should be +// fowardded to. +type PortMap struct { + Protocol string `json:"protocol"` + MatchPort uint16 `json:"matchPort"` + TargetPort uint16 `json:"targetPort"` +} + +// PortMap is used as a Config.Ports map key. Config needs to be serialized/deserialized to/from JSON. JSON only +// supports string map keys, so we need to implement TextMarshaler/TextUnmarshaler to convert PortMap to string and +// back. 
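A small sketch of that text encoding in isolation, assuming the PortMap type as defined here; the port numbers are illustrative:

package main

import (
	"fmt"
	"log"

	"tailscale.com/kube/egressservices"
)

func main() {
	pm := egressservices.PortMap{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}
	txt, _ := pm.MarshalText()
	fmt.Println(string(txt)) // tcp:4003:80

	var back egressservices.PortMap
	if err := back.UnmarshalText([]byte("udp:4004:53")); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", back) // {Protocol:udp MatchPort:4004 TargetPort:53}
}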
+var _ encoding.TextMarshaler = PortMap{} +var _ encoding.TextUnmarshaler = &PortMap{} + +func (pm *PortMap) UnmarshalText(t []byte) error { + tt := string(t) + ss := strings.Split(tt, ":") + if len(ss) != 3 { + return fmt.Errorf("error unmarshalling portmap from JSON, wants a portmap in form ::, got %q", tt) + } + (*pm).Protocol = ss[0] + matchPort, err := strconv.ParseUint(ss[1], 10, 16) + if err != nil { + return fmt.Errorf("error converting match port %q to uint16: %w", ss[1], err) + } + (*pm).MatchPort = uint16(matchPort) + targetPort, err := strconv.ParseUint(ss[2], 10, 16) + if err != nil { + return fmt.Errorf("error converting target port %q to uint16: %w", ss[2], err) + } + (*pm).TargetPort = uint16(targetPort) + return nil +} + +func (pm PortMap) MarshalText() ([]byte, error) { + s := fmt.Sprintf("%s:%d:%d", pm.Protocol, pm.MatchPort, pm.TargetPort) + return []byte(s), nil +} + +// Status represents the currently configured firewall rules for all egress +// services for a proxy identified by the PodIP. +type Status struct { + PodIP string `json:"podIP"` + // All egress service status keyed by service name. + Services map[string]*ServiceStatus `json:"services"` +} + +// ServiceStatus is the currently configured firewall rules for an egress +// service. +type ServiceStatus struct { + Ports map[PortMap]struct{} `json:"ports"` + // TailnetTargetIPs are the tailnet target IPs that were used to + // configure these firewall rules. For a TailnetTarget with IP set, this + // is the same as IP. + TailnetTargetIPs []netip.Addr `json:"tailnetTargetIPs"` + TailnetTarget TailnetTarget `json:"tailnetTarget"` +} diff --git a/kube/egressservices/egressservices_test.go b/kube/egressservices/egressservices_test.go new file mode 100644 index 000000000..5e5651e77 --- /dev/null +++ b/kube/egressservices/egressservices_test.go @@ -0,0 +1,76 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package egressservices + +import ( + "encoding/json" + "reflect" + "testing" +) + +func Test_jsonUnmarshalConfig(t *testing.T) { + tests := []struct { + name string + bs []byte + wantsCfg Config + wantsErr bool + }{ + { + name: "success", + bs: []byte(`{"ports":{"tcp:4003:80":{}}}`), + wantsCfg: Config{Ports: map[PortMap]struct{}{{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}: {}}}, + }, + { + name: "failure_invalid_format", + bs: []byte(`{"ports":{"tcp:80":{}}}`), + wantsCfg: Config{Ports: map[PortMap]struct{}{}}, + wantsErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := Config{} + if gotErr := json.Unmarshal(tt.bs, &cfg); (gotErr != nil) != tt.wantsErr { + t.Errorf("json.Unmarshal returned error %v, wants error %v", gotErr, tt.wantsErr) + } + if !reflect.DeepEqual(cfg, tt.wantsCfg) { + t.Errorf("json.Unmarshal produced Config %v, wants Config %v", cfg, tt.wantsCfg) + } + }) + } +} + +func Test_jsonMarshalConfig(t *testing.T) { + tests := []struct { + name string + protocol string + matchPort uint16 + targetPort uint16 + wantsBs []byte + }{ + { + name: "success", + protocol: "tcp", + matchPort: 4003, + targetPort: 80, + wantsBs: []byte(`{"tailnetTarget":{"ip":"","fqdn":""},"ports":{"tcp:4003:80":{}}}`), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := Config{Ports: map[PortMap]struct{}{{ + Protocol: tt.protocol, + MatchPort: tt.matchPort, + TargetPort: tt.targetPort}: {}}} + + gotBs, gotErr := json.Marshal(&cfg) + if gotErr != nil { + t.Errorf("json.Marshal(%+#v) returned unexpected error 
%v", cfg, gotErr) + } + if !reflect.DeepEqual(gotBs, tt.wantsBs) { + t.Errorf("json.Marshal(%+#v) returned '%v', wants '%v'", cfg, string(gotBs), string(tt.wantsBs)) + } + }) + } +} diff --git a/kube/kubeclient/client.go b/kube/kubeclient/client.go index 35cb4f713..e8ddec75d 100644 --- a/kube/kubeclient/client.go +++ b/kube/kubeclient/client.go @@ -257,8 +257,8 @@ type JSONPatch struct { // It currently (2023-03-02) only supports "add" and "remove" operations. func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error { for _, p := range patch { - if p.Op != "remove" && p.Op != "add" { - panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op)) + if p.Op != "remove" && p.Op != "add" && p.Op != "replace" { + return fmt.Errorf("unsupported JSON patch operation: %q", p.Op) } } return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json")) diff --git a/util/linuxfw/iptables_for_svcs.go b/util/linuxfw/iptables_for_svcs.go new file mode 100644 index 000000000..8e0f5d48d --- /dev/null +++ b/util/linuxfw/iptables_for_svcs.go @@ -0,0 +1,79 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build linux + +package linuxfw + +import ( + "fmt" + "net/netip" +) + +// This file contains functionality to insert portmapping rules for a 'service'. +// These are currently only used by the Kubernetes operator proxies. +// An iptables rule for such a service contains a comment with the service name. + +// EnsurePortMapRuleForSvc adds a prerouting rule that forwards traffic received +// on match port and NOT on the provided interface to target IP and target port. +// Rule will only be added if it does not already exists. +func (i *iptablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error { + table := i.getIPTByAddr(targetIP) + args := argsForPortMapRule(svc, tun, targetIP, pm) + exists, err := table.Exists("nat", "PREROUTING", args...) + if err != nil { + return fmt.Errorf("error checking if rule exists: %w", err) + } + if !exists { + return table.Append("nat", "PREROUTING", args...) + } + return nil +} + +// DeleteMapRuleForSvc constructs a prerouting rule as would be created by +// EnsurePortMapRuleForSvc with the provided args and, if such a rule exists, +// deletes it. +func (i *iptablesRunner) DeletePortMapRuleForSvc(svc, excludeI string, targetIP netip.Addr, pm PortMap) error { + table := i.getIPTByAddr(targetIP) + args := argsForPortMapRule(svc, excludeI, targetIP, pm) + exists, err := table.Exists("nat", "PREROUTING", args...) + if err != nil { + return fmt.Errorf("error checking if rule exists: %w", err) + } + if exists { + return table.Delete("nat", "PREROUTING", args...) + } + return nil +} + +// DeleteSvc constructs all possible rules that would have been created by +// EnsurePortMapRuleForSvc from the provided args and ensures that each one that +// exists is deleted. 
+func (i *iptablesRunner) DeleteSvc(svc, tun string, targetIPs []netip.Addr, pms []PortMap) error { + for _, tip := range targetIPs { + for _, pm := range pms { + if err := i.DeletePortMapRuleForSvc(svc, tun, tip, pm); err != nil { + return fmt.Errorf("error deleting rule: %w", err) + } + } + } + return nil +} + +func argsForPortMapRule(svc, excludeI string, targetIP netip.Addr, pm PortMap) []string { + c := commentForSvc(svc, pm) + return []string{ + "!", "-i", excludeI, + "-p", pm.Protocol, + "--dport", fmt.Sprintf("%d", pm.MatchPort), + "-m", "comment", "--comment", c, + "-j", "DNAT", + "--to-destination", fmt.Sprintf("%v:%v", targetIP, pm.TargetPort), + } +} + +// commentForSvc generates a comment to be added to an iptables DNAT rule for a +// service. This is for iptables debugging/readability purposes only. +func commentForSvc(svc string, pm PortMap) string { + return fmt.Sprintf("%s:%s:%d -> %s:%d", svc, pm.Protocol, pm.MatchPort, pm.Protocol, pm.TargetPort) +} diff --git a/util/linuxfw/iptables_for_svcs_test.go b/util/linuxfw/iptables_for_svcs_test.go new file mode 100644 index 000000000..99b2f517f --- /dev/null +++ b/util/linuxfw/iptables_for_svcs_test.go @@ -0,0 +1,196 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build linux + +package linuxfw + +import ( + "net/netip" + "testing" +) + +func Test_iptablesRunner_EnsurePortMapRuleForSvc(t *testing.T) { + v4Addr := netip.MustParseAddr("10.0.0.4") + v6Addr := netip.MustParseAddr("fd7a:115c:a1e0::701:b62a") + testPM := PortMap{Protocol: "tcp", MatchPort: 4003, TargetPort: 80} + testPM2 := PortMap{Protocol: "udp", MatchPort: 4004, TargetPort: 53} + v4Rule := argsForPortMapRule("test-svc", "tailscale0", v4Addr, testPM) + tests := []struct { + name string + targetIP netip.Addr + svc string + pm PortMap + precreateSvcRules [][]string + }{ + { + name: "pm_for_ipv4", + targetIP: v4Addr, + svc: "test-svc", + pm: testPM, + }, + { + name: "pm_for_ipv6", + targetIP: v6Addr, + svc: "test-svc-2", + pm: testPM2, + }, + { + name: "add_existing_rule", + targetIP: v4Addr, + svc: "test-svc", + pm: testPM, + precreateSvcRules: [][]string{v4Rule}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iptr := NewFakeIPTablesRunner() + table := iptr.getIPTByAddr(tt.targetIP) + for _, ruleset := range tt.precreateSvcRules { + mustPrecreatePortMapRule(t, ruleset, table) + } + if err := iptr.EnsurePortMapRuleForSvc(tt.svc, "tailscale0", tt.targetIP, tt.pm); err != nil { + t.Errorf("[unexpected error] iptablesRunner.EnsurePortMapRuleForSvc() = %v", err) + } + args := argsForPortMapRule(tt.svc, "tailscale0", tt.targetIP, tt.pm) + exists, err := table.Exists("nat", "PREROUTING", args...) 
+ if err != nil { + t.Fatalf("error checking if rule exists: %v", err) + } + if !exists { + t.Errorf("expected rule was not created") + } + }) + } +} + +func Test_iptablesRunner_DeletePortMapRuleForSvc(t *testing.T) { + v4Addr := netip.MustParseAddr("10.0.0.4") + v6Addr := netip.MustParseAddr("fd7a:115c:a1e0::701:b62a") + testPM := PortMap{Protocol: "tcp", MatchPort: 4003, TargetPort: 80} + v4Rule := argsForPortMapRule("test", "tailscale0", v4Addr, testPM) + v6Rule := argsForPortMapRule("test", "tailscale0", v6Addr, testPM) + + tests := []struct { + name string + targetIP netip.Addr + svc string + pm PortMap + precreateSvcRules [][]string + }{ + { + name: "multiple_rules_ipv4_deleted", + targetIP: v4Addr, + svc: "test", + pm: testPM, + precreateSvcRules: [][]string{v4Rule, v6Rule}, + }, + { + name: "multiple_rules_ipv6_deleted", + targetIP: v6Addr, + svc: "test", + pm: testPM, + precreateSvcRules: [][]string{v4Rule, v6Rule}, + }, + { + name: "non-existent_rule_deleted", + targetIP: v4Addr, + svc: "test", + pm: testPM, + precreateSvcRules: [][]string{v6Rule}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iptr := NewFakeIPTablesRunner() + table := iptr.getIPTByAddr(tt.targetIP) + for _, ruleset := range tt.precreateSvcRules { + mustPrecreatePortMapRule(t, ruleset, table) + } + if err := iptr.DeletePortMapRuleForSvc(tt.svc, "tailscale0", tt.targetIP, tt.pm); err != nil { + t.Errorf("iptablesRunner.DeletePortMapRuleForSvc() errored: %v ", err) + } + deletedRule := argsForPortMapRule(tt.svc, "tailscale0", tt.targetIP, tt.pm) + exists, err := table.Exists("nat", "PREROUTING", deletedRule...) + if err != nil { + t.Fatalf("error verifying that rule does not exist after deletion: %v", err) + } + if exists { + t.Errorf("portmap rule exists after deletion") + } + }) + } +} + +func Test_iptablesRunner_DeleteSvc(t *testing.T) { + v4Addr := netip.MustParseAddr("10.0.0.4") + v6Addr := netip.MustParseAddr("fd7a:115c:a1e0::701:b62a") + testPM := PortMap{Protocol: "tcp", MatchPort: 4003, TargetPort: 80} + iptr := NewFakeIPTablesRunner() + + // create two rules that will consitute svc1 + s1R1 := argsForPortMapRule("svc1", "tailscale0", v4Addr, testPM) + mustPrecreatePortMapRule(t, s1R1, iptr.getIPTByAddr(v4Addr)) + s1R2 := argsForPortMapRule("svc1", "tailscale0", v6Addr, testPM) + mustPrecreatePortMapRule(t, s1R2, iptr.getIPTByAddr(v6Addr)) + + // create two rules that will consitute svc2 + s2R1 := argsForPortMapRule("svc2", "tailscale0", v4Addr, testPM) + mustPrecreatePortMapRule(t, s2R1, iptr.getIPTByAddr(v4Addr)) + s2R2 := argsForPortMapRule("svc2", "tailscale0", v6Addr, testPM) + mustPrecreatePortMapRule(t, s2R2, iptr.getIPTByAddr(v6Addr)) + + // delete svc1 + if err := iptr.DeleteSvc("svc1", "tailscale0", []netip.Addr{v4Addr, v6Addr}, []PortMap{testPM}); err != nil { + t.Fatalf("error deleting service: %v", err) + } + + // validate that svc1 no longer exists + svcMustNotExist(t, "svc1", map[string][]string{v4Addr.String(): s1R1, v6Addr.String(): s1R2}, iptr) + + // validate that svc2 still exists + svcMustExist(t, "svc2", map[string][]string{v4Addr.String(): s2R1, v6Addr.String(): s2R2}, iptr) +} + +func svcMustExist(t *testing.T, svcName string, rules map[string][]string, iptr *iptablesRunner) { + t.Helper() + for dst, ruleset := range rules { + tip := netip.MustParseAddr(dst) + exists, err := iptr.getIPTByAddr(tip).Exists("nat", "PREROUTING", ruleset...) 
+		if err != nil {
+			t.Fatalf("error checking whether %s exists: %v", svcName, err)
+		}
+		if !exists {
+			t.Fatalf("service %s should exist, but rule for %s is missing", svcName, dst)
+		}
+	}
+}
+
+func svcMustNotExist(t *testing.T, svcName string, rules map[string][]string, iptr *iptablesRunner) {
+	t.Helper()
+	for dst, ruleset := range rules {
+		tip := netip.MustParseAddr(dst)
+		exists, err := iptr.getIPTByAddr(tip).Exists("nat", "PREROUTING", ruleset...)
+		if err != nil {
+			t.Fatalf("error checking whether %s exists: %v", svcName, err)
+		}
+		if exists {
+			t.Fatalf("service %s should be deleted, but found rule for %s", svcName, dst)
+		}
+	}
+}
+
+func mustPrecreatePortMapRule(t *testing.T, rules []string, table iptablesInterface) {
+	t.Helper()
+	exists, err := table.Exists("nat", "PREROUTING", rules...)
+	if err != nil {
+		t.Fatalf("error ensuring that nat PREROUTING table exists: %v", err)
+	}
+	if exists {
+		return
+	}
+	if err := table.Append("nat", "PREROUTING", rules...); err != nil {
+		t.Fatalf("error precreating portmap rule: %v", err)
+	}
+}
diff --git a/util/linuxfw/iptables_runner.go b/util/linuxfw/iptables_runner.go
index 507f6cd48..e221ad596 100644
--- a/util/linuxfw/iptables_runner.go
+++ b/util/linuxfw/iptables_runner.go
@@ -682,7 +682,7 @@ func delTSHook(ipt iptablesInterface, table, chain string, logf logger.Logf) err
 	return nil
 }
 
-// delChain flushs and deletes a chain. If the chain does not exist, it's a no-op
+// delChain flushes and deletes a chain. If the chain does not exist, it's a no-op
 // since the desired state is already achieved. otherwise, it returns an error.
 func delChain(ipt iptablesInterface, table, chain string) error {
 	if err := ipt.ClearChain(table, chain); err != nil {
diff --git a/util/linuxfw/nftables_for_svcs.go b/util/linuxfw/nftables_for_svcs.go
new file mode 100644
index 000000000..130585b22
--- /dev/null
+++ b/util/linuxfw/nftables_for_svcs.go
@@ -0,0 +1,245 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build linux
+
+package linuxfw
+
+import (
+	"errors"
+	"fmt"
+	"net/netip"
+	"reflect"
+	"strings"
+
+	"github.com/google/nftables"
+	"github.com/google/nftables/binaryutil"
+	"github.com/google/nftables/expr"
+	"golang.org/x/sys/unix"
+)
+
+// This file contains functionality that is currently (09/2024) used to set up
+// routing for the Tailscale Kubernetes operator egress proxies. A tailnet
+// service (identified by tailnet IP or FQDN) that gets exposed to cluster
+// workloads gets a separate prerouting chain created for it for each IP family
+// of the chain's target addresses. Each service's prerouting chain contains one
+// or more portmapping rules. A portmapping rule DNATs traffic received on a
+// particular port to a port of the tailnet service. Creating a chain per
+// service makes it easier to delete a service when no longer needed and helps
+// with readability.
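A plain-Go model of what one such portmapping rule does, to make the match-and-DNAT step concrete; this is purely illustrative and not code from this package:

package main

import (
	"fmt"
	"net/netip"
)

// portMapping mirrors the fields of a PortMap plus the tailnet target it points at.
type portMapping struct {
	proto      string
	matchPort  uint16
	targetPort uint16
	target     netip.Addr
}

// dnat models a single portmapping rule in a service's chain: traffic that is
// not on the tailscale interface and matches the protocol and destination port
// gets redirected to the tailnet target address and port.
func dnat(iface, proto string, dstPort uint16, tun string, pm portMapping) (netip.Addr, uint16, bool) {
	if iface == tun || proto != pm.proto || dstPort != pm.matchPort {
		return netip.Addr{}, 0, false
	}
	return pm.target, pm.targetPort, true
}

func main() {
	pm := portMapping{proto: "tcp", matchPort: 4003, targetPort: 80, target: netip.MustParseAddr("100.99.99.99")}
	ip, port, ok := dnat("eth0", "tcp", 4003, "tailscale0", pm)
	fmt.Println(ip, port, ok) // 100.99.99.99 80 true
}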
+ +// EnsurePortMapRuleForSvc: +// - ensures that nat table exists +// - ensures that there is a prerouting chain for the given service and IP family of the target address in the nat table +// - ensures that there is a portmapping rule mathcing the given portmap (only creates the rule if it does not already exist) +func (n *nftablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error { + t, ch, err := n.ensureChainForSvc(svc, targetIP) + if err != nil { + return fmt.Errorf("error ensuring chain for %s: %w", svc, err) + } + meta := svcPortMapRuleMeta(svc, targetIP, pm) + rule, err := n.findRuleByMetadata(t, ch, meta) + if err != nil { + return fmt.Errorf("error looking up rule: %w", err) + } + if rule != nil { + return nil + } + p, err := protoFromString(pm.Protocol) + if err != nil { + return fmt.Errorf("error converting protocol %s: %w", pm.Protocol, err) + } + + rule = portMapRule(t, ch, tun, targetIP, pm.MatchPort, pm.TargetPort, p, meta) + n.conn.InsertRule(rule) + return n.conn.Flush() +} + +// DeletePortMapRuleForSvc deletes a portmapping rule in the given service/IP family chain. +// It finds the matching rule using metadata attached to the rule. +// The caller is expected to call DeleteSvc if the whole service (the chain) +// needs to be deleted, so we don't deal with the case where this is the only +// rule in the chain here. +func (n *nftablesRunner) DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error { + table, err := n.getNFTByAddr(targetIP) + if err != nil { + return fmt.Errorf("error setting up nftables for IP family of %s: %w", targetIP, err) + } + t, err := getTableIfExists(n.conn, table.Proto, "nat") + if err != nil { + return fmt.Errorf("error checking if nat table exists: %w", err) + } + if t == nil { + return nil + } + ch, err := getChainFromTable(n.conn, t, svc) + if err != nil && !errors.Is(err, errorChainNotFound{t.Name, svc}) { + return fmt.Errorf("error checking if chain %s exists: %w", svc, err) + } + if errors.Is(err, errorChainNotFound{t.Name, svc}) { + return nil // service chain does not exist, so neither does the portmapping rule + } + meta := svcPortMapRuleMeta(svc, targetIP, pm) + rule, err := n.findRuleByMetadata(t, ch, meta) + if err != nil { + return fmt.Errorf("error checking if rule exists: %w", err) + } + if rule == nil { + return nil + } + if err := n.conn.DelRule(rule); err != nil { + return fmt.Errorf("error deleting rule: %w", err) + } + return n.conn.Flush() +} + +// DeleteSvc deletes the chains for the given service if any exist. 
+func (n *nftablesRunner) DeleteSvc(svc, tun string, targetIPs []netip.Addr, pm []PortMap) error { + for _, tip := range targetIPs { + table, err := n.getNFTByAddr(tip) + if err != nil { + return fmt.Errorf("error setting up nftables for IP family of %s: %w", tip, err) + } + t, err := getTableIfExists(n.conn, table.Proto, "nat") + if err != nil { + return fmt.Errorf("error checking if nat table exists: %w", err) + } + if t == nil { + return nil + } + ch, err := getChainFromTable(n.conn, t, svc) + if err != nil && !errors.Is(err, errorChainNotFound{t.Name, svc}) { + return fmt.Errorf("error checking if chain %s exists: %w", svc, err) + } + if errors.Is(err, errorChainNotFound{t.Name, svc}) { + return nil + } + n.conn.DelChain(ch) + } + return n.conn.Flush() +} + +func portMapRule(t *nftables.Table, ch *nftables.Chain, tun string, targetIP netip.Addr, matchPort, targetPort uint16, proto uint8, meta []byte) *nftables.Rule { + var fam uint32 + if targetIP.Is4() { + fam = unix.NFPROTO_IPV4 + } else { + fam = unix.NFPROTO_IPV6 + } + rule := &nftables.Rule{ + Table: t, + Chain: ch, + UserData: meta, + Exprs: []expr.Any{ + &expr.Meta{Key: expr.MetaKeyOIFNAME, Register: 1}, + &expr.Cmp{ + Op: expr.CmpOpNeq, + Register: 1, + Data: []byte(tun), + }, + &expr.Meta{Key: expr.MetaKeyL4PROTO, Register: 1}, + &expr.Cmp{ + Op: expr.CmpOpEq, + Register: 1, + Data: []byte{proto}, + }, + &expr.Payload{ + DestRegister: 1, + Base: expr.PayloadBaseTransportHeader, + Offset: 2, + Len: 2, + }, + &expr.Cmp{ + Op: expr.CmpOpEq, + Register: 1, + Data: binaryutil.BigEndian.PutUint16(matchPort), + }, + &expr.Immediate{ + Register: 1, + Data: targetIP.AsSlice(), + }, + &expr.Immediate{ + Register: 2, + Data: binaryutil.BigEndian.PutUint16(targetPort), + }, + &expr.NAT{ + Type: expr.NATTypeDestNAT, + Family: fam, + RegAddrMin: 1, + RegAddrMax: 1, + RegProtoMin: 2, + RegProtoMax: 2, + }, + }, + } + return rule +} + +// svcPortMapRuleMeta generates metadata for a rule. +// This metadata can then be used to find the rule. 
+// https://github.com/google/nftables/issues/48 +func svcPortMapRuleMeta(svcName string, targetIP netip.Addr, pm PortMap) []byte { + return []byte(fmt.Sprintf("svc:%s,targetIP:%s:matchPort:%v,targetPort:%v,proto:%v", svcName, targetIP.String(), pm.MatchPort, pm.TargetPort, pm.Protocol)) +} + +func (n *nftablesRunner) findRuleByMetadata(t *nftables.Table, ch *nftables.Chain, meta []byte) (*nftables.Rule, error) { + if n.conn == nil || t == nil || ch == nil || len(meta) == 0 { + return nil, nil + } + rules, err := n.conn.GetRules(t, ch) + if err != nil { + return nil, fmt.Errorf("error listing rules: %w", err) + } + for _, rule := range rules { + if reflect.DeepEqual(rule.UserData, meta) { + return rule, nil + } + } + return nil, nil +} + +func (n *nftablesRunner) ensureChainForSvc(svc string, targetIP netip.Addr) (*nftables.Table, *nftables.Chain, error) { + polAccept := nftables.ChainPolicyAccept + table, err := n.getNFTByAddr(targetIP) + if err != nil { + return nil, nil, fmt.Errorf("error setting up nftables for IP family of %v: %w", targetIP, err) + } + nat, err := createTableIfNotExist(n.conn, table.Proto, "nat") + if err != nil { + return nil, nil, fmt.Errorf("error ensuring nat table: %w", err) + } + svcCh, err := getOrCreateChain(n.conn, chainInfo{ + table: nat, + name: svc, + chainType: nftables.ChainTypeNAT, + chainHook: nftables.ChainHookPrerouting, + chainPriority: nftables.ChainPriorityNATDest, + chainPolicy: &polAccept, + }) + if err != nil { + return nil, nil, fmt.Errorf("error ensuring prerouting chain: %w", err) + } + return nat, svcCh, nil +} + +// // PortMap is the port mapping for a service rule. +type PortMap struct { + // MatchPort is the local port to which the rule should apply. + MatchPort uint16 + // TargetPort is the port to which the traffic should be forwarded. + TargetPort uint16 + // Protocol is the protocol to match packets on. Only TCP and UDP are + // supported. + Protocol string +} + +func protoFromString(s string) (uint8, error) { + switch strings.ToLower(s) { + case "tcp": + return unix.IPPROTO_TCP, nil + case "udp": + return unix.IPPROTO_UDP, nil + default: + return 0, fmt.Errorf("unrecognized protocol: %q", s) + } +} diff --git a/util/linuxfw/nftables_for_svcs_test.go b/util/linuxfw/nftables_for_svcs_test.go new file mode 100644 index 000000000..8a735d602 --- /dev/null +++ b/util/linuxfw/nftables_for_svcs_test.go @@ -0,0 +1,156 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build linux + +package linuxfw + +import ( + "net/netip" + "testing" + + "github.com/google/nftables" +) + +// This test creates a temporary network namespace for the nftables rules being +// set up, so it needs to run in a privileged mode. Locally it needs to be run +// by root, else it will be silently skipped. In CI it runs in a privileged +// container. 
+func Test_nftablesRunner_EnsurePortMapRuleForSvc(t *testing.T) { + conn := newSysConn(t) + runner := newFakeNftablesRunnerWithConn(t, conn, true) + ipv4, ipv6 := netip.MustParseAddr("100.99.99.99"), netip.MustParseAddr("fd7a:115c:a1e0::701:b62a") + pmTCP := PortMap{MatchPort: 4003, TargetPort: 80, Protocol: "TCP"} + pmTCP1 := PortMap{MatchPort: 4004, TargetPort: 443, Protocol: "TCP"} + + // Create a rule for service 'foo' to forward TCP traffic to an IPv4 endpoint. + runner.EnsurePortMapRuleForSvc("foo", "tailscale0", ipv4, pmTCP) + svcChains(t, 1, conn) + chainRuleCount(t, "foo", 1, conn, nftables.TableFamilyIPv4) + chainRule(t, "foo", ipv4, pmTCP, runner, nftables.TableFamilyIPv4) + + // Create another rule for service 'foo' to forward TCP traffic to the + // same IPv4 endpoint, but to a different port. + runner.EnsurePortMapRuleForSvc("foo", "tailscale0", ipv4, pmTCP1) + svcChains(t, 1, conn) + chainRuleCount(t, "foo", 2, conn, nftables.TableFamilyIPv4) + chainRule(t, "foo", ipv4, pmTCP1, runner, nftables.TableFamilyIPv4) + + // Create a rule for service 'foo' to forward TCP traffic to an IPv6 endpoint. + runner.EnsurePortMapRuleForSvc("foo", "tailscale0", ipv6, pmTCP) + svcChains(t, 2, conn) + chainRuleCount(t, "foo", 1, conn, nftables.TableFamilyIPv6) + chainRule(t, "foo", ipv6, pmTCP, runner, nftables.TableFamilyIPv6) + + // Create a rule for service 'bar' to forward TCP traffic to an IPv4 endpoint. + runner.EnsurePortMapRuleForSvc("bar", "tailscale0", ipv4, pmTCP) + svcChains(t, 3, conn) + chainRuleCount(t, "bar", 1, conn, nftables.TableFamilyIPv4) + chainRule(t, "bar", ipv4, pmTCP, runner, nftables.TableFamilyIPv4) + + // Create a rule for service 'bar' to forward TCP traffic to an IPv6 endpoint. + runner.EnsurePortMapRuleForSvc("bar", "tailscale0", ipv6, pmTCP) + svcChains(t, 4, conn) + chainRuleCount(t, "bar", 1, conn, nftables.TableFamilyIPv6) + chainRule(t, "bar", ipv6, pmTCP, runner, nftables.TableFamilyIPv6) + + // Delete service 'bar'. + runner.DeleteSvc("bar", "tailscale0", []netip.Addr{ipv4, ipv6}, []PortMap{pmTCP}) + svcChains(t, 2, conn) + + // Delete a rule from service 'foo'. + runner.DeletePortMapRuleForSvc("foo", "tailscale0", ipv4, pmTCP) + svcChains(t, 2, conn) + chainRuleCount(t, "foo", 1, conn, nftables.TableFamilyIPv4) + + // Delete service 'foo'. + runner.DeleteSvc("foo", "tailscale0", []netip.Addr{ipv4, ipv6}, []PortMap{pmTCP, pmTCP1}) + svcChains(t, 0, conn) +} + +// svcChains verifies that the expected number of chains exist (for either IP +// family) and that each of them is configured as a NAT prerouting chain. +func svcChains(t *testing.T, wantCount int, conn *nftables.Conn) { + t.Helper() + chains, err := conn.ListChains() + if err != nil { + t.Fatalf("error listing chains: %v", err) + } + if len(chains) != wantCount { + t.Fatalf("want %d chains, got %d", wantCount, len(chains)) + } + for _, ch := range chains { + if *ch.Policy != nftables.ChainPolicyAccept { + t.Fatalf("chain %s has unexpected policy %v", ch.Name, *ch.Policy) + } + if ch.Type != nftables.ChainTypeNAT { + t.Fatalf("chain %s has unexpected type %v", ch.Name, ch.Type) + } + if *ch.Hooknum != *nftables.ChainHookPrerouting { + t.Fatalf("chain %s is attached to unexpected hook %v", ch.Name, ch.Hooknum) + } + if *ch.Priority != *nftables.ChainPriorityNATDest { + t.Fatalf("chain %s has unexpected priority %v", ch.Name, ch.Priority) + } + } +} + +// chainRuleCount verifies that the chain identified by service name and IP family contains the expected number of rules.
+func chainRuleCount(t *testing.T, svc string, count int, conn *nftables.Conn, fam nftables.TableFamily) { + t.Helper() + chains, err := conn.ListChainsOfTableFamily(fam) + if err != nil { + t.Fatalf("error listing chains: %v", err) + } + + found := false + for _, ch := range chains { + if ch.Name == svc { + found = true + rules, err := conn.GetRules(ch.Table, ch) + if err != nil { + t.Fatalf("error getting rules: %v", err) + } + if len(rules) != count { + t.Fatalf("unexpected number of rules: want %d, got %d", count, len(rules)) + } + break + } + } + if !found { + t.Fatalf("chain for service %s does not exist", svc) + } +} + +// chainRule verifies that a rule for the provided target IP and PortMap exists in +// a chain identified by service name and IP family. +func chainRule(t *testing.T, svc string, targetIP netip.Addr, pm PortMap, runner *nftablesRunner, fam nftables.TableFamily) { + t.Helper() + chains, err := runner.conn.ListChainsOfTableFamily(fam) + if err != nil { + t.Fatalf("error listing chains: %v", err) + } + var chain *nftables.Chain + for _, ch := range chains { + if ch.Name == svc { + chain = ch + break + } + } + if chain == nil { + t.Fatalf("chain for service %s does not exist", svc) + } + meta := svcPortMapRuleMeta(svc, targetIP, pm) + p, err := protoFromString(pm.Protocol) + if err != nil { + t.Fatalf("error converting protocol: %v", err) + } + wantsRule := portMapRule(chain.Table, chain, "tailscale0", targetIP, pm.MatchPort, pm.TargetPort, p, meta) + gotRule, err := findRule(runner.conn, wantsRule) + if err != nil { + t.Fatalf("error looking up rule: %v", err) + } + if gotRule == nil { + t.Fatalf("rule not found") + } +} diff --git a/util/linuxfw/nftables_runner.go b/util/linuxfw/nftables_runner.go index 317d84c12..a7a407222 100644 --- a/util/linuxfw/nftables_runner.go +++ b/util/linuxfw/nftables_runner.go @@ -569,6 +569,15 @@ type NetfilterRunner interface { // the Tailscale interface, as used in the Kubernetes egress proxies. DNATNonTailscaleTraffic(exemptInterface string, dst netip.Addr) error + // EnsurePortMapRuleForSvc ensures that a DNAT rule for the given service exists in a per-service NAT prerouting chain, forwarding traffic received on the match port to the target IP and target port; traffic destined for the tun interface is not matched. + EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error + + // DeletePortMapRuleForSvc deletes a portmapping rule previously created for the given service, target IP and PortMap. + DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error + + // DeleteSvc deletes the per-service chains (and all rules in them) created for the given service and target IPs. + DeleteSvc(svc, tun string, targetIPs []netip.Addr, pm []PortMap) error + + // ClampMSSToPMTU adds a rule to the mangle/FORWARD chain to clamp MSS for // traffic destined for the provided tun interface. ClampMSSToPMTU(tun string, addr netip.Addr) error diff --git a/wgengine/router/router_linux_test.go b/wgengine/router/router_linux_test.go index f55361225..893ff4a70 100644 --- a/wgengine/router/router_linux_test.go +++ b/wgengine/router/router_linux_test.go @@ -537,6 +537,17 @@ func (n *fakeIPTablesRunner) AddSNATRuleForDst(src, dst netip.Addr) error { func (n *fakeIPTablesRunner) DNATNonTailscaleTraffic(exemptInterface string, dst netip.Addr) error { return errors.New("not implemented") } +func (n *fakeIPTablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm linuxfw.PortMap) error { + return errors.New("not implemented") +} + +func (n *fakeIPTablesRunner) DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm linuxfw.PortMap) error { + return errors.New("not implemented") +} + +func (n *fakeIPTablesRunner) DeleteSvc(svc, tun string, targetIPs []netip.Addr, pm []linuxfw.PortMap) error { + return errors.New("not implemented") +} func (n *fakeIPTablesRunner) ClampMSSToPMTU(tun string, addr netip.Addr) error { return errors.New("not implemented")
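
The sketch below is a minimal, hypothetical illustration of how a caller (for example the egress proxy) might drive the new NetfilterRunner portmapping methods; it is not part of this change. The service name "svc-foo", the interface name, the target addresses and the ports are made up, and construction of the runner is assumed to happen elsewhere.

	package egresssketch

	import (
		"fmt"
		"net/netip"

		"tailscale.com/util/linuxfw"
	)

	// ensureSvcPortMaps installs one DNAT portmapping per (target IP, PortMap)
	// pair for a single named service.
	func ensureSvcPortMaps(nfr linuxfw.NetfilterRunner, targets []netip.Addr, pms []linuxfw.PortMap) error {
		for _, ip := range targets {
			for _, pm := range pms {
				// Each call ensures a NAT prerouting chain named after the service
				// exists for ip's address family and adds (or finds, via the rule's
				// UserData metadata) a DNAT rule pm.MatchPort -> ip:pm.TargetPort.
				if err := nfr.EnsurePortMapRuleForSvc("svc-foo", "tailscale0", ip, pm); err != nil {
					return fmt.Errorf("ensuring portmap rule for %v: %w", ip, err)
				}
			}
		}
		return nil
	}

	// cleanupSvc removes the per-service chains and all their rules.
	func cleanupSvc(nfr linuxfw.NetfilterRunner, targets []netip.Addr, pms []linuxfw.PortMap) error {
		return nfr.DeleteSvc("svc-foo", "tailscale0", targets, pms)
	}

	// Example inputs, mirroring the values used in the test above:
	//   targets: 100.99.99.99, fd7a:115c:a1e0::701:b62a
	//   pms:     {MatchPort: 4003, TargetPort: 80, Protocol: "tcp"},
	//            {MatchPort: 4004, TargetPort: 443, Protocol: "tcp"}

Because the rules live in a chain named after the service, teardown does not need to locate individual rules: DeleteSvc drops the whole per-service chain for each address family, while single-rule deletions rely on the metadata attached to each rule's UserData.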