From ed29b4a80613c5e4cbe169ddf36b868b8196cd46 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 4 Aug 2015 14:04:26 -0700 Subject: [PATCH] nomad: adding the FSM CRUD --- nomad/fsm.go | 57 ++++++++++++++++++++++++++++++++++++ nomad/fsm_test.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) diff --git a/nomad/fsm.go b/nomad/fsm.go index 4d0c3c735..ee065d8eb 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -28,6 +28,7 @@ const ( JobSnapshot IndexSnapshot EvalSnapshot + AllocSnapshot ) // nomadFSM implements a finite state machine that is used @@ -106,6 +107,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyUpdateEval(buf[1:], log.Index) case structs.EvalDeleteRequestType: return n.applyDeleteEval(buf[1:], log.Index) + case structs.AllocUpdateRequestType: + return n.applyAllocUpdate(buf[1:], log.Index) default: if ignoreUnknown { n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) @@ -214,6 +217,20 @@ func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} { return nil } +func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update"}, time.Now()) + var req structs.AllocUpdateRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.UpdateAllocations(index, req.Evict, req.Alloc); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpdateAllocations failed: %v", err) + return err + } + return nil +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() @@ -289,6 +306,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } + case AllocSnapshot: + alloc := new(structs.Allocation) + if err := dec.Decode(alloc); err != nil { + return err + } + if err := restore.AllocRestore(alloc); err != nil { + return err + } + case IndexSnapshot: idx := new(IndexEntry) if err := dec.Decode(idx); err != nil { @@ -337,6 +363,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistAllocs(sink, encoder); err != nil { + sink.Cancel() + return err + } return nil } @@ -448,6 +478,33 @@ func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink, return nil } +func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + // Get all the allocations + allocs, err := s.snap.Allocs() + if err != nil { + return err + } + + for { + // Get the next item + raw := allocs.Next() + if raw == nil { + break + } + + // Prepare the request struct + alloc := raw.(*structs.Allocation) + + // Write out the evaluation + sink.Write([]byte{byte(AllocSnapshot)}) + if err := encoder.Encode(alloc); err != nil { + return err + } + } + return nil +} + // Release is a no-op, as we just need to GC the pointer // to the state store snapshot. There is nothing to explicitly // cleanup. diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index f5e3fe7cc..f803b5285 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -296,6 +296,58 @@ func TestFSM_DeleteEval(t *testing.T) { } } +func TestFSM_UpdateAllocations(t *testing.T) { + fsm := testFSM(t) + + alloc := mockAlloc() + req := structs.AllocUpdateRequest{ + Evict: nil, + Alloc: []*structs.Allocation{alloc}, + } + buf, err := structs.Encode(structs.AllocUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().GetAllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + alloc.CreateIndex = out.CreateIndex + alloc.ModifyIndex = out.ModifyIndex + if !reflect.DeepEqual(alloc, out) { + t.Fatalf("bad: %#v %#v", alloc, out) + } + + req2 := structs.AllocUpdateRequest{ + Evict: []string{alloc.ID}, + } + buf, err = structs.Encode(structs.AllocUpdateRequestType, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are NOT registered + out, err = fsm.State().GetAllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("alloc found!") + } +} + func testSnapshotRestore(t *testing.T, fsm *nomadFSM) *nomadFSM { // Snapshot snap, err := fsm.Snapshot() @@ -387,6 +439,28 @@ func TestFSM_SnapshotRestore_Evals(t *testing.T) { } } +func TestFSM_SnapshotRestore_Allocs(t *testing.T) { + // Add some state + fsm := testFSM(t) + state := fsm.State() + alloc1 := mockAlloc() + state.UpdateAllocations(1000, nil, []*structs.Allocation{alloc1}) + alloc2 := mockAlloc() + state.UpdateAllocations(1001, nil, []*structs.Allocation{alloc2}) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + out1, _ := state2.GetAllocByID(alloc1.ID) + out2, _ := state2.GetAllocByID(alloc2.ID) + if !reflect.DeepEqual(alloc1, out1) { + t.Fatalf("bad: \n%#v\n%#v", out1, alloc1) + } + if !reflect.DeepEqual(alloc2, out2) { + t.Fatalf("bad: \n%#v\n%#v", out2, alloc2) + } +} + func TestFSM_SnapshotRestore_Indexes(t *testing.T) { // Add some state fsm := testFSM(t)