mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 01:45:44 +03:00
Change the signature of the PeriodicCallback to return an error
I *KNEW* I should have done this when I wrote it, but didn't want to go back and audit the handlers to include the appropriate return handling, but now that the code is taking shape, make this change.
This commit is contained in:
@@ -1246,12 +1246,12 @@ func (c *Client) setupConsulSyncer() error {
|
||||
// heartbeat deadline has been exceeded and this Client is orphaned
|
||||
// from its servers, periodically poll Consul to reattach this Client
|
||||
// to its cluster and automatically recover from a detached state.
|
||||
bootstrapFn := func() {
|
||||
bootstrapFn := func() error {
|
||||
now := time.Now()
|
||||
c.configLock.RLock()
|
||||
if now.Before(c.consulPullHeartbeatDeadline) {
|
||||
c.configLock.RUnlock()
|
||||
return
|
||||
return nil
|
||||
}
|
||||
c.configLock.RUnlock()
|
||||
|
||||
@@ -1261,7 +1261,7 @@ func (c *Client) setupConsulSyncer() error {
|
||||
&consulapi.QueryOptions{AllowStale: true})
|
||||
if err != nil {
|
||||
c.logger.Printf("[WARN] client: unable to query service %q: %v", nomadServerServiceName, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
serverAddrs := make([]string, 0, len(services))
|
||||
for _, s := range services {
|
||||
@@ -1272,18 +1272,23 @@ func (c *Client) setupConsulSyncer() error {
|
||||
}
|
||||
serverAddrs = append(serverAddrs, net.JoinHostPort(addr, port))
|
||||
}
|
||||
c.rpcProxy.SetBackupServers(serverAddrs)
|
||||
|
||||
if err := c.rpcProxy.SetBackupServers(serverAddrs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
c.consulSyncer.AddPeriodicHandler("Nomad Client Fallback Server Handler", bootstrapFn)
|
||||
|
||||
consulServicesSyncFn := func() {
|
||||
consulServicesSyncFn := func() error {
|
||||
// Give up pruning services if we can't fingerprint our
|
||||
// Consul Agent.
|
||||
c.configLock.RLock()
|
||||
_, ok := c.configCopy.Node.Attributes["consul.version"]
|
||||
c.configLock.RUnlock()
|
||||
if !ok {
|
||||
return
|
||||
return fmt.Errorf("Consul not running")
|
||||
}
|
||||
|
||||
services := make(map[string]struct{})
|
||||
@@ -1305,7 +1310,9 @@ func (c *Client) setupConsulSyncer() error {
|
||||
|
||||
if err := c.consulSyncer.KeepServices(services); err != nil {
|
||||
c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesSyncFn)
|
||||
|
||||
|
||||
@@ -426,47 +426,34 @@ func (c *Syncer) Run() {
|
||||
}
|
||||
|
||||
// RunHandlers executes each handler (randomly)
|
||||
func (c *Syncer) RunHandlers() {
|
||||
func (c *Syncer) RunHandlers() error {
|
||||
c.periodicLock.RLock()
|
||||
handlers := make(map[string]types.PeriodicCallback, len(c.periodicCallbacks))
|
||||
for name, fn := range c.periodicCallbacks {
|
||||
handlers[name] = fn
|
||||
}
|
||||
c.periodicLock.RUnlock()
|
||||
|
||||
var mErr multierror.Error
|
||||
for _, fn := range handlers {
|
||||
fn()
|
||||
if err := fn(); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// performSync sync the services and checks we are tracking with Consul.
|
||||
func (c *Syncer) performSync() error {
|
||||
c.RunHandlers()
|
||||
|
||||
var mErr multierror.Error
|
||||
cServices, err := c.client.Agent().Services()
|
||||
if err != nil {
|
||||
return err
|
||||
if err := c.RunHandlers(); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
|
||||
cChecks, err := c.client.Agent().Checks()
|
||||
if err != nil {
|
||||
return err
|
||||
if err := c.syncServices(); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
|
||||
// Add services and checks that consul doesn't have but we do
|
||||
for serviceID, service := range c.trackedServices {
|
||||
if _, ok := cServices[serviceID]; !ok {
|
||||
if err := c.registerService(service); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
for checkID, check := range c.trackedChecks {
|
||||
if _, ok := cChecks[checkID]; !ok {
|
||||
if err := c.registerCheck(check); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
if err := c.syncChecks(); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
|
||||
return mErr.ErrorOrNil()
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
package types
|
||||
|
||||
type PeriodicCallback func()
|
||||
type PeriodicCallback func() error
|
||||
|
||||
Reference in New Issue
Block a user