From fdfd9f50c20e1860cc0b93ab4330f936b57069d7 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 17 Jun 2020 11:02:26 -0400 Subject: [PATCH] multiregion: unblock and cancel RPCs --- nomad/deployment_endpoint.go | 84 ++++++++ nomad/deployment_endpoint_test.go | 204 ++++++++++++++++++ .../deployment_watcher_oss.go | 20 ++ .../deploymentwatcher/deployments_watcher.go | 23 ++ nomad/structs/structs.go | 1 + scheduler/reconcile.go | 16 +- 6 files changed, 346 insertions(+), 2 deletions(-) create mode 100644 nomad/deploymentwatcher/deployment_watcher_oss.go diff --git a/nomad/deployment_endpoint.go b/nomad/deployment_endpoint.go index 105290e88..6dae8c4ab 100644 --- a/nomad/deployment_endpoint.go +++ b/nomad/deployment_endpoint.go @@ -207,6 +207,90 @@ func (d *Deployment) Promote(args *structs.DeploymentPromoteRequest, reply *stru return d.srv.deploymentWatcher.PromoteDeployment(args, reply) } +// Unblock is used to unblock a deployment +func (d *Deployment) Unblock(args *structs.DeploymentUnblockRequest, reply *structs.DeploymentUpdateResponse) error { + if done, err := d.srv.forward("Deployment.Unblock", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "deployment", "unblock"}, time.Now()) + + // Validate the arguments + if args.DeploymentID == "" { + return fmt.Errorf("missing deployment ID") + } + + // Lookup the deployment + snap, err := d.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + ws := memdb.NewWatchSet() + deploy, err := snap.DeploymentByID(ws, args.DeploymentID) + if err != nil { + return err + } + if deploy == nil { + return fmt.Errorf("deployment not found") + } + + // Check namespace submit-job permissions + if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) { + return structs.ErrPermissionDenied + } + + if !deploy.Active() { + return fmt.Errorf("can't unblock terminal deployment") + } + + // Call into the deployment watcher + return d.srv.deploymentWatcher.UnblockDeployment(args, reply) +} + +// Cancel is used to cancel a deployment +func (d *Deployment) Cancel(args *structs.DeploymentCancelRequest, reply *structs.DeploymentUpdateResponse) error { + if done, err := d.srv.forward("Deployment.Cancel", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "deployment", "cancel"}, time.Now()) + + // Validate the arguments + if args.DeploymentID == "" { + return fmt.Errorf("missing deployment ID") + } + + // Lookup the deployment + snap, err := d.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + ws := memdb.NewWatchSet() + deploy, err := snap.DeploymentByID(ws, args.DeploymentID) + if err != nil { + return err + } + if deploy == nil { + return fmt.Errorf("deployment not found") + } + + // Check namespace submit-job permissions + if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) { + return structs.ErrPermissionDenied + } + + if !deploy.Active() { + return fmt.Errorf("can't cancel terminal deployment") + } + + // Call into the deployment watcher + return d.srv.deploymentWatcher.CancelDeployment(args, reply) +} + // SetAllocHealth is used to set the health of allocations that are part of the // deployment. func (d *Deployment) SetAllocHealth(args *structs.DeploymentAllocHealthRequest, reply *structs.DeploymentUpdateResponse) error { diff --git a/nomad/deployment_endpoint_test.go b/nomad/deployment_endpoint_test.go index 88b85620d..4afbfb37f 100644 --- a/nomad/deployment_endpoint_test.go +++ b/nomad/deployment_endpoint_test.go @@ -622,6 +622,210 @@ func TestDeploymentEndpoint_Promote_ACL(t *testing.T) { } } +func TestDeploymentEndpoint_Cancel(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + assert := assert.New(t) + + // Create the deployment + j := mock.Job() + d := mock.Deployment() + d.JobID = j.ID + state := s1.fsm.State() + + assert.Nil(state.UpsertJob(999, j), "UpsertJob") + assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment") + + // Mark the deployment as failed + req := &structs.DeploymentCancelRequest{ + DeploymentID: d.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.DeploymentUpdateResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Deployment.Cancel", req, &resp), "RPC") + assert.NotEqual(resp.Index, uint64(0), "bad response index") + + // Lookup the deployment + ws := memdb.NewWatchSet() + dout, err := state.DeploymentByID(ws, d.ID) + assert.NoError(err, "DeploymentByID failed") + assert.Equal(structs.DeploymentStatusCancelled, dout.Status, "wrong status") + assert.Equal(structs.DeploymentStatusDescriptionNewerJob, dout.StatusDescription, "wrong status description") + assert.Equal(dout.ModifyIndex, resp.DeploymentModifyIndex, "wrong modify index") +} + +func TestDeploymentEndpoint_Cancel_ACL(t *testing.T) { + t.Parallel() + + s1, _, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + assert := assert.New(t) + + // Create the deployment + j := mock.Job() + d := mock.Deployment() + d.JobID = j.ID + state := s1.fsm.State() + + assert.Nil(state.UpsertJob(999, j), "UpsertJob") + assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment") + + // Create the namespace policy and tokens + validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob})) + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + // Mark the deployment as cancelled + req := &structs.DeploymentCancelRequest{ + DeploymentID: d.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Try with no token and expect permission denied + { + var resp structs.DeploymentUpdateResponse + err := msgpackrpc.CallWithCodec(codec, "Deployment.Cancel", req, &resp) + assert.EqualError(err, structs.ErrPermissionDenied.Error()) + } + // Try with an invalid token + { + req.AuthToken = invalidToken.SecretID + var resp structs.DeploymentUpdateResponse + err := msgpackrpc.CallWithCodec(codec, "Deployment.Cancel", req, &resp) + assert.EqualError(err, structs.ErrPermissionDenied.Error()) + } + // Try with a valid token + { + req.AuthToken = validToken.SecretID + var resp structs.DeploymentUpdateResponse + assert.NoError(msgpackrpc.CallWithCodec(codec, "Deployment.Cancel", req, &resp), "RPC") + assert.NotEqual(resp.Index, uint64(0), "bad response index") + + // Lookup the deployment + ws := memdb.NewWatchSet() + dout, err := state.DeploymentByID(ws, d.ID) + assert.NoError(err, "DeploymentByID failed") + assert.Equal(structs.DeploymentStatusCancelled, dout.Status, "wrong status") + assert.Equal(structs.DeploymentStatusDescriptionNewerJob, dout.StatusDescription, "wrong status description") + assert.Equal(dout.ModifyIndex, resp.DeploymentModifyIndex, "wrong modify index") + } +} + +func TestDeploymentEndpoint_Unblock(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + assert := assert.New(t) + + // Create the deployment + j := mock.Job() + d := mock.Deployment() + d.JobID = j.ID + state := s1.fsm.State() + + assert.Nil(state.UpsertJob(999, j), "UpsertJob") + assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment") + + // Mark the deployment as failed + req := &structs.DeploymentUnblockRequest{ + DeploymentID: d.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.DeploymentUpdateResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Deployment.Unblock", req, &resp), "RPC") + assert.NotEqual(resp.Index, uint64(0), "bad response index") + + // Lookup the deployment + ws := memdb.NewWatchSet() + dout, err := state.DeploymentByID(ws, d.ID) + assert.Nil(err, "DeploymentByID failed") + assert.Equal(structs.DeploymentStatusSuccessful, dout.Status, "wrong status") + assert.Equal(structs.DeploymentStatusDescriptionSuccessful, dout.StatusDescription, "wrong status description") + assert.Equal(dout.ModifyIndex, resp.DeploymentModifyIndex, "wrong modify index") +} + +func TestDeploymentEndpoint_Unblock_ACL(t *testing.T) { + t.Parallel() + + s1, _, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + assert := assert.New(t) + + // Create the deployment + j := mock.Job() + d := mock.Deployment() + d.JobID = j.ID + state := s1.fsm.State() + + assert.Nil(state.UpsertJob(999, j), "UpsertJob") + assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment") + + // Create the namespace policy and tokens + validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob})) + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + // Mark the deployment as unblocked + req := &structs.DeploymentUnblockRequest{ + DeploymentID: d.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Try with no token and expect permission denied + { + var resp structs.DeploymentUpdateResponse + err := msgpackrpc.CallWithCodec(codec, "Deployment.Unblock", req, &resp) + assert.EqualError(err, structs.ErrPermissionDenied.Error()) + } + // Try with an invalid token + { + req.AuthToken = invalidToken.SecretID + var resp structs.DeploymentUpdateResponse + err := msgpackrpc.CallWithCodec(codec, "Deployment.Unblock", req, &resp) + assert.EqualError(err, structs.ErrPermissionDenied.Error()) + } + // Try with a valid token + { + req.AuthToken = validToken.SecretID + var resp structs.DeploymentUpdateResponse + assert.NoError(msgpackrpc.CallWithCodec(codec, "Deployment.Unblock", req, &resp), "RPC") + assert.NotEqual(resp.Index, uint64(0), "bad response index") + + // Lookup the deployment + ws := memdb.NewWatchSet() + dout, err := state.DeploymentByID(ws, d.ID) + assert.NoError(err, "DeploymentByID failed") + assert.Equal(structs.DeploymentStatusSuccessful, dout.Status, "wrong status") + assert.Equal(structs.DeploymentStatusDescriptionSuccessful, dout.StatusDescription, "wrong status description") + assert.Equal(dout.ModifyIndex, resp.DeploymentModifyIndex, "wrong modify index") + } +} + func TestDeploymentEndpoint_SetAllocHealth(t *testing.T) { t.Parallel() diff --git a/nomad/deploymentwatcher/deployment_watcher_oss.go b/nomad/deploymentwatcher/deployment_watcher_oss.go new file mode 100644 index 000000000..cf4d5f055 --- /dev/null +++ b/nomad/deploymentwatcher/deployment_watcher_oss.go @@ -0,0 +1,20 @@ +// +build !ent + +package deploymentwatcher + +import "github.com/hashicorp/nomad/nomad/structs" + +// TODO: move this into multiregion_oss.go once #269 is merged + +// UnblockDeployment is used to unblock a multiregion deployment. In +// single-region deployments, the blocked state is unused. +func (w *deploymentWatcher) UnblockDeployment(req *structs.DeploymentUnblockRequest, resp *structs.DeploymentUpdateResponse) error { + return nil +} + +// CancelDeployment is used to cancel a multiregion deployment. In +// single-region deployments, the deploymentwatcher has sole responsibility to +// cancel deployments so this RPC is never used. +func (w *deploymentWatcher) CancelDeployment(req *structs.DeploymentCancelRequest, resp *structs.DeploymentUpdateResponse) error { + return nil +} diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index 7cfbb94ab..5dc02b6d4 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -363,6 +363,29 @@ func (w *Watcher) FailDeployment(req *structs.DeploymentFailRequest, resp *struc return watcher.FailDeployment(req, resp) } +// UnblockDeployment is used to unblock a multiregion deployment. In +// single-region deployments, the blocked state is unused. +func (w *Watcher) UnblockDeployment(req *structs.DeploymentUnblockRequest, resp *structs.DeploymentUpdateResponse) error { + watcher, err := w.getOrCreateWatcher(req.DeploymentID) + if err != nil { + return err + } + + return watcher.UnblockDeployment(req, resp) +} + +// CancelDeployment is used to cancel a multiregion deployment. In +// single-region deployments, the deploymentwatcher has sole responsibility to +// cancel deployments so this RPC is never used. +func (w *Watcher) CancelDeployment(req *structs.DeploymentCancelRequest, resp *structs.DeploymentUpdateResponse) error { + watcher, err := w.getOrCreateWatcher(req.DeploymentID) + if err != nil { + return err + } + + return watcher.CancelDeployment(req, resp) +} + // createUpdate commits the given allocation desired transition and evaluation // to Raft but batches the commit with other calls. func (w *Watcher) createUpdate(allocs map[string]*structs.DesiredTransition, eval *structs.Evaluation) (uint64, error) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7892fbecf..a57f30a1b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7899,6 +7899,7 @@ const ( DeploymentStatusDescriptionFailedAllocations = "Failed due to unhealthy allocations" DeploymentStatusDescriptionProgressDeadline = "Failed due to progress deadline" DeploymentStatusDescriptionFailedByUser = "Deployment marked as failed" + DeploymentStatusDescriptionBlocked = "Deployment is complete but waiting for peer region" ) // DeploymentStatusDescriptionRollback is used to get the status description of diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index de40b1647..73cc5831c 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -210,10 +210,22 @@ func (a *allocReconciler) Compute() *reconcileResults { // Mark the deployment as complete if possible if a.deployment != nil && complete { + + var status string + var desc string + + if a.job.IsMultiregion() { + status = structs.DeploymentStatusBlocked + desc = structs.DeploymentStatusDescriptionBlocked + } else { + status = structs.DeploymentStatusSuccessful + desc = structs.DeploymentStatusDescriptionSuccessful + } + a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ DeploymentID: a.deployment.ID, - Status: structs.DeploymentStatusSuccessful, - StatusDescription: structs.DeploymentStatusDescriptionSuccessful, + Status: status, + StatusDescription: desc, }) }