Bump github.com/hashicorp/terraform-plugin-sdk/v2 from 2.26.1 to 2.27.0
Bumps [github.com/hashicorp/terraform-plugin-sdk/v2](https://github.com/hashicorp/terraform-plugin-sdk) from 2.26.1 to 2.27.0. - [Release notes](https://github.com/hashicorp/terraform-plugin-sdk/releases) - [Changelog](https://github.com/hashicorp/terraform-plugin-sdk/blob/main/CHANGELOG.md) - [Commits](https://github.com/hashicorp/terraform-plugin-sdk/compare/v2.26.1...v2.27.0) --- updated-dependencies: - dependency-name: github.com/hashicorp/terraform-plugin-sdk/v2 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
127
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
127
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
@ -22,6 +22,7 @@ import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
@ -29,6 +30,7 @@ import (
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
@ -191,7 +193,7 @@ type goAway struct {
|
||||
code http2.ErrCode
|
||||
debugData []byte
|
||||
headsUp bool
|
||||
closeConn bool
|
||||
closeConn error // if set, loopyWriter will exit, resulting in conn closure
|
||||
}
|
||||
|
||||
func (*goAway) isTransportResponseFrame() bool { return false }
|
||||
@ -209,6 +211,14 @@ type outFlowControlSizeRequest struct {
|
||||
|
||||
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
|
||||
|
||||
// closeConnection is an instruction to tell the loopy writer to flush the
|
||||
// framer and exit, which will cause the transport's connection to be closed
|
||||
// (by the client or server). The transport itself will close after the reader
|
||||
// encounters the EOF caused by the connection closure.
|
||||
type closeConnection struct{}
|
||||
|
||||
func (closeConnection) isTransportResponseFrame() bool { return false }
|
||||
|
||||
type outStreamState int
|
||||
|
||||
const (
|
||||
@ -408,7 +418,7 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
|
||||
select {
|
||||
case <-c.ch:
|
||||
case <-c.done:
|
||||
return nil, ErrConnClosing
|
||||
return nil, errors.New("transport closed by client")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -478,12 +488,14 @@ type loopyWriter struct {
|
||||
hEnc *hpack.Encoder // HPACK encoder.
|
||||
bdpEst *bdpEstimator
|
||||
draining bool
|
||||
conn net.Conn
|
||||
logger *grpclog.PrefixLogger
|
||||
|
||||
// Side-specific handlers
|
||||
ssGoAwayHandler func(*goAway) (bool, error)
|
||||
}
|
||||
|
||||
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
|
||||
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
|
||||
var buf bytes.Buffer
|
||||
l := &loopyWriter{
|
||||
side: s,
|
||||
@ -496,6 +508,8 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
|
||||
hBuf: &buf,
|
||||
hEnc: hpack.NewEncoder(&buf),
|
||||
bdpEst: bdpEst,
|
||||
conn: conn,
|
||||
logger: logger,
|
||||
}
|
||||
return l
|
||||
}
|
||||
@ -513,23 +527,26 @@ const minBatchSize = 1000
|
||||
// 2. Stream level flow control quota available.
|
||||
//
|
||||
// In each iteration of run loop, other than processing the incoming control
|
||||
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
|
||||
// This results in writing of HTTP2 frames into an underlying write buffer.
|
||||
// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
|
||||
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
|
||||
// if the batch size is too low to give stream goroutines a chance to fill it up.
|
||||
// frame, loopy calls processData, which processes one node from the
|
||||
// activeStreams linked-list. This results in writing of HTTP2 frames into an
|
||||
// underlying write buffer. When there's no more control frames to read from
|
||||
// controlBuf, loopy flushes the write buffer. As an optimization, to increase
|
||||
// the batch size for each flush, loopy yields the processor, once if the batch
|
||||
// size is too low to give stream goroutines a chance to fill it up.
|
||||
//
|
||||
// Upon exiting, if the error causing the exit is not an I/O error, run()
|
||||
// flushes and closes the underlying connection. Otherwise, the connection is
|
||||
// left open to allow the I/O error to be encountered by the reader instead.
|
||||
func (l *loopyWriter) run() (err error) {
|
||||
defer func() {
|
||||
if err == ErrConnClosing {
|
||||
// Don't log ErrConnClosing as error since it happens
|
||||
// 1. When the connection is closed by some other known issue.
|
||||
// 2. User closed the connection.
|
||||
// 3. A graceful close of connection.
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: loopyWriter.run returning. %v", err)
|
||||
}
|
||||
err = nil
|
||||
if l.logger.V(logLevel) {
|
||||
l.logger.Infof("loopyWriter exiting with error: %v", err)
|
||||
}
|
||||
if !isIOError(err) {
|
||||
l.framer.writer.Flush()
|
||||
l.conn.Close()
|
||||
}
|
||||
l.cbuf.finish()
|
||||
}()
|
||||
for {
|
||||
it, err := l.cbuf.get(true)
|
||||
@ -574,7 +591,6 @@ func (l *loopyWriter) run() (err error) {
|
||||
}
|
||||
l.framer.writer.Flush()
|
||||
break hasdata
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -583,11 +599,11 @@ func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error
|
||||
return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
|
||||
}
|
||||
|
||||
func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
|
||||
func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
|
||||
// Otherwise update the quota.
|
||||
if w.streamID == 0 {
|
||||
l.sendQuota += w.increment
|
||||
return nil
|
||||
return
|
||||
}
|
||||
// Find the stream and update it.
|
||||
if str, ok := l.estdStreams[w.streamID]; ok {
|
||||
@ -595,10 +611,9 @@ func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error
|
||||
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
|
||||
str.state = active
|
||||
l.activeStreams.enqueue(str)
|
||||
return nil
|
||||
return
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
|
||||
@ -606,13 +621,11 @@ func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
|
||||
}
|
||||
|
||||
func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
|
||||
if err := l.applySettings(s.ss); err != nil {
|
||||
return err
|
||||
}
|
||||
l.applySettings(s.ss)
|
||||
return l.framer.fr.WriteSettingsAck()
|
||||
}
|
||||
|
||||
func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
|
||||
func (l *loopyWriter) registerStreamHandler(h *registerStream) {
|
||||
str := &outStream{
|
||||
id: h.streamID,
|
||||
state: empty,
|
||||
@ -620,15 +633,14 @@ func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
|
||||
wq: h.wq,
|
||||
}
|
||||
l.estdStreams[h.streamID] = str
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loopyWriter) headerHandler(h *headerFrame) error {
|
||||
if l.side == serverSide {
|
||||
str, ok := l.estdStreams[h.streamID]
|
||||
if !ok {
|
||||
if logger.V(logLevel) {
|
||||
logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
|
||||
if l.logger.V(logLevel) {
|
||||
l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -655,19 +667,20 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
|
||||
itl: &itemList{},
|
||||
wq: h.wq,
|
||||
}
|
||||
str.itl.enqueue(h)
|
||||
return l.originateStream(str)
|
||||
return l.originateStream(str, h)
|
||||
}
|
||||
|
||||
func (l *loopyWriter) originateStream(str *outStream) error {
|
||||
hdr := str.itl.dequeue().(*headerFrame)
|
||||
if err := hdr.initStream(str.id); err != nil {
|
||||
if err == ErrConnClosing {
|
||||
return err
|
||||
}
|
||||
// Other errors(errStreamDrain) need not close transport.
|
||||
func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
|
||||
// l.draining is set when handling GoAway. In which case, we want to avoid
|
||||
// creating new streams.
|
||||
if l.draining {
|
||||
// TODO: provide a better error with the reason we are in draining.
|
||||
hdr.onOrphaned(errStreamDrain)
|
||||
return nil
|
||||
}
|
||||
if err := hdr.initStream(str.id); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -682,8 +695,8 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
|
||||
l.hBuf.Reset()
|
||||
for _, f := range hf {
|
||||
if err := l.hEnc.WriteField(f); err != nil {
|
||||
if logger.V(logLevel) {
|
||||
logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
|
||||
if l.logger.V(logLevel) {
|
||||
l.logger.Warningf("Encountered error while encoding headers: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -721,10 +734,10 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loopyWriter) preprocessData(df *dataFrame) error {
|
||||
func (l *loopyWriter) preprocessData(df *dataFrame) {
|
||||
str, ok := l.estdStreams[df.streamID]
|
||||
if !ok {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
// If we got data for a stream it means that
|
||||
// stream was originated and the headers were sent out.
|
||||
@ -733,7 +746,6 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error {
|
||||
str.state = active
|
||||
l.activeStreams.enqueue(str)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loopyWriter) pingHandler(p *ping) error {
|
||||
@ -744,9 +756,8 @@ func (l *loopyWriter) pingHandler(p *ping) error {
|
||||
|
||||
}
|
||||
|
||||
func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
|
||||
func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
|
||||
o.resp <- l.sendQuota
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
|
||||
@ -763,8 +774,9 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
|
||||
return ErrConnClosing
|
||||
if l.draining && len(l.estdStreams) == 0 {
|
||||
// Flush and close the connection; we are done with it.
|
||||
return errors.New("finished processing active streams while in draining mode")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -799,7 +811,8 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
|
||||
if l.side == clientSide {
|
||||
l.draining = true
|
||||
if len(l.estdStreams) == 0 {
|
||||
return ErrConnClosing
|
||||
// Flush and close the connection; we are done with it.
|
||||
return errors.New("received GOAWAY with no active streams")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -820,7 +833,7 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
|
||||
func (l *loopyWriter) handle(i interface{}) error {
|
||||
switch i := i.(type) {
|
||||
case *incomingWindowUpdate:
|
||||
return l.incomingWindowUpdateHandler(i)
|
||||
l.incomingWindowUpdateHandler(i)
|
||||
case *outgoingWindowUpdate:
|
||||
return l.outgoingWindowUpdateHandler(i)
|
||||
case *incomingSettings:
|
||||
@ -830,7 +843,7 @@ func (l *loopyWriter) handle(i interface{}) error {
|
||||
case *headerFrame:
|
||||
return l.headerHandler(i)
|
||||
case *registerStream:
|
||||
return l.registerStreamHandler(i)
|
||||
l.registerStreamHandler(i)
|
||||
case *cleanupStream:
|
||||
return l.cleanupStreamHandler(i)
|
||||
case *earlyAbortStream:
|
||||
@ -838,19 +851,24 @@ func (l *loopyWriter) handle(i interface{}) error {
|
||||
case *incomingGoAway:
|
||||
return l.incomingGoAwayHandler(i)
|
||||
case *dataFrame:
|
||||
return l.preprocessData(i)
|
||||
l.preprocessData(i)
|
||||
case *ping:
|
||||
return l.pingHandler(i)
|
||||
case *goAway:
|
||||
return l.goAwayHandler(i)
|
||||
case *outFlowControlSizeRequest:
|
||||
return l.outFlowControlSizeRequestHandler(i)
|
||||
l.outFlowControlSizeRequestHandler(i)
|
||||
case closeConnection:
|
||||
// Just return a non-I/O error and run() will flush and close the
|
||||
// connection.
|
||||
return ErrConnClosing
|
||||
default:
|
||||
return fmt.Errorf("transport: unknown control message type %T", i)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loopyWriter) applySettings(ss []http2.Setting) error {
|
||||
func (l *loopyWriter) applySettings(ss []http2.Setting) {
|
||||
for _, s := range ss {
|
||||
switch s.ID {
|
||||
case http2.SettingInitialWindowSize:
|
||||
@ -869,7 +887,6 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
|
||||
updateHeaderTblSize(l.hEnc, s.Val)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processData removes the first stream from active streams, writes out at most 16KB
|
||||
@ -903,7 +920,7 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
return false, err
|
||||
}
|
||||
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
|
||||
return false, nil
|
||||
return false, err
|
||||
}
|
||||
} else {
|
||||
l.activeStreams.enqueue(str)
|
||||
|
6
vendor/google.golang.org/grpc/internal/transport/defaults.go
generated
vendored
6
vendor/google.golang.org/grpc/internal/transport/defaults.go
generated
vendored
@ -47,3 +47,9 @@ const (
|
||||
defaultClientMaxHeaderListSize = uint32(16 << 20)
|
||||
defaultServerMaxHeaderListSize = uint32(16 << 20)
|
||||
)
|
||||
|
||||
// MaxStreamID is the upper bound for the stream ID before the current
|
||||
// transport gracefully closes and new transport is created for subsequent RPCs.
|
||||
// This is set to 75% of 2^31-1. Streams are identified with an unsigned 31-bit
|
||||
// integer. It's exported so that tests can override it.
|
||||
var MaxStreamID = uint32(math.MaxInt32 * 3 / 4)
|
||||
|
52
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
52
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
@ -39,6 +39,7 @@ import (
|
||||
"golang.org/x/net/http2"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
@ -46,24 +47,32 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// NewServerHandlerTransport returns a ServerTransport handling gRPC
|
||||
// from inside an http.Handler. It requires that the http Server
|
||||
// supports HTTP/2.
|
||||
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
|
||||
// inside an http.Handler, or writes an HTTP error to w and returns an error.
|
||||
// It requires that the http Server supports HTTP/2.
|
||||
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
|
||||
if r.ProtoMajor != 2 {
|
||||
return nil, errors.New("gRPC requires HTTP/2")
|
||||
msg := "gRPC requires HTTP/2"
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
if r.Method != "POST" {
|
||||
return nil, errors.New("invalid gRPC request method")
|
||||
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
// TODO: do we assume contentType is lowercase? we did before
|
||||
contentSubtype, validContentType := grpcutil.ContentSubtype(contentType)
|
||||
if !validContentType {
|
||||
return nil, errors.New("invalid gRPC request content-type")
|
||||
msg := fmt.Sprintf("invalid gRPC request content-type %q", contentType)
|
||||
http.Error(w, msg, http.StatusUnsupportedMediaType)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
if _, ok := w.(http.Flusher); !ok {
|
||||
return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
|
||||
msg := "gRPC requires a ResponseWriter supporting http.Flusher"
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
|
||||
st := &serverHandlerTransport{
|
||||
@ -75,11 +84,14 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
|
||||
contentSubtype: contentSubtype,
|
||||
stats: stats,
|
||||
}
|
||||
st.logger = prefixLoggerForServerHandlerTransport(st)
|
||||
|
||||
if v := r.Header.Get("grpc-timeout"); v != "" {
|
||||
to, err := decodeTimeout(v)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
|
||||
msg := fmt.Sprintf("malformed grpc-timeout: %v", err)
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
return nil, status.Error(codes.Internal, msg)
|
||||
}
|
||||
st.timeoutSet = true
|
||||
st.timeout = to
|
||||
@ -97,7 +109,9 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
|
||||
for _, v := range vv {
|
||||
v, err := decodeMetadataHeader(k, v)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err)
|
||||
msg := fmt.Sprintf("malformed binary metadata %q in header %q: %v", v, k, err)
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
return nil, status.Error(codes.Internal, msg)
|
||||
}
|
||||
metakv = append(metakv, k, v)
|
||||
}
|
||||
@ -138,15 +152,19 @@ type serverHandlerTransport struct {
|
||||
// TODO make sure this is consistent across handler_server and http2_server
|
||||
contentSubtype string
|
||||
|
||||
stats []stats.Handler
|
||||
stats []stats.Handler
|
||||
logger *grpclog.PrefixLogger
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) Close() {
|
||||
ht.closeOnce.Do(ht.closeCloseChanOnce)
|
||||
func (ht *serverHandlerTransport) Close(err error) {
|
||||
ht.closeOnce.Do(func() {
|
||||
if ht.logger.V(logLevel) {
|
||||
ht.logger.Infof("Closing: %v", err)
|
||||
}
|
||||
close(ht.closedCh)
|
||||
})
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
|
||||
|
||||
func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
|
||||
|
||||
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
|
||||
@ -236,7 +254,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
|
||||
})
|
||||
}
|
||||
}
|
||||
ht.Close()
|
||||
ht.Close(errors.New("finished writing status"))
|
||||
return err
|
||||
}
|
||||
|
||||
@ -346,7 +364,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
|
||||
case <-ht.req.Context().Done():
|
||||
}
|
||||
cancel()
|
||||
ht.Close()
|
||||
ht.Close(errors.New("request is done processing"))
|
||||
}()
|
||||
|
||||
req := ht.req
|
||||
@ -435,7 +453,7 @@ func (ht *serverHandlerTransport) IncrMsgSent() {}
|
||||
|
||||
func (ht *serverHandlerTransport) IncrMsgRecv() {}
|
||||
|
||||
func (ht *serverHandlerTransport) Drain() {
|
||||
func (ht *serverHandlerTransport) Drain(debugData string) {
|
||||
panic("Drain() is not implemented")
|
||||
}
|
||||
|
||||
|
124
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
124
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
@ -38,6 +38,7 @@ import (
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
icredentials "google.golang.org/grpc/internal/credentials"
|
||||
"google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
imetadata "google.golang.org/grpc/internal/metadata"
|
||||
@ -59,11 +60,15 @@ var clientConnectionCounter uint64
|
||||
|
||||
// http2Client implements the ClientTransport interface with HTTP2.
|
||||
type http2Client struct {
|
||||
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
|
||||
userAgent string
|
||||
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
|
||||
userAgent string
|
||||
// address contains the resolver returned address for this transport.
|
||||
// If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
|
||||
// passed to `NewStream`, when determining the :authority header.
|
||||
address resolver.Address
|
||||
md metadata.MD
|
||||
conn net.Conn // underlying communication channel
|
||||
loopy *loopyWriter
|
||||
@ -136,12 +141,12 @@ type http2Client struct {
|
||||
channelzID *channelz.Identifier
|
||||
czData *channelzData
|
||||
|
||||
onGoAway func(GoAwayReason)
|
||||
onClose func()
|
||||
onClose func(GoAwayReason)
|
||||
|
||||
bufferPool *bufferPool
|
||||
|
||||
connectionID uint64
|
||||
logger *grpclog.PrefixLogger
|
||||
}
|
||||
|
||||
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
|
||||
@ -193,7 +198,7 @@ func isTemporary(err error) bool {
|
||||
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
// and starts to receive messages on it. Non-nil error returns if construction
|
||||
// fails.
|
||||
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
|
||||
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
|
||||
scheme := "http"
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
@ -213,7 +218,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
if opts.FailOnNonTempDialError {
|
||||
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
|
||||
}
|
||||
return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
|
||||
return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
|
||||
}
|
||||
|
||||
// Any further errors will close the underlying connection
|
||||
@ -238,8 +243,11 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
go func(conn net.Conn) {
|
||||
defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
|
||||
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
|
||||
if connectCtx.Err() != nil {
|
||||
if err := connectCtx.Err(); err != nil {
|
||||
// connectCtx expired before exiting the function. Hard close the connection.
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("Aborting due to connect deadline expiring: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
}(conn)
|
||||
@ -314,6 +322,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
cancel: cancel,
|
||||
userAgent: opts.UserAgent,
|
||||
registeredCompressors: grpcutil.RegisteredCompressors(),
|
||||
address: addr,
|
||||
conn: conn,
|
||||
remoteAddr: conn.RemoteAddr(),
|
||||
localAddr: conn.LocalAddr(),
|
||||
@ -335,11 +344,11 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
streamQuota: defaultMaxStreamsClient,
|
||||
streamsQuotaAvailable: make(chan struct{}, 1),
|
||||
czData: new(channelzData),
|
||||
onGoAway: onGoAway,
|
||||
keepaliveEnabled: keepaliveEnabled,
|
||||
bufferPool: newBufferPool(),
|
||||
onClose: onClose,
|
||||
}
|
||||
t.logger = prefixLoggerForClientTransport(t)
|
||||
// Add peer information to the http2client context.
|
||||
t.ctx = peer.NewContext(t.ctx, t.getPeer())
|
||||
|
||||
@ -438,17 +447,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
return nil, err
|
||||
}
|
||||
go func() {
|
||||
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
|
||||
err := t.loopy.run()
|
||||
if err != nil {
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
|
||||
}
|
||||
}
|
||||
// Do not close the transport. Let reader goroutine handle it since
|
||||
// there might be data in the buffers.
|
||||
t.conn.Close()
|
||||
t.controlBuf.finish()
|
||||
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
|
||||
t.loopy.run()
|
||||
close(t.writerDone)
|
||||
}()
|
||||
return t, nil
|
||||
@ -702,6 +702,18 @@ func (e NewStreamError) Error() string {
|
||||
// streams. All non-nil errors returned will be *NewStreamError.
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
|
||||
ctx = peer.NewContext(ctx, t.getPeer())
|
||||
|
||||
// ServerName field of the resolver returned address takes precedence over
|
||||
// Host field of CallHdr to determine the :authority header. This is because,
|
||||
// the ServerName field takes precedence for server authentication during
|
||||
// TLS handshake, and the :authority header should match the value used
|
||||
// for server authentication.
|
||||
if t.address.ServerName != "" {
|
||||
newCallHdr := *callHdr
|
||||
newCallHdr.Host = t.address.ServerName
|
||||
callHdr = &newCallHdr
|
||||
}
|
||||
|
||||
headerFields, err := t.createHeaderFields(ctx, callHdr)
|
||||
if err != nil {
|
||||
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
|
||||
@ -726,15 +738,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
endStream: false,
|
||||
initStream: func(id uint32) error {
|
||||
t.mu.Lock()
|
||||
if state := t.state; state != reachable {
|
||||
// TODO: handle transport closure in loopy instead and remove this
|
||||
// initStream is never called when transport is draining.
|
||||
if t.state == closing {
|
||||
t.mu.Unlock()
|
||||
// Do a quick cleanup.
|
||||
err := error(errStreamDrain)
|
||||
if state == closing {
|
||||
err = ErrConnClosing
|
||||
}
|
||||
cleanup(err)
|
||||
return err
|
||||
cleanup(ErrConnClosing)
|
||||
return ErrConnClosing
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
atomic.AddInt64(&t.czData.streamsStarted, 1)
|
||||
@ -752,6 +761,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
}
|
||||
firstTry := true
|
||||
var ch chan struct{}
|
||||
transportDrainRequired := false
|
||||
checkForStreamQuota := func(it interface{}) bool {
|
||||
if t.streamQuota <= 0 { // Can go negative if server decreases it.
|
||||
if firstTry {
|
||||
@ -767,10 +777,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
h := it.(*headerFrame)
|
||||
h.streamID = t.nextID
|
||||
t.nextID += 2
|
||||
|
||||
// Drain client transport if nextID > MaxStreamID which signals gRPC that
|
||||
// the connection is closed and a new one must be created for subsequent RPCs.
|
||||
transportDrainRequired = t.nextID > MaxStreamID
|
||||
|
||||
s.id = h.streamID
|
||||
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
|
||||
t.mu.Lock()
|
||||
if t.activeStreams == nil { // Can be niled from Close().
|
||||
if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
|
||||
t.mu.Unlock()
|
||||
return false // Don't create a stream if the transport is already closed.
|
||||
}
|
||||
@ -846,6 +861,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
sh.HandleRPC(s.ctx, outHeader)
|
||||
}
|
||||
}
|
||||
if transportDrainRequired {
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Draining transport: t.nextID > MaxStreamID")
|
||||
}
|
||||
t.GracefulClose()
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
@ -934,9 +955,14 @@ func (t *http2Client) Close(err error) {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Closing: %v", err)
|
||||
}
|
||||
// Call t.onClose ASAP to prevent the client from attempting to create new
|
||||
// streams.
|
||||
t.onClose()
|
||||
if t.state != draining {
|
||||
t.onClose(GoAwayInvalid)
|
||||
}
|
||||
t.state = closing
|
||||
streams := t.activeStreams
|
||||
t.activeStreams = nil
|
||||
@ -986,11 +1012,15 @@ func (t *http2Client) GracefulClose() {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("GracefulClose called")
|
||||
}
|
||||
t.onClose(GoAwayInvalid)
|
||||
t.state = draining
|
||||
active := len(t.activeStreams)
|
||||
t.mu.Unlock()
|
||||
if active == 0 {
|
||||
t.Close(ErrConnClosing)
|
||||
t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
|
||||
return
|
||||
}
|
||||
t.controlBuf.put(&incomingGoAway{})
|
||||
@ -1147,8 +1177,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||
}
|
||||
statusCode, ok := http2ErrConvTab[f.ErrCode]
|
||||
if !ok {
|
||||
if logger.V(logLevel) {
|
||||
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", f.ErrCode)
|
||||
}
|
||||
statusCode = codes.Unknown
|
||||
}
|
||||
@ -1230,10 +1260,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
|
||||
}
|
||||
if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
|
||||
// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
|
||||
// data equal to ASCII "too_many_pings", it should log the occurrence at a log level that is
|
||||
// enabled by default and double the configure KEEPALIVE_TIME used for new connections
|
||||
// on that channel.
|
||||
logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
|
||||
}
|
||||
id := f.LastStreamID
|
||||
if id > 0 && id%2 == 0 {
|
||||
@ -1266,8 +1298,10 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
// Notify the clientconn about the GOAWAY before we set the state to
|
||||
// draining, to allow the client to stop attempting to create streams
|
||||
// before disallowing new streams on this connection.
|
||||
t.onGoAway(t.goAwayReason)
|
||||
t.state = draining
|
||||
if t.state != draining {
|
||||
t.onClose(t.goAwayReason)
|
||||
t.state = draining
|
||||
}
|
||||
}
|
||||
// All streams with IDs greater than the GoAwayId
|
||||
// and smaller than the previous GoAway ID should be killed.
|
||||
@ -1303,7 +1337,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
|
||||
// setGoAwayReason sets the value of t.goAwayReason based
|
||||
// on the GoAway frame received.
|
||||
// It expects a lock on transport's mutext to be held by
|
||||
// It expects a lock on transport's mutex to be held by
|
||||
// the caller.
|
||||
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
|
||||
t.goAwayReason = GoAwayNoReason
|
||||
@ -1756,3 +1790,9 @@ func (t *http2Client) getOutFlowWindow() int64 {
|
||||
return -2
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Client) stateForTesting() transportState {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
return t.state
|
||||
}
|
||||
|
218
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
218
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
@ -21,6 +21,7 @@ package transport
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
@ -34,13 +35,16 @@ import (
|
||||
"github.com/golang/protobuf/proto"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
"google.golang.org/grpc/internal/pretty"
|
||||
"google.golang.org/grpc/internal/syscall"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
@ -101,13 +105,13 @@ type http2Server struct {
|
||||
|
||||
mu sync.Mutex // guard the following
|
||||
|
||||
// drainChan is initialized when Drain() is called the first time.
|
||||
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
|
||||
// Then an independent goroutine will be launched to later send the second GoAway.
|
||||
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
|
||||
// Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
|
||||
// already underway.
|
||||
drainChan chan struct{}
|
||||
// drainEvent is initialized when Drain() is called the first time. After
|
||||
// which the server writes out the first GoAway(with ID 2^31-1) frame. Then
|
||||
// an independent goroutine will be launched to later send the second
|
||||
// GoAway. During this time we don't want to write another first GoAway(with
|
||||
// ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
|
||||
// already initialized since draining is already underway.
|
||||
drainEvent *grpcsync.Event
|
||||
state transportState
|
||||
activeStreams map[uint32]*Stream
|
||||
// idle is the time instant when the connection went idle.
|
||||
@ -127,6 +131,8 @@ type http2Server struct {
|
||||
// This lock may not be taken if mu is already held.
|
||||
maxStreamMu sync.Mutex
|
||||
maxStreamID uint32 // max stream ID ever seen
|
||||
|
||||
logger *grpclog.PrefixLogger
|
||||
}
|
||||
|
||||
// NewServerTransport creates a http2 transport with conn and configuration
|
||||
@ -265,6 +271,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
czData: new(channelzData),
|
||||
bufferPool: newBufferPool(),
|
||||
}
|
||||
t.logger = prefixLoggerForServerTransport(t)
|
||||
// Add peer information to the http2server context.
|
||||
t.ctx = peer.NewContext(t.ctx, t.getPeer())
|
||||
|
||||
@ -293,7 +300,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
t.Close()
|
||||
t.Close(err)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -329,23 +336,18 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
t.handleSettings(sf)
|
||||
|
||||
go func() {
|
||||
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
|
||||
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
|
||||
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
|
||||
if err := t.loopy.run(); err != nil {
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
|
||||
}
|
||||
}
|
||||
t.conn.Close()
|
||||
t.controlBuf.finish()
|
||||
t.loopy.run()
|
||||
close(t.writerDone)
|
||||
}()
|
||||
go t.keepalive()
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// operateHeader takes action on the decoded headers.
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
|
||||
// operateHeaders takes action on the decoded headers. Returns an error if fatal
|
||||
// error encountered and transport needs to close, otherwise returns nil.
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
|
||||
// Acquire max stream ID lock for entire duration
|
||||
t.maxStreamMu.Lock()
|
||||
defer t.maxStreamMu.Unlock()
|
||||
@ -361,15 +363,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
rstCode: http2.ErrCodeFrameSize,
|
||||
onWrite: func() {},
|
||||
})
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
if streamID%2 != 1 || streamID <= t.maxStreamID {
|
||||
// illegal gRPC stream id.
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
|
||||
}
|
||||
return true
|
||||
return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
|
||||
}
|
||||
t.maxStreamID = streamID
|
||||
|
||||
@ -381,13 +380,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
fc: &inFlow{limit: uint32(t.initialWindowSize)},
|
||||
}
|
||||
var (
|
||||
// If a gRPC Response-Headers has already been received, then it means
|
||||
// that the peer is speaking gRPC and we are in gRPC mode.
|
||||
isGRPC = false
|
||||
mdata = make(map[string][]string)
|
||||
httpMethod string
|
||||
// headerError is set if an error is encountered while parsing the headers
|
||||
headerError bool
|
||||
// if false, content-type was missing or invalid
|
||||
isGRPC = false
|
||||
contentType = ""
|
||||
mdata = make(metadata.MD, len(frame.Fields))
|
||||
httpMethod string
|
||||
// these are set if an error is encountered while parsing the headers
|
||||
protocolError bool
|
||||
headerError *status.Status
|
||||
|
||||
timeoutSet bool
|
||||
timeout time.Duration
|
||||
@ -398,11 +398,23 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
case "content-type":
|
||||
contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
|
||||
if !validContentType {
|
||||
contentType = hf.Value
|
||||
break
|
||||
}
|
||||
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
|
||||
s.contentSubtype = contentSubtype
|
||||
isGRPC = true
|
||||
|
||||
case "grpc-accept-encoding":
|
||||
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
|
||||
if hf.Value == "" {
|
||||
continue
|
||||
}
|
||||
compressors := hf.Value
|
||||
if s.clientAdvertisedCompressors != "" {
|
||||
compressors = s.clientAdvertisedCompressors + "," + compressors
|
||||
}
|
||||
s.clientAdvertisedCompressors = compressors
|
||||
case "grpc-encoding":
|
||||
s.recvCompress = hf.Value
|
||||
case ":method":
|
||||
@ -413,23 +425,23 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
timeoutSet = true
|
||||
var err error
|
||||
if timeout, err = decodeTimeout(hf.Value); err != nil {
|
||||
headerError = true
|
||||
headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
|
||||
}
|
||||
// "Transports must consider requests containing the Connection header
|
||||
// as malformed." - A41
|
||||
case "connection":
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
|
||||
}
|
||||
headerError = true
|
||||
protocolError = true
|
||||
default:
|
||||
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
|
||||
break
|
||||
}
|
||||
v, err := decodeMetadataHeader(hf.Name, hf.Value)
|
||||
if err != nil {
|
||||
headerError = true
|
||||
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
|
||||
headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
|
||||
t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
|
||||
break
|
||||
}
|
||||
mdata[hf.Name] = append(mdata[hf.Name], v)
|
||||
@ -443,27 +455,47 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
// error, this takes precedence over a client not speaking gRPC.
|
||||
if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
|
||||
errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: %v", errMsg)
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Aborting the stream early: %v", errMsg)
|
||||
}
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: 400,
|
||||
httpStatus: http.StatusBadRequest,
|
||||
streamID: streamID,
|
||||
contentSubtype: s.contentSubtype,
|
||||
status: status.New(codes.Internal, errMsg),
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
if !isGRPC || headerError {
|
||||
if protocolError {
|
||||
t.controlBuf.put(&cleanupStream{
|
||||
streamID: streamID,
|
||||
rst: true,
|
||||
rstCode: http2.ErrCodeProtocol,
|
||||
onWrite: func() {},
|
||||
})
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
if !isGRPC {
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: http.StatusUnsupportedMediaType,
|
||||
streamID: streamID,
|
||||
contentSubtype: s.contentSubtype,
|
||||
status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
if headerError != nil {
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: http.StatusBadRequest,
|
||||
streamID: streamID,
|
||||
contentSubtype: s.contentSubtype,
|
||||
status: headerError,
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// "If :authority is missing, Host must be renamed to :authority." - A41
|
||||
@ -503,7 +535,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
if t.state != reachable {
|
||||
t.mu.Unlock()
|
||||
s.cancel()
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
if uint32(len(t.activeStreams)) >= t.maxStreams {
|
||||
t.mu.Unlock()
|
||||
@ -514,13 +546,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
onWrite: func() {},
|
||||
})
|
||||
s.cancel()
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
if httpMethod != http.MethodPost {
|
||||
t.mu.Unlock()
|
||||
errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: %v", errMsg)
|
||||
errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Aborting the stream early: %v", errMsg)
|
||||
}
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: 405,
|
||||
@ -530,14 +562,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
s.cancel()
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
if t.inTapHandle != nil {
|
||||
var err error
|
||||
if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
|
||||
t.mu.Unlock()
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
|
||||
}
|
||||
stat, ok := status.FromError(err)
|
||||
if !ok {
|
||||
@ -550,7 +582,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
status: stat,
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
}
|
||||
t.activeStreams[streamID] = s
|
||||
@ -574,7 +606,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
LocalAddr: t.localAddr,
|
||||
Compression: s.recvCompress,
|
||||
WireLength: int(frame.Header().Length),
|
||||
Header: metadata.MD(mdata).Copy(),
|
||||
Header: mdata.Copy(),
|
||||
}
|
||||
sh.HandleRPC(s.ctx, inHeader)
|
||||
}
|
||||
@ -597,7 +629,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
wq: s.wq,
|
||||
})
|
||||
handle(s)
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandleStreams receives incoming streams using the given handler. This is
|
||||
@ -611,8 +643,8 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
|
||||
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
|
||||
if err != nil {
|
||||
if se, ok := err.(http2.StreamError); ok {
|
||||
if logger.V(logLevel) {
|
||||
logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Warningf("Encountered http2.StreamError: %v", se)
|
||||
}
|
||||
t.mu.Lock()
|
||||
s := t.activeStreams[se.StreamID]
|
||||
@ -630,19 +662,16 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
|
||||
continue
|
||||
}
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
t.Close()
|
||||
t.Close(err)
|
||||
return
|
||||
}
|
||||
if logger.V(logLevel) {
|
||||
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
|
||||
}
|
||||
t.Close()
|
||||
t.Close(err)
|
||||
return
|
||||
}
|
||||
switch frame := frame.(type) {
|
||||
case *http2.MetaHeadersFrame:
|
||||
if t.operateHeaders(frame, handle, traceCtx) {
|
||||
t.Close()
|
||||
if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
|
||||
t.Close(err)
|
||||
break
|
||||
}
|
||||
case *http2.DataFrame:
|
||||
@ -658,8 +687,8 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
|
||||
case *http2.GoAwayFrame:
|
||||
// TODO: Handle GoAway from the client appropriately.
|
||||
default:
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Received unsupported frame type %T", frame)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -843,8 +872,8 @@ const (
|
||||
|
||||
func (t *http2Server) handlePing(f *http2.PingFrame) {
|
||||
if f.IsAck() {
|
||||
if f.Data == goAwayPing.data && t.drainChan != nil {
|
||||
close(t.drainChan)
|
||||
if f.Data == goAwayPing.data && t.drainEvent != nil {
|
||||
t.drainEvent.Fire()
|
||||
return
|
||||
}
|
||||
// Maybe it's a BDP ping.
|
||||
@ -886,10 +915,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
|
||||
|
||||
if t.pingStrikes > maxPingStrikes {
|
||||
// Send goaway and close the connection.
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: Got too many pings from the client, closing the connection.")
|
||||
}
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
|
||||
}
|
||||
}
|
||||
|
||||
@ -921,8 +947,8 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
|
||||
var sz int64
|
||||
for _, f := range hdrFrame.hf {
|
||||
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
|
||||
}
|
||||
return false
|
||||
}
|
||||
@ -1035,7 +1061,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
stBytes, err := proto.Marshal(p)
|
||||
if err != nil {
|
||||
// TODO: return error instead, when callers are able to handle it.
|
||||
logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
|
||||
t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
|
||||
} else {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
|
||||
}
|
||||
@ -1140,20 +1166,20 @@ func (t *http2Server) keepalive() {
|
||||
if val <= 0 {
|
||||
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
|
||||
// Gracefully close the connection.
|
||||
t.Drain()
|
||||
t.Drain("max_idle")
|
||||
return
|
||||
}
|
||||
idleTimer.Reset(val)
|
||||
case <-ageTimer.C:
|
||||
t.Drain()
|
||||
t.Drain("max_age")
|
||||
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
|
||||
select {
|
||||
case <-ageTimer.C:
|
||||
// Close the connection after grace period.
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: closing server transport due to maximum connection age.")
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Closing server transport due to maximum connection age")
|
||||
}
|
||||
t.Close()
|
||||
t.controlBuf.put(closeConnection{})
|
||||
case <-t.done:
|
||||
}
|
||||
return
|
||||
@ -1169,10 +1195,7 @@ func (t *http2Server) keepalive() {
|
||||
continue
|
||||
}
|
||||
if outstandingPing && kpTimeoutLeft <= 0 {
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: closing server transport due to idleness.")
|
||||
}
|
||||
t.Close()
|
||||
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
|
||||
return
|
||||
}
|
||||
if !outstandingPing {
|
||||
@ -1199,20 +1222,23 @@ func (t *http2Server) keepalive() {
|
||||
// Close starts shutting down the http2Server transport.
|
||||
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
|
||||
// could cause some resource issue. Revisit this later.
|
||||
func (t *http2Server) Close() {
|
||||
func (t *http2Server) Close(err error) {
|
||||
t.mu.Lock()
|
||||
if t.state == closing {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if t.logger.V(logLevel) {
|
||||
t.logger.Infof("Closing: %v", err)
|
||||
}
|
||||
t.state = closing
|
||||
streams := t.activeStreams
|
||||
t.activeStreams = nil
|
||||
t.mu.Unlock()
|
||||
t.controlBuf.finish()
|
||||
close(t.done)
|
||||
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
|
||||
logger.Infof("transport: error closing conn during Close: %v", err)
|
||||
if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
|
||||
t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
|
||||
}
|
||||
channelz.RemoveEntry(t.channelzID)
|
||||
// Cancel all active streams.
|
||||
@ -1292,14 +1318,14 @@ func (t *http2Server) RemoteAddr() net.Addr {
|
||||
return t.remoteAddr
|
||||
}
|
||||
|
||||
func (t *http2Server) Drain() {
|
||||
func (t *http2Server) Drain(debugData string) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if t.drainChan != nil {
|
||||
if t.drainEvent != nil {
|
||||
return
|
||||
}
|
||||
t.drainChan = make(chan struct{})
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
|
||||
t.drainEvent = grpcsync.NewEvent()
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
|
||||
}
|
||||
|
||||
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
|
||||
@ -1319,19 +1345,17 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
|
||||
// Stop accepting more streams now.
|
||||
t.state = draining
|
||||
sid := t.maxStreamID
|
||||
retErr := g.closeConn
|
||||
if len(t.activeStreams) == 0 {
|
||||
g.closeConn = true
|
||||
retErr = errors.New("second GOAWAY written and no active streams left to process")
|
||||
}
|
||||
t.mu.Unlock()
|
||||
t.maxStreamMu.Unlock()
|
||||
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if g.closeConn {
|
||||
// Abruptly close the connection following the GoAway (via
|
||||
// loopywriter). But flush out what's inside the buffer first.
|
||||
t.framer.writer.Flush()
|
||||
return false, fmt.Errorf("transport: Connection closing")
|
||||
if retErr != nil {
|
||||
return false, retErr
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
@ -1343,7 +1367,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
|
||||
// originated before the GoAway reaches the client.
|
||||
// After getting the ack or timer expiration send out another GoAway this
|
||||
// time with an ID of the max stream server intends to process.
|
||||
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
|
||||
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
|
||||
@ -1353,7 +1377,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
|
||||
timer := time.NewTimer(time.Minute)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-t.drainChan:
|
||||
case <-t.drainEvent.Done():
|
||||
case <-timer.C:
|
||||
case <-t.done:
|
||||
return
|
||||
|
26
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
26
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
@ -21,6 +21,7 @@ package transport
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
@ -37,7 +38,6 @@ import (
|
||||
"golang.org/x/net/http2/hpack"
|
||||
spb "google.golang.org/genproto/googleapis/rpc/status"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
@ -85,7 +85,6 @@ var (
|
||||
// 504 Gateway timeout - UNAVAILABLE.
|
||||
http.StatusGatewayTimeout: codes.Unavailable,
|
||||
}
|
||||
logger = grpclog.Component("transport")
|
||||
)
|
||||
|
||||
// isReservedHeader checks whether hdr belongs to HTTP2 headers
|
||||
@ -330,7 +329,8 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
|
||||
return 0, w.err
|
||||
}
|
||||
if w.batchSize == 0 { // Buffer has been disabled.
|
||||
return w.conn.Write(b)
|
||||
n, err = w.conn.Write(b)
|
||||
return n, toIOError(err)
|
||||
}
|
||||
for len(b) > 0 {
|
||||
nn := copy(w.buf[w.offset:], b)
|
||||
@ -352,10 +352,30 @@ func (w *bufWriter) Flush() error {
|
||||
return nil
|
||||
}
|
||||
_, w.err = w.conn.Write(w.buf[:w.offset])
|
||||
w.err = toIOError(w.err)
|
||||
w.offset = 0
|
||||
return w.err
|
||||
}
|
||||
|
||||
type ioError struct {
|
||||
error
|
||||
}
|
||||
|
||||
func (i ioError) Unwrap() error {
|
||||
return i.error
|
||||
}
|
||||
|
||||
func isIOError(err error) bool {
|
||||
return errors.As(err, &ioError{})
|
||||
}
|
||||
|
||||
func toIOError(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return ioError{error: err}
|
||||
}
|
||||
|
||||
type framer struct {
|
||||
writer *bufWriter
|
||||
fr *http2.Framer
|
||||
|
40
vendor/google.golang.org/grpc/internal/transport/logging.go
generated
vendored
Normal file
40
vendor/google.golang.org/grpc/internal/transport/logging.go
generated
vendored
Normal file
@ -0,0 +1,40 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2023 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/grpc/grpclog"
|
||||
internalgrpclog "google.golang.org/grpc/internal/grpclog"
|
||||
)
|
||||
|
||||
var logger = grpclog.Component("transport")
|
||||
|
||||
func prefixLoggerForServerTransport(p *http2Server) *internalgrpclog.PrefixLogger {
|
||||
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[server-transport %p] ", p))
|
||||
}
|
||||
|
||||
func prefixLoggerForServerHandlerTransport(p *serverHandlerTransport) *internalgrpclog.PrefixLogger {
|
||||
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[server-handler-transport %p] ", p))
|
||||
}
|
||||
|
||||
func prefixLoggerForClientTransport(p *http2Client) *internalgrpclog.PrefixLogger {
|
||||
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[client-transport %p] ", p))
|
||||
}
|
31
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
31
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
@ -257,6 +257,9 @@ type Stream struct {
|
||||
fc *inFlow
|
||||
wq *writeQuota
|
||||
|
||||
// Holds compressor names passed in grpc-accept-encoding metadata from the
|
||||
// client. This is empty for the client side stream.
|
||||
clientAdvertisedCompressors string
|
||||
// Callback to state application's intentions to read data. This
|
||||
// is used to adjust flow control, if needed.
|
||||
requestRead func(int)
|
||||
@ -345,8 +348,24 @@ func (s *Stream) RecvCompress() string {
|
||||
}
|
||||
|
||||
// SetSendCompress sets the compression algorithm to the stream.
|
||||
func (s *Stream) SetSendCompress(str string) {
|
||||
s.sendCompress = str
|
||||
func (s *Stream) SetSendCompress(name string) error {
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return errors.New("transport: set send compressor called after headers sent or stream done")
|
||||
}
|
||||
|
||||
s.sendCompress = name
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendCompress returns the send compressor name.
|
||||
func (s *Stream) SendCompress() string {
|
||||
return s.sendCompress
|
||||
}
|
||||
|
||||
// ClientAdvertisedCompressors returns the compressor names advertised by the
|
||||
// client via grpc-accept-encoding header.
|
||||
func (s *Stream) ClientAdvertisedCompressors() string {
|
||||
return s.clientAdvertisedCompressors
|
||||
}
|
||||
|
||||
// Done returns a channel which is closed when it receives the final status
|
||||
@ -583,8 +602,8 @@ type ConnectOptions struct {
|
||||
|
||||
// NewClientTransport establishes the transport with the required ConnectOptions
|
||||
// and returns it to the caller.
|
||||
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
|
||||
return newHTTP2Client(connectCtx, ctx, addr, opts, onGoAway, onClose)
|
||||
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
|
||||
return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
|
||||
}
|
||||
|
||||
// Options provides additional hints and information for message
|
||||
@ -701,13 +720,13 @@ type ServerTransport interface {
|
||||
// Close tears down the transport. Once it is called, the transport
|
||||
// should not be accessed any more. All the pending streams and their
|
||||
// handlers will be terminated asynchronously.
|
||||
Close()
|
||||
Close(err error)
|
||||
|
||||
// RemoteAddr returns the remote network address.
|
||||
RemoteAddr() net.Addr
|
||||
|
||||
// Drain notifies the client this ServerTransport stops accepting new RPCs.
|
||||
Drain()
|
||||
Drain(debugData string)
|
||||
|
||||
// IncrMsgSent increments the number of message sent through this transport.
|
||||
IncrMsgSent()
|
||||
|
Reference in New Issue
Block a user