diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index a599d9218..d657e40cf 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -628,6 +628,15 @@ func getClientStatus(taskStates map[string]*structs.TaskState) (status, descript return "", "" } +// SetClientStatus is a helper for forcing a specific client +// status on the alloc runner. This is used during restore errors +// when the task state can't be restored. +func (ar *allocRunner) SetClientStatus(clientStatus string) { + ar.stateLock.Lock() + defer ar.stateLock.Unlock() + ar.state.ClientStatus = clientStatus +} + // AllocState returns a copy of allocation state including a snapshot of task // states. func (ar *allocRunner) AllocState() *state.State { diff --git a/client/client.go b/client/client.go index 1bc343abf..3c41afc47 100644 --- a/client/client.go +++ b/client/client.go @@ -256,7 +256,7 @@ var ( ) // NewClient is used to create a new client from the given configuration -func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService consulApi.ConsulServiceAPI) (*Client, error) { +func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService consulApi.ConsulServiceAPI, stateDBFunc state.NewStateDBFunc) (*Client, error) { // Create the tls wrapper var tlsWrap tlsutil.RegionWrapper if cfg.TLSConfig.EnableRPC { @@ -303,7 +303,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic c.servers = servers.New(c.logger, c.shutdownCh, c) // Initialize the client - if err := c.init(); err != nil { + if err := c.init(stateDBFunc); err != nil { return nil, fmt.Errorf("failed to initialize client: %v", err) } @@ -454,7 +454,7 @@ func (c *Client) Ready() <-chan struct{} { // init is used to initialize the client and perform any setup // needed before we begin starting its various components. 
-func (c *Client) init() error { +func (c *Client) init(statedbFunc state.NewStateDBFunc) error { // Ensure the state dir exists if we have one if c.config.StateDir != "" { if err := os.MkdirAll(c.config.StateDir, 0700); err != nil { @@ -478,7 +478,7 @@ func (c *Client) init() error { c.logger.Info("using state directory", "state_dir", c.config.StateDir) // Open the state database - db, err := state.GetStateDBFactory(c.config.DevMode)(c.logger, c.config.StateDir) + db, err := statedbFunc(c.logger, c.config.StateDir) if err != nil { return fmt.Errorf("failed to open state database: %v", err) } @@ -967,10 +967,10 @@ func (c *Client) restoreState() error { // Restore state if err := ar.Restore(); err != nil { c.logger.Error("error restoring alloc", "error", err, "alloc_id", alloc.ID) - c.handleInvalidAllocs(alloc, err) + // Override the status of the alloc to failed + ar.SetClientStatus(structs.AllocClientStatusFailed) // Destroy the alloc runner since this is a failed restore ar.Destroy() - //TODO Cleanup allocrunner continue } diff --git a/client/client_test.go b/client/client_test.go index e15fcc7f2..5e62c9070 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -26,7 +26,11 @@ import ( "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/state" + cstate "github.com/hashicorp/nomad/client/state" ctestutil "github.com/hashicorp/nomad/client/testutil" + "github.com/stretchr/testify/require" ) func testACLServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, *structs.ACLToken) { @@ -599,7 +603,8 @@ func TestClient_SaveRestoreState(t *testing.T) { c1.config.Logger = logger catalog := consul.NewMockCatalog(logger) mockService := consulApi.NewMockConsulServiceClient(t, logger) - c2, err := NewClient(c1.config, catalog, mockService) + statedbFunc := cstate.GetStateDBFactory(c1.config.DevMode) + c2, err := NewClient(c1.config, catalog, mockService, 
statedbFunc) if err != nil { t.Fatalf("err: %v", err) } @@ -630,6 +635,96 @@ func TestClient_SaveRestoreState(t *testing.T) { } } +func TestClient_RestoreError(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, _ := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + c1, cleanup := TestClient(t, func(c *config.Config) { + c.DevMode = false + c.RPCHandler = s1 + }) + defer cleanup() + + // Wait until the node is ready + waitTilNodeReady(c1, t) + + // Create mock allocations + job := mock.Job() + alloc1 := mock.Alloc() + alloc1.NodeID = c1.Node().ID + alloc1.Job = job + alloc1.JobID = job.ID + alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10s", + } + alloc1.ClientStatus = structs.AllocClientStatusRunning + + state := s1.State() + err := state.UpsertJob(100, job) + require.Nil(err) + + err = state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)) + require.Nil(err) + + err = state.UpsertAllocs(102, []*structs.Allocation{alloc1}) + require.Nil(err) + + // Allocations should get registered + testutil.WaitForResult(func() (bool, error) { + c1.allocLock.RLock() + ar := c1.allocs[alloc1.ID] + c1.allocLock.RUnlock() + if ar == nil { + return false, fmt.Errorf("nil alloc runner") + } + if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Shutdown the client, saves state + if err := c1.Shutdown(); err != nil { + t.Fatalf("err: %v", err) + } + + // Create a new client with a stateDB implementation that errors + logger := testlog.HCLogger(t) + c1.config.Logger = logger + catalog := consul.NewMockCatalog(logger) + mockService := consulApi.NewMockConsulServiceClient(t, logger) + + // This stateDB returns errors for all methods 
called by restore + statedbFunc := func(hclog.Logger, string) (cstate.StateDB, error) { + return &cstate.ErrDB{[]*structs.Allocation{alloc1}}, nil + } + + c2, err := NewClient(c1.config, catalog, mockService, statedbFunc) + require.Nil(err) + defer c2.Shutdown() + + // Ensure the allocation has been marked as failed on the server + testutil.WaitForResult(func() (bool, error) { + alloc, err := s1.State().AllocByID(nil, alloc1.ID) + require.Nil(err) + failed := alloc.ClientStatus == structs.AllocClientStatusFailed + if !failed { + return false, fmt.Errorf("Expected failed client status, but got %v", alloc.ClientStatus) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + +} + func TestClient_Init(t *testing.T) { t.Parallel() dir, err := ioutil.TempDir("", "nomad") @@ -645,7 +740,8 @@ func TestClient_Init(t *testing.T) { }, logger: testlog.HCLogger(t), } - if err := client.init(); err != nil { + stateDBFunc := state.GetStateDBFactory(true) + if err := client.init(stateDBFunc); err != nil { t.Fatalf("err: %s", err) } diff --git a/client/state/errdb.go b/client/state/errdb.go new file mode 100644 index 000000000..61bf9c2c2 --- /dev/null +++ b/client/state/errdb.go @@ -0,0 +1,81 @@ +package state + +import ( + "fmt" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" + dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +// ErrDB implements a StateDB that returns errors on restore methods, used for testing +type ErrDB struct { + // Allocs is a preset slice of allocations used in GetAllAllocations + Allocs []*structs.Allocation +} + +func (m *ErrDB) Name() string { + return "errdb" +} + +func (m *ErrDB) Upgrade() error { + return nil +} + +func (m *ErrDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) { + return m.Allocs, nil, nil +} + +func (m *ErrDB) 
PutAllocation(alloc *structs.Allocation) error { + return fmt.Errorf("Error!") +} + +func (m *ErrDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) { + return nil, fmt.Errorf("Error!") +} + +func (m *ErrDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error { + return fmt.Errorf("Error!") +} + +func (m *ErrDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { + return nil, nil, fmt.Errorf("Error!") +} + +func (m *ErrDB) PutTaskRunnerLocalState(allocID string, taskName string, val *state.LocalState) error { + return fmt.Errorf("Error!") +} + +func (m *ErrDB) PutTaskState(allocID string, taskName string, state *structs.TaskState) error { + return fmt.Errorf("Error!") +} + +func (m *ErrDB) DeleteTaskBucket(allocID, taskName string) error { + return fmt.Errorf("Error!") +} + +func (m *ErrDB) DeleteAllocationBucket(allocID string) error { + return fmt.Errorf("Error!") +} + +func (m *ErrDB) PutDevicePluginState(ps *dmstate.PluginState) error { + return fmt.Errorf("Error!") +} + +// GetDevicePluginState never yields the device manager's plugin state; it +// always returns a nil state and an error.
+func (m *ErrDB) GetDevicePluginState() (*dmstate.PluginState, error) { + return nil, fmt.Errorf("Error!") +} + +func (m *ErrDB) GetDriverPluginState() (*driverstate.PluginState, error) { + return nil, fmt.Errorf("Error!") +} + +func (m *ErrDB) PutDriverPluginState(ps *driverstate.PluginState) error { + return fmt.Errorf("Error!") +} + +func (m *ErrDB) Close() error { + return fmt.Errorf("Error!") +} diff --git a/client/testing.go b/client/testing.go index 1a4ab5d13..0f23e7850 100644 --- a/client/testing.go +++ b/client/testing.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/nomad/client/config" consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/plugins/shared/catalog" @@ -21,6 +22,13 @@ import ( // and removed in the returned cleanup function. If they are overridden in the // callback then the caller still must run the returned cleanup func. func TestClient(t testing.T, cb func(c *config.Config)) (*Client, func() error) { + return TestClientWithCustomStateDB(t, cb, nil) +} + +// TestClientWithCustomStateDB creates an in-memory client for testing purposes +// where the state DB factory can be overridden. 
It is used in tests that +// simulate state restore failures. +func TestClientWithCustomStateDB(t testing.T, cb func(c *config.Config), stateDBFunc state.NewStateDBFunc) (*Client, func() error) { conf, cleanup := config.TestClientConfig(t) // Tighten the fingerprinter timeouts (must be done in client package @@ -46,7 +54,10 @@ func TestClient(t testing.T, cb func(c *config.Config)) (*Client, func() error) } catalog := consul.NewMockCatalog(logger) mockService := consulApi.NewMockConsulServiceClient(t, logger) - client, err := NewClient(conf, catalog, mockService) + if stateDBFunc == nil { + stateDBFunc = state.GetStateDBFactory(conf.DevMode) + } + client, err := NewClient(conf, catalog, mockService, stateDBFunc) if err != nil { cleanup() t.Fatalf("err: %v", err) diff --git a/command/agent/agent.go b/command/agent/agent.go index a936282ad..6c860c1f0 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -21,6 +21,7 @@ import ( uuidparse "github.com/hashicorp/go-uuid" "github.com/hashicorp/nomad/client" clientconfig "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad" @@ -730,8 +731,8 @@ func (a *Agent) setupClient() error { return err } } - - client, err := client.NewClient(conf, a.consulCatalog, a.consulService) + statedbFactory := state.GetStateDBFactory(conf.DevMode) + client, err := client.NewClient(conf, a.consulCatalog, a.consulService, statedbFactory) if err != nil { return fmt.Errorf("client setup failed: %v", err) }