458 lines
10 KiB
Go
458 lines
10 KiB
Go
|
package plugin
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"crypto/tls"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"net"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"github.com/hashicorp/go-plugin/internal/plugin"
|
||
|
|
||
|
"github.com/oklog/run"
|
||
|
"google.golang.org/grpc"
|
||
|
"google.golang.org/grpc/credentials"
|
||
|
)
|
||
|
|
||
|
// streamer interface is used in the broker to send/receive connection
|
||
|
// information.
|
||
|
type streamer interface {
|
||
|
Send(*plugin.ConnInfo) error
|
||
|
Recv() (*plugin.ConnInfo, error)
|
||
|
Close()
|
||
|
}
|
||
|
|
||
|
// sendErr is used to pass errors back during a send.
|
||
|
type sendErr struct {
|
||
|
i *plugin.ConnInfo
|
||
|
ch chan error
|
||
|
}
|
||
|
|
||
|
// gRPCBrokerServer is used by the plugin to start a stream and to send
|
||
|
// connection information to/from the plugin. Implements GRPCBrokerServer and
|
||
|
// streamer interfaces.
|
||
|
type gRPCBrokerServer struct {
|
||
|
// send is used to send connection info to the gRPC stream.
|
||
|
send chan *sendErr
|
||
|
|
||
|
// recv is used to receive connection info from the gRPC stream.
|
||
|
recv chan *plugin.ConnInfo
|
||
|
|
||
|
// quit closes down the stream.
|
||
|
quit chan struct{}
|
||
|
|
||
|
// o is used to ensure we close the quit channel only once.
|
||
|
o sync.Once
|
||
|
}
|
||
|
|
||
|
func newGRPCBrokerServer() *gRPCBrokerServer {
|
||
|
return &gRPCBrokerServer{
|
||
|
send: make(chan *sendErr),
|
||
|
recv: make(chan *plugin.ConnInfo),
|
||
|
quit: make(chan struct{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// StartStream implements the GRPCBrokerServer interface and will block until
|
||
|
// the quit channel is closed or the context reports Done. The stream will pass
|
||
|
// connection information to/from the client.
|
||
|
func (s *gRPCBrokerServer) StartStream(stream plugin.GRPCBroker_StartStreamServer) error {
|
||
|
doneCh := stream.Context().Done()
|
||
|
defer s.Close()
|
||
|
|
||
|
// Proccess send stream
|
||
|
go func() {
|
||
|
for {
|
||
|
select {
|
||
|
case <-doneCh:
|
||
|
return
|
||
|
case <-s.quit:
|
||
|
return
|
||
|
case se := <-s.send:
|
||
|
err := stream.Send(se.i)
|
||
|
se.ch <- err
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// Process receive stream
|
||
|
for {
|
||
|
i, err := stream.Recv()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
select {
|
||
|
case <-doneCh:
|
||
|
return nil
|
||
|
case <-s.quit:
|
||
|
return nil
|
||
|
case s.recv <- i:
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Send is used by the GRPCBroker to pass connection information into the stream
|
||
|
// to the client.
|
||
|
func (s *gRPCBrokerServer) Send(i *plugin.ConnInfo) error {
|
||
|
ch := make(chan error)
|
||
|
defer close(ch)
|
||
|
|
||
|
select {
|
||
|
case <-s.quit:
|
||
|
return errors.New("broker closed")
|
||
|
case s.send <- &sendErr{
|
||
|
i: i,
|
||
|
ch: ch,
|
||
|
}:
|
||
|
}
|
||
|
|
||
|
return <-ch
|
||
|
}
|
||
|
|
||
|
// Recv is used by the GRPCBroker to pass connection information that has been
|
||
|
// sent from the client from the stream to the broker.
|
||
|
func (s *gRPCBrokerServer) Recv() (*plugin.ConnInfo, error) {
|
||
|
select {
|
||
|
case <-s.quit:
|
||
|
return nil, errors.New("broker closed")
|
||
|
case i := <-s.recv:
|
||
|
return i, nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Close closes the quit channel, shutting down the stream.
|
||
|
func (s *gRPCBrokerServer) Close() {
|
||
|
s.o.Do(func() {
|
||
|
close(s.quit)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// gRPCBrokerClientImpl is used by the client to start a stream and to send
|
||
|
// connection information to/from the client. Implements GRPCBrokerClient and
|
||
|
// streamer interfaces.
|
||
|
type gRPCBrokerClientImpl struct {
|
||
|
// client is the underlying GRPC client used to make calls to the server.
|
||
|
client plugin.GRPCBrokerClient
|
||
|
|
||
|
// send is used to send connection info to the gRPC stream.
|
||
|
send chan *sendErr
|
||
|
|
||
|
// recv is used to receive connection info from the gRPC stream.
|
||
|
recv chan *plugin.ConnInfo
|
||
|
|
||
|
// quit closes down the stream.
|
||
|
quit chan struct{}
|
||
|
|
||
|
// o is used to ensure we close the quit channel only once.
|
||
|
o sync.Once
|
||
|
}
|
||
|
|
||
|
func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
|
||
|
return &gRPCBrokerClientImpl{
|
||
|
client: plugin.NewGRPCBrokerClient(conn),
|
||
|
send: make(chan *sendErr),
|
||
|
recv: make(chan *plugin.ConnInfo),
|
||
|
quit: make(chan struct{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// StartStream implements the GRPCBrokerClient interface and will block until
|
||
|
// the quit channel is closed or the context reports Done. The stream will pass
|
||
|
// connection information to/from the plugin.
|
||
|
func (s *gRPCBrokerClientImpl) StartStream() error {
|
||
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
||
|
defer cancelFunc()
|
||
|
defer s.Close()
|
||
|
|
||
|
stream, err := s.client.StartStream(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
doneCh := stream.Context().Done()
|
||
|
|
||
|
go func() {
|
||
|
for {
|
||
|
select {
|
||
|
case <-doneCh:
|
||
|
return
|
||
|
case <-s.quit:
|
||
|
return
|
||
|
case se := <-s.send:
|
||
|
err := stream.Send(se.i)
|
||
|
se.ch <- err
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
for {
|
||
|
i, err := stream.Recv()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
select {
|
||
|
case <-doneCh:
|
||
|
return nil
|
||
|
case <-s.quit:
|
||
|
return nil
|
||
|
case s.recv <- i:
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Send is used by the GRPCBroker to pass connection information into the stream
|
||
|
// to the plugin.
|
||
|
func (s *gRPCBrokerClientImpl) Send(i *plugin.ConnInfo) error {
|
||
|
ch := make(chan error)
|
||
|
defer close(ch)
|
||
|
|
||
|
select {
|
||
|
case <-s.quit:
|
||
|
return errors.New("broker closed")
|
||
|
case s.send <- &sendErr{
|
||
|
i: i,
|
||
|
ch: ch,
|
||
|
}:
|
||
|
}
|
||
|
|
||
|
return <-ch
|
||
|
}
|
||
|
|
||
|
// Recv is used by the GRPCBroker to pass connection information that has been
|
||
|
// sent from the plugin to the broker.
|
||
|
func (s *gRPCBrokerClientImpl) Recv() (*plugin.ConnInfo, error) {
|
||
|
select {
|
||
|
case <-s.quit:
|
||
|
return nil, errors.New("broker closed")
|
||
|
case i := <-s.recv:
|
||
|
return i, nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Close closes the quit channel, shutting down the stream.
|
||
|
func (s *gRPCBrokerClientImpl) Close() {
|
||
|
s.o.Do(func() {
|
||
|
close(s.quit)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// GRPCBroker is responsible for brokering connections by unique ID.
|
||
|
//
|
||
|
// It is used by plugins to create multiple gRPC connections and data
|
||
|
// streams between the plugin process and the host process.
|
||
|
//
|
||
|
// This allows a plugin to request a channel with a specific ID to connect to
|
||
|
// or accept a connection from, and the broker handles the details of
|
||
|
// holding these channels open while they're being negotiated.
|
||
|
//
|
||
|
// The Plugin interface has access to these for both Server and Client.
|
||
|
// The broker can be used by either (optionally) to reserve and connect to
|
||
|
// new streams. This is useful for complex args and return values,
|
||
|
// or anything else you might need a data stream for.
|
||
|
type GRPCBroker struct {
|
||
|
nextId uint32
|
||
|
streamer streamer
|
||
|
streams map[uint32]*gRPCBrokerPending
|
||
|
tls *tls.Config
|
||
|
doneCh chan struct{}
|
||
|
o sync.Once
|
||
|
|
||
|
sync.Mutex
|
||
|
}
|
||
|
|
||
|
type gRPCBrokerPending struct {
|
||
|
ch chan *plugin.ConnInfo
|
||
|
doneCh chan struct{}
|
||
|
}
|
||
|
|
||
|
func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
|
||
|
return &GRPCBroker{
|
||
|
streamer: s,
|
||
|
streams: make(map[uint32]*gRPCBrokerPending),
|
||
|
tls: tls,
|
||
|
doneCh: make(chan struct{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Accept accepts a connection by ID.
|
||
|
//
|
||
|
// This should not be called multiple times with the same ID at one time.
|
||
|
func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
|
||
|
listener, err := serverListener()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
err = b.streamer.Send(&plugin.ConnInfo{
|
||
|
ServiceId: id,
|
||
|
Network: listener.Addr().Network(),
|
||
|
Address: listener.Addr().String(),
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return listener, nil
|
||
|
}
|
||
|
|
||
|
// AcceptAndServe is used to accept a specific stream ID and immediately
|
||
|
// serve a gRPC server on that stream ID. This is used to easily serve
|
||
|
// complex arguments. Each AcceptAndServe call opens a new listener socket and
|
||
|
// sends the connection info down the stream to the dialer. Since a new
|
||
|
// connection is opened every call, these calls should be used sparingly.
|
||
|
// Multiple gRPC server implementations can be registered to a single
|
||
|
// AcceptAndServe call.
|
||
|
func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
|
||
|
listener, err := b.Accept(id)
|
||
|
if err != nil {
|
||
|
log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
|
||
|
return
|
||
|
}
|
||
|
defer listener.Close()
|
||
|
|
||
|
var opts []grpc.ServerOption
|
||
|
if b.tls != nil {
|
||
|
opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
|
||
|
}
|
||
|
|
||
|
server := s(opts)
|
||
|
|
||
|
// Here we use a run group to close this goroutine if the server is shutdown
|
||
|
// or the broker is shutdown.
|
||
|
var g run.Group
|
||
|
{
|
||
|
// Serve on the listener, if shutting down call GracefulStop.
|
||
|
g.Add(func() error {
|
||
|
return server.Serve(listener)
|
||
|
}, func(err error) {
|
||
|
server.GracefulStop()
|
||
|
})
|
||
|
}
|
||
|
{
|
||
|
// block on the closeCh or the doneCh. If we are shutting down close the
|
||
|
// closeCh.
|
||
|
closeCh := make(chan struct{})
|
||
|
g.Add(func() error {
|
||
|
select {
|
||
|
case <-b.doneCh:
|
||
|
case <-closeCh:
|
||
|
}
|
||
|
return nil
|
||
|
}, func(err error) {
|
||
|
close(closeCh)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Block until we are done
|
||
|
g.Run()
|
||
|
}
|
||
|
|
||
|
// Close closes the stream and all servers.
|
||
|
func (b *GRPCBroker) Close() error {
|
||
|
b.streamer.Close()
|
||
|
b.o.Do(func() {
|
||
|
close(b.doneCh)
|
||
|
})
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Dial opens a connection by ID.
|
||
|
func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
|
||
|
var c *plugin.ConnInfo
|
||
|
|
||
|
// Open the stream
|
||
|
p := b.getStream(id)
|
||
|
select {
|
||
|
case c = <-p.ch:
|
||
|
close(p.doneCh)
|
||
|
case <-time.After(5 * time.Second):
|
||
|
return nil, fmt.Errorf("timeout waiting for connection info")
|
||
|
}
|
||
|
|
||
|
var addr net.Addr
|
||
|
switch c.Network {
|
||
|
case "tcp":
|
||
|
addr, err = net.ResolveTCPAddr("tcp", c.Address)
|
||
|
case "unix":
|
||
|
addr, err = net.ResolveUnixAddr("unix", c.Address)
|
||
|
default:
|
||
|
err = fmt.Errorf("Unknown address type: %s", c.Address)
|
||
|
}
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return dialGRPCConn(b.tls, netAddrDialer(addr))
|
||
|
}
|
||
|
|
||
|
// NextId returns a unique ID to use next.
|
||
|
//
|
||
|
// It is possible for very long-running plugin hosts to wrap this value,
|
||
|
// though it would require a very large amount of calls. In practice
|
||
|
// we've never seen it happen.
|
||
|
func (m *GRPCBroker) NextId() uint32 {
|
||
|
return atomic.AddUint32(&m.nextId, 1)
|
||
|
}
|
||
|
|
||
|
// Run starts the brokering and should be executed in a goroutine, since it
|
||
|
// blocks forever, or until the session closes.
|
||
|
//
|
||
|
// Uses of GRPCBroker never need to call this. It is called internally by
|
||
|
// the plugin host/client.
|
||
|
func (m *GRPCBroker) Run() {
|
||
|
for {
|
||
|
stream, err := m.streamer.Recv()
|
||
|
if err != nil {
|
||
|
// Once we receive an error, just exit
|
||
|
break
|
||
|
}
|
||
|
|
||
|
// Initialize the waiter
|
||
|
p := m.getStream(stream.ServiceId)
|
||
|
select {
|
||
|
case p.ch <- stream:
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
go m.timeoutWait(stream.ServiceId, p)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
|
||
|
m.Lock()
|
||
|
defer m.Unlock()
|
||
|
|
||
|
p, ok := m.streams[id]
|
||
|
if ok {
|
||
|
return p
|
||
|
}
|
||
|
|
||
|
m.streams[id] = &gRPCBrokerPending{
|
||
|
ch: make(chan *plugin.ConnInfo, 1),
|
||
|
doneCh: make(chan struct{}),
|
||
|
}
|
||
|
return m.streams[id]
|
||
|
}
|
||
|
|
||
|
func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
|
||
|
// Wait for the stream to either be picked up and connected, or
|
||
|
// for a timeout.
|
||
|
select {
|
||
|
case <-p.doneCh:
|
||
|
case <-time.After(5 * time.Second):
|
||
|
}
|
||
|
|
||
|
m.Lock()
|
||
|
defer m.Unlock()
|
||
|
|
||
|
// Delete the stream so no one else can grab it
|
||
|
delete(m.streams, id)
|
||
|
}
|