From cf3c6aa8a75a689987b689d75ae2ba73458465cb Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 14 Sep 2015 15:33:08 -0700 Subject: [PATCH] SCADA support --- command/agent/agent_endpoint_test.go | 3 - command/agent/command.go | 52 +++++++ command/agent/config.go | 20 +++ command/agent/http.go | 29 ++++ command/agent/scada.go | 195 +++++++++++++++++++++++++++ command/agent/scada_test.go | 112 +++++++++++++++ 6 files changed, 408 insertions(+), 3 deletions(-) create mode 100644 command/agent/scada.go create mode 100644 command/agent/scada_test.go diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index f498f3c6d..3b0ff17c4 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -27,9 +27,6 @@ func TestHTTP_AgentSelf(t *testing.T) { if self.Config == nil { t.Fatalf("bad: %#v", self) } - if self.Member == nil { - t.Fatalf("bad: %#v", self) - } if len(self.Stats) == 0 { t.Fatalf("bad: %#v", self) } diff --git a/command/agent/command.go b/command/agent/command.go index 3550ba8b1..3dc49b7de 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/logutils" "github.com/hashicorp/nomad/helper/flag-slice" "github.com/hashicorp/nomad/helper/gated-writer" + scada "github.com/hashicorp/scada-client" "github.com/mitchellh/cli" ) @@ -41,6 +42,9 @@ type Command struct { httpServer *HTTPServer logFilter *logutils.LevelFilter logOutput io.Writer + + scadaProvider *scada.Provider + scadaHttp *HTTPServer } func (c *Command) readConfig() *Config { @@ -146,6 +150,14 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer) error { } c.agent = agent + // Enable the SCADA integration + if err := c.setupSCADA(config); err != nil { + agent.Shutdown() + c.Ui.Error(fmt.Sprintf("Error starting SCADA: %s", err)) + return err + } + + // Setup the HTTP server http, err := NewHTTPServer(agent, config, logOutput) if err != nil { agent.Shutdown() @@ -232,6 +244,19 @@ func (c *Command) Run(args []string) int { } defer c.agent.Shutdown() + // Check and shut down the SCADA listeners at the end + defer func() { + if c.httpServer != nil { + c.httpServer.Shutdown() + } + if c.scadaHttp != nil { + c.scadaHttp.Shutdown() + } + if c.scadaProvider != nil { + c.scadaProvider.Shutdown() + } + }() + // Compile agent information for output later info := make(map[string]string) info["client"] = strconv.FormatBool(config.Client.Enabled) @@ -399,6 +424,33 @@ func (c *Command) setupTelementry(config *Config) error { return nil } +// setupSCADA is used to start a new SCADA provider and listener, +// replacing any existing listeners. +func (c *Command) setupSCADA(config *Config) error { + // Shut down existing SCADA listeners + if c.scadaProvider != nil { + c.scadaProvider.Shutdown() + } + if c.scadaHttp != nil { + c.scadaHttp.Shutdown() + } + + // No-op if we don't have an infrastructure + if config.Atlas == nil || config.Atlas.Infrastructure == "" { + return nil + } + + // Create the new provider and listener + c.Ui.Output("Connecting to Atlas: " + config.Atlas.Infrastructure) + provider, list, err := NewProvider(config, c.logOutput) + if err != nil { + return err + } + c.scadaProvider = provider + c.scadaHttp = newScadaHttp(c.agent, list) + return nil +} + func (c *Command) Synopsis() string { return "Runs a Nomad agent" } diff --git a/command/agent/config.go b/command/agent/config.go index 6f26f6129..07c518101 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -70,6 +70,9 @@ type Config struct { DevMode bool `hcl:"-"` + // AtlasConfig is used to configure Atlas + Atlas *AtlasConfig `hcl:"atlas"` + // NomadConfig is used to override the default config. // This is largly used for testing purposes. NomadConfig *nomad.Config `hcl:"-" json:"-"` @@ -79,6 +82,23 @@ type Config struct { ClientConfig *client.Config `hcl:"-" json:"-"` } +// AtlasConfig is used to enable an parameterize the Atlas integration +type AtlasConfig struct { + // Infrastructure is the name of the infrastructure we belong to. e.g. hashicorp/stage + Infrastructure string `hcl:"infrastructure"` + + // Token is our authentication token from Atlas + Token string `hcl:"token" json:"-"` + + // Join controls if Atlas will attempt to auto-join the node + // to it's cluster. Requires Atlas integration. + Join bool `hcl:"join"` + + // Endpoint is the SCADA endpoint used for Atlas integration. If + // empty, the defaults from the provider are used. + Endpoint string `hcl:"endpoint"` +} + type ClientConfig struct { // Enabled controls if we are a client Enabled bool `hcl:"enabled"` diff --git a/command/agent/http.go b/command/agent/http.go index adb762e90..16ea31f9a 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -18,6 +18,12 @@ import ( const ( // ErrInvalidMethod is used if the HTTP method is not supported ErrInvalidMethod = "Invalid method" + + // scadaHTTPAddr is the address associated with the + // HTTPServer. When populating an ACL token for a request, + // this is checked to switch between the ACLToken and + // AtlasACLToken + scadaHTTPAddr = "SCADA" ) // HTTPServer is used to wrap an Agent and expose it over an HTTP interface @@ -26,6 +32,7 @@ type HTTPServer struct { mux *http.ServeMux listener net.Listener logger *log.Logger + addr string } // NewHTTPServer starts new HTTP server over the agent @@ -45,6 +52,7 @@ func NewHTTPServer(agent *Agent, config *Config, logOutput io.Writer) (*HTTPServ mux: mux, listener: ln, logger: log.New(logOutput, "", log.LstdFlags), + addr: ln.Addr().String(), } srv.registerHandlers(config.EnableDebug) @@ -53,6 +61,27 @@ func NewHTTPServer(agent *Agent, config *Config, logOutput io.Writer) (*HTTPServ return srv, nil } +// newScadaHttp creates a new HTTP server wrapping the SCADA +// listener such that HTTP calls can be sent from the brokers. +func newScadaHttp(agent *Agent, list net.Listener) *HTTPServer { + // Create the mux + mux := http.NewServeMux() + + // Create the server + srv := &HTTPServer{ + agent: agent, + mux: mux, + listener: list, + logger: agent.logger, + addr: scadaHTTPAddr, + } + srv.registerHandlers(false) // Never allow debug for SCADA + + // Start the server + go http.Serve(list, mux) + return srv +} + // Shutdown is used to shutdown the HTTP server func (s *HTTPServer) Shutdown() { if s != nil { diff --git a/command/agent/scada.go b/command/agent/scada.go new file mode 100644 index 000000000..815671243 --- /dev/null +++ b/command/agent/scada.go @@ -0,0 +1,195 @@ +package agent + +import ( + "crypto/tls" + "errors" + "fmt" + "io" + "net" + "os" + "strconv" + "sync" + "time" + + "github.com/hashicorp/scada-client" +) + +const ( + // providerService is the service name we use + providerService = "nomad" + + // resourceType is the type of resource we represent + // when connecting to SCADA + resourceType = "nomad-cluster" +) + +// ProviderService returns the service information for the provider +func ProviderService(c *Config) *client.ProviderService { + return &client.ProviderService{ + Service: providerService, + ServiceVersion: fmt.Sprintf("%s%s", c.Version, c.VersionPrerelease), + Capabilities: map[string]int{ + "http": 1, + }, + Meta: map[string]string{ + "auto-join": strconv.FormatBool(c.Atlas.Join), + "region": c.Region, + "datacenter": c.Datacenter, + "client": strconv.FormatBool(c.Client != nil && c.Client.Enabled), + "server": strconv.FormatBool(c.Server != nil && c.Server.Enabled), + }, + ResourceType: resourceType, + } +} + +// ProviderConfig returns the configuration for the SCADA provider +func ProviderConfig(c *Config) *client.ProviderConfig { + return &client.ProviderConfig{ + Service: ProviderService(c), + Handlers: map[string]client.CapabilityProvider{ + "http": nil, + }, + Endpoint: c.Atlas.Endpoint, + ResourceGroup: c.Atlas.Infrastructure, + Token: c.Atlas.Token, + } +} + +// NewProvider creates a new SCADA provider using the +// given configuration. Requests for the HTTP capability +// are passed off to the listener that is returned. +func NewProvider(c *Config, logOutput io.Writer) (*client.Provider, net.Listener, error) { + // Get the configuration of the provider + config := ProviderConfig(c) + config.LogOutput = logOutput + + // SCADA_INSECURE env variable is used for testing to disable + // TLS certificate verification. + if os.Getenv("SCADA_INSECURE") != "" { + config.TLSConfig = &tls.Config{ + InsecureSkipVerify: true, + } + } + + // Create an HTTP listener and handler + list := newScadaListener(c.Atlas.Infrastructure) + config.Handlers["http"] = func(capability string, meta map[string]string, + conn io.ReadWriteCloser) error { + return list.PushRWC(conn) + } + + // Create the provider + provider, err := client.NewProvider(config) + if err != nil { + list.Close() + return nil, nil, err + } + return provider, list, nil +} + +// scadaListener is used to return a net.Listener for +// incoming SCADA connections +type scadaListener struct { + addr *scadaAddr + pending chan net.Conn + + closed bool + closedCh chan struct{} + l sync.Mutex +} + +// newScadaListener returns a new listener +func newScadaListener(infra string) *scadaListener { + l := &scadaListener{ + addr: &scadaAddr{infra}, + pending: make(chan net.Conn), + closedCh: make(chan struct{}), + } + return l +} + +// PushRWC is used to push a io.ReadWriteCloser as a net.Conn +func (s *scadaListener) PushRWC(conn io.ReadWriteCloser) error { + // Check if this already implements net.Conn + if nc, ok := conn.(net.Conn); ok { + return s.Push(nc) + } + + // Wrap to implement the interface + wrapped := &scadaRWC{conn, s.addr} + return s.Push(wrapped) +} + +// Push is used to add a connection to the queu +func (s *scadaListener) Push(conn net.Conn) error { + select { + case s.pending <- conn: + return nil + case <-time.After(time.Second): + return fmt.Errorf("accept timed out") + case <-s.closedCh: + return fmt.Errorf("scada listener closed") + } +} + +func (s *scadaListener) Accept() (net.Conn, error) { + select { + case conn := <-s.pending: + return conn, nil + case <-s.closedCh: + return nil, fmt.Errorf("scada listener closed") + } +} + +func (s *scadaListener) Close() error { + s.l.Lock() + defer s.l.Unlock() + if s.closed { + return nil + } + s.closed = true + close(s.closedCh) + return nil +} + +func (s *scadaListener) Addr() net.Addr { + return s.addr +} + +// scadaAddr is used to return a net.Addr for SCADA +type scadaAddr struct { + infra string +} + +func (s *scadaAddr) Network() string { + return "SCADA" +} + +func (s *scadaAddr) String() string { + return fmt.Sprintf("SCADA::Atlas::%s", s.infra) +} + +type scadaRWC struct { + io.ReadWriteCloser + addr *scadaAddr +} + +func (s *scadaRWC) LocalAddr() net.Addr { + return s.addr +} + +func (s *scadaRWC) RemoteAddr() net.Addr { + return s.addr +} + +func (s *scadaRWC) SetDeadline(t time.Time) error { + return errors.New("SCADA.Conn does not support deadlines") +} + +func (s *scadaRWC) SetReadDeadline(t time.Time) error { + return errors.New("SCADA.Conn does not support deadlines") +} + +func (s *scadaRWC) SetWriteDeadline(t time.Time) error { + return errors.New("SCADA.Conn does not support deadlines") +} diff --git a/command/agent/scada_test.go b/command/agent/scada_test.go new file mode 100644 index 000000000..686db0682 --- /dev/null +++ b/command/agent/scada_test.go @@ -0,0 +1,112 @@ +package agent + +import ( + "net" + "reflect" + "testing" + + "github.com/hashicorp/scada-client" +) + +func TestProviderService(t *testing.T) { + conf := DefaultConfig() + conf.Version = "0.5.0" + conf.VersionPrerelease = "rc1" + conf.Atlas = &AtlasConfig{} + conf.Atlas.Join = true + conf.Server.Enabled = true + ps := ProviderService(conf) + + expect := &client.ProviderService{ + Service: "nomad", + ServiceVersion: "0.5.0rc1", + Capabilities: map[string]int{ + "http": 1, + }, + Meta: map[string]string{ + "auto-join": "true", + "region": "global", + "datacenter": "dc1", + "client": "false", + "server": "true", + }, + ResourceType: "nomad-cluster", + } + + if !reflect.DeepEqual(ps, expect) { + t.Fatalf("bad: %v", ps) + } +} + +func TestProviderConfig(t *testing.T) { + conf := DefaultConfig() + conf.Version = "0.5.0" + conf.VersionPrerelease = "rc1" + conf.Atlas = &AtlasConfig{} + conf.Atlas.Join = true + conf.Atlas.Infrastructure = "armon/test" + conf.Atlas.Token = "foobarbaz" + conf.Atlas.Endpoint = "foo.bar:1111" + conf.Server.Enabled = true + pc := ProviderConfig(conf) + + expect := &client.ProviderConfig{ + Service: &client.ProviderService{ + Service: "nomad", + ServiceVersion: "0.5.0rc1", + Capabilities: map[string]int{ + "http": 1, + }, + Meta: map[string]string{ + "auto-join": "true", + "region": "global", + "datacenter": "dc1", + "client": "false", + "server": "true", + }, + ResourceType: "nomad-cluster", + }, + Handlers: map[string]client.CapabilityProvider{ + "http": nil, + }, + Endpoint: "foo.bar:1111", + ResourceGroup: "armon/test", + Token: "foobarbaz", + } + + if !reflect.DeepEqual(pc, expect) { + t.Fatalf("bad: %v", pc) + } +} + +func TestSCADAListener(t *testing.T) { + list := newScadaListener("armon/test") + defer list.Close() + + var raw interface{} = list + _, ok := raw.(net.Listener) + if !ok { + t.Fatalf("bad") + } + + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + go list.Push(a) + out, err := list.Accept() + if err != nil { + t.Fatalf("err: %v", err) + } + if out != a { + t.Fatalf("bad") + } +} + +func TestSCADAAddr(t *testing.T) { + var addr interface{} = &scadaAddr{"armon/test"} + _, ok := addr.(net.Addr) + if !ok { + t.Fatalf("bad") + } +}