nomad: adding the FSM CRUD

This commit is contained in:
Armon Dadgar
2015-08-04 14:04:26 -07:00
parent 46c6b7bcf6
commit ed29b4a806
2 changed files with 131 additions and 0 deletions

View File

@@ -28,6 +28,7 @@ const (
JobSnapshot
IndexSnapshot
EvalSnapshot
AllocSnapshot
)
// nomadFSM implements a finite state machine that is used
@@ -106,6 +107,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyUpdateEval(buf[1:], log.Index)
case structs.EvalDeleteRequestType:
return n.applyDeleteEval(buf[1:], log.Index)
case structs.AllocUpdateRequestType:
return n.applyAllocUpdate(buf[1:], log.Index)
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@@ -214,6 +217,20 @@ func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} {
return nil
}
func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update"}, time.Now())
var req structs.AllocUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateAllocations(index, req.Evict, req.Alloc); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateAllocations failed: %v", err)
return err
}
return nil
}
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()
@@ -289,6 +306,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}
case AllocSnapshot:
alloc := new(structs.Allocation)
if err := dec.Decode(alloc); err != nil {
return err
}
if err := restore.AllocRestore(alloc); err != nil {
return err
}
case IndexSnapshot:
idx := new(IndexEntry)
if err := dec.Decode(idx); err != nil {
@@ -337,6 +363,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistAllocs(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}
@@ -448,6 +478,33 @@ func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
return nil
}
func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the allocations
allocs, err := s.snap.Allocs()
if err != nil {
return err
}
for {
// Get the next item
raw := allocs.Next()
if raw == nil {
break
}
// Prepare the request struct
alloc := raw.(*structs.Allocation)
// Write out the evaluation
sink.Write([]byte{byte(AllocSnapshot)})
if err := encoder.Encode(alloc); err != nil {
return err
}
}
return nil
}
// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.

View File

@@ -296,6 +296,58 @@ func TestFSM_DeleteEval(t *testing.T) {
}
}
func TestFSM_UpdateAllocations(t *testing.T) {
fsm := testFSM(t)
alloc := mockAlloc()
req := structs.AllocUpdateRequest{
Evict: nil,
Alloc: []*structs.Allocation{alloc},
}
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
out, err := fsm.State().GetAllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}
req2 := structs.AllocUpdateRequest{
Evict: []string{alloc.ID},
}
buf, err = structs.Encode(structs.AllocUpdateRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
out, err = fsm.State().GetAllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("alloc found!")
}
}
func testSnapshotRestore(t *testing.T, fsm *nomadFSM) *nomadFSM {
// Snapshot
snap, err := fsm.Snapshot()
@@ -387,6 +439,28 @@ func TestFSM_SnapshotRestore_Evals(t *testing.T) {
}
}
func TestFSM_SnapshotRestore_Allocs(t *testing.T) {
// Add some state
fsm := testFSM(t)
state := fsm.State()
alloc1 := mockAlloc()
state.UpdateAllocations(1000, nil, []*structs.Allocation{alloc1})
alloc2 := mockAlloc()
state.UpdateAllocations(1001, nil, []*structs.Allocation{alloc2})
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.GetAllocByID(alloc1.ID)
out2, _ := state2.GetAllocByID(alloc2.ID)
if !reflect.DeepEqual(alloc1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, alloc1)
}
if !reflect.DeepEqual(alloc2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, alloc2)
}
}
func TestFSM_SnapshotRestore_Indexes(t *testing.T) {
// Add some state
fsm := testFSM(t)