Welcome to mirror list, hosted at ThFree Co, Russian Federation.

pipe.go « streamcache « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 4cafacc403018edf9895920195d229806c5a6577 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package streamcache

import (
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
)

// Pipes
//
//            +-------+
//            |       | <- *pipeReader <- Read()
// Write() -> | *pipe |       |
//            |       | <- *pipeReader <- Read()
//            +-------+       |  |
//                |           |  |
//                v           |  |
//           +----------+     |  |
//           |          | <---+  |
//           |   file   |        |
//           |          | <------+
//           +----------+
//
// Pipes are called so because their interface and behavior somewhat
// resembles Unix pipes, except there are multiple readers as opposed to
// just one. Just like with Unix pipes, pipe readers exert backpressure
// on the writer. When the write end is closed, the readers see EOF, just
// like they would when reading a file. When the read end is closed
// before the writer is done, the writer receives an error. This is all
// like it is with Unix pipes.
//
// The differences are as follows. When you create a pipe you get a write
// end and a read end, just like with Unix pipes. But now you can open an
// additional reader by calling OpenReader on the write end (the *pipe
// instance). Under the covers, a Unix pipe is just a buffer, and you
// cannot "rewind" it. With our pipe, there is an underlying file, so a
// new reader starts at position 0 in the file. It can then catch up with
// the other readers.
//
// Backpressure is implemented using two "cursors", wcursor and rcursor.
// A cursor is an integer value with a list of subscribers (see cursor.go).
// Every time the value goes up, the subscribers get a notification.
// Everytime the writer makes progress it increases the wcursor value
// so that it reflects the total number of bytes written to the file so
// far. The readers are all subscribed to wcursor so they get notified
// when there is new data for them to read. Conversely, the readers
// update the rcursor value after each read. Because of the way the
// cursor datatype works, the rcursor counter reflects the file position of
// the fastest reader. The writer is the sole subscriber of rcursor.
// Before each write, the writer checks the rcursor counter to make sure
// the writer itself is not ahead of its fastest reader.
//
// The wcursor cursor also serves second purpose. Besides notifying the
// readers when the writer writes new data, we also use it to check if
// there are any readers at all: if the subscriber list of wcursor is
// empty then there are no more readers. Each time a reader is closed,
// its subscription is removed from the wcursor subscriber list. If the
// list becomes empty before the writer is done, the writer fails with an
// error and the pipe is marked as "broken". This prevents us from
// writing data to disk that no one will read.

type pipe struct {
	m sync.Mutex

	// Access to underlying file
	name string
	w    io.WriteCloser

	// Reader/writer coordination. If wcursor > rcursor, the writer blocks
	// (back pressure). If rcursor >= wcursor, the readers block (waiting for
	// new data).
	wcursor *cursor
	rcursor *cursor

	// wnotifier is the channel the writer uses to wait for reader progress
	// notifications.
	wnotifier *notifier
}

func newPipe(w namedWriteCloser) (*pipeReader, *pipe, error) {
	p := &pipe{
		name:    w.Name(),
		w:       w,
		wcursor: newCursor(),
		rcursor: newCursor(),
	}
	p.wnotifier = p.rcursor.Subscribe()

	pr, err := p.OpenReader()
	if err != nil {
		return nil, nil, err
	}

	return pr, p, nil
}

func (p *pipe) Write(b []byte) (int, error) {
	// Loop (block) until at least one reader catches up with our last write.
	for p.wcursor.Position() > p.rcursor.Position() {
		select {
		case <-p.wcursor.Done():
			// Prevent writing bytes no-one will read
			return 0, errWrongCloseOrder
		case <-p.wnotifier.C:
		}
	}

	n, err := p.w.Write(b)

	// Notify blocked readers, if any, of new data that is available.
	p.wcursor.SetPosition(p.wcursor.Position() + int64(n))

	return n, err
}

var (
	errWrongCloseOrder     = errors.New("streamcache.pipe: all readers closed before writer finished")
	errWriterAlreadyClosed = errors.New("streamcache.pipe: writer already closed")
)

func (p *pipe) Close() error {
	p.m.Lock()
	defer p.m.Unlock()

	errClose := p.w.Close()

	if p.rcursor.IsDone() {
		return errWriterAlreadyClosed
	}

	if p.wcursor.IsDone() {
		return errWrongCloseOrder
	}

	// After this, p.rcursor.IsDone() will return true.
	p.rcursor.Unsubscribe(p.wnotifier)

	return errClose
}

func (p *pipe) RemoveFile() error { return os.Remove(p.name) }

func (p *pipe) OpenReader() (*pipeReader, error) {
	p.m.Lock()
	defer p.m.Unlock()

	if p.wcursor.IsDone() && !p.rcursor.IsDone() {
		return nil, errWrongCloseOrder
	}

	r, err := os.Open(p.name)
	if err != nil {
		return nil, fmt.Errorf("OpenReader: %w", err)
	}

	pr := &pipeReader{
		pipe:     p,
		reader:   r,
		notifier: p.wcursor.Subscribe(),
	}

	return pr, nil
}

func (p *pipe) closeReader(pr *pipeReader) {
	p.m.Lock()
	defer p.m.Unlock()

	// Even though wcursor has its own embedded lock, we need to hold the
	// pipe lock when modifying it. This is because p.wcursor and p.rcursor
	// interact (see Close()).
	p.wcursor.Unsubscribe(pr.notifier)
}

type pipeReader struct {
	pipe     *pipe
	reader   io.ReadCloser
	position int64
	notifier *notifier

	// golangci-lint does not like this struct field because it is only used
	// on Linux. On macOS, it complains the field is unused. On Linux, it
	// complains that "nolint:unused" is unused. So we need "unused" and "structcheck" for
	// platforms other than Linux, and "nolintlint" for Linux.
	sendfileCalledSuccessfully bool //nolint:structcheck,unused,nolintlint
}

func (pr *pipeReader) Close() error {
	pr.pipe.closeReader(pr)
	return pr.reader.Close()
}

func (pr *pipeReader) waitReadable() bool {
	// Block until there is data for us to read. Note that it can actually
	// happen that pr.position > pr.pipe.wcursor, so we really want >= here, not
	// ==. There is a race between the moment the write end finishes writing
	// a chunk of data to the file and the moment pr.pipe.wcursor gets
	// updated.
wait:
	for pr.position >= pr.pipe.wcursor.Position() {
		select {
		case <-pr.pipe.rcursor.Done():
			break wait
		case <-pr.notifier.C:
		}
	}

	return pr.position < pr.pipe.wcursor.Position()
}

func (pr *pipeReader) advancePosition(n int) {
	pr.position += int64(n)

	// The writer is subscribed to changes in pr.pipe.rcursor. If it is
	// currently blocked, this call to SetPosition() will unblock it.
	pr.pipe.rcursor.SetPosition(pr.position)
}

func (pr *pipeReader) Read(b []byte) (int, error) {
	pr.waitReadable()
	n, err := pr.reader.Read(b)
	pr.advancePosition(n)
	return n, err
}