diff --git a/client/client.go b/client/client.go index 367a601b3..e2a614572 100644 --- a/client/client.go +++ b/client/client.go @@ -371,9 +371,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic }, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up }) - // create heartbeatStop, and restore its previous state from the state store. Post init for the stateDB - c.heartbeatStop = newHeartbeatStop(c.stateDB, c.getAllocRunner, logger, c.shutdownCh) - // Setup the clients RPC server c.setupClientRpc() @@ -448,9 +445,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic c.pluginManagers.RegisterAndRun(devManager) // Batching of initial fingerprints is done to reduce the number of node - // updates sent to the server on startup. + // updates sent to the server on startup. This is the first RPC to the servers go c.batchFirstFingerprints() + // create heartbeatStop. We go after the first attempt to connect to the server, so + // that our grace period for connection goes for the full time + c.heartbeatStop = newHeartbeatStop(c.getAllocRunner, batchFirstFingerprintsTimeout, logger, c.shutdownCh) + // Watch for disconnection, and heartbeatStopAllocs configured to have a maximum // lifetime when out of touch with the server go c.heartbeatStop.watch() @@ -1764,7 +1765,7 @@ func (c *Client) registerNode() error { c.heartbeatLock.Lock() defer c.heartbeatLock.Unlock() - c.heartbeatStop.setLastOk() + c.heartbeatStop.setLastOk(time.Now()) c.heartbeatTTL = resp.HeartbeatTTL return nil } @@ -1793,7 +1794,7 @@ func (c *Client) updateNodeStatus() error { last := c.lastHeartbeat() oldTTL := c.heartbeatTTL haveHeartbeated := c.haveHeartbeated - c.heartbeatStop.setLastOk() + c.heartbeatStop.setLastOk(time.Now()) c.heartbeatTTL = resp.HeartbeatTTL c.haveHeartbeated = true c.heartbeatLock.Unlock() diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go index 3186bed70..05e9b1f5e 100644 --- a/client/heartbeatstop.go +++ b/client/heartbeatstop.go @@ -5,44 +5,36 @@ import ( "time" hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/nomad/structs" ) type heartbeatStop struct { lastOk time.Time + startupGrace time.Time allocInterval map[string]time.Duration allocHookCh chan *structs.Allocation getRunner func(string) (AllocRunner, error) logger hclog.InterceptLogger - state state.StateDB shutdownCh chan struct{} lock *sync.RWMutex } func newHeartbeatStop( - state state.StateDB, getRunner func(string) (AllocRunner, error), + timeout time.Duration, logger hclog.InterceptLogger, shutdownCh chan struct{}) *heartbeatStop { h := &heartbeatStop{ + startupGrace: time.Now().Add(timeout), allocInterval: make(map[string]time.Duration), allocHookCh: make(chan *structs.Allocation), getRunner: getRunner, logger: logger, - state: state, shutdownCh: shutdownCh, lock: &sync.RWMutex{}, } - if state != nil { - lastOk, err := state.GetLastHeartbeatOk() - if err == nil && !lastOk.IsZero() { - h.lastOk = lastOk - } - } - return h } @@ -60,12 +52,19 @@ func (h *heartbeatStop) allocHook(alloc *structs.Allocation) { func (h *heartbeatStop) shouldStop(alloc *structs.Allocation) bool { tg := allocTaskGroup(alloc) if tg.StopAfterClientDisconnect != nil { - now := time.Now() - return now.After(h.lastOk.Add(*tg.StopAfterClientDisconnect)) + return h.shouldStopAfter(time.Now(), *tg.StopAfterClientDisconnect) } return false } +func (h *heartbeatStop) shouldStopAfter(now time.Time, interval time.Duration) bool { + lastOk := h.getLastOk() + if lastOk.IsZero() { + return h.startupGrace.After(now) + } + return now.After(lastOk.Add(interval)) +} + // watch is a loop that checks for allocations that should be stopped. It also manages the // registration of allocs to be stopped in a single thread. func (h *heartbeatStop) watch() { @@ -116,7 +115,7 @@ func (h *heartbeatStop) watch() { now = time.Now() for allocID, d := range h.allocInterval { - if now.After(h.lastOk.Add(d)) { + if h.shouldStopAfter(now, d) { stop <- allocID } } @@ -124,16 +123,10 @@ func (h *heartbeatStop) watch() { } // setLastOk sets the last known good heartbeat time to the current time, and persists that time to disk -func (h *heartbeatStop) setLastOk() error { +func (h *heartbeatStop) setLastOk(t time.Time) { h.lock.Lock() defer h.lock.Unlock() - t := time.Now() h.lastOk = t - // We may encounter an error here, but want to update the running time - // unconditionally, since we'll actively terminate stateful tasks if it ages too - // much. We only use the state value when restarting the client itself after a - // crash, so it's better to update the runtime value and have the stored value stale - return h.state.PutLastHeartbeatOk(t) } func (h *heartbeatStop) getLastOk() time.Time { diff --git a/client/heartbeatstop_test.go b/client/heartbeatstop_test.go index 257b91b44..86ba76410 100644 --- a/client/heartbeatstop_test.go +++ b/client/heartbeatstop_test.go @@ -23,13 +23,6 @@ func TestHearbeatStop_allocHook(t *testing.T) { }) defer cleanupC1() - // last heartbeat is persisted in the state db - err := client.registerNode() - require.NoError(t, err) - last, err := client.stateDB.GetLastHeartbeatOk() - require.NoError(t, err) - require.Empty(t, last) - // an allocation, with a tiny lease d := 1 * time.Microsecond alloc := &structs.Allocation{ @@ -51,7 +44,7 @@ func TestHearbeatStop_allocHook(t *testing.T) { } // alloc added to heartbeatStop.allocs - err = client.addAlloc(alloc, "") + err := client.addAlloc(alloc, "") require.NoError(t, err) testutil.WaitForResult(func() (bool, error) { _, ok := client.heartbeatStop.allocInterval[alloc.ID] diff --git a/client/state/interface.go b/client/state/interface.go index d4ecb1257..dc492d5ec 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -1,8 +1,6 @@ package state import ( - "time" - "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -78,13 +76,6 @@ type StateDB interface { // PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state. PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error - // GetLastHeartbeatOk retrieves the stored last known good heartbeat - GetLastHeartbeatOk() (time.Time, error) - - // PutLastHeartbeatOk stores the last heartbeat known to have made the round trip to - // the server - PutLastHeartbeatOk(time.Time) error - // Close the database. Unsafe for further use after calling regardless // of return value. Close() error diff --git a/client/state/memdb.go b/client/state/memdb.go index 46a3addce..63e967e45 100644 --- a/client/state/memdb.go +++ b/client/state/memdb.go @@ -2,7 +2,6 @@ package state import ( "sync" - "time" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" @@ -34,9 +33,6 @@ type MemDB struct { // dynamicmanager -> registry-state dynamicManagerPs *dynamicplugins.RegistryState - // lastHeartbeatOk -> last_heartbeat_ok - lastHeartbeatOk time.Time - logger hclog.Logger mu sync.RWMutex @@ -93,19 +89,6 @@ func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentS return nil } -func (m *MemDB) GetLastHeartbeatOk() (time.Time, error) { - m.mu.Lock() - defer m.mu.Unlock() - return m.lastHeartbeatOk, nil -} - -func (m *MemDB) PutLastHeartbeatOk(t time.Time) error { - m.mu.Lock() - defer m.mu.Unlock() - m.lastHeartbeatOk = t - return nil -} - func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/client/state/noopdb.go b/client/state/noopdb.go index fbb4535fe..28fbd2c15 100644 --- a/client/state/noopdb.go +++ b/client/state/noopdb.go @@ -1,8 +1,6 @@ package state import ( - "time" - "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -81,14 +79,6 @@ func (n NoopDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, return nil, nil } -func (n NoopDB) PutLastHeartbeatOk(t time.Time) error { - return nil -} - -func (n NoopDB) GetLastHeartbeatOk() (time.Time, error) { - return time.Time{}, nil -} - func (n NoopDB) Close() error { return nil } diff --git a/client/state/state_database.go b/client/state/state_database.go index 09d33e0d4..a9a958f5f 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -91,12 +91,6 @@ var ( // registryStateKey is the key at which dynamic plugin registry state is stored registryStateKey = []byte("registry_state") - - // lastHeartbeatOkBucket is the bucket for storing the last known good heartbeat time - lastHeartbeatOkBucket = []byte("last_heartbeat_ok") - - // lastHeartbeatOkKey is the key for the last known good heartbeat time - lastHeartbeatOkKey = []byte("last_heartbeat") ) // taskBucketName returns the bucket name for the given task name. @@ -661,51 +655,6 @@ func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryS return ps, nil } -// PutLastHeartbeatOk stores the dynamic plugin registry's -// state or returns an error. -func (s *BoltStateDB) PutLastHeartbeatOk(t time.Time) error { - return s.db.Update(func(tx *boltdd.Tx) error { - // Retrieve the root dynamic plugin manager bucket - dynamicBkt, err := tx.CreateBucketIfNotExists(lastHeartbeatOkBucket) - if err != nil { - return err - } - - return dynamicBkt.Put(lastHeartbeatOkKey, t.Unix()) - }) -} - -// GetLastHeartbeatOk stores the dynamic plugin registry's -// registry state or returns an error. -func (s *BoltStateDB) GetLastHeartbeatOk() (time.Time, error) { - var unix int64 - err := s.db.View(func(tx *boltdd.Tx) error { - dynamicBkt := tx.Bucket(lastHeartbeatOkBucket) - if dynamicBkt == nil { - // No state, return - return nil - } - - // Restore Plugin State if it exists - if err := dynamicBkt.Get(lastHeartbeatOkKey, unix); err != nil { - if !boltdd.IsErrNotFound(err) { - return fmt.Errorf("failed to read last heartbeat state: %v", err) - } - - // Key not found, reset output to nil - unix = 0 - } - - return nil - }) - - if err != nil { - return time.Time{}, err - } - - return time.Unix(unix, 0), nil -} - // init initializes metadata entries in a newly created state database. func (s *BoltStateDB) init() error { return s.db.Update(func(tx *boltdd.Tx) error {