diff --git a/client/client.go b/client/client.go index dfbd22e7f..76b5748f8 100644 --- a/client/client.go +++ b/client/client.go @@ -795,8 +795,9 @@ func (c *Client) registerAndHeartbeat() { c.retryRegisterNode() heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger)) } else { - c.logger.Printf("[ERR] client: heartbeating failed: %v", err) - heartbeat = time.After(c.retryIntv(registerRetryIntv)) + intv := c.retryIntv(registerRetryIntv) + c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err) + heartbeat = time.After(intv) } } else { c.heartbeatLock.Lock() diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index b3cde6a89..874af7a05 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -7,6 +7,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/driver" @@ -21,6 +22,16 @@ const ( RegisterEnforceIndexErrPrefix = "Enforcing job modify index" ) +var ( + // vaultConstraint is the implicit constraint added to jobs requesting a + // Vault token + vaultConstraint = &structs.Constraint{ + LTarget: "${attr.vault.version}", + RTarget: ">= 0.6.1", + Operand: structs.ConstraintVersion, + } +) + // Job endpoint is used for job interactions type Job struct { srv *Server @@ -70,8 +81,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } // Ensure that the job has permissions for the requested Vault tokens - desiredPolicies := structs.VaultPoliciesSet(args.Job.VaultPolicies()) - if len(desiredPolicies) != 0 { + policies := args.Job.VaultPolicies() + if len(policies) != 0 { vconf := j.srv.config.VaultConfig if !vconf.Enabled { return fmt.Errorf("Vault not enabled and Vault policies requested") @@ -94,10 +105,36 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } - subset, offending := structs.SliceStringIsSubset(allowedPolicies, desiredPolicies) - if !subset { - return fmt.Errorf("Passed Vault Token doesn't allow access to the following policies: %s", - strings.Join(offending, ", ")) + // If we are given a root token it can access all policies + if !lib.StrContains(allowedPolicies, "root") { + flatPolicies := structs.VaultPoliciesSet(policies) + subset, offending := structs.SliceStringIsSubset(allowedPolicies, flatPolicies) + if !subset { + return fmt.Errorf("Passed Vault Token doesn't allow access to the following policies: %s", + strings.Join(offending, ", ")) + } + } + } + + // Add implicit constraints that the task groups are run on a Node with + // Vault + for _, tg := range args.Job.TaskGroups { + _, ok := policies[tg.Name] + if !ok { + // Not requesting Vault + continue + } + + found := false + for _, c := range tg.Constraints { + if c.Equal(vaultConstraint) { + found = true + break + } + } + + if !found { + tg.Constraints = append(tg.Constraints, vaultConstraint) } } } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 9d843622e..d909032cd 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -490,6 +490,10 @@ func TestJobEndpoint_Register_Vault_Policies(t *testing.T) { goodPolicies := []string{"foo", "bar", "baz"} tvc.SetLookupTokenAllowedPolicies(goodToken, goodPolicies) + rootToken := structs.GenerateUUID() + rootPolicies := []string{"root"} + tvc.SetLookupTokenAllowedPolicies(rootToken, rootPolicies) + errToken := structs.GenerateUUID() expectedErr := fmt.Errorf("return errors from vault") tvc.SetLookupTokenError(errToken, expectedErr) @@ -542,6 +546,46 @@ func TestJobEndpoint_Register_Vault_Policies(t *testing.T) { if out.VaultToken != "" { t.Fatalf("vault token not cleared") } + + // Check that an implicit constraint was created + constraints := out.TaskGroups[0].Constraints + if l := len(constraints); l != 1 { + t.Fatalf("Unexpected number of tests: %v", l) + } + + if !constraints[0].Equal(vaultConstraint) { + t.Fatalf("bad constraint; got %#v; want %#v", constraints[0], vaultConstraint) + } + + // Create the register request with another job asking for a vault policy but + // send the root Vault token + job2 := mock.Job() + job2.VaultToken = rootToken + job2.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{policy}} + req = &structs.JobRegisterRequest{ + Job: job2, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("bad: %v", err) + } + + // Check for the job in the FSM + out, err = state.JobByID(job2.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.CreateIndex != resp.JobModifyIndex { + t.Fatalf("index mis-match") + } + if out.VaultToken != "" { + t.Fatalf("vault token not cleared") + } } func TestJobEndpoint_Evaluate(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7ca7d6fc0..b06a68307 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2534,6 +2534,13 @@ type Constraint struct { str string // Memoized string } +// Equal checks if two constraints are equal +func (c *Constraint) Equal(o *Constraint) bool { + return c.LTarget == o.LTarget && + c.RTarget == o.RTarget && + c.Operand == o.Operand +} + func (c *Constraint) Copy() *Constraint { if c == nil { return nil