From 64aeb2612885e0a7af116a264039d229007bcbf5 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 28 Jul 2015 16:15:32 -0700 Subject: [PATCH] nomad: integrating worker and scheduler --- nomad/worker.go | 50 +++++++++++++++++++++++++++++++----------- scheduler/scheduler.go | 10 ++------- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/nomad/worker.go b/nomad/worker.go index 054ddefd2..5a763cf94 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -7,6 +7,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/scheduler" ) const ( @@ -64,7 +65,7 @@ func (w *Worker) run() { } // Wait for the the raft log to catchup to the evaluation - if err := w.waitForSync(eval); err != nil { + if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil { w.sendAck(eval.ID, false) continue } @@ -145,40 +146,63 @@ func (w *Worker) sendAck(evalID string, ack bool) { } } -// waitForSync ensures that the local state is at least as fresh -// as the evaluation. Due to the eval broker architecture, we may -// receive an evaluation from the leader before we've even updated -// our FSM to reflect it. We simply wait for data to finish replicating -// in that case. -func (w *Worker) waitForSync(eval *structs.Evaluation) error { +// waitForIndex ensures that the local state is at least as fresh +// as the given index. This is used before starting an evaluation, +// but also potentially mid-stream. If a Plan fails because of stale +// state (attempt to allocate to a failed/dead node), we may need +// to sync our state again and do the planning with more recent data. 
+func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error {
 	start := time.Now()
-	defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_sync"}, start)
+	defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) // time from entry; defer args are evaluated immediately
 CHECK:
-	// We only need the FSM state to be as recent as the evaluation
+	// We only need the FSM state to be as recent as the given index
 	appliedIndex := w.srv.raft.AppliedIndex()
-	if eval.ModifyIndex <= appliedIndex {
+	if index <= appliedIndex {
 		w.backoffReset()
 		return nil
 	}
 
 	// Check if we've reached our limit
-	if time.Now().Sub(start) > raftSyncLimit {
+	if time.Since(start) > timeout {
 		return fmt.Errorf("sync wait limit reached")
 	}
 
 	// Exponential back off if we haven't yet reached it
 	if w.backoffErr() {
-		return fmt.Errorf("shutdown during evaluation")
+		return fmt.Errorf("shutdown while waiting for state sync")
 	}
 	goto CHECK
 }
 
 // invokeScheduler is used to invoke the business logic of the scheduler
 func (w *Worker) invokeScheduler(eval *structs.Evaluation) error {
-	// TODO:
+	// Snapshot the current state
+	snap, err := w.srv.fsm.State().Snapshot()
+	if err != nil {
+		return fmt.Errorf("failed to snapshot state: %v", err)
+	}
+
+	// Create the scheduler
+	sched, err := scheduler.NewScheduler(eval.Type, snap, w)
+	if err != nil {
+		return fmt.Errorf("failed to instantiate scheduler: %v", err)
+	}
+
+	// Process the evaluation
+	err = sched.Process(eval)
+	if err != nil {
+		return fmt.Errorf("failed to process evaluation: %v", err)
+	}
 	return nil
 }
 
+// SubmitPlan is used to submit a plan for consideration. This allows
+// the worker to act as the planner for the scheduler.
+func (w *Worker) SubmitPlan(*structs.Plan) (*structs.PlanResult, scheduler.State, error) {
+	// TODO: not yet implemented; every result is currently nil
+	return nil, nil, nil
+}
+
 // backoffErr is used to do an exponential back off on error. This is
 // maintained statefully for the worker. Returns if attempts should be
 // abandoneded due to shutdown.
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index d89d3c80c..3064dc4de 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -3,6 +3,7 @@ package scheduler import ( "fmt" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" ) @@ -49,14 +50,7 @@ type Scheduler interface { type State interface { // Nodes returns an iterator over all the nodes. // The type of each result is *structs.Node - Nodes() (ResultIterator, error) -} - -// ResultIterator is used to iterate over a list of results -// from a query. The type of the response depends on the query, -// and perfectly represents when generics would be useful. -type ResultIterator interface { - Next() interface{} + Nodes() (memdb.ResultIterator, error) } // Planner interface is used to submit a task allocation plan.