From b1671f4e593b99e85377233e2ff11da9fd03c3f3 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 22 Aug 2015 19:17:49 -0700 Subject: [PATCH] nomad: add client.GetAllocs with blocking query support --- nomad/client_endpoint.go | 57 ++++++++++++++++ nomad/client_endpoint_test.go | 123 ++++++++++++++++++++++++++++++++++ nomad/rpc.go | 72 ++++++++++++++++++++ nomad/structs/structs.go | 6 ++ 4 files changed, 258 insertions(+) diff --git a/nomad/client_endpoint.go b/nomad/client_endpoint.go index 3c0f5b22c..8639e3fa4 100644 --- a/nomad/client_endpoint.go +++ b/nomad/client_endpoint.go @@ -259,6 +259,63 @@ func (c *ClientEndpoint) GetNode(args *structs.NodeSpecificRequest, return nil } +// GetAllocs is used to request allocations for a specific ndoe +func (c *ClientEndpoint) GetAllocs(args *structs.NodeSpecificRequest, + reply *structs.NodeAllocsResponse) error { + if done, err := c.srv.forward("Client.GetAllocs", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client", "get_allocs"}, time.Now()) + + // Verify the arguments + if args.NodeID == "" { + return fmt.Errorf("missing node ID") + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + allocWatch: args.NodeID, + run: func() error { + // Look for the node + snap, err := c.srv.fsm.State().Snapshot() + if err != nil { + return err + } + allocs, err := snap.AllocsByNode(args.NodeID) + if err != nil { + return err + } + + // Setup the output + if len(allocs) != 0 { + reply.Allocs = allocs + for _, alloc := range allocs { + reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) + } + } else { + reply.Allocs = nil + + // Use the last index that affected the nodes table + index, err := snap.GetIndex("allocs") + if err != nil { + return err + } + + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + if index == 0 { + reply.Index = 1 + } else { + reply.Index = index + } + } + return nil + }} + return c.srv.blockingRPC(&opts) +} + // createNodeEvals is used to create evaluations for each alloc on a node. // Each Eval is scoped to a job, so we need to potentially trigger many evals. func (c *ClientEndpoint) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) { diff --git a/nomad/client_endpoint_test.go b/nomad/client_endpoint_test.go index 8269d2f79..933422595 100644 --- a/nomad/client_endpoint_test.go +++ b/nomad/client_endpoint_test.go @@ -3,6 +3,7 @@ package nomad import ( "reflect" "testing" + "time" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/mock" @@ -245,6 +246,128 @@ func TestClientEndpoint_GetNode(t *testing.T) { } } +func TestClientEndpoint_GetAllocs(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Client.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + node.CreateIndex = resp.Index + node.ModifyIndex = resp.Index + + // Inject fake evaluations + alloc := mock.Alloc() + alloc.NodeID = node.ID + state := s1.fsm.State() + err := state.UpdateAllocations(100, nil, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup the allocs + get := &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{Region: "region1"}, + } + var resp2 structs.NodeAllocsResponse + if err := msgpackrpc.CallWithCodec(codec, "Client.GetAllocs", get, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.Index != 100 { + t.Fatalf("Bad index: %d %d", resp2.Index, 100) + } + + if len(resp2.Allocs) != 1 || resp2.Allocs[0].ID != alloc.ID { + t.Fatalf("bad: %#v", resp2.Allocs) + } + + // Lookup non-existing node + get.NodeID = "foobarbaz" + if err := msgpackrpc.CallWithCodec(codec, "Client.GetAllocs", get, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.Index != 100 { + t.Fatalf("Bad index: %d %d", resp2.Index, 100) + } + if len(resp2.Allocs) != 0 { + t.Fatalf("unexpected node") + } +} + +func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Client.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + node.CreateIndex = resp.Index + node.ModifyIndex = resp.Index + + // Inject fake evaluations async + alloc := mock.Alloc() + alloc.NodeID = node.ID + state := s1.fsm.State() + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + err := state.UpdateAllocations(100, nil, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Lookup the allocs in a blocking query + get := &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{ + Region: "region1", + MinQueryIndex: 50, + MaxQueryTime: time.Second, + }, + } + var resp2 structs.NodeAllocsResponse + if err := msgpackrpc.CallWithCodec(codec, "Client.GetAllocs", get, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + // Should block at least 100ms + if time.Since(start) < 100*time.Millisecond { + t.Fatalf("too fast") + } + + if resp2.Index != 100 { + t.Fatalf("Bad index: %d %d", resp2.Index, 100) + } + + if len(resp2.Allocs) != 1 || resp2.Allocs[0].ID != alloc.ID { + t.Fatalf("bad: %#v", resp2.Allocs) + } +} + func TestClientEndpoint_CreateNodeEvals(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() diff --git a/nomad/rpc.go b/nomad/rpc.go index 41c915345..dee45e47f 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -11,6 +11,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/yamux" ) @@ -255,3 +256,74 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) { m.KnownLeader = (s.raft.Leader() != "") } } + +// blockingOptions is used to parameterize blockingRPC +type blockingOptions struct { + queryOpts *structs.QueryOptions + queryMeta *structs.QueryMeta + allocWatch string + run func() error +} + +// blockingRPC is used for queries that need to wait for a +// minimum index. This is used to block and wait for changes. +func (s *Server) blockingRPC(opts *blockingOptions) error { + var timeout *time.Timer + var notifyCh chan struct{} + var state *state.StateStore + + // Fast path non-blocking + if opts.queryOpts.MinQueryIndex == 0 { + goto RUN_QUERY + } + + // Restrict the max query time, and ensure there is always one + if opts.queryOpts.MaxQueryTime > maxQueryTime { + opts.queryOpts.MaxQueryTime = maxQueryTime + } else if opts.queryOpts.MaxQueryTime <= 0 { + opts.queryOpts.MaxQueryTime = defaultQueryTime + } + + // Apply a small amount of jitter to the request + opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction) + + // Setup a query timeout + timeout = time.NewTimer(opts.queryOpts.MaxQueryTime) + + // Setup the notify channel + notifyCh = make(chan struct{}, 1) + + // Ensure we tear down any watchers on return + state = s.fsm.State() + defer func() { + timeout.Stop() + if opts.allocWatch != "" { + state.StopWatchAllocs(opts.allocWatch, notifyCh) + } + }() + +REGISTER_NOTIFY: + // Register the notification channel. This may be done + // multiple times if we have not reached the target wait index. + if opts.allocWatch != "" { + state.WatchAllocs(opts.allocWatch, notifyCh) + } + +RUN_QUERY: + // Update the query meta data + s.setQueryMeta(opts.queryMeta) + + // Run the query function + metrics.IncrCounter([]string{"nomad", "rpc", "query"}, 1) + err := opts.run() + + // Check for minimum query time + if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex { + select { + case <-notifyCh: + goto REGISTER_NOTIFY + case <-timeout.C: + } + } + return err +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index eb279705b..d94a51143 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -274,6 +274,12 @@ type NodeUpdateResponse struct { QueryMeta } +// NodeAllocsResponse is used to return allocs for a single node +type NodeAllocsResponse struct { + Allocs []*Allocation + QueryMeta +} + // SingleNodeResponse is used to return a single node type SingleNodeResponse struct { Node *Node