From dd85f129590eb0123e4bd590faaf4e5a075f000f Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 7 Jul 2015 10:55:47 -0600 Subject: [PATCH] nomad: add FSM support for Jobs --- nomad/fsm.go | 41 +++++++++++++++++++++ nomad/fsm_test.go | 91 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/nomad/fsm.go b/nomad/fsm.go index 1c7c69dd5..fd465e9dd 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -25,6 +25,7 @@ type SnapshotType byte const ( NodeSnapshot SnapshotType = iota + JobSnapshot IndexSnapshot ) @@ -235,6 +236,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } + case JobSnapshot: + job := new(structs.Job) + if err := dec.Decode(job); err != nil { + return err + } + if err := restore.JobRestore(job); err != nil { + return err + } + case IndexSnapshot: idx := new(IndexEntry) if err := dec.Decode(idx); err != nil { @@ -275,6 +285,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistJobs(sink, encoder); err != nil { + sink.Cancel() + return err + } return nil } @@ -332,6 +346,33 @@ func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink, return nil } +func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + // Get all the jobs + jobs, err := s.snap.Jobs() + if err != nil { + return err + } + + for { + // Get the next item + raw := jobs.Next() + if raw == nil { + break + } + + // Prepare the request struct + job := raw.(*structs.Job) + + // Write out a job registration + sink.Write([]byte{byte(JobSnapshot)}) + if err := encoder.Encode(job); err != nil { + return err + } + } + return nil +} + // Release is a no-op, as we just need to GC the pointer // to the state store snapshot. There is nothing to explicitly // cleanup. diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 609fa9fff..3ebd410b8 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -158,6 +158,75 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { } } +func TestFSM_RegisterJob(t *testing.T) { + fsm := testFSM(t) + + req := structs.JobRegisterRequest{ + Job: mockJob(), + } + buf, err := structs.Encode(structs.JobRegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + job, err := fsm.State().GetJobByName(req.Job.Name) + if err != nil { + t.Fatalf("err: %v", err) + } + if job == nil { + t.Fatalf("not found!") + } + if job.CreateIndex != 1 { + t.Fatalf("bad index: %d", job.CreateIndex) + } +} + +func TestFSM_DeregisterJob(t *testing.T) { + fsm := testFSM(t) + + job := mockJob() + req := structs.JobRegisterRequest{ + Job: job, + } + buf, err := structs.Encode(structs.JobRegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + req2 := structs.JobDeregisterRequest{ + JobName: job.Name, + } + buf, err = structs.Encode(structs.JobDeregisterRequestType, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are NOT registered + job, err = fsm.State().GetJobByName(req.Job.Name) + if err != nil { + t.Fatalf("err: %v", err) + } + if job != nil { + t.Fatalf("job found!") + } +} + func testSnapshotRestore(t *testing.T, fsm *nomadFSM) *nomadFSM { // Snapshot snap, err := fsm.Snapshot() @@ -205,6 +274,28 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) { } } +func TestFSM_SnapshotRestore_Jobs(t *testing.T) { + // Add some state + fsm := testFSM(t) + state := fsm.State() + job1 := mockJob() + state.RegisterJob(1000, job1) + job2 := mockJob() + state.RegisterJob(1001, job2) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + out1, _ := state2.GetJobByName(job1.Name) + out2, _ := state2.GetJobByName(job2.Name) + if !reflect.DeepEqual(job1, out1) { + t.Fatalf("bad: \n%#v\n%#v", out1, job1) + } + if !reflect.DeepEqual(job2, out2) { + t.Fatalf("bad: \n%#v\n%#v", out2, job2) + } +} + func TestFSM_SnapshotRestore_Indexes(t *testing.T) { // Add some state fsm := testFSM(t)