From 4ec1cbfa86d1912d76b5ab267336c92cf9fcd970 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 7 Aug 2017 21:01:14 -0700 Subject: [PATCH] nomad: adding upsert policy endpoint --- nomad/acl_endpoint.go | 23 ++++++++++++++++++----- nomad/acl_endpoint_test.go | 29 ++++++++++++++++++++++++++++- nomad/fsm.go | 17 +++++++++++++++++ nomad/fsm_test.go | 25 +++++++++++++++++++++++++ nomad/structs/structs.go | 6 ++++++ 5 files changed, 94 insertions(+), 6 deletions(-) diff --git a/nomad/acl_endpoint.go b/nomad/acl_endpoint.go index adc4bb554..b63db4121 100644 --- a/nomad/acl_endpoint.go +++ b/nomad/acl_endpoint.go @@ -15,13 +15,26 @@ type ACL struct { srv *Server } -// UpsertPolicy is used to create or update a policy -func (a *ACL) UpsertPolicy(args *structs.AllocListRequest, reply *structs.AllocListResponse) error { - if done, err := a.srv.forward("ACL.UpsertPolicy", args, args, reply); done { +// UpsertPolicies is used to create or update a set of policies +func (a *ACL) UpsertPolicies(args *structs.ACLPolicyUpsertRequest, reply *structs.GenericResponse) error { + if done, err := a.srv.forward("ACL.UpsertPolicies", args, args, reply); done { return err } - defer metrics.MeasureSince([]string{"nomad", "acl", "upsert_policy"}, time.Now()) - // TODO + defer metrics.MeasureSince([]string{"nomad", "acl", "upsert_policies"}, time.Now()) + + // Validate non-zero set of policies + if len(args.Policies) == 0 { + return fmt.Errorf("must specify as least one policy") + } + + // Update via Raft + _, index, err := a.srv.raftApply(structs.ACLPolicyUpsertRequestType, args) + if err != nil { + return err + } + + // Update the index + reply.Index = index return nil } diff --git a/nomad/acl_endpoint_test.go b/nomad/acl_endpoint_test.go index 945a1a279..1a9cbbf77 100644 --- a/nomad/acl_endpoint_test.go +++ b/nomad/acl_endpoint_test.go @@ -221,7 +221,7 @@ func TestACLEndpoint_List_Blocking(t *testing.T) { assert.Equal(t, 0, len(resp2.Policies)) } -func TestACLEndpoint_DeletePolicy(t *testing.T) { +func TestACLEndpoint_DeletePolicies(t *testing.T) { t.Parallel() s1 := testServer(t, nil) defer s1.Shutdown() @@ -243,3 +243,30 @@ func TestACLEndpoint_DeletePolicy(t *testing.T) { } assert.NotEqual(t, uint64(0), resp.Index) } + +func TestACLEndpoint_UpsertPolicies(t *testing.T) { + t.Parallel() + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + p1 := mock.ACLPolicy() + + // Lookup the policies + req := &structs.ACLPolicyUpsertRequest{ + Policies: []*structs.ACLPolicy{p1}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "ACL.UpsertPolicies", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + assert.NotEqual(t, uint64(0), resp.Index) + + // Check we created the policy + out, err := s1.fsm.State().ACLPolicyByName(nil, p1.Name) + assert.Nil(t, err) + assert.NotNil(t, out) +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 5feb56b27..f49cae32b 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -168,6 +168,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyDeploymentDelete(buf[1:], log.Index) case structs.JobStabilityRequestType: return n.applyJobStability(buf[1:], log.Index) + case structs.ACLPolicyUpsertRequestType: + return n.applyACLPolicyUpsert(buf[1:], log.Index) case structs.ACLPolicyDeleteRequestType: return n.applyACLPolicyDelete(buf[1:], log.Index) default: @@ -672,6 +674,21 @@ func (n *nomadFSM) applyJobStability(buf []byte, index uint64) interface{} { return nil } +// applyACLPolicyUpsert is used to upsert a set of policies +func (n *nomadFSM) applyACLPolicyUpsert(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_upsert"}, time.Now()) + var req structs.ACLPolicyUpsertRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.UpsertACLPolicies(index, req.Policies); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpsertACLPolicies failed: %v", err) + return err + } + return nil +} + // applyACLPolicyDelete is used to delete a set of policies func (n *nomadFSM) applyACLPolicyDelete(buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_delete"}, time.Now()) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index b3103bb16..cfec61812 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1518,6 +1518,31 @@ func TestFSM_DeleteDeployment(t *testing.T) { } } +func TestFSM_UpsertACLPolicies(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + policy := mock.ACLPolicy() + req := structs.ACLPolicyUpsertRequest{ + Policies: []*structs.ACLPolicy{policy}, + } + buf, err := structs.Encode(structs.ACLPolicyUpsertRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + ws := memdb.NewWatchSet() + out, err := fsm.State().ACLPolicyByName(ws, policy.Name) + assert.Nil(t, err) + assert.NotNil(t, out) +} + func TestFSM_DeleteACLPolicies(t *testing.T) { t.Parallel() fsm := testFSM(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 2a79274c8..535c62eee 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5379,3 +5379,9 @@ type ACLPolicyDeleteRequest struct { Names []string WriteRequest } + +// ACLPolicyUpsertRequest is used to upsert a set of policies +type ACLPolicyUpsertRequest struct { + Policies []*ACLPolicy + WriteRequest +}