diff --git a/nomad/drainerv2/drain_heap.go b/nomad/drainerv2/drain_heap.go
new file mode 100644
index 000000000..899b8dd16
--- /dev/null
+++ b/nomad/drainerv2/drain_heap.go
@@ -0,0 +1,20 @@
+package drainerv2
+
+import (
+	"time"
+
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+type DrainDeadlineNotifier interface {
+	NextBatch() <-chan []*structs.Node
+	Remove(nodeID string)
+	Watch(nodeID string, deadline time.Time)
+}
+
+type deadlineHeap struct {
+}
+
+func (d *deadlineHeap) NextBatch() <-chan []*structs.Node       { return nil }
+func (d *deadlineHeap) Remove(nodeID string)                    {}
+func (d *deadlineHeap) Watch(nodeID string, deadline time.Time) {}
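
Note: the deadlineHeap above is only a stub (NextBatch returns nil). Below is a minimal sketch, outside the diff, of one way to satisfy DrainDeadlineNotifier. Everything in it is an assumption about where the PR is headed: because Watch only receives a node ID and a deadline, the sketch injects a hypothetical nodeLookup function to produce *structs.Node values, tracks deadlines in a plain map rather than a heap, and emits one batch of all nodes whose deadlines have passed.

package drainerv2

import (
	"context"
	"sync"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

// nodeLookup resolves node IDs to nodes (assumed helper); Watch only
// receives IDs and deadlines, so the notifier cannot build *structs.Node
// values on its own.
type nodeLookup func(nodeID string) *structs.Node

type deadlineMapSketch struct {
	ctx     context.Context
	lookup  nodeLookup
	batchCh chan []*structs.Node

	l         sync.Mutex
	deadlines map[string]time.Time // nodeID -> drain deadline
	trigger   chan struct{}        // wakes the loop after Watch/Remove
}

func newDeadlineMapSketch(ctx context.Context, lookup nodeLookup) *deadlineMapSketch {
	d := &deadlineMapSketch{
		ctx:       ctx,
		lookup:    lookup,
		batchCh:   make(chan []*structs.Node),
		deadlines: make(map[string]time.Time),
		trigger:   make(chan struct{}, 1),
	}
	go d.watch()
	return d
}

func (d *deadlineMapSketch) NextBatch() <-chan []*structs.Node { return d.batchCh }

func (d *deadlineMapSketch) Watch(nodeID string, deadline time.Time) {
	d.l.Lock()
	d.deadlines[nodeID] = deadline
	d.l.Unlock()
	d.wake()
}

func (d *deadlineMapSketch) Remove(nodeID string) {
	d.l.Lock()
	delete(d.deadlines, nodeID)
	d.l.Unlock()
	d.wake()
}

// wake non-blockingly nudges the loop to recompute its next deadline.
func (d *deadlineMapSketch) wake() {
	select {
	case d.trigger <- struct{}{}:
	default:
	}
}

// watch sleeps until the earliest tracked deadline, then emits every node
// whose deadline has passed. time.After is used for brevity; a real
// implementation would reuse a timer and keep a heap, not scan a map.
func (d *deadlineMapSketch) watch() {
	for {
		var next time.Time
		d.l.Lock()
		for _, t := range d.deadlines {
			if next.IsZero() || t.Before(next) {
				next = t
			}
		}
		d.l.Unlock()

		var wait <-chan time.Time
		if !next.IsZero() {
			wait = time.After(time.Until(next))
		}

		select {
		case <-d.ctx.Done():
			return
		case <-d.trigger: // deadlines changed; recompute the wait
			continue
		case <-wait:
		}

		var batch []*structs.Node
		d.l.Lock()
		for id, t := range d.deadlines {
			if !t.After(time.Now()) {
				if node := d.lookup(id); node != nil {
					batch = append(batch, node)
				}
				delete(d.deadlines, id)
			}
		}
		d.l.Unlock()

		if len(batch) > 0 {
			select {
			case <-d.ctx.Done():
				return
			case d.batchCh <- batch:
			}
		}
	}
}

The buffered trigger channel keeps Watch and Remove non-blocking while guaranteeing the loop recomputes its sleep after any mutation, which matters when a newly watched node has an earlier deadline than the one currently being waited on.
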
diff --git a/nomad/drainerv2/drain_interfaces.go b/nomad/drainerv2/drain_interfaces.go
new file mode 100644
index 000000000..008537619
--- /dev/null
+++ b/nomad/drainerv2/drain_interfaces.go
@@ -0,0 +1 @@
+package drainerv2
diff --git a/nomad/drainerv2/drainer.go b/nomad/drainerv2/drainer.go
new file mode 100644
index 000000000..a7156dc91
--- /dev/null
+++ b/nomad/drainerv2/drainer.go
@@ -0,0 +1,167 @@
+package drainerv2
+
+import (
+	"context"
+	"log"
+	"sync"
+
+	"github.com/hashicorp/nomad/nomad/state"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"golang.org/x/time/rate"
+)
+
+const (
+	// LimitStateQueriesPerSecond is the number of state queries allowed per
+	// second
+	LimitStateQueriesPerSecond = 100.0
+)
+
+// RaftApplier contains methods for applying the raft requests required by the
+// NodeDrainer.
+type RaftApplier interface {
+	AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) error
+	NodeDrainComplete(nodeID string) error
+}
+
+type AllocDrainer interface {
+	drain(allocs []*structs.Allocation)
+}
+
+type DrainingJobWatcherFactory func(context.Context, *rate.Limiter, AllocDrainer) DrainingJobWatcher
+type DrainingNodeWatcherFactory func(context.Context, *rate.Limiter, AllocDrainer) DrainingNodeWatcher
+type DrainDeadlineNotifierFactory func(context.Context) DrainDeadlineNotifier
+
+type NodeDrainerConfig struct {
+	Logger                *log.Logger
+	Raft                  RaftApplier
+	JobFactory            DrainingJobWatcherFactory
+	NodeFactory           DrainingNodeWatcherFactory
+	DrainDeadlineFactory  DrainDeadlineNotifierFactory
+	StateQueriesPerSecond float64
+}
+
+type NodeDrainer struct {
+	enabled bool
+	logger  *log.Logger
+
+	// nodes is the set of draining nodes
+	nodes map[string]*drainingNode
+
+	// doneNodeCh is used to signal that a node is done draining
+	doneNodeCh chan string
+
+	nodeWatcher DrainingNodeWatcher
+	nodeFactory DrainingNodeWatcherFactory
+
+	jobWatcher DrainingJobWatcher
+	jobFactory DrainingJobWatcherFactory
+
+	deadlineNotifier        DrainDeadlineNotifier
+	deadlineNotifierFactory DrainDeadlineNotifierFactory
+
+	// state is the state that is watched for state changes.
+	state *state.StateStore
+
+	// queryLimiter is used to limit the rate of blocking queries
+	queryLimiter *rate.Limiter
+
+	// ctx and exitFn are used to cancel the watcher
+	ctx    context.Context
+	exitFn context.CancelFunc
+
+	// raft is a shim around the raft messages necessary for draining
+	raft RaftApplier
+
+	l sync.RWMutex
+}
+
+func NewNodeDrainer(c *NodeDrainerConfig) *NodeDrainer {
+	return &NodeDrainer{
+		raft:                    c.Raft,
+		logger:                  c.Logger,
+		jobFactory:              c.JobFactory,
+		nodeFactory:             c.NodeFactory,
+		deadlineNotifierFactory: c.DrainDeadlineFactory,
+		queryLimiter:            rate.NewLimiter(rate.Limit(c.StateQueriesPerSecond), 100),
+	}
+}
+
+// SetEnabled will start or stop the node draining goroutine depending on the
+// enabled boolean.
+func (n *NodeDrainer) SetEnabled(enabled bool, state *state.StateStore) {
+	n.l.Lock()
+	defer n.l.Unlock()
+
+	wasEnabled := n.enabled
+	n.enabled = enabled
+
+	if state != nil {
+		n.state = state
+	}
+
+	// Flush the state to create the necessary objects
+	n.flush()
+
+	// If we are starting now, launch the watch daemon. run blocks until the
+	// context is canceled, so it must not be called while holding the lock.
+	if enabled && !wasEnabled {
+		go n.run(n.ctx)
+	}
+}
+
+// flush is used to clear the state of the watcher
+func (n *NodeDrainer) flush() {
+	// Kill everything associated with the watcher
+	if n.exitFn != nil {
+		n.exitFn()
+	}
+
+	n.ctx, n.exitFn = context.WithCancel(context.Background())
+	n.jobWatcher = n.jobFactory(n.ctx, n.queryLimiter, n)
+	n.nodeWatcher = n.nodeFactory(n.ctx, n.queryLimiter, n)
+	n.deadlineNotifier = n.deadlineNotifierFactory(n.ctx)
+	n.nodes = make(map[string]*drainingNode, 32)
+	n.doneNodeCh = make(chan string, 4)
+}
+
+func (n *NodeDrainer) run(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case nodes := <-n.deadlineNotifier.NextBatch():
+			n.handleDeadlinedNodes(nodes)
+		case nodes := <-n.nodeWatcher.Transitioning():
+			n.handleNodeDrainTransition(nodes)
+		case allocs := <-n.jobWatcher.Drain():
+			n.handleJobAllocDrain(allocs)
+		case node := <-n.doneNodeCh:
+			n.handleDoneNode(node)
+		}
+	}
+}
+
+func (n *NodeDrainer) handleDeadlinedNodes(nodes []*structs.Node) {
+	// TODO
+}
+
+func (n *NodeDrainer) handleNodeDrainTransition(nodes []*structs.Node) {
+	// TODO
+}
+
+func (n *NodeDrainer) handleJobAllocDrain(allocs []*structs.Allocation) {
+	// TODO
+
+	// TODO Call check on the appropriate nodes when the final allocs
+	// transition to stop so we have a place to determine when the node
+	// is done and the final drain of system allocs can begin.
+	// TODO This probably requires changing the interface such that it
+	// returns replaced allocs as well.
+}
+
+func (n *NodeDrainer) handleDoneNode(nodeID string) {
+	// TODO
+}
+
+func (n *NodeDrainer) drain(allocs []*structs.Allocation) {
+	// TODO
+}
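
Note: handleDeadlinedNodes is still a TODO above. One plausible shape, sketched under the assumption that drainingNode.DeadlineAllocs eventually returns the allocations still running on the node: mark each allocation for migration and submit one node-drain evaluation per job through the RaftApplier shim. helper.BoolToPtr and uuid.Generate come from github.com/hashicorp/nomad/helper and github.com/hashicorp/nomad/helper/uuid; everything else not in the diff is hypothetical, not the PR's final implementation.

// Sketch only: assumes these extra imports in drainer.go:
//   "github.com/hashicorp/nomad/helper"
//   "github.com/hashicorp/nomad/helper/uuid"
func (n *NodeDrainer) handleDeadlinedNodes(nodes []*structs.Node) {
	n.l.RLock()
	defer n.l.RUnlock()

	for _, node := range nodes {
		draining, ok := n.nodes[node.ID]
		if !ok {
			// The node may have been removed between firing and delivery.
			continue
		}

		allocs, err := draining.DeadlineAllocs()
		if err != nil {
			n.logger.Printf("[ERR] nomad.drain: failed to get deadline allocs for node %q: %v", node.ID, err)
			continue
		}

		// Mark every remaining alloc for migration and create one
		// node-drain eval per job so the scheduler places replacements.
		transitions := make(map[string]*structs.DesiredTransition, len(allocs))
		var evals []*structs.Evaluation
		jobsSeen := make(map[string]struct{})
		for _, alloc := range allocs {
			transitions[alloc.ID] = &structs.DesiredTransition{
				Migrate: helper.BoolToPtr(true),
			}

			// Key by namespace and job ID so jobs in different
			// namespaces get their own evals.
			key := alloc.Namespace + "/" + alloc.JobID
			if _, ok := jobsSeen[key]; ok {
				continue
			}
			jobsSeen[key] = struct{}{}
			evals = append(evals, &structs.Evaluation{
				ID:          uuid.Generate(),
				Namespace:   alloc.Namespace,
				Priority:    alloc.Job.Priority,
				Type:        alloc.Job.Type,
				TriggeredBy: structs.EvalTriggerNodeDrain,
				JobID:       alloc.JobID,
				Status:      structs.EvalStatusPending,
			})
		}

		if err := n.raft.AllocUpdateDesiredTransition(transitions, evals); err != nil {
			n.logger.Printf("[ERR] nomad.drain: failed to update desired transitions for node %q: %v", node.ID, err)
		}
	}
}
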
diff --git a/nomad/drainerv2/draining_node.go b/nomad/drainerv2/draining_node.go
new file mode 100644
index 000000000..3150be1fd
--- /dev/null
+++ b/nomad/drainerv2/draining_node.go
@@ -0,0 +1,65 @@
+package drainerv2
+
+import (
+	"sync"
+	"time"
+
+	"github.com/hashicorp/nomad/nomad/state"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+// TODO make this an interface and then I can optimize the infinite case by
+// using a singleton object
+
+type drainCoordinator interface {
+	done(nodeID string)
+}
+
+// done implements drainCoordinator by signaling the run loop that the node
+// has finished draining.
+func (n *NodeDrainer) done(nodeID string) {
+	select {
+	case <-n.ctx.Done():
+	case n.doneNodeCh <- nodeID:
+	}
+}
+
+type drainingNode struct {
+	coordinator drainCoordinator
+	state       *state.StateStore
+	node        *structs.Node
+	l           sync.RWMutex
+}
+
+func NewDrainingNode(node *structs.Node, state *state.StateStore, coordinator drainCoordinator) *drainingNode {
+	return &drainingNode{
+		coordinator: coordinator,
+		state:       state,
+		node:        node,
+	}
+}
+
+func (n *drainingNode) Update(node *structs.Node) {
+	n.l.Lock()
+	defer n.l.Unlock()
+	n.node = node
+}
+
+// DeadlineTime returns whether the node has a drain deadline and, if so, when
+// it is
+func (n *drainingNode) DeadlineTime() (bool, time.Time) {
+	n.l.RLock()
+	defer n.l.RUnlock()
+
+	// Should never happen
+	if n.node == nil || n.node.DrainStrategy == nil {
+		return false, time.Time{}
+	}
+
+	return n.node.DrainStrategy.DeadlineTime()
+}
+
+// DeadlineAllocs returns the set of allocations that should be drained given
+// that the node is at its deadline
+func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
+	n.l.RLock()
+	defer n.l.RUnlock()
+	return nil, nil
+}
diff --git a/nomad/drainerv2/watch_jobs.go b/nomad/drainerv2/watch_jobs.go
new file mode 100644
index 000000000..836cea685
--- /dev/null
+++ b/nomad/drainerv2/watch_jobs.go
@@ -0,0 +1,8 @@
+package drainerv2
+
+import "github.com/hashicorp/nomad/nomad/structs"
+
+type DrainingJobWatcher interface {
+	RegisterJob(jobID, namespace string)
+	Drain() <-chan []*structs.Allocation
+}
diff --git a/nomad/drainerv2/watch_nodes.go b/nomad/drainerv2/watch_nodes.go
new file mode 100644
index 000000000..623c2edb2
--- /dev/null
+++ b/nomad/drainerv2/watch_nodes.go
@@ -0,0 +1,7 @@
+package drainerv2
+
+import "github.com/hashicorp/nomad/nomad/structs"
+
+type DrainingNodeWatcher interface {
+	Transitioning() <-chan []*structs.Node
+}
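
Note: drainingNode.DeadlineAllocs above is a stub returning nil, nil. A sketch of what it might do, assuming the state store's AllocsByNode is the source of truth and that system allocations are deliberately skipped so they can be drained last, per the TODO in handleJobAllocDrain; none of this is confirmed by the diff.

// DeadlineAllocs returns the set of allocations that should be drained given
// that the node is at its deadline. (Sketch; same package, no extra imports:
// nil is accepted for the memdb.WatchSet argument.)
func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
	n.l.RLock()
	defer n.l.RUnlock()

	allocs, err := n.state.AllocsByNode(nil, n.node.ID)
	if err != nil {
		return nil, err
	}

	var drain []*structs.Allocation
	for _, alloc := range allocs {
		// Terminal allocs need no action, and system allocs are left
		// for the final drain pass once all other work has migrated.
		if alloc.TerminalStatus() ||
			(alloc.Job != nil && alloc.Job.Type == structs.JobTypeSystem) {
			continue
		}
		drain = append(drain, alloc)
	}
	return drain, nil
}
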