From 92535496b482d3f0184327ecb1663013f3f7bac0 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 26 Apr 2017 11:22:01 -0700 Subject: [PATCH 1/2] Properly interpolate services on updated tasks Previously was interpolating the original task's services again. Fixes #2180 Also fixes a slight memory leak in the new consul agent. Script check handles weren't being deleted after cancellation. --- client/task_runner.go | 2 +- command/agent/consul/client.go | 8 +-- command/agent/consul/unit_test.go | 87 +++++++++++++++++++++++++++++++ nomad/structs/structs.go | 1 - 4 files changed, 93 insertions(+), 5 deletions(-) diff --git a/client/task_runner.go b/client/task_runner.go index c578ca5ea..2a24b8fd2 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1442,7 +1442,7 @@ func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, ol // Allow set the script executor if the driver supports it exec = h } - interpolateServices(r.getTaskEnv(), r.task) + interpolateServices(r.getTaskEnv(), new) return r.consul.UpdateTask(r.alloc.ID, old, new, exec) } diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 2fd2542e4..38f331f20 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -221,6 +221,7 @@ func (c *ServiceClient) merge(ops *operations) { if script, ok := c.runningScripts[cid]; ok { script.cancel() delete(c.scripts, cid) + delete(c.runningScripts, cid) } delete(c.checks, cid) } @@ -673,14 +674,15 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host switch check.Type { case structs.ServiceCheckHTTP: - if check.Protocol == "" { - check.Protocol = "http" + proto := check.Protocol + if proto == "" { + proto = "http" } if check.TLSSkipVerify { chkReg.TLSSkipVerify = true } base := url.URL{ - Scheme: check.Protocol, + Scheme: proto, Host: net.JoinHostPort(host, strconv.Itoa(port)), } relative, err := url.Parse(check.Path) diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 96794e0c1..94e00a949 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -767,3 +767,90 @@ func TestConsul_NoTLSSkipVerifySupport(t *testing.T) { } } } + +// TestConsul_RemoveScript assert removing a script check removes all objects +// related to that check. +func TestConsul_CancelScript(t *testing.T) { + ctx := setupFake() + ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ + { + Name: "scriptcheckDel", + Type: "script", + Interval: 9000 * time.Hour, + Timeout: 9000 * time.Hour, + }, + { + Name: "scriptcheckKeep", + Type: "script", + Interval: 9000 * time.Hour, + Timeout: 9000 * time.Hour, + }, + } + + if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil { + t.Fatalf("unexpected error registering task: %v", err) + } + + if err := ctx.syncOnce(); err != nil { + t.Fatalf("unexpected error syncing task: %v", err) + } + + if len(ctx.FakeConsul.checks) != 2 { + t.Errorf("expected 2 checks but found %d", len(ctx.FakeConsul.checks)) + } + + if len(ctx.ServiceClient.scripts) != 2 && len(ctx.ServiceClient.runningScripts) != 2 { + t.Errorf("expected 2 running script but found scripts=%d runningScripts=%d", + len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts)) + } + + for i := 0; i < 2; i++ { + select { + case <-ctx.execs: + // Script ran as expected! + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to run") + } + } + + // Remove a check and update the task + origTask := ctx.Task.Copy() + ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ + { + Name: "scriptcheckKeep", + Type: "script", + Interval: 9000 * time.Hour, + Timeout: 9000 * time.Hour, + }, + } + + if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx); err != nil { + t.Fatalf("unexpected error registering task: %v", err) + } + + if err := ctx.syncOnce(); err != nil { + t.Fatalf("unexpected error syncing task: %v", err) + } + + if len(ctx.FakeConsul.checks) != 1 { + t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks)) + } + + if len(ctx.ServiceClient.scripts) != 1 && len(ctx.ServiceClient.runningScripts) != 1 { + t.Errorf("expected 1 running script but found scripts=%d runningScripts=%d", + len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts)) + } + + // Make sure exec wasn't called again + select { + case <-ctx.execs: + t.Errorf("unexpected execution of script; was goroutine not cancelled?") + case <-time.After(100 * time.Millisecond): + // No unexpected script execs + } + + // Don't leak goroutines + for _, scriptHandle := range ctx.ServiceClient.runningScripts { + scriptHandle.cancel() + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 49196b41b..af2553cd7 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2088,7 +2088,6 @@ func (tg *TaskGroup) GoString() string { } const ( - // TODO add Consul TTL check ServiceCheckHTTP = "http" ServiceCheckTCP = "tcp" ServiceCheckScript = "script" From 3999b57e7d5c0eb7c44ff7d04bc9e9fb3f9144be Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 4 May 2017 15:06:15 -0700 Subject: [PATCH 2/2] Build new env from new alloc before interpolating --- client/consul_test.go | 2 +- client/task_runner.go | 10 +++++++--- client/task_runner_test.go | 24 ++++++++++++++++++++++-- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/client/consul_test.go b/client/consul_test.go index 4c34fd9a3..95dacb960 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -56,7 +56,7 @@ func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.T m.mu.Lock() defer m.mu.Unlock() m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T)", allocID, old, new, exec) - m.ops = append(m.ops, newMockConsulOp("update", allocID, old, exec)) + m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec)) return nil } diff --git a/client/task_runner.go b/client/task_runner.go index 2a24b8fd2..1508c2f04 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1418,7 +1418,7 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err)) } - if err := r.updateServices(drv, r.handle, r.task, updatedTask); err != nil { + if err := r.updateServices(drv, r.handle, r.task, updatedTask, update); err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err)) } } @@ -1436,13 +1436,17 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { } // updateServices and checks with Consul. -func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task) error { +func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task, newAlloc *structs.Allocation) error { var exec driver.ScriptExecutor if d.Abilities().Exec { // Allow set the script executor if the driver supports it exec = h } - interpolateServices(r.getTaskEnv(), new) + newTaskEnv, err := driver.GetTaskEnv(r.taskDir, r.config.Node, new, newAlloc, r.config, r.vaultFuture.Get()) + if err != nil { + return err + } + interpolateServices(newTaskEnv, new) return r.consul.UpdateTask(r.alloc.ID, old, new, exec) } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index ede8cb164..1b8557ade 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -279,6 +279,7 @@ func TestTaskRunner_Update(t *testing.T) { // Change command to ensure we run for a bit ctx.tr.task.Config["command"] = "/bin/sleep" ctx.tr.task.Config["args"] = []string{"100"} + ctx.tr.task.Services[0].Checks[0].Args[0] = "${NOMAD_META_foo}" go ctx.tr.Run() defer ctx.Cleanup() @@ -290,8 +291,12 @@ func TestTaskRunner_Update(t *testing.T) { newMode := "foo" newTG.RestartPolicy.Mode = newMode - newTask := updateAlloc.Job.TaskGroups[0].Tasks[0] - newTask.Driver = "foobar" + newTask := newTG.Tasks[0] + newTask.Driver = "mock_driver" + + // Update meta to make sure service checks are interpolated correctly + // #2180 + newTask.Meta["foo"] = "UPDATE" // Update the kill timeout testutil.WaitForResult(func() (bool, error) { @@ -322,6 +327,21 @@ func TestTaskRunner_Update(t *testing.T) { if ctx.tr.handle.ID() == oldHandle { return false, fmt.Errorf("handle not ctx.updated") } + // Make sure Consul services were interpolated correctly during + // the update #2180 + consul := ctx.tr.consul.(*mockConsulServiceClient) + consul.mu.Lock() + defer consul.mu.Unlock() + if len(consul.ops) < 2 { + return false, fmt.Errorf("expected at least 2 consul ops found: %d", len(consul.ops)) + } + lastOp := consul.ops[len(consul.ops)-1] + if lastOp.op != "update" { + return false, fmt.Errorf("expected last consul op to be update not %q", lastOp.op) + } + if found := lastOp.task.Services[0].Checks[0].Args[0]; found != "UPDATE" { + return false, fmt.Errorf("expected consul check to be UPDATE but found: %q", found) + } return true, nil }, func(err error) { t.Fatalf("err: %v", err)