From ab67f30e441649fc7ae82cf6046173bd3ae76830 Mon Sep 17 00:00:00 2001 From: Charlie Voiselle <464492+angrycub@users.noreply.github.com> Date: Mon, 22 Aug 2022 16:37:21 -0400 Subject: [PATCH] Make var get a blocking query as expected (#14205) --- nomad/secure_variables_endpoint_test.go | 135 ++++++++++++++++++++ nomad/state/state_store_secure_variables.go | 8 +- 2 files changed, 139 insertions(+), 4 deletions(-) diff --git a/nomad/secure_variables_endpoint_test.go b/nomad/secure_variables_endpoint_test.go index 58aafd892..5666dddcc 100644 --- a/nomad/secure_variables_endpoint_test.go +++ b/nomad/secure_variables_endpoint_test.go @@ -1,10 +1,12 @@ package nomad import ( + "encoding/json" "fmt" "math/rand" "strings" "testing" + "time" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/shoenig/test" @@ -560,3 +562,136 @@ namespace "*" {} testListPrefix("*", "", 6, nil) } + +func TestSecureVariablesEndpoint_GetSecureVariable_Blocking(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // First create an unrelated variable. + delay := 100 * time.Millisecond + time.AfterFunc(delay, func() { + writeVar(t, s1, 100, "default", "aaa") + }) + + // Upsert the variable we are watching later + delay = 200 * time.Millisecond + time.AfterFunc(delay, func() { + writeVar(t, s1, 200, "default", "bbb") + }) + + // Lookup the variable + req := &structs.SecureVariablesReadRequest{ + Path: "bbb", + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + MaxQueryTime: 500 * time.Millisecond, + }, + } + var resp structs.SecureVariablesReadResponse + start := time.Now() + if err := msgpackrpc.CallWithCodec(codec, "SecureVariables.Read", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + elapsed := time.Since(start) + + if elapsed < delay { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if elapsed > req.MaxQueryTime { + t.Fatalf("blocking query timed out %#v", resp) + } + if resp.Index != 200 { + t.Fatalf("Bad index: %d %d", resp.Index, 200) + } + if resp.Data == nil || resp.Data.Path != "bbb" { + t.Fatalf("bad: %#v", resp.Data) + } + + // Variable update triggers watches + delay = 100 * time.Millisecond + + time.AfterFunc(delay, func() { + writeVar(t, s1, 300, "default", "bbb") + }) + + req.QueryOptions.MinQueryIndex = 250 + var resp2 structs.SecureVariablesReadResponse + start = time.Now() + if err := msgpackrpc.CallWithCodec(codec, "SecureVariables.Read", req, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + elapsed = time.Since(start) + + if elapsed < delay { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } + if elapsed > req.MaxQueryTime { + t.Fatal("blocking query timed out") + } + if resp2.Index != 300 { + t.Fatalf("Bad index: %d %d", resp2.Index, 300) + } + if resp2.Data == nil || resp2.Data.Path != "bbb" { + t.Fatalf("bad: %#v", resp2.Data) + } + + // Variable delete triggers watches + delay = 100 * time.Millisecond + time.AfterFunc(delay, func() { + sv := mock.SecureVariableEncrypted() + sv.Path = "bbb" + if resp := state.SVEDelete(400, &structs.SVApplyStateRequest{Op: structs.SVOpDelete, Var: sv}); !resp.IsOk() { + t.Fatalf("err: %v", resp.Error) + } + }) + + req.QueryOptions.MinQueryIndex = 350 + var resp3 structs.SecureVariablesReadResponse + start = time.Now() + if err := msgpackrpc.CallWithCodec(codec, "SecureVariables.Read", req, &resp3); err != nil { + t.Fatalf("err: %v", err) + } + elapsed = time.Since(start) + + if elapsed < delay { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if elapsed > req.MaxQueryTime { + t.Fatal("blocking query timed out") + } + if resp3.Index != 400 { + t.Fatalf("Bad index: %d %d", resp3.Index, 400) + } + if resp3.Data != nil { + t.Fatalf("bad: %#v", resp3.Data) + } +} + +func writeVar(t *testing.T, s *Server, idx uint64, ns, path string) { + store := s.fsm.State() + sv := mock.SecureVariable() + sv.Namespace = ns + sv.Path = path + bPlain, err := json.Marshal(sv.Items) + must.NoError(t, err) + bEnc, kID, err := s.encrypter.Encrypt(bPlain) + must.NoError(t, err) + sve := &structs.SecureVariableEncrypted{ + SecureVariableMetadata: sv.SecureVariableMetadata, + SecureVariableData: structs.SecureVariableData{ + Data: bEnc, + KeyID: kID, + }, + } + resp := store.SVESet(idx, &structs.SVApplyStateRequest{ + Op: structs.SVOpSet, + Var: sve, + }) + must.NoError(t, resp.Error) +} diff --git a/nomad/state/state_store_secure_variables.go b/nomad/state/state_store_secure_variables.go index 626d9eb28..9fbc7e64a 100644 --- a/nomad/state/state_store_secure_variables.go +++ b/nomad/state/state_store_secure_variables.go @@ -99,10 +99,11 @@ func (s *StateStore) GetSecureVariable( txn := s.db.ReadTxn() // Try to fetch the secure variable. - raw, err := txn.First(TableSecureVariables, indexID, namespace, path) + watchCh, raw, err := txn.FirstWatch(TableSecureVariables, indexID, namespace, path) if err != nil { // error during fetch return nil, fmt.Errorf("secure variable lookup failed: %v", err) } + ws.Add(watchCh) if raw == nil { // not found return nil, nil } @@ -111,7 +112,7 @@ func (s *StateStore) GetSecureVariable( return sv, nil } -// SVESet is used to store a secure variable pair. +// SVESet is used to store a secure variable object. func (s *StateStore) SVESet(idx uint64, sv *structs.SVApplyStateRequest) *structs.SVApplyStateResponse { tx := s.db.WriteTxn(idx) defer tx.Abort() @@ -130,8 +131,7 @@ func (s *StateStore) SVESet(idx uint64, sv *structs.SVApplyStateRequest) *struct // SVESetCAS is used to do a check-and-set operation on a secure // variable. The ModifyIndex in the provided entry is used to determine if -// we should write the entry to the state store or not. Returns a bool -// indicating whether or not a write happened and any error that occurred. +// we should write the entry to the state store or not. func (s *StateStore) SVESetCAS(idx uint64, sv *structs.SVApplyStateRequest) *structs.SVApplyStateResponse { tx := s.db.WriteTxn(idx) defer tx.Abort()