diff --git a/nomad/drainerv2/drain_heap.go b/nomad/drainerv2/drain_heap.go
index 899b8dd16..b661447e2 100644
--- a/nomad/drainerv2/drain_heap.go
+++ b/nomad/drainerv2/drain_heap.go
@@ -1,20 +1,173 @@
 package drainerv2
 
 import (
+	"context"
+	"sync"
 	"time"
-
-	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+// DrainDeadlineNotifier allows batch notification of nodes that have reached
+// their drain deadline.
 type DrainDeadlineNotifier interface {
-	NextBatch() <-chan []*structs.Node
+	// NextBatch returns the next batch of nodes that have reached their
+	// deadline.
+	NextBatch() <-chan []string
+
+	// Remove stops tracking the given node's deadline.
 	Remove(nodeID string)
+
+	// Watch begins tracking the given node's drain deadline.
 	Watch(nodeID string, deadline time.Time)
 }
 
+// deadlineHeap implements DrainDeadlineNotifier. It tracks the drain deadline
+// of each watched node and coalesces deadlines that fall within a window into
+// a single emission. Despite its name, this initial implementation scans a
+// map of nodes rather than using a true min-heap.
+//
+// TODO: back this with a min-heap so the next deadlined node can be found
+// efficiently when many nodes are tracked.
 type deadlineHeap struct {
+	// ctx cancels the watch loop
+	ctx context.Context
+
+	// coalesceWindow is the duration within which deadlines are batched
+	// together into a single emission
+	coalesceWindow time.Duration
+
+	// batch emits batches of node IDs whose deadlines have been reached
+	batch chan []string
+
+	// nodes maps a watched node ID to its drain deadline
+	nodes map[string]time.Time
+
+	// trigger wakes the watch loop when the set of watched nodes changes
+	trigger chan string
+
+	// l protects access to nodes
+	l sync.RWMutex
 }
 
-func (d *deadlineHeap) NextBatch() <-chan []structs.Node { return nil }
-func (d *deadlineHeap) Remove(nodeID string) {}
-func (d *deadlineHeap) Watch(nodeID string, deadline time.Time) {}
+// NewDeadlineHeap returns a new deadline notifier that coalesces deadlines
+// within the given window and stops watching when the passed context is
+// cancelled.
+func NewDeadlineHeap(ctx context.Context, coalesceWindow time.Duration) *deadlineHeap {
+	d := &deadlineHeap{
+		ctx:            ctx,
+		coalesceWindow: coalesceWindow,
+		batch:          make(chan []string, 4),
+		nodes:          make(map[string]time.Time, 64),
+		trigger:        make(chan string, 4),
+	}
+
+	go d.watch()
+	return d
+}
+
+// watch is the long-lived loop that waits for the next deadline, emits the
+// batch of nodes whose deadlines have passed, and recomputes the timer
+// whenever the watched set changes.
+func (d *deadlineHeap) watch() {
+	// Create a stopped and drained timer; it is reset once the first
+	// deadline is known.
+	timer := time.NewTimer(0 * time.Millisecond)
+	if !timer.Stop() {
+		select {
+		case <-timer.C:
+		default:
+		}
+	}
+
+	var nextDeadline time.Time
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-d.ctx.Done():
+			return
+		case <-timer.C:
+			if nextDeadline.IsZero() {
+				continue
+			}
+
+			d.l.Lock()
+			var batch []string
+			for nodeID, nodeDeadline := range d.nodes {
+				if !nodeDeadline.After(nextDeadline) {
+					batch = append(batch, nodeID)
+				}
+			}
+
+			// If there is nothing to send, recompute the next deadline
+			if len(batch) == 0 {
+				d.l.Unlock()
+				goto CALC
+			}
+
+			// Send the batch
+			select {
+			case d.batch <- batch:
+			case <-d.ctx.Done():
+				d.l.Unlock()
+				return
+			}
+
+			// Stop tracking the nodes that were just emitted
+			for _, nodeID := range batch {
+				delete(d.nodes, nodeID)
+			}
+			d.l.Unlock()
+		case <-d.trigger:
+			// The watched set changed; fall through to recompute the
+			// next deadline.
+		}
+
+	CALC:
+		deadline, ok := d.calculateNextDeadline()
+		if !ok {
+			continue
+		}
+
+		if !deadline.Equal(nextDeadline) {
+			timer.Reset(deadline.Sub(time.Now()))
+			nextDeadline = deadline
+		}
+	}
+}
+
+// calculateNextDeadline returns the next time at which to scan for deadlined
+// nodes. It applies the coalesce window so that deadlines which fall close
+// together are emitted as a single batch.
+func (d *deadlineHeap) calculateNextDeadline() (time.Time, bool) {
+	d.l.Lock()
+	defer d.l.Unlock()
+
+	if len(d.nodes) == 0 {
+		return time.Time{}, false
+	}
+
+	// Find the earliest deadline among the watched nodes
+	var deadline time.Time
+	for _, v := range d.nodes {
+		if deadline.IsZero() || v.Before(deadline) {
+			deadline = v
+		}
+	}
+
+	// Within the coalesce window that starts at the earliest deadline, pick
+	// the latest deadline so that nearby deadlines fire as a single batch.
+	var maxWithinWindow time.Time
+	coalescedDeadline := deadline.Add(d.coalesceWindow)
+	for _, nodeDeadline := range d.nodes {
+		if nodeDeadline.Before(coalescedDeadline) {
+			if maxWithinWindow.IsZero() || nodeDeadline.After(maxWithinWindow) {
+				maxWithinWindow = nodeDeadline
+			}
+		}
+	}
+
+	return maxWithinWindow, true
+}
+
+// NextBatch returns the channel on which batches of deadlined node IDs are
+// emitted.
+func (d *deadlineHeap) NextBatch() <-chan []string {
+	return d.batch
+}
+
+// Remove stops tracking the given node and wakes the watch loop so it can
+// recompute its next deadline.
+func (d *deadlineHeap) Remove(nodeID string) {
+	d.l.Lock()
+	defer d.l.Unlock()
+	delete(d.nodes, nodeID)
+
+	select {
+	case d.trigger <- nodeID:
+	default:
+	}
+}
+
+// Watch adds or updates the deadline for the given node and wakes the watch
+// loop so it can recompute its next deadline.
+func (d *deadlineHeap) Watch(nodeID string, deadline time.Time) {
+	d.l.Lock()
+	defer d.l.Unlock()
+	d.nodes[nodeID] = deadline
+
+	select {
+	case d.trigger <- nodeID:
+	default:
+	}
+}
diff --git a/nomad/drainerv2/drain_heap_test.go b/nomad/drainerv2/drain_heap_test.go
new file mode 100644
index 000000000..a47a98ff7
--- /dev/null
+++ b/nomad/drainerv2/drain_heap_test.go
@@ -0,0 +1,149 @@
+package drainerv2
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestDeadlineHeap_Interface(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	h := NewDeadlineHeap(context.Background(), 1*time.Second)
+	require.Implements((*DrainDeadlineNotifier)(nil), h)
+}
+
+func TestDeadlineHeap_WatchAndGet(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	h := NewDeadlineHeap(context.Background(), 1*time.Second)
+
+	now := time.Now()
+	nodeID := "1"
+	wait := 10 * time.Millisecond
+	deadline := now.Add(wait)
+	h.Watch(nodeID, deadline)
+
+	var batch []string
+	select {
+	case batch = <-h.NextBatch():
+	case <-time.After(2 * wait):
+		t.Fatal("timeout")
+	}
+
+	require.Len(batch, 1)
+	require.Equal(nodeID, batch[0])
+}
+
+func TestDeadlineHeap_WatchThenUpdateAndGet(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	h := NewDeadlineHeap(context.Background(), 1*time.Second)
+
+	now := time.Now()
+	nodeID := "1"
+	wait := 10 * time.Millisecond
+	deadline := now.Add(wait)
+
+	// Initially watch a deadline far in the future
+	h.Watch(nodeID, now.Add(24*time.Hour))
+
+	// Rewatch with a deadline in the near future
+	h.Watch(nodeID, deadline)
+
+	var batch []string
+	select {
+	case batch = <-h.NextBatch():
+	case <-time.After(2 * wait):
+		t.Fatal("timeout")
+	}
+
+	require.Len(batch, 1)
+	require.Equal(nodeID, batch[0])
+}
+
+func TestDeadlineHeap_MultiwatchAndDelete(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	h := NewDeadlineHeap(context.Background(), 1*time.Second)
+
+	now := time.Now()
+	wait := 50 * time.Millisecond
+	deadline := now.Add(wait)
+
+	nodeID1 := "1"
+	nodeID2 := "2"
+	h.Watch(nodeID1, deadline)
+	h.Watch(nodeID2, deadline)
+
+	time.Sleep(1 * time.Millisecond)
+	h.Remove(nodeID2)
+
+	var batch []string
+	select {
+	case batch = <-h.NextBatch():
+	case <-time.After(2 * wait):
+		t.Fatal("timeout")
+	}
+
+	require.Len(batch, 1)
+	require.Equal(nodeID1, batch[0])
+}
+
+func TestDeadlineHeap_WatchCoalesce(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	h := NewDeadlineHeap(context.Background(), 250*time.Millisecond)
+
+	now := time.Now()
+
+	group1 := map[string]time.Time{
+		"1": now.Add(5 * time.Millisecond),
+		"2": now.Add(10 * time.Millisecond),
+		"3": now.Add(20 * time.Millisecond),
+		"4": now.Add(100 * time.Millisecond),
+	}
+
+	group2 := map[string]time.Time{
+		"10": now.Add(355 * time.Millisecond),
+		"11": now.Add(360 * time.Millisecond),
+	}
+
+	for _, g := range []map[string]time.Time{group1, group2} {
+		for n, d := range g {
+			h.Watch(n, d)
+		}
+	}
+
+	var batch []string
+	select {
+	case batch = <-h.NextBatch():
+	case <-time.After(1 * time.Second):
+		t.Fatal("timeout")
+	}
+
+	require.Len(batch, len(group1))
+	for nodeID := range group1 {
+		require.Contains(batch, nodeID)
+	}
+	batch = nil
+
+	select {
+	case batch = <-h.NextBatch():
+	case <-time.After(2 * time.Second):
+		t.Fatal("timeout")
+	}
+
+	require.Len(batch, len(group2))
+	for nodeID := range group2 {
+		require.Contains(batch, nodeID)
+	}
+
+	select {
+	case <-h.NextBatch():
+		t.Fatal("unexpected batch")
+	case <-time.After(100 * time.Millisecond):
+	}
+}
diff --git a/nomad/drainerv2/drainer.go b/nomad/drainerv2/drainer.go
index a7156dc91..6e9b4b73b 100644
--- a/nomad/drainerv2/drainer.go
+++ b/nomad/drainerv2/drainer.go
@@ -140,7 +140,7 @@ func (n *NodeDrainer) run(ctx context.Context) {
 	}
 }
 
-func (n *NodeDrainer) handleDeadlinedNodes(nodes []*structs.Node) {
+func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
 	// TODO
 }
 
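For reference, a minimal consumer of the DrainDeadlineNotifier added above might look like the sketch below. It is illustrative only and not part of the change: the main function stands in for the NodeDrainer.run loop, which would instead pass each batch to handleDeadlinedNodes, and the import path is assumed from the repository layout shown in the diff.

// Sketch only: a standalone consumer of the deadline notifier. The import
// path and the use of main() in place of the drainer's run loop are
// assumptions for illustration.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/drainerv2"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Coalesce deadlines that land within one second of each other.
	notifier := drainerv2.NewDeadlineHeap(ctx, 1*time.Second)

	// Watch two nodes whose deadlines fall inside the same coalesce window.
	notifier.Watch("node-1", time.Now().Add(100*time.Millisecond))
	notifier.Watch("node-2", time.Now().Add(500*time.Millisecond))

	// Both node IDs arrive as a single batch once the later deadline passes,
	// because 500ms is within the 1s window that starts at the 100ms deadline.
	select {
	case batch := <-notifier.NextBatch():
		fmt.Println("deadlined nodes:", batch)
	case <-ctx.Done():
	}
}

The coalesce window is what keeps nodes whose deadlines were set at nearly the same time from producing a flood of single-node batches; the drainer reacts once per cluster of deadlines rather than once per node.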