Initial design

This commit is contained in:
Alex Dadgar
2018-03-01 16:37:19 -08:00
committed by Michael Schurter
parent c00c02df62
commit 6026af2a8a
6 changed files with 268 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
package drainerv2
import (
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
type DrainDeadlineNotifier interface {
NextBatch() <-chan []*structs.Node
Remove(nodeID string)
Watch(nodeID string, deadline time.Time)
}
type deadlineHeap struct {
}
func (d *deadlineHeap) NextBatch() <-chan []structs.Node { return nil }
func (d *deadlineHeap) Remove(nodeID string) {}
func (d *deadlineHeap) Watch(nodeID string, deadline time.Time) {}

View File

@@ -0,0 +1 @@
package drainerv2

167
nomad/drainerv2/drainer.go Normal file
View File

@@ -0,0 +1,167 @@
package drainerv2
import (
"context"
"log"
"sync"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/time/rate"
)
const (
// LimitStateQueriesPerSecond is the number of state queries allowed per
// second
LimitStateQueriesPerSecond = 100.0
)
// RaftApplier contains methods for applying the raft requests required by the
// NodeDrainer.
type RaftApplier interface {
AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) error
NodeDrainComplete(nodeID string) error
}
type AllocDrainer interface {
drain(allocs []*structs.Allocation)
}
type DrainingJobWatcherFactory func(context.Context, *rate.Limiter, AllocDrainer) DrainingJobWatcher
type DrainingNodeWatcherFactory func(context.Context, *rate.Limiter, AllocDrainer) DrainingNodeWatcher
type DrainDeadlineNotifierFactory func(context.Context) DrainDeadlineNotifier
type NodeDrainerConfig struct {
Logger *log.Logger
Raft RaftApplier
JobFactory DrainingJobWatcherFactory
NodeFactory DrainingNodeWatcherFactory
DrainDeadlineFactory DrainDeadlineNotifierFactory
StateQueriesPerSecond float64
}
type NodeDrainer struct {
enabled bool
logger *log.Logger
// nodes is the set of draining nodes
nodes map[string]*drainingNode
// doneNodeCh is used to signal that a node is done draining
doneNodeCh chan string
nodeWatcher DrainingNodeWatcher
nodeFactory DrainingNodeWatcherFactory
jobWatcher DrainingJobWatcher
jobFactory DrainingJobWatcherFactory
deadlineNotifier DrainDeadlineNotifier
deadlineNotifierFactory DrainDeadlineNotifierFactory
// state is the state that is watched for state changes.
state *state.StateStore
// queryLimiter is used to limit the rate of blocking queries
queryLimiter *rate.Limiter
// raft is a shim around the raft messages necessary for draining
raft RaftApplier
// ctx and exitFn are used to cancel the watcher
ctx context.Context
exitFn context.CancelFunc
l sync.RWMutex
}
func NewNodeDrainer(c *NodeDrainerConfig) *NodeDrainer {
return &NodeDrainer{
raft: c.Raft,
logger: c.Logger,
jobFactory: c.JobFactory,
nodeFactory: c.NodeFactory,
deadlineNotifierFactory: c.DrainDeadlineFactory,
queryLimiter: rate.NewLimiter(rate.Limit(c.StateQueriesPerSecond), 100),
}
}
// SetEnabled will start or stop the node draining goroutine depending on the
// enabled boolean.
func (n *NodeDrainer) SetEnabled(enabled bool, state *state.StateStore) {
n.l.Lock()
defer n.l.Unlock()
wasEnabled := n.enabled
n.enabled = enabled
if state != nil {
n.state = state
}
// Flush the state to create the necessary objects
n.flush()
// If we are starting now, launch the watch daemon
if enabled && !wasEnabled {
n.run(n.ctx)
}
}
// flush is used to clear the state of the watcher
func (n *NodeDrainer) flush() {
// Kill everything associated with the watcher
if n.exitFn != nil {
n.exitFn()
}
n.ctx, n.exitFn = context.WithCancel(context.Background())
n.jobWatcher = n.jobFactory(n.ctx, n.queryLimiter, n)
n.nodeWatcher = n.nodeFactory(n.ctx, n.queryLimiter, n)
n.deadlineNotifier = n.deadlineNotifierFactory(n.ctx)
n.nodes = make(map[string]*drainingNode, 32)
n.doneNodeCh = make(chan string, 4)
}
func (n *NodeDrainer) run(ctx context.Context) {
for {
select {
case <-n.ctx.Done():
return
case nodes := <-n.deadlineNotifier.NextBatch():
n.handleDeadlinedNodes(nodes)
case nodes := <-n.nodeWatcher.Transistioning():
n.handleNodeDrainTransistion(nodes)
case allocs := <-n.jobWatcher.Drain():
n.handleJobAllocDrain(allocs)
case node := <-n.doneNodeCh:
n.handleDoneNode(node)
}
}
}
func (n *NodeDrainer) handleDeadlinedNodes(nodes []*structs.Node) {
// TODO
}
func (n *NodeDrainer) handleNodeDrainTransistion(nodes []*structs.Node) {
// TODO
}
func (n *NodeDrainer) handleJobAllocDrain(allocs []*structs.Allocation) {
// TODO
// TODO Call check on the appropriate nodes when the final allocs
// transistion to stop so we have a place to determine with the node
// is done and the final drain of system allocs
// TODO This probably requires changing the interface such that it
// returns replaced allocs as well.
}
func (n *NodeDrainer) handleDoneNode(nodeID string) {
// TODO
}
func (n *NodeDrainer) drain(allocs []*structs.Allocation) {
// TODO
}

View File

@@ -0,0 +1,65 @@
package drainerv2
import (
"sync"
"time"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// TODO make this an interface and then I can optimize the infinite case by
// using a singleton object
type drainCoordinator interface {
done(nodeID string)
}
func (n *NodeDrainer) nodeDone(nodeID string) {
select {
case <-n.ctx.Done():
case n.doneNodeCh <- nodeID:
}
}
type drainingNode struct {
coordinator drainCoordinator
state *state.StateStore
node *structs.Node
l sync.RWMutex
}
func NewDrainingNode(node *structs.Node, state *state.StateStore, coordinator drainCoordinator) *drainingNode {
return &drainingNode{
coordinator: coordinator,
state: state,
node: node,
}
}
func (n *drainingNode) Update(node *structs.Node) {
n.l.Lock()
defer n.l.Unlock()
n.node = node
}
// DeadlineTime returns if the node has a deadline and if so what it is
func (n *drainingNode) DeadlineTime() (bool, time.Time) {
n.l.RLock()
defer n.l.RUnlock()
// Should never happen
if n.node == nil || n.node.DrainStrategy == nil {
return false, time.Time{}
}
return n.node.DrainStrategy.DeadlineTime()
}
// DeadlineAllocs returns the set of allocations that should be drained given a
// node is at its deadline
func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
n.l.RLock()
defer n.l.RUnlock()
return nil, nil
}

View File

@@ -0,0 +1,8 @@
package drainerv2
import "github.com/hashicorp/nomad/nomad/structs"
type DrainingJobWatcher interface {
RegisterJob(jobID, namespace string)
Drain() <-chan []*structs.Allocation
}

View File

@@ -0,0 +1,7 @@
package drainerv2
import "github.com/hashicorp/nomad/nomad/structs"
type DrainingNodeWatcher interface {
Transistioning() <-chan []*structs.Node
}