networking: add ignore_collision for static port{} (#23956)

so more than one copy of a program can run
at a time on the same port with SO_REUSEPORT.

requires host network mode.

some task drivers (like docker) may also need
config {
  network_mode = "host"
}
but this is not validated prior to placement.
This commit is contained in:
Daniel Bennett
2024-09-17 16:01:48 -05:00
committed by GitHub
parent 603a747f5b
commit ec81e7c57c
12 changed files with 198 additions and 43 deletions

3
.changelog/23956.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:improvement
networking: Added an option to ignore static port collisions when scheduling, for programs that use the SO_REUSEPORT unix socket option
```

View File

@@ -25,7 +25,7 @@ func TestCompose(t *testing.T) {
{
CIDR: "0.0.0.0/0",
MBits: pointerOf(100),
ReservedPorts: []Port{{"", 80, 0, ""}, {"", 443, 0, ""}},
ReservedPorts: []Port{{Label: "", Value: 80}, {Label: "", Value: 443}},
},
},
})
@@ -116,8 +116,8 @@ func TestCompose(t *testing.T) {
CIDR: "0.0.0.0/0",
MBits: pointerOf(100),
ReservedPorts: []Port{
{"", 80, 0, ""},
{"", 443, 0, ""},
{Label: "", Value: 80},
{Label: "", Value: 443},
},
},
},

View File

@@ -145,10 +145,11 @@ func (n *NUMAResource) Canonicalize() {
}
type Port struct {
Label string `hcl:",label"`
Value int `hcl:"static,optional"`
To int `hcl:"to,optional"`
HostNetwork string `hcl:"host_network,optional"`
Label string `hcl:",label"`
Value int `hcl:"static,optional"`
To int `hcl:"to,optional"`
HostNetwork string `hcl:"host_network,optional"`
IgnoreCollision bool `hcl:"ignore_collision,optional"`
}
type DNSConfig struct {

View File

@@ -235,7 +235,7 @@ func TestTask_Require(t *testing.T) {
{
CIDR: "0.0.0.0/0",
MBits: pointerOf(100),
ReservedPorts: []Port{{"", 80, 0, ""}, {"", 443, 0, ""}},
ReservedPorts: []Port{{Label: "", Value: 80}, {Label: "", Value: 443}},
},
},
}

View File

@@ -1576,10 +1576,11 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe
func ApiPortToStructs(in api.Port) structs.Port {
return structs.Port{
Label: in.Label,
Value: in.Value,
To: in.To,
HostNetwork: in.HostNetwork,
Label: in.Label,
Value: in.Value,
To: in.To,
HostNetwork: in.HostNetwork,
IgnoreCollision: in.IgnoreCollision,
}
}

View File

@@ -2812,7 +2812,7 @@ func portDiffs(old, new []Port, dynamic bool, contextual bool) []*ObjectDiff {
filter := []string{"_struct"}
name := "Static Port"
if dynamic {
filter = []string{"_struct", "Value"}
filter = []string{"_struct", "Value", "IgnoreCollision"}
name = "Dynamic Port"
}

View File

@@ -4614,6 +4614,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "IgnoreCollision",
Old: "false",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Label",
@@ -6486,6 +6492,12 @@ func TestTaskDiff(t *testing.T) {
Type: DiffTypeAdded,
Name: "Static Port",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "IgnoreCollision",
Old: "",
New: "false",
},
{
Type: DiffTypeAdded,
Name: "Label",
@@ -6534,6 +6546,12 @@ func TestTaskDiff(t *testing.T) {
Type: DiffTypeDeleted,
Name: "Static Port",
Fields: []*FieldDiff{
{
Type: DiffTypeDeleted,
Name: "IgnoreCollision",
Old: "false",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Label",

View File

@@ -430,14 +430,16 @@ func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool, reasons
func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool, reasons []string) {
for _, port := range ports {
used := idx.getUsedPortsFor(port.HostIP)
if port.Value < 0 || port.Value >= MaxValidPort {
return true, []string{fmt.Sprintf("invalid port %d", port.Value)}
}
used := idx.getUsedPortsFor(port.HostIP)
if used.Check(uint(port.Value)) {
collide = true
reason := fmt.Sprintf("port %d already in use", port.Value)
reasons = append(reasons, reason)
if !port.IgnoreCollision {
collide = true
reason := fmt.Sprintf("port %d already in use", port.Value)
reasons = append(reasons, reason)
}
} else {
used.Set(uint(port.Value))
}
@@ -518,23 +520,26 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
var allocPort *AllocatedPortMapping
var addrErr error
for _, addr := range idx.HostNetworks[port.HostNetwork] {
used := idx.getUsedPortsFor(addr.Address)
// Guard against invalid port
if port.Value < 0 || port.Value >= MaxValidPort {
return nil, fmt.Errorf("invalid port %d (out of range)", port.Value)
}
// Check if in use
if used != nil && used.Check(uint(port.Value)) {
addrErr = fmt.Errorf("reserved port collision %s=%d", port.Label, port.Value)
continue
if !port.IgnoreCollision {
used := idx.getUsedPortsFor(addr.Address)
if used != nil && used.Check(uint(port.Value)) {
addrErr = fmt.Errorf("reserved port collision %s=%d", port.Label, port.Value)
continue
}
}
allocPort = &AllocatedPortMapping{
Label: port.Label,
Value: port.Value,
To: port.To,
HostIP: addr.Address,
Label: port.Label,
Value: port.Value,
To: port.To,
HostIP: addr.Address,
IgnoreCollision: port.IgnoreCollision,
}
break
}

View File

@@ -584,6 +584,82 @@ func TestNetworkIndex_AssignPorts_TwoIp(t *testing.T) {
}
// TestNetworkIndex_IgnorePortCollision tests Port.IgnoreCollision.
func TestNetworkIndex_IgnorePortCollision(t *testing.T) {
ci.Parallel(t)
// set up some fake resources
ip := "127.3.2.1"
net := "test-ignore-port-collision"
n := &Node{
NodeResources: &NodeResources{
NodeNetworks: []*NodeNetworkResource{{
Addresses: []NodeNetworkAddress{{
Alias: net,
Address: ip,
}},
}},
},
}
getPortMappings := func(collideOK bool) []AllocatedPortMapping {
return []AllocatedPortMapping{{
HostIP: ip,
Label: "test-port",
Value: 10,
To: 10,
IgnoreCollision: collideOK,
}}
}
getPorts := func(collideOK bool) []Port {
return []Port{{
HostNetwork: net,
Label: "test-port",
Value: 10,
To: 10,
IgnoreCollision: collideOK,
}}
}
collidingPortMappings := getPortMappings(false)
nonCollidingPortMappings := getPortMappings(true)
collidingPorts := getPorts(false)
nonCollidingPorts := getPorts(true)
// now we can get started
idx := NewNetworkIndex()
idx.SetNode(n)
// initial reservation - pretend some other job has already used the port
// note the behavior below is the same whether this one is a collider or not
collide, reasons := idx.AddReservedPorts(collidingPortMappings)
must.False(t, collide, must.Sprint("expect no collision in first reservation"))
must.Len(t, 0, reasons, must.Sprint("expect no reasons in first reservation"))
t.Run("AddReservedPorts", func(t *testing.T) {
collide, reasons = idx.AddReservedPorts(collidingPortMappings)
must.True(t, collide, must.Sprint("expect collision"))
must.Eq(t, []string{"port 10 already in use"}, reasons, must.Sprint("expect collision reasons"))
collide, reasons = idx.AddReservedPorts(nonCollidingPortMappings)
must.False(t, collide, must.Sprint("expect no collision"))
must.Len(t, 0, reasons, must.Sprint("expect no collision reasons"))
})
t.Run("AssignPorts", func(t *testing.T) {
ask := &NetworkResource{ReservedPorts: collidingPorts}
allocated, err := idx.AssignPorts(ask)
must.ErrorContains(t, err, "reserved port collision test-port=10")
must.Nil(t, allocated, must.Sprint("expect no ports on AssignPorts error"))
ask = &NetworkResource{ReservedPorts: nonCollidingPorts}
allocated, err = idx.AssignPorts(ask)
must.NoError(t, err)
must.Eq(t, nonCollidingPortMappings, allocated)
})
}
func TestNetworkIndex_AssignTaskNetwork(t *testing.T) {
ci.Parallel(t)
idx := NewNetworkIndex()

View File

@@ -2805,18 +2805,20 @@ type AllocatedPortMapping struct {
// msgpack omit empty fields during serialization
_struct bool `codec:",omitempty"` // nolint: structcheck
Label string
Value int
To int
HostIP string
Label string
Value int
To int
HostIP string
IgnoreCollision bool
}
func (m *AllocatedPortMapping) Copy() *AllocatedPortMapping {
return &AllocatedPortMapping{
Label: m.Label,
Value: m.Value,
To: m.To,
HostIP: m.HostIP,
Label: m.Label,
Value: m.Value,
To: m.To,
HostIP: m.HostIP,
IgnoreCollision: m.IgnoreCollision,
}
}
@@ -2833,6 +2835,8 @@ func (m *AllocatedPortMapping) Equal(o *AllocatedPortMapping) bool {
return false
case m.HostIP != o.HostIP:
return false
case m.IgnoreCollision != o.IgnoreCollision:
return false
}
return true
}
@@ -2875,6 +2879,11 @@ type Port struct {
// to. Jobs with a HostNetwork set can only be placed on nodes with
// that host network available.
HostNetwork string
// IgnoreCollision ignores port collisions, so the port can be used more
// than one time on a single network, for tasks that support SO_REUSEPORT
// Should be used only with static ports.
IgnoreCollision bool
}
type DNSConfig struct {
@@ -3044,10 +3053,11 @@ func (ns Networks) Port(label string) AllocatedPortMapping {
for _, p := range n.ReservedPorts {
if p.Label == label {
return AllocatedPortMapping{
Label: label,
Value: p.Value,
To: p.To,
HostIP: n.IP,
Label: label,
Value: p.Value,
To: p.To,
HostIP: n.IP,
IgnoreCollision: p.IgnoreCollision,
}
}
}
@@ -7267,8 +7277,10 @@ func (tg *TaskGroup) validateNetworks() error {
}
// static port
if other, ok := staticPorts[port.Value]; ok {
err := fmt.Errorf("Static port %d already reserved by %s", port.Value, other)
mErr.Errors = append(mErr.Errors, err)
if !port.IgnoreCollision {
err := fmt.Errorf("Static port %d already reserved by %s", port.Value, other)
mErr.Errors = append(mErr.Errors, err)
}
} else if port.Value > math.MaxUint16 {
err := fmt.Errorf("Port %s (%d) cannot be greater than %d", port.Label, port.Value, math.MaxUint16)
mErr.Errors = append(mErr.Errors, err)
@@ -7285,6 +7297,11 @@ func (tg *TaskGroup) validateNetworks() error {
err := fmt.Errorf("Port %q cannot be mapped to a port (%d) greater than %d", port.Label, port.To, math.MaxUint16)
mErr.Errors = append(mErr.Errors, err)
}
if port.IgnoreCollision && !(net.Mode == "" || net.Mode == "host") {
err := fmt.Errorf("Port %q collision may not be ignored on non-host network mode %q", port.Label, net.Mode)
mErr.Errors = append(mErr.Errors, err)
}
}
// Validate the cniArgs in each network resource. Make sure there are no duplicate Args in
// different network resources or invalid characters (;) in key or value ;)

View File

@@ -2065,6 +2065,31 @@ func TestTaskGroupNetwork_Validate(t *testing.T) {
},
ErrContains: "invalid ';' character in CNI arg value \"first_value;",
},
{
TG: &TaskGroup{
Name: "testing-port-ignore-collision-ok",
Networks: []*NetworkResource{{
Mode: "host",
ReservedPorts: []Port{
{Label: "one", Value: 10, IgnoreCollision: true},
{Label: "two", Value: 10, IgnoreCollision: true},
},
}},
},
},
{
TG: &TaskGroup{
Name: "testing-port-ignore-collision-non-host-network-mode",
Networks: []*NetworkResource{{
Mode: "not-host",
ReservedPorts: []Port{
{Label: "one", Value: 10, IgnoreCollision: true},
{Label: "two", Value: 10, IgnoreCollision: true},
},
}},
},
ErrContains: "collision may not be ignored on non-host network mode",
},
}
for i := range cases {

View File

@@ -105,6 +105,12 @@ All other operating systems use the `host` networking mode.
- `host_network` `(string:nil)` - Designates the host network name to use when allocating
the port. When port mapping the host port will only forward traffic to the matched host
network address.
- `ignore_collision` `(bool: false)` - Allows the group to be placed on a node
where the port may already be reserved. Intended for programs that support
`SO_REUSEPORT` unix socket option, so more than one instance of the program
may bind to the same port. Only compatible with [`host`](#host) network mode
and `static` ports. Some task drivers (e.g. docker) may also require setting
`network_mode = "host"` (or similar) to avoid runtime errors after placement.
The label assigned to the port is used to identify the port in service
discovery, and used in the name of the environment variable that indicates
@@ -169,10 +175,10 @@ network {
### Static Ports
This example specifies a static port allocation for the port labeled "lb". Static
ports bind your job to a specific port on the host they' are placed on. Since
multiple services cannot share a port, the port must be open in order to place
your task.
Static ports place your job on a host where the port is not already reserved
by another job with the same port.
This example specifies a static port allocation for the port labeled "lb".
```hcl
network {
@@ -182,6 +188,9 @@ network {
}
```
For programs that support the `SO_REUSEPORT` unix socket option,
you may set `ignore_collision = true` to place multiple copies on a single node.
### Mapped Ports
Some drivers (such as [Docker][docker-driver] and [QEMU][qemu-driver]) allow you