mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 09:25:46 +03:00
SCADA support
This commit is contained in:
@@ -27,9 +27,6 @@ func TestHTTP_AgentSelf(t *testing.T) {
|
||||
if self.Config == nil {
|
||||
t.Fatalf("bad: %#v", self)
|
||||
}
|
||||
if self.Member == nil {
|
||||
t.Fatalf("bad: %#v", self)
|
||||
}
|
||||
if len(self.Stats) == 0 {
|
||||
t.Fatalf("bad: %#v", self)
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/hashicorp/logutils"
|
||||
"github.com/hashicorp/nomad/helper/flag-slice"
|
||||
"github.com/hashicorp/nomad/helper/gated-writer"
|
||||
scada "github.com/hashicorp/scada-client"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
@@ -41,6 +42,9 @@ type Command struct {
|
||||
httpServer *HTTPServer
|
||||
logFilter *logutils.LevelFilter
|
||||
logOutput io.Writer
|
||||
|
||||
scadaProvider *scada.Provider
|
||||
scadaHttp *HTTPServer
|
||||
}
|
||||
|
||||
func (c *Command) readConfig() *Config {
|
||||
@@ -146,6 +150,14 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer) error {
|
||||
}
|
||||
c.agent = agent
|
||||
|
||||
// Enable the SCADA integration
|
||||
if err := c.setupSCADA(config); err != nil {
|
||||
agent.Shutdown()
|
||||
c.Ui.Error(fmt.Sprintf("Error starting SCADA: %s", err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Setup the HTTP server
|
||||
http, err := NewHTTPServer(agent, config, logOutput)
|
||||
if err != nil {
|
||||
agent.Shutdown()
|
||||
@@ -232,6 +244,19 @@ func (c *Command) Run(args []string) int {
|
||||
}
|
||||
defer c.agent.Shutdown()
|
||||
|
||||
// Check and shut down the SCADA listeners at the end
|
||||
defer func() {
|
||||
if c.httpServer != nil {
|
||||
c.httpServer.Shutdown()
|
||||
}
|
||||
if c.scadaHttp != nil {
|
||||
c.scadaHttp.Shutdown()
|
||||
}
|
||||
if c.scadaProvider != nil {
|
||||
c.scadaProvider.Shutdown()
|
||||
}
|
||||
}()
|
||||
|
||||
// Compile agent information for output later
|
||||
info := make(map[string]string)
|
||||
info["client"] = strconv.FormatBool(config.Client.Enabled)
|
||||
@@ -399,6 +424,33 @@ func (c *Command) setupTelementry(config *Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupSCADA is used to start a new SCADA provider and listener,
|
||||
// replacing any existing listeners.
|
||||
func (c *Command) setupSCADA(config *Config) error {
|
||||
// Shut down existing SCADA listeners
|
||||
if c.scadaProvider != nil {
|
||||
c.scadaProvider.Shutdown()
|
||||
}
|
||||
if c.scadaHttp != nil {
|
||||
c.scadaHttp.Shutdown()
|
||||
}
|
||||
|
||||
// No-op if we don't have an infrastructure
|
||||
if config.Atlas == nil || config.Atlas.Infrastructure == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create the new provider and listener
|
||||
c.Ui.Output("Connecting to Atlas: " + config.Atlas.Infrastructure)
|
||||
provider, list, err := NewProvider(config, c.logOutput)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.scadaProvider = provider
|
||||
c.scadaHttp = newScadaHttp(c.agent, list)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Command) Synopsis() string {
|
||||
return "Runs a Nomad agent"
|
||||
}
|
||||
|
||||
@@ -70,6 +70,9 @@ type Config struct {
|
||||
|
||||
DevMode bool `hcl:"-"`
|
||||
|
||||
// AtlasConfig is used to configure Atlas
|
||||
Atlas *AtlasConfig `hcl:"atlas"`
|
||||
|
||||
// NomadConfig is used to override the default config.
|
||||
// This is largly used for testing purposes.
|
||||
NomadConfig *nomad.Config `hcl:"-" json:"-"`
|
||||
@@ -79,6 +82,23 @@ type Config struct {
|
||||
ClientConfig *client.Config `hcl:"-" json:"-"`
|
||||
}
|
||||
|
||||
// AtlasConfig is used to enable an parameterize the Atlas integration
|
||||
type AtlasConfig struct {
|
||||
// Infrastructure is the name of the infrastructure we belong to. e.g. hashicorp/stage
|
||||
Infrastructure string `hcl:"infrastructure"`
|
||||
|
||||
// Token is our authentication token from Atlas
|
||||
Token string `hcl:"token" json:"-"`
|
||||
|
||||
// Join controls if Atlas will attempt to auto-join the node
|
||||
// to it's cluster. Requires Atlas integration.
|
||||
Join bool `hcl:"join"`
|
||||
|
||||
// Endpoint is the SCADA endpoint used for Atlas integration. If
|
||||
// empty, the defaults from the provider are used.
|
||||
Endpoint string `hcl:"endpoint"`
|
||||
}
|
||||
|
||||
type ClientConfig struct {
|
||||
// Enabled controls if we are a client
|
||||
Enabled bool `hcl:"enabled"`
|
||||
|
||||
@@ -18,6 +18,12 @@ import (
|
||||
const (
|
||||
// ErrInvalidMethod is used if the HTTP method is not supported
|
||||
ErrInvalidMethod = "Invalid method"
|
||||
|
||||
// scadaHTTPAddr is the address associated with the
|
||||
// HTTPServer. When populating an ACL token for a request,
|
||||
// this is checked to switch between the ACLToken and
|
||||
// AtlasACLToken
|
||||
scadaHTTPAddr = "SCADA"
|
||||
)
|
||||
|
||||
// HTTPServer is used to wrap an Agent and expose it over an HTTP interface
|
||||
@@ -26,6 +32,7 @@ type HTTPServer struct {
|
||||
mux *http.ServeMux
|
||||
listener net.Listener
|
||||
logger *log.Logger
|
||||
addr string
|
||||
}
|
||||
|
||||
// NewHTTPServer starts new HTTP server over the agent
|
||||
@@ -45,6 +52,7 @@ func NewHTTPServer(agent *Agent, config *Config, logOutput io.Writer) (*HTTPServ
|
||||
mux: mux,
|
||||
listener: ln,
|
||||
logger: log.New(logOutput, "", log.LstdFlags),
|
||||
addr: ln.Addr().String(),
|
||||
}
|
||||
srv.registerHandlers(config.EnableDebug)
|
||||
|
||||
@@ -53,6 +61,27 @@ func NewHTTPServer(agent *Agent, config *Config, logOutput io.Writer) (*HTTPServ
|
||||
return srv, nil
|
||||
}
|
||||
|
||||
// newScadaHttp creates a new HTTP server wrapping the SCADA
|
||||
// listener such that HTTP calls can be sent from the brokers.
|
||||
func newScadaHttp(agent *Agent, list net.Listener) *HTTPServer {
|
||||
// Create the mux
|
||||
mux := http.NewServeMux()
|
||||
|
||||
// Create the server
|
||||
srv := &HTTPServer{
|
||||
agent: agent,
|
||||
mux: mux,
|
||||
listener: list,
|
||||
logger: agent.logger,
|
||||
addr: scadaHTTPAddr,
|
||||
}
|
||||
srv.registerHandlers(false) // Never allow debug for SCADA
|
||||
|
||||
// Start the server
|
||||
go http.Serve(list, mux)
|
||||
return srv
|
||||
}
|
||||
|
||||
// Shutdown is used to shutdown the HTTP server
|
||||
func (s *HTTPServer) Shutdown() {
|
||||
if s != nil {
|
||||
|
||||
195
command/agent/scada.go
Normal file
195
command/agent/scada.go
Normal file
@@ -0,0 +1,195 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/scada-client"
|
||||
)
|
||||
|
||||
const (
|
||||
// providerService is the service name we use
|
||||
providerService = "nomad"
|
||||
|
||||
// resourceType is the type of resource we represent
|
||||
// when connecting to SCADA
|
||||
resourceType = "nomad-cluster"
|
||||
)
|
||||
|
||||
// ProviderService returns the service information for the provider
|
||||
func ProviderService(c *Config) *client.ProviderService {
|
||||
return &client.ProviderService{
|
||||
Service: providerService,
|
||||
ServiceVersion: fmt.Sprintf("%s%s", c.Version, c.VersionPrerelease),
|
||||
Capabilities: map[string]int{
|
||||
"http": 1,
|
||||
},
|
||||
Meta: map[string]string{
|
||||
"auto-join": strconv.FormatBool(c.Atlas.Join),
|
||||
"region": c.Region,
|
||||
"datacenter": c.Datacenter,
|
||||
"client": strconv.FormatBool(c.Client != nil && c.Client.Enabled),
|
||||
"server": strconv.FormatBool(c.Server != nil && c.Server.Enabled),
|
||||
},
|
||||
ResourceType: resourceType,
|
||||
}
|
||||
}
|
||||
|
||||
// ProviderConfig returns the configuration for the SCADA provider
|
||||
func ProviderConfig(c *Config) *client.ProviderConfig {
|
||||
return &client.ProviderConfig{
|
||||
Service: ProviderService(c),
|
||||
Handlers: map[string]client.CapabilityProvider{
|
||||
"http": nil,
|
||||
},
|
||||
Endpoint: c.Atlas.Endpoint,
|
||||
ResourceGroup: c.Atlas.Infrastructure,
|
||||
Token: c.Atlas.Token,
|
||||
}
|
||||
}
|
||||
|
||||
// NewProvider creates a new SCADA provider using the
|
||||
// given configuration. Requests for the HTTP capability
|
||||
// are passed off to the listener that is returned.
|
||||
func NewProvider(c *Config, logOutput io.Writer) (*client.Provider, net.Listener, error) {
|
||||
// Get the configuration of the provider
|
||||
config := ProviderConfig(c)
|
||||
config.LogOutput = logOutput
|
||||
|
||||
// SCADA_INSECURE env variable is used for testing to disable
|
||||
// TLS certificate verification.
|
||||
if os.Getenv("SCADA_INSECURE") != "" {
|
||||
config.TLSConfig = &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
}
|
||||
|
||||
// Create an HTTP listener and handler
|
||||
list := newScadaListener(c.Atlas.Infrastructure)
|
||||
config.Handlers["http"] = func(capability string, meta map[string]string,
|
||||
conn io.ReadWriteCloser) error {
|
||||
return list.PushRWC(conn)
|
||||
}
|
||||
|
||||
// Create the provider
|
||||
provider, err := client.NewProvider(config)
|
||||
if err != nil {
|
||||
list.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
return provider, list, nil
|
||||
}
|
||||
|
||||
// scadaListener is used to return a net.Listener for
|
||||
// incoming SCADA connections
|
||||
type scadaListener struct {
|
||||
addr *scadaAddr
|
||||
pending chan net.Conn
|
||||
|
||||
closed bool
|
||||
closedCh chan struct{}
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
// newScadaListener returns a new listener
|
||||
func newScadaListener(infra string) *scadaListener {
|
||||
l := &scadaListener{
|
||||
addr: &scadaAddr{infra},
|
||||
pending: make(chan net.Conn),
|
||||
closedCh: make(chan struct{}),
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
// PushRWC is used to push a io.ReadWriteCloser as a net.Conn
|
||||
func (s *scadaListener) PushRWC(conn io.ReadWriteCloser) error {
|
||||
// Check if this already implements net.Conn
|
||||
if nc, ok := conn.(net.Conn); ok {
|
||||
return s.Push(nc)
|
||||
}
|
||||
|
||||
// Wrap to implement the interface
|
||||
wrapped := &scadaRWC{conn, s.addr}
|
||||
return s.Push(wrapped)
|
||||
}
|
||||
|
||||
// Push is used to add a connection to the queu
|
||||
func (s *scadaListener) Push(conn net.Conn) error {
|
||||
select {
|
||||
case s.pending <- conn:
|
||||
return nil
|
||||
case <-time.After(time.Second):
|
||||
return fmt.Errorf("accept timed out")
|
||||
case <-s.closedCh:
|
||||
return fmt.Errorf("scada listener closed")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *scadaListener) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case conn := <-s.pending:
|
||||
return conn, nil
|
||||
case <-s.closedCh:
|
||||
return nil, fmt.Errorf("scada listener closed")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *scadaListener) Close() error {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
if s.closed {
|
||||
return nil
|
||||
}
|
||||
s.closed = true
|
||||
close(s.closedCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scadaListener) Addr() net.Addr {
|
||||
return s.addr
|
||||
}
|
||||
|
||||
// scadaAddr is used to return a net.Addr for SCADA
|
||||
type scadaAddr struct {
|
||||
infra string
|
||||
}
|
||||
|
||||
func (s *scadaAddr) Network() string {
|
||||
return "SCADA"
|
||||
}
|
||||
|
||||
func (s *scadaAddr) String() string {
|
||||
return fmt.Sprintf("SCADA::Atlas::%s", s.infra)
|
||||
}
|
||||
|
||||
type scadaRWC struct {
|
||||
io.ReadWriteCloser
|
||||
addr *scadaAddr
|
||||
}
|
||||
|
||||
func (s *scadaRWC) LocalAddr() net.Addr {
|
||||
return s.addr
|
||||
}
|
||||
|
||||
func (s *scadaRWC) RemoteAddr() net.Addr {
|
||||
return s.addr
|
||||
}
|
||||
|
||||
func (s *scadaRWC) SetDeadline(t time.Time) error {
|
||||
return errors.New("SCADA.Conn does not support deadlines")
|
||||
}
|
||||
|
||||
func (s *scadaRWC) SetReadDeadline(t time.Time) error {
|
||||
return errors.New("SCADA.Conn does not support deadlines")
|
||||
}
|
||||
|
||||
func (s *scadaRWC) SetWriteDeadline(t time.Time) error {
|
||||
return errors.New("SCADA.Conn does not support deadlines")
|
||||
}
|
||||
112
command/agent/scada_test.go
Normal file
112
command/agent/scada_test.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"net"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/scada-client"
|
||||
)
|
||||
|
||||
func TestProviderService(t *testing.T) {
|
||||
conf := DefaultConfig()
|
||||
conf.Version = "0.5.0"
|
||||
conf.VersionPrerelease = "rc1"
|
||||
conf.Atlas = &AtlasConfig{}
|
||||
conf.Atlas.Join = true
|
||||
conf.Server.Enabled = true
|
||||
ps := ProviderService(conf)
|
||||
|
||||
expect := &client.ProviderService{
|
||||
Service: "nomad",
|
||||
ServiceVersion: "0.5.0rc1",
|
||||
Capabilities: map[string]int{
|
||||
"http": 1,
|
||||
},
|
||||
Meta: map[string]string{
|
||||
"auto-join": "true",
|
||||
"region": "global",
|
||||
"datacenter": "dc1",
|
||||
"client": "false",
|
||||
"server": "true",
|
||||
},
|
||||
ResourceType: "nomad-cluster",
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(ps, expect) {
|
||||
t.Fatalf("bad: %v", ps)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProviderConfig(t *testing.T) {
|
||||
conf := DefaultConfig()
|
||||
conf.Version = "0.5.0"
|
||||
conf.VersionPrerelease = "rc1"
|
||||
conf.Atlas = &AtlasConfig{}
|
||||
conf.Atlas.Join = true
|
||||
conf.Atlas.Infrastructure = "armon/test"
|
||||
conf.Atlas.Token = "foobarbaz"
|
||||
conf.Atlas.Endpoint = "foo.bar:1111"
|
||||
conf.Server.Enabled = true
|
||||
pc := ProviderConfig(conf)
|
||||
|
||||
expect := &client.ProviderConfig{
|
||||
Service: &client.ProviderService{
|
||||
Service: "nomad",
|
||||
ServiceVersion: "0.5.0rc1",
|
||||
Capabilities: map[string]int{
|
||||
"http": 1,
|
||||
},
|
||||
Meta: map[string]string{
|
||||
"auto-join": "true",
|
||||
"region": "global",
|
||||
"datacenter": "dc1",
|
||||
"client": "false",
|
||||
"server": "true",
|
||||
},
|
||||
ResourceType: "nomad-cluster",
|
||||
},
|
||||
Handlers: map[string]client.CapabilityProvider{
|
||||
"http": nil,
|
||||
},
|
||||
Endpoint: "foo.bar:1111",
|
||||
ResourceGroup: "armon/test",
|
||||
Token: "foobarbaz",
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(pc, expect) {
|
||||
t.Fatalf("bad: %v", pc)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSCADAListener(t *testing.T) {
|
||||
list := newScadaListener("armon/test")
|
||||
defer list.Close()
|
||||
|
||||
var raw interface{} = list
|
||||
_, ok := raw.(net.Listener)
|
||||
if !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
a, b := net.Pipe()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
go list.Push(a)
|
||||
out, err := list.Accept()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out != a {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSCADAAddr(t *testing.T) {
|
||||
var addr interface{} = &scadaAddr{"armon/test"}
|
||||
_, ok := addr.(net.Addr)
|
||||
if !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user