Merge pull request #3892 from hashicorp/f-tunnel

Client RPC Endpoints, Server Routing and Streaming RPCs
This commit is contained in:
Alex Dadgar
2018-02-20 16:35:42 -08:00
committed by GitHub
104 changed files with 13003 additions and 3941 deletions

View File

@@ -4,14 +4,20 @@ import (
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
func TestHTTP_AgentSelf(t *testing.T) {
@@ -44,7 +50,7 @@ func TestHTTP_AgentSelf(t *testing.T) {
t.Fatalf("bad: %#v", self)
}
// Assign a Vault token and assert it is redacted.
// Assign a Vault token and require it is redacted.
s.Config.Vault.Token = "badc0deb-adc0-deba-dc0d-ebadc0debadc"
respW = httptest.NewRecorder()
obj, err = s.Server.AgentSelfRequest(respW, req)
@@ -60,21 +66,21 @@ func TestHTTP_AgentSelf(t *testing.T) {
func TestHTTP_AgentSelf_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/agent/self", nil)
assert.Nil(err)
require.Nil(err)
// Try request without a token and expect failure
{
respW := httptest.NewRecorder()
_, err := s.Server.AgentSelfRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@@ -83,8 +89,8 @@ func TestHTTP_AgentSelf_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.AgentSelfRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@@ -93,11 +99,11 @@ func TestHTTP_AgentSelf_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyWrite))
setToken(req, token)
obj, err := s.Server.AgentSelfRequest(respW, req)
assert.Nil(err)
require.Nil(err)
self := obj.(agentSelf)
assert.NotNil(self.Config)
assert.NotNil(self.Stats)
require.NotNil(self.Config)
require.NotNil(self.Stats)
}
// Try request with a root token
@@ -105,18 +111,17 @@ func TestHTTP_AgentSelf_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
obj, err := s.Server.AgentSelfRequest(respW, req)
assert.Nil(err)
require.Nil(err)
self := obj.(agentSelf)
assert.NotNil(self.Config)
assert.NotNil(self.Stats)
require.NotNil(self.Config)
require.NotNil(self.Stats)
}
})
}
func TestHTTP_AgentJoin(t *testing.T) {
// TODO(alexdadgar)
// t.Parallel()
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
// Determine the join address
member := s.Agent.Server().LocalMember()
@@ -173,21 +178,21 @@ func TestHTTP_AgentMembers(t *testing.T) {
func TestHTTP_AgentMembers_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/agent/members", nil)
assert.Nil(err)
require.Nil(err)
// Try request without a token and expect failure
{
respW := httptest.NewRecorder()
_, err := s.Server.AgentMembersRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@@ -196,8 +201,8 @@ func TestHTTP_AgentMembers_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.AgentPolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.AgentMembersRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@@ -206,10 +211,10 @@ func TestHTTP_AgentMembers_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.NodePolicy(acl.PolicyRead))
setToken(req, token)
obj, err := s.Server.AgentMembersRequest(respW, req)
assert.Nil(err)
require.Nil(err)
members := obj.(structs.ServerMembersResponse)
assert.Len(members.Members, 1)
require.Len(members.Members, 1)
}
// Try request with a root token
@@ -217,10 +222,10 @@ func TestHTTP_AgentMembers_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
obj, err := s.Server.AgentMembersRequest(respW, req)
assert.Nil(err)
require.Nil(err)
members := obj.(structs.ServerMembersResponse)
assert.Len(members.Members, 1)
require.Len(members.Members, 1)
}
})
}
@@ -245,21 +250,21 @@ func TestHTTP_AgentForceLeave(t *testing.T) {
func TestHTTP_AgentForceLeave_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/agent/force-leave?node=foo", nil)
assert.Nil(err)
require.Nil(err)
// Try request without a token and expect failure
{
respW := httptest.NewRecorder()
_, err := s.Server.AgentForceLeaveRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@@ -268,8 +273,8 @@ func TestHTTP_AgentForceLeave_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyRead))
setToken(req, token)
_, err := s.Server.AgentForceLeaveRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@@ -278,8 +283,8 @@ func TestHTTP_AgentForceLeave_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.AgentForceLeaveRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
}
// Try request with a root token
@@ -287,71 +292,113 @@ func TestHTTP_AgentForceLeave_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
_, err := s.Server.AgentForceLeaveRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
}
})
}
func TestHTTP_AgentSetServers(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
addr := s.Config.AdvertiseAddrs.RPC
testutil.WaitForResult(func() (bool, error) {
conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
if err != nil {
return false, err
}
defer conn.Close()
// Write the Consul RPC byte to set the mode
if _, err := conn.Write([]byte{byte(pool.RpcNomad)}); err != nil {
return false, err
}
codec := pool.NewClientCodec(conn)
args := &structs.GenericRequest{}
var leader string
err = msgpackrpc.CallWithCodec(codec, "Status.Leader", args, &leader)
return leader != "", err
}, func(err error) {
t.Fatalf("failed to find leader: %v", err)
})
// Create the request
req, err := http.NewRequest("PUT", "/v1/agent/servers", nil)
assert.Nil(err)
require.Nil(err)
// Send the request
respW := httptest.NewRecorder()
_, err = s.Server.AgentServersRequest(respW, req)
assert.NotNil(err)
assert.Contains(err.Error(), "missing server address")
require.NotNil(err)
require.Contains(err.Error(), "missing server address")
// Create a valid request
req, err = http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647&address=127.0.0.3%3A4647", nil)
assert.Nil(err)
require.Nil(err)
// Send the request
// Send the request which should fail
respW = httptest.NewRecorder()
_, err = s.Server.AgentServersRequest(respW, req)
assert.Nil(err)
require.NotNil(err)
// Retrieve the servers again
req, err = http.NewRequest("GET", "/v1/agent/servers", nil)
assert.Nil(err)
require.Nil(err)
respW = httptest.NewRecorder()
// Make the request and check the result
expected := []string{
"127.0.0.1:4647",
"127.0.0.2:4647",
"127.0.0.3:4647",
s.GetConfig().AdvertiseAddrs.RPC,
}
out, err := s.Server.AgentServersRequest(respW, req)
assert.Nil(err)
require.Nil(err)
servers := out.([]string)
assert.Len(servers, len(expected))
assert.Equal(expected, servers)
require.Len(servers, len(expected))
require.Equal(expected, servers)
})
}
func TestHTTP_AgentSetServers_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
addr := s.Config.AdvertiseAddrs.RPC
testutil.WaitForResult(func() (bool, error) {
conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
if err != nil {
return false, err
}
defer conn.Close()
// Write the Consul RPC byte to set the mode
if _, err := conn.Write([]byte{byte(pool.RpcNomad)}); err != nil {
return false, err
}
codec := pool.NewClientCodec(conn)
args := &structs.GenericRequest{}
var leader string
err = msgpackrpc.CallWithCodec(codec, "Status.Leader", args, &leader)
return leader != "", err
}, func(err error) {
t.Fatalf("failed to find leader: %v", err)
})
// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647&address=127.0.0.3%3A4647", nil)
assert.Nil(err)
path := fmt.Sprintf("/v1/agent/servers?address=%s", url.QueryEscape(s.GetConfig().AdvertiseAddrs.RPC))
req, err := http.NewRequest("PUT", path, nil)
require.Nil(err)
// Try request without a token and expect failure
{
respW := httptest.NewRecorder()
_, err := s.Server.AgentServersRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@@ -360,8 +407,8 @@ func TestHTTP_AgentSetServers_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyRead))
setToken(req, token)
_, err := s.Server.AgentServersRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@@ -370,8 +417,8 @@ func TestHTTP_AgentSetServers_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.AgentServersRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
}
// Try request with a root token
@@ -379,47 +426,33 @@ func TestHTTP_AgentSetServers_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
_, err := s.Server.AgentServersRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
}
})
}
func TestHTTP_AgentListServers_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
// Set some servers
{
req, err := http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647&address=127.0.0.3%3A4647", nil)
assert.Nil(err)
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
_, err = s.Server.AgentServersRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
}
// Create list request
req, err := http.NewRequest("GET", "/v1/agent/servers", nil)
assert.Nil(err)
require.Nil(err)
expected := []string{
"127.0.0.1:4647",
"127.0.0.2:4647",
"127.0.0.3:4647",
s.GetConfig().AdvertiseAddrs.RPC,
}
// Try request without a token and expect failure
{
respW := httptest.NewRecorder()
_, err := s.Server.AgentServersRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@@ -428,20 +461,27 @@ func TestHTTP_AgentListServers_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyRead))
setToken(req, token)
_, err := s.Server.AgentServersRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Wait for client to have a server
testutil.WaitForResult(func() (bool, error) {
return len(s.client.GetServers()) != 0, fmt.Errorf("no servers")
}, func(err error) {
t.Fatal(err)
})
// Try request with a valid token
{
respW := httptest.NewRecorder()
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyRead))
setToken(req, token)
out, err := s.Server.AgentServersRequest(respW, req)
assert.Nil(err)
require.Nil(err)
servers := out.([]string)
assert.Len(servers, len(expected))
assert.Equal(expected, servers)
require.Len(servers, len(expected))
require.Equal(expected, servers)
}
// Try request with a root token
@@ -449,10 +489,10 @@ func TestHTTP_AgentListServers_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
out, err := s.Server.AgentServersRequest(respW, req)
assert.Nil(err)
require.Nil(err)
servers := out.([]string)
assert.Len(servers, len(expected))
assert.Equal(expected, servers)
require.Len(servers, len(expected))
require.Equal(expected, servers)
}
})
}
@@ -472,19 +512,15 @@ func TestHTTP_AgentListKeys(t *testing.T) {
respW := httptest.NewRecorder()
out, err := s.Server.KeyringOperationRequest(respW, req)
if err != nil {
t.Fatalf("err: %s", err)
}
require.Nil(t, err)
kresp := out.(structs.KeyringResponse)
if len(kresp.Keys) != 1 {
t.Fatalf("bad: %v", kresp)
}
require.Len(t, kresp.Keys, 1)
})
}
func TestHTTP_AgentListKeys_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
key1 := "HS5lJ+XuTlYKWaeGYyG+/A=="
@@ -497,14 +533,14 @@ func TestHTTP_AgentListKeys_ACL(t *testing.T) {
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/agent/keyring/list", nil)
assert.Nil(err)
require.Nil(err)
// Try request without a token and expect failure
{
respW := httptest.NewRecorder()
_, err := s.Server.KeyringOperationRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@@ -513,8 +549,8 @@ func TestHTTP_AgentListKeys_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.AgentPolicy(acl.PolicyRead))
setToken(req, token)
_, err := s.Server.KeyringOperationRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@@ -523,10 +559,10 @@ func TestHTTP_AgentListKeys_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyWrite))
setToken(req, token)
out, err := s.Server.KeyringOperationRequest(respW, req)
assert.Nil(err)
require.Nil(err)
kresp := out.(structs.KeyringResponse)
assert.Len(kresp.Keys, 1)
assert.Contains(kresp.Keys, key1)
require.Len(kresp.Keys, 1)
require.Contains(kresp.Keys, key1)
}
// Try request with a root token
@@ -534,17 +570,16 @@ func TestHTTP_AgentListKeys_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
out, err := s.Server.KeyringOperationRequest(respW, req)
assert.Nil(err)
require.Nil(err)
kresp := out.(structs.KeyringResponse)
assert.Len(kresp.Keys, 1)
assert.Contains(kresp.Keys, key1)
require.Len(kresp.Keys, 1)
require.Contains(kresp.Keys, key1)
}
})
}
func TestHTTP_AgentInstallKey(t *testing.T) {
// TODO(alexdadgar)
// t.Parallel()
t.Parallel()
key1 := "HS5lJ+XuTlYKWaeGYyG+/A=="
key2 := "wH1Bn9hlJ0emgWB1JttVRA=="
@@ -584,8 +619,7 @@ func TestHTTP_AgentInstallKey(t *testing.T) {
}
func TestHTTP_AgentRemoveKey(t *testing.T) {
// TODO(alexdadgar)
// t.Parallel()
t.Parallel()
key1 := "HS5lJ+XuTlYKWaeGYyG+/A=="
key2 := "wH1Bn9hlJ0emgWB1JttVRA=="
@@ -635,87 +669,87 @@ func TestHTTP_AgentRemoveKey(t *testing.T) {
func TestHTTP_AgentHealth_Ok(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
// Enable ACLs to ensure they're not enforced
httpACLTest(t, nil, func(s *TestAgent) {
// No ?type=
{
req, err := http.NewRequest("GET", "/v1/agent/health", nil)
assert.Nil(err)
require.Nil(err)
respW := httptest.NewRecorder()
healthI, err := s.Server.HealthRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
assert.NotNil(healthI)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
require.NotNil(healthI)
health := healthI.(*healthResponse)
assert.NotNil(health.Client)
assert.True(health.Client.Ok)
assert.Equal("ok", health.Client.Message)
assert.NotNil(health.Server)
assert.True(health.Server.Ok)
assert.Equal("ok", health.Server.Message)
require.NotNil(health.Client)
require.True(health.Client.Ok)
require.Equal("ok", health.Client.Message)
require.NotNil(health.Server)
require.True(health.Server.Ok)
require.Equal("ok", health.Server.Message)
}
// type=client
{
req, err := http.NewRequest("GET", "/v1/agent/health?type=client", nil)
assert.Nil(err)
require.Nil(err)
respW := httptest.NewRecorder()
healthI, err := s.Server.HealthRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
assert.NotNil(healthI)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
require.NotNil(healthI)
health := healthI.(*healthResponse)
assert.NotNil(health.Client)
assert.True(health.Client.Ok)
assert.Equal("ok", health.Client.Message)
assert.Nil(health.Server)
require.NotNil(health.Client)
require.True(health.Client.Ok)
require.Equal("ok", health.Client.Message)
require.Nil(health.Server)
}
// type=server
{
req, err := http.NewRequest("GET", "/v1/agent/health?type=server", nil)
assert.Nil(err)
require.Nil(err)
respW := httptest.NewRecorder()
healthI, err := s.Server.HealthRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
assert.NotNil(healthI)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
require.NotNil(healthI)
health := healthI.(*healthResponse)
assert.NotNil(health.Server)
assert.True(health.Server.Ok)
assert.Equal("ok", health.Server.Message)
assert.Nil(health.Client)
require.NotNil(health.Server)
require.True(health.Server.Ok)
require.Equal("ok", health.Server.Message)
require.Nil(health.Client)
}
// type=client&type=server
{
req, err := http.NewRequest("GET", "/v1/agent/health?type=client&type=server", nil)
assert.Nil(err)
require.Nil(err)
respW := httptest.NewRecorder()
healthI, err := s.Server.HealthRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
assert.NotNil(healthI)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
require.NotNil(healthI)
health := healthI.(*healthResponse)
assert.NotNil(health.Client)
assert.True(health.Client.Ok)
assert.Equal("ok", health.Client.Message)
assert.NotNil(health.Server)
assert.True(health.Server.Ok)
assert.Equal("ok", health.Server.Message)
require.NotNil(health.Client)
require.True(health.Client.Ok)
require.Equal("ok", health.Client.Message)
require.NotNil(health.Server)
require.True(health.Server.Ok)
require.Equal("ok", health.Server.Message)
}
})
}
func TestHTTP_AgentHealth_BadServer(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
// Enable ACLs to ensure they're not enforced
httpACLTest(t, nil, func(s *TestAgent) {
@@ -726,39 +760,39 @@ func TestHTTP_AgentHealth_BadServer(t *testing.T) {
// No ?type= means server is just skipped
{
req, err := http.NewRequest("GET", "/v1/agent/health", nil)
assert.Nil(err)
require.Nil(err)
respW := httptest.NewRecorder()
healthI, err := s.Server.HealthRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
assert.NotNil(healthI)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
require.NotNil(healthI)
health := healthI.(*healthResponse)
assert.NotNil(health.Client)
assert.True(health.Client.Ok)
assert.Equal("ok", health.Client.Message)
assert.Nil(health.Server)
require.NotNil(health.Client)
require.True(health.Client.Ok)
require.Equal("ok", health.Client.Message)
require.Nil(health.Server)
}
// type=server means server is considered unhealthy
{
req, err := http.NewRequest("GET", "/v1/agent/health?type=server", nil)
assert.Nil(err)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.HealthRequest(respW, req)
assert.NotNil(err)
require.NotNil(err)
httpErr, ok := err.(HTTPCodedError)
assert.True(ok)
assert.Equal(500, httpErr.Code())
assert.Equal(`{"server":{"ok":false,"message":"server not enabled"}}`, err.Error())
require.True(ok)
require.Equal(500, httpErr.Code())
require.Equal(`{"server":{"ok":false,"message":"server not enabled"}}`, err.Error())
}
})
}
func TestHTTP_AgentHealth_BadClient(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
// Enable ACLs to ensure they're not enforced
httpACLTest(t, nil, func(s *TestAgent) {
@@ -769,32 +803,32 @@ func TestHTTP_AgentHealth_BadClient(t *testing.T) {
// No ?type= means client is just skipped
{
req, err := http.NewRequest("GET", "/v1/agent/health", nil)
assert.Nil(err)
require.Nil(err)
respW := httptest.NewRecorder()
healthI, err := s.Server.HealthRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
assert.NotNil(healthI)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
require.NotNil(healthI)
health := healthI.(*healthResponse)
assert.NotNil(health.Server)
assert.True(health.Server.Ok)
assert.Equal("ok", health.Server.Message)
assert.Nil(health.Client)
require.NotNil(health.Server)
require.True(health.Server.Ok)
require.Equal("ok", health.Server.Message)
require.Nil(health.Client)
}
// type=client means client is considered unhealthy
{
req, err := http.NewRequest("GET", "/v1/agent/health?type=client", nil)
assert.Nil(err)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.HealthRequest(respW, req)
assert.NotNil(err)
require.NotNil(err)
httpErr, ok := err.(HTTPCodedError)
assert.True(ok)
assert.Equal(500, httpErr.Code())
assert.Equal(`{"client":{"ok":false,"message":"client not enabled"}}`, err.Error())
require.True(ok)
require.Equal(500, httpErr.Code())
require.Equal(`{"client":{"ok":false,"message":"client not enabled"}}`, err.Error())
}
})
}

View File

@@ -6,7 +6,7 @@ import (
"strings"
"github.com/golang/snappy"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -79,9 +79,6 @@ func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Re
}
func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if s.agent.client == nil {
return nil, clientNotRunning
}
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/client/allocation/")
@@ -96,6 +93,10 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
case "stats":
return s.allocStats(allocID, resp, req)
case "snapshot":
if s.agent.client == nil {
return nil, clientNotRunning
}
return s.allocSnapshot(allocID, resp, req)
case "gc":
return s.allocGC(allocID, resp, req)
@@ -105,43 +106,70 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
}
func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if s.agent.client == nil {
return nil, clientNotRunning
// Get the requested Node ID
requestedNode := req.URL.Query().Get("node_id")
// Build the request and parse the ACL token
args := structs.NodeSpecificRequest{
NodeID: requestedNode,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(requestedNode)
// Make the RPC
var reply structs.GenericResponse
var rpcErr error
if useLocalClient {
rpcErr = s.agent.Client().ClientRPC("Allocations.GarbageCollectAll", &args, &reply)
} else if useClientRPC {
rpcErr = s.agent.Client().RPC("ClientAllocations.GarbageCollectAll", &args, &reply)
} else if useServerRPC {
rpcErr = s.agent.Server().RPC("ClientAllocations.GarbageCollectAll", &args, &reply)
} else {
rpcErr = CodedError(400, "No local Node and node_id not provided")
}
var secret string
s.parseToken(req, &secret)
// Check node write permissions
if aclObj, err := s.agent.Client().ResolveToken(secret); err != nil {
return nil, err
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
return nil, structs.ErrPermissionDenied
if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}
}
s.agent.Client().CollectAllAllocs()
return nil, nil
return nil, rpcErr
}
func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var secret string
s.parseToken(req, &secret)
// Build the request and parse the ACL token
args := structs.AllocSpecificRequest{
AllocID: allocID,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
var namespace string
parseNamespace(req, &namespace)
// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID)
// Check namespace submit-job permissions
if aclObj, err := s.agent.Client().ResolveToken(secret); err != nil {
return nil, err
} else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilitySubmitJob) {
return nil, structs.ErrPermissionDenied
// Make the RPC
var reply structs.GenericResponse
var rpcErr error
if useLocalClient {
rpcErr = s.agent.Client().ClientRPC("Allocations.GarbageCollect", &args, &reply)
} else if useClientRPC {
rpcErr = s.agent.Client().RPC("ClientAllocations.GarbageCollect", &args, &reply)
} else if useServerRPC {
rpcErr = s.agent.Server().RPC("ClientAllocations.GarbageCollect", &args, &reply)
} else {
rpcErr = CodedError(400, "No local Node and node_id not provided")
}
if !s.agent.Client().CollectAllocation(allocID) {
// Could not find alloc
return nil, fmt.Errorf("unable to collect allocation: not present")
if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}
}
return nil, nil
return nil, rpcErr
}
func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@@ -162,25 +190,36 @@ func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req
}
func (s *HTTPServer) allocStats(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var secret string
s.parseToken(req, &secret)
var namespace string
parseNamespace(req, &namespace)
// Check namespace read-job permissions
if aclObj, err := s.agent.Client().ResolveToken(secret); err != nil {
return nil, err
} else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) {
return nil, structs.ErrPermissionDenied
}
clientStats := s.agent.client.StatsReporter()
aStats, err := clientStats.GetAllocStats(allocID)
if err != nil {
return nil, err
}
// Build the request and parse the ACL token
task := req.URL.Query().Get("task")
return aStats.LatestAllocStats(task)
args := cstructs.AllocStatsRequest{
AllocID: allocID,
Task: task,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID)
// Make the RPC
var reply cstructs.AllocStatsResponse
var rpcErr error
if useLocalClient {
rpcErr = s.agent.Client().ClientRPC("Allocations.Stats", &args, &reply)
} else if useClientRPC {
rpcErr = s.agent.Client().RPC("ClientAllocations.Stats", &args, &reply)
} else if useServerRPC {
rpcErr = s.agent.Server().RPC("ClientAllocations.Stats", &args, &reply)
} else {
rpcErr = CodedError(400, "No local Node and node_id not provided")
}
if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}
}
return reply.Stats, rpcErr
}

View File

@@ -15,11 +15,11 @@ import (
"github.com/golang/snappy"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestHTTP_AllocsList(t *testing.T) {
@@ -78,9 +78,9 @@ func TestHTTP_AllocsList(t *testing.T) {
}
expectedMsg := "Task's sibling failed"
displayMsg1 := allocs[0].TaskStates["test"].Events[0].DisplayMessage
assert.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set")
require.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set")
displayMsg2 := allocs[0].TaskStates["test"].Events[0].DisplayMessage
assert.Equal(t, expectedMsg, displayMsg2, "DisplayMessage should be set")
require.Equal(t, expectedMsg, displayMsg2, "DisplayMessage should be set")
})
}
@@ -151,7 +151,7 @@ func TestHTTP_AllocsPrefixList(t *testing.T) {
}
expectedMsg := "Task's sibling failed"
displayMsg1 := n[0].TaskStates["test"].Events[0].DisplayMessage
assert.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set")
require.Equal(t, expectedMsg, displayMsg1, "DisplayMessage should be set")
})
}
@@ -262,31 +262,77 @@ func TestHTTP_AllocQuery_Payload(t *testing.T) {
func TestHTTP_AllocStats(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/client/allocation/123/foo", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
require := require.New(t)
// Make the request
_, err = s.Server.ClientAllocRequest(respW, req)
if !strings.Contains(err.Error(), resourceNotFoundErr) {
t.Fatalf("err: %v", err)
httpTest(t, nil, func(s *TestAgent) {
// Local node, local resp
{
// Make the HTTP request
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/stats", uuid.Generate()), nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
// Make the request
_, err = s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
// Local node, server resp
{
srv := s.server
s.server = nil
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/stats", uuid.Generate()), nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
s.server = srv
}
// no client, server resp
{
c := s.client
s.client = nil
testutil.WaitForResult(func() (bool, error) {
n, err := s.server.State().NodeByID(nil, c.NodeID())
if err != nil {
return false, err
}
return n != nil, nil
}, func(err error) {
t.Fatalf("should have client: %v", err)
})
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/stats", uuid.Generate()), nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
s.client = c
}
})
}
func TestHTTP_AllocStats_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/client/allocation/123/stats", nil)
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/stats", uuid.Generate()), nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -295,8 +341,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
{
respW := httptest.NewRecorder()
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@@ -305,8 +351,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@@ -317,8 +363,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy)
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Contains(err.Error(), "unknown allocation ID")
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
// Try request with a management token
@@ -327,8 +373,8 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Contains(err.Error(), "unknown allocation ID")
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
})
}
@@ -353,35 +399,35 @@ func TestHTTP_AllocSnapshot(t *testing.T) {
func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
// Request without a token fails
req, err := http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil)
assert.Nil(err)
require.Nil(err)
// Make the unauthorized request
respW := httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.EqualError(err, structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.EqualError(err, structs.ErrPermissionDenied.Error())
// Create an allocation
alloc := mock.Alloc()
validMigrateToken, err := nomad.GenerateMigrateToken(alloc.ID, s.Agent.Client().Node().SecretID)
assert.Nil(err)
validMigrateToken, err := structs.GenerateMigrateToken(alloc.ID, s.Agent.Client().Node().SecretID)
require.Nil(err)
// Request with a token succeeds
url := fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID)
req, err = http.NewRequest("GET", url, nil)
assert.Nil(err)
require.Nil(err)
req.Header.Set("X-Nomad-Token", validMigrateToken)
// Make the unauthorized request
respW = httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
assert.NotContains(err.Error(), structs.ErrPermissionDenied.Error())
require.NotContains(err.Error(), structs.ErrPermissionDenied.Error())
})
}
@@ -427,7 +473,7 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
// Remove the task dir to break Snapshot
os.RemoveAll(allocDir.TaskDirs["web"].LocalDir)
// Assert Snapshot fails
// require Snapshot fails
if err := allocDir.Snapshot(ioutil.Discard); err != nil {
s.logger.Printf("[DEBUG] agent.test: snapshot returned error: %v", err)
} else {
@@ -493,31 +539,84 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
func TestHTTP_AllocGC(t *testing.T) {
t.Parallel()
require := require.New(t)
path := fmt.Sprintf("/v1/client/allocation/%s/gc", uuid.Generate())
httpTest(t, nil, func(s *TestAgent) {
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/client/allocation/123/gc", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
// Local node, local resp
{
req, err := http.NewRequest("GET", path, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make the request
_, err = s.Server.ClientAllocRequest(respW, req)
if !strings.Contains(err.Error(), "unable to collect allocation") {
t.Fatalf("err: %v", err)
respW := httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
if !structs.IsErrUnknownAllocation(err) {
t.Fatalf("unexpected err: %v", err)
}
}
// Local node, server resp
{
srv := s.server
s.server = nil
req, err := http.NewRequest("GET", path, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
if !structs.IsErrUnknownAllocation(err) {
t.Fatalf("unexpected err: %v", err)
}
s.server = srv
}
// no client, server resp
{
c := s.client
s.client = nil
testutil.WaitForResult(func() (bool, error) {
n, err := s.server.State().NodeByID(nil, c.NodeID())
if err != nil {
return false, err
}
return n != nil, nil
}, func(err error) {
t.Fatalf("should have client: %v", err)
})
req, err := http.NewRequest("GET", path, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
if !structs.IsErrUnknownAllocation(err) {
t.Fatalf("unexpected err: %v", err)
}
s.client = c
}
})
}
func TestHTTP_AllocGC_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
path := fmt.Sprintf("/v1/client/allocation/%s/gc", uuid.Generate())
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/client/allocation/123/gc", nil)
req, err := http.NewRequest("GET", path, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -526,8 +625,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
{
respW := httptest.NewRecorder()
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@@ -536,8 +635,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@@ -548,8 +647,8 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy)
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Contains(err.Error(), "not present")
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
// Try request with a management token
@@ -558,26 +657,72 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
_, err := s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.Contains(err.Error(), "not present")
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
})
}
func TestHTTP_AllocAllGC(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/client/gc", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
// Local node, local resp
{
req, err := http.NewRequest("GET", "/v1/client/gc", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make the request
_, err = s.Server.ClientGCRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
respW := httptest.NewRecorder()
_, err = s.Server.ClientGCRequest(respW, req)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
// Local node, server resp
{
srv := s.server
s.server = nil
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/gc?node_id=%s", uuid.Generate()), nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.ClientGCRequest(respW, req)
require.NotNil(err)
require.Contains(err.Error(), "Unknown node")
s.server = srv
}
// no client, server resp
{
c := s.client
s.client = nil
testutil.WaitForResult(func() (bool, error) {
n, err := s.server.State().NodeByID(nil, c.NodeID())
if err != nil {
return false, err
}
return n != nil, nil
}, func(err error) {
t.Fatalf("should have client: %v", err)
})
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/gc?node_id=%s", c.NodeID()), nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.ClientGCRequest(respW, req)
require.NotNil(err)
// The dev agent uses in-mem RPC so just assert the no route error
require.Contains(err.Error(), structs.ErrNoNodeConn.Error())
s.client = c
}
})
@@ -585,20 +730,20 @@ func TestHTTP_AllocAllGC(t *testing.T) {
func TestHTTP_AllocAllGC_ACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/client/gc", nil)
assert.Nil(err)
require.Nil(err)
// Try request without a token and expect failure
{
respW := httptest.NewRecorder()
_, err := s.Server.ClientGCRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
@@ -607,8 +752,8 @@ func TestHTTP_AllocAllGC_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyRead))
setToken(req, token)
_, err := s.Server.ClientGCRequest(respW, req)
assert.NotNil(err)
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try request with a valid token
@@ -617,8 +762,8 @@ func TestHTTP_AllocAllGC_ACL(t *testing.T) {
token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.NodePolicy(acl.PolicyWrite))
setToken(req, token)
_, err := s.Server.ClientGCRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
}
// Try request with a management token
@@ -626,9 +771,8 @@ func TestHTTP_AllocAllGC_ACL(t *testing.T) {
respW := httptest.NewRecorder()
setToken(req, s.RootToken)
_, err := s.Server.ClientGCRequest(respW, req)
assert.Nil(err)
assert.Equal(http.StatusOK, respW.Code)
require.Nil(err)
require.Equal(http.StatusOK, respW.Code)
}
})
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

52
command/agent/helpers.go Normal file
View File

@@ -0,0 +1,52 @@
package agent
// rpcHandlerForAlloc is a helper that given an allocation ID returns whether to
// use the local clients RPC, the local clients remote RPC or the server on the
// agent.
func (s *HTTPServer) rpcHandlerForAlloc(allocID string) (localClient, remoteClient, server bool) {
c := s.agent.Client()
srv := s.agent.Server()
// See if the local client can handle the request.
localAlloc := false
if c != nil {
// If there is an error it means that the client doesn't have the
// allocation so we can't use the local client
_, err := c.GetClientAlloc(allocID)
if err == nil {
localAlloc = true
}
}
// Only use the client RPC to server if we don't have a server and the local
// client can't handle the call.
useClientRPC := c != nil && !localAlloc && srv == nil
// Use the server as a last case.
useServerRPC := !localAlloc && !useClientRPC && srv != nil
return localAlloc, useClientRPC, useServerRPC
}
// rpcHandlerForNode is a helper that given a node ID returns whether to
// use the local clients RPC, the local clients remote RPC or the server on the
// agent. If there is a local node and no node id is given, it is assumed the
// local node is being targed.
func (s *HTTPServer) rpcHandlerForNode(nodeID string) (localClient, remoteClient, server bool) {
c := s.agent.Client()
srv := s.agent.Server()
// See if the local client can handle the request.
localClient = c != nil && // Must have a client
(nodeID == "" || // If no node ID is given
nodeID == c.NodeID()) // Requested node is the local node.
// Only use the client RPC to server if we don't have a server and the local
// client can't handle the call.
useClientRPC := c != nil && !localClient && srv == nil
// Use the server as a last case.
useServerRPC := !localClient && !useClientRPC && srv != nil && nodeID != ""
return localClient, useClientRPC, useServerRPC
}

View File

@@ -0,0 +1,92 @@
package agent
import (
"testing"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/stretchr/testify/require"
)
func TestHTTP_rpcHandlerForAlloc(t *testing.T) {
t.Parallel()
require := require.New(t)
agent := NewTestAgent(t, t.Name(), nil)
a := mockFSAlloc(agent.client.NodeID(), nil)
addAllocToClient(agent, a, terminalClientAlloc)
// Case 1: Client has allocation
// Outcome: Use local client
lc, rc, s := agent.Server.rpcHandlerForAlloc(a.ID)
require.True(lc)
require.False(rc)
require.False(s)
// Case 2: Client doesn't have allocation and there is a server
// Outcome: Use server
lc, rc, s = agent.Server.rpcHandlerForAlloc(uuid.Generate())
require.False(lc)
require.False(rc)
require.True(s)
// Case 3: Client doesn't have allocation and there is no server
// Outcome: Use client RPC to server
srv := agent.server
agent.server = nil
lc, rc, s = agent.Server.rpcHandlerForAlloc(uuid.Generate())
require.False(lc)
require.True(rc)
require.False(s)
agent.server = srv
// Case 4: No client
// Outcome: Use server
client := agent.client
agent.client = nil
lc, rc, s = agent.Server.rpcHandlerForAlloc(uuid.Generate())
require.False(lc)
require.False(rc)
require.True(s)
agent.client = client
}
func TestHTTP_rpcHandlerForNode(t *testing.T) {
t.Parallel()
require := require.New(t)
agent := NewTestAgent(t, t.Name(), nil)
cID := agent.client.NodeID()
// Case 1: Node running, no node ID given
// Outcome: Use local node
lc, rc, s := agent.Server.rpcHandlerForNode("")
require.True(lc)
require.False(rc)
require.False(s)
// Case 2: Node running, it's ID given
// Outcome: Use local node
lc, rc, s = agent.Server.rpcHandlerForNode(cID)
require.True(lc)
require.False(rc)
require.False(s)
// Case 3: Local node but wrong ID and there is no server
// Outcome: Use client RPC to server
srv := agent.server
agent.server = nil
lc, rc, s = agent.Server.rpcHandlerForNode(uuid.Generate())
require.False(lc)
require.True(rc)
require.False(s)
agent.server = srv
// Case 4: No client
// Outcome: Use server
client := agent.client
agent.client = nil
lc, rc, s = agent.Server.rpcHandlerForNode(uuid.Generate())
require.False(lc)
require.False(rc)
require.True(s)
agent.client = client
}

View File

@@ -2,25 +2,47 @@ package agent
import (
"net/http"
"strings"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if s.agent.client == nil {
return nil, clientNotRunning
// Get the requested Node ID
requestedNode := req.URL.Query().Get("node_id")
// Build the request and parse the ACL token
args := structs.NodeSpecificRequest{
NodeID: requestedNode,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(requestedNode)
// Make the RPC
var reply cstructs.ClientStatsResponse
var rpcErr error
if useLocalClient {
rpcErr = s.agent.Client().ClientRPC("ClientStats.Stats", &args, &reply)
} else if useClientRPC {
rpcErr = s.agent.Client().RPC("ClientStats.Stats", &args, &reply)
} else if useServerRPC {
rpcErr = s.agent.Server().RPC("ClientStats.Stats", &args, &reply)
} else {
rpcErr = CodedError(400, "No local Node and node_id not provided")
}
var secret string
s.parseToken(req, &secret)
if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
} else if strings.Contains(rpcErr.Error(), "Unknown node") {
rpcErr = CodedError(404, rpcErr.Error())
}
// Check node read permissions
if aclObj, err := s.agent.Client().ResolveToken(secret); err != nil {
return nil, err
} else if aclObj != nil && !aclObj.AllowNodeRead() {
return nil, structs.ErrPermissionDenied
return nil, rpcErr
}
clientStats := s.agent.client.StatsReporter()
return clientStats.LatestHostStats(), nil
return reply.HostStats, nil
}

View File

@@ -1,28 +1,81 @@
package agent
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestClientStatsRequest(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/stats/?since=foo", nil)
if err != nil {
t.Fatalf("err: %v", err)
// Local node, local resp
{
req, err := http.NewRequest("GET", "/v1/client/stats/?since=foo", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
_, err = s.Server.ClientStatsRequest(respW, req)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
respW := httptest.NewRecorder()
_, err = s.Server.ClientStatsRequest(respW, req)
if err != nil {
t.Fatalf("unexpected err: %v", err)
// Local node, server resp
{
srv := s.server
s.server = nil
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/stats?node_id=%s", uuid.Generate()), nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.ClientStatsRequest(respW, req)
require.NotNil(err)
require.Contains(err.Error(), "Unknown node")
s.server = srv
}
// no client, server resp
{
c := s.client
s.client = nil
testutil.WaitForResult(func() (bool, error) {
n, err := s.server.State().NodeByID(nil, c.NodeID())
if err != nil {
return false, err
}
return n != nil, nil
}, func(err error) {
t.Fatalf("should have client: %v", err)
})
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/stats?node_id=%s", c.NodeID()), nil)
require.Nil(err)
respW := httptest.NewRecorder()
_, err = s.Server.ClientStatsRequest(respW, req)
require.NotNil(err)
// The dev agent uses in-mem RPC so just assert the no route error
require.Contains(err.Error(), structs.ErrNoNodeConn.Error())
s.client = c
}
})
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/lib/freeport"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -115,7 +116,10 @@ func (a *TestAgent) Start() *TestAgent {
a.Config.NomadConfig.DataDir = d
}
for i := 10; i >= 0; i-- {
i := 10
RETRY:
for ; i >= 0; i-- {
a.pickRandomPorts(a.Config)
if a.Config.NodeName == "" {
a.Config.NodeName = fmt.Sprintf("Node %d", a.Config.Ports.RPC)
@@ -137,14 +141,14 @@ func (a *TestAgent) Start() *TestAgent {
a.Agent = agent
break
} else if i == 0 {
fmt.Println(a.Name, "Error starting agent:", err)
a.T.Logf("%s: Error starting agent: %v", a.Name, err)
runtime.Goexit()
} else {
if agent != nil {
agent.Shutdown()
}
wait := time.Duration(rand.Int31n(2000)) * time.Millisecond
fmt.Println(a.Name, "retrying in", wait)
a.T.Logf("%s: retrying in %v", a.Name, wait)
time.Sleep(wait)
}
@@ -153,12 +157,13 @@ func (a *TestAgent) Start() *TestAgent {
// the data dir, such as in the Raft configuration.
if a.DataDir != "" {
if err := os.RemoveAll(a.DataDir); err != nil {
fmt.Println(a.Name, "Error resetting data dir:", err)
a.T.Logf("%s: Error resetting data dir: %v", a.Name, err)
runtime.Goexit()
}
}
}
failed := false
if a.Config.NomadConfig.Bootstrap && a.Config.Server.Enabled {
testutil.WaitForResult(func() (bool, error) {
args := &structs.GenericRequest{}
@@ -166,7 +171,8 @@ func (a *TestAgent) Start() *TestAgent {
err := a.RPC("Status.Leader", args, &leader)
return leader != "", err
}, func(err error) {
a.T.Fatalf("failed to find leader: %v", err)
a.T.Logf("failed to find leader: %v", err)
failed = true
})
} else {
testutil.WaitForResult(func() (bool, error) {
@@ -175,9 +181,14 @@ func (a *TestAgent) Start() *TestAgent {
_, err := a.Server.AgentSelfRequest(resp, req)
return err == nil && resp.Code == 200, err
}, func(err error) {
a.T.Fatalf("failed OK response: %v", err)
a.T.Logf("failed to find leader: %v", err)
failed = true
})
}
if failed {
a.Agent.Shutdown()
goto RETRY
}
// Check if ACLs enabled. Use special value of PolicyTTL 0s
// to do a bypass of this step. This is so we can test bootstrap
@@ -194,7 +205,7 @@ func (a *TestAgent) Start() *TestAgent {
func (a *TestAgent) start() (*Agent, error) {
if a.LogOutput == nil {
a.LogOutput = os.Stderr
a.LogOutput = testlog.NewWriter(a.T)
}
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
@@ -264,6 +275,15 @@ func (a *TestAgent) pickRandomPorts(c *Config) {
c.Ports.RPC = ports[1]
c.Ports.Serf = ports[2]
// Clear out the advertise addresses such that through retries we
// re-normalize the addresses correctly instead of using the values from the
// last port selection that had a port conflict.
if c.AdvertiseAddrs != nil {
c.AdvertiseAddrs.HTTP = ""
c.AdvertiseAddrs.RPC = ""
c.AdvertiseAddrs.Serf = ""
}
if err := c.normalizeAddrs(); err != nil {
a.T.Fatalf("error normalizing config: %v", err)
}

View File

@@ -33,23 +33,16 @@ func TestClientConfigCommand_UpdateServers(t *testing.T) {
}
ui.ErrorWriter.Reset()
// Set the servers list
// Set the servers list with bad addresses
code = cmd.Run([]string{"-address=" + url, "-update-servers", "127.0.0.42", "198.18.5.5"})
if code != 0 {
t.Fatalf("expected exit 0, got: %d", code)
if code != 1 {
t.Fatalf("expected exit 1, got: %d", code)
}
// Query the servers list
code = cmd.Run([]string{"-address=" + url, "-servers"})
// Set the servers list with good addresses
code = cmd.Run([]string{"-address=" + url, "-update-servers", srv.Config.AdvertiseAddrs.RPC})
if code != 0 {
t.Fatalf("expect exit 0, got: %d", code)
}
out := ui.OutputWriter.String()
if !strings.Contains(out, "127.0.0.42") {
t.Fatalf("missing 127.0.0.42")
}
if !strings.Contains(out, "198.18.5.5") {
t.Fatalf("missing 198.18.5.5")
t.Fatalf("expected exit 0, got: %d", code)
}
}