Files
nomad/command/agent/agent.go
Sean Chittenden 74e691cab1 Change the API signature of Syncer.SyncServices().
SyncServices() immediately attempts to sync whatever information
the process has with Consul.  Previously this method would take an
argument of the exclusive list of services that should exist,
however this is not condusive to having a Nomad Client and Nomad
Server share the same consul.Syncer.
2016-06-10 15:54:39 -04:00

643 lines
18 KiB
Go

package agent
import (
"fmt"
"io"
"log"
"net"
"os"
"path/filepath"
"runtime"
"strconv"
"sync"
"time"
"github.com/hashicorp/nomad/client"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
)
// Agent is a long running daemon that is used to run both
// clients and servers. Servers are responsible for managing
// state and making scheduling decisions. Clients can be
// scheduled to, and are responsible for interfacing with
// servers to run allocations.
type Agent struct {
config *Config
logger *log.Logger
logOutput io.Writer
// consulSyncer registers the Nomad agent with the Consul Agent
consulSyncer *consul.Syncer
client *client.Client
clientHttpAddr string
clientRpcAddr string
server *nomad.Server
serverHttpAddr string
serverRpcAddr string
serverSerfAddr string
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
// NewAgent is used to create a new agent with the given configuration
func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
// Ensure we have a log sink
if logOutput == nil {
logOutput = os.Stderr
}
shutdownCh := make(chan struct{})
a := &Agent{
config: config,
logger: log.New(logOutput, "", log.LstdFlags),
logOutput: logOutput,
shutdownCh: shutdownCh,
}
if err := a.setupConsulSyncer(shutdownCh); err != nil {
return nil, err
}
if err := a.setupServer(); err != nil {
return nil, err
}
if err := a.setupClient(); err != nil {
return nil, err
}
if a.client == nil && a.server == nil {
return nil, fmt.Errorf("must have at least client or server mode enabled")
}
if a.config.Consul.AutoRegister {
if err := a.syncAgentServicesWithConsul(); err != nil {
a.logger.Printf("[ERR] agent: unable to sync agent services with consul: %v", err)
}
}
go a.consulSyncer.Run()
return a, nil
}
// serverConfig is used to generate a new server configuration struct
// for initializing a nomad server.
func (a *Agent) serverConfig() (*nomad.Config, error) {
conf := a.config.NomadConfig
if conf == nil {
conf = nomad.DefaultConfig()
}
conf.LogOutput = a.logOutput
conf.DevMode = a.config.DevMode
conf.Build = fmt.Sprintf("%s%s", a.config.Version, a.config.VersionPrerelease)
if a.config.Region != "" {
conf.Region = a.config.Region
}
if a.config.Datacenter != "" {
conf.Datacenter = a.config.Datacenter
}
if a.config.NodeName != "" {
conf.NodeName = a.config.NodeName
}
if a.config.Server.BootstrapExpect > 0 {
if a.config.Server.BootstrapExpect == 1 {
conf.Bootstrap = true
} else {
conf.BootstrapExpect = a.config.Server.BootstrapExpect
}
}
if a.config.DataDir != "" {
conf.DataDir = filepath.Join(a.config.DataDir, "server")
}
if a.config.Server.DataDir != "" {
conf.DataDir = a.config.Server.DataDir
}
if a.config.Server.ProtocolVersion != 0 {
conf.ProtocolVersion = uint8(a.config.Server.ProtocolVersion)
}
if a.config.Server.NumSchedulers != 0 {
conf.NumSchedulers = a.config.Server.NumSchedulers
}
if len(a.config.Server.EnabledSchedulers) != 0 {
conf.EnabledSchedulers = a.config.Server.EnabledSchedulers
}
// Set up the advertise addrs
if addr := a.config.AdvertiseAddrs.Serf; addr != "" {
serfAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error resolving serf advertise address: %s", err)
}
conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String()
conf.SerfConfig.MemberlistConfig.AdvertisePort = serfAddr.Port
}
if addr := a.config.AdvertiseAddrs.RPC; addr != "" {
rpcAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error resolving rpc advertise address: %s", err)
}
conf.RPCAdvertise = rpcAddr
}
// Set up the bind addresses
if addr := a.config.BindAddr; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}
if addr := a.config.Addresses.RPC; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
}
if addr := a.config.Addresses.Serf; addr != "" {
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}
// Set up the ports
if port := a.config.Ports.RPC; port != 0 {
conf.RPCAddr.Port = port
}
if port := a.config.Ports.Serf; port != 0 {
conf.SerfConfig.MemberlistConfig.BindPort = port
}
// Resolve the Server's HTTP Address
if a.config.AdvertiseAddrs.HTTP != "" {
a.serverHttpAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.serverHttpAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP)
} else if a.config.BindAddr != "" {
a.serverHttpAddr = fmt.Sprintf("%v:%v", a.config.BindAddr, a.config.Ports.HTTP)
} else {
a.serverHttpAddr = fmt.Sprintf("%v:%v", "127.0.0.1", a.config.Ports.HTTP)
}
addr, err := net.ResolveTCPAddr("tcp", a.serverHttpAddr)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %q: %v:", a.serverHttpAddr, err)
}
a.serverHttpAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
// Resolve the Server's RPC Address
if a.config.AdvertiseAddrs.RPC != "" {
a.serverRpcAddr = a.config.AdvertiseAddrs.RPC
} else if a.config.Addresses.RPC != "" {
a.serverRpcAddr = fmt.Sprintf("%v:%v", a.config.Addresses.RPC, a.config.Ports.RPC)
} else if a.config.BindAddr != "" {
a.serverRpcAddr = fmt.Sprintf("%v:%v", a.config.BindAddr, a.config.Ports.RPC)
} else {
a.serverRpcAddr = fmt.Sprintf("%v:%v", "127.0.0.1", a.config.Ports.RPC)
}
addr, err = net.ResolveTCPAddr("tcp", a.serverRpcAddr)
if err != nil {
return nil, fmt.Errorf("error resolving RPC addr %q: %v:", a.serverRpcAddr, err)
}
a.serverRpcAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
// Resolve the Server's Serf Address
if a.config.AdvertiseAddrs.Serf != "" {
a.serverSerfAddr = a.config.AdvertiseAddrs.Serf
} else if a.config.Addresses.Serf != "" {
a.serverSerfAddr = fmt.Sprintf("%v:%v", a.config.Addresses.Serf, a.config.Ports.Serf)
} else if a.config.BindAddr != "" {
a.serverSerfAddr = fmt.Sprintf("%v:%v", a.config.BindAddr, a.config.Ports.Serf)
} else {
a.serverSerfAddr = fmt.Sprintf("%v:%v", "127.0.0.1", a.config.Ports.Serf)
}
addr, err = net.ResolveTCPAddr("tcp", a.serverSerfAddr)
if err != nil {
return nil, fmt.Errorf("error resolving Serf addr %q: %v:", a.serverSerfAddr, err)
}
a.serverSerfAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
return nil, err
}
conf.NodeGCThreshold = dur
}
if heartbeatGrace := a.config.Server.HeartbeatGrace; heartbeatGrace != "" {
dur, err := time.ParseDuration(heartbeatGrace)
if err != nil {
return nil, err
}
conf.HeartbeatGrace = dur
}
return conf, nil
}
// clientConfig is used to generate a new client configuration struct
// for initializing a Nomad client.
func (a *Agent) clientConfig() (*clientconfig.Config, error) {
// Setup the configuration
conf := a.config.ClientConfig
if conf == nil {
conf = clientconfig.DefaultConfig()
}
if a.server != nil {
conf.RPCHandler = a.server
}
conf.LogOutput = a.logOutput
conf.DevMode = a.config.DevMode
if a.config.Region != "" {
conf.Region = a.config.Region
}
if a.config.DataDir != "" {
conf.StateDir = filepath.Join(a.config.DataDir, "client")
conf.AllocDir = filepath.Join(a.config.DataDir, "alloc")
}
if a.config.Client.StateDir != "" {
conf.StateDir = a.config.Client.StateDir
}
if a.config.Client.AllocDir != "" {
conf.AllocDir = a.config.Client.AllocDir
}
conf.Servers = a.config.Client.Servers
if a.config.Client.NetworkInterface != "" {
conf.NetworkInterface = a.config.Client.NetworkInterface
}
conf.Options = a.config.Client.Options
if a.config.Client.NetworkSpeed != 0 {
conf.NetworkSpeed = a.config.Client.NetworkSpeed
}
if a.config.Client.MaxKillTimeout != "" {
dur, err := time.ParseDuration(a.config.Client.MaxKillTimeout)
if err != nil {
return nil, fmt.Errorf("Error parsing retry interval: %s", err)
}
conf.MaxKillTimeout = dur
}
conf.ClientMaxPort = uint(a.config.Client.ClientMaxPort)
conf.ClientMinPort = uint(a.config.Client.ClientMinPort)
// Setup the node
conf.Node = new(structs.Node)
conf.Node.Datacenter = a.config.Datacenter
conf.Node.Name = a.config.NodeName
conf.Node.Meta = a.config.Client.Meta
conf.Node.NodeClass = a.config.Client.NodeClass
// Resolve the Client's HTTP address
if a.config.AdvertiseAddrs.HTTP != "" {
a.clientHttpAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.clientHttpAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP)
} else if a.config.BindAddr != "" {
a.clientHttpAddr = fmt.Sprintf("%v:%v", a.config.BindAddr, a.config.Ports.HTTP)
} else {
a.clientHttpAddr = fmt.Sprintf("%v:%v", "127.0.0.1", a.config.Ports.HTTP)
}
addr, err := net.ResolveTCPAddr("tcp", a.clientHttpAddr)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %q: %v:", a.clientHttpAddr, err)
}
httpAddr := fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
conf.Node.HTTPAddr = httpAddr
a.clientHttpAddr = httpAddr
// Resolve the Client's RPC address
if a.config.AdvertiseAddrs.RPC != "" {
a.clientRpcAddr = a.config.AdvertiseAddrs.RPC
} else if a.config.Addresses.RPC != "" {
a.clientRpcAddr = fmt.Sprintf("%v:%v", a.config.Addresses.RPC, a.config.Ports.RPC)
} else if a.config.BindAddr != "" {
a.clientRpcAddr = fmt.Sprintf("%v:%v", a.config.BindAddr, a.config.Ports.RPC)
} else {
a.clientRpcAddr = fmt.Sprintf("%v:%v", "127.0.0.1", a.config.Ports.RPC)
}
addr, err = net.ResolveTCPAddr("tcp", a.clientRpcAddr)
if err != nil {
return nil, fmt.Errorf("error resolving RPC addr %q: %v:", a.clientRpcAddr, err)
}
a.clientRpcAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
// Reserve resources on the node.
r := conf.Node.Reserved
if r == nil {
r = new(structs.Resources)
conf.Node.Reserved = r
}
r.CPU = a.config.Client.Reserved.CPU
r.MemoryMB = a.config.Client.Reserved.MemoryMB
r.DiskMB = a.config.Client.Reserved.DiskMB
r.IOPS = a.config.Client.Reserved.IOPS
conf.GloballyReservedPorts = a.config.Client.Reserved.ParsedReservedPorts
conf.Version = fmt.Sprintf("%s%s", a.config.Version, a.config.VersionPrerelease)
conf.Revision = a.config.Revision
conf.ConsulConfig = a.config.Consul
conf.StatsDataPoints = a.config.Client.StatsConfig.DataPoints
conf.StatsCollectionInterval = a.config.Client.StatsConfig.collectionInterval
return conf, nil
}
// setupServer is used to setup the server if enabled
func (a *Agent) setupServer() error {
if !a.config.Server.Enabled {
return nil
}
// Setup the configuration
conf, err := a.serverConfig()
if err != nil {
return fmt.Errorf("server config setup failed: %s", err)
}
// Create the server
server, err := nomad.NewServer(conf)
if err != nil {
return fmt.Errorf("server setup failed: %v", err)
}
a.server = server
return nil
}
// setupClient is used to setup the client if enabled
func (a *Agent) setupClient() error {
if !a.config.Client.Enabled {
return nil
}
// Setup the configuration
conf, err := a.clientConfig()
if err != nil {
return fmt.Errorf("client setup failed: %v", err)
}
// Reserve some ports for the plugins if we are on Windows
if runtime.GOOS == "windows" {
if err := a.reservePortsForClient(conf); err != nil {
return err
}
}
// Create the client
client, err := client.NewClient(conf, a.consulSyncer)
if err != nil {
return fmt.Errorf("client setup failed: %v", err)
}
a.client = client
return nil
}
// reservePortsForClient reserves a range of ports for the client to use when
// it creates various plugins for log collection, executors, drivers, etc
func (a *Agent) reservePortsForClient(conf *clientconfig.Config) error {
// finding the device name for loopback
deviceName, addr, mask, err := a.findLoopbackDevice()
if err != nil {
return fmt.Errorf("error finding the device name for loopback: %v", err)
}
// seeing if the user has already reserved some resources on this device
var nr *structs.NetworkResource
if conf.Node.Reserved == nil {
conf.Node.Reserved = &structs.Resources{}
}
for _, n := range conf.Node.Reserved.Networks {
if n.Device == deviceName {
nr = n
}
}
// If the user hasn't already created the device, we create it
if nr == nil {
nr = &structs.NetworkResource{
Device: deviceName,
IP: addr,
CIDR: mask,
ReservedPorts: make([]structs.Port, 0),
}
}
// appending the port ranges we want to use for the client to the list of
// reserved ports for this device
for i := conf.ClientMinPort; i <= conf.ClientMaxPort; i++ {
nr.ReservedPorts = append(nr.ReservedPorts, structs.Port{Label: fmt.Sprintf("plugin-%d", i), Value: int(i)})
}
conf.Node.Reserved.Networks = append(conf.Node.Reserved.Networks, nr)
return nil
}
// findLoopbackDevice iterates through all the interfaces on a machine and
// returns the ip addr, mask of the loopback device
func (a *Agent) findLoopbackDevice() (string, string, string, error) {
var ifcs []net.Interface
var err error
ifcs, err = net.Interfaces()
if err != nil {
return "", "", "", err
}
for _, ifc := range ifcs {
addrs, err := ifc.Addrs()
if err != nil {
return "", "", "", err
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip.IsLoopback() {
if ip.To4() == nil {
continue
}
return ifc.Name, ip.String(), addr.String(), nil
}
}
}
return "", "", "", fmt.Errorf("no loopback devices with IPV4 addr found")
}
// Leave is used gracefully exit. Clients will inform servers
// of their departure so that allocations can be rescheduled.
func (a *Agent) Leave() error {
if a.client != nil {
if err := a.client.Leave(); err != nil {
a.logger.Printf("[ERR] agent: client leave failed: %v", err)
}
}
if a.server != nil {
if err := a.server.Leave(); err != nil {
a.logger.Printf("[ERR] agent: server leave failed: %v", err)
}
}
return nil
}
// Shutdown is used to terminate the agent.
func (a *Agent) Shutdown() error {
a.shutdownLock.Lock()
defer a.shutdownLock.Unlock()
if a.shutdown {
return nil
}
a.logger.Println("[INFO] agent: requesting shutdown")
if a.client != nil {
if err := a.client.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: client shutdown failed: %v", err)
}
}
if a.server != nil {
if err := a.server.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: server shutdown failed: %v", err)
}
}
if a.consulSyncer != nil {
if err := a.consulSyncer.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: shutting down consul service failed: %v", err)
}
}
a.logger.Println("[INFO] agent: shutdown complete")
a.shutdown = true
close(a.shutdownCh)
return nil
}
// RPC is used to make an RPC call to the Nomad servers
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
if a.server != nil {
return a.server.RPC(method, args, reply)
}
return a.client.RPC(method, args, reply)
}
// Client returns the configured client or nil
func (a *Agent) Client() *client.Client {
return a.client
}
// Server returns the configured server or nil
func (a *Agent) Server() *nomad.Server {
return a.server
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (a *Agent) Stats() map[string]map[string]string {
stats := make(map[string]map[string]string)
if a.server != nil {
subStat := a.server.Stats()
for k, v := range subStat {
stats[k] = v
}
}
if a.client != nil {
subStat := a.client.Stats()
for k, v := range subStat {
stats[k] = v
}
}
return stats
}
// setupConsulSyncer creates the Consul task used by this Nomad Agent when
// running in either Client and Server mode.
func (a *Agent) setupConsulSyncer(shutdownCh chan struct{}) (err error) {
a.consulSyncer, err = consul.NewSyncer(a.config.Consul, shutdownCh, a.logger)
return nil
}
// syncAgentServicesWithConsul syncs this Nomad Agent's services with Consul
// when running in either Client or Server mode.
func (a *Agent) syncAgentServicesWithConsul() error {
var services []*structs.Service
if a.client != nil {
if a.config.Consul.ClientServiceName != "" {
clientRpcService := &structs.Service{
Name: a.config.Consul.ClientServiceName,
PortLabel: a.clientRpcAddr,
Tags: []string{consul.ServiceTagRpc},
}
services = append(services, clientRpcService)
clientHttpService := &structs.Service{
Name: a.config.Consul.ClientServiceName,
PortLabel: a.clientHttpAddr,
Tags: []string{consul.ServiceTagHttp},
}
services = append(services, clientHttpService)
a.consulSyncer.SetServiceIdentifier("agent-client")
}
}
if a.server != nil {
if a.config.Consul.ServerServiceName != "" {
serverHttpService := &structs.Service{
Name: a.config.Consul.ServerServiceName,
Tags: []string{consul.ServiceTagHttp},
PortLabel: a.serverHttpAddr,
}
services = append(services, serverHttpService)
serverRpcService := &structs.Service{
Name: a.config.Consul.ServerServiceName,
Tags: []string{consul.ServiceTagRpc},
PortLabel: a.serverRpcAddr,
}
services = append(services, serverRpcService)
serverSerfService := &structs.Service{
Name: a.config.Consul.ServerServiceName,
Tags: []string{consul.ServiceTagSerf},
PortLabel: a.serverSerfAddr,
}
services = append(services, serverSerfService)
a.consulSyncer.SetServiceIdentifier("agent-server")
}
}
a.consulSyncer.SetAddrFinder(func(portLabel string) (string, int) {
host, port, err := net.SplitHostPort(portLabel)
if err != nil {
p, err := strconv.Atoi(port)
if err != nil {
return "", 0
}
return "", p
}
// If the addr for the service is ":port", then we fall back
// to Nomad's default address resolution protocol.
//
// TODO(sean@): This should poll Consul to figure out what
// its advertise address is and use that in order to handle
// the case where there is something funky like NAT on this
// host. For now we just use the BindAddr if set, otherwise
// we fall back to a loopback addr.
if host == "" {
if a.config.BindAddr != "" {
host = a.configBindAddr
} else {
host = "127.0.0.1"
}
}
p, err := strconv.Atoi(port)
if err != nil {
return host, 0
}
return host, p
})
a.consulSyncer.SetServices("agent", agentServiceGroup)
return a.consulSyncer.SyncServices()
}