From 5e8d4ef6470788cfcb77330e4c17d3f648e6b092 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 20 Aug 2015 17:49:04 -0700 Subject: [PATCH] client: register on start --- client/client.go | 68 +++++++++++++++++++++++++++++++++++++++++++ client/client_test.go | 32 ++++++++++++++++++++ client/util.go | 6 ++++ client/util_test.go | 11 +++++++ 4 files changed, 117 insertions(+) diff --git a/client/client.go b/client/client.go index 36ea768be..58cdd9325 100644 --- a/client/client.go +++ b/client/client.go @@ -24,6 +24,10 @@ const ( // clientMaxStreams controsl how many idle streams we keep // open to a server clientMaxStreams = 2 + + // registerRetryIntv is minimum interval on which we retry + // registration. We pick a value between this and 2x this. + registerRetryIntv = 30 * time.Second ) // RPCHandler can be provided to the Client if there is a local server @@ -56,6 +60,7 @@ type Config struct { func DefaultConfig() *Config { return &Config{ LogOutput: os.Stderr, + Region: "region1", } } @@ -105,6 +110,9 @@ func NewClient(config *Config) (*Client, error) { if err := c.setupDrivers(); err != nil { return nil, fmt.Errorf("driver setup failed: %v", err) } + + // Start the client! + go c.run() return c, nil } @@ -119,6 +127,7 @@ func (c *Client) Shutdown() error { } c.shutdown = true close(c.shutdownCh) + c.connPool.Shutdown() return nil } @@ -223,6 +232,22 @@ func (c *Client) setupNode() error { if node.Meta == nil { node.Meta = make(map[string]string) } + if node.Resources == nil { + node.Resources = &structs.Resources{} + } + if node.ID == "" { + node.ID = generateUUID() + } + if node.Datacenter == "" { + node.Datacenter = "dc1" + } + if node.Name == "" { + node.Name, _ = os.Hostname() + } + if node.Name == "" { + node.Name = node.ID + } + node.Status = structs.NodeStatusInit return nil } @@ -265,3 +290,46 @@ func (c *Client) setupDrivers() error { c.logger.Printf("[DEBUG] client: available drivers %v", avail) return nil } + +// run is a long lived goroutine used to run the client +func (c *Client) run() { + // Register the client + for { + if err := c.registerNode(); err == nil { + break + } + select { + case <-time.After(registerRetryIntv + randomStagger(registerRetryIntv)): + case <-c.shutdownCh: + return + } + } + + // TODO: Heartbeat periodically + + // TODO: Watch for changes in allocations + select { + case <-c.shutdownCh: + return + } +} + +// registerNode is used to register the node or update the registration +func (c *Client) registerNode() error { + node := c.Node() + req := structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: c.config.Region}, + } + var resp structs.NodeUpdateResponse + err := c.RPC("Client.Register", &req, &resp) + if err != nil { + c.logger.Printf("[ERR] client: failed to register node: %v", err) + return err + } + c.logger.Printf("[DEBUG] client: node registration complete") + if len(resp.EvalIDs) != 0 { + c.logger.Printf("[DEBUG] client: %d evaluations triggered by node registration", len(resp.EvalIDs)) + } + return nil +} diff --git a/client/client_test.go b/client/client_test.go index 0b605c513..5c2c57e8a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" ) @@ -137,3 +138,34 @@ func TestClient_Drivers(t *testing.T) { t.Fatalf("missing exec driver") } } + +func TestClient_Register(t *testing.T) { + s1, _ := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + c1 := testClient(t, func(c *Config) { + c.RPCHandler = s1 + }) + defer c1.Shutdown() + + req := structs.NodeSpecificRequest{ + NodeID: c1.Node().ID, + QueryOptions: structs.QueryOptions{Region: "region1"}, + } + var out structs.SingleNodeResponse + + // Register should succeed + testutil.WaitForResult(func() (bool, error) { + err := s1.RPC("Client.GetNode", &req, &out) + if err != nil { + return false, err + } + if out.Node == nil { + return false, fmt.Errorf("missing reg") + } + return out.Node.ID == req.NodeID, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} diff --git a/client/util.go b/client/util.go index 51b3c7d32..9561b30a2 100644 --- a/client/util.go +++ b/client/util.go @@ -3,6 +3,7 @@ package client import ( crand "crypto/rand" "fmt" + "time" ) import "math/rand" @@ -22,6 +23,11 @@ func generateUUID() string { buf[10:16]) } +// Returns a random stagger interval between 0 and the duration +func randomStagger(intv time.Duration) time.Duration { + return time.Duration(uint64(rand.Int63()) % uint64(intv)) +} + // shuffleStrings randomly shuffles the list of strings func shuffleStrings(list []string) { for i := range list { diff --git a/client/util_test.go b/client/util_test.go index 762b00e28..136a564c5 100644 --- a/client/util_test.go +++ b/client/util_test.go @@ -4,6 +4,7 @@ import ( "reflect" "regexp" "testing" + "time" ) func TestGenerateUUID(t *testing.T) { @@ -22,6 +23,16 @@ func TestGenerateUUID(t *testing.T) { } } +func TestRandomStagger(t *testing.T) { + intv := time.Minute + for i := 0; i < 10; i++ { + stagger := randomStagger(intv) + if stagger < 0 || stagger >= intv { + t.Fatalf("Bad: %v", stagger) + } + } +} + func TestShuffleStrings(t *testing.T) { // Generate input inp := make([]string, 10)