mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
Merge pull request #2591 from hashicorp/b-2180-script-updates
Properly interpolate services on updated tasks
This commit is contained in:
@@ -56,7 +56,7 @@ func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.T
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T)", allocID, old, new, exec)
|
||||
m.ops = append(m.ops, newMockConsulOp("update", allocID, old, exec))
|
||||
m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1550,7 +1550,7 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
|
||||
}
|
||||
|
||||
if err := r.updateServices(drv, r.handle, r.task, updatedTask); err != nil {
|
||||
if err := r.updateServices(drv, r.handle, r.task, updatedTask, update); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err))
|
||||
}
|
||||
}
|
||||
@@ -1568,13 +1568,17 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
|
||||
}
|
||||
|
||||
// updateServices and checks with Consul.
|
||||
func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task) error {
|
||||
func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task, newAlloc *structs.Allocation) error {
|
||||
var exec driver.ScriptExecutor
|
||||
if d.Abilities().Exec {
|
||||
// Allow set the script executor if the driver supports it
|
||||
exec = h
|
||||
}
|
||||
interpolateServices(r.getTaskEnv(), r.task)
|
||||
newTaskEnv, err := driver.GetTaskEnv(r.taskDir, r.config.Node, new, newAlloc, r.config, r.vaultFuture.Get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
interpolateServices(newTaskEnv, new)
|
||||
return r.consul.UpdateTask(r.alloc.ID, old, new, exec)
|
||||
}
|
||||
|
||||
|
||||
@@ -291,6 +291,7 @@ func TestTaskRunner_Update(t *testing.T) {
|
||||
// Change command to ensure we run for a bit
|
||||
ctx.tr.task.Config["command"] = "/bin/sleep"
|
||||
ctx.tr.task.Config["args"] = []string{"100"}
|
||||
ctx.tr.task.Services[0].Checks[0].Args[0] = "${NOMAD_META_foo}"
|
||||
go ctx.tr.Run()
|
||||
defer ctx.Cleanup()
|
||||
|
||||
@@ -302,8 +303,12 @@ func TestTaskRunner_Update(t *testing.T) {
|
||||
newMode := "foo"
|
||||
newTG.RestartPolicy.Mode = newMode
|
||||
|
||||
newTask := updateAlloc.Job.TaskGroups[0].Tasks[0]
|
||||
newTask.Driver = "foobar"
|
||||
newTask := newTG.Tasks[0]
|
||||
newTask.Driver = "mock_driver"
|
||||
|
||||
// Update meta to make sure service checks are interpolated correctly
|
||||
// #2180
|
||||
newTask.Meta["foo"] = "UPDATE"
|
||||
|
||||
// Update the kill timeout
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
@@ -334,6 +339,21 @@ func TestTaskRunner_Update(t *testing.T) {
|
||||
if ctx.tr.handle.ID() == oldHandle {
|
||||
return false, fmt.Errorf("handle not ctx.updated")
|
||||
}
|
||||
// Make sure Consul services were interpolated correctly during
|
||||
// the update #2180
|
||||
consul := ctx.tr.consul.(*mockConsulServiceClient)
|
||||
consul.mu.Lock()
|
||||
defer consul.mu.Unlock()
|
||||
if len(consul.ops) < 2 {
|
||||
return false, fmt.Errorf("expected at least 2 consul ops found: %d", len(consul.ops))
|
||||
}
|
||||
lastOp := consul.ops[len(consul.ops)-1]
|
||||
if lastOp.op != "update" {
|
||||
return false, fmt.Errorf("expected last consul op to be update not %q", lastOp.op)
|
||||
}
|
||||
if found := lastOp.task.Services[0].Checks[0].Args[0]; found != "UPDATE" {
|
||||
return false, fmt.Errorf("expected consul check to be UPDATE but found: %q", found)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
||||
@@ -221,6 +221,7 @@ func (c *ServiceClient) merge(ops *operations) {
|
||||
if script, ok := c.runningScripts[cid]; ok {
|
||||
script.cancel()
|
||||
delete(c.scripts, cid)
|
||||
delete(c.runningScripts, cid)
|
||||
}
|
||||
delete(c.checks, cid)
|
||||
}
|
||||
@@ -673,14 +674,15 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
|
||||
|
||||
switch check.Type {
|
||||
case structs.ServiceCheckHTTP:
|
||||
if check.Protocol == "" {
|
||||
check.Protocol = "http"
|
||||
proto := check.Protocol
|
||||
if proto == "" {
|
||||
proto = "http"
|
||||
}
|
||||
if check.TLSSkipVerify {
|
||||
chkReg.TLSSkipVerify = true
|
||||
}
|
||||
base := url.URL{
|
||||
Scheme: check.Protocol,
|
||||
Scheme: proto,
|
||||
Host: net.JoinHostPort(host, strconv.Itoa(port)),
|
||||
}
|
||||
relative, err := url.Parse(check.Path)
|
||||
|
||||
@@ -767,3 +767,90 @@ func TestConsul_NoTLSSkipVerifySupport(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestConsul_RemoveScript assert removing a script check removes all objects
|
||||
// related to that check.
|
||||
func TestConsul_CancelScript(t *testing.T) {
|
||||
ctx := setupFake()
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "scriptcheckDel",
|
||||
Type: "script",
|
||||
Interval: 9000 * time.Hour,
|
||||
Timeout: 9000 * time.Hour,
|
||||
},
|
||||
{
|
||||
Name: "scriptcheckKeep",
|
||||
Type: "script",
|
||||
Interval: 9000 * time.Hour,
|
||||
Timeout: 9000 * time.Hour,
|
||||
},
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
if len(ctx.FakeConsul.checks) != 2 {
|
||||
t.Errorf("expected 2 checks but found %d", len(ctx.FakeConsul.checks))
|
||||
}
|
||||
|
||||
if len(ctx.ServiceClient.scripts) != 2 && len(ctx.ServiceClient.runningScripts) != 2 {
|
||||
t.Errorf("expected 2 running script but found scripts=%d runningScripts=%d",
|
||||
len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
|
||||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case <-ctx.execs:
|
||||
// Script ran as expected!
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("timed out waiting for script check to run")
|
||||
}
|
||||
}
|
||||
|
||||
// Remove a check and update the task
|
||||
origTask := ctx.Task.Copy()
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "scriptcheckKeep",
|
||||
Type: "script",
|
||||
Interval: 9000 * time.Hour,
|
||||
Timeout: 9000 * time.Hour,
|
||||
},
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
if len(ctx.FakeConsul.checks) != 1 {
|
||||
t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks))
|
||||
}
|
||||
|
||||
if len(ctx.ServiceClient.scripts) != 1 && len(ctx.ServiceClient.runningScripts) != 1 {
|
||||
t.Errorf("expected 1 running script but found scripts=%d runningScripts=%d",
|
||||
len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
|
||||
}
|
||||
|
||||
// Make sure exec wasn't called again
|
||||
select {
|
||||
case <-ctx.execs:
|
||||
t.Errorf("unexpected execution of script; was goroutine not cancelled?")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// No unexpected script execs
|
||||
}
|
||||
|
||||
// Don't leak goroutines
|
||||
for _, scriptHandle := range ctx.ServiceClient.runningScripts {
|
||||
scriptHandle.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2122,7 +2122,6 @@ func (tg *TaskGroup) GoString() string {
|
||||
}
|
||||
|
||||
const (
|
||||
// TODO add Consul TTL check
|
||||
ServiceCheckHTTP = "http"
|
||||
ServiceCheckTCP = "tcp"
|
||||
ServiceCheckScript = "script"
|
||||
|
||||
Reference in New Issue
Block a user