mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
reset max query time of blocking queries in client after retries (#25039)
When a blocking query on the client hits a retryable error, we change the max query time so that it falls within the `RPCHoldTimeout` window. But when the retry succeeds, we don't reset it to the original value. Because the calls to `Node.GetClientAllocs` reuse the same request struct instead of reallocating it, any retry will cause the agent to poll at a faster frequency until the agent restarts. No other current RPC on the client has this behavior, but we'll fix this in the `rpc` method rather than in the caller so that any future users of the `rpc` method don't have to remember this detail.

Fixes: https://github.com/hashicorp/nomad/issues/25033
This commit is contained in:
3
.changelog/25039.txt
Normal file
3
.changelog/25039.txt
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
```release-note:bug
|
||||||
|
client: Fixed a bug where temporary RPC errors caused the client to poll for changes more frequently thereafter
|
||||||
|
```
|
||||||
@@ -91,7 +91,9 @@ func (c *Client) rpc(method string, args any, reply any) error {
|
|||||||
|
|
||||||
// If its a blocking query, allow the time specified by the request
|
// If its a blocking query, allow the time specified by the request
|
||||||
if info, ok := args.(structs.RPCInfo); ok {
|
if info, ok := args.(structs.RPCInfo); ok {
|
||||||
deadline = deadline.Add(info.TimeToBlock())
|
oldBlockTime := info.TimeToBlock()
|
||||||
|
deadline = deadline.Add(oldBlockTime)
|
||||||
|
defer info.SetTimeToBlock(oldBlockTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
TRY:
|
TRY:
|
||||||
@@ -127,9 +129,12 @@ TRY:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if time.Now().After(deadline) {
|
if time.Now().After(deadline) {
|
||||||
// Blocking queries are tricky. jitters and rpcholdtimes in multiple places can result in our server call taking longer than we wanted it to. For example:
|
// Blocking queries are tricky. jitters and rpcholdtimes in multiple
|
||||||
// a block time of 5s may easily turn into the server blocking for 10s since it applies its own RPCHoldTime. If the server dies at t=7s we still want to retry
|
// places can result in our server call taking longer than we wanted it
|
||||||
// so before we give up on blocking queries make one last attempt for an immediate answer
|
// to. For example: a block time of 5s may easily turn into the server
|
||||||
|
// blocking for 10s since it applies its own RPCHoldTime. If the server
|
||||||
|
// dies at t=7s we still want to retry so before we give up on blocking
|
||||||
|
// queries make one last attempt for an immediate answer
|
||||||
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
|
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
|
||||||
info.SetTimeToBlock(0)
|
info.SetTimeToBlock(0)
|
||||||
return c.RPC(method, args, reply)
|
return c.RPC(method, args, reply)
|
||||||
@@ -144,10 +149,13 @@ TRY:
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
// If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline.
|
// If we are going to retry a blocking query we need to update the time
|
||||||
|
// to block so it finishes by our deadline.
|
||||||
|
|
||||||
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
|
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
|
||||||
newBlockTime := time.Until(deadline)
|
newBlockTime := time.Until(deadline)
|
||||||
// We can get below 0 here on slow computers because we slept for jitter so at least try to get an immediate response
|
// We can get below 0 here on slow computers because we slept for
|
||||||
|
// jitter so at least try to get an immediate response
|
||||||
if newBlockTime < 0 {
|
if newBlockTime < 0 {
|
||||||
newBlockTime = 0
|
newBlockTime = 0
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ package client
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/nomad/ci"
|
"github.com/hashicorp/nomad/ci"
|
||||||
"github.com/hashicorp/nomad/client/config"
|
"github.com/hashicorp/nomad/client/config"
|
||||||
@@ -191,3 +192,57 @@ func Test_resolveServer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRpc_RetryBlockTime(t *testing.T) {
|
||||||
|
ci.Parallel(t)
|
||||||
|
|
||||||
|
// Timeouts have to allow for multiple passes thru the recursive c.rpc
|
||||||
|
// call. Unconfigurable internal timeouts prevent us from using a shorter
|
||||||
|
// MaxQueryTime base for this test
|
||||||
|
expectMaxQueryTime := time.Second
|
||||||
|
rpcHoldTimeout := 5 * time.Second
|
||||||
|
unblockTimeout := 7 * time.Second
|
||||||
|
|
||||||
|
srv, cleanupSrv := nomad.TestServer(t, func(c *nomad.Config) {
|
||||||
|
c.NumSchedulers = 0
|
||||||
|
c.BootstrapExpect = 3 // we intentionally don't want a leader
|
||||||
|
})
|
||||||
|
t.Cleanup(func() { cleanupSrv() })
|
||||||
|
|
||||||
|
c, cleanupC := TestClient(t, func(c *config.Config) {
|
||||||
|
c.Servers = []string{srv.GetConfig().RPCAddr.String()}
|
||||||
|
c.RPCHoldTimeout = rpcHoldTimeout
|
||||||
|
})
|
||||||
|
t.Cleanup(func() { cleanupC() })
|
||||||
|
|
||||||
|
req := structs.NodeSpecificRequest{
|
||||||
|
NodeID: c.NodeID(),
|
||||||
|
SecretID: c.secretNodeID(),
|
||||||
|
QueryOptions: structs.QueryOptions{
|
||||||
|
Region: c.Region(),
|
||||||
|
AuthToken: c.secretNodeID(),
|
||||||
|
MinQueryIndex: 10000, // some far-flung index we know won't exist yet
|
||||||
|
MaxQueryTime: expectMaxQueryTime,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := structs.NodeClientAllocsResponse{}
|
||||||
|
errCh := make(chan error)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err := c.rpc("Node.GetClientAllocs", &req, &resp)
|
||||||
|
errCh <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
// wait for the blocking query to run long enough for 2 passes thru,
|
||||||
|
// including jitter
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
must.NoError(t, err)
|
||||||
|
case <-time.After(unblockTimeout):
|
||||||
|
cleanupC() // force unblock
|
||||||
|
}
|
||||||
|
|
||||||
|
must.Eq(t, expectMaxQueryTime, req.MaxQueryTime,
|
||||||
|
must.Sprintf("MaxQueryTime was changed during retries but not reset"))
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user