nomad: integrating worker and scheduler

This commit is contained in:
Armon Dadgar
2015-07-28 16:15:32 -07:00
parent 447a7f8b47
commit 64aeb26128
2 changed files with 39 additions and 21 deletions

View File

@@ -7,6 +7,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
)
const (
@@ -64,7 +65,7 @@ func (w *Worker) run() {
}
// Wait for the the raft log to catchup to the evaluation
if err := w.waitForSync(eval); err != nil {
if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil {
w.sendAck(eval.ID, false)
continue
}
@@ -145,40 +146,63 @@ func (w *Worker) sendAck(evalID string, ack bool) {
}
}
// waitForSync ensures that the local state is at least as fresh
// as the evaluation. Due to the eval broker architecture, we may
// receive an evaluation from the leader before we've even updated
// our FSM to reflect it. We simply wait for data to finish replicating
// in that case.
func (w *Worker) waitForSync(eval *structs.Evaluation) error {
// waitForIndex ensures that the local state is at least as fresh
// as the given index. This is used before starting an evaluation,
// but also potentially mid-stream. If a Plan fails because of stale
// state (attempt to allocate to a failed/dead node), we may need
// to sync our state again and do the planning with more recent data.
func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error {
start := time.Now()
defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_sync"}, start)
defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, time.Now())
CHECK:
// We only need the FSM state to be as recent as the evaluation
// We only need the FSM state to be as recent as the given index
appliedIndex := w.srv.raft.AppliedIndex()
if eval.ModifyIndex <= appliedIndex {
if index <= appliedIndex {
w.backoffReset()
return nil
}
// Check if we've reached our limit
if time.Now().Sub(start) > raftSyncLimit {
if time.Now().Sub(start) > timeout {
return fmt.Errorf("sync wait limit reached")
}
// Exponential back off if we haven't yet reached it
if w.backoffErr() {
return fmt.Errorf("shutdown during evaluation")
return fmt.Errorf("shutdown while waiting for state sync")
}
goto CHECK
}
// invokeScheduler is used to invoke the business logic of the scheduler
func (w *Worker) invokeScheduler(eval *structs.Evaluation) error {
// TODO:
// Snapshot the current state
snap, err := w.srv.fsm.State().Snapshot()
if err != nil {
return fmt.Errorf("failed to snapshot state: %v", err)
}
// Create the scheduler
sched, err := scheduler.NewScheduler(eval.Type, snap, w)
if err != nil {
return fmt.Errorf("failed to instantiate scheduler: %v", err)
}
// Process the evaluation
err = sched.Process(eval)
if err != nil {
return fmt.Errorf("failed to process evaluation: %v", err)
}
return nil
}
// SubmitPlan is used to submit a plan for consideration. This allows
// the worker to act as the planner for the scheduler.
func (w *Worker) SubmitPlan(*structs.Plan) (*structs.PlanResult, scheduler.State, error) {
// TODO
return nil, nil, nil
}
// backoffErr is used to do an exponential back off on error. This is
// maintained statefully for the worker. Returns if attempts should be
// abandoneded due to shutdown.

View File

@@ -3,6 +3,7 @@ package scheduler
import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -49,14 +50,7 @@ type Scheduler interface {
type State interface {
// Nodes returns an iterator over all the nodes.
// The type of each result is *structs.Node
Nodes() (ResultIterator, error)
}
// ResultIterator is used to iterate over a list of results
// from a query. The type of the response depends on the query,
// and perfectly represents when generics would be useful.
type ResultIterator interface {
Next() interface{}
Nodes() (memdb.ResultIterator, error)
}
// Planner interface is used to submit a task allocation plan.