From d701925ffa6124d8ecdd111d1fd655d1865e31dc Mon Sep 17 00:00:00 2001
From: Juana De La Cuesta
Date: Thu, 5 Oct 2023 09:32:54 +0200
Subject: [PATCH] [f-gh-1106-reporting] Use full cluster metadata for reporting (#18660)

* func: add reporting config to server

* func: add reporting manager for ce

* func: change from clusterID to clusterMetadata and use it to start ent leadership

* Update leader.go

* style: typo
---
 nomad/leader.go                 |  17 +++--
 nomad/leader_ce.go              |   4 +-
 nomad/leader_test.go            | 120 ++------------------------------
 nomad/node_endpoint.go          |   4 +-
 nomad/operator_endpoint_test.go |   8 +--
 nomad/server.go                 |  40 ++++++++---
 nomad/testing.go                |   3 +
 7 files changed, 62 insertions(+), 134 deletions(-)

diff --git a/nomad/leader.go b/nomad/leader.go
index 8d43d4ea9..8aae6dec8 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -367,8 +367,11 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 	// Initialize scheduler configuration.
 	schedulerConfig := s.getOrCreateSchedulerConfig()
 
-	// Initialize the ClusterID
-	_, _ = s.ClusterID()
+	// Initialize the Cluster metadata
+	clusterMetadata, err := s.ClusterMetaData()
+	if err != nil {
+		return err
+	}
 	// todo: use cluster ID for stuff, later!
 
 	// Enable the plan queue, since we are now the leader
@@ -489,7 +492,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 	}
 
 	// Setup any enterprise systems required.
-	if err := s.establishEnterpriseLeadership(stopCh); err != nil {
+	if err := s.establishEnterpriseLeadership(stopCh, clusterMetadata); err != nil {
 		return err
 	}
 
@@ -2776,20 +2779,20 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
 	logger.Info("initialized keyring", "id", rootKey.Meta.KeyID)
 }
 
-func (s *Server) generateClusterID() (string, error) {
+func (s *Server) generateClusterMetadata() (structs.ClusterMetadata, error) {
 	if !ServersMeetMinimumVersion(s.Members(), AllRegions, minClusterIDVersion, false) {
 		s.logger.Named("core").Warn("cannot initialize cluster ID until all servers are above minimum version", "min_version", minClusterIDVersion)
-		return "", fmt.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
+		return structs.ClusterMetadata{}, fmt.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
 	}
 
 	newMeta := structs.ClusterMetadata{ClusterID: uuid.Generate(), CreateTime: time.Now().UnixNano()}
 	if _, _, err := s.raftApply(structs.ClusterMetadataRequestType, newMeta); err != nil {
 		s.logger.Named("core").Error("failed to create cluster ID", "error", err)
-		return "", fmt.Errorf("failed to create cluster ID: %w", err)
+		return structs.ClusterMetadata{}, fmt.Errorf("failed to create cluster ID: %w", err)
 	}
 
 	s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime)
-	return newMeta.ClusterID, nil
+	return newMeta, nil
 }
 
 // handleEvalBrokerStateChange handles changing the evalBroker and blockedEvals
diff --git a/nomad/leader_ce.go b/nomad/leader_ce.go
index 754aa82c8..ce77b9c18 100644
--- a/nomad/leader_ce.go
+++ b/nomad/leader_ce.go
@@ -6,8 +6,10 @@
 
 package nomad
 
+import "github.com/hashicorp/nomad/nomad/structs"
+
 // establishEnterpriseLeadership is a no-op on OSS.
-func (s *Server) establishEnterpriseLeadership(stopCh chan struct{}) error {
+func (s *Server) establishEnterpriseLeadership(stopCh chan struct{}, clusterMD structs.ClusterMetadata) error {
 	return nil
 }
 
diff --git a/nomad/leader_test.go b/nomad/leader_test.go
index c07e88756..7e9d8d647 100644
--- a/nomad/leader_test.go
+++ b/nomad/leader_test.go
@@ -13,7 +13,6 @@ import (
 
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-memdb"
-	"github.com/hashicorp/go-version"
 	"github.com/shoenig/test"
 	"github.com/shoenig/test/must"
 	"github.com/shoenig/test/wait"
@@ -861,115 +860,10 @@ func TestLeader_ClusterID(t *testing.T) {
 	defer cleanupS1()
 	testutil.WaitForLeader(t, s1.RPC)
 
-	clusterID, err := s1.ClusterID()
+	clusterMD, err := s1.ClusterMetaData()
 	require.NoError(t, err)
 
-	require.True(t, helper.IsUUID(clusterID))
-}
-
-func TestLeader_ClusterID_upgradePath(t *testing.T) {
-	ci.Parallel(t)
-
-	before := version.Must(version.NewVersion("0.10.1")).String()
-	after := minClusterIDVersion.String()
-
-	type server struct {
-		s       *Server
-		cleanup func()
-	}
-
-	outdated := func() server {
-		s, cleanup := TestServer(t, func(c *Config) {
-			c.NumSchedulers = 0
-			c.Build = before
-			c.BootstrapExpect = 3
-			c.Logger.SetLevel(hclog.Trace)
-		})
-		return server{s: s, cleanup: cleanup}
-	}
-
-	upgraded := func() server {
-		s, cleanup := TestServer(t, func(c *Config) {
-			c.NumSchedulers = 0
-			c.Build = after
-			c.BootstrapExpect = 0
-			c.Logger.SetLevel(hclog.Trace)
-		})
-		return server{s: s, cleanup: cleanup}
-	}
-
-	servers := []server{outdated(), outdated(), outdated()}
-	// fallback shutdown attempt in case testing fails
-	defer servers[0].cleanup()
-	defer servers[1].cleanup()
-	defer servers[2].cleanup()
-
-	upgrade := func(i int) {
-		previous := servers[i]
-
-		servers[i] = upgraded()
-		TestJoin(t, servers[i].s, servers[(i+1)%3].s, servers[(i+2)%3].s)
-		testutil.WaitForLeader(t, servers[i].s.RPC)
-
-		require.NoError(t, previous.s.Leave())
-		require.NoError(t, previous.s.Shutdown())
-	}
-
-	// Join the servers before doing anything
-	TestJoin(t, servers[0].s, servers[1].s, servers[2].s)
-
-	// Wait for servers to settle
-	for i := 0; i < len(servers); i++ {
-		testutil.WaitForLeader(t, servers[i].s.RPC)
-	}
-
-	// A check that ClusterID is not available yet
-	noIDYet := func() {
-		for _, s := range servers {
-			_, err := s.s.ClusterID()
-			must.Error(t, err)
-		}
-	}
-
-	// Replace first old server with new server
-	upgrade(0)
-	defer servers[0].cleanup()
-	noIDYet() // ClusterID should not work yet, servers: [new, old, old]
-
-	// Replace second old server with new server
-	upgrade(1)
-	defer servers[1].cleanup()
-	noIDYet() // ClusterID should not work yet, servers: [new, new, old]
-
-	// Replace third / final old server with new server
-	upgrade(2)
-	defer servers[2].cleanup()
-
-	// Wait for old servers to really be gone
-	for _, s := range servers {
-		testutil.WaitForResult(func() (bool, error) {
-			peers, _ := s.s.numPeers()
-			return peers == 3, nil
-		}, func(_ error) {
-			t.Fatalf("should have 3 peers")
-		})
-	}
-
-	// Now we can tickle the leader into making a cluster ID
-	leaderID := ""
-	for _, s := range servers {
-		if s.s.IsLeader() {
-			id, err := s.s.ClusterID()
-			require.NoError(t, err)
-			leaderID = id
-			break
-		}
-	}
-	require.True(t, helper.IsUUID(leaderID))
-
-	// Now every participating server has been upgraded, each one should be
-	// able to get the cluster ID, having been plumbed all the way through.
-	agreeClusterID(t, []*Server{servers[0].s, servers[1].s, servers[2].s})
+	require.True(t, helper.IsUUID(clusterMD.ClusterID))
 }
 
 func TestLeader_ClusterID_noUpgrade(t *testing.T) {
@@ -1021,20 +915,20 @@ func agreeClusterID(t *testing.T, servers []*Server) {
 	must.Len(t, 3, servers)
 
 	f := func() error {
-		id1, err1 := servers[0].ClusterID()
+		id1, err1 := servers[0].ClusterMetaData()
 		if err1 != nil {
 			return err1
 		}
-		id2, err2 := servers[1].ClusterID()
+		id2, err2 := servers[1].ClusterMetaData()
 		if err2 != nil {
 			return err2
 		}
-		id3, err3 := servers[2].ClusterID()
+		id3, err3 := servers[2].ClusterMetaData()
 		if err3 != nil {
 			return err3
 		}
-		if id1 != id2 || id2 != id3 {
-			return fmt.Errorf("ids do not match, id1: %s, id2: %s, id3: %s", id1, id2, id3)
+		if id1.ClusterID != id2.ClusterID || id2.ClusterID != id3.ClusterID {
+			return fmt.Errorf("ids do not match, id1: %s, id2: %s, id3: %s", id1.ClusterID, id2.ClusterID, id3.ClusterID)
 		}
 		return nil
 	}
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index 71fcaefd8..84fb69d66 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -2042,12 +2042,14 @@ func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.
 	}
 
 	// Get the ClusterID
-	clusterID, err := n.srv.ClusterID()
+	clusterMD, err := n.srv.ClusterMetaData()
 	if err != nil {
 		setError(err, false)
 		return nil
 	}
 
+	clusterID := clusterMD.ClusterID
+
 	// Verify the following:
 	// * The Node exists and has the correct SecretID.
 	// * The Allocation exists on the specified Node.
diff --git a/nomad/operator_endpoint_test.go b/nomad/operator_endpoint_test.go
index 6899a1d0b..0608d8794 100644
--- a/nomad/operator_endpoint_test.go
+++ b/nomad/operator_endpoint_test.go
@@ -584,7 +584,7 @@ func TestOperator_SchedulerGetConfiguration(t *testing.T) {
 	ci.Parallel(t)
 
 	s1, cleanupS1 := TestServer(t, func(c *Config) {
-		c.Build = "0.9.0+unittest"
+		c.Build = "1.3.0+unittest"
 	})
 	defer cleanupS1()
 	codec := rpcClient(t, s1)
@@ -608,7 +608,7 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
 	ci.Parallel(t)
 
 	s1, cleanupS1 := TestServer(t, func(c *Config) {
-		c.Build = "0.9.0+unittest"
+		c.Build = "1.3.0+unittest"
 	})
 	defer cleanupS1()
 	rpcCodec := rpcClient(t, s1)
@@ -654,7 +654,7 @@ func TestOperator_SchedulerGetConfiguration_ACL(t *testing.T) {
 
 	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
 		c.RaftConfig.ProtocolVersion = 3
-		c.Build = "0.9.0+unittest"
+		c.Build = "1.3.0+unittest"
 	})
 	defer cleanupS1()
 	codec := rpcClient(t, s1)
@@ -701,7 +701,7 @@ func TestOperator_SchedulerSetConfiguration_ACL(t *testing.T) {
 
 	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
 		c.RaftConfig.ProtocolVersion = 3
-		c.Build = "0.9.0+unittest"
+		c.Build = "1.3.0+unittest"
 	})
 	defer cleanupS1()
 	codec := rpcClient(t, s1)
diff --git a/nomad/server.go b/nomad/server.go
index c8e02ad06..f04d2e1c3 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -2058,7 +2058,8 @@ func (s *Server) ReplicationToken() string {
 	return s.config.ReplicationToken
 }
 
-// ClusterID returns the unique ID for this cluster.
+// ClusterMetaData returns the unique ID for this cluster composed of a
+// uuid and a timestamp.
 //
 // Any Nomad server agent may call this method to get at the ID.
 // If we are the leader and the ID has not yet been created, it will
@@ -2066,7 +2067,7 @@ func (s *Server) ReplicationToken() string {
 //
 // The ID will not be created until all participating servers have reached
 // a minimum version (0.10.4).
-func (s *Server) ClusterID() (string, error) {
+func (s *Server) ClusterMetaData() (structs.ClusterMetadata, error) {
 	s.clusterIDLock.Lock()
 	defer s.clusterIDLock.Unlock()
 
@@ -2075,32 +2076,55 @@
 	existingMeta, err := fsmState.ClusterMetadata(nil)
 	if err != nil {
 		s.logger.Named("core").Error("failed to get cluster ID", "error", err)
-		return "", err
+		return structs.ClusterMetadata{}, err
 	}
 
 	// got the cluster ID from state store, cache that and return it
 	if existingMeta != nil && existingMeta.ClusterID != "" {
-		return existingMeta.ClusterID, nil
+		return *existingMeta, nil
 	}
 
 	// if we are not the leader, nothing more we can do
 	if !s.IsLeader() {
-		return "", errors.New("cluster ID not ready yet")
+		return structs.ClusterMetadata{}, errors.New("cluster ID not ready yet")
 	}
 
 	// we are the leader, try to generate the ID now
-	generatedID, err := s.generateClusterID()
+	generatedMD, err := s.generateClusterMetadata()
 	if err != nil {
-		return "", err
+		return structs.ClusterMetadata{}, err
 	}
 
-	return generatedID, nil
+	return generatedMD, nil
 }
 
 func (s *Server) isSingleServerCluster() bool {
 	return s.config.BootstrapExpect == 1
 }
 
+func (s *Server) GetClientNodesCount() (int, error) {
+	stateSnapshot, err := s.State().Snapshot()
+	if err != nil {
+		return 0, err
+	}
+
+	var count int
+	iter, err := stateSnapshot.Nodes(nil)
+	if err != nil {
+		return 0, err
+	}
+
+	for {
+		raw := iter.Next()
+		if raw == nil {
+			break
+		}
+		count++
+	}
+
+	return count, nil
+}
+
 // peersInfoContent is used to help operators understand what happened to the
 // peers.json file. This is written to a file called peers.info in the same
 // location.
diff --git a/nomad/testing.go b/nomad/testing.go
index c118ff0c0..7152362d4 100644
--- a/nomad/testing.go
+++ b/nomad/testing.go
@@ -15,6 +15,7 @@ import (
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
+	structsconfig "github.com/hashicorp/nomad/nomad/structs/config"
 	"github.com/hashicorp/nomad/version"
 	testing "github.com/mitchellh/go-testing-interface"
 	"github.com/shoenig/test/must"
@@ -115,6 +116,8 @@ func TestConfigForServer(t testing.T) *Config {
 	// Default to having concurrent schedulers
 	config.NumSchedulers = 2
 
+	config.Reporting = structsconfig.DefaultReporting()
+
 	return config
 }
 
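
Call-site note: any other consumer of the old accessor migrates the same way as the node_endpoint.go hunk above, by fetching the full metadata once and reading the fields it needs. The helper below is a minimal, hypothetical sketch of the post-patch calling convention inside package nomad; it is illustrative only and not part of this change set.

// clusterInfo shows how a caller uses the renamed accessor. ClusterMetaData,
// ClusterID, and CreateTime come from the hunks above; this helper itself is
// hypothetical.
func (s *Server) clusterInfo() (string, int64, error) {
	md, err := s.ClusterMetaData()
	if err != nil {
		// Followers return an error here until the leader has written the
		// metadata to the state store via generateClusterMetadata.
		return "", 0, err
	}
	// ClusterID is a UUID string; CreateTime is a UnixNano timestamp recorded
	// when the cluster metadata was first created.
	return md.ClusterID, md.CreateTime, nil
}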