client: add NetworkStatus to Allocation (#8657)
@@ -386,8 +386,14 @@ func (ar *allocRunner) Restore() error {
         return err
     }

+    ns, err := ar.stateDB.GetNetworkStatus(ar.id)
+    if err != nil {
+        return err
+    }
+
     ar.stateLock.Lock()
     ar.state.DeploymentStatus = ds
+    ar.state.NetworkStatus = ns
     ar.stateLock.Unlock()

     states := make(map[string]*structs.TaskState)
@@ -655,6 +661,22 @@ func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *st
         }
     }

+    // Set the NetworkStatus and default DNSConfig if one is not returned from the client
+    netStatus := ar.state.NetworkStatus
+    if netStatus != nil {
+        a.NetworkStatus = netStatus
+    } else {
+        a.NetworkStatus = new(structs.AllocNetworkStatus)
+    }
+
+    if a.NetworkStatus.DNS == nil {
+        alloc := ar.Alloc()
+        nws := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Networks
+        if len(nws) > 0 {
+            a.NetworkStatus.DNS = nws[0].DNS.Copy()
+        }
+    }
+
     return a
 }

@@ -700,6 +722,12 @@ func (ar *allocRunner) SetClientStatus(clientStatus string) {
     ar.state.ClientStatus = clientStatus
 }

+func (ar *allocRunner) SetNetworkStatus(s *structs.AllocNetworkStatus) {
+    ar.stateLock.Lock()
+    defer ar.stateLock.Unlock()
+    ar.state.NetworkStatus = s.Copy()
+}
+
 // AllocState returns a copy of allocation state including a snapshot of task
 // states.
 func (ar *allocRunner) AllocState() *state.State {
@@ -855,6 +883,20 @@ func (ar *allocRunner) PersistState() error {
         return nil
     }

+    // persist network status, wrapping in a func to release state lock as early as possible
+    if err := func() error {
+        ar.stateLock.Lock()
+        defer ar.stateLock.Unlock()
+        if ar.state.NetworkStatus != nil {
+            if err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus); err != nil {
+                return err
+            }
+        }
+        return nil
+    }(); err != nil {
+        return err
+    }
+
     // TODO: consider persisting deployment state along with task status.
     // While we study why only the alloc is persisted, I opted to maintain current
     // behavior and not risk adding yet more IO calls unnecessarily.
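The PersistState hunk above wraps the lock and unlock in an immediately invoked function so the deferred unlock fires when that closure returns, rather than at the end of PersistState. A standalone sketch of the pattern (names and values here are illustrative only, not from this commit):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex

    if err := func() error {
        mu.Lock()
        defer mu.Unlock() // runs when this closure returns, not at the end of main
        fmt.Println("critical section: read or persist guarded state")
        return nil
    }(); err != nil {
        fmt.Println("persist failed:", err)
        return
    }

    // mu is already released here, so the remaining work does not hold the lock
    fmt.Println("continue without the lock")
}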
@@ -10,7 +10,6 @@ import (
     cstructs "github.com/hashicorp/nomad/client/structs"
     "github.com/hashicorp/nomad/client/taskenv"
     "github.com/hashicorp/nomad/nomad/structs"
-    "github.com/hashicorp/nomad/plugins/drivers"
 )

 type hookResourceSetter interface {
@@ -42,23 +41,6 @@ func (a *allocHookResourceSetter) SetAllocHookResources(res *cstructs.AllocHookR
     }
 }

-type networkIsolationSetter interface {
-    SetNetworkIsolation(*drivers.NetworkIsolationSpec)
-}
-
-// allocNetworkIsolationSetter is a shim to allow the alloc network hook to
-// set the alloc network isolation configuration without full access
-// to the alloc runner
-type allocNetworkIsolationSetter struct {
-    ar *allocRunner
-}
-
-func (a *allocNetworkIsolationSetter) SetNetworkIsolation(n *drivers.NetworkIsolationSpec) {
-    for _, tr := range a.ar.tasks {
-        tr.SetNetworkIsolation(n)
-    }
-}
-
 // allocHealthSetter is a shim to allow the alloc health watcher hook to set
 // and clear the alloc health without full access to the alloc runner state
 type allocHealthSetter struct {
@@ -159,7 +141,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
         newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
         newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
         newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient),
-        newNetworkHook(hookLogger, ns, alloc, nm, nc),
+        newNetworkHook(hookLogger, ns, alloc, nm, nc, ar),
         newGroupServiceHook(groupServiceHookConfig{
             alloc: alloc,
             consul: ar.consulClient,
@@ -27,7 +27,7 @@ type TaskStateHandler interface {
     TaskStateUpdated()
 }

-// AllocStatsReporter gives acess to the latest resource usage from the
+// AllocStatsReporter gives access to the latest resource usage from the
 // allocation
 type AllocStatsReporter interface {
     LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error)
@@ -9,12 +9,37 @@ import (
     "github.com/hashicorp/nomad/plugins/drivers"
 )

+type networkIsolationSetter interface {
+    SetNetworkIsolation(*drivers.NetworkIsolationSpec)
+}
+
+// allocNetworkIsolationSetter is a shim to allow the alloc network hook to
+// set the alloc network isolation configuration without full access
+// to the alloc runner
+type allocNetworkIsolationSetter struct {
+    ar *allocRunner
+}
+
+func (a *allocNetworkIsolationSetter) SetNetworkIsolation(n *drivers.NetworkIsolationSpec) {
+    for _, tr := range a.ar.tasks {
+        tr.SetNetworkIsolation(n)
+    }
+}
+
+type networkStatusSetter interface {
+    SetNetworkStatus(*structs.AllocNetworkStatus)
+}
+
 // networkHook is an alloc lifecycle hook that manages the network namespace
 // for an alloc
 type networkHook struct {
-    // setter is a callback to set the network isolation spec when after the
+    // isolationSetter is a callback to set the network isolation spec when after the
     // network is created
-    setter networkIsolationSetter
+    isolationSetter networkIsolationSetter

+    // statusSetter is a callback to the alloc runner to set the network status once
+    // network setup is complete
+    networkStatusSetter networkStatusSetter
+
     // manager is used when creating the network namespace. This defaults to
     // bind mounting a network namespace descritor under /var/run/netns but
@@ -34,11 +59,15 @@ type networkHook struct {
     logger hclog.Logger
 }

-func newNetworkHook(logger hclog.Logger, ns networkIsolationSetter,
-    alloc *structs.Allocation, netManager drivers.DriverNetworkManager,
-    netConfigurator NetworkConfigurator) *networkHook {
+func newNetworkHook(logger hclog.Logger,
+    ns networkIsolationSetter,
+    alloc *structs.Allocation,
+    netManager drivers.DriverNetworkManager,
+    netConfigurator NetworkConfigurator,
+    networkStatusSetter networkStatusSetter) *networkHook {
     return &networkHook{
-        setter: ns,
+        isolationSetter: ns,
+        networkStatusSetter: networkStatusSetter,
         alloc: alloc,
         manager: netManager,
         networkConfigurator: netConfigurator,
@@ -69,13 +98,16 @@ func (h *networkHook) Prerun() error {

     if spec != nil {
         h.spec = spec
-        h.setter.SetNetworkIsolation(spec)
+        h.isolationSetter.SetNetworkIsolation(spec)
     }

     if created {
-        if err := h.networkConfigurator.Setup(context.TODO(), h.alloc, spec); err != nil {
+        status, err := h.networkConfigurator.Setup(context.TODO(), h.alloc, spec)
+        if err != nil {
             return fmt.Errorf("failed to configure networking for alloc: %v", err)
         }
+
+        h.networkStatusSetter.SetNetworkStatus(status)
     }
     return nil
 }
@@ -27,6 +27,17 @@ func (m *mockNetworkIsolationSetter) SetNetworkIsolation(spec *drivers.NetworkIs
     require.Exactly(m.t, m.expectedSpec, spec)
 }

+type mockNetworkStatusSetter struct {
+    t *testing.T
+    expectedStatus *structs.AllocNetworkStatus
+    called bool
+}
+
+func (m *mockNetworkStatusSetter) SetNetworkStatus(status *structs.AllocNetworkStatus) {
+    m.called = true
+    require.Exactly(m.t, m.expectedStatus, status)
+}
+
 // Test that the prerun and postrun hooks call the setter with the expected spec when
 // the network mode is not host
 func TestNetworkHook_Prerun_Postrun(t *testing.T) {
@@ -62,10 +73,14 @@ func TestNetworkHook_Prerun_Postrun(t *testing.T) {
         t: t,
         expectedSpec: spec,
     }
+    statusSetter := &mockNetworkStatusSetter{
+        t: t,
+        expectedStatus: nil,
+    }
     require := require.New(t)

     logger := testlog.HCLogger(t)
-    hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{})
+    hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter)
     require.NoError(hook.Prerun())
     require.True(setter.called)
     require.False(destroyCalled)

@@ -76,7 +91,7 @@ func TestNetworkHook_Prerun_Postrun(t *testing.T) {
     setter.called = false
     destroyCalled = false
     alloc.Job.TaskGroups[0].Networks[0].Mode = "host"
-    hook = newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{})
+    hook = newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter)
     require.NoError(hook.Prerun())
     require.False(setter.called)
     require.False(destroyCalled)
@@ -10,7 +10,7 @@ import (
 // NetworkConfigurator sets up and tears down the interfaces, routes, firewall
 // rules, etc for the configured networking mode of the allocation.
 type NetworkConfigurator interface {
-    Setup(context.Context, *structs.Allocation, *drivers.NetworkIsolationSpec) error
+    Setup(context.Context, *structs.Allocation, *drivers.NetworkIsolationSpec) (*structs.AllocNetworkStatus, error)
     Teardown(context.Context, *structs.Allocation, *drivers.NetworkIsolationSpec) error
 }

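With the interface change above, every NetworkConfigurator now returns a *structs.AllocNetworkStatus from Setup. A minimal sketch of a configurator written against the new signature (the type name and hard-coded values are hypothetical, not part of this commit):

package allocrunner

import (
    "context"

    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/hashicorp/nomad/plugins/drivers"
)

// staticNetworkConfigurator is a hypothetical implementation that performs no
// real setup and simply reports a fixed status, mirroring the new Setup contract.
type staticNetworkConfigurator struct{}

func (s *staticNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) (*structs.AllocNetworkStatus, error) {
    // Returning nil, nil is also valid (the host configurator below does exactly that).
    return &structs.AllocNetworkStatus{
        InterfaceName: "eth0",
        Address:       "172.26.64.10",
    }, nil
}

func (s *staticNetworkConfigurator) Teardown(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
    return nil
}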
@@ -19,8 +19,8 @@ type NetworkConfigurator interface {
 // require further configuration
 type hostNetworkConfigurator struct{}

-func (h *hostNetworkConfigurator) Setup(context.Context, *structs.Allocation, *drivers.NetworkIsolationSpec) error {
-    return nil
+func (h *hostNetworkConfigurator) Setup(context.Context, *structs.Allocation, *drivers.NetworkIsolationSpec) (*structs.AllocNetworkStatus, error) {
+    return nil, nil
 }
 func (h *hostNetworkConfigurator) Teardown(context.Context, *structs.Allocation, *drivers.NetworkIsolationSpec) error {
     return nil
@@ -122,9 +122,9 @@ func (b *bridgeNetworkConfigurator) generateAdminChainRule() []string {
 }

 // Setup calls the CNI plugins with the add action
-func (b *bridgeNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
+func (b *bridgeNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) (*structs.AllocNetworkStatus, error) {
     if err := b.ensureForwardingRules(); err != nil {
-        return fmt.Errorf("failed to initialize table forwarding rules: %v", err)
+        return nil, fmt.Errorf("failed to initialize table forwarding rules: %v", err)
     }

     return b.cni.Setup(ctx, alloc, spec)
@@ -78,9 +78,9 @@ func newCNINetworkConfiguratorWithConf(logger log.Logger, cniPath, cniInterfaceP
 }

 // Setup calls the CNI plugins with the add action
-func (c *cniNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
+func (c *cniNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) (*structs.AllocNetworkStatus, error) {
     if err := c.ensureCNIInitialized(); err != nil {
-        return err
+        return nil, err
     }

     // Depending on the version of bridge cni plugin used, a known race could occure
@@ -88,15 +88,16 @@ func (c *cniNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Alloc
     // in one of them to fail. This rety attempts to overcome those erroneous failures.
     const retry = 3
     var firstError error
+    var res *cni.CNIResult
     for attempt := 1; ; attempt++ {
-        //TODO eventually returning the IP from the result would be nice to have in the alloc
-        if _, err := c.cni.Setup(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc, c.ignorePortMappingHostIP))); err != nil {
+        var err error
+        if res, err = c.cni.Setup(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc, c.ignorePortMappingHostIP))); err != nil {
             c.logger.Warn("failed to configure network", "err", err, "attempt", attempt)
             switch attempt {
             case 1:
                 firstError = err
             case retry:
-                return fmt.Errorf("failed to configure network: %v", firstError)
+                return nil, fmt.Errorf("failed to configure network: %v", firstError)
             }

             // Sleep for 1 second + jitter
@@ -106,7 +107,30 @@ func (c *cniNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Alloc
         break
     }

-    return nil
+    netStatus := new(structs.AllocNetworkStatus)
+
+    if len(res.Interfaces) > 0 {
+        iface, name := func(r *cni.CNIResult) (*cni.Config, string) {
+            for i := range r.Interfaces {
+                return r.Interfaces[i], i
+            }
+            return nil, ""
+        }(res)
+
+        netStatus.InterfaceName = name
+        if len(iface.IPConfigs) > 0 {
+            netStatus.Address = iface.IPConfigs[0].IP.String()
+        }
+    }
+    if len(res.DNS) > 0 {
+        netStatus.DNS = &structs.DNSConfig{
+            Servers: res.DNS[0].Nameservers,
+            Searches: res.DNS[0].Search,
+            Options: res.DNS[0].Options,
+        }
+    }
+
+    return netStatus, nil

 }

@@ -20,6 +20,9 @@ type State struct {

     // TaskStates is a snapshot of task states.
     TaskStates map[string]*structs.TaskState
+
+    // NetworkStatus captures network details not known until runtime
+    NetworkStatus *structs.AllocNetworkStatus
 }

 // SetDeploymentStatus is a helper for updating the client-controlled
@@ -57,6 +60,7 @@ func (s *State) Copy() *State {
         ClientDescription: s.ClientDescription,
         DeploymentStatus: s.DeploymentStatus.Copy(),
         TaskStates: taskStates,
+        NetworkStatus: s.NetworkStatus.Copy(),
     }
 }

@@ -1887,6 +1887,7 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
     stripped.ClientStatus = alloc.ClientStatus
     stripped.ClientDescription = alloc.ClientDescription
     stripped.DeploymentStatus = alloc.DeploymentStatus
+    stripped.NetworkStatus = alloc.NetworkStatus

     select {
     case c.allocUpdates <- stripped:
@@ -39,6 +39,14 @@ func (m *ErrDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentS
     return fmt.Errorf("Error!")
 }

+func (m *ErrDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error) {
+    return nil, fmt.Errorf("Error!")
+}
+
+func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error {
+    return fmt.Errorf("Error!")
+}
+
 func (m *ErrDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
     return nil, nil, fmt.Errorf("Error!")
 }
@@ -33,6 +33,11 @@ type StateDB interface {
     GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error)
     PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error

+    // Get/Put NetworkStatus get and put the allocation's network
+    // status. It may be nil.
+    GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error)
+    PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error
+
     // GetTaskRunnerState returns the LocalState and TaskState for a
     // TaskRunner. Either state may be nil if it is not found, but if an
     // error is encountered only the error will be non-nil.
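The two methods added to StateDB above follow the existing deployment-status pair: Put stores the status for an allocation and Get returns nil, nil when nothing has been written. A rough usage sketch, assuming it sits alongside the interface in the same package (the helper name and values are made up):

package state

import "github.com/hashicorp/nomad/nomad/structs"

// roundTripNetworkStatus is a hypothetical helper showing the intended
// Put/Get pairing; db may be the Bolt, in-memory, noop, or error implementation.
func roundTripNetworkStatus(db StateDB, allocID string) (*structs.AllocNetworkStatus, error) {
    ns := &structs.AllocNetworkStatus{
        InterfaceName: "eth0",
        Address:       "172.26.64.11",
        DNS:           &structs.DNSConfig{Servers: []string{"172.26.64.1"}},
    }
    if err := db.PutNetworkStatus(allocID, ns); err != nil {
        return nil, err
    }
    // A nil status with a nil error simply means nothing was persisted yet.
    return db.GetNetworkStatus(allocID)
}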
@@ -20,6 +20,9 @@ type MemDB struct {
     // alloc_id -> value
     deployStatus map[string]*structs.AllocDeploymentStatus

+    // alloc_id -> value
+    networkStatus map[string]*structs.AllocNetworkStatus
+
     // alloc_id -> task_name -> value
     localTaskState map[string]map[string]*state.LocalState
     taskState map[string]map[string]*structs.TaskState
@@ -43,6 +46,7 @@ func NewMemDB(logger hclog.Logger) *MemDB {
     return &MemDB{
         allocs: make(map[string]*structs.Allocation),
         deployStatus: make(map[string]*structs.AllocDeploymentStatus),
+        networkStatus: make(map[string]*structs.AllocNetworkStatus),
         localTaskState: make(map[string]map[string]*state.LocalState),
         taskState: make(map[string]map[string]*structs.TaskState),
         logger: logger,
@@ -89,6 +93,19 @@ func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentS
     return nil
 }

+func (m *MemDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error) {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+    return m.networkStatus[allocID], nil
+}
+
+func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error {
+    m.mu.Lock()
+    m.networkStatus[allocID] = ns
+    defer m.mu.Unlock()
+    return nil
+}
+
 func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
     m.mu.RLock()
     defer m.mu.RUnlock()
@@ -35,6 +35,14 @@ func (n NoopDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentS
     return nil
 }

+func (n NoopDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error) {
+    return nil, nil
+}
+
+func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus) error {
+    return nil
+}
+
 func (n NoopDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
     return nil, nil, nil
 }
@@ -25,8 +25,9 @@ meta/
  |--> upgraded -> time.Now().Format(timeRFC3339)
 allocations/
  |--> <alloc-id>/
      |--> alloc -> allocEntry{*structs.Allocation}
      |--> deploy_status -> deployStatusEntry{*structs.AllocDeploymentStatus}
+     |--> network_status -> networkStatusEntry{*structs.AllocNetworkStatus}
      |--> task-<name>/
          |--> local_state -> *trstate.LocalState # Local-only state
          |--> task_state -> *structs.TaskState # Sync'd to servers
@@ -69,6 +70,10 @@ var (
     // stored under.
     allocDeployStatusKey = []byte("deploy_status")

+    // allocNetworkStatusKey is the key *structs.AllocNetworkStatus is
+    // stored under
+    allocNetworkStatusKey = []byte("network_status")
+
     // allocations -> $allocid -> task-$taskname -> the keys below
     taskLocalStateKey = []byte("local_state")
     taskStateKey = []byte("task_state")
@@ -309,6 +314,64 @@ func (s *BoltStateDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploym
     return entry.DeploymentStatus, nil
 }

+// networkStatusEntry wraps values for NetworkStatus keys.
+type networkStatusEntry struct {
+    NetworkStatus *structs.AllocNetworkStatus
+}
+
+// PutDeploymentStatus stores an allocation's DeploymentStatus or returns an
+// error.
+func (s *BoltStateDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus) error {
+    return s.db.Update(func(tx *boltdd.Tx) error {
+        return putNetworkStatusImpl(tx, allocID, ds)
+    })
+}
+
+func putNetworkStatusImpl(tx *boltdd.Tx, allocID string, ds *structs.AllocNetworkStatus) error {
+    allocBkt, err := getAllocationBucket(tx, allocID)
+    if err != nil {
+        return err
+    }
+
+    entry := networkStatusEntry{
+        NetworkStatus: ds,
+    }
+    return allocBkt.Put(allocNetworkStatusKey, &entry)
+}
+
+// GetNetworkStatus retrieves an allocation's NetworkStatus or returns an
+// error.
+func (s *BoltStateDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error) {
+    var entry networkStatusEntry
+
+    err := s.db.View(func(tx *boltdd.Tx) error {
+        allAllocsBkt := tx.Bucket(allocationsBucketName)
+        if allAllocsBkt == nil {
+            // No state, return
+            return nil
+        }
+
+        allocBkt := allAllocsBkt.Bucket([]byte(allocID))
+        if allocBkt == nil {
+            // No state for alloc, return
+            return nil
+        }
+
+        return allocBkt.Get(allocNetworkStatusKey, &entry)
+    })
+
+    // It's valid for this field to be nil/missing
+    if boltdd.IsErrNotFound(err) {
+        return nil, nil
+    }
+
+    if err != nil {
+        return nil, err
+    }
+
+    return entry.NetworkStatus, nil
+}
+
 // GetTaskRunnerState returns the LocalState and TaskState for a
 // TaskRunner. LocalState or TaskState will be nil if they do not exist.
 //
@@ -19,6 +19,7 @@ import (
     _ "github.com/hashicorp/nomad/e2e/lifecycle"
     _ "github.com/hashicorp/nomad/e2e/metrics"
     _ "github.com/hashicorp/nomad/e2e/namespaces"
+    _ "github.com/hashicorp/nomad/e2e/networking"
     _ "github.com/hashicorp/nomad/e2e/nodedrain"
     _ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
     _ "github.com/hashicorp/nomad/e2e/nomadexec"
e2e/e2eutil/e2ejob.go (new file, 202 lines)
@@ -0,0 +1,202 @@
+package e2eutil
+
+import (
+    "bufio"
+    "context"
+    "fmt"
+    "os"
+    "os/exec"
+    "path/filepath"
+    "strconv"
+    "strings"
+    "testing"
+    "time"
+
+    "github.com/hashicorp/nomad/api"
+    "github.com/hashicorp/nomad/e2e/framework"
+    "github.com/hashicorp/nomad/helper/discover"
+    "github.com/hashicorp/nomad/helper/uuid"
+    "github.com/hashicorp/nomad/nomad/structs"
+    "github.com/hashicorp/nomad/testutil"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+)
+
+type e2eJob struct {
+    framework.TC
+    jobfile string
+    jobID string
+}
+
+func (e *e2eJob) Name() string {
+    return filepath.Base(e.jobfile)
+}
+
+// Ensure cluster has leader and at least 1 client node
+// in a ready state before running tests
+func (j *e2eJob) BeforeAll(f *framework.F) {
+    WaitForLeader(f.T(), j.Nomad())
+    WaitForNodesReady(f.T(), j.Nomad(), 1)
+    j.jobID = "e2eutil-" + uuid.Generate()[0:8]
+}
+
+func (j *e2eJob) TestJob(f *framework.F) {
+    file, err := os.Open(j.jobfile)
+    t := f.T()
+    require.NoError(t, err)
+
+    scanner := bufio.NewScanner(file)
+    var e2eJobLine string
+    for scanner.Scan() {
+        if strings.HasPrefix(scanner.Text(), "//e2e:") {
+            e2eJobLine = scanner.Text()[6:]
+        }
+        require.NoError(t, scanner.Err())
+    }
+
+    switch {
+    case strings.HasPrefix(e2eJobLine, "batch"):
+        parseBatchJobLine(t, j, e2eJobLine).Run(f)
+    case strings.HasPrefix(e2eJobLine, "service"):
+        parseServiceJobLine(t, j, e2eJobLine).Run(f)
+    default:
+        require.Fail(t, "could not parse e2e job line: %q", e2eJobLine)
+    }
+}
+
+type e2eBatchJob struct {
+    *e2eJob
+
+    shouldFail bool
+}
+
+func (j *e2eBatchJob) Run(f *framework.F) {
+    t := f.T()
+    require := require.New(t)
+    nomadClient := j.Nomad()
+
+    allocs := RegisterAndWaitForAllocs(f.T(), nomadClient, j.jobfile, j.jobID, "")
+    require.Equal(1, len(allocs))
+    allocID := allocs[0].ID
+
+    // wait for the job to stop
+    WaitForAllocStopped(t, nomadClient, allocID)
+    alloc, _, err := nomadClient.Allocations().Info(allocID, nil)
+    require.NoError(err)
+    if j.shouldFail {
+        require.NotEqual(structs.AllocClientStatusComplete, alloc.ClientStatus)
+    } else {
+        require.Equal(structs.AllocClientStatusComplete, alloc.ClientStatus)
+    }
+}
+
+type e2eServiceJob struct {
+    *e2eJob
+
+    script string
+    runningDuration time.Duration
+}
+
+func (j *e2eServiceJob) Run(f *framework.F) {
+    t := f.T()
+    nomadClient := j.Nomad()
+
+    allocs := RegisterAndWaitForAllocs(f.T(), nomadClient, j.jobfile, j.jobID, "")
+    require.Equal(t, 1, len(allocs))
+    allocID := allocs[0].ID
+
+    var alloc *api.Allocation
+    WaitForAllocRunning(t, nomadClient, allocID)
+    testutil.AssertUntil(j.runningDuration, func() (bool, error) {
+        var err error
+        alloc, _, err = nomadClient.Allocations().Info(allocID, nil)
+        if err != nil {
+            return false, err
+        }
+
+        return alloc.ClientStatus == structs.AllocClientStatusRunning, fmt.Errorf("expected status running, but was: %s", alloc.ClientStatus)
+    }, func(err error) {
+        t.Fatalf("failed to keep alloc running: %v", err)
+    })
+
+    scriptPath := filepath.Join(filepath.Dir(j.jobfile), j.script)
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+    defer cancel()
+    cmd := exec.CommandContext(ctx, scriptPath)
+    nmdBin, err := discover.NomadExecutable()
+    assert.NoError(t, err)
+    cmd.Env = append(os.Environ(),
+        "NOMAD_BIN="+nmdBin,
+        "NOMAD_ALLOC_ID="+allocID,
+        "NOMAD_ADDR="+nomadClient.Address(),
+    )
+
+    assert.NoError(t, cmd.Start())
+    waitCh := make(chan error)
+    go func() {
+        select {
+        case waitCh <- cmd.Wait():
+        case <-ctx.Done():
+        }
+    }()
+
+    select {
+    case <-ctx.Done():
+    case err := <-waitCh:
+        assert.NoError(t, err)
+        assert.Zero(t, cmd.ProcessState.ExitCode())
+    }
+
+    // stop the job
+    _, _, err = nomadClient.Jobs().Deregister(j.jobID, false, nil)
+    require.NoError(t, err)
+    WaitForAllocStopped(t, nomadClient, allocID)
+}
+
+//e2e:batch fail=false
+//e2e:service running=5s check=script.sh
+
+func NewE2EJob(jobfile string) framework.TestCase {
+    return &e2eJob{
+        jobfile: jobfile,
+    }
+
+}
+
+func parseServiceJobLine(t *testing.T, j *e2eJob, line string) *e2eServiceJob {
+    job := &e2eServiceJob{
+        e2eJob: j,
+        runningDuration: time.Second * 5,
+    }
+    for _, options := range strings.Split(line, " ")[1:] {
+        o := strings.SplitN(options, "=", 2)
+        switch o[0] {
+        case "script":
+            job.script = o[1]
+        case "running":
+            dur, err := time.ParseDuration(o[1])
+            if err != nil {
+                t.Logf("could not parse running duration %q for e2e job spec: %v", o[1], err)
+            } else {
+                job.runningDuration = dur
+            }
+        }
+    }
+
+    return job
+}
+
+func parseBatchJobLine(t *testing.T, j *e2eJob, line string) *e2eBatchJob {
+    job := &e2eBatchJob{
+        e2eJob: j,
+    }
+    for _, options := range strings.Split(line, " ")[1:] {
+        o := strings.SplitN(options, "=", 2)
+        switch o[0] {
+        case "shouldFail":
+            job.shouldFail, _ = strconv.ParseBool(o[1])
+        }
+    }
+
+    return job
+}
e2e/networking/inputs/basic.nomad (new file, 18 lines)
@@ -0,0 +1,18 @@
+//e2e:service script=validate.sh
+job "networking" {
+  datacenters = ["nick-east-1"]
+  group "basic" {
+    network {
+      mode = "bridge"
+    }
+
+    task "sleep" {
+      driver = "docker"
+      config {
+        image = "busybox:1"
+        command = "/bin/sleep"
+        args = ["5"]
+      }
+    }
+  }
+}
e2e/networking/inputs/validate.sh (new executable file, 3 lines)
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+test $(curl --silent ${NOMAD_ADDR}/v1/allocation/${NOMAD_ALLOC_ID} | jq '.NetworkStatus.Address | length') -ne 0
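validate.sh asserts that the allocation API now exposes a non-empty NetworkStatus.Address. The same check can be written in Go with only the standard library; this sketch assumes the NOMAD_ADDR and NOMAD_ALLOC_ID environment variables the script already uses, and everything else is illustrative:

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
)

// allocResp decodes only the fields this check cares about.
type allocResp struct {
    NetworkStatus *struct {
        InterfaceName string
        Address       string
    }
}

func main() {
    url := fmt.Sprintf("%s/v1/allocation/%s", os.Getenv("NOMAD_ADDR"), os.Getenv("NOMAD_ALLOC_ID"))
    resp, err := http.Get(url)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    var alloc allocResp
    if err := json.NewDecoder(resp.Body).Decode(&alloc); err != nil {
        log.Fatal(err)
    }
    if alloc.NetworkStatus == nil || alloc.NetworkStatus.Address == "" {
        log.Fatal("allocation has no network status address")
    }
    fmt.Println("allocation address:", alloc.NetworkStatus.Address)
}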
e2e/networking/networking.go (new file, 14 lines)
@@ -0,0 +1,14 @@
+package networking
+
+import (
+    "github.com/hashicorp/nomad/e2e/e2eutil"
+    "github.com/hashicorp/nomad/e2e/framework"
+)
+
+func init() {
+    framework.AddSuites(&framework.TestSuite{
+        Component: "Networking",
+        CanRunLocal: true,
+        Cases: []framework.TestCase{e2eutil.NewE2EJob("networking/inputs/basic.nomad")},
+    })
+}
@@ -2952,6 +2952,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
     copyAlloc.ClientStatus = alloc.ClientStatus
     copyAlloc.ClientDescription = alloc.ClientDescription
     copyAlloc.TaskStates = alloc.TaskStates
+    copyAlloc.NetworkStatus = alloc.NetworkStatus

     // The client can only set its deployment health and timestamp, so just take
     // those
@@ -2385,6 +2385,20 @@ type DNSConfig struct {
     Options []string
 }

+func (d *DNSConfig) Copy() *DNSConfig {
+    if d == nil {
+        return nil
+    }
+    newD := new(DNSConfig)
+    newD.Servers = make([]string, len(d.Servers))
+    copy(newD.Servers, d.Servers)
+    newD.Searches = make([]string, len(d.Searches))
+    copy(newD.Searches, d.Searches)
+    newD.Options = make([]string, len(d.Options))
+    copy(newD.Options, d.Options)
+    return newD
+}
+
 // NetworkResource is used to represent available network
 // resources
 type NetworkResource struct {
@@ -8743,6 +8757,9 @@ type Allocation struct {
     // RescheduleTrackers captures details of previous reschedule attempts of the allocation
     RescheduleTracker *RescheduleTracker

+    // NetworkStatus captures networking details of an allocation known at runtime
+    NetworkStatus *AllocNetworkStatus
+
     // FollowupEvalID captures a follow up evaluation created to handle a failed allocation
     // that can be rescheduled in the future
     FollowupEvalID string
@@ -9503,6 +9520,26 @@ func (s *NodeScoreMeta) Data() interface{} {
     return s
 }

+// AllocNetworkStatus captures the status of an allocation's network during runtime.
+// Depending on the network mode, an allocation's address may need to be known to other
+// systems in Nomad such as service registration.
+type AllocNetworkStatus struct {
+    InterfaceName string
+    Address string
+    DNS *DNSConfig
+}
+
+func (a *AllocNetworkStatus) Copy() *AllocNetworkStatus {
+    if a == nil {
+        return nil
+    }
+    return &AllocNetworkStatus{
+        InterfaceName: a.InterfaceName,
+        Address: a.Address,
+        DNS: a.DNS.Copy(),
+    }
+}
+
 // AllocDeploymentStatus captures the status of the allocation as part of the
 // deployment. This can include things like if the allocation has been marked as
 // healthy.
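AllocNetworkStatus.Copy is a deep copy: the nested DNSConfig is duplicated through the new DNSConfig.Copy, so mutating the copy leaves the original untouched. A small illustrative sketch (the addresses are made up):

package main

import (
    "fmt"

    "github.com/hashicorp/nomad/nomad/structs"
)

func main() {
    orig := &structs.AllocNetworkStatus{
        InterfaceName: "eth0",
        Address:       "172.26.64.12",
        DNS:           &structs.DNSConfig{Servers: []string{"172.26.64.1"}},
    }

    dup := orig.Copy()
    dup.DNS.Servers[0] = "10.0.0.1" // only the copy changes

    fmt.Println(orig.DNS.Servers[0]) // still 172.26.64.1
    fmt.Println(dup.DNS.Servers[0])  // 10.0.0.1
}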