prevent double job status update (#9768)

* Prevent Job Statuses from being calculated twice

https://github.com/hashicorp/nomad/pull/8435 introduced atomic eval
insertion with job (de-)registration. This change removes a now obsolete
guard which checked if the index was equal to the job.CreateIndex, which
would empty the status. Now that the job registration eval insertion is
atomic with the registration, this check is no longer necessary to set
the job statuses correctly.

* test to ensure only single job event for job register

* periodic e2e

* separate job update summary step

* fix updatejobstability to use copy instead of modified reference of job

* update envoygatewaybindaddresses copy to prevent job diff on null vs empty

* set ConsulGatewayBindAddress to empty map instead of nil

fix nil assertions for empty map

rm unnecessary guard
This commit is contained in:
Drew Bailey
2021-01-22 09:18:17 -05:00
committed by GitHub
parent 906963169a
commit 3cb1132693
13 changed files with 303 additions and 95 deletions

View File

@@ -1345,9 +1345,8 @@ func apiConnectGatewayProxyToStructs(in *api.ConsulGatewayProxy) *structs.Consul
return nil
}
var bindAddresses map[string]*structs.ConsulGatewayBindAddress
bindAddresses := make(map[string]*structs.ConsulGatewayBindAddress)
if in.EnvoyGatewayBindAddresses != nil {
bindAddresses = make(map[string]*structs.ConsulGatewayBindAddress)
for k, v := range in.EnvoyGatewayBindAddresses {
bindAddresses[k] = &structs.ConsulGatewayBindAddress{
Address: v.Address,

View File

@@ -24,6 +24,7 @@ import (
_ "github.com/hashicorp/nomad/e2e/nodedrain"
_ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
_ "github.com/hashicorp/nomad/e2e/nomadexec"
_ "github.com/hashicorp/nomad/e2e/periodic"
_ "github.com/hashicorp/nomad/e2e/podman"
_ "github.com/hashicorp/nomad/e2e/quotas"
_ "github.com/hashicorp/nomad/e2e/rescheduling"

View File

@@ -6,12 +6,12 @@ import (
"io/ioutil"
"os/exec"
"regexp"
"strings"
)
// Register registers a jobspec from a file but with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
func Register(jobID, jobFilePath string) error {
cmd := exec.Command("nomad", "job", "run", "-")
stdin, err := cmd.StdinPipe()
if err != nil {
@@ -40,6 +40,33 @@ func Register(jobID, jobFilePath string) error {
return nil
}
// PeriodicForce forces a periodic job to dispatch by running
// `nomad job periodic force <jobID>`.
//
// It returns nil on success. On failure the returned error wraps the command
// error and includes the command's combined stdout/stderr output. The ID of
// the dispatched child job is not returned.
func PeriodicForce(jobID string) error {
	cmd := exec.Command("nomad", "job", "periodic", "force", jobID)
	out, err := cmd.CombinedOutput()
	if err != nil {
		// Report the operation that actually failed (forcing the periodic
		// dispatch), not job registration.
		return fmt.Errorf("could not force periodic job: %w\n%v", err, string(out))
	}
	return nil
}
// JobInspectTemplate shells out to `nomad job inspect -t <template> <jobID>`
// and returns the rendered output with one trailing newline removed.
//
// If the command fails, the result is the empty string and an error that
// wraps the command error and includes the combined stdout/stderr output.
func JobInspectTemplate(jobID, template string) (string, error) {
	raw, err := exec.Command("nomad", "job", "inspect", "-t", template, jobID).CombinedOutput()
	if err != nil {
		return "", fmt.Errorf("could not inspect job: %w\n%v", err, string(raw))
	}
	return strings.TrimSuffix(string(raw), "\n"), nil
}
// RegisterFromJobspec registers a jobspec from a string, also with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
func RegisterFromJobspec(jobID, jobspec string) error {

View File

@@ -0,0 +1,28 @@
# Batch job with a periodic stanza, constrained to Linux nodes in dc1.
# NOTE(review): presumably the jobspec registered by the periodic e2e test
# ("periodic/input/simple.nomad") - confirm against the file path.
job "periodic" {
datacenters = ["dc1"]
type = "batch"
# Only place on nodes whose kernel name attribute is "linux".
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
# Cron expression with every field wildcarded; overlap prohibition is enabled.
periodic {
cron = "* * * * *"
prohibit_overlap = true
}
group "group" {
# Single docker task that runs `sleep 5` in a busybox image via /bin/sh.
task "task" {
driver = "docker"
config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "sleep 5"]
}
}
}
}

82
e2e/periodic/periodic.go Normal file
View File

@@ -0,0 +1,82 @@
package periodic
import (
"fmt"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
// PeriodicTest is the e2e test case for periodic jobs. It embeds framework.TC
// and records the IDs of the jobs its tests register so that AfterEach can
// deregister them.
type PeriodicTest struct {
framework.TC
// jobIDs holds the IDs of jobs registered by the test methods.
jobIDs []string
}
// init adds the "Periodic" suite, containing a single PeriodicTest case and
// flagged as runnable locally, to the e2e framework.
func init() {
	suite := &framework.TestSuite{
		Component:   "Periodic",
		CanRunLocal: true,
		Cases:       []framework.TestCase{&PeriodicTest{}},
	}
	framework.AddSuites(suite)
}
// BeforeAll calls e2eutil.WaitForLeader with the test case's Nomad client.
// NOTE(review): presumably invoked once by the framework before the suite's
// tests run - confirm against the framework package.
func (tc *PeriodicTest) BeforeAll(f *framework.F) {
e2eutil.WaitForLeader(f.T(), tc.Nomad())
}
// AfterEach cleans up after a test: it deregisters every job recorded in
// tc.jobIDs, clears the recorded IDs, and runs `nomad system gc`, failing the
// test if the gc command returns an error.
func (tc *PeriodicTest) AfterEach(f *framework.F) {
	jobs := tc.Nomad().Jobs()

	for _, id := range tc.jobIDs {
		// Best-effort cleanup: a failed deregistration is deliberately not
		// treated as a test failure. The `true` argument is passed through
		// to Deregister (the purge flag in the Nomad API - confirm).
		jobs.Deregister(id, true, nil)
	}
	// Forget the IDs that were just handled so they do not accumulate and get
	// deregistered again after subsequent tests.
	tc.jobIDs = nil

	_, err := e2eutil.Command("nomad", "system", "gc")
	f.NoError(err)
}
// TestPeriodicDispatch_Basic registers a periodic job, forces a dispatch,
// waits for the inspected status to become "dead", and then asserts that the
// job summary reports zero pending children and exactly one dead child.
func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) {
	t := f.T()

	// Use a name that does not shadow the imported uuid package.
	id := uuid.Generate()
	jobID := fmt.Sprintf("periodicjob-%s", id[0:8])
	tc.jobIDs = append(tc.jobIDs, jobID)

	// register job; fail immediately if registration did not succeed rather
	// than surfacing a confusing error from a later step
	require.NoError(t, e2eutil.Register(jobID, "periodic/input/simple.nomad"))

	// force dispatch
	require.NoError(t, e2eutil.PeriodicForce(jobID))

	// Get the child job ID
	childID, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .ID}}{{end}}`)
	require.NoError(t, err)
	require.NotEmpty(t, childID)

	testutil.WaitForResult(func() (bool, error) {
		status, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .Status}}{{end}}`)
		if err != nil {
			// Report the failure to the poller instead of aborting the test,
			// so that transient inspect errors are retried.
			return false, err
		}
		if status == "dead" {
			return true, nil
		}
		return false, fmt.Errorf("expected periodic job to be dead, got %s", status)
	}, func(err error) {
		require.NoError(t, err)
	})

	// Assert there are no pending children
	pending, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Pending}}{{end}}`)
	require.NoError(t, err)
	require.Equal(t, "0", pending)

	// Assert there is exactly one dead child
	dead, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Dead}}{{end}}`)
	require.NoError(t, err)
	require.Equal(t, "1", dead)
}

View File

@@ -1,6 +1,9 @@
NOMAD_SHA ?= $(shell git rev-parse HEAD)
PKG_PATH = $(shell pwd)/../../pkg/linux_amd64/nomad
# The version of nomad that gets deployed depends on an order of precedence
# linked below
# https://github.com/hashicorp/nomad/blob/master/e2e/terraform/README.md#nomad-version
dev-cluster:
terraform apply -auto-approve \
-var="nomad_sha=$(NOMAD_SHA)"

View File

@@ -2,6 +2,7 @@ package nomad
import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
@@ -3422,3 +3423,59 @@ func TestFSM_ACLEvents(t *testing.T) {
})
}
}
// TestFSM_EventBroker_JobRegisterFSMEvents applies a single job register
// request (job plus eval) to the FSM and asserts that a subscriber to the job
// topic observes exactly one event, of type TypeJobRegistered.
func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) {
	t.Parallel()
	fsm := testFSM(t)

	job := mock.Job()
	eval := mock.Eval()
	eval.JobID = job.ID

	// Encode and apply the registration through the FSM.
	encoded, err := structs.Encode(structs.JobRegisterRequestType, structs.JobRegisterRequest{
		Job:  job,
		Eval: eval,
	})
	require.NoError(t, err)
	require.Nil(t, fsm.Apply(makeLog(encoded)))

	broker, err := fsm.State().EventBroker()
	require.NoError(t, err)

	// Subscribe to every key of the job topic.
	sub, err := broker.Subscribe(&stream.SubscribeRequest{
		Topics: map[structs.Topic][]string{
			structs.TopicJob: {"*"},
		},
	})
	require.NoError(t, err)

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	// Drain the subscription. Stop on an empty batch, when the deadline has
	// been exceeded, or once more events than expected have been collected.
	var received []structs.Event
	for {
		batch, nextErr := sub.Next(ctx)
		if len(batch.Events) == 0 || nextErr == context.DeadlineExceeded || len(received) > 1 {
			break
		}
		received = append(received, batch.Events...)
	}

	require.Len(t, received, 1)
	require.Equal(t, structs.TypeJobRegistered, received[0].Type)
}

View File

@@ -345,7 +345,7 @@ func gatewayProxyForBridge(gateway *structs.ConsulGateway) *structs.ConsulGatewa
func gatewayBindAddresses(ingress *structs.ConsulIngressConfigEntry) map[string]*structs.ConsulGatewayBindAddress {
if ingress == nil || len(ingress.Listeners) == 0 {
return nil
return make(map[string]*structs.ConsulGatewayBindAddress)
}
addresses := make(map[string]*structs.ConsulGatewayBindAddress)

View File

@@ -412,12 +412,12 @@ func TestJobEndpointConnect_gatewayProxyIsDefault(t *testing.T) {
func TestJobEndpointConnect_gatewayBindAddresses(t *testing.T) {
t.Run("nil", func(t *testing.T) {
result := gatewayBindAddresses(nil)
require.Nil(t, result)
require.Empty(t, result)
})
t.Run("no listeners", func(t *testing.T) {
result := gatewayBindAddresses(&structs.ConsulIngressConfigEntry{Listeners: nil})
require.Nil(t, result)
require.Empty(t, result)
})
t.Run("simple", func(t *testing.T) {

View File

@@ -875,7 +875,8 @@ func ConnectIngressGatewayJob(mode string, inject bool) *structs.Job {
Connect: &structs.ConsulConnect{
Gateway: &structs.ConsulGateway{
Proxy: &structs.ConsulGatewayProxy{
ConnectTimeout: helper.TimeToPtr(3 * time.Second),
ConnectTimeout: helper.TimeToPtr(3 * time.Second),
EnvoyGatewayBindAddresses: make(map[string]*structs.ConsulGatewayBindAddress),
},
Ingress: &structs.ConsulIngressConfigEntry{
Listeners: []*structs.ConsulIngressListener{{

View File

@@ -4412,6 +4412,7 @@ func (s *StateStore) setJobStatuses(index uint64, txn *txn,
if err := s.setJobStatus(index, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil {
return err
}
}
return nil
@@ -4427,9 +4428,7 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,
// Capture the current status so we can check if there is a change
oldStatus := job.Status
if index == job.CreateIndex {
oldStatus = ""
}
firstPass := index == job.CreateIndex
newStatus := forceStatus
// If forceStatus is not set, compute the jobs status.
@@ -4441,8 +4440,12 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,
}
}
// Fast-path if nothing has changed.
// Fast-path if the job status has not changed.
// Still update the job summary if necessary.
if oldStatus == newStatus {
if err := s.setJobSummary(txn, job, index, oldStatus, newStatus, firstPass); err != nil {
return err
}
return nil
}
@@ -4460,64 +4463,72 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,
}
// Update the children summary
if updated.ParentID != "" {
// Try to update the summary of the parent job summary
summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID)
if err != nil {
return fmt.Errorf("unable to retrieve summary for parent job: %v", err)
}
if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus, firstPass); err != nil {
return fmt.Errorf("job summary update failed %w", err)
}
return nil
}
// Only continue if the summary exists. It could not exist if the parent
// job was removed
if summaryRaw != nil {
existing := summaryRaw.(*structs.JobSummary)
pSummary := existing.Copy()
if pSummary.Children == nil {
pSummary.Children = new(structs.JobChildrenSummary)
}
// Determine the transition and update the correct fields
children := pSummary.Children
// Decrement old status
if oldStatus != "" {
switch oldStatus {
case structs.JobStatusPending:
children.Pending--
case structs.JobStatusRunning:
children.Running--
case structs.JobStatusDead:
children.Dead--
default:
return fmt.Errorf("unknown old job status %q", oldStatus)
}
}
// Increment new status
switch newStatus {
case structs.JobStatusPending:
children.Pending++
case structs.JobStatusRunning:
children.Running++
case structs.JobStatusDead:
children.Dead++
default:
return fmt.Errorf("unknown new job status %q", newStatus)
}
// Update the index
pSummary.ModifyIndex = index
// Insert the summary
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}
// setJobSummary updates the children counters in the summary of the parent of
// the given job to reflect a status transition of that job.
//
// It does nothing when the job has no ParentID or when no summary exists for
// the parent. Otherwise it works on a copy of the parent summary: unless
// firstPass is set, the counter for oldStatus is decremented; the counter for
// newStatus is always incremented. The copy is stamped with index and written
// back together with the "job_summary" index entry. An unrecognized old or new
// status yields an error.
func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, oldStatus, newStatus string, firstPass bool) error {
	// Jobs without a parent have no children summary to maintain.
	if updated.ParentID == "" {
		return nil
	}

	// Look up the summary of the parent job.
	raw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID)
	if err != nil {
		return fmt.Errorf("unable to retrieve summary for parent job: %v", err)
	}

	// The summary may be missing if the parent job was removed; nothing to do.
	if raw == nil {
		return nil
	}

	parentSummary := raw.(*structs.JobSummary).Copy()
	if parentSummary.Children == nil {
		parentSummary.Children = new(structs.JobChildrenSummary)
	}
	counts := parentSummary.Children

	// On the first pass there is no previous status to take out of the counts.
	if !firstPass {
		switch oldStatus {
		case structs.JobStatusPending:
			counts.Pending--
		case structs.JobStatusRunning:
			counts.Running--
		case structs.JobStatusDead:
			counts.Dead--
		default:
			return fmt.Errorf("unknown old job status %q", oldStatus)
		}
	}

	// Account for the new status.
	switch newStatus {
	case structs.JobStatusPending:
		counts.Pending++
	case structs.JobStatusRunning:
		counts.Running++
	case structs.JobStatusDead:
		counts.Dead++
	default:
		return fmt.Errorf("unknown new job status %q", newStatus)
	}

	parentSummary.ModifyIndex = index

	// Persist the updated summary and bump the table index.
	if err := txn.Insert("job_summary", parentSummary); err != nil {
		return fmt.Errorf("job summary insert failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	return nil
}

View File

@@ -6839,32 +6839,20 @@ func TestStateStore_UpdateJobStability(t *testing.T) {
// Insert a job twice to get two versions
job := mock.Job()
if err := state.UpsertJob(structs.MsgTypeTestSetup, 1, job); err != nil {
t.Fatalf("bad: %v", err)
}
require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1, job))
if err := state.UpsertJob(structs.MsgTypeTestSetup, 2, job); err != nil {
t.Fatalf("bad: %v", err)
}
require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 2, job.Copy()))
// Update the stability to true
err := state.UpdateJobStability(3, job.Namespace, job.ID, 0, true)
if err != nil {
t.Fatalf("bad: %v", err)
}
require.NoError(t, err)
// Check that the job was updated properly
ws := memdb.NewWatchSet()
jout, _ := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
if !jout.Stable {
t.Fatalf("job not marked stable %#v", jout)
}
jout, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
require.NoError(t, err)
require.NotNil(t, jout)
require.True(t, jout.Stable, "job not marked as stable")
// Update the stability to false
err = state.UpdateJobStability(3, job.Namespace, job.ID, 0, false)
@@ -6873,16 +6861,10 @@ func TestStateStore_UpdateJobStability(t *testing.T) {
}
// Check that the job was updated properly
jout, _ = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
if jout.Stable {
t.Fatalf("job marked stable %#v", jout)
}
jout, err = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
require.NoError(t, err)
require.NotNil(t, jout)
require.False(t, jout.Stable)
}
// Test that nonexistent deployment can't be promoted

View File

@@ -293,6 +293,23 @@ func TestConsulConnect_CopyEquals(t *testing.T) {
require.False(t, c.Equals(o))
}
// TestConsulConnect_GatewayProxy_CopyEquals checks that a valid
// ConsulGatewayProxy with an empty, non-nil bind address map is equal to its
// copy, both structurally and according to Equals.
func TestConsulConnect_GatewayProxy_CopyEquals(t *testing.T) {
	t.Parallel()

	proxy := &ConsulGatewayProxy{
		ConnectTimeout:                  helper.TimeToPtr(time.Second),
		EnvoyGatewayBindTaggedAddresses: false,
		EnvoyGatewayBindAddresses:       map[string]*ConsulGatewayBindAddress{},
	}
	require.NoError(t, proxy.Validate())

	// The copy must be equivalent to the original.
	duplicate := proxy.Copy()
	require.Equal(t, proxy, duplicate)
	require.True(t, proxy.Equals(duplicate))
}
func TestSidecarTask_MergeIntoTask(t *testing.T) {
t.Parallel()