From 742948e724f793ec3adb74c08c0eb7d104be6f87 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 1 Apr 2020 17:28:19 +0000 Subject: [PATCH] added indices to the job scaling events, so we could properly do blocking queries on the job scaling status --- api/scaling.go | 11 ++++++----- nomad/job_endpoint.go | 13 ++++++++----- nomad/job_endpoint_test.go | 3 ++- nomad/state/state_store.go | 12 ++++++++---- nomad/state/state_store_test.go | 11 +++++++---- nomad/structs/structs.go | 6 ++++++ 6 files changed, 37 insertions(+), 19 deletions(-) diff --git a/api/scaling.go b/api/scaling.go index 9b22d5705..a854b43a5 100644 --- a/api/scaling.go +++ b/api/scaling.go @@ -92,9 +92,10 @@ type TaskGroupScaleStatus struct { } type ScalingEvent struct { - Error bool - Message string - Meta map[string]interface{} - EvalID *string - Time uint64 + Error bool + Message string + Meta map[string]interface{} + EvalID *string + Time uint64 + CreateIndex uint64 } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 2761549f4..cb58740c0 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1767,7 +1767,7 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest, return err } - events, err := state.ScalingEventsByJob(ws, args.RequestNamespace(), args.JobID) + events, eventsIndex, err := state.ScalingEventsByJob(ws, args.RequestNamespace(), args.JobID) if err != nil { return err } @@ -1799,11 +1799,14 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest, reply.JobScaleStatus.TaskGroups[tg.Name] = tgScale } - if deployment != nil && deployment.ModifyIndex > job.ModifyIndex { - reply.Index = deployment.ModifyIndex - } else { - reply.Index = job.ModifyIndex + maxIndex := job.ModifyIndex + if deployment != nil && deployment.ModifyIndex > maxIndex { + maxIndex = deployment.ModifyIndex } + if eventsIndex > maxIndex { + maxIndex = eventsIndex + } + reply.Index = maxIndex // Set the query response j.srv.setQueryMeta(&reply.QueryMeta) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 4e82bbeb9..3aa49d8d6 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -5595,7 +5595,7 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { require.Empty(resp.EvalID) require.Empty(resp.EvalCreateIndex) - jobEvents, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID) + jobEvents, eventsIndex, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID) require.NoError(err) require.NotNil(jobEvents) require.Len(jobEvents, 1) @@ -5604,6 +5604,7 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { require.Len(groupEvents, 1) event := groupEvents[0] require.Nil(event.EvalID) + require.Greater(eventsIndex, job.CreateIndex) } func TestJobEndpoint_GetScaleStatus(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6972c28c8..1f6838965 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -650,6 +650,9 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR } } + jobEvents.ModifyIndex = index + req.ScalingEvent.CreateIndex = index + events := jobEvents.ScalingEvents[req.TaskGroup] // prepend this latest event events = append( @@ -691,19 +694,20 @@ func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, err return iter, nil } -func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, error) { +func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, uint64, error) { txn := s.db.Txn(false) watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID) if err != nil { - return nil, fmt.Errorf("job scaling events lookup failed: %v", err) + return nil, 0, fmt.Errorf("job scaling events lookup failed: %v", err) } ws.Add(watchCh) if existing != nil { - return existing.(*structs.JobScalingEvents).ScalingEvents, nil + events := existing.(*structs.JobScalingEvents) + return events.ScalingEvents, events.ModifyIndex, nil } - return nil, nil + return nil, 0, nil } // UpsertNode is used to register a node or update a node definition diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index acbb1cb65..af2406dbf 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -8570,7 +8570,7 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) { require.Nil(all.Next()) ws := memdb.NewWatchSet() - out, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID) + out, _, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID) require.NoError(err) require.Nil(out) @@ -8585,11 +8585,12 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) { require.True(watchFired(wsAll)) ws = memdb.NewWatchSet() - out, err = state.ScalingEventsByJob(ws, job.Namespace, job.ID) + out, eventsIndex, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID) require.NoError(err) require.Equal(map[string][]*structs.ScalingEvent{ groupName: {newEvent}, }, out) + require.EqualValues(eventsIndex, 1000) iter, err := state.ScalingEvents(ws) require.NoError(err) @@ -8607,6 +8608,8 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) { count++ } require.Equal(1, count) + require.EqualValues(jobEvents.ModifyIndex, 1000) + require.EqualValues(jobEvents.ScalingEvents[groupName][0].CreateIndex, 1000) index, err := state.Index("scaling_event") require.NoError(err) @@ -8657,7 +8660,7 @@ func TestStateStore_UpsertScalingEvent_LimitAndOrder(t *testing.T) { require.NoError(err) } - out, err := state.ScalingEventsByJob(nil, namespace, jobID) + out, _, err := state.ScalingEventsByJob(nil, namespace, jobID) require.NoError(err) require.Len(out, 2) @@ -8708,7 +8711,7 @@ func TestStateStore_RestoreScalingEvents(t *testing.T) { restore.Commit() ws := memdb.NewWatchSet() - out, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace, + out, _, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace, jobScalingEvents.JobID) require.NoError(err) require.NotNil(out) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d4f9b02be..453431d61 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4664,6 +4664,9 @@ type JobScalingEvents struct { // the indexed array is sorted from newest to oldest event // the array should have less than JobTrackedScalingEvents entries ScalingEvents map[string][]*ScalingEvent + + // Raft index + ModifyIndex uint64 } // Factory method for ScalingEvent objects @@ -4693,6 +4696,9 @@ type ScalingEvent struct { // EvalID is the ID for an evaluation if one was created as part of a scaling event EvalID *string + + // Raft index + CreateIndex uint64 } func (e *ScalingEvent) SetError(error bool) *ScalingEvent {