diff --git a/client/client.go b/client/client.go
index c8e3b8767..22e2f24d1 100644
--- a/client/client.go
+++ b/client/client.go
@@ -135,6 +135,10 @@ type Client struct {
 	blockedAllocations map[string]*structs.Allocation
 	blockedAllocsLock  sync.RWMutex
 
+	// migratingAllocs is the set of allocs whose data migration is in flight
+	migratingAllocs     map[string]*migrateAllocCtrl
+	migratingAllocsLock sync.Mutex
+
 	// allocUpdates stores allocations that need to be synced to the server.
 	allocUpdates chan *structs.Allocation
 
@@ -151,10 +155,6 @@ type Client struct {
 	// vaultClient is used to interact with Vault for token and secret renewals
 	vaultClient vaultclient.VaultClient
 
-	// migratingAllocs is the set of allocs whose data migration is in flight
-	migratingAllocs     map[string]*migrateAllocCtrl
-	migratingAllocsLock sync.Mutex
-
 	// garbageCollector is used to garbage collect terminal allocations present
 	// in the node automatically
 	garbageCollector *AllocGarbageCollector
@@ -162,14 +162,16 @@ type Client struct {
 
 // migrateAllocCtrl indicates whether migration is complete
 type migrateAllocCtrl struct {
+	alloc  *structs.Allocation
 	ch     chan struct{}
 	closed bool
 	chLock sync.Mutex
 }
 
-func newMigrateAllocCtrl() *migrateAllocCtrl {
+func newMigrateAllocCtrl(alloc *structs.Allocation) *migrateAllocCtrl {
 	return &migrateAllocCtrl{
-		ch: make(chan struct{}),
+		ch:    make(chan struct{}),
+		alloc: alloc,
 	}
 }
 
@@ -1501,7 +1503,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
 				// prevents a race between a finishing blockForRemoteAlloc and
 				// another invocation of runAllocs
 				if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok {
-					c.migratingAllocs[add.ID] = newMigrateAllocCtrl()
+					c.migratingAllocs[add.ID] = newMigrateAllocCtrl(add)
 					go c.blockForRemoteAlloc(add)
 				}
 			}
@@ -2220,6 +2222,124 @@ func (c *Client) emitStats(hStats *stats.HostStats) {
 		metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent))
 		metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent))
 	}
+
+	// Get all the resources for the node
+	c.configLock.RLock()
+	node := c.configCopy.Node
+	c.configLock.RUnlock()
+	total := node.Resources
+	res := node.Reserved
+	allocated := c.getAllocatedResources(node)
+
+	// Emit allocated
+	metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.MemoryMB))
+	metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.DiskMB))
+	metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.CPU))
+	metrics.SetGauge([]string{"client", "allocated", "iops", nodeID}, float32(allocated.IOPS))
+
+	for _, n := range allocated.Networks {
+		metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits))
+	}
+
+	// Emit unallocated
+	unallocatedMem := total.MemoryMB - res.MemoryMB - allocated.MemoryMB
+	unallocatedDisk := total.DiskMB - res.DiskMB - allocated.DiskMB
+	unallocatedCpu := total.CPU - res.CPU - allocated.CPU
+	unallocatedIops := total.IOPS - res.IOPS - allocated.IOPS
+	metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem))
+	metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk))
+	metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu))
+	metrics.SetGauge([]string{"client", "unallocated", "iops", nodeID}, float32(unallocatedIops))
+
+	// Emit unallocated bandwidth for every device that has allocations. A
+	// device not found in the node's resources counts as zero capacity.
+	for _, n := range allocated.Networks {
+		totalMbits := 0
+
+		totalIdx := total.NetIndex(n)
+		if totalIdx != -1 {
+			totalMbits = total.Networks[totalIdx].MBits
+		}
+
+		unallocatedMbits := totalMbits - n.MBits
+		metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits))
+	}
+}
+
+// getAllocatedResources returns the sum of the resources of every
+// non-terminal allocation returned by allAllocs. The Networks field of the
+// result is rebuilt to hold one entry per node device matched by at least one
+// allocation network; only Device and MBits are populated on those entries.
+func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources {
+	// Unfortunately the allocs only have IP so we need to match them to the
+	// device
+	cidrToDevice := make(map[*net.IPNet]string, len(selfNode.Resources.Networks))
+	for _, n := range selfNode.Resources.Networks {
+		_, ipnet, err := net.ParseCIDR(n.CIDR)
+		if err != nil {
+			// A network whose CIDR does not parse cannot be matched; skip it.
+			continue
+		}
+		cidrToDevice[ipnet] = n.Device
+	}
+
+	// Sum the allocated resources
+	allocs := c.allAllocs()
+	var allocated structs.Resources
+	allocatedDeviceMbits := make(map[string]int)
+	for _, alloc := range allocs {
+		if alloc.TerminalStatus() {
+			continue
+		}
+
+		allocated.Add(alloc.Resources)
+		for _, allocatedNetwork := range alloc.Resources.Networks {
+			// Parse once per network rather than once per candidate CIDR.
+			ip := net.ParseIP(allocatedNetwork.IP)
+			for cidr, dev := range cidrToDevice {
+				if cidr.Contains(ip) {
+					allocatedDeviceMbits[dev] += allocatedNetwork.MBits
+					break
+				}
+			}
+		}
+	}
+
+	// Clear the networks
+	allocated.Networks = nil
+	for dev, speed := range allocatedDeviceMbits {
+		// Named nr rather than net so the net package is not shadowed.
+		nr := &structs.NetworkResource{
+			Device: dev,
+			MBits:  speed,
+		}
+		allocated.Networks = append(allocated.Networks, nr)
+	}
+
+	return &allocated
+}
+
+// allAllocs returns all the allocations managed by the client: those with an
+// alloc runner, the blocked allocations, and those with a migration in flight.
+func (c *Client) allAllocs() []*structs.Allocation {
+	var allocs []*structs.Allocation
+	for _, ar := range c.getAllocRunners() {
+		allocs = append(allocs, ar.Alloc())
+	}
+
+	// The map is only read here, so the read lock is sufficient.
+	c.blockedAllocsLock.RLock()
+	for _, alloc := range c.blockedAllocations {
+		allocs = append(allocs, alloc)
+	}
+	c.blockedAllocsLock.RUnlock()
+
+	c.migratingAllocsLock.Lock()
+	for _, ctrl := range c.migratingAllocs {
+		allocs = append(allocs, ctrl.alloc)
+	}
+	c.migratingAllocsLock.Unlock()
+	return allocs
 }
 
 // resolveServer given a sever's address as a string, return it's resolved
diff --git a/demo/vagrant/client1.hcl b/demo/vagrant/client1.hcl
index 24083ee3f..11699845c 100644
--- a/demo/vagrant/client1.hcl
+++ b/demo/vagrant/client1.hcl
@@ -22,9 +22,16 @@ client {
     }
     reserved {
         cpu = 500
+        memory = 512
+        disk = 1024
     }
 }
 
+telemetry {
+    publish_allocation_metrics = true
+    publish_node_metrics = true
+}
+
 # Modify our port to avoid a collision with server1
 ports {
     http = 5656