mirror of
https://github.com/kemko/nomad.git
synced 2026-01-09 20:05:42 +03:00
Merge pull request #5381 from hashicorp/b-max-eval-wait-index
nomad: compare current eval when setting WaitIndex
This commit is contained in:
@@ -105,7 +105,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
|
||||
// Provide the output if any
|
||||
if eval != nil {
|
||||
// Get the index that the worker should wait until before scheduling.
|
||||
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID)
|
||||
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID, eval.ModifyIndex)
|
||||
if err != nil {
|
||||
var mErr multierror.Error
|
||||
multierror.Append(&mErr, err)
|
||||
@@ -133,7 +133,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
|
||||
// invoking the scheduler. The index should be the highest modify index of any
|
||||
// evaluation for the job. This prevents scheduling races for the same job when
|
||||
// there are blocked evaluations.
|
||||
func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
|
||||
func (e *Eval) getWaitIndex(namespace, job string, evalModifyIndex uint64) (uint64, error) {
|
||||
snap, err := e.srv.State().Snapshot()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -144,7 +144,10 @@ func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var max uint64
|
||||
// Since dequeueing evals is concurrent with applying Raft messages to
|
||||
// the state store, initialize to the currently dequeued eval's index
|
||||
// in case it isn't in the snapshot used by EvalsByJob yet.
|
||||
max := evalModifyIndex
|
||||
for _, eval := range evals {
|
||||
if max < eval.ModifyIndex {
|
||||
max = eval.ModifyIndex
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/hashicorp/nomad/scheduler"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEvalEndpoint_GetEval(t *testing.T) {
|
||||
@@ -239,7 +240,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
|
||||
// TestEvalEndpoint_Dequeue_WaitIndex_Snapshot asserts that an eval's wait
|
||||
// index will be equal to the highest eval modify index in the state store.
|
||||
func TestEvalEndpoint_Dequeue_WaitIndex_Snapshot(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
@@ -285,6 +288,48 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestEvalEndpoint_Dequeue_WaitIndex_Eval asserts that an eval's wait index
|
||||
// will be its own modify index if its modify index is greater than all of the
|
||||
// indexes in the state store. This can happen if Dequeue receives an eval that
|
||||
// has not yet been applied from the Raft log to the local node's state store.
|
||||
func TestEvalEndpoint_Dequeue_WaitIndex_Eval(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request but only upsert 1 into the state store
|
||||
eval1 := mock.Eval()
|
||||
eval2 := mock.Eval()
|
||||
eval2.JobID = eval1.JobID
|
||||
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
|
||||
eval2.ModifyIndex = 1001
|
||||
s1.evalBroker.Enqueue(eval2)
|
||||
|
||||
// Dequeue the eval
|
||||
get := &structs.EvalDequeueRequest{
|
||||
Schedulers: defaultSched,
|
||||
SchedulerVersion: scheduler.SchedulerVersion,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
var resp structs.EvalDequeueResponse
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp))
|
||||
require.Equal(t, eval2, resp.Eval)
|
||||
|
||||
// Ensure outstanding
|
||||
token, ok := s1.evalBroker.Outstanding(eval2.ID)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, resp.Token, token)
|
||||
|
||||
// WaitIndex should be equal to the max ModifyIndex - even when that
|
||||
// modify index is of the dequeued eval which has yet to be applied to
|
||||
// the state store.
|
||||
require.Equal(t, eval2.ModifyIndex, resp.WaitIndex)
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
|
||||
// test enqueuing an eval, updating a plan result for the same eval and de-queueing the eval
|
||||
t.Parallel()
|
||||
|
||||
Reference in New Issue
Block a user