mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
nomad: compare current eval when setting WaitIndex
Consider currently dequeued Evaluation's ModifyIndex when determining its WaitIndex. Normally the Evaluation itself would already be in the state store snapshot used to determine the WaitIndex. However, since the FSM applies Raft messages to the state store concurrently with Dequeueing, it's possible the currently dequeued Evaluation won't yet exist in the state store snapshot used by JobsForEval. This can be solved by always considering the current eval's modify index and using it if it is greater than all of the evals returned by the state store.
This commit is contained in:
@@ -105,7 +105,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
|
||||
// Provide the output if any
|
||||
if eval != nil {
|
||||
// Get the index that the worker should wait until before scheduling.
|
||||
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID)
|
||||
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID, eval.ModifyIndex)
|
||||
if err != nil {
|
||||
var mErr multierror.Error
|
||||
multierror.Append(&mErr, err)
|
||||
@@ -133,7 +133,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
|
||||
// invoking the scheduler. The index should be the highest modify index of any
|
||||
// evaluation for the job. This prevents scheduling races for the same job when
|
||||
// there are blocked evaluations.
|
||||
func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
|
||||
func (e *Eval) getWaitIndex(namespace, job string, curIndex uint64) (uint64, error) {
|
||||
snap, err := e.srv.State().Snapshot()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -151,6 +151,14 @@ func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Since dequeueing evals is concurrent with applying raft messages to
|
||||
// the state store, manually compare the currently dequeued eval's
|
||||
// index against max in case it wasn't in the snapshot used by
|
||||
// EvalsByJob yet.
|
||||
if max < curIndex {
|
||||
max = curIndex
|
||||
}
|
||||
|
||||
return max, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/hashicorp/nomad/scheduler"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEvalEndpoint_GetEval(t *testing.T) {
|
||||
@@ -239,7 +240,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
|
||||
// TestEvalEndpoint_Dequeue_WaitIndex_Snapshot asserts that an eval's wait
|
||||
// index will be equal to the highest eval modify index in the state store.
|
||||
func TestEvalEndpoint_Dequeue_WaitIndex_Snapshot(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
@@ -285,6 +288,48 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestEvalEndpoint_Dequeue_WaitIndex_Eval asserts that an eval's wait index
|
||||
// will be its own modify index if its modify index is greater than all of the
|
||||
// indexes in the state store. This can happen if Dequeue receives an eval that
|
||||
// has not yet been applied from the raft log to the local node's state store.
|
||||
func TestEvalEndpoint_Dequeue_WaitIndex_Eval(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request but only upsert 1 into the state store
|
||||
eval1 := mock.Eval()
|
||||
eval2 := mock.Eval()
|
||||
eval2.JobID = eval1.JobID
|
||||
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
|
||||
eval2.ModifyIndex = 1001
|
||||
s1.evalBroker.Enqueue(eval2)
|
||||
|
||||
// Dequeue the eval
|
||||
get := &structs.EvalDequeueRequest{
|
||||
Schedulers: defaultSched,
|
||||
SchedulerVersion: scheduler.SchedulerVersion,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
var resp structs.EvalDequeueResponse
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp))
|
||||
require.Equal(t, eval2, resp.Eval)
|
||||
|
||||
// Ensure outstanding
|
||||
token, ok := s1.evalBroker.Outstanding(eval2.ID)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, resp.Token, token)
|
||||
|
||||
// WaitIndex should be equal to the max ModifyIndex - even when that
|
||||
// modify index is of the dequeued eval which has yet to be applied to
|
||||
// the state store.
|
||||
require.Equal(t, eval2.ModifyIndex, resp.WaitIndex)
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
|
||||
// test enqueuing an eval, updating a plan result for the same eval and de-queueing the eval
|
||||
t.Parallel()
|
||||
|
||||
Reference in New Issue
Block a user