Make var get a blocking query as expected (#14205)

This commit is contained in:
Charlie Voiselle
2022-08-22 16:37:21 -04:00
committed by GitHub
parent 36c1c686bb
commit ab67f30e44
2 changed files with 139 additions and 4 deletions

View File

@@ -1,10 +1,12 @@
package nomad
import (
"encoding/json"
"fmt"
"math/rand"
"strings"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/shoenig/test"
@@ -560,3 +562,136 @@ namespace "*" {}
testListPrefix("*", "", 6, nil)
}
func TestSecureVariablesEndpoint_GetSecureVariable_Blocking(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// First create an unrelated variable.
delay := 100 * time.Millisecond
time.AfterFunc(delay, func() {
writeVar(t, s1, 100, "default", "aaa")
})
// Upsert the variable we are watching later
delay = 200 * time.Millisecond
time.AfterFunc(delay, func() {
writeVar(t, s1, 200, "default", "bbb")
})
// Lookup the variable
req := &structs.SecureVariablesReadRequest{
Path: "bbb",
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 150,
MaxQueryTime: 500 * time.Millisecond,
},
}
var resp structs.SecureVariablesReadResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "SecureVariables.Read", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
elapsed := time.Since(start)
if elapsed < delay {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if elapsed > req.MaxQueryTime {
t.Fatalf("blocking query timed out %#v", resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if resp.Data == nil || resp.Data.Path != "bbb" {
t.Fatalf("bad: %#v", resp.Data)
}
// Variable update triggers watches
delay = 100 * time.Millisecond
time.AfterFunc(delay, func() {
writeVar(t, s1, 300, "default", "bbb")
})
req.QueryOptions.MinQueryIndex = 250
var resp2 structs.SecureVariablesReadResponse
start = time.Now()
if err := msgpackrpc.CallWithCodec(codec, "SecureVariables.Read", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
elapsed = time.Since(start)
if elapsed < delay {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if elapsed > req.MaxQueryTime {
t.Fatal("blocking query timed out")
}
if resp2.Index != 300 {
t.Fatalf("Bad index: %d %d", resp2.Index, 300)
}
if resp2.Data == nil || resp2.Data.Path != "bbb" {
t.Fatalf("bad: %#v", resp2.Data)
}
// Variable delete triggers watches
delay = 100 * time.Millisecond
time.AfterFunc(delay, func() {
sv := mock.SecureVariableEncrypted()
sv.Path = "bbb"
if resp := state.SVEDelete(400, &structs.SVApplyStateRequest{Op: structs.SVOpDelete, Var: sv}); !resp.IsOk() {
t.Fatalf("err: %v", resp.Error)
}
})
req.QueryOptions.MinQueryIndex = 350
var resp3 structs.SecureVariablesReadResponse
start = time.Now()
if err := msgpackrpc.CallWithCodec(codec, "SecureVariables.Read", req, &resp3); err != nil {
t.Fatalf("err: %v", err)
}
elapsed = time.Since(start)
if elapsed < delay {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if elapsed > req.MaxQueryTime {
t.Fatal("blocking query timed out")
}
if resp3.Index != 400 {
t.Fatalf("Bad index: %d %d", resp3.Index, 400)
}
if resp3.Data != nil {
t.Fatalf("bad: %#v", resp3.Data)
}
}
func writeVar(t *testing.T, s *Server, idx uint64, ns, path string) {
store := s.fsm.State()
sv := mock.SecureVariable()
sv.Namespace = ns
sv.Path = path
bPlain, err := json.Marshal(sv.Items)
must.NoError(t, err)
bEnc, kID, err := s.encrypter.Encrypt(bPlain)
must.NoError(t, err)
sve := &structs.SecureVariableEncrypted{
SecureVariableMetadata: sv.SecureVariableMetadata,
SecureVariableData: structs.SecureVariableData{
Data: bEnc,
KeyID: kID,
},
}
resp := store.SVESet(idx, &structs.SVApplyStateRequest{
Op: structs.SVOpSet,
Var: sve,
})
must.NoError(t, resp.Error)
}

View File

@@ -99,10 +99,11 @@ func (s *StateStore) GetSecureVariable(
txn := s.db.ReadTxn()
// Try to fetch the secure variable.
raw, err := txn.First(TableSecureVariables, indexID, namespace, path)
watchCh, raw, err := txn.FirstWatch(TableSecureVariables, indexID, namespace, path)
if err != nil { // error during fetch
return nil, fmt.Errorf("secure variable lookup failed: %v", err)
}
ws.Add(watchCh)
if raw == nil { // not found
return nil, nil
}
@@ -111,7 +112,7 @@ func (s *StateStore) GetSecureVariable(
return sv, nil
}
// SVESet is used to store a secure variable pair.
// SVESet is used to store a secure variable object.
func (s *StateStore) SVESet(idx uint64, sv *structs.SVApplyStateRequest) *structs.SVApplyStateResponse {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
@@ -130,8 +131,7 @@ func (s *StateStore) SVESet(idx uint64, sv *structs.SVApplyStateRequest) *struct
// SVESetCAS is used to do a check-and-set operation on a secure
// variable. The ModifyIndex in the provided entry is used to determine if
// we should write the entry to the state store or not. Returns a bool
// indicating whether or not a write happened and any error that occurred.
// we should write the entry to the state store or not.
func (s *StateStore) SVESetCAS(idx uint64, sv *structs.SVApplyStateRequest) *structs.SVApplyStateResponse {
tx := s.db.WriteTxn(idx)
defer tx.Abort()