drain heap

Alex Dadgar
2018-03-02 15:19:55 -08:00
committed by Michael Schurter
parent 6026af2a8a
commit e566fcdf5f
3 changed files with 309 additions and 7 deletions

@@ -1,20 +1,173 @@
package drainerv2

import (
    "context"
    "sync"
    "time"
)
// DrainDeadlineNotifier allows batch notification of nodes that have reached
// their drain deadline.
type DrainDeadlineNotifier interface {
    // NextBatch returns the next batch of nodes that have reached their
    // deadline.
    NextBatch() <-chan []string

    // Remove removes the given node from being tracked for a deadline.
    Remove(nodeID string)

    // Watch marks the given node for being watched for its deadline.
    Watch(nodeID string, deadline time.Time)
}
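
// A hypothetical consumer (a sketch, not part of this commit; notifier and
// ctx are assumed to be supplied by the caller) receives coalesced batches
// until its context is cancelled, handing each batch to something like the
// drainer's handleDeadlinedNodes:
//
//    for {
//        select {
//        case nodes := <-notifier.NextBatch():
//            handleDeadlinedNodes(nodes)
//        case <-ctx.Done():
//            return
//        }
//    }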
// TODO Make any of what I just wrote true :) Initially it is just a simple
// implementation.

// deadlineHeap implements the DrainDeadlineNotifier and is backed by a min-heap
// to efficiently determine the next deadlining node. It also supports
// coalescing several deadlines into a single emission.
type deadlineHeap struct {
    ctx            context.Context
    coalesceWindow time.Duration

    // batch is the output channel on which deadlined node IDs are emitted.
    batch chan []string

    // nodes maps a tracked node ID to its drain deadline.
    nodes map[string]time.Time

    // trigger notifies the watch loop that the set of tracked nodes changed.
    trigger chan string

    l sync.RWMutex
}

// NewDeadlineHeap returns a new deadline heap that coalesces for the given
// duration and will stop watching when the passed context is cancelled.
func NewDeadlineHeap(ctx context.Context, coalesceWindow time.Duration) *deadlineHeap {
    d := &deadlineHeap{
        ctx:            ctx,
        coalesceWindow: coalesceWindow,
        batch:          make(chan []string, 4),
        nodes:          make(map[string]time.Time, 64),
        trigger:        make(chan string, 4),
    }

    go d.watch()
    return d
}
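
// A minimal usage sketch (the node ID and durations are illustrative, and
// the caller owns the context):
//
//    ctx, cancel := context.WithCancel(context.Background())
//    d := NewDeadlineHeap(ctx, time.Second)
//    d.Watch("node-1", time.Now().Add(time.Minute))
//    // ... receive from d.NextBatch() ...
//    cancel() // stops the background watch goroutine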

func (d *deadlineHeap) watch() {
    // Create the timer stopped and drained so the first Reset below arms it
    // cleanly.
    timer := time.NewTimer(0 * time.Millisecond)
    if !timer.Stop() {
        select {
        case <-timer.C:
        default:
        }
    }

    var nextDeadline time.Time
    defer timer.Stop()

    for {
        select {
        case <-d.ctx.Done():
            return
        case <-timer.C:
            if nextDeadline.IsZero() {
                continue
            }

            d.l.Lock()
            var batch []string
            for nodeID, nodeDeadline := range d.nodes {
                if !nodeDeadline.After(nextDeadline) {
                    batch = append(batch, nodeID)
                }
            }

            // If there is nothing, exit early
            if len(batch) == 0 {
                d.l.Unlock()
                goto CALC
            }

            // Send the batch
            select {
            case d.batch <- batch:
            case <-d.ctx.Done():
                d.l.Unlock()
                return
            }

            // Clean up the nodes
            for _, nodeID := range batch {
                delete(d.nodes, nodeID)
            }
            d.l.Unlock()
        case <-d.trigger:
        }

    CALC:
        deadline, ok := d.calculateNextDeadline()
        if !ok {
            continue
        }

        if !deadline.Equal(nextDeadline) {
            timer.Reset(deadline.Sub(time.Now()))
            nextDeadline = deadline
        }
    }
}

// calculateNextDeadline returns the next deadline in which to scan for
// deadlined nodes. It applies the coalesce window.
func (d *deadlineHeap) calculateNextDeadline() (time.Time, bool) {
    d.l.Lock()
    defer d.l.Unlock()

    if len(d.nodes) == 0 {
        return time.Time{}, false
    }

    // Calculate the new timer value
    var deadline time.Time
    for _, v := range d.nodes {
        if deadline.IsZero() || v.Before(deadline) {
            deadline = v
        }
    }

    // Pick the latest deadline that still falls within the coalesce window
    // of the earliest one, so that nearby deadlines fire as a single batch.
    var maxWithinWindow time.Time
    coalescedDeadline := deadline.Add(d.coalesceWindow)
    for _, nodeDeadline := range d.nodes {
        if nodeDeadline.Before(coalescedDeadline) {
            if maxWithinWindow.IsZero() || nodeDeadline.After(maxWithinWindow) {
                maxWithinWindow = nodeDeadline
            }
        }
    }

    return maxWithinWindow, true
}
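
// As a worked example (the figures mirror TestDeadlineHeap_WatchCoalesce):
// with a 250ms coalesce window and deadlines at t+5ms, t+10ms, t+20ms,
// t+100ms, t+355ms and t+360ms, the earliest deadline is t+5ms, so the
// window extends to t+255ms. The latest deadline inside that window is
// t+100ms, so the timer fires once at t+100ms and the first four nodes are
// emitted as a single batch; t+355ms and t+360ms later coalesce into a
// second batch at t+360ms.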

// NextBatch returns the next batch of nodes to be drained.
func (d *deadlineHeap) NextBatch() <-chan []string {
    return d.batch
}

func (d *deadlineHeap) Remove(nodeID string) {
    d.l.Lock()
    defer d.l.Unlock()
    delete(d.nodes, nodeID)

    // Non-blocking send: if a trigger is already pending, the watch loop
    // will recompute the next deadline anyway.
    select {
    case d.trigger <- nodeID:
    default:
    }
}

func (d *deadlineHeap) Watch(nodeID string, deadline time.Time) {
    d.l.Lock()
    defer d.l.Unlock()
    d.nodes[nodeID] = deadline

    select {
    case d.trigger <- nodeID:
    default:
    }
}

@@ -0,0 +1,149 @@
package drainerv2

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func TestDeadlineHeap_Interface(t *testing.T) {
    t.Parallel()
    require := require.New(t)
    h := NewDeadlineHeap(context.Background(), 1*time.Second)
    require.Implements((*DrainDeadlineNotifier)(nil), h)
}

func TestDeadlineHeap_WatchAndGet(t *testing.T) {
    t.Parallel()
    require := require.New(t)
    h := NewDeadlineHeap(context.Background(), 1*time.Second)

    now := time.Now()
    nodeID := "1"
    wait := 10 * time.Millisecond
    deadline := now.Add(wait)
    h.Watch(nodeID, deadline)

    var batch []string
    select {
    case batch = <-h.NextBatch():
    case <-time.After(2 * wait):
        t.Fatal("timeout")
    }

    require.Len(batch, 1)
    require.Equal(nodeID, batch[0])
}

func TestDeadlineHeap_WatchThenUpdateAndGet(t *testing.T) {
    t.Parallel()
    require := require.New(t)
    h := NewDeadlineHeap(context.Background(), 1*time.Second)

    now := time.Now()
    nodeID := "1"
    wait := 10 * time.Millisecond
    deadline := now.Add(wait)

    // Initially watch way in the future
    h.Watch(nodeID, now.Add(24*time.Hour))

    // Rewatch
    h.Watch(nodeID, deadline)

    var batch []string
    select {
    case batch = <-h.NextBatch():
    case <-time.After(2 * wait):
        t.Fatal("timeout")
    }

    require.Len(batch, 1)
    require.Equal(nodeID, batch[0])
}

func TestDeadlineHeap_MultiwatchAndDelete(t *testing.T) {
    t.Parallel()
    require := require.New(t)
    h := NewDeadlineHeap(context.Background(), 1*time.Second)

    now := time.Now()
    wait := 50 * time.Millisecond
    deadline := now.Add(wait)

    nodeID1 := "1"
    nodeID2 := "2"
    h.Watch(nodeID1, deadline)
    h.Watch(nodeID2, deadline)

    time.Sleep(1 * time.Millisecond)
    h.Remove(nodeID2)

    var batch []string
    select {
    case batch = <-h.NextBatch():
    case <-time.After(2 * wait):
        t.Fatal("timeout")
    }

    require.Len(batch, 1)
    require.Equal(nodeID1, batch[0])
}

func TestDeadlineHeap_WatchCoalesce(t *testing.T) {
    t.Parallel()
    require := require.New(t)
    h := NewDeadlineHeap(context.Background(), 250*time.Millisecond)

    now := time.Now()
    group1 := map[string]time.Time{
        "1": now.Add(5 * time.Millisecond),
        "2": now.Add(10 * time.Millisecond),
        "3": now.Add(20 * time.Millisecond),
        "4": now.Add(100 * time.Millisecond),
    }
    group2 := map[string]time.Time{
        "10": now.Add(355 * time.Millisecond),
        "11": now.Add(360 * time.Millisecond),
    }

    for _, g := range []map[string]time.Time{group1, group2} {
        for n, d := range g {
            h.Watch(n, d)
        }
    }

    var batch []string
    select {
    case batch = <-h.NextBatch():
    case <-time.After(1 * time.Second):
        t.Fatal("timeout")
    }

    require.Len(batch, len(group1))
    for nodeID := range group1 {
        require.Contains(batch, nodeID)
    }
    batch = nil

    select {
    case batch = <-h.NextBatch():
    case <-time.After(2 * time.Second):
        t.Fatal("timeout")
    }

    require.Len(batch, len(group2))
    for nodeID := range group2 {
        require.Contains(batch, nodeID)
    }

    select {
    case <-h.NextBatch():
        t.Fatal("unexpected batch")
    case <-time.After(100 * time.Millisecond):
    }
}

@@ -140,7 +140,7 @@ func (n *NodeDrainer) run(ctx context.Context) {
    }
}

func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
    // TODO
}