Reconcile, clean up, and centralize API version numbers (major and minor).

Reduce future confusion by introducing a minor version that is gossiped out
via the `mvn` Serf tag (Minor Version Number, `vsn` is already being used for
to communicate `Major Version Number`).

Background: hashicorp/consul/issues/1346#issuecomment-151663152
This commit is contained in:
Sean Chittenden
2016-05-27 18:14:34 -07:00
parent b2357598ba
commit ab99e89bd9
15 changed files with 98 additions and 80 deletions

View File

@@ -47,9 +47,6 @@ const (
// devModeRetryIntv is the retry interval used for development
devModeRetryIntv = time.Second
// rpcVersion specifies the RPC version
rpcVersion = 1
// stateSnapshotIntv is how often the client snapshots state
stateSnapshotIntv = 60 * time.Second
@@ -301,9 +298,14 @@ func (c *Client) Region() string {
return c.config.Region
}
// Region returns the rpcVersion in use by the client
func (c *Client) RPCVersion() int {
return rpcVersion
// Region returns the structs.ApiMajorVersion in use by the client
func (c *Client) RpcMajorVersion() int {
return structs.ApiMajorVersion
}
// Region returns the structs.ApiMinorVersion in use by the client
func (c *Client) RpcMinorVersion() int {
return structs.ApiMinorVersion
}
// Shutdown is used to tear down the client
@@ -344,7 +346,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
}
// Make the RPC request
if err := c.connPool.RPC(c.Region(), server.Addr, c.RPCVersion(), method, args, reply); err != nil {
if err := c.connPool.RPC(c.Region(), server.Addr, c.RpcMajorVersion(), method, args, reply); err != nil {
c.rpcProxy.NotifyFailedServer(server)
c.logger.Printf("[ERR] client: RPC failed to server %s: %v", server.Addr, err)
return err

View File

@@ -151,9 +151,7 @@ const mockConsulResponse = `
"expect": "3",
"port": "8300",
"role": "consul",
"vsn": "2",
"vsn_max": "2",
"vsn_min": "1"
"vsn": "2"
},
"Status": 1,
"ProtocolMin": 1,

View File

@@ -21,12 +21,6 @@ import (
)
const (
// apiMajorVersion is synchronized with `nomad/server.go` and
// represents the API version supported by this client.
//
// TODO(sean@): This symbol should be exported somewhere.
apiMajorVersion = 1
// clientRPCJitterFraction determines the amount of jitter added to
// clientRPCMinReuseDuration before a connection is expired and a new
// connection is established in order to rebalance load across Nomad
@@ -68,14 +62,15 @@ const (
// configuration to prevents a cyclic import dependency.
type NomadConfigInfo interface {
Datacenter() string
RPCVersion() int
RpcMajorVersion() int
RpcMinorVersion() int
Region() string
}
// Pinger is an interface wrapping client.ConnPool to prevent a
// cyclic import dependency
type Pinger interface {
PingNomadServer(region string, version int, s *ServerEndpoint) (bool, error)
PingNomadServer(region string, apiMajorVersion int, s *ServerEndpoint) (bool, error)
}
// serverList is an array of Nomad Servers. The first server in the list is
@@ -439,7 +434,7 @@ func (p *RpcProxy) RebalanceServers() {
// detect the failed node.
selectedServer := l.L[0]
ok, err := p.connPoolPinger.PingNomadServer(p.configInfo.Region(), p.configInfo.RPCVersion(), selectedServer)
ok, err := p.connPoolPinger.PingNomadServer(p.configInfo.Region(), p.configInfo.RpcMajorVersion(), selectedServer)
if ok {
foundHealthyServer = true
break
@@ -673,14 +668,16 @@ func (p *RpcProxy) UpdateFromNodeUpdateResponse(resp *structs.NodeUpdateResponse
// TODO(sean@): Move the logging throttle logic into a
// dedicated logging package so RpcProxy does not have to
// perform this accounting.
if int32(p.configInfo.RPCVersion()) < s.RpcVersion {
if int32(p.configInfo.RpcMajorVersion()) < s.RpcMajorVersion ||
(int32(p.configInfo.RpcMajorVersion()) == s.RpcMajorVersion &&
int32(p.configInfo.RpcMinorVersion()) < s.RpcMinorVersion) {
now := time.Now()
t, ok := p.rpcAPIMismatchThrottle[s.RpcAdvertiseAddr]
if ok && t.After(now) {
continue
}
p.logger.Printf("[WARN] API mismatch between client (v%d) and server (v%d), ignoring server %q", apiMajorVersion, s.RpcVersion, s.RpcAdvertiseAddr)
p.logger.Printf("[WARN] API mismatch between client version (v%d.%d) and server version (v%d.%d), ignoring server %q", p.configInfo.RpcMajorVersion(), p.configInfo.RpcMinorVersion(), s.RpcMajorVersion, s.RpcMinorVersion, s.RpcAdvertiseAddr)
p.rpcAPIMismatchThrottle[s.RpcAdvertiseAddr] = now.Add(rpcAPIMismatchLogRate)
continue
}

View File

@@ -8,6 +8,8 @@ import (
"os"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
var (
@@ -50,8 +52,16 @@ func (s *fauxSerf) Region() string {
return "global"
}
func (s *fauxSerf) RPCVersion() int {
return 1
func (s *fauxSerf) Datacenter() string {
return "dc1"
}
func (s *fauxSerf) RpcMajorVersion() int {
return structs.ApiMajorVersion
}
func (s *fauxSerf) RpcMinorVersion() int {
return structs.ApiMinorVersion
}
func testManager() (p *RpcProxy) {
@@ -180,7 +190,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
// failPct of the servers for the reconcile. This
// allows for the selected server to no longer be
// healthy for the reconcile below.
if ok, _ := m.connPoolPinger.PingNomadServer(m.configInfo.Region(), m.configInfo.RPCVersion(), node); ok {
if ok, _ := m.connPoolPinger.PingNomadServer(m.configInfo.Region(), m.configInfo.RpcMajorVersion(), node); ok {
// Will still be present
healthyServers = append(healthyServers, node)
} else {

View File

@@ -217,7 +217,8 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
reply.Servers = append(reply.Servers,
&structs.NodeServerInfo{
RpcAdvertiseAddr: k,
RpcVersion: int32(v.Version),
RpcMajorVersion: int32(v.MajorVersion),
RpcMinorVersion: int32(v.MinorVersion),
Datacenter: v.Datacenter,
})
}

View File

@@ -376,9 +376,9 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,
// PingNomadServer sends a Status.Ping message to the specified server and
// returns true if healthy, false if an error occurred
func (p *ConnPool) PingNomadServer(region string, version int, s *rpcproxy.ServerEndpoint) (bool, error) {
func (p *ConnPool) PingNomadServer(region string, apiMajorVersion int, s *rpcproxy.ServerEndpoint) (bool, error) {
// Get a usable client
conn, sc, err := p.getClient(region, s.Addr, version)
conn, sc, err := p.getClient(region, s.Addr, apiMajorVersion)
if err != nil {
return false, err
}

View File

@@ -216,7 +216,7 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{
if server == nil {
return structs.ErrNoLeader
}
return s.connPool.RPC(s.config.Region, server.Addr, server.Version, method, args, reply)
return s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion, method, args, reply)
}
// forwardRegion is used to forward an RPC call to a remote region, or fail if no servers
@@ -238,7 +238,7 @@ func (s *Server) forwardRegion(region, method string, args interface{}, reply in
// Forward to remote Nomad
metrics.IncrCounter([]string{"nomad", "rpc", "cross-region", region}, 1)
return s.connPool.RPC(region, server.Addr, server.Version, method, args, reply)
return s.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply)
}
// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.

View File

@@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
@@ -41,17 +42,6 @@ const (
// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second
// apiMajorVersion is returned as part of the Status.Version request.
// It should be incremented anytime the APIs are changed in a way that
// would break clients for sane client versioning.
apiMajorVersion = 1
// apiMinorVersion is returned as part of the Status.Version request.
// It should be incremented anytime the APIs are changed to allow
// for sane client versioning. Minor changes should be compatible
// within the major version.
apiMinorVersion = 1
)
// Server is Nomad server which manages the job queues,
@@ -534,9 +524,8 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.Tags["role"] = "nomad"
conf.Tags["region"] = s.config.Region
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion)
conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["port"] = fmt.Sprintf("%d", s.rpcAdvertise.(*net.TCPAddr).Port)
if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {

View File

@@ -18,8 +18,8 @@ func (s *Status) Version(args *structs.GenericRequest, reply *structs.VersionRes
reply.Build = conf.Build
reply.Versions = map[string]int{
structs.ProtocolVersion: int(conf.ProtocolVersion),
structs.APIMajorVersion: apiMajorVersion,
structs.APIMinorVersion: apiMinorVersion,
structs.APIMajorVersion: structs.ApiMajorVersion,
structs.APIMinorVersion: structs.ApiMinorVersion,
}
return nil
}

View File

@@ -30,10 +30,10 @@ func TestStatusVersion(t *testing.T) {
if out.Versions[structs.ProtocolVersion] != ProtocolVersionMax {
t.Fatalf("bad: %#v", out)
}
if out.Versions[structs.APIMajorVersion] != apiMajorVersion {
if out.Versions[structs.APIMajorVersion] != structs.ApiMajorVersion {
t.Fatalf("bad: %#v", out)
}
if out.Versions[structs.APIMinorVersion] != apiMinorVersion {
if out.Versions[structs.APIMinorVersion] != structs.ApiMinorVersion {
t.Fatalf("bad: %#v", out)
}
}

View File

@@ -54,6 +54,21 @@ const (
// that new commands can be added in a way that won't cause
// old servers to crash when the FSM attempts to process them.
IgnoreUnknownTypeFlag MessageType = 128
// ApiMajorVersion is returned as part of the Status.Version request.
// It should be incremented anytime the APIs are changed in a way
// that would break clients for sane client versioning.
ApiMajorVersion = 1
// ApiMinorVersion is returned as part of the Status.Version request.
// It should be incremented anytime the APIs are changed to allow
// for sane client versioning. Minor changes should be compatible
// within the major version.
ApiMinorVersion = 1
ProtocolVersion = "protocol"
APIMajorVersion = "api.major"
APIMinorVersion = "api.minor"
)
// RPCInfo is used to describe common information about query
@@ -158,8 +173,13 @@ type NodeServerInfo struct {
// be contacted at for RPCs.
RpcAdvertiseAddr string
// RPCVersion is the version number the Nomad Server supports
RpcVersion int32
// RpcMajorVersion is the major version number the Nomad Server
// supports
RpcMajorVersion int32
// RpcMinorVersion is the minor version number the Nomad Server
// supports
RpcMinorVersion int32
// Datacenter is the datacenter that a Nomad server belongs to
Datacenter string
@@ -330,12 +350,6 @@ type GenericResponse struct {
WriteMeta
}
const (
ProtocolVersion = "protocol"
APIMajorVersion = "api.major"
APIMinorVersion = "api.minor"
)
// VersionResponse is used for the Status.Version reseponse
type VersionResponse struct {
Build string

View File

@@ -34,14 +34,15 @@ func RuntimeStats() map[string]string {
// serverParts is used to return the parts of a server role
type serverParts struct {
Name string
Region string
Datacenter string
Port int
Bootstrap bool
Expect int
Version int
Addr net.Addr
Name string
Region string
Datacenter string
Port int
Bootstrap bool
Expect int
MajorVersion int
MinorVersion int
Addr net.Addr
}
func (s *serverParts) String() string {
@@ -76,22 +77,32 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
return false, nil
}
vsn_str := m.Tags["vsn"]
vsn, err := strconv.Atoi(vsn_str)
// The "vsn" tag was Version, which is now the MajorVersion number.
majorVersionStr := m.Tags["vsn"]
majorVersion, err := strconv.Atoi(majorVersionStr)
if err != nil {
return false, nil
}
// To keep some semblance of convention, "mvn" is now the "Minor
// Version Number."
minorVersionStr := m.Tags["mvn"]
minorVersion, err := strconv.Atoi(minorVersionStr)
if err != nil {
minorVersion = 0
}
addr := &net.TCPAddr{IP: m.Addr, Port: port}
parts := &serverParts{
Name: m.Name,
Region: region,
Datacenter: datacenter,
Port: port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Version: vsn,
Name: m.Name,
Region: region,
Datacenter: datacenter,
Port: port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
MajorVersion: majorVersion,
MinorVersion: minorVersion,
}
return true, parts
}

View File

@@ -44,7 +44,7 @@ func TestIsNomadServer(t *testing.T) {
if parts.Addr.String() != "127.0.0.1:10000" {
t.Fatalf("bad addr: %v", parts.Addr)
}
if parts.Version != 1 {
if parts.MajorVersion != 1 {
t.Fatalf("bad: %v", parts)
}

View File

@@ -46,9 +46,7 @@ the gossip pool. This is only applicable to servers.
"port": "4647",
"region": "global",
"role": "nomad",
"vsn": "1",
"vsn_max": "1",
"vsn_min": "1"
"vsn": "1"
},
"Status": "alive",
"ProtocolMin": 1,

View File

@@ -98,9 +98,7 @@ The `self` endpoint is used to query the state of the target agent.
"port": "4647",
"region": "global",
"role": "nomad",
"vsn": "1",
"vsn_max": "1",
"vsn_min": "1"
"vsn": "1"
},
"Status": "alive",
"ProtocolMin": 1,