mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 09:55:44 +03:00
Unregister from Consul when waiting for restart
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
@@ -13,26 +14,37 @@ import (
|
||||
|
||||
// mockConsulOp represents the register/deregister operations.
|
||||
type mockConsulOp struct {
|
||||
op string // add, remove, or update
|
||||
allocID string
|
||||
task *structs.Task
|
||||
exec driver.ScriptExecutor
|
||||
}
|
||||
|
||||
func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptExecutor) mockConsulOp {
|
||||
if op != "add" && op != "remove" && op != "update" {
|
||||
panic(fmt.Errorf("invalid consul op: %s", op))
|
||||
}
|
||||
return mockConsulOp{
|
||||
op: op,
|
||||
allocID: allocID,
|
||||
task: task,
|
||||
exec: exec,
|
||||
}
|
||||
}
|
||||
|
||||
// mockConsulServiceClient implements the ConsulServiceAPI interface to record
|
||||
// and log task registration/deregistration.
|
||||
type mockConsulServiceClient struct {
|
||||
registers []mockConsulOp
|
||||
removes []mockConsulOp
|
||||
mu sync.Mutex
|
||||
ops []mockConsulOp
|
||||
mu sync.Mutex
|
||||
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
func newMockConsulServiceClient() *mockConsulServiceClient {
|
||||
m := mockConsulServiceClient{
|
||||
registers: make([]mockConsulOp, 0, 10),
|
||||
removes: make([]mockConsulOp, 0, 10),
|
||||
logger: log.New(ioutil.Discard, "", 0),
|
||||
ops: make([]mockConsulOp, 0, 20),
|
||||
logger: log.New(ioutil.Discard, "", 0),
|
||||
}
|
||||
if testing.Verbose() {
|
||||
m.logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||
@@ -44,8 +56,7 @@ func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.T
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %q, %q, %T)", allocID, old, new, exec)
|
||||
m.removes = append(m.removes, mockConsulOp{allocID, old, exec})
|
||||
m.registers = append(m.registers, mockConsulOp{allocID, new, exec})
|
||||
m.ops = append(m.ops, newMockConsulOp("update", allocID, old, exec))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -53,7 +64,7 @@ func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Tas
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T)", allocID, task.Name, exec)
|
||||
m.registers = append(m.registers, mockConsulOp{allocID, task, exec})
|
||||
m.ops = append(m.ops, newMockConsulOp("add", allocID, task, exec))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -61,5 +72,5 @@ func (m *mockConsulServiceClient) RemoveTask(allocID string, task *structs.Task)
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", allocID, task.Name)
|
||||
m.removes = append(m.removes, mockConsulOp{allocID, task, nil})
|
||||
m.ops = append(m.ops, newMockConsulOp("remove", allocID, task, nil))
|
||||
}
|
||||
|
||||
@@ -1056,6 +1056,7 @@ func (r *TaskRunner) run() {
|
||||
}
|
||||
|
||||
RESTART:
|
||||
// shouldRestart will block if the task should restart after a delay.
|
||||
restart := r.shouldRestart()
|
||||
if !restart {
|
||||
r.cleanup()
|
||||
@@ -1136,6 +1137,9 @@ func (r *TaskRunner) shouldRestart() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Unregister from Consul while waiting to restart.
|
||||
r.consul.RemoveTask(r.alloc.ID, r.task)
|
||||
|
||||
// Sleep but watch for destroy events.
|
||||
select {
|
||||
case <-time.After(when):
|
||||
|
||||
@@ -466,7 +466,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) {
|
||||
}
|
||||
task.Artifacts = []*structs.TaskArtifact{&artifact}
|
||||
|
||||
// Make the restart policy try one ctx.upd.te
|
||||
// Make the restart policy try one ctx.update
|
||||
alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
|
||||
Attempts: 1,
|
||||
Interval: 10 * time.Minute,
|
||||
@@ -526,6 +526,53 @@ func TestTaskRunner_Download_Retries(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskRunner_UnregisterConsul_Retries asserts a task is unregistered from
|
||||
// Consul when waiting to be retried.
|
||||
func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
|
||||
ctestutil.ExecCompatible(t)
|
||||
|
||||
// Create an allocation that has a task with bad artifacts.
|
||||
alloc := mock.Alloc()
|
||||
|
||||
// Make the restart policy try one ctx.update
|
||||
alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
|
||||
Attempts: 1,
|
||||
Interval: 10 * time.Minute,
|
||||
Delay: time.Nanosecond,
|
||||
Mode: structs.RestartPolicyModeFail,
|
||||
}
|
||||
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "1",
|
||||
"run_for": "1ns",
|
||||
}
|
||||
|
||||
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
||||
ctx.tr.MarkReceived()
|
||||
ctx.tr.Run()
|
||||
defer ctx.Cleanup()
|
||||
|
||||
// Assert it is properly registered and unregistered
|
||||
consul := ctx.tr.consul.(*mockConsulServiceClient)
|
||||
if expected := 4; len(consul.ops) != expected {
|
||||
t.Errorf("expected %d consul ops but found: %d", expected, len(consul.ops))
|
||||
}
|
||||
if consul.ops[0].op != "add" {
|
||||
t.Errorf("expected first op to be add but found: %q", consul.ops[0].op)
|
||||
}
|
||||
if consul.ops[1].op != "remove" {
|
||||
t.Errorf("expected second op to be remove but found: %q", consul.ops[1].op)
|
||||
}
|
||||
if consul.ops[2].op != "add" {
|
||||
t.Errorf("expected third op to be add but found: %q", consul.ops[2].op)
|
||||
}
|
||||
if consul.ops[3].op != "remove" {
|
||||
t.Errorf("expected fourth/final op to be remove but found: %q", consul.ops[3].op)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskRunner_Validate_UserEnforcement(t *testing.T) {
|
||||
ctestutil.ExecCompatible(t)
|
||||
ctx := testTaskRunner(t, false)
|
||||
|
||||
Reference in New Issue
Block a user