tailscale/k8s-operator/sessionrecording/spdy/conn.go

257 lines
9.0 KiB
Go

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
// Package spdy contains functionality for parsing SPDY streaming sessions. This
// is used for 'kubectl exec' session recording.
package spdy
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"tailscale.com/k8s-operator/sessionrecording/tsrecorder"
"tailscale.com/sessionrecording"
)
// New wraps the provided network connection and returns a connection whose reads and writes will get triggered as data is received on the hijacked connection.
// The connection must be a hijacked connection for a 'kubectl exec' session using SPDY.
// The hijacked connection is used to transmit SPDY streams between Kubernetes client ('kubectl') and the destination container.
// Data read from the underlying network connection is data sent via one of the SPDY streams from the client to the container.
// Data written to the underlying connection is data sent from the container to the client.
// We parse the data and send everything for the stdout/stderr streams to the configured tsrecorder as an asciinema recording with the provided header.
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#background-remotecommand-subprotocol
func New(nc net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, hasTerm bool, log *zap.SugaredLogger) net.Conn {
return &conn{
Conn: nc,
rec: rec,
ch: ch,
log: log,
hasTerm: hasTerm,
initialTermSizeSet: make(chan struct{}),
}
}
// conn is a wrapper around net.Conn. It reads the bytestream for a 'kubectl
// exec' session streamed using SPDY protocol, sends session recording data to
// the configured recorder and forwards the raw bytes to the original
// destination.
type conn struct {
net.Conn
// rec knows how to send data written to it to a tsrecorder instance.
rec *tsrecorder.Client
stdoutStreamID atomic.Uint32
stderrStreamID atomic.Uint32
resizeStreamID atomic.Uint32
wmu sync.Mutex // sequences writes
closed bool
rmu sync.Mutex // sequences reads
// The following fields are related to sending asciinema CastHeader.
// CastHeader must be sent before any payload. If the session has a
// terminal attached, the CastHeader must have '.Width' and '.Height'
// fields set for the tsrecorder UI to be able to play the recording.
// For 'kubectl exec' sessions, terminal width and height are sent as a
// resize message on resize stream from the client when the session
// starts as well as at any time the client detects a terminal change.
// We can intercept the resize message on Read calls. As there is no
// guarantee that the resize message from client will be intercepted
// before server writes stdout messages that we must record, we need to
// ensure that parsing stdout/stderr messages written to the connection
// waits till a resize message has been received and a CastHeader with
// correct terminal dimensions can be written.
// ch is the asciinema CastHeader for the current session.
// https://docs.asciinema.org/manual/asciicast/v2/#header
ch sessionrecording.CastHeader
// writeCastHeaderOnce is used to ensure CastHeader gets sent to tsrecorder once.
writeCastHeaderOnce sync.Once
hasTerm bool // whether the session had TTY attached
// initialTermSizeSet channel gets sent a value once, when the Read has
// received a resize message and set the initial terminal size. It must
// be set to a buffered channel to prevent Reads being blocked on the
// first stdout/stderr write reading from the channel.
initialTermSizeSet chan struct{}
// sendInitialTermSizeSetOnce is used to ensure that a value is sent to
// initialTermSizeSet channel only once, when the initial resize message
// is received.
sendinitialTermSizeSetOnce sync.Once
zlibReqReader zlibReader
// writeBuf is used to store data written to the connection that has not
// yet been parsed as SPDY frames.
writeBuf bytes.Buffer
// readBuf is used to store data read from the connection that has not
// yet been parsed as SPDY frames.
readBuf bytes.Buffer
log *zap.SugaredLogger
}
// Read reads bytes from the original connection and parses them as SPDY frames.
// If the frame is a data frame for resize stream, sends resize message to the
// recorder. If the frame is a SYN_STREAM control frame that starts stdout,
// stderr or resize stream, store the stream ID.
func (c *conn) Read(b []byte) (int, error) {
c.rmu.Lock()
defer c.rmu.Unlock()
n, err := c.Conn.Read(b)
if err != nil {
return n, fmt.Errorf("error reading from connection: %w", err)
}
c.readBuf.Write(b[:n])
var sf spdyFrame
ok, err := sf.Parse(c.readBuf.Bytes(), c.log)
if err != nil {
return 0, fmt.Errorf("error parsing data read from connection: %w", err)
}
if !ok {
// The parsed data in the buffer will be processed together with
// the new data on the next call to Read.
return n, nil
}
c.readBuf.Next(len(sf.Raw)) // advance buffer past the parsed frame
if !sf.Ctrl { // data frame
switch sf.StreamID {
case c.resizeStreamID.Load():
var msg spdyResizeMsg
if err = json.Unmarshal(sf.Payload, &msg); err != nil {
return 0, fmt.Errorf("error umarshalling resize msg: %w", err)
}
c.ch.Width = msg.Width
c.ch.Height = msg.Height
// If this is initial resize message, the width and
// height will be sent in the CastHeader. If this is a
// subsequent resize message, we need to send asciinema
// resize message.
var isInitialResize bool
c.sendinitialTermSizeSetOnce.Do(func() {
isInitialResize = true
close(c.initialTermSizeSet) // unblock sending of CastHeader
})
if !isInitialResize {
if err := c.rec.WriteResize(c.ch.Height, c.ch.Width); err != nil {
return 0, fmt.Errorf("error writing resize message: %w", err)
}
}
}
return n, nil
}
// We always want to parse the headers, even if we don't care about the
// frame, as we need to advance the zlib reader otherwise we will get
// garbage.
header, err := sf.parseHeaders(&c.zlibReqReader, c.log)
if err != nil {
return 0, fmt.Errorf("error parsing frame headers: %w", err)
}
if sf.Type == SYN_STREAM {
c.storeStreamID(sf, header)
}
return n, nil
}
// Write forwards the raw data of the latest parsed SPDY frame to the original
// destination. If the frame is an SPDY data frame, it also sends the payload to
// the connected session recorder.
func (c *conn) Write(b []byte) (int, error) {
c.wmu.Lock()
defer c.wmu.Unlock()
c.writeBuf.Write(b)
var sf spdyFrame
ok, err := sf.Parse(c.writeBuf.Bytes(), c.log)
if err != nil {
return 0, fmt.Errorf("error parsing data: %w", err)
}
if !ok {
// The parsed data in the buffer will be processed together with
// the new data on the next call to Write.
return len(b), nil
}
c.writeBuf.Next(len(sf.Raw)) // advance buffer past the parsed frame
// If this is a stdout or stderr data frame, send its payload to the
// session recorder.
if !sf.Ctrl {
switch sf.StreamID {
case c.stdoutStreamID.Load(), c.stderrStreamID.Load():
var err error
c.writeCastHeaderOnce.Do(func() {
// If this is a session with a terminal attached,
// we must wait for the terminal width and
// height to be parsed from a resize message
// before sending CastHeader, else tsrecorder
// will not be able to play this recording.
if c.hasTerm {
c.log.Debugf("write: waiting for the initial terminal size to be set before proceeding with sending the first payload")
<-c.initialTermSizeSet
}
err = c.rec.WriteCastHeader(c.ch)
})
if err != nil {
return 0, fmt.Errorf("error writing CastHeader: %w", err)
}
if err := c.rec.WriteOutput(sf.Payload); err != nil {
return 0, fmt.Errorf("error sending payload to session recorder: %w", err)
}
}
}
// Forward the whole frame to the original destination.
_, err = c.Conn.Write(sf.Raw) // send to net.Conn
return len(b), err
}
func (c *conn) Close() error {
c.wmu.Lock()
defer c.wmu.Unlock()
if c.closed {
return nil
}
c.writeBuf.Reset()
c.closed = true
err := c.Conn.Close()
c.rec.Close()
return err
}
// storeStreamID parses SYN_STREAM SPDY control frame and updates
// conn to store the newly created stream's ID if it is one of
// the stream types we care about. Storing stream_id:stream_type mapping allows
// us to parse received data frames (that have stream IDs) differently depening
// on which stream they belong to (i.e send data frame payload for stdout stream
// to session recorder).
func (c *conn) storeStreamID(sf spdyFrame, header http.Header) {
const (
streamTypeHeaderKey = "Streamtype"
)
id := binary.BigEndian.Uint32(sf.Payload[0:4])
switch header.Get(streamTypeHeaderKey) {
case corev1.StreamTypeStdout:
c.stdoutStreamID.Store(id)
case corev1.StreamTypeStderr:
c.stderrStreamID.Store(id)
case corev1.StreamTypeResize:
c.resizeStreamID.Store(id)
}
}
type spdyResizeMsg struct {
Width int `json:"width"`
Height int `json:"height"`
}