From feabeb81671399e0a9d960c6bd62ffa93e63503c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 20 Aug 2015 16:07:26 -0700 Subject: [PATCH] client: skeleton package --- client/client.go | 124 ++++++++++++++++++++++++++++++++++++++++++++ client/util.go | 31 +++++++++++ client/util_test.go | 43 +++++++++++++++ 3 files changed, 198 insertions(+) create mode 100644 client/util.go create mode 100644 client/util_test.go diff --git a/client/client.go b/client/client.go index 24f8fc6d4..d59340967 100644 --- a/client/client.go +++ b/client/client.go @@ -1,16 +1,49 @@ package client import ( + "fmt" "io" "log" + "net" "os" + "strconv" "sync" + "time" + + "github.com/hashicorp/nomad/nomad" ) +const ( + // clientRPCCache controls how long we keep an idle connection + // open to a server + clientRPCCache = 30 * time.Second + + // clientMaxStreams controsl how many idle streams we keep + // open to a server + clientMaxStreams = 2 +) + +// RPCHandler can be provided to the Client if there is a local server +// to avoid going over the network. If not provided, the Client will +// maintain a connection pool to the servers +type RPCHandler interface { + RPC(method string, args interface{}, reply interface{}) error +} + // Config is used to parameterize and configure the behavior of the client type Config struct { // LogOutput is the destination for logs LogOutput io.Writer + + // Region is the clients region + Region string + + // Servers is a list of known server addresses. These are as "host:port" + Servers []string + + // RPCHandler can be provided to avoid network traffic if the + // server is running locally. + RPCHandler RPCHandler } // DefaultConfig returns the default configuration @@ -24,8 +57,16 @@ func DefaultConfig() *Config { // are expected to register as a schedulable node to the servers, and to // run allocations as determined by the servers. type Client struct { + config *Config + logger *log.Logger + lastServer net.Addr + lastRPCTime time.Time + lastServerLock sync.Mutex + + connPool *nomad.ConnPool + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -37,6 +78,8 @@ func NewClient(config *Config) (*Client, error) { logger := log.New(config.LogOutput, "", log.LstdFlags) c := &Client{ + config: config, + connPool: nomad.NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, nil), logger: logger, shutdownCh: make(chan struct{}), } @@ -45,6 +88,7 @@ func NewClient(config *Config) (*Client, error) { // Shutdown is used to tear down the client func (c *Client) Shutdown() error { + c.logger.Printf("[INFO] client: shutting down") c.shutdownLock.Lock() defer c.shutdownLock.Unlock() @@ -55,3 +99,83 @@ func (c *Client) Shutdown() error { close(c.shutdownCh) return nil } + +// RPC is used to forward an RPC call to a nomad server, or fail if no servers +func (c *Client) RPC(method string, args interface{}, reply interface{}) error { + // Invoke the RPCHandle if it exists + if c.config.RPCHandler != nil { + return c.config.RPCHandler.RPC(method, args, reply) + } + + // Pick a server to request from + addr, err := c.pickServer() + if err != nil { + return err + } + + // Make the RPC request + err = c.connPool.RPC(c.config.Region, addr, 1, method, args, reply) + + // Update the last server information + c.lastServerLock.Lock() + if err != nil { + c.lastServer = nil + c.lastRPCTime = time.Time{} + } else { + c.lastServer = addr + c.lastRPCTime = time.Now() + } + c.lastServerLock.Unlock() + return err +} + +// pickServer is used to pick a target RPC server +func (c *Client) pickServer() (net.Addr, error) { + c.lastServerLock.Lock() + defer c.lastServerLock.Unlock() + + // Check for a valid last-used server + if c.lastServer != nil && time.Now().Sub(c.lastRPCTime) < clientRPCCache { + return c.lastServer, nil + } + + // Bail if we can't find any servers + if len(c.config.Servers) == 0 { + return nil, fmt.Errorf("no known servers") + } + + // Copy the list of servers and shuffle + servers := make([]string, len(c.config.Servers)) + copy(servers, c.config.Servers) + shuffleStrings(servers) + + // Try to resolve each server + for i := 0; i < len(servers); i++ { + addr, err := net.ResolveTCPAddr("tcp", servers[i]) + if err == nil { + c.lastServer = addr + c.lastRPCTime = time.Now() + return addr, nil + } + c.logger.Printf("[WARN] client: failed to resolve '%s': %v", err) + } + + // Bail if we reach this point + return nil, fmt.Errorf("failed to resolve any servers") +} + +// Stats is used to return statistics for debugging and insight +// for various sub-systems +func (c *Client) Stats() map[string]map[string]string { + toString := func(v uint64) string { + return strconv.FormatUint(v, 10) + } + stats := map[string]map[string]string{ + "nomad": map[string]string{ + "server": "false", + "known_servers": toString(uint64(len(c.config.Servers))), + }, + "runtime": nomad.RuntimeStats(), + } + return stats +} diff --git a/client/util.go b/client/util.go new file mode 100644 index 000000000..51b3c7d32 --- /dev/null +++ b/client/util.go @@ -0,0 +1,31 @@ +package client + +import ( + crand "crypto/rand" + "fmt" +) + +import "math/rand" + +// generateUUID is used to generate a random UUID +func generateUUID() string { + buf := make([]byte, 16) + if _, err := crand.Read(buf); err != nil { + panic(fmt.Errorf("failed to read random bytes: %v", err)) + } + + return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x", + buf[0:4], + buf[4:6], + buf[6:8], + buf[8:10], + buf[10:16]) +} + +// shuffleStrings randomly shuffles the list of strings +func shuffleStrings(list []string) { + for i := range list { + j := rand.Intn(i + 1) + list[i], list[j] = list[j], list[i] + } +} diff --git a/client/util_test.go b/client/util_test.go new file mode 100644 index 000000000..762b00e28 --- /dev/null +++ b/client/util_test.go @@ -0,0 +1,43 @@ +package client + +import ( + "reflect" + "regexp" + "testing" +) + +func TestGenerateUUID(t *testing.T) { + prev := generateUUID() + for i := 0; i < 100; i++ { + id := generateUUID() + if prev == id { + t.Fatalf("Should get a new ID!") + } + + matched, err := regexp.MatchString( + "[\\da-f]{8}-[\\da-f]{4}-[\\da-f]{4}-[\\da-f]{4}-[\\da-f]{12}", id) + if !matched || err != nil { + t.Fatalf("expected match %s %v %s", id, matched, err) + } + } +} + +func TestShuffleStrings(t *testing.T) { + // Generate input + inp := make([]string, 10) + for idx := range inp { + inp[idx] = generateUUID() + } + + // Copy the input + orig := make([]string, len(inp)) + copy(orig, inp) + + // Shuffle + shuffleStrings(inp) + + // Ensure order is not the same + if reflect.DeepEqual(inp, orig) { + t.Fatalf("shuffle failed") + } +}