1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
package dnsresolver
import (
"context"
"net"
"sync"
"time"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v15/internal/backoff"
"gitlab.com/gitlab-org/gitaly/v15/internal/structerr"
"google.golang.org/grpc/resolver"
)
type dnsResolver struct {
logger *logrus.Entry
retry backoff.Strategy
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
host string
port string
refreshRate time.Duration
lookup dnsLookuper
reqs chan struct{}
wg sync.WaitGroup
}
var dnsLookupTimeout = 15 * time.Second
type dnsLookuper interface {
LookupHost(context.Context, string) ([]string, error)
}
// ResolveNow signals the resolver to perform a DNS resolution immediately. This method returns
// without waiting for the result. The resolver treats this as a hint rather than a command. The
// client connection receives the resolution result asynchronously via `clientconn.UpdateState`
// This method also skip resolver caching because it's likely the client calls this method after
// encounter an error with recent subchannels.
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case d.reqs <- struct{}{}:
default:
}
}
// Close cancels all activities of this dns resolver. It waits until the watch goroutine exits.
func (d *dnsResolver) Close() {
d.cancel()
d.wg.Wait()
}
func (d *dnsResolver) watch() {
defer d.wg.Done()
d.logger.Info("dns resolver: started")
defer d.logger.Info("dns resolver: stopped")
// Exponential retry after failed to resolve or client connection failed to update its state
var retries uint
for {
state, err := d.resolve()
if err != nil {
d.logger.WithField("dns.retries", retries).WithField("dns.error", err).Error(
"dns resolver: fail to lookup dns")
d.cc.ReportError(err)
} else {
err = d.updateState(state)
}
var timer *time.Timer
if err == nil {
timer = time.NewTimer(d.refreshRate)
retries = 0
} else {
timer = time.NewTimer(d.retry.Backoff(retries))
retries++
}
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-timer.C:
// Refresh timer expires, issue another DNS lookup.
d.logger.Debug("dns resolver: refreshing")
continue
case <-d.reqs:
// If the resolver is requested to resolve now, force notify the client
// connection. Typically, client connection contacts the resolver when any
// of the subchannels change its connectivity state.
timer.Stop()
d.logger.Debug("dns resolver: handle ResolveNow request")
}
}
}
func (d *dnsResolver) updateState(state *resolver.State) error {
d.logger.WithField("dns.state", state).Info("dns resolver: updating state")
return d.cc.UpdateState(*state)
}
func (d *dnsResolver) resolve() (*resolver.State, error) {
ctx, cancel := context.WithTimeout(d.ctx, dnsLookupTimeout)
defer cancel()
addrs, err := d.lookup.LookupHost(ctx, d.host)
if err != nil {
err = handleDNSError(err)
return &resolver.State{Addresses: []resolver.Address{}}, err
}
newAddrs := make([]resolver.Address, 0, len(addrs))
for _, a := range addrs {
addr, ok := tryParseIP(a, d.port)
if !ok {
return nil, structerr.New("dns: error parsing dns record IP address %v", a)
}
newAddrs = append(newAddrs, resolver.Address{Addr: addr})
}
return &resolver.State{Addresses: newAddrs}, nil
}
// handleDNSError massages the error to fit into expectations of the gRPC model:
// - Timeouts and temporary errors should be communicated to gRPC to attempt another DNS query (with
// Backoff).
// - Other errors should be suppressed (they may represent the absence of a TXT record).
func handleDNSError(err error) error {
if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
return nil
}
return structerr.New("dns: record resolve error: %w", err)
}
|