From d50fb2a00e5d1823829b174736f615f3700a72b4 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 17 Dec 2020 15:21:46 -0800 Subject: [PATCH] core: propagate remote task handles Add a new driver capability: RemoteTasks. When a task is run by a driver with RemoteTasks set, its TaskHandle will be propagated to the server in its allocation's TaskState. If the task is replaced due to a down node or draining, its TaskHandle will be propagated to its replacement allocation. This allows tasks to be scheduled in remote systems whose lifecycles are disconnected from the Nomad node's lifecycle. See https://github.com/hashicorp/nomad-driver-ecs for an example ECS remote task driver. --- api/tasks.go | 9 + .../allocrunner/taskrunner/driver_handle.go | 25 +- .../allocrunner/taskrunner/remotetask_hook.go | 124 +++++ client/allocrunner/taskrunner/service_hook.go | 1 + client/allocrunner/taskrunner/task_runner.go | 15 +- .../taskrunner/task_runner_hooks.go | 6 + client/client.go | 1 - e2e/e2e_test.go | 1 + e2e/remotetasks/input/ecs.nomad | 43 ++ e2e/remotetasks/remotetasks.go | 433 ++++++++++++++++ e2e/terraform/.terraform.lock.hcl | 157 +++--- .../dev-cluster/nomad/client-linux/client.hcl | 8 + .../nomad/client-linux/indexed/client-0.hcl | 8 + .../nomad/client-linux/indexed/client-1.hcl | 8 + e2e/terraform/ecs-task.json | 21 + e2e/terraform/ecs.tf | 29 ++ .../packer/ubuntu-bionic-amd64/setup.sh | 7 + nomad/structs/structs.go | 44 +- plugins/drivers/client.go | 1 + plugins/drivers/driver.go | 13 + plugins/drivers/proto/driver.pb.go | 485 +++++++++--------- plugins/drivers/proto/driver.proto | 4 + plugins/drivers/server.go | 1 + plugins/drivers/task_handle.go | 32 ++ scheduler/generic_sched.go | 42 ++ scheduler/generic_sched_test.go | 210 ++++++++ scheduler/reconcile.go | 44 +- scheduler/reconcile_util.go | 6 + 28 files changed, 1431 insertions(+), 347 deletions(-) create mode 100644 client/allocrunner/taskrunner/remotetask_hook.go create mode 100644 e2e/remotetasks/input/ecs.nomad create mode 100644 e2e/remotetasks/remotetasks.go create mode 100644 e2e/terraform/ecs-task.json create mode 100644 e2e/terraform/ecs.tf diff --git a/api/tasks.go b/api/tasks.go index 29b684819..9c59515c1 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -915,6 +915,15 @@ type TaskState struct { StartedAt time.Time FinishedAt time.Time Events []*TaskEvent + + // Experimental - TaskHandle is based on drivers.TaskHandle and used + // by remote task drivers to migrate task handles between allocations. 
+ TaskHandle *TaskHandle +} + +type TaskHandle struct { + Version int + DriverState []byte } const ( diff --git a/client/allocrunner/taskrunner/driver_handle.go b/client/allocrunner/taskrunner/driver_handle.go index 4f04a6605..36427f6f2 100644 --- a/client/allocrunner/taskrunner/driver_handle.go +++ b/client/allocrunner/taskrunner/driver_handle.go @@ -13,20 +13,22 @@ import ( // NewDriverHandle returns a handle for task operations on a specific task func NewDriverHandle(driver drivers.DriverPlugin, taskID string, task *structs.Task, net *drivers.DriverNetwork) *DriverHandle { return &DriverHandle{ - driver: driver, - net: net, - taskID: taskID, - task: task, + driver: driver, + net: net, + taskID: taskID, + killSignal: task.KillSignal, + killTimeout: task.KillTimeout, } } // DriverHandle encapsulates a driver plugin client and task identifier and exposes // an api to perform driver operations on the task type DriverHandle struct { - driver drivers.DriverPlugin - net *drivers.DriverNetwork - task *structs.Task - taskID string + driver drivers.DriverPlugin + net *drivers.DriverNetwork + taskID string + killSignal string + killTimeout time.Duration } func (h *DriverHandle) ID() string { @@ -37,12 +39,13 @@ func (h *DriverHandle) WaitCh(ctx context.Context) (<-chan *drivers.ExitResult, return h.driver.WaitTask(ctx, h.taskID) } -func (h *DriverHandle) Update(task *structs.Task) error { - return nil +// SetKillSignal allows overriding the signal sent to kill the task. +func (h *DriverHandle) SetKillSignal(signal string) { + h.killSignal = signal } func (h *DriverHandle) Kill() error { - return h.driver.StopTask(h.taskID, h.task.KillTimeout, h.task.KillSignal) + return h.driver.StopTask(h.taskID, h.killTimeout, h.killSignal) } func (h *DriverHandle) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) { diff --git a/client/allocrunner/taskrunner/remotetask_hook.go b/client/allocrunner/taskrunner/remotetask_hook.go new file mode 100644 index 000000000..2068b52d9 --- /dev/null +++ b/client/allocrunner/taskrunner/remotetask_hook.go @@ -0,0 +1,124 @@ +package taskrunner + +import ( + "context" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" +) + +var _ interfaces.TaskPrestartHook = (*remoteTaskHook)(nil) +var _ interfaces.TaskPreKillHook = (*remoteTaskHook)(nil) + +// remoteTaskHook reattaches to remotely executing tasks. +type remoteTaskHook struct { + tr *TaskRunner + + logger hclog.Logger +} + +func newRemoteTaskHook(tr *TaskRunner, logger hclog.Logger) interfaces.TaskHook { + h := &remoteTaskHook{ + tr: tr, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (h *remoteTaskHook) Name() string { + return "remote_task" +} + +// Prestart performs 2 remote task driver related tasks: +// 1. If there is no local handle, see if there is a handle propagated from a +// previous alloc to be restored. +// 2. If the alloc is lost make sure the task signal is set to detach instead +// of kill. 
+func (h *remoteTaskHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + if h.tr.getDriverHandle() != nil { + // Driver handle already exists so don't try to load remote + // task handle + return nil + } + + h.tr.stateLock.Lock() + th := drivers.NewTaskHandleFromState(h.tr.state) + h.tr.stateLock.Unlock() + + // Task handle will be nil if there was no previous allocation or if + // this is a destructive update + if th == nil { + resp.Done = true + return nil + } + + // The task config is unique per invocation so recreate it here + th.Config = h.tr.buildTaskConfig() + + if err := h.tr.driver.RecoverTask(th); err != nil { + // Soft error here to let a new instance get started instead of + // failing the task since retrying is unlikely to help. + h.logger.Error("error recovering task state", "error", err) + return nil + } + + taskInfo, err := h.tr.driver.InspectTask(th.Config.ID) + if err != nil { + // Soft error here to let a new instance get started instead of + // failing the task since retrying is unlikely to help. + h.logger.Error("error inspecting recovered task state", "error", err) + return nil + } + + h.tr.setDriverHandle(NewDriverHandle(h.tr.driver, th.Config.ID, h.tr.Task(), taskInfo.NetworkOverride)) + + h.tr.stateLock.Lock() + h.tr.localState.TaskHandle = th + h.tr.localState.DriverNetwork = taskInfo.NetworkOverride + h.tr.stateLock.Unlock() + + // Ensure the signal is set according to the allocation's state + h.setSignal(h.tr.Alloc()) + + // Emit TaskStarted manually since the normal task runner logic will + // treat this task like a restored task and skip emitting started. + h.tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) + + return nil +} + +// PreKilling tells the remote task driver to detach a remote task instead of +// stopping it. +func (h *remoteTaskHook) PreKilling(ctx context.Context, req *interfaces.TaskPreKillRequest, resp *interfaces.TaskPreKillResponse) error { + alloc := h.tr.Alloc() + h.setSignal(alloc) + return nil +} + +// setSignal to detach if the allocation is lost or draining. Safe to call +// multiple times as it only transitions to using detach -- never back to kill. +func (h *remoteTaskHook) setSignal(alloc *structs.Allocation) { + driverHandle := h.tr.getDriverHandle() + if driverHandle == nil { + // Nothing to do exit early + return + } + + switch { + case alloc.ClientStatus == structs.AllocClientStatusLost: + // Continue on; lost allocs should just detach + h.logger.Debug("detaching from remote task since alloc was lost") + case alloc.DesiredTransition.ShouldMigrate(): + // Continue on; migrating allocs should just detach + h.logger.Debug("detaching from remote task since alloc was drained") + default: + // Nothing to do exit early + return + } + + // Set DetachSignal to indicate to the remote task driver that it + // should detach this remote task and ignore it. 
+ driverHandle.SetKillSignal(drivers.DetachSignal) +} diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index 11883046f..1cf2b5f65 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -19,6 +19,7 @@ var _ interfaces.TaskPoststartHook = &serviceHook{} var _ interfaces.TaskPreKillHook = &serviceHook{} var _ interfaces.TaskExitedHook = &serviceHook{} var _ interfaces.TaskStopHook = &serviceHook{} +var _ interfaces.TaskUpdateHook = &serviceHook{} type serviceHookConfig struct { alloc *structs.Allocation diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 593caa702..558ecad00 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -377,7 +377,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { return nil, err } - // Initialize the runners hooks. + // Initialize the runners hooks. Must come after initDriver so hooks + // can use tr.driverCapabilities tr.initHooks() // Initialize base labels @@ -496,6 +497,7 @@ func (tr *TaskRunner) Run() { tr.logger.Info("task failed to restore; waiting to contact server before restarting") select { case <-tr.killCtx.Done(): + tr.logger.Info("task killed while waiting for server contact") case <-tr.shutdownCtx.Done(): return case <-tr.serversContactedCh: @@ -637,11 +639,12 @@ MAIN: } func (tr *TaskRunner) shouldShutdown() bool { - if tr.alloc.ClientTerminalStatus() { + alloc := tr.Alloc() + if alloc.ClientTerminalStatus() { return true } - if !tr.IsPoststopTask() && tr.alloc.ServerTerminalStatus() { + if !tr.IsPoststopTask() && alloc.ServerTerminalStatus() { return true } @@ -1142,6 +1145,12 @@ func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) { tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state) } + // Store task handle for remote tasks + if tr.driverCapabilities != nil && tr.driverCapabilities.RemoteTasks { + tr.logger.Trace("storing remote task handle state") + tr.localState.TaskHandle.Store(tr.state) + } + // Notify the alloc runner of the transition tr.stateUpdater.TaskStateUpdated() } diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 213a7245b..c9ff34441 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -152,6 +152,12 @@ func (tr *TaskRunner) initHooks() { consul: tr.consulServiceClient, logger: hookLogger, })) + + // If this task driver has remote capabilities, add the remote task + // hook. 
+ if tr.driverCapabilities.RemoteTasks { + tr.runnerHooks = append(tr.runnerHooks, newRemoteTaskHook(tr, hookLogger)) + } } func (tr *TaskRunner) emitHookError(err error, hookName string) { diff --git a/client/client.go b/client/client.go index 8208331d9..b334bae2a 100644 --- a/client/client.go +++ b/client/client.go @@ -2239,7 +2239,6 @@ func (c *Client) runAllocs(update *allocUpdates) { // Update the existing allocations for _, update := range diff.updated { - c.logger.Trace("updating alloc", "alloc_id", update.ID, "index", update.AllocModifyIndex) c.updateAlloc(update) } diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index b3326d895..8b1e89024 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -32,6 +32,7 @@ import ( _ "github.com/hashicorp/nomad/e2e/periodic" _ "github.com/hashicorp/nomad/e2e/podman" _ "github.com/hashicorp/nomad/e2e/quotas" + _ "github.com/hashicorp/nomad/e2e/remotetasks" _ "github.com/hashicorp/nomad/e2e/rescheduling" _ "github.com/hashicorp/nomad/e2e/scaling" _ "github.com/hashicorp/nomad/e2e/scalingpolicies" diff --git a/e2e/remotetasks/input/ecs.nomad b/e2e/remotetasks/input/ecs.nomad new file mode 100644 index 000000000..d2a766ca6 --- /dev/null +++ b/e2e/remotetasks/input/ecs.nomad @@ -0,0 +1,43 @@ +variable "subnets" { + type = list(string) + description = "Subnet IDs task will run in." +} + +variable "security_groups" { + type = list(string) + description = "Security Group IDs task will run in." +} + +job "nomad-ecs-e2e" { + datacenters = ["dc1"] + + group "ecs-remote-task-e2e" { + restart { + attempts = 0 + mode = "fail" + } + + reschedule { + delay = "5s" + } + + task "http-server" { + driver = "ecs" + kill_timeout = "1m" // increased from default to accomodate ECS. + + config { + task { + launch_type = "FARGATE" + task_definition = "nomad-rtd-e2e" + network_configuration { + aws_vpc_configuration { + assign_public_ip = "ENABLED" + security_groups = var.security_groups + subnets = var.subnets + } + } + } + } + } + } +} diff --git a/e2e/remotetasks/remotetasks.go b/e2e/remotetasks/remotetasks.go new file mode 100644 index 000000000..43b8e7b40 --- /dev/null +++ b/e2e/remotetasks/remotetasks.go @@ -0,0 +1,433 @@ +package remotetasks + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/ecs" + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/jobspec2" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + ecsTaskStatusDeactivating = "DEACTIVATING" + ecsTaskStatusStopping = "STOPPING" + ecsTaskStatusDeprovisioning = "DEPROVISIONING" + ecsTaskStatusStopped = "STOPPED" + ecsTaskStatusRunning = "RUNNING" +) + +type RemoteTasksTest struct { + framework.TC + jobIDs []string +} + +func init() { + framework.AddSuites(&framework.TestSuite{ + Component: "RemoteTasks", + CanRunLocal: true, + Cases: []framework.TestCase{ + new(RemoteTasksTest), + }, + }) +} + +func (tc *RemoteTasksTest) BeforeAll(f *framework.F) { + e2eutil.WaitForLeader(f.T(), tc.Nomad()) + e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 2) +} + +func (tc *RemoteTasksTest) AfterEach(f *framework.F) { + nomadClient := tc.Nomad() + + // Mark all nodes eligible + nodesAPI := tc.Nomad().Nodes() + nodes, _, _ := nodesAPI.List(nil) + for _, 
node := range nodes { + nodesAPI.ToggleEligibility(node.ID, true, nil) + } + + jobs := nomadClient.Jobs() + // Stop all jobs in test + for _, id := range tc.jobIDs { + jobs.Deregister(id, true, nil) + } + tc.jobIDs = []string{} + + // Garbage collect + nomadClient.System().GarbageCollect() +} + +// TestECSJob asserts an ECS job may be started and is cleaned up when stopped. +func (tc *RemoteTasksTest) TestECSJob(f *framework.F) { + t := f.T() + + ecsClient := ecsOrSkip(t, tc.Nomad()) + + jobID := "ecsjob-" + uuid.Generate()[0:8] + tc.jobIDs = append(tc.jobIDs, jobID) + _, allocs := registerECSJobs(t, tc.Nomad(), jobID) + require.Len(t, allocs, 1) + allocID := allocs[0].ID + e2eutil.WaitForAllocsRunning(t, tc.Nomad(), []string{allocID}) + + // We need to go from Allocation -> ECS ARN, so grab the updated + // allocation's task state. + arn := arnForAlloc(t, tc.Nomad().Allocations(), allocID) + + // Use ARN to lookup status of ECS task in AWS + ensureECSRunning(t, ecsClient, arn) + + t.Logf("Task %s is running!", arn) + + // Stop the job + e2eutil.WaitForJobStopped(t, tc.Nomad(), jobID) + + // Ensure it is stopped in ECS + input := ecs.DescribeTasksInput{ + Cluster: aws.String("nomad-rtd-e2e"), + Tasks: []*string{aws.String(arn)}, + } + testutil.WaitForResult(func() (bool, error) { + resp, err := ecsClient.DescribeTasks(&input) + if err != nil { + return false, err + } + status := *resp.Tasks[0].LastStatus + return status == ecsTaskStatusStopped, fmt.Errorf("ecs task is not stopped: %s", status) + }, func(err error) { + t.Fatalf("error retrieving ecs task status: %v", err) + }) +} + +// TestECSDrain asserts an ECS job may be started, drained from one node, and +// is managed by a new node without stopping and restarting the remote task. +func (tc *RemoteTasksTest) TestECSDrain(f *framework.F) { + t := f.T() + + ecsClient := ecsOrSkip(t, tc.Nomad()) + + jobID := "ecsjob-" + uuid.Generate()[0:8] + tc.jobIDs = append(tc.jobIDs, jobID) + _, allocs := registerECSJobs(t, tc.Nomad(), jobID) + require.Len(t, allocs, 1) + origNode := allocs[0].NodeID + origAlloc := allocs[0].ID + e2eutil.WaitForAllocsRunning(t, tc.Nomad(), []string{origAlloc}) + + arn := arnForAlloc(t, tc.Nomad().Allocations(), origAlloc) + ensureECSRunning(t, ecsClient, arn) + + t.Logf("Task %s is running! 
Now to drain the node.", arn) + + // Drain the node + _, err := tc.Nomad().Nodes().UpdateDrain( + origNode, + &api.DrainSpec{Deadline: 30 * time.Second}, + false, + nil, + ) + require.NoError(t, err, "error draining original node") + + // Wait for new alloc to be running + var newAlloc *api.AllocationListStub + qopts := &api.QueryOptions{} + testutil.WaitForResult(func() (bool, error) { + allocs, resp, err := tc.Nomad().Jobs().Allocations(jobID, false, qopts) + if err != nil { + return false, fmt.Errorf("error retrieving allocations for job: %w", err) + } + + qopts.WaitIndex = resp.LastIndex + + if len(allocs) > 2 { + return false, fmt.Errorf("expected 1 or 2 allocs but found %d", len(allocs)) + } + + for _, alloc := range allocs { + if alloc.ID == origAlloc { + // This is the old alloc, skip it + continue + } + + newAlloc = alloc + + if newAlloc.ClientStatus == "running" { + break + } + } + + if newAlloc == nil { + return false, fmt.Errorf("no new alloc found") + } + if newAlloc.ClientStatus != "running" { + return false, fmt.Errorf("expected new alloc (%s) to be running but found: %s", + newAlloc.ID, newAlloc.ClientStatus) + } + + return true, nil + }, func(err error) { + t.Fatalf("error waiting for new alloc to be running: %v", err) + }) + + // Make sure the ARN hasn't changed by looking up the new alloc's ARN + newARN := arnForAlloc(t, tc.Nomad().Allocations(), newAlloc.ID) + + assert.Equal(t, arn, newARN, "unexpected new ARN") +} + +// TestECSDeployment asserts a new ECS task is started when an ECS job is +// deployed. +func (tc *RemoteTasksTest) TestECSDeployment(f *framework.F) { + t := f.T() + + ecsClient := ecsOrSkip(t, tc.Nomad()) + + jobID := "ecsjob-" + uuid.Generate()[0:8] + tc.jobIDs = append(tc.jobIDs, jobID) + job, origAllocs := registerECSJobs(t, tc.Nomad(), jobID) + require.Len(t, origAllocs, 1) + origAllocID := origAllocs[0].ID + e2eutil.WaitForAllocsRunning(t, tc.Nomad(), []string{origAllocID}) + + // We need to go from Allocation -> ECS ARN, so grab the updated + // allocation's task state. + origARN := arnForAlloc(t, tc.Nomad().Allocations(), origAllocID) + + // Use ARN to lookup status of ECS task in AWS + ensureECSRunning(t, ecsClient, origARN) + + t.Logf("Task %s is running! Updating...", origARN) + + // Force a deployment by updating meta + job.Meta = map[string]string{ + "updated": time.Now().Format(time.RFC3339Nano), + } + + // Register updated job + resp, _, err := tc.Nomad().Jobs().Register(job, nil) + require.NoError(t, err, "error registering updated job") + require.NotEmpty(t, resp.EvalID, "no eval id created when registering updated job") + + // Wait for new alloc to be running + var newAlloc *api.AllocationListStub + testutil.WaitForResult(func() (bool, error) { + allocs, _, err := tc.Nomad().Jobs().Allocations(jobID, false, nil) + if err != nil { + return false, err + } + + for _, a := range allocs { + if a.ID == origAllocID { + if a.ClientStatus == "complete" { + // Original alloc stopped as expected! + continue + } + + // Original alloc is still running + newAlloc = nil + return false, fmt.Errorf("original alloc not yet terminal. 
"+ + "client status: %s; desired status: %s", + a.ClientStatus, a.DesiredStatus) + } + + if a.ClientStatus != "running" { + return false, fmt.Errorf("new alloc is not running: %s", a.ClientStatus) + } + + if newAlloc != nil { + return false, fmt.Errorf("found 2 replacement allocs: %s and %s", + a.ID, newAlloc.ID) + } + + newAlloc = a + } + + return newAlloc != nil, fmt.Errorf("no new alloc found for updated job") + }, func(err error) { + require.NoError(t, err, "error waiting for updated alloc") + }) + + newARN := arnForAlloc(t, tc.Nomad().Allocations(), newAlloc.ID) + t.Logf("Task %s is updated!", newARN) + require.NotEqual(t, origARN, newARN, "expected new ARN") + + // Ensure original ARN is stopped in ECS + input := ecs.DescribeTasksInput{ + Cluster: aws.String("nomad-rtd-e2e"), + Tasks: []*string{aws.String(origARN)}, + } + testutil.WaitForResult(func() (bool, error) { + resp, err := ecsClient.DescribeTasks(&input) + if err != nil { + return false, err + } + status := *resp.Tasks[0].LastStatus + return status == ecsTaskStatusStopped, fmt.Errorf("original ecs task is not stopped: %s", status) + }, func(err error) { + t.Fatalf("error retrieving ecs task status for original ARN: %v", err) + }) +} + +// ecsOrSkip returns an AWS ECS client or skips the test if ECS is unreachable +// by the test runner or the ECS remote task driver isn't healthy. +func ecsOrSkip(t *testing.T, nomadClient *api.Client) *ecs.ECS { + awsSession := session.Must(session.NewSession()) + + ecsClient := ecs.New(awsSession, aws.NewConfig().WithRegion("us-east-1")) + + _, err := ecsClient.ListClusters(&ecs.ListClustersInput{}) + if err != nil { + t.Skipf("Skipping ECS Remote Task Driver Task. Error querying AWS ECS API: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := nomadClient.Nodes().List(nil) + if err != nil { + return false, fmt.Errorf("error retrieving node listing: %w", err) + } + + notReady := 0 + notEligible := 0 + noECS := 0 + notHealthy := 0 + ready := 0 + for _, n := range nodes { + if n.Status != "ready" { + notReady++ + continue + } + if n.SchedulingEligibility != "eligible" { + notEligible++ + continue + } + ecsDriver, ok := n.Drivers["ecs"] + if !ok { + noECS++ + continue + } + if !ecsDriver.Healthy { + notHealthy++ + continue + } + ready++ + } + + return ready > 1, fmt.Errorf("expected 2 nodes with healthy ecs drivers but found: "+ + "not_ready=%d ineligible=%d no_driver=%d unhealthy=%d ok=%d", + notReady, notEligible, noECS, notHealthy, ready) + }, func(err error) { + if err != nil { + t.Skipf("Skipping Remote Task Driver tests due to: %v", err) + } + }) + + return ecsClient +} + +// arnForAlloc retrieves the ARN for a running allocation. 
+func arnForAlloc(t *testing.T, allocAPI *api.Allocations, allocID string) string { + t.Logf("Retrieving ARN for alloc=%s", allocID) + ecsState := struct { + ARN string + }{} + testutil.WaitForResult(func() (bool, error) { + alloc, _, err := allocAPI.Info(allocID, nil) + if err != nil { + return false, err + } + state := alloc.TaskStates["http-server"] + if state == nil { + return false, fmt.Errorf("no task state for http-server (%d task states)", len(alloc.TaskStates)) + } + if state.TaskHandle == nil { + return false, fmt.Errorf("no task handle for http-server") + } + if len(state.TaskHandle.DriverState) == 0 { + return false, fmt.Errorf("no driver state for task handle") + } + if err := base.MsgPackDecode(state.TaskHandle.DriverState, &ecsState); err != nil { + return false, fmt.Errorf("error decoding driver state: %w", err) + } + if ecsState.ARN == "" { + return false, fmt.Errorf("ARN is empty despite DriverState being %d bytes", len(state.TaskHandle.DriverState)) + } + return true, nil + }, func(err error) { + t.Fatalf("error getting ARN: %v", err) + }) + t.Logf("Retrieved ARN=%s for alloc=%s", ecsState.ARN, allocID) + + return ecsState.ARN +} + +// ensureECSRunning asserts that the given ARN is a running ECS task. +func ensureECSRunning(t *testing.T, ecsClient *ecs.ECS, arn string) { + t.Logf("Ensuring ARN=%s is running", arn) + input := ecs.DescribeTasksInput{ + Cluster: aws.String("nomad-rtd-e2e"), + Tasks: []*string{aws.String(arn)}, + } + testutil.WaitForResult(func() (bool, error) { + resp, err := ecsClient.DescribeTasks(&input) + if err != nil { + return false, err + } + status := *resp.Tasks[0].LastStatus + return status == ecsTaskStatusRunning, fmt.Errorf("ecs task is not running: %s", status) + }, func(err error) { + t.Fatalf("error retrieving ecs task status: %v", err) + }) + t.Logf("ARN=%s is running", arn) +} + +// registerECSJobs registers an ECS job and returns it and its allocation +// stubs. +func registerECSJobs(t *testing.T, nomadClient *api.Client, jobID string) (*api.Job, []*api.AllocationListStub) { + const ( + jobPath = "remotetasks/input/ecs.nomad" + varPath = "remotetasks/input/ecs.vars" + ) + + jobBytes, err := os.ReadFile(jobPath) + require.NoError(t, err, "error reading job file") + + job, err := jobspec2.ParseWithConfig(&jobspec2.ParseConfig{ + Path: jobPath, + Body: jobBytes, + VarFiles: []string{varPath}, + Strict: true, + }) + require.NoErrorf(t, err, "error parsing jobspec from %s with var file %s", jobPath, varPath) + + job.ID = &jobID + job.Name = &jobID + + // Register job + resp, _, err := nomadClient.Jobs().Register(job, nil) + require.NoError(t, err, "error registering job") + require.NotEmpty(t, resp.EvalID, "no eval id created when registering job") + + var allocs []*api.AllocationListStub + testutil.WaitForResult(func() (bool, error) { + allocs, _, err = nomadClient.Jobs().Allocations(jobID, false, nil) + if err != nil { + return false, err + } + return len(allocs) > 0, fmt.Errorf("no allocs found") + }, func(err error) { + require.NoErrorf(t, err, "error retrieving allocations for %s", jobID) + }) + return job, allocs +} diff --git a/e2e/terraform/.terraform.lock.hcl b/e2e/terraform/.terraform.lock.hcl index 6065a46f1..de1ba3a1c 100644 --- a/e2e/terraform/.terraform.lock.hcl +++ b/e2e/terraform/.terraform.lock.hcl @@ -2,99 +2,98 @@ # Manual edits may be lost in future updates. 
provider "registry.terraform.io/hashicorp/aws" { - version = "3.22.0" + version = "3.37.0" hashes = [ - "h1:8aWXjFcmEi64P0TMHOCQXWws+/SmvJQrNvHlzdktKOM=", - "h1:f/Tz8zv1Zb78ZaiyJkQ0MGIViZwbYrLuQk3kojPM91c=", - "zh:4a9a66caf1964cdd3b61fb3ebb0da417195a5529cb8e496f266b0778335d11c8", - "zh:514f2f006ae68db715d86781673faf9483292deab235c7402ff306e0e92ea11a", - "zh:5277b61109fddb9011728f6650ef01a639a0590aeffe34ed7de7ba10d0c31803", - "zh:67784dc8c8375ab37103eea1258c3334ee92be6de033c2b37e3a2a65d0005142", - "zh:76d4c8be2ca4a3294fb51fb58de1fe03361d3bc403820270cc8e71a04c5fa806", - "zh:8f90b1cfdcf6e8fb1a9d0382ecaa5056a3a84c94e313fbf9e92c89de271cdede", - "zh:d0ac346519d0df124df89be2d803eb53f373434890f6ee3fb37112802f9eac59", - "zh:d6256feedada82cbfb3b1dd6dd9ad02048f23120ab50e6146a541cb11a108cc1", - "zh:db2fe0d2e77c02e9a74e1ed694aa352295a50283f9a1cf896e5be252af14e9f4", - "zh:eda61e889b579bd90046939a5b40cf5dc9031fb5a819fc3e4667a78bd432bdb2", + "h1:Tf6Os+utUxE8rEr/emCXLFEDdCb0Y6rsN4Ee84+aDCQ=", + "zh:064c9b21bcd69be7a8631ccb3eccb8690c6a9955051145920803ef6ce6fc06bf", + "zh:277dd05750187a41282cf6e066e882eac0dd0056e3211d125f94bf62c19c4b8b", + "zh:47050211f72dcbf3d99c82147abd2eefbb7238efb94d5188979f60de66c8a3df", + "zh:4a4e0d070399a050847545721dae925c192a2d6354802fdfbea73769077acca5", + "zh:4cbc46f79239c85d69389f9e91ca9a9ebf6a8a937cfada026c5a037fd09130fb", + "zh:6548dcb1ac4a388ed46034a5317fa74b3b0b0f68eec03393f2d4d09342683f95", + "zh:75b4a82596aa525d95b0b2847fe648368c6e2b054059c4dc4dcdee01d374b592", + "zh:75cf5cc674b61c82300667a82650f56722618b119ab0526b47b5ecbb4bbf49d0", + "zh:93c896682359039960c38eb5a4b29d1cc06422f228db0572b90330427e2a21ec", + "zh:c7256663aedbc9de121316b6d0623551386a476fc12b8eb77e88532ce15de354", + "zh:e995c32f49c23b5938200386e08b2a3fd69cf5102b5299366c0608bbeac68429", ] } provider "registry.terraform.io/hashicorp/external" { - version = "2.0.0" + version = "2.1.0" hashes = [ - "h1:6S7hqjmUnoAZ5D/0F1VlJZKSJsUIBh7Ro0tLjGpKO0g=", - "h1:Q5xqryWI3tCY8yr+fugq7dz4Qz+8g4GaW9ZS8dc6Ob8=", - "zh:07949780dd6a1d43e7b46950f6e6976581d9724102cb5388d3411a1b6f476bde", - "zh:0a4f4636ff93f0644affa8474465dd8c9252946437ad025b28fc9f6603534a24", - "zh:0dd7e05a974c649950d1a21d7015d3753324ae52ebdd1744b144bc409ca4b3e8", - "zh:2b881032b9aa9d227ac712f614056d050bcdcc67df0dc79e2b2cb76a197059ad", - "zh:38feb4787b4570335459ca75a55389df1a7570bdca8cdf5df4c2876afe3c14b4", - "zh:40f7e0aaef3b1f4c2ca2bb1189e3fe9af8c296da129423986d1d99ccc8cfb86c", - "zh:56b361f64f0f0df5c4f958ae2f0e6f8ba192f35b720b9d3ae1be068fabcf73d9", - "zh:5fadb5880cd31c2105f635ded92b9b16f918c1dd989627a4ce62c04939223909", - "zh:61fa0be9c14c8c4109cfb7be8d54a80c56d35dbae49d3231cddb59831e7e5a4d", - "zh:853774bf97fbc4a784d5af5a4ca0090848430781ae6cfc586adeb48f7c44af79", + "h1:wbtDfLeawmv6xVT1W0w0fctRCb4ABlaD3JTxwb1jXag=", + "zh:0d83ffb72fbd08986378204a7373d8c43b127049096eaf2765bfdd6b00ad9853", + "zh:7577d6edc67b1e8c2cf62fe6501192df1231d74125d90e51d570d586d95269c5", + "zh:9c669ded5d5affa4b2544952c4b6588dfed55260147d24ced02dca3a2829f328", + "zh:a404d46f2831f90633947ab5d57e19dbfe35b3704104ba6ec80bcf50b058acfd", + "zh:ae1caea1c936d459ceadf287bb5c5bd67b5e2a7819df6f5c4114b7305df7f822", + "zh:afb4f805477694a4b9dde86b268d2c0821711c8aab1c6088f5f992228c4c06fb", + "zh:b993b4a1de8a462643e78f4786789e44ce5064b332fee1cb0d6250ed085561b8", + "zh:c84b2c13fa3ea2c0aa7291243006d560ce480a5591294b9001ce3742fc9c5791", + "zh:c8966f69b7eccccb771704fd5335923692eccc9e0e90cb95d14538fe2e92a3b8", + "zh:d5fe68850d449b811e633a300b114d0617df6d450305e8251643b4d143dc855b", + 
"zh:ddebfd1e674ba336df09b1f27bbaa0e036c25b7a7087dc8081443f6e5954028b", ] } provider "registry.terraform.io/hashicorp/local" { - version = "2.0.0" + version = "2.1.0" hashes = [ - "h1:EC6eh7avwx1rF56h3RZcxgEp/14ihi7Sk/4J3Hn4nIE=", - "h1:pO1ANXtOCRfecKsY9Hn4UsXoPBLv6LFiDIEiS1MZ09E=", - "zh:34ce8b79493ace8333d094752b579ccc907fa9392a2c1d6933a6c95d0786d3f1", - "zh:5c5a19c4f614a4ffb68bae0b0563f3860115cf7539b8adc21108324cfdc10092", - "zh:67ddb1ca2cd3e1a8f948302597ceb967f19d2eeb2d125303493667388fe6330e", - "zh:68e6b16f3a8e180fcba1a99754118deb2d82331b51f6cca39f04518339bfdfa6", - "zh:8393a12eb11598b2799d51c9b0a922a3d9fadda5a626b94a1b4914086d53120e", - "zh:90daea4b2010a86f2aca1e3a9590e0b3ddcab229c2bd3685fae76a832e9e836f", - "zh:99308edc734a0ac9149b44f8e316ca879b2670a1cae387a8ae754c180b57cdb4", - "zh:c76594db07a9d1a73372a073888b672df64adb455d483c2426cc220eda7e092e", - "zh:dc09c1fb36c6a706bdac96cce338952888c8423978426a09f5df93031aa88b84", - "zh:deda88134e9780319e8de91b3745520be48ead6ec38cb662694d09185c3dac70", + "h1:EYZdckuGU3n6APs97nS2LxZm3dDtGqyM4qaIvsmac8o=", + "zh:0f1ec65101fa35050978d483d6e8916664b7556800348456ff3d09454ac1eae2", + "zh:36e42ac19f5d68467aacf07e6adcf83c7486f2e5b5f4339e9671f68525fc87ab", + "zh:6db9db2a1819e77b1642ec3b5e95042b202aee8151a0256d289f2e141bf3ceb3", + "zh:719dfd97bb9ddce99f7d741260b8ece2682b363735c764cac83303f02386075a", + "zh:7598bb86e0378fd97eaa04638c1a4c75f960f62f69d3662e6d80ffa5a89847fe", + "zh:ad0a188b52517fec9eca393f1e2c9daea362b33ae2eb38a857b6b09949a727c1", + "zh:c46846c8df66a13fee6eff7dc5d528a7f868ae0dcf92d79deaac73cc297ed20c", + "zh:dc1a20a2eec12095d04bf6da5321f535351a594a636912361db20eb2a707ccc4", + "zh:e57ab4771a9d999401f6badd8b018558357d3cbdf3d33cc0c4f83e818ca8e94b", + "zh:ebdcde208072b4b0f8d305ebf2bfdc62c926e0717599dcf8ec2fd8c5845031c3", + "zh:ef34c52b68933bedd0868a13ccfd59ff1c820f299760b3c02e008dc95e2ece91", ] } provider "registry.terraform.io/hashicorp/null" { - version = "3.0.0" + version = "3.1.0" hashes = [ - "h1:V1tzrSG6t3e7zWvUwRbGbhsWU2Jd/anrJpOl9XM+R/8=", - "h1:ysHGBhBNkIiJLEpthB/IVCLpA1Qoncp3KbCTFGFZTO0=", - "zh:05fb7eab469324c97e9b73a61d2ece6f91de4e9b493e573bfeda0f2077bc3a4c", - "zh:1688aa91885a395c4ae67636d411475d0b831e422e005dcf02eedacaafac3bb4", - "zh:24a0b1292e3a474f57c483a7a4512d797e041bc9c2fbaac42fe12e86a7fb5a3c", - "zh:2fc951bd0d1b9b23427acc93be09b6909d72871e464088171da60fbee4fdde03", - "zh:6db825759425599a326385a68acc6be2d9ba0d7d6ef587191d0cdc6daef9ac63", - "zh:85985763d02618993c32c294072cc6ec51f1692b803cb506fcfedca9d40eaec9", - "zh:a53186599c57058be1509f904da512342cfdc5d808efdaf02dec15f0f3cb039a", - "zh:c2e07b49b6efa676bdc7b00c06333ea1792a983a5720f9e2233db27323d2707c", - "zh:cdc8fe1096103cf5374751e2e8408ec4abd2eb67d5a1c5151fe2c7ecfd525bef", - "zh:dbdef21df0c012b0d08776f3d4f34eb0f2f229adfde07ff252a119e52c0f65b7", + "h1:vpC6bgUQoJ0znqIKVFevOdq+YQw42bRq0u+H3nto8nA=", + "zh:02a1675fd8de126a00460942aaae242e65ca3380b5bb192e8773ef3da9073fd2", + "zh:53e30545ff8926a8e30ad30648991ca8b93b6fa496272cd23b26763c8ee84515", + "zh:5f9200bf708913621d0f6514179d89700e9aa3097c77dac730e8ba6e5901d521", + "zh:9ebf4d9704faba06b3ec7242c773c0fbfe12d62db7d00356d4f55385fc69bfb2", + "zh:a6576c81adc70326e4e1c999c04ad9ca37113a6e925aefab4765e5a5198efa7e", + "zh:a8a42d13346347aff6c63a37cda9b2c6aa5cc384a55b2fe6d6adfa390e609c53", + "zh:c797744d08a5307d50210e0454f91ca4d1c7621c68740441cf4579390452321d", + "zh:cecb6a304046df34c11229f20a80b24b1603960b794d68361a67c5efe58e62b8", + "zh:e1371aa1e502000d9974cfaff5be4cfa02f47b17400005a16f14d2ef30dc2a70", + 
"zh:fc39cc1fe71234a0b0369d5c5c7f876c71b956d23d7d6f518289737a001ba69b", + "zh:fea4227271ebf7d9e2b61b89ce2328c7262acd9fd190e1fd6d15a591abfa848e", ] } provider "registry.terraform.io/hashicorp/random" { - version = "3.0.0" + version = "3.1.0" hashes = [ - "h1:grDzxfnOdFXi90FRIIwP/ZrCzirJ/SfsGBe6cE0Shg4=", - "h1:yhHJpb4IfQQfuio7qjUXuUFTU/s+ensuEpm23A+VWz0=", - "zh:0fcb00ff8b87dcac1b0ee10831e47e0203a6c46aafd76cb140ba2bab81f02c6b", - "zh:123c984c0e04bad910c421028d18aa2ca4af25a153264aef747521f4e7c36a17", - "zh:287443bc6fd7fa9a4341dec235589293cbcc6e467a042ae225fd5d161e4e68dc", - "zh:2c1be5596dd3cca4859466885eaedf0345c8e7628503872610629e275d71b0d2", - "zh:684a2ef6f415287944a3d966c4c8cee82c20e393e096e2f7cdcb4b2528407f6b", - "zh:7625ccbc6ff17c2d5360ff2af7f9261c3f213765642dcd84e84ae02a3768fd51", - "zh:9a60811ab9e6a5bfa6352fbb943bb530acb6198282a49373283a8fa3aa2b43fc", - "zh:c73e0eaeea6c65b1cf5098b101d51a2789b054201ce7986a6d206a9e2dacaefd", - "zh:e8f9ed41ac83dbe407de9f0206ef1148204a0d51ba240318af801ffb3ee5f578", - "zh:fbdd0684e62563d3ac33425b0ac9439d543a3942465f4b26582bcfabcb149515", + "h1:BZMEPucF+pbu9gsPk0G0BHx7YP04+tKdq2MrRDF1EDM=", + "zh:2bbb3339f0643b5daa07480ef4397bd23a79963cc364cdfbb4e86354cb7725bc", + "zh:3cd456047805bf639fbf2c761b1848880ea703a054f76db51852008b11008626", + "zh:4f251b0eda5bb5e3dc26ea4400dba200018213654b69b4a5f96abee815b4f5ff", + "zh:7011332745ea061e517fe1319bd6c75054a314155cb2c1199a5b01fe1889a7e2", + "zh:738ed82858317ccc246691c8b85995bc125ac3b4143043219bd0437adc56c992", + "zh:7dbe52fac7bb21227acd7529b487511c91f4107db9cc4414f50d04ffc3cab427", + "zh:a3a9251fb15f93e4cfc1789800fc2d7414bbc18944ad4c5c98f466e6477c42bc", + "zh:a543ec1a3a8c20635cf374110bd2f87c07374cf2c50617eee2c669b3ceeeaa9f", + "zh:d9ab41d556a48bd7059f0810cf020500635bfc696c9fc3adab5ea8915c1d886b", + "zh:d9e13427a7d011dbd654e591b0337e6074eef8c3b9bb11b2e39eaaf257044fd7", + "zh:f7605bd1437752114baf601bdf6931debe6dc6bfe3006eb7e9bb9080931dca8a", ] } provider "registry.terraform.io/hashicorp/template" { version = "2.2.0" hashes = [ - "h1:0wlehNaxBX7GJQnPfQwTNvvAf38Jm0Nv7ssKGMaG6Og=", "h1:94qn780bi1qjrbC3uQtjJh3Wkfwd5+tTtJHOb7KTg9w=", "zh:01702196f0a0492ec07917db7aaa595843d8f171dc195f4c988d2ffca2a06386", "zh:09aae3da826ba3d7df69efeb25d146a1de0d03e951d35019a0f80e4f58c89b53", @@ -110,19 +109,19 @@ provider "registry.terraform.io/hashicorp/template" { } provider "registry.terraform.io/hashicorp/tls" { - version = "3.0.0" + version = "3.1.0" hashes = [ - "h1:AcQGOAD5xa4KE9gYw5g7R6UU8a77Yn/afPvch4N86lQ=", - "h1:LtCEW5v1E5Eo49+kQOsKHRYf9Hc8ZR0jTpK+mXszPHs=", - "zh:05eac573a1fe53227bcc6b01daf6ddf0b73456f97f56f316f1b3114a4771e175", - "zh:09390dad764c76f0fd59cae4dad296e3e39487e06de3a4bc0df73916c6bb2f17", - "zh:142d0bc4722ab088b7ca124b0eb44206b9d100f51035c162d50ef552e09813d0", - "zh:2c391743dd20f43329c0d0d49dec7827970d788115593c0e32a57050c0a85337", - "zh:525b12fc87369c0e6d347afe6c77668aebf56cfa078bb0f1f01cc2ee01ac7016", - "zh:5583d81b7a05c6d49a4c445e1ee62e82facb07bb9204998a836b7b522a51db8d", - "zh:925e3acc70e18ed1cd296d337fc3e0ca43ac6f5bf2e660f24de750c7754f91aa", - "zh:a291457d25b207fd28fb4fad9209ebb591e25cfc507ca1cb0fb8b2e255be1969", - "zh:bbf9e2718752aebfbd7c6b8e196eb2e52730b66befed2ea1954f9ff1c199295e", - "zh:f4b333c467ae02c1a238ac57465fe66405f6e2a6cfeb4eded9bc321c5652a1bf", + "h1:fUJX8Zxx38e2kBln+zWr1Tl41X+OuiE++REjrEyiOM4=", + "zh:3d46616b41fea215566f4a957b6d3a1aa43f1f75c26776d72a98bdba79439db6", + "zh:623a203817a6dafa86f1b4141b645159e07ec418c82fe40acd4d2a27543cbaa2", + 
"zh:668217e78b210a6572e7b0ecb4134a6781cc4d738f4f5d09eb756085b082592e", + "zh:95354df03710691773c8f50a32e31fca25f124b7f3d6078265fdf3c4e1384dca", + "zh:9f97ab190380430d57392303e3f36f4f7835c74ea83276baa98d6b9a997c3698", + "zh:a16f0bab665f8d933e95ca055b9c8d5707f1a0dd8c8ecca6c13091f40dc1e99d", + "zh:be274d5008c24dc0d6540c19e22dbb31ee6bfdd0b2cddd4d97f3cd8a8d657841", + "zh:d5faa9dce0a5fc9d26b2463cea5be35f8586ab75030e7fa4d4920cd73ee26989", + "zh:e9b672210b7fb410780e7b429975adcc76dd557738ecc7c890ea18942eb321a5", + "zh:eb1f8368573d2370605d6dbf60f9aaa5b64e55741d96b5fb026dbfe91de67c0d", + "zh:fc1e12b713837b85daf6c3bb703d7795eaf1c5177aebae1afcf811dd7009f4b0", ] } diff --git a/e2e/terraform/config/dev-cluster/nomad/client-linux/client.hcl b/e2e/terraform/config/dev-cluster/nomad/client-linux/client.hcl index 439aa72f3..56a5c2615 100644 --- a/e2e/terraform/config/dev-cluster/nomad/client-linux/client.hcl +++ b/e2e/terraform/config/dev-cluster/nomad/client-linux/client.hcl @@ -27,6 +27,14 @@ plugin "nomad-driver-podman" { } } +plugin "nomad-driver-ecs" { + config { + enabled = true + cluster = "nomad-rtd-e2e" + region = "us-east-1" + } +} + vault { enabled = true address = "http://active.vault.service.consul:8200" diff --git a/e2e/terraform/config/full-cluster/nomad/client-linux/indexed/client-0.hcl b/e2e/terraform/config/full-cluster/nomad/client-linux/indexed/client-0.hcl index 192692cbd..f4ebb14e0 100644 --- a/e2e/terraform/config/full-cluster/nomad/client-linux/indexed/client-0.hcl +++ b/e2e/terraform/config/full-cluster/nomad/client-linux/indexed/client-0.hcl @@ -25,6 +25,14 @@ plugin "nomad-driver-podman" { } } +plugin "nomad-driver-ecs" { + config { + enabled = true + cluster = "nomad-rtd-e2e" + region = "us-east-1" + } +} + vault { enabled = true address = "http://active.vault.service.consul:8200" diff --git a/e2e/terraform/config/full-cluster/nomad/client-linux/indexed/client-1.hcl b/e2e/terraform/config/full-cluster/nomad/client-linux/indexed/client-1.hcl index eabc5b878..2b425d01c 100644 --- a/e2e/terraform/config/full-cluster/nomad/client-linux/indexed/client-1.hcl +++ b/e2e/terraform/config/full-cluster/nomad/client-linux/indexed/client-1.hcl @@ -20,6 +20,14 @@ plugin "nomad-driver-podman" { } } +plugin "nomad-driver-ecs" { + config { + enabled = true + cluster = "nomad-rtd-e2e" + region = "us-east-1" + } +} + vault { enabled = true address = "http://active.vault.service.consul:8200" diff --git a/e2e/terraform/ecs-task.json b/e2e/terraform/ecs-task.json new file mode 100644 index 000000000..cd0f0db9f --- /dev/null +++ b/e2e/terraform/ecs-task.json @@ -0,0 +1,21 @@ +[ + { + "command": [ + "/bin/sh -c \"echo ' Amazon ECS Sample App
Congratulations! Your application is now running on a container in Amazon ECS.
' > /usr/local/apache2/htdocs/index.html && httpd-foreground\"" + ], + "entryPoint": [ + "sh", + "-c" + ], + "essential": true, + "image": "httpd:2.4", + "name": "nomad-remote-driver-demo", + "portMappings": [ + { + "containerPort": 80, + "hostPort": 80, + "protocol": "tcp" + } + ] + } +] diff --git a/e2e/terraform/ecs.tf b/e2e/terraform/ecs.tf new file mode 100644 index 000000000..121424a74 --- /dev/null +++ b/e2e/terraform/ecs.tf @@ -0,0 +1,29 @@ +# Nomad ECS Remote Task Driver E2E +resource "aws_ecs_cluster" "nomad_rtd_e2e" { + name = "nomad-rtd-e2e" +} + +resource "aws_ecs_task_definition" "nomad_rtd_e2e" { + family = "nomad-rtd-e2e" + container_definitions = file("ecs-task.json") + + # Don't need a network for e2e tests + network_mode = "awsvpc" + + requires_compatibilities = ["FARGATE"] + cpu = 256 + memory = 512 +} + +data "template_file" "ecs_vars_hcl" { + template = <= group.Count { - return place + + for _, alloc := range lost { + if existing >= group.Count { + // Reached desired count, do not replace remaining lost + // allocs + break + } + + existing++ + place = append(place, allocPlaceResult{ + name: alloc.Name, + taskGroup: group, + previousAlloc: alloc, + reschedule: false, + canary: alloc.DeploymentStatus.IsCanary(), + downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(), + minJobVersion: alloc.Job.Version, + lost: true, + }) } // Add remaining placement results @@ -749,8 +772,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex, untainted, migrate, lost, canaries allocSet, canaryState bool, followupEvals map[string]string) allocSet { - // Mark all lost allocations for stop. Previous allocation doesn't matter - // here since it is on a lost node + // Mark all lost allocations for stop. var stop allocSet stop = stop.union(lost) a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals) diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index 9b61797f5..8ee080a0c 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -35,6 +35,9 @@ type placementResult interface { // stopped and if so the status description. StopPreviousAlloc() (bool, string) + // PreviousLost is true if the previous allocation was lost. + PreviousLost() bool + // DowngradeNonCanary indicates that placement should use the latest stable job // with the MinJobVersion, rather than the current deployment version DowngradeNonCanary() bool @@ -58,6 +61,7 @@ type allocPlaceResult struct { taskGroup *structs.TaskGroup previousAlloc *structs.Allocation reschedule bool + lost bool downgradeNonCanary bool minJobVersion uint64 @@ -71,6 +75,7 @@ func (a allocPlaceResult) IsRescheduling() bool { return a.re func (a allocPlaceResult) StopPreviousAlloc() (bool, string) { return false, "" } func (a allocPlaceResult) DowngradeNonCanary() bool { return a.downgradeNonCanary } func (a allocPlaceResult) MinJobVersion() uint64 { return a.minJobVersion } +func (a allocPlaceResult) PreviousLost() bool { return a.lost } // allocDestructiveResult contains the information required to do a destructive // update. 
Destructive changes should be applied atomically, as in the old alloc @@ -92,6 +97,7 @@ func (a allocDestructiveResult) StopPreviousAlloc() (bool, string) { } func (a allocDestructiveResult) DowngradeNonCanary() bool { return false } func (a allocDestructiveResult) MinJobVersion() uint64 { return 0 } +func (a allocDestructiveResult) PreviousLost() bool { return false } // allocMatrix is a mapping of task groups to their allocation set. type allocMatrix map[string]allocSet
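A minimal sketch of how the propagated handle surfaces through the Go API client, assuming the ECS remote task driver exercised by the e2e test above and a task named "http-server"; the allocation ID comes from the command line, and ecsDriverState is only an illustration of that driver's msgpack-encoded DriverState payload (the same assumption the test's arnForAlloc helper makes) -- other remote task drivers will encode different fields.

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/plugins/base"
)

// ecsDriverState mirrors the payload the ECS remote task driver is expected
// to msgpack-encode into TaskHandle.DriverState. Other drivers will store
// different fields; this type exists purely for illustration.
type ecsDriverState struct {
	ARN string
}

func main() {
	if len(os.Args) != 2 {
		log.Fatal("usage: taskhandle <alloc-id>")
	}

	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	alloc, _, err := client.Allocations().Info(os.Args[1], nil)
	if err != nil {
		log.Fatal(err)
	}

	// "http-server" matches the task name in the e2e jobspec; substitute
	// the task you care about.
	state := alloc.TaskStates["http-server"]
	if state == nil || state.TaskHandle == nil {
		log.Fatal("no propagated task handle; the task's driver may not set the RemoteTasks capability")
	}

	// DriverState is opaque to Nomad itself; decode it the same way the
	// e2e test does for the ECS driver.
	var ds ecsDriverState
	if err := base.MsgPackDecode(state.TaskHandle.DriverState, &ds); err != nil {
		log.Fatal(err)
	}
	fmt.Println("remote task ARN:", ds.ARN)
}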