nomad: adding upsert policy endpoint

This commit is contained in:
Armon Dadgar
2017-08-07 21:01:14 -07:00
parent 634104b017
commit 4ec1cbfa86
5 changed files with 94 additions and 6 deletions

View File

@@ -15,13 +15,26 @@ type ACL struct {
srv *Server
}
// UpsertPolicy is used to create or update a policy
func (a *ACL) UpsertPolicy(args *structs.AllocListRequest, reply *structs.AllocListResponse) error {
if done, err := a.srv.forward("ACL.UpsertPolicy", args, args, reply); done {
// UpsertPolicies is used to create or update a set of policies
func (a *ACL) UpsertPolicies(args *structs.ACLPolicyUpsertRequest, reply *structs.GenericResponse) error {
if done, err := a.srv.forward("ACL.UpsertPolicies", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "acl", "upsert_policy"}, time.Now())
// TODO
defer metrics.MeasureSince([]string{"nomad", "acl", "upsert_policies"}, time.Now())
// Validate non-zero set of policies
if len(args.Policies) == 0 {
return fmt.Errorf("must specify as least one policy")
}
// Update via Raft
_, index, err := a.srv.raftApply(structs.ACLPolicyUpsertRequestType, args)
if err != nil {
return err
}
// Update the index
reply.Index = index
return nil
}

View File

@@ -221,7 +221,7 @@ func TestACLEndpoint_List_Blocking(t *testing.T) {
assert.Equal(t, 0, len(resp2.Policies))
}
func TestACLEndpoint_DeletePolicy(t *testing.T) {
func TestACLEndpoint_DeletePolicies(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
@@ -243,3 +243,30 @@ func TestACLEndpoint_DeletePolicy(t *testing.T) {
}
assert.NotEqual(t, uint64(0), resp.Index)
}
func TestACLEndpoint_UpsertPolicies(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
p1 := mock.ACLPolicy()
// Lookup the policies
req := &structs.ACLPolicyUpsertRequest{
Policies: []*structs.ACLPolicy{p1},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "ACL.UpsertPolicies", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
assert.NotEqual(t, uint64(0), resp.Index)
// Check we created the policy
out, err := s1.fsm.State().ACLPolicyByName(nil, p1.Name)
assert.Nil(t, err)
assert.NotNil(t, out)
}

View File

@@ -168,6 +168,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyDeploymentDelete(buf[1:], log.Index)
case structs.JobStabilityRequestType:
return n.applyJobStability(buf[1:], log.Index)
case structs.ACLPolicyUpsertRequestType:
return n.applyACLPolicyUpsert(buf[1:], log.Index)
case structs.ACLPolicyDeleteRequestType:
return n.applyACLPolicyDelete(buf[1:], log.Index)
default:
@@ -672,6 +674,21 @@ func (n *nomadFSM) applyJobStability(buf []byte, index uint64) interface{} {
return nil
}
// applyACLPolicyUpsert is used to upsert a set of policies
func (n *nomadFSM) applyACLPolicyUpsert(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_upsert"}, time.Now())
var req structs.ACLPolicyUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertACLPolicies(index, req.Policies); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertACLPolicies failed: %v", err)
return err
}
return nil
}
// applyACLPolicyDelete is used to delete a set of policies
func (n *nomadFSM) applyACLPolicyDelete(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_delete"}, time.Now())

View File

@@ -1518,6 +1518,31 @@ func TestFSM_DeleteDeployment(t *testing.T) {
}
}
func TestFSM_UpsertACLPolicies(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
policy := mock.ACLPolicy()
req := structs.ACLPolicyUpsertRequest{
Policies: []*structs.ACLPolicy{policy},
}
buf, err := structs.Encode(structs.ACLPolicyUpsertRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().ACLPolicyByName(ws, policy.Name)
assert.Nil(t, err)
assert.NotNil(t, out)
}
func TestFSM_DeleteACLPolicies(t *testing.T) {
t.Parallel()
fsm := testFSM(t)

View File

@@ -5379,3 +5379,9 @@ type ACLPolicyDeleteRequest struct {
Names []string
WriteRequest
}
// ACLPolicyUpsertRequest is used to upsert a set of policies
type ACLPolicyUpsertRequest struct {
Policies []*ACLPolicy
WriteRequest
}