updated GHA
Some checks reported errors
continuous-integration/drone/pr Build encountered an error
continuous-integration/drone/push Build encountered an error

Update to v2 SDK
updated dependencies
This commit is contained in:
2022-08-06 16:21:18 +02:00
parent 989e7079a5
commit e1266ebf64
1909 changed files with 122367 additions and 279095 deletions

View File

@ -0,0 +1,167 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package resolver provides internal resolver-related functionality.
package resolver
import (
"context"
"sync"
"google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
// ConfigSelector controls what configuration to use for every RPC.
type ConfigSelector interface {
// Selects the configuration for the RPC, or terminates it using the error.
// This error will be converted by the gRPC library to a status error with
// code UNKNOWN if it is not returned as a status error.
SelectConfig(RPCInfo) (*RPCConfig, error)
}
// RPCInfo contains RPC information needed by a ConfigSelector.
type RPCInfo struct {
// Context is the user's context for the RPC and contains headers and
// application timeout. It is passed for interception purposes and for
// efficiency reasons. SelectConfig should not be blocking.
Context context.Context
Method string // i.e. "/Service/Method"
}
// RPCConfig describes the configuration to use for each RPC.
type RPCConfig struct {
// The context to use for the remainder of the RPC; can pass info to LB
// policy or affect timeout or metadata.
Context context.Context
MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
Interceptor ClientInterceptor
}
// ClientStream is the same as grpc.ClientStream, but defined here for circular
// dependency reasons.
type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
CloseSend() error
// Context returns the context for this stream.
//
// It should not be called until after Header or RecvMsg has returned. Once
// called, subsequent client-side retries are disabled.
Context() context.Context
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m interface{}) error
}
// ClientInterceptor is an interceptor for gRPC client streams.
type ClientInterceptor interface {
// NewStream produces a ClientStream for an RPC which may optionally use
// the provided function to produce a stream for delegation. Note:
// RPCInfo.Context should not be used (will be nil).
//
// done is invoked when the RPC is finished using its connection, or could
// not be assigned a connection. RPC operations may still occur on
// ClientStream after done is called, since the interceptor is invoked by
// application-layer operations. done must never be nil when called.
NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
}
// ServerInterceptor is an interceptor for incoming RPC's on gRPC server side.
type ServerInterceptor interface {
// AllowRPC checks if an incoming RPC is allowed to proceed based on
// information about connection RPC was received on, and HTTP Headers. This
// information will be piped into context.
AllowRPC(ctx context.Context) error // TODO: Make this a real interceptor for filters such as rate limiting.
}
type csKeyType string
const csKey = csKeyType("grpc.internal.resolver.configSelector")
// SetConfigSelector sets the config selector in state and returns the new
// state.
func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State {
state.Attributes = state.Attributes.WithValue(csKey, cs)
return state
}
// GetConfigSelector retrieves the config selector from state, if present, and
// returns it or nil if absent.
func GetConfigSelector(state resolver.State) ConfigSelector {
cs, _ := state.Attributes.Value(csKey).(ConfigSelector)
return cs
}
// SafeConfigSelector allows for safe switching of ConfigSelector
// implementations such that previous values are guaranteed to not be in use
// when UpdateConfigSelector returns.
type SafeConfigSelector struct {
mu sync.RWMutex
cs ConfigSelector
}
// UpdateConfigSelector swaps to the provided ConfigSelector and blocks until
// all uses of the previous ConfigSelector have completed.
func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
scs.mu.Lock()
defer scs.mu.Unlock()
scs.cs = cs
}
// SelectConfig defers to the current ConfigSelector in scs.
func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
scs.mu.RLock()
defer scs.mu.RUnlock()
return scs.cs.SelectConfig(r)
}

View File

@ -32,7 +32,9 @@ import (
"sync"
"time"
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
@ -43,6 +45,15 @@ import (
// addresses from SRV records. Must not be changed after init time.
var EnableSRVLookups = false
var logger = grpclog.Component("dns")
// Globals to stub out in tests. TODO: Perhaps these two can be combined into a
// single variable for testing the resolver?
var (
newTimer = time.NewTimer
newTimerDNSResRate = time.NewTimer
)
func init() {
resolver.Register(NewBuilder())
}
@ -140,7 +151,6 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
d.wg.Add(1)
go d.watcher()
d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil
}
@ -198,28 +208,38 @@ func (d *dnsResolver) Close() {
func (d *dnsResolver) watcher() {
defer d.wg.Done()
backoffIndex := 1
for {
select {
case <-d.ctx.Done():
return
case <-d.rn:
}
state, err := d.lookup()
if err != nil {
// Report error to the underlying grpc.ClientConn.
d.cc.ReportError(err)
} else {
d.cc.UpdateState(*state)
err = d.cc.UpdateState(*state)
}
// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
t := time.NewTimer(minDNSResRate)
var timer *time.Timer
if err == nil {
// Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least
// to prevent constantly re-resolving.
backoffIndex = 1
timer = newTimerDNSResRate(minDNSResRate)
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-d.rn:
}
} else {
// Poll on an error found in DNS Resolver or an error received from ClientConn.
timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
backoffIndex++
}
select {
case <-t.C:
case <-d.ctx.Done():
t.Stop()
timer.Stop()
return
case <-timer.C:
}
}
}
@ -251,27 +271,22 @@ func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {
return nil, fmt.Errorf("dns: error parsing A record IP address %v", a)
}
addr := ip + ":" + strconv.Itoa(int(s.Port))
newAddrs = append(newAddrs, resolver.Address{Addr: addr, Type: resolver.GRPCLB, ServerName: s.Target})
newAddrs = append(newAddrs, resolver.Address{Addr: addr, ServerName: s.Target})
}
}
return newAddrs, nil
}
var filterError = func(err error) error {
func handleDNSError(err error, lookupType string) error {
if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
// Timeouts and temporary errors should be communicated to gRPC to
// attempt another DNS query (with backoff). Other errors should be
// suppressed (they may represent the absence of a TXT record).
return nil
}
return err
}
func handleDNSError(err error, lookupType string) error {
err = filterError(err)
if err != nil {
err = fmt.Errorf("dns: %v record lookup error: %v", lookupType, err)
grpclog.Infoln(err)
logger.Info(err)
}
return err
}
@ -294,7 +309,7 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
// TXT record must have "grpc_config=" attribute in order to be used as service config.
if !strings.HasPrefix(res, txtAttribute) {
grpclog.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute)
logger.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute)
// This is not an error; it is the equivalent of not having a service config.
return nil
}
@ -303,12 +318,12 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
}
func (d *dnsResolver) lookupHost() ([]resolver.Address, error) {
var newAddrs []resolver.Address
addrs, err := d.resolver.LookupHost(d.ctx, d.host)
if err != nil {
err = handleDNSError(err, "A")
return nil, err
}
newAddrs := make([]resolver.Address, 0, len(addrs))
for _, a := range addrs {
ip, ok := formatIP(a)
if !ok {
@ -326,13 +341,15 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {
if hostErr != nil && (srvErr != nil || len(srv) == 0) {
return nil, hostErr
}
state := &resolver.State{
Addresses: append(addrs, srv...),
state := resolver.State{Addresses: addrs}
if len(srv) > 0 {
state = grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: srv})
}
if !d.disableServiceConfig {
state.ServiceConfig = d.lookupTXT()
}
return state, nil
return &state, nil
}
// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
@ -418,12 +435,12 @@ func canaryingSC(js string) string {
var rcs []rawChoice
err := json.Unmarshal([]byte(js), &rcs)
if err != nil {
grpclog.Warningf("dns: error parsing service config json: %v", err)
logger.Warningf("dns: error parsing service config json: %v", err)
return ""
}
cliHostname, err := os.Hostname()
if err != nil {
grpclog.Warningf("dns: error getting client hostname: %v", err)
logger.Warningf("dns: error getting client hostname: %v", err)
return ""
}
var sc string

View File

@ -1,33 +0,0 @@
// +build go1.13
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package dns
import "net"
func init() {
filterError = func(err error) error {
if dnsErr, ok := err.(*net.DNSError); ok && dnsErr.IsNotFound {
// The name does not exist; not an error.
return nil
}
return err
}
}

View File

@ -0,0 +1,73 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package unix implements a resolver for unix targets.
package unix
import (
"fmt"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
)
const unixScheme = "unix"
const unixAbstractScheme = "unix-abstract"
type builder struct {
scheme string
}
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
if target.Authority != "" {
return nil, fmt.Errorf("invalid (non-empty) authority: %v", target.Authority)
}
// gRPC was parsing the dial target manually before PR #4817, and we
// switched to using url.Parse() in that PR. To avoid breaking existing
// resolver implementations we ended up stripping the leading "/" from the
// endpoint. This obviously does not work for the "unix" scheme. Hence we
// end up using the parsed URL instead.
endpoint := target.URL.Path
if endpoint == "" {
endpoint = target.URL.Opaque
}
addr := resolver.Address{Addr: endpoint}
if b.scheme == unixAbstractScheme {
// prepend "\x00" to address for unix-abstract
addr.Addr = "\x00" + addr.Addr
}
cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(addr, "unix")}})
return &nopResolver{}, nil
}
func (b *builder) Scheme() string {
return b.scheme
}
type nopResolver struct {
}
func (*nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (*nopResolver) Close() {}
func init() {
resolver.Register(&builder{scheme: unixScheme})
resolver.Register(&builder{scheme: unixAbstractScheme})
}