client: register on start

This commit is contained in:
Armon Dadgar
2015-08-20 17:49:04 -07:00
parent c9649dd7f7
commit 5e8d4ef647
4 changed files with 117 additions and 0 deletions

View File

@@ -24,6 +24,10 @@ const (
// clientMaxStreams controsl how many idle streams we keep
// open to a server
clientMaxStreams = 2
// registerRetryIntv is minimum interval on which we retry
// registration. We pick a value between this and 2x this.
registerRetryIntv = 30 * time.Second
)
// RPCHandler can be provided to the Client if there is a local server
@@ -56,6 +60,7 @@ type Config struct {
func DefaultConfig() *Config {
return &Config{
LogOutput: os.Stderr,
Region: "region1",
}
}
@@ -105,6 +110,9 @@ func NewClient(config *Config) (*Client, error) {
if err := c.setupDrivers(); err != nil {
return nil, fmt.Errorf("driver setup failed: %v", err)
}
// Start the client!
go c.run()
return c, nil
}
@@ -119,6 +127,7 @@ func (c *Client) Shutdown() error {
}
c.shutdown = true
close(c.shutdownCh)
c.connPool.Shutdown()
return nil
}
@@ -223,6 +232,22 @@ func (c *Client) setupNode() error {
if node.Meta == nil {
node.Meta = make(map[string]string)
}
if node.Resources == nil {
node.Resources = &structs.Resources{}
}
if node.ID == "" {
node.ID = generateUUID()
}
if node.Datacenter == "" {
node.Datacenter = "dc1"
}
if node.Name == "" {
node.Name, _ = os.Hostname()
}
if node.Name == "" {
node.Name = node.ID
}
node.Status = structs.NodeStatusInit
return nil
}
@@ -265,3 +290,46 @@ func (c *Client) setupDrivers() error {
c.logger.Printf("[DEBUG] client: available drivers %v", avail)
return nil
}
// run is a long lived goroutine used to run the client
func (c *Client) run() {
// Register the client
for {
if err := c.registerNode(); err == nil {
break
}
select {
case <-time.After(registerRetryIntv + randomStagger(registerRetryIntv)):
case <-c.shutdownCh:
return
}
}
// TODO: Heartbeat periodically
// TODO: Watch for changes in allocations
select {
case <-c.shutdownCh:
return
}
}
// registerNode is used to register the node or update the registration
func (c *Client) registerNode() error {
node := c.Node()
req := structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: c.config.Region},
}
var resp structs.NodeUpdateResponse
err := c.RPC("Client.Register", &req, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to register node: %v", err)
return err
}
c.logger.Printf("[DEBUG] client: node registration complete")
if len(resp.EvalIDs) != 0 {
c.logger.Printf("[DEBUG] client: %d evaluations triggered by node registration", len(resp.EvalIDs))
}
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
@@ -137,3 +138,34 @@ func TestClient_Drivers(t *testing.T) {
t.Fatalf("missing exec driver")
}
}
func TestClient_Register(t *testing.T) {
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "region1"},
}
var out structs.SingleNodeResponse
// Register should succeed
testutil.WaitForResult(func() (bool, error) {
err := s1.RPC("Client.GetNode", &req, &out)
if err != nil {
return false, err
}
if out.Node == nil {
return false, fmt.Errorf("missing reg")
}
return out.Node.ID == req.NodeID, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

View File

@@ -3,6 +3,7 @@ package client
import (
crand "crypto/rand"
"fmt"
"time"
)
import "math/rand"
@@ -22,6 +23,11 @@ func generateUUID() string {
buf[10:16])
}
// Returns a random stagger interval between 0 and the duration
func randomStagger(intv time.Duration) time.Duration {
return time.Duration(uint64(rand.Int63()) % uint64(intv))
}
// shuffleStrings randomly shuffles the list of strings
func shuffleStrings(list []string) {
for i := range list {

View File

@@ -4,6 +4,7 @@ import (
"reflect"
"regexp"
"testing"
"time"
)
func TestGenerateUUID(t *testing.T) {
@@ -22,6 +23,16 @@ func TestGenerateUUID(t *testing.T) {
}
}
func TestRandomStagger(t *testing.T) {
intv := time.Minute
for i := 0; i < 10; i++ {
stagger := randomStagger(intv)
if stagger < 0 || stagger >= intv {
t.Fatalf("Bad: %v", stagger)
}
}
}
func TestShuffleStrings(t *testing.T) {
// Generate input
inp := make([]string, 10)