Revert server endpoint

This commit is contained in:
Alex Dadgar
2017-04-18 15:11:33 -07:00
parent b8b18dbe62
commit daeabb9456
4 changed files with 230 additions and 0 deletions

View File

@@ -305,6 +305,58 @@ func (j *Job) Validate(args *structs.JobValidateRequest, reply *structs.JobValid
return nil
}
// Revert is used to revert the job to a prior version
func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Revert", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "revert"}, time.Now())
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}
// Lookup the job by version
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
ws := memdb.NewWatchSet()
jobV, err := snap.JobByIDAndVersion(ws, args.JobID, args.JobVersion)
if err != nil {
return err
}
if jobV == nil {
return fmt.Errorf("job %q at version %d not found", args.JobID, args.JobVersion)
}
// Build the register request
reg := &structs.JobRegisterRequest{
Job: jobV.Copy(),
WriteRequest: args.WriteRequest,
}
// If the request is enforcing the existing version do a check.
if args.EnforcePriorVersion != nil {
cur, err := snap.JobByID(ws, args.JobID)
if err != nil {
return err
}
if cur.Version != *args.EnforcePriorVersion {
return fmt.Errorf("Current job has version %d; enforcing version %d", cur.Version, *args.EnforcePriorVersion)
}
reg.EnforceIndex = true
reg.JobModifyIndex = cur.JobModifyIndex
}
// Register the version.
return j.Register(reg, reply)
}
// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {

View File

@@ -9,6 +9,7 @@ import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@@ -687,6 +688,142 @@ func TestJobEndpoint_Register_Vault_Policies(t *testing.T) {
}
}
func TestJobEndpoint_Revert(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the initial register request
job := mock.Job()
job.Priority = 100
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Reregister again to get another version
job2 := job.Copy()
job2.Priority = 1
req = &structs.JobRegisterRequest{
Job: job2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Create revert request and enforcing it be at an incorrect version
revertReq := &structs.JobRevertRequest{
JobID: job.ID,
JobVersion: 0,
EnforcePriorVersion: helper.Uint64ToPtr(10),
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
err := msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp)
if err == nil || !strings.Contains(err.Error(), "enforcing version 10") {
t.Fatalf("expected enforcement error")
}
// Create revert request and enforcing it be at version 1
revertReq = &structs.JobRevertRequest{
JobID: job.ID,
JobVersion: 0,
EnforcePriorVersion: helper.Uint64ToPtr(1),
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
if resp.EvalID == "" || resp.EvalCreateIndex == 0 {
t.Fatalf("bad created eval: %+v", resp)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad job modify index: %d", resp.JobModifyIndex)
}
// Create revert request and don't enforce
revertReq = &structs.JobRevertRequest{
JobID: job.ID,
JobVersion: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
if resp.EvalID == "" || resp.EvalCreateIndex == 0 {
t.Fatalf("bad created eval: %+v", resp)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad job modify index: %d", resp.JobModifyIndex)
}
// Check that the job is at the correct version and that the eval was
// created
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.Priority != job.Priority {
t.Fatalf("priority mis-match")
}
if out.Version != 3 {
t.Fatalf("got version %d; want %d", out.Version, 3)
}
eout, err := state.EvalByID(ws, resp.EvalID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eout == nil {
t.Fatalf("expected eval")
}
if eout.JobID != job.ID {
t.Fatalf("job id mis-match")
}
versions, err := state.JobVersionsByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(versions) != 4 {
t.Fatalf("got %d versions; want %d", len(versions), 4)
}
}
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue

View File

@@ -602,6 +602,24 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, id strin
return all, nil
}
// JobByIDAndVersion returns the job identified by its ID and Version
func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, id string, version uint64) (*structs.Job, error) {
txn := s.db.Txn(false)
watchCh, existing, err := txn.FirstWatch("job_versions", "id", id, version)
if err != nil {
return nil, err
}
ws.Add(watchCh)
if existing != nil {
job := existing.(*structs.Job)
return job, nil
}
return nil, nil
}
// Jobs returns an iterator over all the jobs
func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)

View File

@@ -433,6 +433,16 @@ func TestStateStore_UpsertJob_Job(t *testing.T) {
if a := allVersions[0]; a.ID != job.ID || a.Version != 0 {
t.Fatalf("bad: %v", a)
}
// Test the looking up the job by version returns the same results
vout, err := state.JobByIDAndVersion(ws, job.ID, 0)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(out, vout) {
t.Fatalf("bad: %#v %#v", out, vout)
}
}
func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
@@ -478,6 +488,9 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
if out.ModifyIndex != 1001 {
t.Fatalf("bad: %#v", out)
}
if out.Version != 1 {
t.Fatalf("bad: %#v", out)
}
index, err := state.Index("jobs")
if err != nil {
@@ -487,6 +500,16 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
t.Fatalf("bad: %d", index)
}
// Test the looking up the job by version returns the same results
vout, err := state.JobByIDAndVersion(ws, job.ID, 1)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(out, vout) {
t.Fatalf("bad: %#v %#v", out, vout)
}
// Test that the job summary remains the same if the job is updated but
// count remains same
summary, err := state.JobSummaryByID(ws, job.ID)