mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
client/heartbeatstop: don't store client state, use timeout
In order to minimize this change while keeping a simple version of the behavior, we set `lastOk` to the current time less the initial server connection timeout. If the client starts and never contacts the server, it will stop all configured tasks after the initial server connection grace period, on the assumption that we've been out of touch longer than any configured `stop_after_client_disconnect`. The more complex state behavior might be justified later, but we should learn about failure modes first.
This commit is contained in:
@@ -371,9 +371,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
}, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up
|
||||
})
|
||||
|
||||
// create heartbeatStop, and restore its previous state from the state store. Post init for the stateDB
|
||||
c.heartbeatStop = newHeartbeatStop(c.stateDB, c.getAllocRunner, logger, c.shutdownCh)
|
||||
|
||||
// Setup the clients RPC server
|
||||
c.setupClientRpc()
|
||||
|
||||
@@ -448,9 +445,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
c.pluginManagers.RegisterAndRun(devManager)
|
||||
|
||||
// Batching of initial fingerprints is done to reduce the number of node
|
||||
// updates sent to the server on startup.
|
||||
// updates sent to the server on startup. This is the first RPC to the servers
|
||||
go c.batchFirstFingerprints()
|
||||
|
||||
// create heartbeatStop. We go after the first attempt to connect to the server, so
|
||||
// that our grace period for connection goes for the full time
|
||||
c.heartbeatStop = newHeartbeatStop(c.getAllocRunner, batchFirstFingerprintsTimeout, logger, c.shutdownCh)
|
||||
|
||||
// Watch for disconnection, and heartbeatStopAllocs configured to have a maximum
|
||||
// lifetime when out of touch with the server
|
||||
go c.heartbeatStop.watch()
|
||||
@@ -1764,7 +1765,7 @@ func (c *Client) registerNode() error {
|
||||
|
||||
c.heartbeatLock.Lock()
|
||||
defer c.heartbeatLock.Unlock()
|
||||
c.heartbeatStop.setLastOk()
|
||||
c.heartbeatStop.setLastOk(time.Now())
|
||||
c.heartbeatTTL = resp.HeartbeatTTL
|
||||
return nil
|
||||
}
|
||||
@@ -1793,7 +1794,7 @@ func (c *Client) updateNodeStatus() error {
|
||||
last := c.lastHeartbeat()
|
||||
oldTTL := c.heartbeatTTL
|
||||
haveHeartbeated := c.haveHeartbeated
|
||||
c.heartbeatStop.setLastOk()
|
||||
c.heartbeatStop.setLastOk(time.Now())
|
||||
c.heartbeatTTL = resp.HeartbeatTTL
|
||||
c.haveHeartbeated = true
|
||||
c.heartbeatLock.Unlock()
|
||||
|
||||
@@ -5,44 +5,36 @@ import (
|
||||
"time"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
type heartbeatStop struct {
|
||||
lastOk time.Time
|
||||
startupGrace time.Time
|
||||
allocInterval map[string]time.Duration
|
||||
allocHookCh chan *structs.Allocation
|
||||
getRunner func(string) (AllocRunner, error)
|
||||
logger hclog.InterceptLogger
|
||||
state state.StateDB
|
||||
shutdownCh chan struct{}
|
||||
lock *sync.RWMutex
|
||||
}
|
||||
|
||||
func newHeartbeatStop(
|
||||
state state.StateDB,
|
||||
getRunner func(string) (AllocRunner, error),
|
||||
timeout time.Duration,
|
||||
logger hclog.InterceptLogger,
|
||||
shutdownCh chan struct{}) *heartbeatStop {
|
||||
|
||||
h := &heartbeatStop{
|
||||
startupGrace: time.Now().Add(timeout),
|
||||
allocInterval: make(map[string]time.Duration),
|
||||
allocHookCh: make(chan *structs.Allocation),
|
||||
getRunner: getRunner,
|
||||
logger: logger,
|
||||
state: state,
|
||||
shutdownCh: shutdownCh,
|
||||
lock: &sync.RWMutex{},
|
||||
}
|
||||
|
||||
if state != nil {
|
||||
lastOk, err := state.GetLastHeartbeatOk()
|
||||
if err == nil && !lastOk.IsZero() {
|
||||
h.lastOk = lastOk
|
||||
}
|
||||
}
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
@@ -60,12 +52,19 @@ func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
|
||||
func (h *heartbeatStop) shouldStop(alloc *structs.Allocation) bool {
|
||||
tg := allocTaskGroup(alloc)
|
||||
if tg.StopAfterClientDisconnect != nil {
|
||||
now := time.Now()
|
||||
return now.After(h.lastOk.Add(*tg.StopAfterClientDisconnect))
|
||||
return h.shouldStopAfter(time.Now(), *tg.StopAfterClientDisconnect)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (h *heartbeatStop) shouldStopAfter(now time.Time, interval time.Duration) bool {
|
||||
lastOk := h.getLastOk()
|
||||
if lastOk.IsZero() {
|
||||
return h.startupGrace.After(now)
|
||||
}
|
||||
return now.After(lastOk.Add(interval))
|
||||
}
|
||||
|
||||
// watch is a loop that checks for allocations that should be stopped. It also manages the
|
||||
// registration of allocs to be stopped in a single thread.
|
||||
func (h *heartbeatStop) watch() {
|
||||
@@ -116,7 +115,7 @@ func (h *heartbeatStop) watch() {
|
||||
|
||||
now = time.Now()
|
||||
for allocID, d := range h.allocInterval {
|
||||
if now.After(h.lastOk.Add(d)) {
|
||||
if h.shouldStopAfter(now, d) {
|
||||
stop <- allocID
|
||||
}
|
||||
}
|
||||
@@ -124,16 +123,10 @@ func (h *heartbeatStop) watch() {
|
||||
}
|
||||
|
||||
// setLastOk sets the last known good heartbeat time to the given time
|
||||
func (h *heartbeatStop) setLastOk() error {
|
||||
func (h *heartbeatStop) setLastOk(t time.Time) {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
t := time.Now()
|
||||
h.lastOk = t
|
||||
// We may encounter an error here, but want to update the running time
|
||||
// unconditionally, since we'll actively terminate stateful tasks if it ages too
|
||||
// much. We only use the state value when restarting the client itself after a
|
||||
// crash, so it's better to update the runtime value and have the stored value stale
|
||||
return h.state.PutLastHeartbeatOk(t)
|
||||
}
|
||||
|
||||
func (h *heartbeatStop) getLastOk() time.Time {
|
||||
|
||||
@@ -23,13 +23,6 @@ func TestHearbeatStop_allocHook(t *testing.T) {
|
||||
})
|
||||
defer cleanupC1()
|
||||
|
||||
// last heartbeat is persisted in the state db
|
||||
err := client.registerNode()
|
||||
require.NoError(t, err)
|
||||
last, err := client.stateDB.GetLastHeartbeatOk()
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, last)
|
||||
|
||||
// an allocation, with a tiny lease
|
||||
d := 1 * time.Microsecond
|
||||
alloc := &structs.Allocation{
|
||||
@@ -51,7 +44,7 @@ func TestHearbeatStop_allocHook(t *testing.T) {
|
||||
}
|
||||
|
||||
// alloc added to heartbeatStop.allocs
|
||||
err = client.addAlloc(alloc, "")
|
||||
err := client.addAlloc(alloc, "")
|
||||
require.NoError(t, err)
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
_, ok := client.heartbeatStop.allocInterval[alloc.ID]
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||
"github.com/hashicorp/nomad/client/dynamicplugins"
|
||||
@@ -78,13 +76,6 @@ type StateDB interface {
|
||||
// PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state.
|
||||
PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error
|
||||
|
||||
// GetLastHeartbeatOk retrieves the stored last known good heartbeat
|
||||
GetLastHeartbeatOk() (time.Time, error)
|
||||
|
||||
// PutLastHeartbeatOk stores the last heartbeat known to have made the round trip to
|
||||
// the server
|
||||
PutLastHeartbeatOk(time.Time) error
|
||||
|
||||
// Close the database. Unsafe for further use after calling regardless
|
||||
// of return value.
|
||||
Close() error
|
||||
|
||||
@@ -2,7 +2,6 @@ package state
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
@@ -34,9 +33,6 @@ type MemDB struct {
|
||||
// dynamicmanager -> registry-state
|
||||
dynamicManagerPs *dynamicplugins.RegistryState
|
||||
|
||||
// lastHeartbeatOk -> last_heartbeat_ok
|
||||
lastHeartbeatOk time.Time
|
||||
|
||||
logger hclog.Logger
|
||||
|
||||
mu sync.RWMutex
|
||||
@@ -93,19 +89,6 @@ func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentS
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MemDB) GetLastHeartbeatOk() (time.Time, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.lastHeartbeatOk, nil
|
||||
}
|
||||
|
||||
func (m *MemDB) PutLastHeartbeatOk(t time.Time) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.lastHeartbeatOk = t
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||
"github.com/hashicorp/nomad/client/dynamicplugins"
|
||||
@@ -81,14 +79,6 @@ func (n NoopDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState,
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (n NoopDB) PutLastHeartbeatOk(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n NoopDB) GetLastHeartbeatOk() (time.Time, error) {
|
||||
return time.Time{}, nil
|
||||
}
|
||||
|
||||
func (n NoopDB) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -91,12 +91,6 @@ var (
|
||||
|
||||
// registryStateKey is the key at which dynamic plugin registry state is stored
|
||||
registryStateKey = []byte("registry_state")
|
||||
|
||||
// lastHeartbeatOkBucket is the bucket for storing the last known good heartbeat time
|
||||
lastHeartbeatOkBucket = []byte("last_heartbeat_ok")
|
||||
|
||||
// lastHeartbeatOkKey is the key for the last known good heartbeat time
|
||||
lastHeartbeatOkKey = []byte("last_heartbeat")
|
||||
)
|
||||
|
||||
// taskBucketName returns the bucket name for the given task name.
|
||||
@@ -661,51 +655,6 @@ func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryS
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
// PutLastHeartbeatOk stores the dynamic plugin registry's
|
||||
// state or returns an error.
|
||||
func (s *BoltStateDB) PutLastHeartbeatOk(t time.Time) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
// Retrieve the root dynamic plugin manager bucket
|
||||
dynamicBkt, err := tx.CreateBucketIfNotExists(lastHeartbeatOkBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return dynamicBkt.Put(lastHeartbeatOkKey, t.Unix())
|
||||
})
|
||||
}
|
||||
|
||||
// GetLastHeartbeatOk stores the dynamic plugin registry's
|
||||
// registry state or returns an error.
|
||||
func (s *BoltStateDB) GetLastHeartbeatOk() (time.Time, error) {
|
||||
var unix int64
|
||||
err := s.db.View(func(tx *boltdd.Tx) error {
|
||||
dynamicBkt := tx.Bucket(lastHeartbeatOkBucket)
|
||||
if dynamicBkt == nil {
|
||||
// No state, return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Restore Plugin State if it exists
|
||||
if err := dynamicBkt.Get(lastHeartbeatOkKey, unix); err != nil {
|
||||
if !boltdd.IsErrNotFound(err) {
|
||||
return fmt.Errorf("failed to read last heartbeat state: %v", err)
|
||||
}
|
||||
|
||||
// Key not found, reset output to nil
|
||||
unix = 0
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
|
||||
return time.Unix(unix, 0), nil
|
||||
}
|
||||
|
||||
// init initializes metadata entries in a newly created state database.
|
||||
func (s *BoltStateDB) init() error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
|
||||
Reference in New Issue
Block a user