diff --git a/client/consul_test.go b/client/consul_test.go index 4c34fd9a3..95dacb960 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -56,7 +56,7 @@ func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.T m.mu.Lock() defer m.mu.Unlock() m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T)", allocID, old, new, exec) - m.ops = append(m.ops, newMockConsulOp("update", allocID, old, exec)) + m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec)) return nil } diff --git a/client/task_runner.go b/client/task_runner.go index 296f1b5f2..373525868 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1550,7 +1550,7 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err)) } - if err := r.updateServices(drv, r.handle, r.task, updatedTask); err != nil { + if err := r.updateServices(drv, r.handle, r.task, updatedTask, update); err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err)) } } @@ -1568,13 +1568,17 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { } // updateServices and checks with Consul. 
-func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task) error { +func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task, newAlloc *structs.Allocation) error { var exec driver.ScriptExecutor if d.Abilities().Exec { // Allow set the script executor if the driver supports it exec = h } - interpolateServices(r.getTaskEnv(), r.task) + newTaskEnv, err := driver.GetTaskEnv(r.taskDir, r.config.Node, new, newAlloc, r.config, r.vaultFuture.Get()) + if err != nil { + return err + } + interpolateServices(newTaskEnv, new) return r.consul.UpdateTask(r.alloc.ID, old, new, exec) } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index cfac5ea42..b6f8dbd29 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -291,6 +291,7 @@ func TestTaskRunner_Update(t *testing.T) { // Change command to ensure we run for a bit ctx.tr.task.Config["command"] = "/bin/sleep" ctx.tr.task.Config["args"] = []string{"100"} + ctx.tr.task.Services[0].Checks[0].Args[0] = "${NOMAD_META_foo}" go ctx.tr.Run() defer ctx.Cleanup() @@ -302,8 +303,12 @@ func TestTaskRunner_Update(t *testing.T) { newMode := "foo" newTG.RestartPolicy.Mode = newMode - newTask := updateAlloc.Job.TaskGroups[0].Tasks[0] - newTask.Driver = "foobar" + newTask := newTG.Tasks[0] + newTask.Driver = "mock_driver" + + // Update meta to make sure service checks are interpolated correctly + // #2180 + newTask.Meta["foo"] = "UPDATE" // Update the kill timeout testutil.WaitForResult(func() (bool, error) { @@ -334,6 +339,21 @@ func TestTaskRunner_Update(t *testing.T) { if ctx.tr.handle.ID() == oldHandle { return false, fmt.Errorf("handle not ctx.updated") } + // Make sure Consul services were interpolated correctly during + // the update #2180 + consul := ctx.tr.consul.(*mockConsulServiceClient) + consul.mu.Lock() + defer consul.mu.Unlock() + if len(consul.ops) < 2 { + return false, fmt.Errorf("expected at least 2 
consul ops found: %d", len(consul.ops)) + } + lastOp := consul.ops[len(consul.ops)-1] + if lastOp.op != "update" { + return false, fmt.Errorf("expected last consul op to be update not %q", lastOp.op) + } + if found := lastOp.task.Services[0].Checks[0].Args[0]; found != "UPDATE" { + return false, fmt.Errorf("expected consul check to be UPDATE but found: %q", found) + } return true, nil }, func(err error) { t.Fatalf("err: %v", err) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 2fd2542e4..38f331f20 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -221,6 +221,7 @@ func (c *ServiceClient) merge(ops *operations) { if script, ok := c.runningScripts[cid]; ok { script.cancel() delete(c.scripts, cid) + delete(c.runningScripts, cid) } delete(c.checks, cid) } @@ -673,14 +674,15 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host switch check.Type { case structs.ServiceCheckHTTP: - if check.Protocol == "" { - check.Protocol = "http" + proto := check.Protocol + if proto == "" { + proto = "http" } if check.TLSSkipVerify { chkReg.TLSSkipVerify = true } base := url.URL{ - Scheme: check.Protocol, + Scheme: proto, Host: net.JoinHostPort(host, strconv.Itoa(port)), } relative, err := url.Parse(check.Path) diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 96794e0c1..94e00a949 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -767,3 +767,90 @@ func TestConsul_NoTLSSkipVerifySupport(t *testing.T) { } } } + +// TestConsul_CancelScript asserts removing a script check removes all objects +// related to that check. 
+func TestConsul_CancelScript(t *testing.T) {
+	ctx := setupFake()
+	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
+		{
+			Name:     "scriptcheckDel",
+			Type:     "script",
+			Interval: 9000 * time.Hour,
+			Timeout:  9000 * time.Hour,
+		},
+		{
+			Name:     "scriptcheckKeep",
+			Type:     "script",
+			Interval: 9000 * time.Hour,
+			Timeout:  9000 * time.Hour,
+		},
+	}
+
+	if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil {
+		t.Fatalf("unexpected error registering task: %v", err)
+	}
+
+	if err := ctx.syncOnce(); err != nil {
+		t.Fatalf("unexpected error syncing task: %v", err)
+	}
+
+	if len(ctx.FakeConsul.checks) != 2 {
+		t.Errorf("expected 2 checks but found %d", len(ctx.FakeConsul.checks))
+	}
+	// Either map being off is a failure, so the conditions are OR'ed.
+	if len(ctx.ServiceClient.scripts) != 2 || len(ctx.ServiceClient.runningScripts) != 2 {
+		t.Errorf("expected 2 running script but found scripts=%d runningScripts=%d",
+			len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
+	}
+	// Wait for each of the two script checks to execute once.
+	for i := 0; i < 2; i++ {
+		select {
+		case <-ctx.execs:
+			// Script ran as expected!
+		case <-time.After(3 * time.Second):
+			t.Fatalf("timed out waiting for script check to run")
+		}
+	}
+
+	// Remove a check and update the task
+	origTask := ctx.Task.Copy()
+	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
+		{
+			Name:     "scriptcheckKeep",
+			Type:     "script",
+			Interval: 9000 * time.Hour,
+			Timeout:  9000 * time.Hour,
+		},
+	}
+
+	if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx); err != nil {
+		t.Fatalf("unexpected error updating task: %v", err)
+	}
+
+	if err := ctx.syncOnce(); err != nil {
+		t.Fatalf("unexpected error syncing task: %v", err)
+	}
+
+	if len(ctx.FakeConsul.checks) != 1 {
+		t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks))
+	}
+	// A stale entry in either map must fail the test, hence OR rather than AND.
+	if len(ctx.ServiceClient.scripts) != 1 || len(ctx.ServiceClient.runningScripts) != 1 {
+		t.Errorf("expected 1 running script but found scripts=%d runningScripts=%d",
+			len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
+	}
+
+	// Make sure exec wasn't called again
+	select {
+	case <-ctx.execs:
+		t.Errorf("unexpected execution of script; was goroutine not cancelled?")
+	case <-time.After(100 * time.Millisecond):
+		// No unexpected script execs
+	}
+
+	// Don't leak goroutines
+	for _, scriptHandle := range ctx.ServiceClient.runningScripts {
+		scriptHandle.cancel()
+	}
+}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 3ff464360..b5b5bf208 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2122,7 +2122,6 @@ func (tg *TaskGroup) GoString() string {
 }
 
 const (
-	// TODO add Consul TTL check
 	ServiceCheckHTTP   = "http"
 	ServiceCheckTCP    = "tcp"
 	ServiceCheckScript = "script"