Added an endpoint for users to reconcile job summaries

This commit is contained in:
Diptanu Choudhury
2016-08-03 16:08:30 -07:00
parent 5c03764015
commit 72cd53d6e5
9 changed files with 185 additions and 34 deletions

View File

@@ -129,6 +129,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest))
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcilesummaries", s.wrap(s.ReconcileJobSummaries))
if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)

View File

@@ -22,3 +22,20 @@ func (s *HTTPServer) GarbageCollectRequest(resp http.ResponseWriter, req *http.R
}
return nil, nil
}
// ReconcileJobSummaries is the HTTP handler registered for
// /v1/system/reconcilesummaries. It only accepts PUT; any other method yields
// a 405 coded error. For a PUT it issues the System.ReconcileJobSummaries RPC
// through the agent and passes back whatever error that call produces. The
// handler never returns a response object.
func (s *HTTPServer) ReconcileJobSummaries(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	if req.Method != "PUT" {
		return nil, CodedError(405, ErrInvalidMethod)
	}

	// parse fills in the region and query options from the request; a true
	// result means the request must not be processed any further.
	request := structs.GenericRequest{}
	if stop := s.parse(resp, req, &request.Region, &request.QueryOptions); stop {
		return nil, nil
	}

	reply := structs.GenericResponse{}
	err := s.agent.RPC("System.ReconcileJobSummaries", &request, &reply)
	return nil, err
}

View File

@@ -21,3 +21,24 @@ func TestHTTP_SystemGarbageCollect(t *testing.T) {
}
})
}
// TestHTTP_ReconcileJobSummaries checks that a PUT against the reconcile
// summaries endpoint is handled without error and that the response recorder
// reports a 200 status code.
func TestHTTP_ReconcileJobSummaries(t *testing.T) {
	httpTest(t, nil, func(s *TestServer) {
		// Build the HTTP request
		req, err := http.NewRequest("PUT", "/v1/system/reconcilesummaries", nil)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		respW := httptest.NewRecorder()

		// Invoke the handler directly
		if _, err := s.Server.ReconcileJobSummaries(respW, req); err != nil {
			t.Fatalf("err: %v", err)
		}

		// The handler must not have changed the status away from 200
		if respW.Code != 200 {
			t.Fatalf("expected: %v, actual: %v", 200, respW.Code)
		}
	})
}

View File

@@ -135,6 +135,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyAllocUpdate(buf[1:], log.Index)
case structs.AllocClientUpdateRequestType:
return n.applyAllocClientUpdate(buf[1:], log.Index)
case structs.ReconcileJobSummariesRequestType:
return n.applyReconcileSummaries(buf[1:], log.Index)
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@@ -444,6 +446,14 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
// applyReconcileSummaries reconciles the summaries of all the jobs in the
// state store at the given index and then re-computes their queued allocation
// counts at that same index. The message payload in buf is not used. The
// first error hit is returned as the apply result; nil is returned on success.
func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} {
	err := n.state.ReconcileJobSummaries(index)
	if err != nil {
		return err
	}

	return n.reconcileQueuedAllocations(index)
}
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()
@@ -586,11 +596,19 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
if err != nil {
return fmt.Errorf("couldn't fetch index of job summary table: %v", err)
}
// If the index is 0 that means there is no job summary in the snapshot so
// we will have to create them
if index == 0 {
if err := n.state.ReconcileJobSummaries(); err != nil {
// query the latest index
latestIndex, err := n.state.LatestIndex()
if err != nil {
return fmt.Errorf("unable to query latest index: %v", index)
}
if err := n.state.ReconcileJobSummaries(latestIndex); err != nil {
return fmt.Errorf("error reconciling summaries: %v", err)
}
if err := n.reconcileQueuedAllocations(); err != nil {
if err := n.reconcileQueuedAllocations(latestIndex); err != nil {
return fmt.Errorf("error re-computing the number of queued allocations:; %v", err)
}
}
@@ -600,7 +618,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
// reconcileQueuedAllocations re-calculates the queued allocations for every job that we
// created a Job Summary for during the snapshot restore
func (n *nomadFSM) reconcileQueuedAllocations() error {
func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
// Get all the jobs
iter, err := n.state.Jobs()
if err != nil {
@@ -615,12 +633,6 @@ func (n *nomadFSM) reconcileQueuedAllocations() error {
jobs = append(jobs, rawJob.(*structs.Job))
}
// Start a restore session
restore, err := n.state.Restore()
if err != nil {
return err
}
defer restore.Abort()
snap, err := n.state.Snapshot()
if err != nil {
return fmt.Errorf("unable to create snapshot: %v", err)
@@ -650,10 +662,32 @@ func (n *nomadFSM) reconcileQueuedAllocations() error {
if err := sched.Process(eval); err != nil {
return err
}
summary, err := snap.JobSummaryByID(job.ID)
// Get the job summary from the fsm state store
summary, err := n.state.JobSummaryByID(job.ID)
if err != nil {
return err
}
// Add the allocations scheduler has made to queued since these
// allocations are never getting placed until the scheduler is invoked
// with a real planner
if l := len(planner.Plans); l != 1 {
return fmt.Errorf("unexpected number of plans during restore %d. Please file an issue including the logs", l)
}
for _, allocations := range planner.Plans[0].NodeAllocation {
for _, allocation := range allocations {
tgSummary, ok := summary.Summary[allocation.TaskGroup]
if !ok {
return fmt.Errorf("task group %q not found while updating queued count", allocation.TaskGroup)
}
tgSummary.Queued += 1
summary.Summary[allocation.TaskGroup] = tgSummary
}
}
// Add the queued allocations attached to the evaluation to the queued
// counter of the job summary
if l := len(planner.Evals); l != 1 {
return fmt.Errorf("unexpected number of evals during restore %d. Please file an issue including the logs", l)
}
@@ -662,15 +696,14 @@ func (n *nomadFSM) reconcileQueuedAllocations() error {
if !ok {
return fmt.Errorf("task group %q not found while updating queued count", tg)
}
tgSummary.Queued = queued
tgSummary.Queued += queued
summary.Summary[tg] = tgSummary
}
if err := restore.JobSummaryRestore(summary); err != nil {
if err := n.state.UpsertJobSummary(index, summary); err != nil {
return err
}
}
restore.Commit()
return nil
}

View File

@@ -990,14 +990,13 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000
state.UpsertJob(1000, job1)
// make an allocation and a job which can make partial progress
alloc := mock.Alloc()
state.UpsertJob(1010, alloc.Job)
state.UpsertAllocs(1020, []*structs.Allocation{alloc})
// make a job which can make partial progress
job2 := mock.Job()
state.UpsertJob(1010, job2)
// Delete the summaries
state.DeleteJobSummary(1030, job1.ID)
state.DeleteJobSummary(1040, alloc.Job.ID)
state.DeleteJobSummary(1040, job2.ID)
// Delete the index
if err := state.RemoveIndex("job_summary"); err != nil {
@@ -1023,13 +1022,13 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out1)
}
out2, _ := state2.JobSummaryByID(alloc.Job.ID)
out2, _ := state2.JobSummaryByID(job2.ID)
expected = structs.JobSummary{
JobID: alloc.Job.ID,
JobID: job2.ID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Queued: 3,
Starting: 1,
Queued: 10,
Starting: 0,
},
},
CreateIndex: latestIndex,

View File

@@ -85,15 +85,21 @@ func (s *StateStore) StopWatch(items watch.Items, notify chan struct{}) {
s.watch.stopWatch(items, notify)
}
// UpsertJobSummary upserts a job summary into the state store. This is for
// testing purposes
// UpsertJobSummary upserts a job summary into the state store. The summary is
// inserted into the job_summary table and the job_summary entry of the index
// table is set to index, both inside one write transaction.
func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error {
	tx := s.db.Txn(true)
	defer tx.Abort()

	// Store the summary itself; it is inserted by value
	err := tx.Insert("job_summary", *jobSummary)
	if err != nil {
		return err
	}

	// Update the indexes table for job summary
	entry := &IndexEntry{"job_summary", index}
	if err = tx.Insert("index", entry); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	tx.Commit()
	return nil
}
@@ -108,6 +114,9 @@ func (s *StateStore) DeleteJobSummary(index uint64, id string) error {
if _, err := txn.DeleteAll("job_summary", "id", id); err != nil {
return fmt.Errorf("deleting job summary failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}
@@ -1191,16 +1200,10 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
// ReconcileJobSummaries re-creates summaries for all jobs present in the state
// store
func (s *StateStore) ReconcileJobSummaries() error {
func (s *StateStore) ReconcileJobSummaries(index uint64) error {
txn := s.db.Txn(true)
defer txn.Abort()
// Find the latest index
latestIndex, err := s.LatestIndex()
if err != nil {
return err
}
// Get all the jobs
iter, err := txn.Get("jobs", "id")
if err != nil {
@@ -1259,15 +1262,15 @@ func (s *StateStore) ReconcileJobSummaries() error {
}
// Insert the job summary
summary.CreateIndex = latestIndex
summary.ModifyIndex = latestIndex
summary.CreateIndex = index
summary.ModifyIndex = index
if err := txn.Insert("job_summary", summary); err != nil {
return fmt.Errorf("error inserting job summary: %v", err)
}
}
// Update the indexes table for job summary
if err := txn.Insert("index", &IndexEntry{"job_summary", latestIndex}); err != nil {
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()

View File

@@ -45,6 +45,7 @@ const (
EvalDeleteRequestType
AllocUpdateRequestType
AllocClientUpdateRequestType
ReconcileJobSummariesRequestType
)
const (

View File

@@ -27,3 +27,19 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen
s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex))
return nil
}
// ReconcileJobSummaries reconciles the summaries of all the jobs in the state
// store by applying a ReconcileJobSummariesRequestType message through raft.
// The request is first offered to forward; when forward reports it as done,
// the forwarded result is returned as-is. On success reply.Index is set to the
// index returned by the raft apply.
func (s *System) ReconcileJobSummaries(args *structs.GenericRequest, reply *structs.GenericResponse) error {
	if done, err := s.srv.forward("System.ReconcileJobSummaries", args, args, reply); done {
		return err
	}

	resp, index, err := s.srv.raftApply(structs.ReconcileJobSummariesRequestType, args)
	if err != nil {
		s.srv.logger.Printf("[ERR] nomad.system: Reconcile failed: %v", err)
		return err
	}

	// applyReconcileSummaries hands back an error value as its apply result
	// when reconciliation fails. NOTE(review): this assumes the first value
	// returned by raftApply is that FSM result - confirm. Surface such an
	// error instead of silently reporting success.
	if fsmErr, ok := resp.(error); ok && fsmErr != nil {
		s.srv.logger.Printf("[ERR] nomad.system: Reconcile failed: %v", fsmErr)
		return fsmErr
	}

	reply.Index = index
	return nil
}

View File

@@ -2,6 +2,7 @@ package nomad
import (
"fmt"
"reflect"
"testing"
"github.com/hashicorp/net-rpc-msgpackrpc"
@@ -49,3 +50,62 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
t.Fatalf("err: %s", err)
})
}
// TestSystemEndpoint_ReconcileSummaries verifies that the
// System.ReconcileJobSummaries RPC re-creates the summary of a job whose
// summary was deleted from the state store.
func TestSystemEndpoint_ReconcileSummaries(t *testing.T) {
	s1 := testServer(t, nil)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Insert a job whose summary will be reconciled
	state := s1.fsm.State()
	job := mock.Job()
	if err := state.UpsertJob(1000, job); err != nil {
		t.Fatalf("UpsertJob() failed: %v", err)
	}

	// Delete the job summary
	if err := state.DeleteJobSummary(1001, job.ID); err != nil {
		t.Fatalf("DeleteJobSummary() failed: %v", err)
	}

	// Make the reconcile request
	req := &structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			Region: "global",
		},
	}
	var resp structs.GenericResponse
	if err := msgpackrpc.CallWithCodec(codec, "System.ReconcileJobSummaries", req, &resp); err != nil {
		t.Fatalf("err: %v", err)
	}

	testutil.WaitForResult(func() (bool, error) {
		// Check if Nomad has reconciled the summary for the job
		summary, err := state.JobSummaryByID(job.ID)
		if err != nil {
			return false, err
		}
		// Guard against a missing summary so the checks below cannot
		// dereference a nil pointer.
		if summary == nil {
			return false, fmt.Errorf("job summary for %q not found", job.ID)
		}
		// Report through the retry callback rather than aborting the test
		// from inside the polling function.
		if summary.CreateIndex == 0 || summary.ModifyIndex == 0 {
			return false, fmt.Errorf("create index: %v, modify index: %v", summary.CreateIndex, summary.ModifyIndex)
		}

		// setting the modifyindex and createindex of the expected summary to
		// the output so that we can do deep equal
		expectedSummary := structs.JobSummary{
			JobID: job.ID,
			Summary: map[string]structs.TaskGroupSummary{
				"web": structs.TaskGroupSummary{
					Queued: 10,
				},
			},
			ModifyIndex: summary.ModifyIndex,
			CreateIndex: summary.CreateIndex,
		}
		if !reflect.DeepEqual(&expectedSummary, summary) {
			return false, fmt.Errorf("expected: %v, actual: %v", expectedSummary, summary)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %s", err)
	})
}