Merge pull request #1517 from hashicorp/f-reconcile-summaries
Endpoint for reconciling job summaries
@@ -129,6 +129,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
 	s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest))
 
 	s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
+	s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))
 
 	if enableDebug {
 		s.mux.HandleFunc("/debug/pprof/", pprof.Index)
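The hunk above registers the new PUT route on the agent's HTTP mux. A minimal sketch of invoking it from Go, assuming an agent on the default address 127.0.0.1:4646 (the address and error handling here are illustrative, not part of this commit):

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// PUT with an empty body; the handler rejects any other method with a 405.
	req, err := http.NewRequest(http.MethodPut,
		"http://127.0.0.1:4646/v1/system/reconcile/summaries", nil)
	if err != nil {
		log.Fatal(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status) // expect 200 OK once the request is accepted
}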
@@ -22,3 +22,20 @@ func (s *HTTPServer) GarbageCollectRequest(resp http.ResponseWriter, req *http.R
 	}
 	return nil, nil
 }
+
+func (s *HTTPServer) ReconcileJobSummaries(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+	if req.Method != "PUT" {
+		return nil, CodedError(405, ErrInvalidMethod)
+	}
+
+	var args structs.GenericRequest
+	if s.parse(resp, req, &args.Region, &args.QueryOptions) {
+		return nil, nil
+	}
+
+	var gResp structs.GenericResponse
+	if err := s.agent.RPC("System.ReconcileJobSummaries", &args, &gResp); err != nil {
+		return nil, err
+	}
+	return nil, nil
+}
@@ -21,3 +21,23 @@ func TestHTTP_SystemGarbageCollect(t *testing.T) {
 		}
 	})
 }
+
+func TestHTTP_ReconcileJobSummaries(t *testing.T) {
+	httpTest(t, nil, func(s *TestServer) {
+		// Make the HTTP request
+		req, err := http.NewRequest("PUT", "/v1/system/reconcile/summaries", nil)
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		respW := httptest.NewRecorder()
+
+		// Make the request
+		if _, err := s.Server.ReconcileJobSummaries(respW, req); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if respW.Code != 200 {
+			t.Fatalf("expected: %v, actual: %v", 200, respW.Code)
+		}
+	})
+}
nomad/fsm.go
@@ -135,6 +135,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 		return n.applyAllocUpdate(buf[1:], log.Index)
 	case structs.AllocClientUpdateRequestType:
 		return n.applyAllocClientUpdate(buf[1:], log.Index)
+	case structs.ReconcileJobSummariesRequestType:
+		return n.applyReconcileSummaries(buf[1:], log.Index)
 	default:
 		if ignoreUnknown {
 			n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
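For context on the dispatch above: the FSM reads the first byte of each Raft log entry as the message type and routes the remaining payload to the matching apply function. A stripped-down sketch of that convention (the types and names here are illustrative stand-ins, not Nomad's):

package main

import "fmt"

type MessageType uint8

const (
	JobRegisterType MessageType = iota
	ReconcileSummariesType
)

// apply mimics the FSM pattern: byte 0 selects the handler, the remainder is
// the encoded request body in the real implementation.
func apply(entry []byte) error {
	msgType := MessageType(entry[0])
	buf := entry[1:]
	switch msgType {
	case ReconcileSummariesType:
		fmt.Printf("reconciling with %d payload bytes\n", len(buf))
		return nil
	default:
		return fmt.Errorf("unknown message type %d", msgType)
	}
}

func main() {
	if err := apply([]byte{byte(ReconcileSummariesType), 0x1, 0x2}); err != nil {
		fmt.Println(err)
	}
}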
@@ -444,6 +446,14 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
 	return nil
 }
 
+// applyReconcileSummaries reconciles summaries for all the jobs
+func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} {
+	if err := n.state.ReconcileJobSummaries(index); err != nil {
+		return err
+	}
+	return n.reconcileQueuedAllocations(index)
+}
+
 func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
 	// Create a new snapshot
 	snap, err := n.state.Snapshot()
@@ -578,39 +588,60 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
 		}
 	}
 
-	// Create Job Summaries
-	// The entire snapshot has to be restored first before we create the missing
-	// job summaries so that the indexes are updated and we know the highest
-	// index
-	// COMPAT 0.4 -> 0.4.1
-	jobs, err := restore.JobsWithoutSummary()
-	if err != nil {
-		fmt.Errorf("error retreiving jobs during restore: %v", err)
-	}
-	if err := restore.CreateJobSummaries(jobs); err != nil {
-		return fmt.Errorf("error creating job summaries: %v", err)
-	}
-
 	restore.Commit()
 
-	// Reconciling the queued allocations
-	return n.reconcileSummaries(jobs)
+	// Create Job Summaries
+	// COMPAT 0.4 -> 0.4.1
+	// We can remove this in 0.5. This exists so that the server creates job
+	// summaries if they were not present previously. When users upgrade to 0.5
+	// from 0.4.1, the snapshot will contain job summaries so it will be safe to
+	// remove this block.
+	index, err := n.state.Index("job_summary")
+	if err != nil {
+		return fmt.Errorf("couldn't fetch index of job summary table: %v", err)
+	}
+
+	// If the index is 0 that means there is no job summary in the snapshot so
+	// we will have to create them
+	if index == 0 {
+		// query the latest index
+		latestIndex, err := n.state.LatestIndex()
+		if err != nil {
+			return fmt.Errorf("unable to query latest index: %v", index)
+		}
+		if err := n.state.ReconcileJobSummaries(latestIndex); err != nil {
+			return fmt.Errorf("error reconciling summaries: %v", err)
+		}
+		if err := n.reconcileQueuedAllocations(latestIndex); err != nil {
+			return fmt.Errorf("error re-computing the number of queued allocations:; %v", err)
+		}
+	}
+
+	return nil
 }
 
 // reconcileSummaries re-calculates the queued allocations for every job that we
 // created a Job Summary during the snap shot restore
-func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error {
-	// Start the state restore
-	restore, err := n.state.Restore()
+func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
+	// Get all the jobs
+	iter, err := n.state.Jobs()
 	if err != nil {
 		return err
 	}
-	defer restore.Abort()
-
 	snap, err := n.state.Snapshot()
 	if err != nil {
 		return fmt.Errorf("unable to create snapshot: %v", err)
 	}
-	for _, job := range jobs {
+
+	// Invoking the scheduler for every job so that we can populate the number
+	// of queued allocations for every job
+	for {
+		rawJob := iter.Next()
+		if rawJob == nil {
+			break
+		}
+		job := rawJob.(*structs.Job)
 		planner := &scheduler.Harness{
 			State: &snap.StateStore,
 		}
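The upgrade shim above keys off a single signal: if the restored snapshot never wrote to the job_summary table, that table has no entry in the index table, so Index returns the zero value. A self-contained sketch of that detection idiom, using a plain map in place of memdb (all names here are illustrative):

package main

import "fmt"

// indexes stands in for the memdb "index" table: the latest write index per table.
var indexes = map[string]uint64{
	"jobs":   1000,
	"allocs": 1011,
	// "job_summary" is deliberately absent: this snapshot predates summaries.
}

func tableIndex(name string) uint64 {
	return indexes[name] // a missing entry yields the zero value, 0
}

func main() {
	if tableIndex("job_summary") == 0 {
		// No summary has ever been written; rebuild them at the latest index.
		var latest uint64
		for _, idx := range indexes {
			if idx > latest {
				latest = idx
			}
		}
		fmt.Println("reconciling job summaries at index", latest)
	}
}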
@@ -635,10 +666,32 @@ func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error {
 		if err := sched.Process(eval); err != nil {
 			return err
 		}
-		summary, err := snap.JobSummaryByID(job.ID)
+
+		// Get the job summary from the fsm state store
+		summary, err := n.state.JobSummaryByID(job.ID)
 		if err != nil {
 			return err
 		}
+
+		// Add the allocations scheduler has made to queued since these
+		// allocations are never getting placed until the scheduler is invoked
+		// with a real planner
+		if l := len(planner.Plans); l != 1 {
+			return fmt.Errorf("unexpected number of plans during restore %d. Please file an issue including the logs", l)
+		}
+		for _, allocations := range planner.Plans[0].NodeAllocation {
+			for _, allocation := range allocations {
+				tgSummary, ok := summary.Summary[allocation.TaskGroup]
+				if !ok {
+					return fmt.Errorf("task group %q not found while updating queued count", allocation.TaskGroup)
+				}
+				tgSummary.Queued += 1
+				summary.Summary[allocation.TaskGroup] = tgSummary
+			}
+		}
+
+		// Add the queued allocations attached to the evaluation to the queued
+		// counter of the job summary
+		if l := len(planner.Evals); l != 1 {
+			return fmt.Errorf("unexpected number of evals during restore %d. Please file an issue including the logs", l)
+		}
@@ -647,15 +700,14 @@ func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error {
 			if !ok {
 				return fmt.Errorf("task group %q not found while updating queued count", tg)
 			}
-			tgSummary.Queued = queued
+			tgSummary.Queued += queued
 			summary.Summary[tg] = tgSummary
 		}
 
-		if err := restore.JobSummaryRestore(summary); err != nil {
+		if err := n.state.UpsertJobSummary(index, summary); err != nil {
 			return err
 		}
 	}
-	restore.Commit()
 	return nil
 }
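The loop above leans on a dry-run planner: the real scheduler runs, but its plans are captured by a test harness instead of being submitted, so whatever it "places" can be counted as queued. A minimal sketch of that capture-only planner pattern (the Planner interface and types here are simplified stand-ins, not Nomad's scheduler package):

package main

import "fmt"

// Plan records which task groups would receive new allocations.
type Plan struct {
	NodeAllocation map[string][]string // node ID -> task groups of placed allocs
}

// Planner is what a scheduler submits its decisions to.
type Planner interface {
	SubmitPlan(*Plan) error
}

// Harness implements Planner but only records plans instead of applying them.
type Harness struct {
	Plans []*Plan
}

func (h *Harness) SubmitPlan(p *Plan) error {
	h.Plans = append(h.Plans, p)
	return nil
}

func main() {
	h := &Harness{}
	// A scheduler run would call SubmitPlan; here we fake one placement.
	h.SubmitPlan(&Plan{NodeAllocation: map[string][]string{
		"node-1": {"web", "web"},
	}})

	// Count everything the dry run "placed" as queued.
	queued := map[string]int{}
	for _, allocs := range h.Plans[0].NodeAllocation {
		for _, tg := range allocs {
			queued[tg]++
		}
	}
	fmt.Println(queued) // map[web:2]
}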
@@ -981,13 +981,34 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
 	fsm := testFSM(t)
 	state := fsm.State()
 
+	// Add a node
+	node := mock.Node()
+	state.UpsertNode(800, node)
+
+	// Make a job so that none of the tasks can be placed
 	job1 := mock.Job()
+	job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000
 	state.UpsertJob(1000, job1)
-	state.DeleteJobSummary(1010, job1.ID)
+
+	// make a job which can make partial progress
+	alloc := mock.Alloc()
+	alloc.NodeID = node.ID
+	state.UpsertJob(1010, alloc.Job)
+	state.UpsertAllocs(1011, []*structs.Allocation{alloc})
+
+	// Delete the summaries
+	state.DeleteJobSummary(1030, job1.ID)
+	state.DeleteJobSummary(1040, alloc.Job.ID)
+
+	// Delete the index
+	if err := state.RemoveIndex("job_summary"); err != nil {
+		t.Fatalf("err: %v", err)
+	}
 
 	fsm2 := testSnapshotRestore(t, fsm)
 	state2 := fsm2.State()
 	latestIndex, _ := state.LatestIndex()
 
 	out1, _ := state2.JobSummaryByID(job1.ID)
 	expected := structs.JobSummary{
 		JobID: job1.ID,
@@ -999,8 +1020,26 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
 		CreateIndex: latestIndex,
 		ModifyIndex: latestIndex,
 	}
 
 	if !reflect.DeepEqual(&expected, out1) {
 		t.Fatalf("expected: %#v, actual: %#v", &expected, out1)
 	}
+
+	// This exercises the code path which adds the allocations made by the
+	// planner and the number of unplaced allocations in the reconcile summaries
+	// codepath
+	out2, _ := state2.JobSummaryByID(alloc.Job.ID)
+	expected = structs.JobSummary{
+		JobID: alloc.Job.ID,
+		Summary: map[string]structs.TaskGroupSummary{
+			"web": structs.TaskGroupSummary{
+				Queued:   10,
+				Starting: 1,
+			},
+		},
+		CreateIndex: latestIndex,
+		ModifyIndex: latestIndex,
+	}
+	if !reflect.DeepEqual(&expected, out2) {
+		t.Fatalf("expected: %#v, actual: %#v", &expected, out2)
+	}
 }
@@ -85,15 +85,21 @@ func (s *StateStore) StopWatch(items watch.Items, notify chan struct{}) {
 	s.watch.stopWatch(items, notify)
 }
 
-// UpsertJobSummary upserts a job summary into the state store. This is for
-// testing purposes
+// UpsertJobSummary upserts a job summary into the state store.
 func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error {
 	txn := s.db.Txn(true)
 	defer txn.Abort()
 
 	// Update the index
 	if err := txn.Insert("job_summary", *jobSummary); err != nil {
 		return err
 	}
 
+	// Update the indexes table for job summary
+	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
+		return fmt.Errorf("index update failed: %v", err)
+	}
+
 	txn.Commit()
 	return nil
 }
@@ -108,6 +114,9 @@ func (s *StateStore) DeleteJobSummary(index uint64, id string) error {
 	if _, err := txn.DeleteAll("job_summary", "id", id); err != nil {
 		return fmt.Errorf("deleting job summary failed: %v", err)
 	}
+	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
+		return fmt.Errorf("index update failed: %v", err)
+	}
 	txn.Commit()
 	return nil
 }
@@ -1164,6 +1173,19 @@ func (s *StateStore) Index(name string) (uint64, error) {
 	return out.(*IndexEntry).Value, nil
 }
 
+// RemoveIndex is a helper method to remove an index for testing purposes
+func (s *StateStore) RemoveIndex(name string) error {
+	txn := s.db.Txn(true)
+	defer txn.Abort()
+
+	if _, err := txn.DeleteAll("index", "id", name); err != nil {
+		return err
+	}
+
+	txn.Commit()
+	return nil
+}
+
 // Indexes returns an iterator over all the indexes
 func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
 	txn := s.db.Txn(false)
@@ -1176,6 +1198,80 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
 	return iter, nil
 }
 
+// ReconcileJobSummaries re-creates summaries for all jobs present in the state
+// store
+func (s *StateStore) ReconcileJobSummaries(index uint64) error {
+	txn := s.db.Txn(true)
+	defer txn.Abort()
+
+	// Get all the jobs
+	iter, err := txn.Get("jobs", "id")
+	if err != nil {
+		return err
+	}
+	for {
+		rawJob := iter.Next()
+		if rawJob == nil {
+			break
+		}
+		job := rawJob.(*structs.Job)
+
+		// Create a job summary for the job
+		summary := structs.JobSummary{
+			JobID:   job.ID,
+			Summary: make(map[string]structs.TaskGroupSummary),
+		}
+		for _, tg := range job.TaskGroups {
+			summary.Summary[tg.Name] = structs.TaskGroupSummary{}
+		}
+
+		// Find all the allocations for the jobs
+		iterAllocs, err := txn.Get("allocs", "job", job.ID)
+		if err != nil {
+			return err
+		}
+
+		// Calculate the summary for the job
+		for {
+			rawAlloc := iterAllocs.Next()
+			if rawAlloc == nil {
+				break
+			}
+			alloc := rawAlloc.(*structs.Allocation)
+			tg := summary.Summary[alloc.TaskGroup]
+			switch alloc.ClientStatus {
+			case structs.AllocClientStatusFailed:
+				tg.Failed += 1
+			case structs.AllocClientStatusLost:
+				tg.Lost += 1
+			case structs.AllocClientStatusComplete:
+				tg.Complete += 1
+			case structs.AllocClientStatusRunning:
+				tg.Running += 1
+			case structs.AllocClientStatusPending:
+				tg.Starting += 1
+			default:
+				s.logger.Printf("[ERR] state_store: invalid client status: %v in allocation %q", alloc.ClientStatus, alloc.ID)
+			}
+			summary.Summary[alloc.TaskGroup] = tg
+		}
+
+		// Insert the job summary
+		summary.CreateIndex = index
+		summary.ModifyIndex = index
+		if err := txn.Insert("job_summary", summary); err != nil {
+			return fmt.Errorf("error inserting job summary: %v", err)
+		}
+	}
+
+	// Update the indexes table for job summary
+	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
+		return fmt.Errorf("index update failed: %v", err)
+	}
+	txn.Commit()
+	return nil
+}
+
 // setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID.
 // It takes a map of job IDs to an optional forceStatus string. It returns an
 // error if the job doesn't exist or setJobStatus fails.
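The switch above is the entire mapping from an allocation's client status to a summary counter. A compact restatement of that tally, runnable on its own (plain strings stand in for the structs constants):

package main

import "fmt"

// TaskGroupSummary mirrors the counters kept per task group.
type TaskGroupSummary struct {
	Queued, Complete, Failed, Running, Starting, Lost int
}

// tally folds a list of client statuses into a summary, exactly as the
// reconcile loop does for each task group.
func tally(statuses []string) TaskGroupSummary {
	var tg TaskGroupSummary
	for _, s := range statuses {
		switch s {
		case "failed":
			tg.Failed++
		case "lost":
			tg.Lost++
		case "complete":
			tg.Complete++
		case "running":
			tg.Running++
		case "pending":
			tg.Starting++
		}
	}
	return tg
}

func main() {
	fmt.Printf("%+v\n", tally([]string{"pending", "running", "failed", "complete", "lost"}))
	// {Queued:0 Complete:1 Failed:1 Running:1 Starting:1 Lost:1}
}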
@@ -1352,6 +1448,16 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
 		return fmt.Errorf("unable to lookup job summary for job id %q: %v", err)
 	}
 	if summaryRaw == nil {
+		// Check if the job is de-registered
+		rawJob, err := txn.First("jobs", "id", alloc.JobID)
+		if err != nil {
+			return fmt.Errorf("unable to query job: %v", err)
+		}
+
+		// If the job is de-registered then we skip updating it's summary
+		if rawJob == nil {
+			return nil
+		}
 		return fmt.Errorf("job summary for job %q is not present", alloc.JobID)
 	}
 	summary := summaryRaw.(structs.JobSummary)
@@ -1445,10 +1551,9 @@ type StateSnapshot struct {
 // restoring state by only using a single large transaction
 // instead of thousands of sub transactions
 type StateRestore struct {
-	txn         *memdb.Txn
-	watch       *stateWatch
-	items       watch.Items
-	latestIndex uint64
+	txn   *memdb.Txn
+	watch *stateWatch
+	items watch.Items
 }
 
 // Abort is used to abort the restore operation
@@ -1510,10 +1615,6 @@ func (r *StateRestore) IndexRestore(idx *IndexEntry) error {
 	if err := r.txn.Insert("index", idx); err != nil {
 		return fmt.Errorf("index insert failed: %v", err)
 	}
-
-	if idx.Value > r.latestIndex {
-		r.latestIndex = idx.Value
-	}
 	return nil
 }
@@ -1535,89 +1636,6 @@ func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error {
 	return nil
 }
 
-// JobsWithoutSummary returns the list of jobs which don't have any summary
-func (r *StateRestore) JobsWithoutSummary() ([]*structs.Job, error) {
-	// Get all the jobs
-	var jobs []*structs.Job
-	iter, err := r.txn.Get("jobs", "id")
-	if err != nil {
-		return nil, fmt.Errorf("couldn't retrieve jobs: %v", err)
-	}
-	for {
-		raw := iter.Next()
-		if raw == nil {
-			break
-		}
-
-		// Filter the jobs which have summaries
-		job := raw.(*structs.Job)
-		jobSummary, err := r.txn.First("job_summary", "id", job.ID)
-		if err != nil {
-			return nil, fmt.Errorf("unable to get job summary: %v", err)
-		}
-		if jobSummary != nil {
-			continue
-		}
-
-		jobs = append(jobs, job)
-	}
-	return jobs, nil
-}
-
-// CreateJobSummaries computes the job summaries for all the jobs
-func (r *StateRestore) CreateJobSummaries(jobs []*structs.Job) error {
-	for _, job := range jobs {
-		// Get all the allocations for the job
-		iter, err := r.txn.Get("allocs", "job", job.ID)
-		if err != nil {
-			return fmt.Errorf("couldn't retrieve allocations for job %v: %v", job.ID, err)
-		}
-		var allocs []*structs.Allocation
-		for {
-			raw := iter.Next()
-			if raw == nil {
-				break
-			}
-			allocs = append(allocs, raw.(*structs.Allocation))
-		}
-
-		// Create a job summary for the job
-		summary := structs.JobSummary{
-			JobID:   job.ID,
-			Summary: make(map[string]structs.TaskGroupSummary),
-		}
-		for _, tg := range job.TaskGroups {
-			summary.Summary[tg.Name] = structs.TaskGroupSummary{}
-		}
-		// Calculate the summary for the job
-		for _, alloc := range allocs {
-			tg := summary.Summary[alloc.TaskGroup]
-			switch alloc.ClientStatus {
-			case structs.AllocClientStatusFailed:
-				tg.Failed += 1
-			case structs.AllocClientStatusLost:
-				tg.Lost += 1
-			case structs.AllocClientStatusComplete:
-				tg.Complete += 1
-			case structs.AllocClientStatusRunning:
-				tg.Running += 1
-			case structs.AllocClientStatusPending:
-				tg.Starting += 1
-			}
-			summary.Summary[alloc.TaskGroup] = tg
-		}
-
-		// Insert the job summary
-		summary.CreateIndex = r.latestIndex
-		summary.ModifyIndex = r.latestIndex
-		if err := r.txn.Insert("job_summary", summary); err != nil {
-			return fmt.Errorf("error inserting job summary: %v", err)
-		}
-	}
-
-	return nil
-}
-
 // stateWatch holds shared state for watching updates. This is
 // outside of StateStore so it can be shared with snapshots.
 type stateWatch struct {
@@ -1169,86 +1169,6 @@ func TestStateStore_RestoreJobSummary(t *testing.T) {
 	}
 }
 
-func TestStateStore_CreateJobSummaries(t *testing.T) {
-	state := testStateStore(t)
-	restore, err := state.Restore()
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	// Restore a job
-	job := mock.Job()
-	if err := restore.JobRestore(job); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// Restore an Index
-	index := IndexEntry{
-		Key:   "Foo",
-		Value: 100,
-	}
-	if err := restore.IndexRestore(&index); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// Restore an allocation
-	alloc := mock.Alloc()
-	alloc.JobID = job.ID
-	alloc.Job = job
-	if err := restore.AllocRestore(alloc); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// Create the job summaries
-	if err := restore.CreateJobSummaries([]*structs.Job{job}); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	restore.Commit()
-
-	summary, err := state.JobSummaryByID(job.ID)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	expected := structs.JobSummary{
-		JobID: job.ID,
-		Summary: map[string]structs.TaskGroupSummary{
-			"web": {
-				Starting: 1,
-			},
-		},
-		CreateIndex: 100,
-		ModifyIndex: 100,
-	}
-
-	if !reflect.DeepEqual(summary, &expected) {
-		t.Fatalf("Bad: %#v %#v", summary, expected)
-	}
-}
-
-func TestStateRestore_JobsWithoutSummaries(t *testing.T) {
-	state := testStateStore(t)
-	restore, err := state.Restore()
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	// Restore a job
-	job := mock.Job()
-	if err := restore.JobRestore(job); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	jobs, err := restore.JobsWithoutSummary()
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if len(jobs) != 1 {
-		t.Fatalf("expected: %v, actual: %v", 1, len(jobs))
-	}
-	if !reflect.DeepEqual(job, jobs[0]) {
-		t.Fatalf("Bad: %#v %#v", job, jobs[0])
-	}
-}
-
 func TestStateStore_Indexes(t *testing.T) {
 	state := testStateStore(t)
 	node := mock.Node()
@@ -2135,6 +2055,138 @@ func TestStateStore_JobSummary(t *testing.T) {
 	}
 }
 
+func TestStateStore_ReconcileJobSummary(t *testing.T) {
+	state := testStateStore(t)
+
+	// Create an alloc
+	alloc := mock.Alloc()
+
+	// Add another task group to the job
+	tg2 := alloc.Job.TaskGroups[0].Copy()
+	tg2.Name = "db"
+	alloc.Job.TaskGroups = append(alloc.Job.TaskGroups, tg2)
+	state.UpsertJob(100, alloc.Job)
+
+	// Create one more alloc for the db task group
+	alloc2 := mock.Alloc()
+	alloc2.TaskGroup = "db"
+	alloc2.JobID = alloc.JobID
+	alloc2.Job = alloc.Job
+
+	// Upserts the alloc
+	state.UpsertAllocs(110, []*structs.Allocation{alloc, alloc2})
+
+	// Change the state of the first alloc to running
+	alloc3 := alloc.Copy()
+	alloc3.ClientStatus = structs.AllocClientStatusRunning
+	state.UpdateAllocsFromClient(120, []*structs.Allocation{alloc3})
+
+	//Add some more allocs to the second tg
+	alloc4 := mock.Alloc()
+	alloc4.JobID = alloc.JobID
+	alloc4.Job = alloc.Job
+	alloc4.TaskGroup = "db"
+	alloc5 := alloc4.Copy()
+	alloc5.ClientStatus = structs.AllocClientStatusRunning
+
+	alloc6 := mock.Alloc()
+	alloc6.JobID = alloc.JobID
+	alloc6.Job = alloc.Job
+	alloc6.TaskGroup = "db"
+	alloc7 := alloc6.Copy()
+	alloc7.ClientStatus = structs.AllocClientStatusComplete
+
+	alloc8 := mock.Alloc()
+	alloc8.JobID = alloc.JobID
+	alloc8.Job = alloc.Job
+	alloc8.TaskGroup = "db"
+	alloc9 := alloc8.Copy()
+	alloc9.ClientStatus = structs.AllocClientStatusFailed
+
+	alloc10 := mock.Alloc()
+	alloc10.JobID = alloc.JobID
+	alloc10.Job = alloc.Job
+	alloc10.TaskGroup = "db"
+	alloc11 := alloc10.Copy()
+	alloc11.ClientStatus = structs.AllocClientStatusLost
+
+	state.UpsertAllocs(130, []*structs.Allocation{alloc4, alloc6, alloc8, alloc10})
+	state.UpdateAllocsFromClient(150, []*structs.Allocation{alloc5, alloc7, alloc9, alloc11})
+
+	// DeleteJobSummary is a helper method and doesn't modify the indexes table
+	state.DeleteJobSummary(130, alloc.Job.ID)
+
+	state.ReconcileJobSummaries(120)
+
+	summary, _ := state.JobSummaryByID(alloc.Job.ID)
+	expectedSummary := structs.JobSummary{
+		JobID: alloc.Job.ID,
+		Summary: map[string]structs.TaskGroupSummary{
+			"web": structs.TaskGroupSummary{
+				Running: 1,
+			},
+			"db": structs.TaskGroupSummary{
+				Starting: 1,
+				Running:  1,
+				Failed:   1,
+				Complete: 1,
+				Lost:     1,
+			},
+		},
+		CreateIndex: 120,
+		ModifyIndex: 120,
+	}
+	if !reflect.DeepEqual(&expectedSummary, summary) {
+		t.Fatalf("expected: %v, actual: %v", expectedSummary, summary)
+	}
+}
+
+func TestStateStore_UpdateAlloc_JobNotPresent(t *testing.T) {
+	state := testStateStore(t)
+
+	alloc := mock.Alloc()
+	state.UpsertJob(100, alloc.Job)
+	state.UpsertAllocs(200, []*structs.Allocation{alloc})
+
+	// Delete the job
+	state.DeleteJob(300, alloc.Job.ID)
+
+	// Update the alloc
+	alloc1 := alloc.Copy()
+	alloc1.ClientStatus = structs.AllocClientStatusRunning
+
+	// Updating allocation should not throw any error
+	if err := state.UpdateAllocsFromClient(400, []*structs.Allocation{alloc1}); err != nil {
+		t.Fatalf("expect err: %v", err)
+	}
+
+	// Re-Register the job
+	state.UpsertJob(500, alloc.Job)
+
+	// Update the alloc again
+	alloc2 := alloc.Copy()
+	alloc2.ClientStatus = structs.AllocClientStatusComplete
+	if err := state.UpdateAllocsFromClient(400, []*structs.Allocation{alloc1}); err != nil {
+		t.Fatalf("expect err: %v", err)
+	}
+
+	// Job Summary of the newly registered job shouldn't account for the
+	// allocation update for the older job
+	expectedSummary := structs.JobSummary{
+		JobID: alloc1.JobID,
+		Summary: map[string]structs.TaskGroupSummary{
+			"web": structs.TaskGroupSummary{},
+		},
+		CreateIndex: 500,
+		ModifyIndex: 500,
+	}
+	summary, _ := state.JobSummaryByID(alloc.Job.ID)
+	if !reflect.DeepEqual(&expectedSummary, summary) {
+		t.Fatalf("expected: %v, actual: %v", expectedSummary, summary)
+	}
+}
+
 func TestStateStore_EvictAlloc_Alloc(t *testing.T) {
 	state := testStateStore(t)
 	alloc := mock.Alloc()
@@ -45,6 +45,7 @@ const (
 	EvalDeleteRequestType
 	AllocUpdateRequestType
 	AllocClientUpdateRequestType
+	ReconcileJobSummariesRequestType
 )
 
 const (
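Because the message type is persisted in every Raft log entry, new request types must be appended to the end of this iota block: inserting one in the middle would renumber existing types and misinterpret logs written by older servers. A toy illustration of the append-only constraint (the constants and their positions here are illustrative, not the real block):

package main

import "fmt"

type MessageType uint8

// Values are assigned by position, so the block is append-only: adding
// ReconcileJobSummariesRequestType at the end leaves earlier values stable.
const (
	NodeRegisterRequestType MessageType = iota // 0
	JobRegisterRequestType                     // 1
	AllocClientUpdateRequestType               // 2
	ReconcileJobSummariesRequestType           // 3 (new)
)

func main() {
	fmt.Println(ReconcileJobSummariesRequestType) // 3
}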
@@ -27,3 +27,19 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen
 	s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex))
 	return nil
 }
+
+// ReconcileSummaries reconciles the summaries of all the jobs in the state
+// store
+func (s *System) ReconcileJobSummaries(args *structs.GenericRequest, reply *structs.GenericResponse) error {
+	if done, err := s.srv.forward("System.ReconcileJobSummaries", args, args, reply); done {
+		return err
+	}
+
+	_, index, err := s.srv.raftApply(structs.ReconcileJobSummariesRequestType, args)
+	if err != nil {
+		s.srv.logger.Printf("[ERR] nomad.client: Reconcile failed: %v", err)
+		return err
+	}
+	reply.Index = index
+	return nil
+}
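The RPC above follows the standard server pattern: forward to the cluster leader if this node isn't it, then commit the request through Raft so every server's FSM applies the change. A skeletal sketch of that flow (the Server type and method signatures are simplified stand-ins for Nomad's internals):

package main

import "fmt"

type Server struct {
	leader bool
}

// forward returns done=true when the request was handled remotely, i.e. this
// node is not the leader and has proxied the call on.
func (s *Server) forward(method string) (done bool, err error) {
	if s.leader {
		return false, nil
	}
	fmt.Println("proxying", method, "to leader")
	return true, nil
}

// raftApply commits an entry to the replicated log and reports its index.
func (s *Server) raftApply(msgType byte, req interface{}) (uint64, error) {
	fmt.Printf("committing type %d entry via raft\n", msgType)
	return 42, nil // index at which the entry was applied
}

func (s *Server) ReconcileJobSummaries() error {
	if done, err := s.forward("System.ReconcileJobSummaries"); done {
		return err
	}
	index, err := s.raftApply(7, nil)
	if err != nil {
		return err
	}
	fmt.Println("applied at index", index)
	return nil
}

func main() {
	(&Server{leader: true}).ReconcileJobSummaries()
}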
@@ -2,6 +2,7 @@ package nomad
 
 import (
 	"fmt"
+	"reflect"
 	"testing"
 
 	"github.com/hashicorp/net-rpc-msgpackrpc"
@@ -49,3 +50,62 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
 		t.Fatalf("err: %s", err)
 	})
 }
+
+func TestSystemEndpoint_ReconcileSummaries(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert a job that can be GC'd
+	state := s1.fsm.State()
+	job := mock.Job()
+	if err := state.UpsertJob(1000, job); err != nil {
+		t.Fatalf("UpsertJob() failed: %v", err)
+	}
+
+	// Delete the job summary
+	state.DeleteJobSummary(1001, job.ID)
+
+	// Make the GC request
+	req := &structs.GenericRequest{
+		QueryOptions: structs.QueryOptions{
+			Region: "global",
+		},
+	}
+	var resp structs.GenericResponse
+	if err := msgpackrpc.CallWithCodec(codec, "System.ReconcileJobSummaries", req, &resp); err != nil {
+		t.Fatalf("expect err: %v", err)
+	}
+
+	testutil.WaitForResult(func() (bool, error) {
+		// Check if Nomad has reconciled the summary for the job
+		summary, err := state.JobSummaryByID(job.ID)
+		if err != nil {
+			return false, err
+		}
+		if summary.CreateIndex == 0 || summary.ModifyIndex == 0 {
+			t.Fatalf("create index: %v, modify index: %v", summary.CreateIndex, summary.ModifyIndex)
+		}
+
+		// setting the modifyindex and createindex of the expected summary to
+		// the output so that we can do deep equal
+		expectedSummary := structs.JobSummary{
+			JobID: job.ID,
+			Summary: map[string]structs.TaskGroupSummary{
+				"web": structs.TaskGroupSummary{
+					Queued: 10,
+				},
+			},
+			ModifyIndex: summary.ModifyIndex,
+			CreateIndex: summary.CreateIndex,
+		}
+		if !reflect.DeepEqual(&expectedSummary, summary) {
+			return false, fmt.Errorf("expected: %v, actual: %v", expectedSummary, summary)
+		}
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %s", err)
+	})
+}
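testutil.WaitForResult in the test above retries the check until it succeeds or times out, which matters here because the reconcile is applied through Raft asynchronously. A generic sketch of that poll-until-true helper (the durations and names are illustrative, not the testutil implementation):

package main

import (
	"fmt"
	"time"
)

// waitForResult polls test until it returns true or the deadline passes,
// handing the last error to onFail, mirroring the testutil.WaitForResult shape.
func waitForResult(test func() (bool, error), onFail func(error)) {
	deadline := time.Now().Add(5 * time.Second)
	var lastErr error
	for time.Now().Before(deadline) {
		ok, err := test()
		if ok {
			return
		}
		lastErr = err
		time.Sleep(10 * time.Millisecond)
	}
	onFail(lastErr)
}

func main() {
	start := time.Now()
	waitForResult(func() (bool, error) {
		if time.Since(start) > 50*time.Millisecond {
			return true, nil // the state has converged
		}
		return false, fmt.Errorf("summary not reconciled yet")
	}, func(err error) {
		fmt.Println("timeout:", err)
	})
	fmt.Println("converged")
}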