mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 17:35:43 +03:00
multiregion: unblock and cancel RPCs
This commit is contained in:
@@ -207,6 +207,90 @@ func (d *Deployment) Promote(args *structs.DeploymentPromoteRequest, reply *stru
|
||||
return d.srv.deploymentWatcher.PromoteDeployment(args, reply)
|
||||
}
|
||||
|
||||
// Unblock is used to unblock a deployment
|
||||
func (d *Deployment) Unblock(args *structs.DeploymentUnblockRequest, reply *structs.DeploymentUpdateResponse) error {
|
||||
if done, err := d.srv.forward("Deployment.Unblock", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "deployment", "unblock"}, time.Now())
|
||||
|
||||
// Validate the arguments
|
||||
if args.DeploymentID == "" {
|
||||
return fmt.Errorf("missing deployment ID")
|
||||
}
|
||||
|
||||
// Lookup the deployment
|
||||
snap, err := d.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if deploy == nil {
|
||||
return fmt.Errorf("deployment not found")
|
||||
}
|
||||
|
||||
// Check namespace submit-job permissions
|
||||
if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
|
||||
return err
|
||||
} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
if !deploy.Active() {
|
||||
return fmt.Errorf("can't unblock terminal deployment")
|
||||
}
|
||||
|
||||
// Call into the deployment watcher
|
||||
return d.srv.deploymentWatcher.UnblockDeployment(args, reply)
|
||||
}
|
||||
|
||||
// Cancel is used to cancel a deployment
|
||||
func (d *Deployment) Cancel(args *structs.DeploymentCancelRequest, reply *structs.DeploymentUpdateResponse) error {
|
||||
if done, err := d.srv.forward("Deployment.Cancel", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "deployment", "cancel"}, time.Now())
|
||||
|
||||
// Validate the arguments
|
||||
if args.DeploymentID == "" {
|
||||
return fmt.Errorf("missing deployment ID")
|
||||
}
|
||||
|
||||
// Lookup the deployment
|
||||
snap, err := d.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if deploy == nil {
|
||||
return fmt.Errorf("deployment not found")
|
||||
}
|
||||
|
||||
// Check namespace submit-job permissions
|
||||
if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
|
||||
return err
|
||||
} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
if !deploy.Active() {
|
||||
return fmt.Errorf("can't cancel terminal deployment")
|
||||
}
|
||||
|
||||
// Call into the deployment watcher
|
||||
return d.srv.deploymentWatcher.CancelDeployment(args, reply)
|
||||
}
|
||||
|
||||
// SetAllocHealth is used to set the health of allocations that are part of the
|
||||
// deployment.
|
||||
func (d *Deployment) SetAllocHealth(args *structs.DeploymentAllocHealthRequest, reply *structs.DeploymentUpdateResponse) error {
|
||||
|
||||
@@ -622,6 +622,210 @@ func TestDeploymentEndpoint_Promote_ACL(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeploymentEndpoint_Cancel(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
assert := assert.New(t)
|
||||
|
||||
// Create the deployment
|
||||
j := mock.Job()
|
||||
d := mock.Deployment()
|
||||
d.JobID = j.ID
|
||||
state := s1.fsm.State()
|
||||
|
||||
assert.Nil(state.UpsertJob(999, j), "UpsertJob")
|
||||
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")
|
||||
|
||||
// Mark the deployment as failed
|
||||
req := &structs.DeploymentCancelRequest{
|
||||
DeploymentID: d.ID,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
|
||||
// Fetch the response
|
||||
var resp structs.DeploymentUpdateResponse
|
||||
assert.Nil(msgpackrpc.CallWithCodec(codec, "Deployment.Cancel", req, &resp), "RPC")
|
||||
assert.NotEqual(resp.Index, uint64(0), "bad response index")
|
||||
|
||||
// Lookup the deployment
|
||||
ws := memdb.NewWatchSet()
|
||||
dout, err := state.DeploymentByID(ws, d.ID)
|
||||
assert.NoError(err, "DeploymentByID failed")
|
||||
assert.Equal(structs.DeploymentStatusCancelled, dout.Status, "wrong status")
|
||||
assert.Equal(structs.DeploymentStatusDescriptionNewerJob, dout.StatusDescription, "wrong status description")
|
||||
assert.Equal(dout.ModifyIndex, resp.DeploymentModifyIndex, "wrong modify index")
|
||||
}
|
||||
|
||||
func TestDeploymentEndpoint_Cancel_ACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s1, _, cleanupS1 := TestACLServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
assert := assert.New(t)
|
||||
|
||||
// Create the deployment
|
||||
j := mock.Job()
|
||||
d := mock.Deployment()
|
||||
d.JobID = j.ID
|
||||
state := s1.fsm.State()
|
||||
|
||||
assert.Nil(state.UpsertJob(999, j), "UpsertJob")
|
||||
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")
|
||||
|
||||
// Create the namespace policy and tokens
|
||||
validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid",
|
||||
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob}))
|
||||
invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid",
|
||||
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
|
||||
|
||||
// Mark the deployment as cancelled
|
||||
req := &structs.DeploymentCancelRequest{
|
||||
DeploymentID: d.ID,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
|
||||
// Try with no token and expect permission denied
|
||||
{
|
||||
var resp structs.DeploymentUpdateResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Deployment.Cancel", req, &resp)
|
||||
assert.EqualError(err, structs.ErrPermissionDenied.Error())
|
||||
}
|
||||
// Try with an invalid token
|
||||
{
|
||||
req.AuthToken = invalidToken.SecretID
|
||||
var resp structs.DeploymentUpdateResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Deployment.Cancel", req, &resp)
|
||||
assert.EqualError(err, structs.ErrPermissionDenied.Error())
|
||||
}
|
||||
// Try with a valid token
|
||||
{
|
||||
req.AuthToken = validToken.SecretID
|
||||
var resp structs.DeploymentUpdateResponse
|
||||
assert.NoError(msgpackrpc.CallWithCodec(codec, "Deployment.Cancel", req, &resp), "RPC")
|
||||
assert.NotEqual(resp.Index, uint64(0), "bad response index")
|
||||
|
||||
// Lookup the deployment
|
||||
ws := memdb.NewWatchSet()
|
||||
dout, err := state.DeploymentByID(ws, d.ID)
|
||||
assert.NoError(err, "DeploymentByID failed")
|
||||
assert.Equal(structs.DeploymentStatusCancelled, dout.Status, "wrong status")
|
||||
assert.Equal(structs.DeploymentStatusDescriptionNewerJob, dout.StatusDescription, "wrong status description")
|
||||
assert.Equal(dout.ModifyIndex, resp.DeploymentModifyIndex, "wrong modify index")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeploymentEndpoint_Unblock(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
assert := assert.New(t)
|
||||
|
||||
// Create the deployment
|
||||
j := mock.Job()
|
||||
d := mock.Deployment()
|
||||
d.JobID = j.ID
|
||||
state := s1.fsm.State()
|
||||
|
||||
assert.Nil(state.UpsertJob(999, j), "UpsertJob")
|
||||
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")
|
||||
|
||||
// Mark the deployment as failed
|
||||
req := &structs.DeploymentUnblockRequest{
|
||||
DeploymentID: d.ID,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
|
||||
// Fetch the response
|
||||
var resp structs.DeploymentUpdateResponse
|
||||
assert.Nil(msgpackrpc.CallWithCodec(codec, "Deployment.Unblock", req, &resp), "RPC")
|
||||
assert.NotEqual(resp.Index, uint64(0), "bad response index")
|
||||
|
||||
// Lookup the deployment
|
||||
ws := memdb.NewWatchSet()
|
||||
dout, err := state.DeploymentByID(ws, d.ID)
|
||||
assert.Nil(err, "DeploymentByID failed")
|
||||
assert.Equal(structs.DeploymentStatusSuccessful, dout.Status, "wrong status")
|
||||
assert.Equal(structs.DeploymentStatusDescriptionSuccessful, dout.StatusDescription, "wrong status description")
|
||||
assert.Equal(dout.ModifyIndex, resp.DeploymentModifyIndex, "wrong modify index")
|
||||
}
|
||||
|
||||
func TestDeploymentEndpoint_Unblock_ACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s1, _, cleanupS1 := TestACLServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
assert := assert.New(t)
|
||||
|
||||
// Create the deployment
|
||||
j := mock.Job()
|
||||
d := mock.Deployment()
|
||||
d.JobID = j.ID
|
||||
state := s1.fsm.State()
|
||||
|
||||
assert.Nil(state.UpsertJob(999, j), "UpsertJob")
|
||||
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")
|
||||
|
||||
// Create the namespace policy and tokens
|
||||
validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid",
|
||||
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob}))
|
||||
invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid",
|
||||
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
|
||||
|
||||
// Mark the deployment as unblocked
|
||||
req := &structs.DeploymentUnblockRequest{
|
||||
DeploymentID: d.ID,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
|
||||
// Try with no token and expect permission denied
|
||||
{
|
||||
var resp structs.DeploymentUpdateResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Deployment.Unblock", req, &resp)
|
||||
assert.EqualError(err, structs.ErrPermissionDenied.Error())
|
||||
}
|
||||
// Try with an invalid token
|
||||
{
|
||||
req.AuthToken = invalidToken.SecretID
|
||||
var resp structs.DeploymentUpdateResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Deployment.Unblock", req, &resp)
|
||||
assert.EqualError(err, structs.ErrPermissionDenied.Error())
|
||||
}
|
||||
// Try with a valid token
|
||||
{
|
||||
req.AuthToken = validToken.SecretID
|
||||
var resp structs.DeploymentUpdateResponse
|
||||
assert.NoError(msgpackrpc.CallWithCodec(codec, "Deployment.Unblock", req, &resp), "RPC")
|
||||
assert.NotEqual(resp.Index, uint64(0), "bad response index")
|
||||
|
||||
// Lookup the deployment
|
||||
ws := memdb.NewWatchSet()
|
||||
dout, err := state.DeploymentByID(ws, d.ID)
|
||||
assert.NoError(err, "DeploymentByID failed")
|
||||
assert.Equal(structs.DeploymentStatusSuccessful, dout.Status, "wrong status")
|
||||
assert.Equal(structs.DeploymentStatusDescriptionSuccessful, dout.StatusDescription, "wrong status description")
|
||||
assert.Equal(dout.ModifyIndex, resp.DeploymentModifyIndex, "wrong modify index")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeploymentEndpoint_SetAllocHealth(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
20
nomad/deploymentwatcher/deployment_watcher_oss.go
Normal file
20
nomad/deploymentwatcher/deployment_watcher_oss.go
Normal file
@@ -0,0 +1,20 @@
|
||||
// +build !ent
|
||||
|
||||
package deploymentwatcher
|
||||
|
||||
import "github.com/hashicorp/nomad/nomad/structs"
|
||||
|
||||
// TODO: move this into multiregion_oss.go once #269 is merged
|
||||
|
||||
// UnblockDeployment is used to unblock a multiregion deployment. In
|
||||
// single-region deployments, the blocked state is unused.
|
||||
func (w *deploymentWatcher) UnblockDeployment(req *structs.DeploymentUnblockRequest, resp *structs.DeploymentUpdateResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// CancelDeployment is used to cancel a multiregion deployment. In
|
||||
// single-region deployments, the deploymentwatcher has sole responsibility to
|
||||
// cancel deployments so this RPC is never used.
|
||||
func (w *deploymentWatcher) CancelDeployment(req *structs.DeploymentCancelRequest, resp *structs.DeploymentUpdateResponse) error {
|
||||
return nil
|
||||
}
|
||||
@@ -363,6 +363,29 @@ func (w *Watcher) FailDeployment(req *structs.DeploymentFailRequest, resp *struc
|
||||
return watcher.FailDeployment(req, resp)
|
||||
}
|
||||
|
||||
// UnblockDeployment is used to unblock a multiregion deployment. In
|
||||
// single-region deployments, the blocked state is unused.
|
||||
func (w *Watcher) UnblockDeployment(req *structs.DeploymentUnblockRequest, resp *structs.DeploymentUpdateResponse) error {
|
||||
watcher, err := w.getOrCreateWatcher(req.DeploymentID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return watcher.UnblockDeployment(req, resp)
|
||||
}
|
||||
|
||||
// CancelDeployment is used to cancel a multiregion deployment. In
|
||||
// single-region deployments, the deploymentwatcher has sole responsibility to
|
||||
// cancel deployments so this RPC is never used.
|
||||
func (w *Watcher) CancelDeployment(req *structs.DeploymentCancelRequest, resp *structs.DeploymentUpdateResponse) error {
|
||||
watcher, err := w.getOrCreateWatcher(req.DeploymentID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return watcher.CancelDeployment(req, resp)
|
||||
}
|
||||
|
||||
// createUpdate commits the given allocation desired transition and evaluation
|
||||
// to Raft but batches the commit with other calls.
|
||||
func (w *Watcher) createUpdate(allocs map[string]*structs.DesiredTransition, eval *structs.Evaluation) (uint64, error) {
|
||||
|
||||
@@ -7899,6 +7899,7 @@ const (
|
||||
DeploymentStatusDescriptionFailedAllocations = "Failed due to unhealthy allocations"
|
||||
DeploymentStatusDescriptionProgressDeadline = "Failed due to progress deadline"
|
||||
DeploymentStatusDescriptionFailedByUser = "Deployment marked as failed"
|
||||
DeploymentStatusDescriptionBlocked = "Deployment is complete but waiting for peer region"
|
||||
)
|
||||
|
||||
// DeploymentStatusDescriptionRollback is used to get the status description of
|
||||
|
||||
@@ -210,10 +210,22 @@ func (a *allocReconciler) Compute() *reconcileResults {
|
||||
|
||||
// Mark the deployment as complete if possible
|
||||
if a.deployment != nil && complete {
|
||||
|
||||
var status string
|
||||
var desc string
|
||||
|
||||
if a.job.IsMultiregion() {
|
||||
status = structs.DeploymentStatusBlocked
|
||||
desc = structs.DeploymentStatusDescriptionBlocked
|
||||
} else {
|
||||
status = structs.DeploymentStatusSuccessful
|
||||
desc = structs.DeploymentStatusDescriptionSuccessful
|
||||
}
|
||||
|
||||
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
|
||||
DeploymentID: a.deployment.ID,
|
||||
Status: structs.DeploymentStatusSuccessful,
|
||||
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
|
||||
Status: status,
|
||||
StatusDescription: desc,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user