198 lines
4.6 KiB
Go
198 lines
4.6 KiB
Go
package plugin
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"net/rpc"
|
|
"sync"
|
|
|
|
"github.com/hashicorp/yamux"
|
|
)
|
|
|
|
// RPCServer listens for network connections and then dispenses interface
|
|
// implementations over net/rpc.
|
|
//
|
|
// After setting the fields below, they shouldn't be read again directly
|
|
// from the structure which may be reading/writing them concurrently.
|
|
type RPCServer struct {
|
|
Plugins map[string]Plugin
|
|
|
|
// Stdout, Stderr are what this server will use instead of the
|
|
// normal stdin/out/err. This is because due to the multi-process nature
|
|
// of our plugin system, we can't use the normal process values so we
|
|
// make our own custom one we pipe across.
|
|
Stdout io.Reader
|
|
Stderr io.Reader
|
|
|
|
// DoneCh should be set to a non-nil channel that will be closed
|
|
// when the control requests the RPC server to end.
|
|
DoneCh chan<- struct{}
|
|
|
|
lock sync.Mutex
|
|
}
|
|
|
|
// ServerProtocol impl.
|
|
func (s *RPCServer) Init() error { return nil }
|
|
|
|
// ServerProtocol impl.
|
|
func (s *RPCServer) Config() string { return "" }
|
|
|
|
// ServerProtocol impl.
|
|
func (s *RPCServer) Serve(lis net.Listener) {
|
|
for {
|
|
conn, err := lis.Accept()
|
|
if err != nil {
|
|
log.Printf("[ERR] plugin: plugin server: %s", err)
|
|
return
|
|
}
|
|
|
|
go s.ServeConn(conn)
|
|
}
|
|
}
|
|
|
|
// ServeConn runs a single connection.
|
|
//
|
|
// ServeConn blocks, serving the connection until the client hangs up.
|
|
func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
|
|
// First create the yamux server to wrap this connection
|
|
mux, err := yamux.Server(conn, nil)
|
|
if err != nil {
|
|
conn.Close()
|
|
log.Printf("[ERR] plugin: error creating yamux server: %s", err)
|
|
return
|
|
}
|
|
|
|
// Accept the control connection
|
|
control, err := mux.Accept()
|
|
if err != nil {
|
|
mux.Close()
|
|
if err != io.EOF {
|
|
log.Printf("[ERR] plugin: error accepting control connection: %s", err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Connect the stdstreams (in, out, err)
|
|
stdstream := make([]net.Conn, 2)
|
|
for i, _ := range stdstream {
|
|
stdstream[i], err = mux.Accept()
|
|
if err != nil {
|
|
mux.Close()
|
|
log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Copy std streams out to the proper place
|
|
go copyStream("stdout", stdstream[0], s.Stdout)
|
|
go copyStream("stderr", stdstream[1], s.Stderr)
|
|
|
|
// Create the broker and start it up
|
|
broker := newMuxBroker(mux)
|
|
go broker.Run()
|
|
|
|
// Use the control connection to build the dispenser and serve the
|
|
// connection.
|
|
server := rpc.NewServer()
|
|
server.RegisterName("Control", &controlServer{
|
|
server: s,
|
|
})
|
|
server.RegisterName("Dispenser", &dispenseServer{
|
|
broker: broker,
|
|
plugins: s.Plugins,
|
|
})
|
|
server.ServeConn(control)
|
|
}
|
|
|
|
// done is called internally by the control server to trigger the
|
|
// doneCh to close which is listened to by the main process to cleanly
|
|
// exit.
|
|
func (s *RPCServer) done() {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
if s.DoneCh != nil {
|
|
close(s.DoneCh)
|
|
s.DoneCh = nil
|
|
}
|
|
}
|
|
|
|
// dispenseServer dispenses variousinterface implementations for Terraform.
|
|
type controlServer struct {
|
|
server *RPCServer
|
|
}
|
|
|
|
// Ping can be called to verify the connection (and likely the binary)
|
|
// is still alive to a plugin.
|
|
func (c *controlServer) Ping(
|
|
null bool, response *struct{}) error {
|
|
*response = struct{}{}
|
|
return nil
|
|
}
|
|
|
|
func (c *controlServer) Quit(
|
|
null bool, response *struct{}) error {
|
|
// End the server
|
|
c.server.done()
|
|
|
|
// Always return true
|
|
*response = struct{}{}
|
|
|
|
return nil
|
|
}
|
|
|
|
// dispenseServer dispenses variousinterface implementations for Terraform.
|
|
type dispenseServer struct {
|
|
broker *MuxBroker
|
|
plugins map[string]Plugin
|
|
}
|
|
|
|
func (d *dispenseServer) Dispense(
|
|
name string, response *uint32) error {
|
|
// Find the function to create this implementation
|
|
p, ok := d.plugins[name]
|
|
if !ok {
|
|
return fmt.Errorf("unknown plugin type: %s", name)
|
|
}
|
|
|
|
// Create the implementation first so we know if there is an error.
|
|
impl, err := p.Server(d.broker)
|
|
if err != nil {
|
|
// We turn the error into an errors error so that it works across RPC
|
|
return errors.New(err.Error())
|
|
}
|
|
|
|
// Reserve an ID for our implementation
|
|
id := d.broker.NextId()
|
|
*response = id
|
|
|
|
// Run the rest in a goroutine since it can only happen once this RPC
|
|
// call returns. We wait for a connection for the plugin implementation
|
|
// and serve it.
|
|
go func() {
|
|
conn, err := d.broker.Accept(id)
|
|
if err != nil {
|
|
log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
|
|
return
|
|
}
|
|
|
|
serve(conn, "Plugin", impl)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func serve(conn io.ReadWriteCloser, name string, v interface{}) {
|
|
server := rpc.NewServer()
|
|
if err := server.RegisterName(name, v); err != nil {
|
|
log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
|
|
return
|
|
}
|
|
|
|
server.ServeConn(conn)
|
|
}
|