From 6019915236af960dafe1059a30a50866ddd68117 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 28 Jun 2017 16:29:48 -0700 Subject: [PATCH] FailDeployment --- nomad/deploymentwatcher/deployment_watcher.go | 23 ++++ .../deploymentwatcher/deployments_watcher.go | 18 +++ .../deployments_watcher_test.go | 110 +++++++++++++----- nomad/structs/structs.go | 1 + 4 files changed, 124 insertions(+), 28 deletions(-) diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index 3112982a4..49bf46a3c 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -234,6 +234,29 @@ func (w *deploymentWatcher) PauseDeployment( return nil } +func (w *deploymentWatcher) FailDeployment( + req *structs.DeploymentSpecificRequest, + resp *structs.DeploymentUpdateResponse) error { + + // Determine the status we should transistion to and if we need to create an + // evaluation + status, desc := structs.DeploymentStatusFailed, structs.DeploymentStatusDescriptionFailedByUser + update, eval := w.getDeploymentStatusUpdate(status, desc), w.getEval() + + // Commit the change + i, err := w.upsertDeploymentStatusUpdate(update, eval, nil) + if err != nil { + return err + } + + // Build the response + resp.EvalID = eval.ID + resp.EvalCreateIndex = i + resp.DeploymentModifyIndex = i + w.setLatestEval(i) + return nil +} + // StopWatch stops watching the deployment. This should be called whenever a // deployment is completed or the watcher is no longer needed. func (w *deploymentWatcher) StopWatch() { diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index 82425e387..495a8ccbc 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -336,6 +336,24 @@ func (w *Watcher) PauseDeployment(req *structs.DeploymentPauseRequest, resp *str return watcher.PauseDeployment(req, resp) } +// FailDeployment is used to fail the deployment. +func (w *Watcher) FailDeployment(req *structs.DeploymentSpecificRequest, resp *structs.DeploymentUpdateResponse) error { + w.l.Lock() + defer w.l.Unlock() + + // Not enabled so no-op + if !w.enabled { + return nil + } + + watcher, ok := w.watchers[req.DeploymentID] + if !ok { + return fmt.Errorf("deployment %q not being watched for updates", req.DeploymentID) + } + + return watcher.FailDeployment(req, resp) +} + // createEvaluation commits the given evaluation to Raft but batches the commit // with other calls. func (w *Watcher) createEvaluation(eval *structs.Evaluation) (uint64, error) { diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index 2e1056c63..97443fbf4 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -14,11 +14,22 @@ import ( mocker "github.com/stretchr/testify/mock" ) +func testDeploymentWatcher(t *testing.T, qps float64, batchDur time.Duration) (*Watcher, *mockBackend) { + m := newMockBackend(t) + w := NewDeploymentsWatcher(testLogger(), qps, batchDur) + w.SetStateWatchers(m) + w.SetRaftEndpoints(m) + return w, m +} + +func defaultTestDeploymentWatcher(t *testing.T) (*Watcher, *mockBackend) { + return testDeploymentWatcher(t, LimitStateQueriesPerSecond, EvalBatchDuration) +} + // Tests that the watcher properly watches for deployments and reconciles them func TestWatcher_WatchDeployments(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Return no allocations or evals m.On("Allocations", mocker.Anything, mocker.Anything).Return(nil).Run(func(args mocker.Arguments) { @@ -99,8 +110,7 @@ func TestWatcher_WatchDeployments(t *testing.T) { // Tests that calls against an unknown deployment fail func TestWatcher_UnknownDeployment(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) w.SetEnabled(true) // Set up the calls for retrieving deployments @@ -140,13 +150,21 @@ func TestWatcher_UnknownDeployment(t *testing.T) { if assert.NotNil(err, "should have error for unknown deployment") { assert.Contains(err.Error(), "not being watched") } + + // Request failing against an unknown deployment + req4 := &structs.DeploymentSpecificRequest{ + DeploymentID: dID, + } + err = w.FailDeployment(req4, &resp) + if assert.NotNil(err, "should have error for unknown deployment") { + assert.Contains(err.Error(), "not being watched") + } } // Test setting an unknown allocation's health func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job, and a deployment j := mock.Job() @@ -194,8 +212,7 @@ func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { // Test setting allocation health func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment j := mock.Job() @@ -244,8 +261,7 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { // Test setting allocation unhealthy func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment j := mock.Job() @@ -301,8 +317,7 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { // Test setting allocation unhealthy and that there should be a rollback func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job, alloc, and a deployment j := mock.Job() @@ -370,8 +385,7 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { // Test promoting a deployment func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job, canary alloc, and a deployment j := mock.Job() @@ -429,8 +443,7 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { // Test promoting a deployment with unhealthy canaries func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job, canary alloc, and a deployment j := mock.Job() @@ -488,8 +501,7 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { // Test pausing a deployment that is running func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() @@ -536,8 +548,7 @@ func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { // Test pausing a deployment that is paused func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() @@ -585,8 +596,7 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { // Test unpausing a deployment that is paused func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() @@ -635,8 +645,7 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { // Test unpausing a deployment that is running func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) + w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() @@ -681,12 +690,58 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { m.AssertCalled(t, "UpsertDeploymentStatusUpdate", mocker.MatchedBy(matcher)) } +// Test failing a deployment that is running +func TestWatcher_FailDeployment_Running(t *testing.T) { + assert := assert.New(t) + w, m := defaultTestDeploymentWatcher(t) + + // Create a job and a deployment + j := mock.Job() + d := mock.Deployment() + d.JobID = j.ID + assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d, false), "UpsertDeployment") + + // Assert the following methods will be called + m.On("List", mocker.Anything, mocker.Anything).Return(nil).Run(m.listFromState) + m.On("Allocations", mocker.MatchedBy(matchDeploymentSpecificRequest(d.ID)), + mocker.Anything).Return(nil).Run(m.allocationsFromState) + m.On("Evaluations", mocker.MatchedBy(matchJobSpecificRequest(j.ID)), + mocker.Anything).Return(nil).Run(m.evaluationsFromState) + m.On("GetJob", mocker.MatchedBy(matchJobSpecificRequest(j.ID)), + mocker.Anything).Return(nil).Run(m.getJobFromState) + + w.SetEnabled(true) + testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, + func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + + // Assert that we get a call to UpsertDeploymentStatusUpdate + matchConfig := &matchDeploymentStatusUpdateConfig{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusFailed, + StatusDescription: structs.DeploymentStatusDescriptionFailedByUser, + Eval: true, + } + matcher := matchDeploymentStatusUpdateRequest(matchConfig) + m.On("UpsertDeploymentStatusUpdate", mocker.MatchedBy(matcher)).Return(nil) + + // Call PauseDeployment + req := &structs.DeploymentSpecificRequest{ + DeploymentID: d.ID, + } + var resp structs.DeploymentUpdateResponse + err := w.FailDeployment(req, &resp) + assert.Nil(err, "FailDeployment") + + assert.Equal(1, len(w.watchers), "Deployment should still be active") + m.AssertCalled(t, "UpsertDeploymentStatusUpdate", mocker.MatchedBy(matcher)) +} + // Tests that the watcher properly watches for allocation changes and takes the // proper actions func TestDeploymentWatcher_Watch(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, 1000.0, 1*time.Millisecond) + w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment j := mock.Job() @@ -813,8 +868,7 @@ func TestDeploymentWatcher_Watch(t *testing.T) { // Test evaluations are batched between watchers func TestWatcher_BatchEvals(t *testing.T) { assert := assert.New(t) - m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m, 1000.0, EvalBatchDuration) + w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, for two deployments j1 := mock.Job() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 468341441..f4eb91ca0 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3855,6 +3855,7 @@ const ( DeploymentStatusDescriptionStoppedJob = "Cancelled because job is stopped" DeploymentStatusDescriptionNewerJob = "Cancelled due to newer version of job" DeploymentStatusDescriptionFailedAllocations = "Failed due to unhealthy allocations" + DeploymentStatusDescriptionFailedByUser = "Deployment marked as failed" ) // DeploymentStatusDescriptionRollback is used to get the status description of