mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 03:15:42 +03:00
Merge pull request #4166 from hashicorp/b-panic-fix-update
Fixes races accessing node and updating it during fingerprinting
This commit is contained in:
@@ -259,7 +259,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
return nil, fmt.Errorf("node setup failed: %v", err)
|
||||
}
|
||||
|
||||
fingerprintManager := NewFingerprintManager(c.GetConfig, c.config.Node,
|
||||
// Store the config copy before restoring state but after it has been
|
||||
// initialized.
|
||||
c.configLock.Lock()
|
||||
c.configCopy = c.config.Copy()
|
||||
c.configLock.Unlock()
|
||||
|
||||
fingerprintManager := NewFingerprintManager(c.GetConfig, c.configCopy.Node,
|
||||
c.shutdownCh, c.updateNodeFromFingerprint, c.updateNodeFromDriver,
|
||||
c.logger)
|
||||
|
||||
@@ -271,12 +277,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
// Setup the reserved resources
|
||||
c.reservePorts()
|
||||
|
||||
// Store the config copy before restoring state but after it has been
|
||||
// initialized.
|
||||
c.configLock.Lock()
|
||||
c.configCopy = c.config.Copy()
|
||||
c.configLock.Unlock()
|
||||
|
||||
// Set the preconfigured list of static servers
|
||||
c.configLock.RLock()
|
||||
if len(c.configCopy.Servers) > 0 {
|
||||
@@ -437,7 +437,7 @@ func (c *Client) Leave() error {
|
||||
func (c *Client) GetConfig() *config.Config {
|
||||
c.configLock.Lock()
|
||||
defer c.configLock.Unlock()
|
||||
return c.config
|
||||
return c.configCopy
|
||||
}
|
||||
|
||||
// Datacenter returns the datacenter for the given client
|
||||
@@ -726,7 +726,7 @@ func (c *Client) restoreState() error {
|
||||
watcher := noopPrevAlloc{}
|
||||
|
||||
c.configLock.RLock()
|
||||
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher)
|
||||
ar := NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher)
|
||||
c.configLock.RUnlock()
|
||||
|
||||
c.allocLock.Lock()
|
||||
@@ -963,6 +963,9 @@ func (c *Client) reservePorts() {
|
||||
for _, net := range reservedIndex {
|
||||
node.Reserved.Networks = append(node.Reserved.Networks, net)
|
||||
}
|
||||
|
||||
// Make the changes available to the config copy.
|
||||
c.configCopy = c.config.Copy()
|
||||
}
|
||||
|
||||
// updateNodeFromFingerprint updates the node with the result of
|
||||
@@ -1009,10 +1012,10 @@ func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintRespons
|
||||
}
|
||||
|
||||
if nodeHasChanged {
|
||||
c.updateNode()
|
||||
c.updateNodeLocked()
|
||||
}
|
||||
|
||||
return c.config.Node
|
||||
return c.configCopy.Node
|
||||
}
|
||||
|
||||
// updateNodeFromDriver receives either a fingerprint of the driver or its
|
||||
@@ -1104,10 +1107,10 @@ func (c *Client) updateNodeFromDriver(name string, fingerprint, health *structs.
|
||||
|
||||
if hasChanged {
|
||||
c.config.Node.Drivers[name].UpdateTime = time.Now()
|
||||
c.updateNode()
|
||||
c.updateNodeLocked()
|
||||
}
|
||||
|
||||
return c.config.Node
|
||||
return c.configCopy.Node
|
||||
}
|
||||
|
||||
// resourcesAreEqual is a temporary function to compare whether resources are
|
||||
@@ -1752,9 +1755,14 @@ OUTER:
|
||||
}
|
||||
}
|
||||
|
||||
// updateNode triggers a client to update its node copy if it isn't doing
|
||||
// so already
|
||||
func (c *Client) updateNode() {
|
||||
// updateNode updates the Node copy and triggers the client to send the updated
|
||||
// Node to the server. This should be done while the caller holds the
|
||||
// configLock lock.
|
||||
func (c *Client) updateNodeLocked() {
|
||||
// Update the config copy.
|
||||
node := c.config.Node.Copy()
|
||||
c.configCopy.Node = node
|
||||
|
||||
select {
|
||||
case c.triggerNodeUpdate <- struct{}{}:
|
||||
// Node update goroutine was released to execute
|
||||
@@ -1774,15 +1782,7 @@ func (c *Client) watchNodeUpdates() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
c.logger.Printf("[DEBUG] client: state changed, updating node and re-registering.")
|
||||
|
||||
// Update the config copy.
|
||||
c.configLock.Lock()
|
||||
node := c.config.Node.Copy()
|
||||
c.configCopy.Node = node
|
||||
c.configLock.Unlock()
|
||||
|
||||
c.retryRegisterNode()
|
||||
|
||||
hasChanged = false
|
||||
case <-c.triggerNodeUpdate:
|
||||
if hasChanged {
|
||||
@@ -1899,7 +1899,10 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
|
||||
c.configLock.RLock()
|
||||
prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken)
|
||||
|
||||
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
|
||||
// Copy the config since the node can be swapped out as it is being updated.
|
||||
// The long term fix is to pass in the config and node separately and then
|
||||
// we don't have to do a copy.
|
||||
ar := NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
|
||||
c.configLock.RUnlock()
|
||||
|
||||
// Store the alloc runner.
|
||||
|
||||
@@ -53,10 +53,16 @@ func NewFingerprintManager(getConfig func() *config.Config,
|
||||
// setNode replaces the node pointer stored on the FingerprintManager with
// node. The assignment is made while holding fm.nodeLock, which is released
// when the method returns.
func (fm *FingerprintManager) setNode(node *structs.Node) {
|
||||
fm.nodeLock.Lock()
|
||||
defer fm.nodeLock.Unlock()
|
||||
|
||||
fm.node = node
|
||||
}
|
||||
|
||||
// getNode returns the node pointer currently stored on the
// FingerprintManager. The read is performed while holding fm.nodeLock. The
// stored pointer itself is returned, not a copy of the node.
func (fm *FingerprintManager) getNode() *structs.Node {
|
||||
fm.nodeLock.Lock()
|
||||
defer fm.nodeLock.Unlock()
|
||||
return fm.node
|
||||
}
|
||||
|
||||
// Run starts the process of fingerprinting the node. It does an initial pass,
|
||||
// identifying whitelisted and blacklisted fingerprints/drivers. Then, for
|
||||
// those which require periodic checking, it starts a periodic process for
|
||||
@@ -167,7 +173,7 @@ func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error {
|
||||
// supported
|
||||
func (fm *FingerprintManager) setupDrivers(drivers []string) error {
|
||||
var availDrivers []string
|
||||
driverCtx := driver.NewDriverContext("", "", fm.getConfig(), fm.node, fm.logger, nil)
|
||||
driverCtx := driver.NewDriverContext("", "", fm.getConfig(), fm.getNode(), fm.logger, nil)
|
||||
for _, name := range drivers {
|
||||
|
||||
d, err := driver.NewDriver(name, driverCtx)
|
||||
|
||||
@@ -2,6 +2,8 @@ package structs
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
)
|
||||
|
||||
// DriverInfo is the current state of a single driver. This is updated
|
||||
@@ -14,6 +16,17 @@ type DriverInfo struct {
|
||||
UpdateTime time.Time
|
||||
}
|
||||
|
||||
// Copy returns a newly allocated DriverInfo holding the same field values as
// di, or nil when di is nil. The Attributes field of the result is set from
// helper.CopyMapStringString rather than left as the value copied from di.
// NOTE(review): presumably CopyMapStringString returns an independent map so
// the copy does not alias di.Attributes — confirm against the helper package.
func (di *DriverInfo) Copy() *DriverInfo {
|
||||
if di == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
cdi := new(DriverInfo)
|
||||
// Value-copy every field first; Attributes is overwritten on the next line.
*cdi = *di
|
||||
cdi.Attributes = helper.CopyMapStringString(di.Attributes)
|
||||
return cdi
|
||||
}
|
||||
|
||||
// MergeHealthCheck merges information from a health check for a driver into a
|
||||
// node's driver info
|
||||
func (di *DriverInfo) MergeHealthCheck(other *DriverInfo) {
|
||||
|
||||
@@ -1461,6 +1461,7 @@ func (n *Node) Copy() *Node {
|
||||
nn.Meta = helper.CopyMapStringString(nn.Meta)
|
||||
nn.Events = copyNodeEvents(n.Events)
|
||||
nn.DrainStrategy = nn.DrainStrategy.Copy()
|
||||
nn.Drivers = copyNodeDrivers(n.Drivers)
|
||||
return nn
|
||||
}
|
||||
|
||||
@@ -1478,6 +1479,20 @@ func copyNodeEvents(events []*NodeEvent) []*NodeEvent {
|
||||
return c
|
||||
}
|
||||
|
||||
// copyNodeDrivers is a helper to copy a map of DriverInfo. It returns nil
// when drivers has no entries (including when drivers is nil). Otherwise it
// returns a new map with the same keys, where each value is the result of
// calling Copy on the corresponding DriverInfo.
func copyNodeDrivers(drivers map[string]*DriverInfo) map[string]*DriverInfo {
|
||||
l := len(drivers)
|
||||
if l == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pre-size the destination map to the number of source entries.
c := make(map[string]*DriverInfo, l)
|
||||
for driver, info := range drivers {
|
||||
c[driver] = info.Copy()
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// TerminalStatus returns if the current status is terminal and
|
||||
// will no longer transition.
|
||||
func (n *Node) TerminalStatus() bool {
|
||||
|
||||
@@ -3676,3 +3676,80 @@ func TestNode_Canonicalize(t *testing.T) {
|
||||
node.Canonicalize()
|
||||
require.Equal(NodeSchedulingIneligible, node.SchedulingEligibility)
|
||||
}
|
||||
|
||||
// TestNode_Copy builds a Node with attributes, resources, reserved resources,
// links, meta and a driver entry, copies it with Node.Copy, and asserts that
// selected fields of the copy are equal to those of the original.
// NOTE(review): the assertions only check equality. Nothing here mutates one
// node and inspects the other, so the test does not show that the copy is
// independent of the original.
func TestNode_Copy(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
// Fixture node. Events and DrainStrategy are not set in this literal.
node := &Node{
|
||||
ID: uuid.Generate(),
|
||||
SecretID: uuid.Generate(),
|
||||
Datacenter: "dc1",
|
||||
Name: "foobar",
|
||||
Attributes: map[string]string{
|
||||
"kernel.name": "linux",
|
||||
"arch": "x86",
|
||||
"nomad.version": "0.5.0",
|
||||
"driver.exec": "1",
|
||||
"driver.mock_driver": "1",
|
||||
},
|
||||
Resources: &Resources{
|
||||
CPU: 4000,
|
||||
MemoryMB: 8192,
|
||||
DiskMB: 100 * 1024,
|
||||
IOPS: 150,
|
||||
Networks: []*NetworkResource{
|
||||
{
|
||||
Device: "eth0",
|
||||
CIDR: "192.168.0.100/32",
|
||||
MBits: 1000,
|
||||
},
|
||||
},
|
||||
},
|
||||
Reserved: &Resources{
|
||||
CPU: 100,
|
||||
MemoryMB: 256,
|
||||
DiskMB: 4 * 1024,
|
||||
Networks: []*NetworkResource{
|
||||
{
|
||||
Device: "eth0",
|
||||
IP: "192.168.0.100",
|
||||
ReservedPorts: []Port{{Label: "ssh", Value: 22}},
|
||||
MBits: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
Links: map[string]string{
|
||||
"consul": "foobar.dc1",
|
||||
},
|
||||
Meta: map[string]string{
|
||||
"pci-dss": "true",
|
||||
"database": "mysql",
|
||||
"version": "5.6",
|
||||
},
|
||||
NodeClass: "linux-medium-pci",
|
||||
Status: NodeStatusReady,
|
||||
SchedulingEligibility: NodeSchedulingEligible,
|
||||
Drivers: map[string]*DriverInfo{
|
||||
"mock_driver": &DriverInfo{
|
||||
Attributes: map[string]string{"running": "1"},
|
||||
Detected: true,
|
||||
Healthy: true,
|
||||
HealthDescription: "Currently active",
|
||||
UpdateTime: time.Now(),
|
||||
},
|
||||
},
|
||||
}
|
||||
node.ComputeClass()
|
||||
|
||||
// Copy taken after ComputeClass has been called on the original.
node2 := node.Copy()
|
||||
|
||||
// Field-by-field equality between the original and the copy.
require.Equal(node.Attributes, node2.Attributes)
|
||||
require.Equal(node.Resources, node2.Resources)
|
||||
require.Equal(node.Reserved, node2.Reserved)
|
||||
require.Equal(node.Links, node2.Links)
|
||||
require.Equal(node.Meta, node2.Meta)
|
||||
require.Equal(node.Events, node2.Events)
|
||||
require.Equal(node.DrainStrategy, node2.DrainStrategy)
|
||||
require.Equal(node.Drivers, node2.Drivers)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user