From d22a3ddc7e243d2852373a854bb7a8499fe2febc Mon Sep 17 00:00:00 2001 From: James Rasell Date: Thu, 3 Mar 2022 11:24:29 +0100 Subject: [PATCH] fsm: add FSM functionality for service registration endpoints. --- nomad/fsm.go | 51 +++++++++++++++++++++++++++++ nomad/fsm_test.go | 81 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/nomad/fsm.go b/nomad/fsm.go index 8f6355936..b507508d6 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -307,6 +307,12 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyOneTimeTokenDelete(msgType, buf[1:], log.Index) case structs.OneTimeTokenExpireRequestType: return n.applyOneTimeTokenExpire(msgType, buf[1:], log.Index) + case structs.ServiceRegistrationUpsertRequestType: + return n.applyUpsertServiceRegistrations(msgType, buf[1:], log.Index) + case structs.ServiceRegistrationDeleteByIDRequestType: + return n.applyDeleteServiceRegistrationByID(msgType, buf[1:], log.Index) + case structs.ServiceRegistrationDeleteByNodeIDRequestType: + return n.applyDeleteServiceRegistrationByNodeID(msgType, buf[1:], log.Index) } // Check enterprise only message types. @@ -1894,6 +1900,51 @@ func (n *nomadFSM) applyUpsertScalingEvent(buf []byte, index uint64) interface{} return nil } +func (n *nomadFSM) applyUpsertServiceRegistrations(msgType structs.MessageType, buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_upsert"}, time.Now()) + var req structs.ServiceRegistrationUpsertRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.UpsertServiceRegistrations(msgType, index, req.Services); err != nil { + n.logger.Error("UpsertServiceRegistrations failed", "error", err) + return err + } + + return nil +} + +func (n *nomadFSM) applyDeleteServiceRegistrationByID(msgType structs.MessageType, buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_delete_id"}, time.Now()) + var req structs.ServiceRegistrationDeleteByIDRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeleteServiceRegistrationByID(msgType, index, req.RequestNamespace(), req.ID); err != nil { + n.logger.Error("DeleteServiceRegistrationByID failed", "error", err) + return err + } + + return nil +} + +func (n *nomadFSM) applyDeleteServiceRegistrationByNodeID(msgType structs.MessageType, buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_delete_node_id"}, time.Now()) + var req structs.ServiceRegistrationDeleteByNodeIDRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeleteServiceRegistrationByNodeID(msgType, index, req.NodeID); err != nil { + n.logger.Error("DeleteServiceRegistrationByNodeID failed", "error", err) + return err + } + + return nil +} + func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now()) // Register the nodes diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 42b3a7e25..f007d82b9 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -3259,6 +3259,87 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) { } } +func TestFSM_UpsertServiceRegistrations(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + // Generate our test service registrations. + services := mock.ServiceRegistrations() + + // Build and apply our message. + req := structs.ServiceRegistrationUpsertRequest{Services: services} + buf, err := structs.Encode(structs.ServiceRegistrationUpsertRequestType, req) + assert.Nil(t, err) + assert.Nil(t, fsm.Apply(makeLog(buf))) + + // Check that both services are found within state. + ws := memdb.NewWatchSet() + out, err := fsm.State().GetServiceRegistrationByID(ws, services[0].Namespace, services[0].ID) + assert.Nil(t, err) + assert.NotNil(t, out) + + out, err = fsm.State().GetServiceRegistrationByID(ws, services[1].Namespace, services[1].ID) + assert.Nil(t, err) + assert.NotNil(t, out) +} + +func TestFSM_DeleteServiceRegistrationsByID(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + // Generate our test service registrations. + services := mock.ServiceRegistrations() + + // Upsert the services. + assert.NoError(t, fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, uint64(10), services)) + + // Build and apply our message. + req := structs.ServiceRegistrationDeleteByIDRequest{ID: services[0].ID} + buf, err := structs.Encode(structs.ServiceRegistrationDeleteByIDRequestType, req) + assert.Nil(t, err) + assert.Nil(t, fsm.Apply(makeLog(buf))) + + // Check that the service has been deleted, whilst the other is still + // available. + ws := memdb.NewWatchSet() + out, err := fsm.State().GetServiceRegistrationByID(ws, services[0].Namespace, services[0].ID) + assert.Nil(t, err) + assert.Nil(t, out) + + out, err = fsm.State().GetServiceRegistrationByID(ws, services[1].Namespace, services[1].ID) + assert.Nil(t, err) + assert.NotNil(t, out) +} + +func TestFSM_DeleteServiceRegistrationsByNodeID(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + // Generate our test service registrations. Set them both to have the same + // node ID. + services := mock.ServiceRegistrations() + services[1].NodeID = services[0].NodeID + + // Upsert the services. + assert.NoError(t, fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, uint64(10), services)) + + // Build and apply our message. + req := structs.ServiceRegistrationDeleteByNodeIDRequest{NodeID: services[0].NodeID} + buf, err := structs.Encode(structs.ServiceRegistrationDeleteByNodeIDRequestType, req) + assert.Nil(t, err) + assert.Nil(t, fsm.Apply(makeLog(buf))) + + // Check both services have been removed. + ws := memdb.NewWatchSet() + out, err := fsm.State().GetServiceRegistrationByID(ws, services[0].Namespace, services[0].ID) + assert.Nil(t, err) + assert.Nil(t, out) + + out, err = fsm.State().GetServiceRegistrationByID(ws, services[1].Namespace, services[1].ID) + assert.Nil(t, err) + assert.Nil(t, out) +} + func TestFSM_ACLEvents(t *testing.T) { t.Parallel()