From daeabb94565a8e49138231c0403e0ab361d34af5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 18 Apr 2017 15:11:33 -0700 Subject: [PATCH] Revert server endpoint --- nomad/job_endpoint.go | 52 ++++++++++++ nomad/job_endpoint_test.go | 137 ++++++++++++++++++++++++++++++++ nomad/state/state_store.go | 18 +++++ nomad/state/state_store_test.go | 23 ++++++ 4 files changed, 230 insertions(+) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 95b8eb4a3..30ce58098 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -305,6 +305,58 @@ func (j *Job) Validate(args *structs.JobValidateRequest, reply *structs.JobValid return nil } +// Revert is used to revert the job to a prior version +func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterResponse) error { + if done, err := j.srv.forward("Job.Revert", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "revert"}, time.Now()) + + // Validate the arguments + if args.JobID == "" { + return fmt.Errorf("missing job ID for evaluation") + } + + // Lookup the job by version + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + ws := memdb.NewWatchSet() + jobV, err := snap.JobByIDAndVersion(ws, args.JobID, args.JobVersion) + if err != nil { + return err + } + if jobV == nil { + return fmt.Errorf("job %q at version %d not found", args.JobID, args.JobVersion) + } + + // Build the register request + reg := &structs.JobRegisterRequest{ + Job: jobV.Copy(), + WriteRequest: args.WriteRequest, + } + + // If the request is enforcing the existing version do a check. + if args.EnforcePriorVersion != nil { + cur, err := snap.JobByID(ws, args.JobID) + if err != nil { + return err + } + + if cur.Version != *args.EnforcePriorVersion { + return fmt.Errorf("Current job has version %d; enforcing version %d", cur.Version, *args.EnforcePriorVersion) + } + + reg.EnforceIndex = true + reg.JobModifyIndex = cur.JobModifyIndex + } + + // Register the version. + return j.Register(reg, reply) +} + // Evaluate is used to force a job for re-evaluation func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error { if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index ebc2d5fe4..e0843bdcf 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -9,6 +9,7 @@ import ( memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -687,6 +688,142 @@ func TestJobEndpoint_Register_Vault_Policies(t *testing.T) { } } +func TestJobEndpoint_Revert(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the initial register request + job := mock.Job() + job.Priority = 100 + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Reregister again to get another version + job2 := job.Copy() + job2.Priority = 1 + req = &structs.JobRegisterRequest{ + Job: job2, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Create revert request and enforcing it be at an incorrect version + revertReq := &structs.JobRevertRequest{ + JobID: job.ID, + JobVersion: 0, + EnforcePriorVersion: helper.Uint64ToPtr(10), + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + err := msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp) + if err == nil || !strings.Contains(err.Error(), "enforcing version 10") { + t.Fatalf("expected enforcement error") + } + + // Create revert request and enforcing it be at version 1 + revertReq = &structs.JobRevertRequest{ + JobID: job.ID, + JobVersion: 0, + EnforcePriorVersion: helper.Uint64ToPtr(1), + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + if resp.EvalID == "" || resp.EvalCreateIndex == 0 { + t.Fatalf("bad created eval: %+v", resp) + } + if resp.JobModifyIndex == 0 { + t.Fatalf("bad job modify index: %d", resp.JobModifyIndex) + } + + // Create revert request and don't enforce + revertReq = &structs.JobRevertRequest{ + JobID: job.ID, + JobVersion: 0, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + if resp.EvalID == "" || resp.EvalCreateIndex == 0 { + t.Fatalf("bad created eval: %+v", resp) + } + if resp.JobModifyIndex == 0 { + t.Fatalf("bad job modify index: %d", resp.JobModifyIndex) + } + + // Check that the job is at the correct version and that the eval was + // created + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.Priority != job.Priority { + t.Fatalf("priority mis-match") + } + if out.Version != 3 { + t.Fatalf("got version %d; want %d", out.Version, 3) + } + + eout, err := state.EvalByID(ws, resp.EvalID) + if err != nil { + t.Fatalf("err: %v", err) + } + if eout == nil { + t.Fatalf("expected eval") + } + if eout.JobID != job.ID { + t.Fatalf("job id mis-match") + } + + versions, err := state.JobVersionsByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(versions) != 4 { + t.Fatalf("got %d versions; want %d", len(versions), 4) + } +} + func TestJobEndpoint_Evaluate(t *testing.T) { s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 983c20a33..b4c014e0c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -602,6 +602,24 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, id strin return all, nil } +// JobByIDAndVersion returns the job identified by its ID and Version +func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, id string, version uint64) (*structs.Job, error) { + txn := s.db.Txn(false) + watchCh, existing, err := txn.FirstWatch("job_versions", "id", id, version) + if err != nil { + return nil, err + } + + ws.Add(watchCh) + + if existing != nil { + job := existing.(*structs.Job) + return job, nil + } + + return nil, nil +} + // Jobs returns an iterator over all the jobs func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 0624417f8..54350c029 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -433,6 +433,16 @@ func TestStateStore_UpsertJob_Job(t *testing.T) { if a := allVersions[0]; a.ID != job.ID || a.Version != 0 { t.Fatalf("bad: %v", a) } + + // Test the looking up the job by version returns the same results + vout, err := state.JobByIDAndVersion(ws, job.ID, 0) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(out, vout) { + t.Fatalf("bad: %#v %#v", out, vout) + } } func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { @@ -478,6 +488,9 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { if out.ModifyIndex != 1001 { t.Fatalf("bad: %#v", out) } + if out.Version != 1 { + t.Fatalf("bad: %#v", out) + } index, err := state.Index("jobs") if err != nil { @@ -487,6 +500,16 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { t.Fatalf("bad: %d", index) } + // Test the looking up the job by version returns the same results + vout, err := state.JobByIDAndVersion(ws, job.ID, 1) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(out, vout) { + t.Fatalf("bad: %#v %#v", out, vout) + } + // Test that the job summary remains the same if the job is updated but // count remains same summary, err := state.JobSummaryByID(ws, job.ID)