mirror of
https://github.com/kemko/nomad.git
synced 2026-01-14 22:35:42 +03:00
nomad: add client.GetAllocs with blocking query support
This commit is contained in:
@@ -259,6 +259,63 @@ func (c *ClientEndpoint) GetNode(args *structs.NodeSpecificRequest,
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAllocs is used to request allocations for a specific ndoe
|
||||
func (c *ClientEndpoint) GetAllocs(args *structs.NodeSpecificRequest,
|
||||
reply *structs.NodeAllocsResponse) error {
|
||||
if done, err := c.srv.forward("Client.GetAllocs", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "client", "get_allocs"}, time.Now())
|
||||
|
||||
// Verify the arguments
|
||||
if args.NodeID == "" {
|
||||
return fmt.Errorf("missing node ID")
|
||||
}
|
||||
|
||||
// Setup the blocking query
|
||||
opts := blockingOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
allocWatch: args.NodeID,
|
||||
run: func() error {
|
||||
// Look for the node
|
||||
snap, err := c.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
allocs, err := snap.AllocsByNode(args.NodeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Setup the output
|
||||
if len(allocs) != 0 {
|
||||
reply.Allocs = allocs
|
||||
for _, alloc := range allocs {
|
||||
reply.Index = maxUint64(reply.Index, alloc.ModifyIndex)
|
||||
}
|
||||
} else {
|
||||
reply.Allocs = nil
|
||||
|
||||
// Use the last index that affected the nodes table
|
||||
index, err := snap.GetIndex("allocs")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Must provide non-zero index to prevent blocking
|
||||
// Index 1 is impossible anyways (due to Raft internals)
|
||||
if index == 0 {
|
||||
reply.Index = 1
|
||||
} else {
|
||||
reply.Index = index
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
return c.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
// createNodeEvals is used to create evaluations for each alloc on a node.
|
||||
// Each Eval is scoped to a job, so we need to potentially trigger many evals.
|
||||
func (c *ClientEndpoint) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) {
|
||||
|
||||
@@ -3,6 +3,7 @@ package nomad
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
@@ -245,6 +246,128 @@ func TestClientEndpoint_GetNode(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientEndpoint_GetAllocs(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request
|
||||
node := mock.Node()
|
||||
reg := &structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
}
|
||||
|
||||
// Fetch the response
|
||||
var resp structs.GenericResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Client.Register", reg, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
node.CreateIndex = resp.Index
|
||||
node.ModifyIndex = resp.Index
|
||||
|
||||
// Inject fake evaluations
|
||||
alloc := mock.Alloc()
|
||||
alloc.NodeID = node.ID
|
||||
state := s1.fsm.State()
|
||||
err := state.UpdateAllocations(100, nil, []*structs.Allocation{alloc})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Lookup the allocs
|
||||
get := &structs.NodeSpecificRequest{
|
||||
NodeID: node.ID,
|
||||
QueryOptions: structs.QueryOptions{Region: "region1"},
|
||||
}
|
||||
var resp2 structs.NodeAllocsResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Client.GetAllocs", get, &resp2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp2.Index != 100 {
|
||||
t.Fatalf("Bad index: %d %d", resp2.Index, 100)
|
||||
}
|
||||
|
||||
if len(resp2.Allocs) != 1 || resp2.Allocs[0].ID != alloc.ID {
|
||||
t.Fatalf("bad: %#v", resp2.Allocs)
|
||||
}
|
||||
|
||||
// Lookup non-existing node
|
||||
get.NodeID = "foobarbaz"
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Client.GetAllocs", get, &resp2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp2.Index != 100 {
|
||||
t.Fatalf("Bad index: %d %d", resp2.Index, 100)
|
||||
}
|
||||
if len(resp2.Allocs) != 0 {
|
||||
t.Fatalf("unexpected node")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request
|
||||
node := mock.Node()
|
||||
reg := &structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
}
|
||||
|
||||
// Fetch the response
|
||||
var resp structs.GenericResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Client.Register", reg, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
node.CreateIndex = resp.Index
|
||||
node.ModifyIndex = resp.Index
|
||||
|
||||
// Inject fake evaluations async
|
||||
alloc := mock.Alloc()
|
||||
alloc.NodeID = node.ID
|
||||
state := s1.fsm.State()
|
||||
start := time.Now()
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
err := state.UpdateAllocations(100, nil, []*structs.Allocation{alloc})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Lookup the allocs in a blocking query
|
||||
get := &structs.NodeSpecificRequest{
|
||||
NodeID: node.ID,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "region1",
|
||||
MinQueryIndex: 50,
|
||||
MaxQueryTime: time.Second,
|
||||
},
|
||||
}
|
||||
var resp2 structs.NodeAllocsResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Client.GetAllocs", get, &resp2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should block at least 100ms
|
||||
if time.Since(start) < 100*time.Millisecond {
|
||||
t.Fatalf("too fast")
|
||||
}
|
||||
|
||||
if resp2.Index != 100 {
|
||||
t.Fatalf("Bad index: %d %d", resp2.Index, 100)
|
||||
}
|
||||
|
||||
if len(resp2.Allocs) != 1 || resp2.Allocs[0].ID != alloc.ID {
|
||||
t.Fatalf("bad: %#v", resp2.Allocs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
|
||||
72
nomad/rpc.go
72
nomad/rpc.go
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/yamux"
|
||||
)
|
||||
@@ -255,3 +256,74 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
|
||||
m.KnownLeader = (s.raft.Leader() != "")
|
||||
}
|
||||
}
|
||||
|
||||
// blockingOptions is used to parameterize blockingRPC
|
||||
type blockingOptions struct {
|
||||
queryOpts *structs.QueryOptions
|
||||
queryMeta *structs.QueryMeta
|
||||
allocWatch string
|
||||
run func() error
|
||||
}
|
||||
|
||||
// blockingRPC is used for queries that need to wait for a
|
||||
// minimum index. This is used to block and wait for changes.
|
||||
func (s *Server) blockingRPC(opts *blockingOptions) error {
|
||||
var timeout *time.Timer
|
||||
var notifyCh chan struct{}
|
||||
var state *state.StateStore
|
||||
|
||||
// Fast path non-blocking
|
||||
if opts.queryOpts.MinQueryIndex == 0 {
|
||||
goto RUN_QUERY
|
||||
}
|
||||
|
||||
// Restrict the max query time, and ensure there is always one
|
||||
if opts.queryOpts.MaxQueryTime > maxQueryTime {
|
||||
opts.queryOpts.MaxQueryTime = maxQueryTime
|
||||
} else if opts.queryOpts.MaxQueryTime <= 0 {
|
||||
opts.queryOpts.MaxQueryTime = defaultQueryTime
|
||||
}
|
||||
|
||||
// Apply a small amount of jitter to the request
|
||||
opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction)
|
||||
|
||||
// Setup a query timeout
|
||||
timeout = time.NewTimer(opts.queryOpts.MaxQueryTime)
|
||||
|
||||
// Setup the notify channel
|
||||
notifyCh = make(chan struct{}, 1)
|
||||
|
||||
// Ensure we tear down any watchers on return
|
||||
state = s.fsm.State()
|
||||
defer func() {
|
||||
timeout.Stop()
|
||||
if opts.allocWatch != "" {
|
||||
state.StopWatchAllocs(opts.allocWatch, notifyCh)
|
||||
}
|
||||
}()
|
||||
|
||||
REGISTER_NOTIFY:
|
||||
// Register the notification channel. This may be done
|
||||
// multiple times if we have not reached the target wait index.
|
||||
if opts.allocWatch != "" {
|
||||
state.WatchAllocs(opts.allocWatch, notifyCh)
|
||||
}
|
||||
|
||||
RUN_QUERY:
|
||||
// Update the query meta data
|
||||
s.setQueryMeta(opts.queryMeta)
|
||||
|
||||
// Run the query function
|
||||
metrics.IncrCounter([]string{"nomad", "rpc", "query"}, 1)
|
||||
err := opts.run()
|
||||
|
||||
// Check for minimum query time
|
||||
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
|
||||
select {
|
||||
case <-notifyCh:
|
||||
goto REGISTER_NOTIFY
|
||||
case <-timeout.C:
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -274,6 +274,12 @@ type NodeUpdateResponse struct {
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// NodeAllocsResponse is used to return allocs for a single node
|
||||
type NodeAllocsResponse struct {
|
||||
Allocs []*Allocation
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// SingleNodeResponse is used to return a single node
|
||||
type SingleNodeResponse struct {
|
||||
Node *Node
|
||||
|
||||
Reference in New Issue
Block a user