CNI Implementation (#7518)

This commit is contained in:
Nick Ethier
2020-05-15 11:09:01 -04:00
committed by Michael Schurter
parent e9ff8a8daa
commit 33ce12cda9
25 changed files with 724 additions and 180 deletions

View File

@@ -117,6 +117,9 @@ func netModeToIsolationMode(netMode string) drivers.NetIsolationMode {
case "driver":
return drivers.NetIsolationModeTask
default:
if strings.HasPrefix(strings.ToLower(netMode), "cni/") {
return drivers.NetIsolationModeGroup
}
return drivers.NetIsolationModeHost
}
}
@@ -129,9 +132,13 @@ func newNetworkConfigurator(log hclog.Logger, alloc *structs.Allocation, config
return &hostNetworkConfigurator{}, nil
}
switch strings.ToLower(tg.Networks[0].Mode) {
case "bridge":
netMode := strings.ToLower(tg.Networks[0].Mode)
switch {
case netMode == "bridge":
return newBridgeNetworkConfigurator(log, config.BridgeNetworkName, config.BridgeNetworkAllocSubnet, config.CNIPath)
case strings.HasPrefix(netMode, "cni/"):
return newCNINetworkConfigurator(log, config.CNIPath, config.CNIInterfacePrefix, config.CNIConfigDir, netMode[4:])
default:
return &hostNetworkConfigurator{}, nil
}

View File

@@ -3,12 +3,7 @@ package allocrunner
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"time"
cni "github.com/containerd/go-cni"
"github.com/coreos/go-iptables/iptables"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
@@ -16,14 +11,6 @@ import (
)
const (
// envCNIPath is the environment variable name to use to derive the CNI path
// when it is not explicitly set by the client
envCNIPath = "CNI_PATH"
// defaultCNIPath is the CNI path to use when it is not set by the client
// and is not set by environment variable
defaultCNIPath = "/opt/cni/bin"
// defaultNomadBridgeName is the name of the bridge to use when not set by
// the client
defaultNomadBridgeName = "nomad"
@@ -45,11 +32,10 @@ const (
// shared bridge, configures masquerading for egress traffic and port mapping
// for ingress
type bridgeNetworkConfigurator struct {
cni cni.CNI
cni *cniNetworkConfigurator
allocSubnet string
bridgeName string
rand *rand.Rand
logger hclog.Logger
}
@@ -57,21 +43,8 @@ func newBridgeNetworkConfigurator(log hclog.Logger, bridgeName, ipRange, cniPath
b := &bridgeNetworkConfigurator{
bridgeName: bridgeName,
allocSubnet: ipRange,
rand: rand.New(rand.NewSource(time.Now().Unix())),
logger: log,
}
if cniPath == "" {
if cniPath = os.Getenv(envCNIPath); cniPath == "" {
cniPath = defaultCNIPath
}
}
c, err := cni.New(cni.WithPluginDir(filepath.SplitList(cniPath)),
cni.WithInterfacePrefix(bridgeNetworkAllocIfPrefix))
if err != nil {
return nil, err
}
b.cni = c
if b.bridgeName == "" {
b.bridgeName = defaultNomadBridgeName
@@ -81,6 +54,12 @@ func newBridgeNetworkConfigurator(log hclog.Logger, bridgeName, ipRange, cniPath
b.allocSubnet = defaultNomadAllocSubnet
}
c, err := newCNINetworkConfiguratorWithConf(log, cniPath, bridgeNetworkAllocIfPrefix, buildNomadBridgeNetConfig(b.bridgeName, b.allocSubnet))
if err != nil {
return nil, err
}
b.cni = c
return b, nil
}
@@ -148,72 +127,16 @@ func (b *bridgeNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Al
return fmt.Errorf("failed to initialize table forwarding rules: %v", err)
}
if err := b.ensureCNIInitialized(); err != nil {
return err
}
// Depending on the version of bridge cni plugin (< 0.8.4) a known race could occur
// where two alloc attempt to create the nomad bridge at the same time, resulting
// in one of them to fail. This retry attempts to overcome those erroneous failures.
const retry = 3
for attempt := 1; ; attempt++ {
//TODO eventually returning the IP from the result would be nice to have in the alloc
if _, err := b.cni.Setup(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc))); err != nil {
b.logger.Warn("failed to configure bridge network", "err", err, "attempt", attempt)
if attempt == retry {
return fmt.Errorf("failed to configure bridge network: %v", err)
}
// Sleep for 1 second + jitter
time.Sleep(time.Second + (time.Duration(b.rand.Int63n(1000)) * time.Millisecond))
continue
}
break
}
return nil
return b.cni.Setup(ctx, alloc, spec)
}
// Teardown calls the CNI plugins with the delete action
func (b *bridgeNetworkConfigurator) Teardown(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
if err := b.ensureCNIInitialized(); err != nil {
return err
}
return b.cni.Remove(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc)))
return b.cni.Teardown(ctx, alloc, spec)
}
func (b *bridgeNetworkConfigurator) ensureCNIInitialized() error {
if err := b.cni.Status(); cni.IsCNINotInitialized(err) {
return b.cni.Load(cni.WithConfListBytes(b.buildNomadNetConfig()))
} else {
return err
}
}
// getPortMapping builds a list of portMapping structs that are used as the
// portmapping capability arguments for the portmap CNI plugin
func getPortMapping(alloc *structs.Allocation) []cni.PortMapping {
ports := []cni.PortMapping{}
for _, network := range alloc.AllocatedResources.Shared.Networks {
for _, port := range append(network.DynamicPorts, network.ReservedPorts...) {
if port.To < 1 {
continue
}
for _, proto := range []string{"tcp", "udp"} {
ports = append(ports, cni.PortMapping{
HostPort: int32(port.Value),
ContainerPort: int32(port.To),
Protocol: proto,
})
}
}
}
return ports
}
func (b *bridgeNetworkConfigurator) buildNomadNetConfig() []byte {
return []byte(fmt.Sprintf(nomadCNIConfigTemplate, b.bridgeName, b.allocSubnet, cniAdminChainName))
func buildNomadBridgeNetConfig(bridgeName, subnet string) []byte {
return []byte(fmt.Sprintf(nomadCNIConfigTemplate, bridgeName, subnet, cniAdminChainName))
}
const nomadCNIConfigTemplate = `{

View File

@@ -0,0 +1,182 @@
package allocrunner
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"sort"
"strings"
"time"
cni "github.com/containerd/go-cni"
cnilibrary "github.com/containernetworking/cni/libcni"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
const (
// envCNIPath is the environment variable name to use to derive the CNI path
// when it is not explicitly set by the client
envCNIPath = "CNI_PATH"
// defaultCNIPath is the CNI path to use when it is not set by the client
// and is not set by environment variable
defaultCNIPath = "/opt/cni/bin"
// defaultCNIInterfacePrefix is the network interface to use if not set in
// client config
defaultCNIInterfacePrefix = "eth"
)
type cniNetworkConfigurator struct {
cni cni.CNI
cniConf []byte
rand *rand.Rand
logger log.Logger
}
func newCNINetworkConfigurator(logger log.Logger, cniPath, cniInterfacePrefix, cniConfDir, networkName string) (*cniNetworkConfigurator, error) {
cniConf, err := loadCNIConf(cniConfDir, networkName)
if err != nil {
return nil, fmt.Errorf("failed to load CNI config: %v", err)
}
return newCNINetworkConfiguratorWithConf(logger, cniPath, cniInterfacePrefix, cniConf)
}
func newCNINetworkConfiguratorWithConf(logger log.Logger, cniPath, cniInterfacePrefix string, cniConf []byte) (*cniNetworkConfigurator, error) {
conf := &cniNetworkConfigurator{
cniConf: cniConf,
rand: rand.New(rand.NewSource(time.Now().Unix())),
logger: logger,
}
if cniPath == "" {
if cniPath = os.Getenv(envCNIPath); cniPath == "" {
cniPath = defaultCNIPath
}
}
if cniInterfacePrefix == "" {
cniInterfacePrefix = defaultCNIInterfacePrefix
}
c, err := cni.New(cni.WithPluginDir(filepath.SplitList(cniPath)),
cni.WithInterfacePrefix(cniInterfacePrefix))
if err != nil {
return nil, err
}
conf.cni = c
return conf, nil
}
// Setup calls the CNI plugins with the add action
func (c *cniNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
if err := c.ensureCNIInitialized(); err != nil {
return err
}
// Depending on the version of bridge cni plugin used, a known race could occure
// where two alloc attempt to create the nomad bridge at the same time, resulting
// in one of them to fail. This rety attempts to overcome those erroneous failures.
const retry = 3
var firstError error
for attempt := 1; ; attempt++ {
//TODO eventually returning the IP from the result would be nice to have in the alloc
if _, err := c.cni.Setup(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc))); err != nil {
c.logger.Warn("failed to configure network", "err", err, "attempt", attempt)
switch attempt {
case 1:
firstError = err
case retry:
return fmt.Errorf("failed to configure network: %v", firstError)
}
// Sleep for 1 second + jitter
time.Sleep(time.Second + (time.Duration(c.rand.Int63n(1000)) * time.Millisecond))
continue
}
break
}
return nil
}
func loadCNIConf(confDir, name string) ([]byte, error) {
files, err := cnilibrary.ConfFiles(confDir, []string{".conf", ".conflist", ".json"})
switch {
case err != nil:
return nil, fmt.Errorf("failed to detect CNI config file: %v", err)
case len(files) == 0:
return nil, fmt.Errorf("no CNI network config found in %s", confDir)
}
// files contains the network config files associated with cni network.
// Use lexicographical way as a defined order for network config files.
sort.Strings(files)
for _, confFile := range files {
if strings.HasSuffix(confFile, ".conflist") {
confList, err := cnilibrary.ConfListFromFile(confFile)
if err != nil {
return nil, fmt.Errorf("failed to load CNI config list file %s: %v", confFile, err)
}
if confList.Name == name {
return confList.Bytes, nil
}
} else {
conf, err := cnilibrary.ConfFromFile(confFile)
if err != nil {
return nil, fmt.Errorf("failed to load CNI config file %s: %v", confFile, err)
}
if conf.Network.Name == name {
return conf.Bytes, nil
}
}
}
return nil, fmt.Errorf("CNI network config not found for name %q", name)
}
// Teardown calls the CNI plugins with the delete action
func (c *cniNetworkConfigurator) Teardown(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
if err := c.ensureCNIInitialized(); err != nil {
return err
}
return c.cni.Remove(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc)))
}
func (c *cniNetworkConfigurator) ensureCNIInitialized() error {
if err := c.cni.Status(); cni.IsCNINotInitialized(err) {
return c.cni.Load(cni.WithConfListBytes(c.cniConf))
} else {
return err
}
}
// getPortMapping builds a list of portMapping structs that are used as the
// portmapping capability arguments for the portmap CNI plugin
func getPortMapping(alloc *structs.Allocation) []cni.PortMapping {
ports := []cni.PortMapping{}
for _, network := range alloc.AllocatedResources.Shared.Networks {
for _, port := range append(network.DynamicPorts, network.ReservedPorts...) {
if port.To < 1 {
continue
}
for _, proto := range []string{"tcp", "udp"} {
ports = append(ports, cni.PortMapping{
HostPort: int32(port.Value),
ContainerPort: int32(port.To),
Protocol: proto,
})
}
}
}
return ports
}

View File

@@ -1436,7 +1436,6 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
// if we still have node changes, merge them
if response.Resources != nil {
response.Resources.Networks = updateNetworks(
c.config.Node.Resources.Networks,
response.Resources.Networks,
c.config)
if !c.config.Node.Resources.Equals(response.Resources) {
@@ -1449,7 +1448,6 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
// if we still have node changes, merge them
if response.NodeResources != nil {
response.NodeResources.Networks = updateNetworks(
c.config.Node.NodeResources.Networks,
response.NodeResources.Networks,
c.config)
if !c.config.Node.NodeResources.Equals(response.NodeResources) {
@@ -1465,33 +1463,40 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
return c.configCopy.Node
}
// updateNetworks preserves manually configured network options, but
// applies fingerprint updates
func updateNetworks(ns structs.Networks, up structs.Networks, c *config.Config) structs.Networks {
if c.NetworkInterface == "" {
ns = up
} else {
// If a network device is configured, filter up to contain details for only
// updateNetworks filters and overrides network speed of host networks based
// on configured settings
func updateNetworks(up structs.Networks, c *config.Config) structs.Networks {
if up == nil {
return nil
}
if c.NetworkInterface != "" {
// For host networks, if a network device is configured filter up to contain details for only
// that device
upd := []*structs.NetworkResource{}
for _, n := range up {
if c.NetworkInterface == n.Device {
switch n.Mode {
case "host":
if c.NetworkInterface == n.Device {
upd = append(upd, n)
}
default:
upd = append(upd, n)
}
}
// If updates, use them. Otherwise, ns contains the configured interfaces
if len(upd) > 0 {
ns = upd
}
up = upd
}
// ns is set, apply the config NetworkSpeed to all
// if set, apply the config NetworkSpeed to networks in host mode
if c.NetworkSpeed != 0 {
for _, n := range ns {
n.MBits = c.NetworkSpeed
for _, n := range up {
if n.Mode == "host" {
n.MBits = c.NetworkSpeed
}
}
}
return ns
return up
}
// retryIntv calculates a retry interval value given the base

View File

@@ -1184,17 +1184,16 @@ func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) {
client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{CpuShares: 123},
Networks: []*structs.NetworkResource{{Device: "any-interface"}},
Networks: []*structs.NetworkResource{{Mode: "host", Device: "any-interface"}},
},
Resources: &structs.Resources{
CPU: 80,
Networks: []*structs.NetworkResource{{Device: "any-interface"}},
CPU: 80,
},
})
assert.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares)
assert.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[0].Device)
assert.Equal(t, 80, client.config.Node.Resources.CPU)
assert.Equal(t, "any-interface", client.config.Node.Resources.Networks[0].Device)
idx := len(client.config.Node.NodeResources.Networks) - 1
require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares)
require.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[idx].Device)
require.Equal(t, 80, client.config.Node.Resources.CPU)
// lookup an interface. client.Node starts with a hardcoded value, eth0,
// and is only updated async through fingerprinter.
@@ -1210,48 +1209,43 @@ func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) {
client, cleanup = TestClient(t, func(c *config.Config) {
c.NetworkInterface = dev
c.Node.Name = name
c.Options["fingerprint.blacklist"] = "network"
// Node is already a mock.Node, with a device
c.Node.NodeResources.Networks[0].Device = dev
c.Node.Resources.Networks = c.Node.NodeResources.Networks
})
defer cleanup()
client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{CpuShares: 123},
Networks: []*structs.NetworkResource{
{Device: "any-interface", MBits: 20},
{Device: dev, MBits: 20},
{Mode: "host", Device: "any-interface", MBits: 20},
},
},
Resources: &structs.Resources{
CPU: 80,
Networks: []*structs.NetworkResource{{Device: "any-interface"}},
},
})
assert.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares)
require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares)
// only the configured device is kept
assert.Equal(t, 1, len(client.config.Node.NodeResources.Networks))
assert.Equal(t, dev, client.config.Node.NodeResources.Networks[0].Device)
// network speed updates to the configured network are kept
assert.Equal(t, 20, client.config.Node.NodeResources.Networks[0].MBits)
assert.Equal(t, 80, client.config.Node.Resources.CPU)
assert.Equal(t, dev, client.config.Node.Resources.Networks[0].Device)
require.Equal(t, 2, len(client.config.Node.NodeResources.Networks))
require.Equal(t, dev, client.config.Node.NodeResources.Networks[0].Device)
require.Equal(t, "bridge", client.config.Node.NodeResources.Networks[1].Mode)
// Network speed is applied to all NetworkResources
client.config.NetworkInterface = ""
client.config.NetworkSpeed = 100
client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{CpuShares: 123},
Networks: []*structs.NetworkResource{{Device: "any-interface", MBits: 20}},
Cpu: structs.NodeCpuResources{CpuShares: 123},
Networks: []*structs.NetworkResource{
{Mode: "host", Device: "any-interface", MBits: 20},
},
},
Resources: &structs.Resources{
CPU: 80,
Networks: []*structs.NetworkResource{{Device: "any-interface"}},
CPU: 80,
},
})
assert.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[0].Device)
assert.Equal(t, 100, client.config.Node.NodeResources.Networks[0].MBits)
assert.Equal(t, 3, len(client.config.Node.NodeResources.Networks))
assert.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[2].Device)
assert.Equal(t, 100, client.config.Node.NodeResources.Networks[2].MBits)
assert.Equal(t, 0, client.config.Node.NodeResources.Networks[1].MBits)
}
// Support multiple IP addresses (ipv4 vs. 6, e.g.) on the configured network interface
@@ -1269,7 +1263,7 @@ func Test_UpdateNodeFromFingerprintMultiIP(t *testing.T) {
// Client without network configured updates to match fingerprint
client, cleanup := TestClient(t, func(c *config.Config) {
c.NetworkInterface = dev
c.Node.NodeResources.Networks[0].Device = dev
c.Options["fingerprint.blacklist"] = "network,cni,bridge"
c.Node.Resources.Networks = c.Node.NodeResources.Networks
})
defer cleanup()
@@ -1284,12 +1278,13 @@ func Test_UpdateNodeFromFingerprintMultiIP(t *testing.T) {
},
})
two := structs.Networks{
nets := structs.Networks{
mock.Node().NodeResources.Networks[0],
{Device: dev, IP: "127.0.0.1"},
{Device: dev, IP: "::1"},
}
require.Equal(t, two, client.config.Node.NodeResources.Networks)
require.Equal(t, nets, client.config.Node.NodeResources.Networks)
}
func TestClient_computeAllocatedDeviceStats(t *testing.T) {
@@ -1480,6 +1475,9 @@ func TestClient_getAllocatedResources(t *testing.T) {
result := client.getAllocatedResources(client.config.Node)
// Ignore comparing networks for now
result.Flattened.Networks = nil
expected := structs.ComparableResources{
Flattened: structs.AllocatedTaskResources{
Cpu: structs.AllocatedCpuResources{

View File

@@ -234,6 +234,15 @@ type Config struct {
// be specified with colon delimited
CNIPath string
// CNIConfigDir is the directory where CNI network configuration is located. The
// client will use this path when fingerprinting CNI networks.
CNIConfigDir string
// CNIInterfacePrefix is the prefix to use when creating CNI network interfaces. This
// defaults to 'eth', therefore the first interface created by CNI inside the alloc
// network will be 'eth0'.
CNIInterfacePrefix string
// BridgeNetworkName is the name to use for the bridge created in bridge
// networking mode. This defaults to 'nomad' if not set
BridgeNetworkName string
@@ -301,6 +310,9 @@ func DefaultConfig() *Config {
},
BackwardsCompatibleMetrics: false,
RPCHoldTimeout: 5 * time.Second,
CNIPath: "/opt/cni/bin",
CNIConfigDir: "/opt/cni/config",
CNIInterfacePrefix: "eth",
}
}

View File

@@ -0,0 +1,12 @@
package fingerprint
import log "github.com/hashicorp/go-hclog"
type BridgeFingerprint struct {
logger log.Logger
StaticFingerprinter
}
func NewBridgeFingerprint(logger log.Logger) Fingerprint {
return &BridgeFingerprint{logger: logger}
}

View File

@@ -0,0 +1,5 @@
// +build !linux
package fingerprint
func (f *BridgeFingerprint) Fingerprint(*FingerprintRequest, *FingerprintResponse) error { return nil }

View File

@@ -0,0 +1,49 @@
package fingerprint
import (
"bufio"
"fmt"
"os"
"regexp"
"github.com/hashicorp/nomad/nomad/structs"
)
const bridgeKernelModuleName = "bridge"
func (f *BridgeFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error {
if err := f.checkKMod(bridgeKernelModuleName); err != nil {
f.logger.Warn("failed to detect bridge kernel module, bridge network mode disabled", "error", err)
return nil
}
resp.NodeResources = &structs.NodeResources{
Networks: []*structs.NetworkResource{
{
Mode: "bridge",
},
},
}
resp.Detected = true
return nil
}
func (f *BridgeFingerprint) checkKMod(mod string) error {
file, err := os.Open("/proc/modules")
if err != nil {
return fmt.Errorf("could not read /proc/modules: %v", err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
pattern := fmt.Sprintf("%s\\s+.*$", mod)
for scanner.Scan() {
if matched, err := regexp.MatchString(pattern, scanner.Text()); matched {
return nil
} else if err != nil {
return fmt.Errorf("could not parse /proc/modules: %v", err)
}
}
return fmt.Errorf("could not detect kernel module %s", mod)
}

View File

@@ -0,0 +1,14 @@
package fingerprint
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestBridgeFingerprint_checkKMod(t *testing.T) {
require := require.New(t)
f := &BridgeFingerprint{}
require.NoError(f.checkKMod("ip_tables"))
require.Error(f.checkKMod("nonexistentmodule"))
}

77
client/fingerprint/cni.go Normal file
View File

@@ -0,0 +1,77 @@
package fingerprint
import (
"fmt"
"os"
"strings"
"github.com/containernetworking/cni/libcni"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
)
type CNIFingerprint struct {
StaticFingerprinter
logger log.Logger
}
func NewCNIFingerprint(logger log.Logger) Fingerprint {
return &CNIFingerprint{logger: logger}
}
func (f *CNIFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error {
confDir := req.Config.CNIConfigDir
networks := map[string]struct{}{}
if _, err := os.Stat(confDir); os.IsNotExist(err) {
f.logger.Debug("CNI config dir is not set or does not exist, skipping", "cni_config_dir", confDir)
resp.Detected = false
return nil
}
files, err := libcni.ConfFiles(confDir, []string{".conf", ".conflist", ".json"})
if err != nil {
return fmt.Errorf("failed to detect CNI conf files: %v", err)
}
for _, confFile := range files {
if strings.HasSuffix(confFile, ".conflist") {
confList, err := libcni.ConfListFromFile(confFile)
if err != nil {
return fmt.Errorf("failed to load CNI config list file %s: %v", confFile, err)
}
if _, ok := networks[confList.Name]; ok {
f.logger.Warn("duplicate CNI config names found, ignoring file", "name", confList.Name, "file", confFile)
continue
}
networks[confList.Name] = struct{}{}
} else {
conf, err := libcni.ConfFromFile(confFile)
if err != nil {
return fmt.Errorf("failed to load CNI config file %s: %v", confFile, err)
}
if _, ok := networks[conf.Network.Name]; ok {
f.logger.Warn("duplicate CNI config names found, ignoring file", "name", conf.Network.Name, "file", confFile)
continue
}
networks[conf.Network.Name] = struct{}{}
}
}
var nodeNetworks structs.Networks
for name := range networks {
nodeNetworks = append(nodeNetworks, &structs.NetworkResource{
Mode: fmt.Sprintf("cni/%s", name),
})
f.logger.Debug("detected CNI network", "name", name)
}
resp.NodeResources = &structs.NodeResources{
Networks: nodeNetworks,
}
resp.Detected = true
return nil
}
func (f *CNIFingerprint) Reload() {}

View File

@@ -0,0 +1,85 @@
package fingerprint
import (
"testing"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// Test that CNI fingerprinter is reloadable
var _ ReloadableFingerprint = &CNIFingerprint{}
func TestCNIFingerprint(t *testing.T) {
cases := []struct {
name string
req *FingerprintRequest
exp *FingerprintResponse
err bool
errMatch string
}{
{
name: "cni config dir not set",
req: &FingerprintRequest{
Config: &config.Config{},
},
exp: &FingerprintResponse{
Detected: false,
},
},
{
name: "cni config dir non-existent",
req: &FingerprintRequest{
Config: &config.Config{
CNIConfigDir: "text_fixtures/cni_nonexistent",
},
},
exp: &FingerprintResponse{
Detected: false,
},
},
{
name: "two networks, no errors",
req: &FingerprintRequest{
Config: &config.Config{
CNIConfigDir: "test_fixtures/cni",
},
},
exp: &FingerprintResponse{
NodeResources: &structs.NodeResources{
Networks: []*structs.NetworkResource{
{
Mode: "cni/net1",
},
{
Mode: "cni/net2",
},
},
},
Detected: true,
},
err: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
r := require.New(t)
fp := NewCNIFingerprint(testlog.HCLogger(t))
resp := &FingerprintResponse{}
err := fp.Fingerprint(c.req, resp)
if c.err {
r.Error(err)
r.Contains(err.Error(), c.errMatch)
} else {
r.NoError(err)
r.Equal(c.exp.Detected, resp.Detected)
if resp.NodeResources != nil || c.exp.NodeResources != nil {
r.ElementsMatch(c.exp.NodeResources.Networks, resp.NodeResources.Networks)
}
}
})
}
}

View File

@@ -485,6 +485,7 @@ func (f *EnvAWSFingerprint) Fingerprint(request *FingerprintRequest, response *F
nodeResources = new(structs.NodeResources)
nodeResources.Networks = []*structs.NetworkResource{
{
Mode: "host",
Device: "eth0",
IP: val,
CIDR: val + "/32",

View File

@@ -31,6 +31,7 @@ var (
hostFingerprinters = map[string]Factory{
"arch": NewArchFingerprint,
"consul": NewConsulFingerprint,
"cni": NewCNIFingerprint,
"cpu": NewCPUFingerprint,
"host": NewHostFingerprint,
"memory": NewMemoryFingerprint,
@@ -115,6 +116,13 @@ type Fingerprint interface {
Periodic() (bool, time.Duration)
}
// ReloadableFingerprint can be implemented if the fingerprinter needs to be run during client reload.
// If implemented, the client will call Reload during client reload then immediately Fingerprint
type ReloadableFingerprint interface {
Fingerprint
Reload()
}
// StaticFingerprinter can be embedded in a struct that has a Fingerprint method
// to make it non-periodic.
type StaticFingerprinter struct{}

View File

@@ -2,4 +2,5 @@ package fingerprint
func initPlatformFingerprints(fps map[string]Factory) {
fps["cgroup"] = NewCGroupFingerprint
fps["bridge"] = NewBridgeFingerprint
}

View File

@@ -132,6 +132,7 @@ func (f *NetworkFingerprint) createNetworkResources(throughput int, intf *net.In
for _, addr := range addrs {
// Create a new network resource
newNetwork := &structs.NetworkResource{
Mode: "host",
Device: intf.Name,
MBits: throughput,
}

View File

@@ -0,0 +1,17 @@
{
"cniVersion": "0.2.0",
"name": "net1",
"type": "bridge",
"bridge": "cni0",
"isGateway": true,
"ipMasq": true,
"ipam": {
"type": "host-local",
"subnet": "10.22.0.0/16",
"routes": [
{
"dst": "0.0.0.0/0"
}
]
}
}

View File

@@ -0,0 +1,25 @@
{
"cniVersion": "0.3.1",
"name": "net2",
"plugins": [
{
"type": "ptp",
"ipMasq": true,
"ipam": {
"type": "host-local",
"subnet": "172.16.30.0/24",
"routes": [
{
"dst": "0.0.0.0/0"
}
]
}
},
{
"type": "portmap",
"capabilities": {
"portMappings": true
}
}
]
}

View File

@@ -24,6 +24,8 @@ type FingerprintManager struct {
// associated node
updateNodeAttributes func(*fingerprint.FingerprintResponse) *structs.Node
reloadableFps map[string]fingerprint.ReloadableFingerprint
logger log.Logger
}
@@ -44,6 +46,7 @@ func NewFingerprintManager(
node: node,
shutdownCh: shutdownCh,
logger: logger.Named("fingerprint_mgr"),
reloadableFps: make(map[string]fingerprint.ReloadableFingerprint),
}
}
@@ -103,6 +106,17 @@ func (fp *FingerprintManager) Run() error {
return nil
}
// Reload will reload any registered ReloadableFingerprinters and immediately call Fingerprint
func (fm *FingerprintManager) Reload() {
for name, fp := range fm.reloadableFps {
fm.logger.Info("reloading fingerprinter", "fingerprinter", name)
fp.Reload()
if _, err := fm.fingerprint(name, fp); err != nil {
fm.logger.Warn("error fingerprinting after reload", "fingerprinter", name, "error", err)
}
}
}
// setupFingerprints is used to fingerprint the node to see if these attributes are
// supported
func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error {
@@ -130,6 +144,10 @@ func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error {
if p {
go fm.runFingerprint(f, period, name)
}
if rfp, ok := f.(fingerprint.ReloadableFingerprint); ok {
fm.reloadableFps[name] = rfp
}
}
fm.logger.Debug("detected fingerprints", "node_attrs", appliedFingerprints)

View File

@@ -65,6 +65,7 @@ func Node() *structs.Node {
},
Networks: []*structs.NetworkResource{
{
Mode: "host",
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,

View File

@@ -12,6 +12,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"hash/crc32"
"math"
"net"
"os"
@@ -2265,53 +2266,23 @@ type NetworkResource struct {
DynamicPorts []Port // Host Dynamically assigned ports
}
func (nr *NetworkResource) Equals(other *NetworkResource) bool {
if nr.Mode != other.Mode {
return false
}
if nr.Device != other.Device {
return false
}
if nr.CIDR != other.CIDR {
return false
}
if nr.IP != other.IP {
return false
}
if nr.MBits != other.MBits {
return false
}
if len(nr.ReservedPorts) != len(other.ReservedPorts) {
return false
}
func (nr *NetworkResource) Hash() uint32 {
var data []byte
data = append(data, []byte(fmt.Sprintf("%s%s%s%s%d", nr.Mode, nr.Device, nr.CIDR, nr.IP, nr.MBits))...)
for i, port := range nr.ReservedPorts {
if len(other.ReservedPorts) <= i {
return false
}
if port != other.ReservedPorts[i] {
return false
}
data = append(data, []byte(fmt.Sprintf("r%d%s%d%d", i, port.Label, port.Value, port.To))...)
}
if len(nr.DynamicPorts) != len(other.DynamicPorts) {
return false
}
for i, port := range nr.DynamicPorts {
if len(other.DynamicPorts) <= i {
return false
}
if port != other.DynamicPorts[i] {
return false
}
data = append(data, []byte(fmt.Sprintf("d%d%s%d%d", i, port.Label, port.Value, port.To))...)
}
return true
return crc32.ChecksumIEEE(data)
}
func (nr *NetworkResource) Equals(other *NetworkResource) bool {
return nr.Hash() == other.Hash()
}
func (n *NetworkResource) Canonicalize() {
@@ -2613,7 +2584,7 @@ func (n *NodeResources) Merge(o *NodeResources) {
n.Disk.Merge(&o.Disk)
if len(o.Networks) != 0 {
n.Networks = o.Networks
n.Networks = append(n.Networks, o.Networks...)
}
if len(o.Devices) != 0 {

View File

@@ -5281,6 +5281,50 @@ func TestMultiregion_CopyCanonicalize(t *testing.T) {
require.False(old.Diff(nonEmptyOld))
}
func TestNodeResources_Merge(t *testing.T) {
res := &NodeResources{
Cpu: NodeCpuResources{
CpuShares: int64(32000),
},
Memory: NodeMemoryResources{
MemoryMB: int64(64000),
},
Networks: Networks{
{
Device: "foo",
},
},
}
res.Merge(&NodeResources{
Memory: NodeMemoryResources{
MemoryMB: int64(100000),
},
Networks: Networks{
{
Mode: "foo/bar",
},
},
})
require.Exactly(t, &NodeResources{
Cpu: NodeCpuResources{
CpuShares: int64(32000),
},
Memory: NodeMemoryResources{
MemoryMB: int64(100000),
},
Networks: Networks{
{
Device: "foo",
},
{
Mode: "foo/bar",
},
},
}, res)
}
func TestMultiregion_Validate(t *testing.T) {
require := require.New(t)
cases := []struct {

View File

@@ -314,6 +314,44 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
return true, ""
}
// NetworkChecker is a FeasibilityChecker which returns whether a node has the
// network resources necessary to schedule the task group
type NetworkChecker struct {
ctx Context
networkMode string
}
func NewNetworkChecker(ctx Context) *NetworkChecker {
return &NetworkChecker{ctx: ctx, networkMode: "host"}
}
func (c *NetworkChecker) SetNetworkMode(netMode string) {
c.networkMode = netMode
}
func (c *NetworkChecker) Feasible(option *structs.Node) bool {
if c.hasNetwork(option) {
return true
}
c.ctx.Metrics().FilterNode(option, "missing network")
return false
}
func (c *NetworkChecker) hasNetwork(option *structs.Node) bool {
if option.NodeResources == nil {
return false
}
for _, nw := range option.NodeResources.Networks {
if nw.Mode == c.networkMode {
return true
}
}
return false
}
// DriverChecker is a FeasibilityChecker which returns whether a node has the
// drivers necessary to scheduler a task group.
type DriverChecker struct {

View File

@@ -397,6 +397,48 @@ func TestCSIVolumeChecker(t *testing.T) {
}
}
func TestNetworkChecker(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
mock.Node(),
}
nodes[0].NodeResources.Networks = append(nodes[0].NodeResources.Networks, &structs.NetworkResource{Mode: "bridge"})
nodes[1].NodeResources.Networks = append(nodes[1].NodeResources.Networks, &structs.NetworkResource{Mode: "bridge"})
nodes[2].NodeResources.Networks = append(nodes[2].NodeResources.Networks, &structs.NetworkResource{Mode: "cni/mynet"})
checker := NewNetworkChecker(ctx)
cases := []struct {
mode string
results []bool
}{
{
mode: "host",
results: []bool{true, true, true},
},
{
mode: "bridge",
results: []bool{true, true, false},
},
{
mode: "cni/mynet",
results: []bool{false, false, true},
},
{
mode: "cni/nonexistent",
results: []bool{false, false, false},
},
}
for _, c := range cases {
checker.SetNetworkMode(c.mode)
for i, node := range nodes {
require.Equal(t, c.results[i], checker.Feasible(node), "mode=%q, idx=%d", c.mode, i)
}
}
}
func TestDriverChecker_DriverInfo(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{

View File

@@ -52,6 +52,7 @@ type GenericStack struct {
taskGroupDevices *DeviceChecker
taskGroupHostVolumes *HostVolumeChecker
taskGroupCSIVolumes *CSIVolumeChecker
taskGroupNetwork *NetworkChecker
distinctHostsConstraint *DistinctHostsIterator
distinctPropertyConstraint *DistinctPropertyIterator
@@ -135,6 +136,9 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
s.taskGroupDevices.SetTaskGroup(tg)
s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
s.taskGroupCSIVolumes.SetVolumes(tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetworkMode(tg.Networks[0].Mode)
}
s.distinctHostsConstraint.SetTaskGroup(tg)
s.distinctPropertyConstraint.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
@@ -332,6 +336,9 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Filter on available, healthy CSI plugins
s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)
// Filter on available client networks
s.taskGroupNetwork = NewNetworkChecker(ctx)
// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
@@ -340,7 +347,8 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
tgs := []FeasibilityChecker{s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices}
s.taskGroupDevices,
s.taskGroupNetwork}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)