From 3477f2e87a01e4f1447ab79728a5c28db8dbe6fe Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Tue, 28 Apr 2020 16:13:09 -0400 Subject: [PATCH] client/heartbeatstop: don't store client state, use timeout In order to minimize this change while keeping a simple version of the behavior, we set `lastOk` to the current time less the initial server connection timeout. If the client starts and never contacts the server, it will stop all configured tasks after the initial server connection grace period, on the assumption that we've been out of touch longer than any configured `stop_after_client_disconnect`. The more complex state behavior might be justified later, but we should learn about failure modes first. --- client/client.go | 13 +++++---- client/heartbeatstop.go | 35 ++++++++++------------- client/heartbeatstop_test.go | 9 +----- client/state/interface.go | 9 ------ client/state/memdb.go | 17 ------------ client/state/noopdb.go | 10 ------- client/state/state_database.go | 51 ---------------------------------- 7 files changed, 22 insertions(+), 122 deletions(-) diff --git a/client/client.go b/client/client.go index 367a601b3..e2a614572 100644 --- a/client/client.go +++ b/client/client.go @@ -371,9 +371,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic }, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up }) - // create heartbeatStop, and restore its previous state from the state store. Post init for the stateDB - c.heartbeatStop = newHeartbeatStop(c.stateDB, c.getAllocRunner, logger, c.shutdownCh) - // Setup the clients RPC server c.setupClientRpc() @@ -448,9 +445,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic c.pluginManagers.RegisterAndRun(devManager) // Batching of initial fingerprints is done to reduce the number of node - // updates sent to the server on startup. 
This is the first RPC to the servers go c.batchFirstFingerprints() + // create heartbeatStop. We go after the first attempt to connect to the server, so + // that our grace period for connection goes for the full time + c.heartbeatStop = newHeartbeatStop(c.getAllocRunner, batchFirstFingerprintsTimeout, logger, c.shutdownCh) + // Watch for disconnection, and heartbeatStopAllocs configured to have a maximum // lifetime when out of touch with the server go c.heartbeatStop.watch() @@ -1764,7 +1765,7 @@ func (c *Client) registerNode() error { c.heartbeatLock.Lock() defer c.heartbeatLock.Unlock() - c.heartbeatStop.setLastOk() + c.heartbeatStop.setLastOk(time.Now()) c.heartbeatTTL = resp.HeartbeatTTL return nil } @@ -1793,7 +1794,7 @@ func (c *Client) updateNodeStatus() error { last := c.lastHeartbeat() oldTTL := c.heartbeatTTL haveHeartbeated := c.haveHeartbeated - c.heartbeatStop.setLastOk() + c.heartbeatStop.setLastOk(time.Now()) c.heartbeatTTL = resp.HeartbeatTTL c.haveHeartbeated = true c.heartbeatLock.Unlock() diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go index 3186bed70..05e9b1f5e 100644 --- a/client/heartbeatstop.go +++ b/client/heartbeatstop.go @@ -5,44 +5,36 @@ import ( "time" hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/nomad/structs" ) type heartbeatStop struct { lastOk time.Time + startupGrace time.Time allocInterval map[string]time.Duration allocHookCh chan *structs.Allocation getRunner func(string) (AllocRunner, error) logger hclog.InterceptLogger - state state.StateDB shutdownCh chan struct{} lock *sync.RWMutex } func newHeartbeatStop( - state state.StateDB, getRunner func(string) (AllocRunner, error), + timeout time.Duration, logger hclog.InterceptLogger, shutdownCh chan struct{}) *heartbeatStop { h := &heartbeatStop{ + startupGrace: time.Now().Add(timeout), allocInterval: make(map[string]time.Duration), allocHookCh: make(chan *structs.Allocation), getRunner: getRunner, 
logger: logger, - state: state, shutdownCh: shutdownCh, lock: &sync.RWMutex{}, } - if state != nil { - lastOk, err := state.GetLastHeartbeatOk() - if err == nil && !lastOk.IsZero() { - h.lastOk = lastOk - } - } - return h } @@ -60,12 +52,19 @@ func (h *heartbeatStop) allocHook(alloc *structs.Allocation) { func (h *heartbeatStop) shouldStop(alloc *structs.Allocation) bool { tg := allocTaskGroup(alloc) if tg.StopAfterClientDisconnect != nil { - now := time.Now() - return now.After(h.lastOk.Add(*tg.StopAfterClientDisconnect)) + return h.shouldStopAfter(time.Now(), *tg.StopAfterClientDisconnect) } return false } +func (h *heartbeatStop) shouldStopAfter(now time.Time, interval time.Duration) bool { + lastOk := h.getLastOk() + if lastOk.IsZero() { + return now.After(h.startupGrace) + } + return now.After(lastOk.Add(interval)) +} + // watch is a loop that checks for allocations that should be stopped. It also manages the // registration of allocs to be stopped in a single thread. func (h *heartbeatStop) watch() { @@ -116,7 +115,7 @@ func (h *heartbeatStop) watch() { now = time.Now() for allocID, d := range h.allocInterval { - if now.After(h.lastOk.Add(d)) { + if h.shouldStopAfter(now, d) { stop <- allocID } } @@ -124,16 +123,10 @@ func (h *heartbeatStop) watch() { } // setLastOk sets the last known good heartbeat time to the current time, and persists that time to disk -func (h *heartbeatStop) setLastOk() error { +func (h *heartbeatStop) setLastOk(t time.Time) { h.lock.Lock() defer h.lock.Unlock() - t := time.Now() h.lastOk = t - // We may encounter an error here, but want to update the running time - // unconditionally, since we'll actively terminate stateful tasks if it ages too - // much. 
We only use the state value when restarting the client itself after a - // crash, so it's better to update the runtime value and have the stored value stale - return h.state.PutLastHeartbeatOk(t) } func (h *heartbeatStop) getLastOk() time.Time { diff --git a/client/heartbeatstop_test.go b/client/heartbeatstop_test.go index 257b91b44..86ba76410 100644 --- a/client/heartbeatstop_test.go +++ b/client/heartbeatstop_test.go @@ -23,13 +23,6 @@ func TestHearbeatStop_allocHook(t *testing.T) { }) defer cleanupC1() - // last heartbeat is persisted in the state db - err := client.registerNode() - require.NoError(t, err) - last, err := client.stateDB.GetLastHeartbeatOk() - require.NoError(t, err) - require.Empty(t, last) - // an allocation, with a tiny lease d := 1 * time.Microsecond alloc := &structs.Allocation{ @@ -51,7 +44,7 @@ func TestHearbeatStop_allocHook(t *testing.T) { } // alloc added to heartbeatStop.allocs - err = client.addAlloc(alloc, "") + err := client.addAlloc(alloc, "") require.NoError(t, err) testutil.WaitForResult(func() (bool, error) { _, ok := client.heartbeatStop.allocInterval[alloc.ID] diff --git a/client/state/interface.go b/client/state/interface.go index d4ecb1257..dc492d5ec 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -1,8 +1,6 @@ package state import ( - "time" - "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -78,13 +76,6 @@ type StateDB interface { // PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state. PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error - // GetLastHeartbeatOk retrieves the stored last known good heartbeat - GetLastHeartbeatOk() (time.Time, error) - - // PutLastHeartbeatOk stores the last heartbeat known to have made the round trip to - // the server - PutLastHeartbeatOk(time.Time) error - // Close the database. 
Unsafe for further use after calling regardless // of return value. Close() error diff --git a/client/state/memdb.go b/client/state/memdb.go index 46a3addce..63e967e45 100644 --- a/client/state/memdb.go +++ b/client/state/memdb.go @@ -2,7 +2,6 @@ package state import ( "sync" - "time" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" @@ -34,9 +33,6 @@ type MemDB struct { // dynamicmanager -> registry-state dynamicManagerPs *dynamicplugins.RegistryState - // lastHeartbeatOk -> last_heartbeat_ok - lastHeartbeatOk time.Time - logger hclog.Logger mu sync.RWMutex @@ -93,19 +89,6 @@ func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentS return nil } -func (m *MemDB) GetLastHeartbeatOk() (time.Time, error) { - m.mu.Lock() - defer m.mu.Unlock() - return m.lastHeartbeatOk, nil -} - -func (m *MemDB) PutLastHeartbeatOk(t time.Time) error { - m.mu.Lock() - defer m.mu.Unlock() - m.lastHeartbeatOk = t - return nil -} - func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/client/state/noopdb.go b/client/state/noopdb.go index fbb4535fe..28fbd2c15 100644 --- a/client/state/noopdb.go +++ b/client/state/noopdb.go @@ -1,8 +1,6 @@ package state import ( - "time" - "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -81,14 +79,6 @@ func (n NoopDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, return nil, nil } -func (n NoopDB) PutLastHeartbeatOk(t time.Time) error { - return nil -} - -func (n NoopDB) GetLastHeartbeatOk() (time.Time, error) { - return time.Time{}, nil -} - func (n NoopDB) Close() error { return nil } diff --git a/client/state/state_database.go b/client/state/state_database.go index 09d33e0d4..a9a958f5f 100644 --- 
a/client/state/state_database.go +++ b/client/state/state_database.go @@ -91,12 +91,6 @@ var ( // registryStateKey is the key at which dynamic plugin registry state is stored registryStateKey = []byte("registry_state") - - // lastHeartbeatOkBucket is the bucket for storing the last known good heartbeat time - lastHeartbeatOkBucket = []byte("last_heartbeat_ok") - - // lastHeartbeatOkKey is the key for the last known good heartbeat time - lastHeartbeatOkKey = []byte("last_heartbeat") ) // taskBucketName returns the bucket name for the given task name. @@ -661,51 +655,6 @@ func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryS return ps, nil } -// PutLastHeartbeatOk stores the dynamic plugin registry's -// state or returns an error. -func (s *BoltStateDB) PutLastHeartbeatOk(t time.Time) error { - return s.db.Update(func(tx *boltdd.Tx) error { - // Retrieve the root dynamic plugin manager bucket - dynamicBkt, err := tx.CreateBucketIfNotExists(lastHeartbeatOkBucket) - if err != nil { - return err - } - - return dynamicBkt.Put(lastHeartbeatOkKey, t.Unix()) - }) -} - -// GetLastHeartbeatOk stores the dynamic plugin registry's -// registry state or returns an error. -func (s *BoltStateDB) GetLastHeartbeatOk() (time.Time, error) { - var unix int64 - err := s.db.View(func(tx *boltdd.Tx) error { - dynamicBkt := tx.Bucket(lastHeartbeatOkBucket) - if dynamicBkt == nil { - // No state, return - return nil - } - - // Restore Plugin State if it exists - if err := dynamicBkt.Get(lastHeartbeatOkKey, unix); err != nil { - if !boltdd.IsErrNotFound(err) { - return fmt.Errorf("failed to read last heartbeat state: %v", err) - } - - // Key not found, reset output to nil - unix = 0 - } - - return nil - }) - - if err != nil { - return time.Time{}, err - } - - return time.Unix(unix, 0), nil -} - // init initializes metadata entries in a newly created state database. func (s *BoltStateDB) init() error { return s.db.Update(func(tx *boltdd.Tx) error {