[f-gh-1106-reporting] Use full cluster metadata for reporting (#18660)

* func: add reporting config to server

* func: add reporting manager for ce

* func: change from clusterID to clusterMetadata and use it to start ent leadership

* Update leader.go

* style: typo
Juana De La Cuesta
2023-10-05 09:32:54 +02:00
committed by GitHub
parent 03cf9ae7ff
commit d701925ffa
7 changed files with 62 additions and 134 deletions
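
For orientation: the value that replaces the bare cluster ID string throughout this change is a small struct, applied through Raft as a structs.ClusterMetadataRequestType entry. The sketch below is inferred from the hunks in this commit (uuid.Generate() for the ID, time.Now().UnixNano() for the timestamp); the canonical definition lives in nomad/structs and may carry additional fields or tags.

// ClusterMetadata as it can be inferred from this diff; field names and
// types come from the hunks below, not from the nomad/structs source.
type ClusterMetadata struct {
	ClusterID  string // UUID generated once by the leader
	CreateTime int64  // UnixNano timestamp recorded at creation
}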

View File

@@ -367,8 +367,11 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Initialize scheduler configuration.
schedulerConfig := s.getOrCreateSchedulerConfig()
// Initialize the ClusterID
_, _ = s.ClusterID()
// Initialize the Cluster metadata
clusterMetadata, err := s.ClusterMetaData()
if err != nil {
return err
}
// todo: use cluster ID for stuff, later!
// Enable the plan queue, since we are now the leader
@@ -489,7 +492,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
}
// Setup any enterprise systems required.
if err := s.establishEnterpriseLeadership(stopCh); err != nil {
if err := s.establishEnterpriseLeadership(stopCh, clusterMetadata); err != nil {
return err
}
@@ -2776,20 +2779,20 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
logger.Info("initialized keyring", "id", rootKey.Meta.KeyID)
}
func (s *Server) generateClusterID() (string, error) {
func (s *Server) generateClusterMetadata() (structs.ClusterMetadata, error) {
if !ServersMeetMinimumVersion(s.Members(), AllRegions, minClusterIDVersion, false) {
s.logger.Named("core").Warn("cannot initialize cluster ID until all servers are above minimum version", "min_version", minClusterIDVersion)
return "", fmt.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
return structs.ClusterMetadata{}, fmt.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
}
newMeta := structs.ClusterMetadata{ClusterID: uuid.Generate(), CreateTime: time.Now().UnixNano()}
if _, _, err := s.raftApply(structs.ClusterMetadataRequestType, newMeta); err != nil {
s.logger.Named("core").Error("failed to create cluster ID", "error", err)
return "", fmt.Errorf("failed to create cluster ID: %w", err)
return structs.ClusterMetadata{}, fmt.Errorf("failed to create cluster ID: %w", err)
}
s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime)
return newMeta.ClusterID, nil
return newMeta, nil
}
// handleEvalBrokerStateChange handles changing the evalBroker and blockedEvals
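
As a usage illustration only (not part of this commit): any server code that previously called s.ClusterID() for the bare string can now pull the full metadata and derive extra information, such as the cluster age. The hypothetical reportClusterAge helper below assumes the ClusterMetaData signature shown above, the *Server type from this package, and the standard time package.

// reportClusterAge is a hypothetical caller of the new accessor, shown only
// to illustrate the richer return value; it does not exist in the codebase.
func reportClusterAge(s *Server) error {
	md, err := s.ClusterMetaData()
	if err != nil {
		// e.g. "cluster ID not ready yet" on a follower before bootstrap
		return err
	}
	age := time.Since(time.Unix(0, md.CreateTime))
	s.logger.Named("core").Info("cluster metadata",
		"cluster_id", md.ClusterID, "cluster_age", age)
	return nil
}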

View File

@@ -6,8 +6,10 @@
package nomad
import "github.com/hashicorp/nomad/nomad/structs"
// establishEnterpriseLeadership is a no-op on OSS.
func (s *Server) establishEnterpriseLeadership(stopCh chan struct{}) error {
func (s *Server) establishEnterpriseLeadership(stopCh chan struct{}, clusterMD structs.ClusterMetadata) error {
return nil
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
@@ -861,115 +860,10 @@ func TestLeader_ClusterID(t *testing.T) {
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
clusterID, err := s1.ClusterID()
clusterMD, err := s1.ClusterMetaData()
require.NoError(t, err)
require.True(t, helper.IsUUID(clusterID))
}
func TestLeader_ClusterID_upgradePath(t *testing.T) {
ci.Parallel(t)
before := version.Must(version.NewVersion("0.10.1")).String()
after := minClusterIDVersion.String()
type server struct {
s *Server
cleanup func()
}
outdated := func() server {
s, cleanup := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.Build = before
c.BootstrapExpect = 3
c.Logger.SetLevel(hclog.Trace)
})
return server{s: s, cleanup: cleanup}
}
upgraded := func() server {
s, cleanup := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.Build = after
c.BootstrapExpect = 0
c.Logger.SetLevel(hclog.Trace)
})
return server{s: s, cleanup: cleanup}
}
servers := []server{outdated(), outdated(), outdated()}
// fallback shutdown attempt in case testing fails
defer servers[0].cleanup()
defer servers[1].cleanup()
defer servers[2].cleanup()
upgrade := func(i int) {
previous := servers[i]
servers[i] = upgraded()
TestJoin(t, servers[i].s, servers[(i+1)%3].s, servers[(i+2)%3].s)
testutil.WaitForLeader(t, servers[i].s.RPC)
require.NoError(t, previous.s.Leave())
require.NoError(t, previous.s.Shutdown())
}
// Join the servers before doing anything
TestJoin(t, servers[0].s, servers[1].s, servers[2].s)
// Wait for servers to settle
for i := 0; i < len(servers); i++ {
testutil.WaitForLeader(t, servers[i].s.RPC)
}
// A check that ClusterID is not available yet
noIDYet := func() {
for _, s := range servers {
_, err := s.s.ClusterID()
must.Error(t, err)
}
}
// Replace first old server with new server
upgrade(0)
defer servers[0].cleanup()
noIDYet() // ClusterID should not work yet, servers: [new, old, old]
// Replace second old server with new server
upgrade(1)
defer servers[1].cleanup()
noIDYet() // ClusterID should not work yet, servers: [new, new, old]
// Replace third / final old server with new server
upgrade(2)
defer servers[2].cleanup()
// Wait for old servers to really be gone
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.s.numPeers()
return peers == 3, nil
}, func(_ error) {
t.Fatalf("should have 3 peers")
})
}
// Now we can tickle the leader into making a cluster ID
leaderID := ""
for _, s := range servers {
if s.s.IsLeader() {
id, err := s.s.ClusterID()
require.NoError(t, err)
leaderID = id
break
}
}
require.True(t, helper.IsUUID(leaderID))
// Now every participating server has been upgraded, each one should be
// able to get the cluster ID, having been plumbed all the way through.
agreeClusterID(t, []*Server{servers[0].s, servers[1].s, servers[2].s})
require.True(t, helper.IsUUID(clusterMD.ClusterID))
}
func TestLeader_ClusterID_noUpgrade(t *testing.T) {
@@ -1021,20 +915,20 @@ func agreeClusterID(t *testing.T, servers []*Server) {
must.Len(t, 3, servers)
f := func() error {
id1, err1 := servers[0].ClusterID()
id1, err1 := servers[0].ClusterMetaData()
if err1 != nil {
return err1
}
id2, err2 := servers[1].ClusterID()
id2, err2 := servers[1].ClusterMetaData()
if err2 != nil {
return err2
}
id3, err3 := servers[2].ClusterID()
id3, err3 := servers[2].ClusterMetaData()
if err3 != nil {
return err3
}
if id1 != id2 || id2 != id3 {
return fmt.Errorf("ids do not match, id1: %s, id2: %s, id3: %s", id1, id2, id3)
if id1.ClusterID != id2.ClusterID || id2.ClusterID != id3.ClusterID {
return fmt.Errorf("ids do not match, id1: %s, id2: %s, id3: %s", id1.ClusterID, id2.ClusterID, id3.ClusterID)
}
return nil
}

View File

@@ -2042,12 +2042,14 @@ func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.
}
// Get the ClusterID
clusterID, err := n.srv.ClusterID()
clusterMD, err := n.srv.ClusterMetaData()
if err != nil {
setError(err, false)
return nil
}
clusterID := clusterMD.ClusterID
// Verify the following:
// * The Node exists and has the correct SecretID.
// * The Allocation exists on the specified Node.

View File

@@ -584,7 +584,7 @@ func TestOperator_SchedulerGetConfiguration(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.Build = "0.9.0+unittest"
c.Build = "1.3.0+unittest"
})
defer cleanupS1()
codec := rpcClient(t, s1)
@@ -608,7 +608,7 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.Build = "0.9.0+unittest"
c.Build = "1.3.0+unittest"
})
defer cleanupS1()
rpcCodec := rpcClient(t, s1)
@@ -654,7 +654,7 @@ func TestOperator_SchedulerGetConfiguration_ACL(t *testing.T) {
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
c.RaftConfig.ProtocolVersion = 3
c.Build = "0.9.0+unittest"
c.Build = "1.3.0+unittest"
})
defer cleanupS1()
codec := rpcClient(t, s1)
@@ -701,7 +701,7 @@ func TestOperator_SchedulerSetConfiguration_ACL(t *testing.T) {
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
c.RaftConfig.ProtocolVersion = 3
c.Build = "0.9.0+unittest"
c.Build = "1.3.0+unittest"
})
defer cleanupS1()
codec := rpcClient(t, s1)

View File

@@ -2058,7 +2058,8 @@ func (s *Server) ReplicationToken() string {
return s.config.ReplicationToken
}
// ClusterID returns the unique ID for this cluster.
// ClusterMetaData returns the metadata for this cluster, composed of a
// unique ID (UUID) and a creation timestamp.
//
// Any Nomad server agent may call this method to get at the ID.
// If we are the leader and the ID has not yet been created, it will
@@ -2066,7 +2067,7 @@ func (s *Server) ReplicationToken() string {
//
// The ID will not be created until all participating servers have reached
// a minimum version (0.10.4).
func (s *Server) ClusterID() (string, error) {
func (s *Server) ClusterMetaData() (structs.ClusterMetadata, error) {
s.clusterIDLock.Lock()
defer s.clusterIDLock.Unlock()
@@ -2075,32 +2076,55 @@ func (s *Server) ClusterID() (string, error) {
existingMeta, err := fsmState.ClusterMetadata(nil)
if err != nil {
s.logger.Named("core").Error("failed to get cluster ID", "error", err)
return "", err
return structs.ClusterMetadata{}, err
}
// got the cluster ID from state store, cache that and return it
if existingMeta != nil && existingMeta.ClusterID != "" {
return existingMeta.ClusterID, nil
return *existingMeta, nil
}
// if we are not the leader, nothing more we can do
if !s.IsLeader() {
return "", errors.New("cluster ID not ready yet")
return structs.ClusterMetadata{}, errors.New("cluster ID not ready yet")
}
// we are the leader, try to generate the ID now
generatedID, err := s.generateClusterID()
generatedMD, err := s.generateClusterMetadata()
if err != nil {
return "", err
return structs.ClusterMetadata{}, err
}
return generatedID, nil
return generatedMD, nil
}
func (s *Server) isSingleServerCluster() bool {
return s.config.BootstrapExpect == 1
}
func (s *Server) GetClientNodesCount() (int, error) {
stateSnapshot, err := s.State().Snapshot()
if err != nil {
return 0, err
}
var count int
iter, err := stateSnapshot.Nodes(nil)
if err != nil {
return 0, err
}
for {
raw := iter.Next()
if raw == nil {
break
}
count++
}
return count, nil
}
// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.
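
Putting the two accessors added in this file together, a reporting component (per the commit title) could snapshot the cluster roughly as sketched below. clusterSnapshot and buildClusterSnapshot are hypothetical names used only for illustration; ClusterMetaData() and GetClientNodesCount() are the methods shown in the hunks above.

// clusterSnapshot is a hypothetical aggregate a reporting manager might
// assemble; it is not defined anywhere in the Nomad codebase.
type clusterSnapshot struct {
	ClusterID   string
	CreateTime  int64
	ClientNodes int
}

// buildClusterSnapshot is an illustrative sketch that relies only on the
// *Server methods visible in this diff.
func buildClusterSnapshot(s *Server) (clusterSnapshot, error) {
	md, err := s.ClusterMetaData()
	if err != nil {
		return clusterSnapshot{}, err
	}
	nodes, err := s.GetClientNodesCount()
	if err != nil {
		return clusterSnapshot{}, err
	}
	return clusterSnapshot{
		ClusterID:   md.ClusterID,
		CreateTime:  md.CreateTime,
		ClientNodes: nodes,
	}, nil
}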

View File

@@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
structsconfig "github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/version"
testing "github.com/mitchellh/go-testing-interface"
"github.com/shoenig/test/must"
@@ -115,6 +116,8 @@ func TestConfigForServer(t testing.T) *Config {
// Default to having concurrent schedulers
config.NumSchedulers = 2
config.Reporting = structsconfig.DefaultReporting()
return config
}