added indices to the job scaling events, so we could properly do

blocking queries on the job scaling status
This commit is contained in:
Chris Baker
2020-04-01 17:28:19 +00:00
parent 10ffa7eee5
commit 742948e724
6 changed files with 37 additions and 19 deletions

View File

@@ -92,9 +92,10 @@ type TaskGroupScaleStatus struct {
}
type ScalingEvent struct {
Error bool
Message string
Meta map[string]interface{}
EvalID *string
Time uint64
Error bool
Message string
Meta map[string]interface{}
EvalID *string
Time uint64
CreateIndex uint64
}

View File

@@ -1767,7 +1767,7 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest,
return err
}
events, err := state.ScalingEventsByJob(ws, args.RequestNamespace(), args.JobID)
events, eventsIndex, err := state.ScalingEventsByJob(ws, args.RequestNamespace(), args.JobID)
if err != nil {
return err
}
@@ -1799,11 +1799,14 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest,
reply.JobScaleStatus.TaskGroups[tg.Name] = tgScale
}
if deployment != nil && deployment.ModifyIndex > job.ModifyIndex {
reply.Index = deployment.ModifyIndex
} else {
reply.Index = job.ModifyIndex
maxIndex := job.ModifyIndex
if deployment != nil && deployment.ModifyIndex > maxIndex {
maxIndex = deployment.ModifyIndex
}
if eventsIndex > maxIndex {
maxIndex = eventsIndex
}
reply.Index = maxIndex
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)

View File

@@ -5595,7 +5595,7 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) {
require.Empty(resp.EvalID)
require.Empty(resp.EvalCreateIndex)
jobEvents, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
jobEvents, eventsIndex, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
require.NoError(err)
require.NotNil(jobEvents)
require.Len(jobEvents, 1)
@@ -5604,6 +5604,7 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) {
require.Len(groupEvents, 1)
event := groupEvents[0]
require.Nil(event.EvalID)
require.Greater(eventsIndex, job.CreateIndex)
}
func TestJobEndpoint_GetScaleStatus(t *testing.T) {

View File

@@ -650,6 +650,9 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR
}
}
jobEvents.ModifyIndex = index
req.ScalingEvent.CreateIndex = index
events := jobEvents.ScalingEvents[req.TaskGroup]
// prepend this latest event
events = append(
@@ -691,19 +694,20 @@ func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, err
return iter, nil
}
func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, error) {
func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, uint64, error) {
txn := s.db.Txn(false)
watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID)
if err != nil {
return nil, fmt.Errorf("job scaling events lookup failed: %v", err)
return nil, 0, fmt.Errorf("job scaling events lookup failed: %v", err)
}
ws.Add(watchCh)
if existing != nil {
return existing.(*structs.JobScalingEvents).ScalingEvents, nil
events := existing.(*structs.JobScalingEvents)
return events.ScalingEvents, events.ModifyIndex, nil
}
return nil, nil
return nil, 0, nil
}
// UpsertNode is used to register a node or update a node definition

View File

@@ -8570,7 +8570,7 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) {
require.Nil(all.Next())
ws := memdb.NewWatchSet()
out, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID)
out, _, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID)
require.NoError(err)
require.Nil(out)
@@ -8585,11 +8585,12 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) {
require.True(watchFired(wsAll))
ws = memdb.NewWatchSet()
out, err = state.ScalingEventsByJob(ws, job.Namespace, job.ID)
out, eventsIndex, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID)
require.NoError(err)
require.Equal(map[string][]*structs.ScalingEvent{
groupName: {newEvent},
}, out)
require.EqualValues(eventsIndex, 1000)
iter, err := state.ScalingEvents(ws)
require.NoError(err)
@@ -8607,6 +8608,8 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) {
count++
}
require.Equal(1, count)
require.EqualValues(jobEvents.ModifyIndex, 1000)
require.EqualValues(jobEvents.ScalingEvents[groupName][0].CreateIndex, 1000)
index, err := state.Index("scaling_event")
require.NoError(err)
@@ -8657,7 +8660,7 @@ func TestStateStore_UpsertScalingEvent_LimitAndOrder(t *testing.T) {
require.NoError(err)
}
out, err := state.ScalingEventsByJob(nil, namespace, jobID)
out, _, err := state.ScalingEventsByJob(nil, namespace, jobID)
require.NoError(err)
require.Len(out, 2)
@@ -8708,7 +8711,7 @@ func TestStateStore_RestoreScalingEvents(t *testing.T) {
restore.Commit()
ws := memdb.NewWatchSet()
out, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace,
out, _, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace,
jobScalingEvents.JobID)
require.NoError(err)
require.NotNil(out)

View File

@@ -4664,6 +4664,9 @@ type JobScalingEvents struct {
// the indexed array is sorted from newest to oldest event
// the array should have less than JobTrackedScalingEvents entries
ScalingEvents map[string][]*ScalingEvent
// Raft index
ModifyIndex uint64
}
// Factory method for ScalingEvent objects
@@ -4693,6 +4696,9 @@ type ScalingEvent struct {
// EvalID is the ID for an evaluation if one was created as part of a scaling event
EvalID *string
// Raft index
CreateIndex uint64
}
func (e *ScalingEvent) SetError(error bool) *ScalingEvent {