mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 09:55:44 +03:00
Merge pull request #4127 from hashicorp/b-autopilot-removepeer-fixes
Add node id persistence
This commit is contained in:
@@ -3,6 +3,7 @@ package agent
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
@@ -15,9 +16,12 @@ import (
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
uuidparse "github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/nomad/client"
|
||||
clientconfig "github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/nomad/structs/config"
|
||||
@@ -444,6 +448,12 @@ func (a *Agent) setupServer() error {
|
||||
return fmt.Errorf("server config setup failed: %s", err)
|
||||
}
|
||||
|
||||
// Generate a node ID and persist it if it is the first instance, otherwise
|
||||
// read the persisted node ID.
|
||||
if err := a.setupNodeID(conf); err != nil {
|
||||
return fmt.Errorf("setting up server node ID failed: %s", err)
|
||||
}
|
||||
|
||||
// Sets up the keyring for gossip encryption
|
||||
if err := a.setupKeyrings(conf); err != nil {
|
||||
return fmt.Errorf("failed to configure keyring: %v", err)
|
||||
@@ -518,6 +528,65 @@ func (a *Agent) setupServer() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupNodeID will pull the persisted node ID, if any, or create a random one
|
||||
// and persist it.
|
||||
func (a *Agent) setupNodeID(config *nomad.Config) error {
|
||||
// For dev mode we have no filesystem access so just make a node ID.
|
||||
if a.config.DevMode {
|
||||
config.NodeID = uuid.Generate()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load saved state, if any. Since a user could edit this, we also
|
||||
// validate it. Saved state overwrites any configured node id
|
||||
fileID := filepath.Join(config.DataDir, "node-id")
|
||||
if _, err := os.Stat(fileID); err == nil {
|
||||
rawID, err := ioutil.ReadFile(fileID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nodeID := strings.TrimSpace(string(rawID))
|
||||
nodeID = strings.ToLower(nodeID)
|
||||
if _, err := uuidparse.ParseUUID(nodeID); err != nil {
|
||||
return err
|
||||
}
|
||||
config.NodeID = nodeID
|
||||
return nil
|
||||
}
|
||||
|
||||
// If they've configured a node ID manually then just use that, as
|
||||
// long as it's valid.
|
||||
if config.NodeID != "" {
|
||||
config.NodeID = strings.ToLower(config.NodeID)
|
||||
if _, err := uuidparse.ParseUUID(config.NodeID); err != nil {
|
||||
return err
|
||||
}
|
||||
// Persist this configured nodeID to our data directory
|
||||
if err := lib.EnsurePath(fileID, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ioutil.WriteFile(fileID, []byte(config.NodeID), 0600); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// If we still don't have a valid node ID, make one.
|
||||
if config.NodeID == "" {
|
||||
id := uuid.Generate()
|
||||
if err := lib.EnsurePath(fileID, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ioutil.WriteFile(fileID, []byte(id), 0600); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
config.NodeID = id
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupKeyrings is used to initialize and load keyrings during agent startup
|
||||
func (a *Agent) setupKeyrings(config *nomad.Config) error {
|
||||
file := filepath.Join(a.config.DataDir, serfKeyring)
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -903,6 +904,74 @@ func TestLeader_UpgradeRaftVersion(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeader_Reelection(t *testing.T) {
|
||||
raftProtocols := []int{1, 2, 3}
|
||||
for _, p := range raftProtocols {
|
||||
t.Run("Leader Election - Protocol version "+string(p), func(t *testing.T) {
|
||||
leaderElectionTest(t, raft.ProtocolVersion(p))
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func leaderElectionTest(t *testing.T, raftProtocol raft.ProtocolVersion) {
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 3
|
||||
c.RaftConfig.ProtocolVersion = raftProtocol
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
|
||||
s2 := TestServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 3
|
||||
c.DevDisableBootstrap = true
|
||||
c.RaftConfig.ProtocolVersion = raftProtocol
|
||||
})
|
||||
defer s2.Shutdown()
|
||||
|
||||
s3 := TestServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 3
|
||||
c.DevDisableBootstrap = true
|
||||
c.RaftConfig.ProtocolVersion = raftProtocol
|
||||
})
|
||||
|
||||
servers := []*Server{s1, s2, s3}
|
||||
|
||||
// Try to join
|
||||
TestJoin(t, s1, s2, s3)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
future := s1.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, server := range future.Configuration().Servers {
|
||||
if server.Suffrage == raft.Nonvoter {
|
||||
return false, fmt.Errorf("non-voter %v", server)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
|
||||
var leader, nonLeader *Server
|
||||
for _, s := range servers {
|
||||
if s.IsLeader() {
|
||||
leader = s
|
||||
} else {
|
||||
nonLeader = s
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown the leader
|
||||
leader.Shutdown()
|
||||
// Wait for new leader to elect
|
||||
testutil.WaitForLeader(t, nonLeader.RPC)
|
||||
}
|
||||
|
||||
func TestLeader_RollRaftServer(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
@@ -912,7 +981,7 @@ func TestLeader_RollRaftServer(t *testing.T) {
|
||||
|
||||
s2 := TestServer(t, func(c *Config) {
|
||||
c.DevDisableBootstrap = true
|
||||
c.RaftConfig.ProtocolVersion = 1
|
||||
c.RaftConfig.ProtocolVersion = 2
|
||||
})
|
||||
defer s2.Shutdown()
|
||||
|
||||
@@ -931,8 +1000,8 @@ func TestLeader_RollRaftServer(t *testing.T) {
|
||||
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
|
||||
}
|
||||
|
||||
// Kill the v1 server
|
||||
s2.Shutdown()
|
||||
// Kill the first v2 server
|
||||
s1.Shutdown()
|
||||
|
||||
for _, s := range []*Server{s1, s3} {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
@@ -952,10 +1021,57 @@ func TestLeader_RollRaftServer(t *testing.T) {
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer s4.Shutdown()
|
||||
TestJoin(t, s4, s1)
|
||||
servers[1] = s4
|
||||
TestJoin(t, s4, s2)
|
||||
servers[0] = s4
|
||||
|
||||
// Make sure the dead server is removed and we're back to 3 total peers
|
||||
// Kill the second v2 server
|
||||
s2.Shutdown()
|
||||
|
||||
for _, s := range []*Server{s3, s4} {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
minVer, err := s.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
if got, want := minVer, 2; got != want {
|
||||
r.Fatalf("got min raft version %d want %d", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
// Replace another dead server with one running raft protocol v3
|
||||
s5 := TestServer(t, func(c *Config) {
|
||||
c.DevDisableBootstrap = true
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer s5.Shutdown()
|
||||
TestJoin(t, s5, s4)
|
||||
servers[1] = s5
|
||||
|
||||
// Kill the last v2 server, now minRaftProtocol should be 3
|
||||
s3.Shutdown()
|
||||
|
||||
for _, s := range []*Server{s4, s5} {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
minVer, err := s.autopilot.MinRaftProtocol()
|
||||
if err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
if got, want := minVer, 3; got != want {
|
||||
r.Fatalf("got min raft version %d want %d", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Replace the last dead server with one running raft protocol v3
|
||||
s6 := TestServer(t, func(c *Config) {
|
||||
c.DevDisableBootstrap = true
|
||||
c.RaftConfig.ProtocolVersion = 3
|
||||
})
|
||||
defer s6.Shutdown()
|
||||
TestJoin(t, s6, s4)
|
||||
servers[2] = s6
|
||||
|
||||
// Make sure all the dead servers are removed and we're back to 3 total peers
|
||||
for _, s := range servers {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
addrs := 0
|
||||
@@ -971,10 +1087,10 @@ func TestLeader_RollRaftServer(t *testing.T) {
|
||||
ids++
|
||||
}
|
||||
}
|
||||
if got, want := addrs, 2; got != want {
|
||||
if got, want := addrs, 0; got != want {
|
||||
r.Fatalf("got %d server addresses want %d", got, want)
|
||||
}
|
||||
if got, want := ids, 1; got != want {
|
||||
if got, want := ids, 3; got != want {
|
||||
r.Fatalf("got %d server ids want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user