Welcome to mirror list, hosted at ThFree Co, Russian Federation.

sidechannel.go « hook « gitaly « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: ca83a11f053996f1a662bcb6d9c37e186a30ea17 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package hook

import (
	"context"
	"fmt"
	"net"
	"os"
	"path"
	"time"

	gitaly_metadata "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
	"google.golang.org/grpc/metadata"
)

const (
	// sidechannelHeader is the metadata key under which the sidechannel
	// socket address travels: SetupSidechannel appends it to the outgoing
	// context and GetSidechannel reads it back from the incoming one.
	sidechannelHeader = "gitaly-sidechannel-socket"
	// sidechannelSocket is the file name of the Unix socket created inside
	// the temporary socket directory. GetSidechannel only accepts addresses
	// whose final path element equals this name.
	sidechannelSocket = "sidechannel"
)

// errInvalidSidechannelAddress is the error reported when a sidechannel
// address fails validation. The embedded string holds the rejected address.
type errInvalidSidechannelAddress struct {
	string
}

// Error implements the error interface. The rejected address is rendered
// quoted so that empty or unusual values remain visible in the message.
func (addrErr *errInvalidSidechannelAddress) Error() string {
	const format = "invalid side channel address: %q"
	return fmt.Sprintf(format, addrErr.string)
}

// GetSidechannel looks for a sidechannel address in the metadata of an
// incoming context and, if the address is acceptable, connects to it as a
// Unix socket.
//
// The address is accepted only when its final path element is
// sidechannelSocket; any other value is rejected with an
// *errInvalidSidechannelAddress. An empty value is rejected as well, because
// path.Base("") is ".".
//
// The dial is bounded by a one second timeout and additionally aborts as
// soon as ctx is cancelled or its deadline passes, so a request that has
// already been abandoned does not keep waiting on the socket.
func GetSidechannel(ctx context.Context) (net.Conn, error) {
	address := gitaly_metadata.GetValue(ctx, sidechannelHeader)
	if path.Base(address) != sidechannelSocket {
		return nil, &errInvalidSidechannelAddress{address}
	}

	// Dialer.Timeout keeps the original one second cap; DialContext makes
	// the attempt honour ctx, which net.DialTimeout cannot do.
	dialer := net.Dialer{Timeout: time.Second}
	return dialer.DialContext(ctx, "unix", address)
}

// SetupSidechannel opens a Unix socket listener inside a freshly created
// temporary directory and starts a goroutine that waits for a connection
// on it; once a connection arrives, callback is invoked with it. The
// socket path is appended to the outgoing metadata of the returned context
// so that the caller can propagate it to a server. The caller must Close
// the returned SidechannelWaiter to prevent resource leaks.
func SetupSidechannel(ctx context.Context, callback func(*net.UnixConn) error) (_ context.Context, _ *SidechannelWaiter, err error) {
	var tmpDir string
	if tmpDir, err = os.MkdirTemp("", "gitaly"); err != nil {
		return nil, nil, err
	}
	// err is the named result, so this observes whatever error the function
	// ends up returning and discards the directory on any failure path.
	defer func() {
		if err == nil {
			return
		}
		_ = os.RemoveAll(tmpDir)
	}()

	socketPath := path.Join(tmpDir, sidechannelSocket)
	unixAddr := &net.UnixAddr{Net: "unix", Name: socketPath}

	listener, err := net.ListenUnix("unix", unixAddr)
	if err != nil {
		return nil, nil, err
	}

	waiter := &SidechannelWaiter{
		errC:      make(chan error),
		socketDir: tmpDir,
		listener:  listener,
	}
	// The goroutine ends once it has delivered its result on errC, which
	// happens when the caller calls Wait or Close.
	go waiter.run(callback)

	return metadata.AppendToOutgoingContext(ctx, sidechannelHeader, socketPath), waiter, nil
}

// SidechannelWaiter provides cleanup and error propagation for a
// sidechannel callback.
type SidechannelWaiter struct {
	// errC carries the result of run: a single value is sent on it and the
	// channel is closed afterwards. It is unbuffered, so run blocks on the
	// send until Wait (directly or via Close) receives.
	errC chan error
	// socketDir is the temporary directory that contains the listener
	// socket. It is removed by run once a connection has been accepted and
	// again, unconditionally, by Close.
	socketDir string
	// listener is the Unix socket listener on which run accepts a
	// connection. Close closes it.
	listener *net.UnixListener
}

// run accepts one connection on the listener, hands it to callback and
// publishes the outcome on errC. errC is closed once the outcome has been
// received, so later receives yield nil.
func (wt *SidechannelWaiter) run(callback func(*net.UnixConn) error) {
	defer close(wt.errC)

	serve := func() error {
		conn, acceptErr := wt.listener.AcceptUnix()
		if acceptErr != nil {
			return acceptErr
		}
		defer conn.Close()

		// The socket directory is no longer needed once the connection is
		// established. Remove it right away in case the process exits
		// before wt.Close() gets a chance to run.
		if rmErr := os.RemoveAll(wt.socketDir); rmErr != nil {
			return rmErr
		}

		return callback(conn)
	}

	// serve runs to completion before the send; the send then blocks until
	// a receiver is ready.
	wt.errC <- serve()
}

// Close releases all sidechannel resources. If the callback is already
// running, Close blocks until it has finished. The first error encountered,
// in the order listener close, directory removal, callback result, is
// returned.
func (wt *SidechannelWaiter) Close() error {
	// Every cleanup step runs unconditionally before any error is inspected,
	// so an early failure cannot cause a later step to be skipped.

	// Unblocks wt.run() if it is still waiting in AcceptUnix().
	listenerErr := wt.listener.Close()
	// Prevents a leftover socket directory in case wt.run() never got as
	// far as removing it.
	removeErr := os.RemoveAll(wt.socketDir)
	// Blocks until wt.run() is done.
	waitErr := wt.Wait()

	switch {
	case listenerErr != nil:
		return listenerErr
	case removeErr != nil:
		return removeErr
	default:
		return waitErr
	}
}

// Wait blocks until the callback has run and returns the error it
// produced. Once errC has been closed, Wait returns nil immediately.
func (wt *SidechannelWaiter) Wait() error {
	result := <-wt.errC
	return result
}