mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 01:15:43 +03:00
serverID to target remote leader or server
handle the case where we request a server-id which is this current server update docs, error on node and server id params more accurate names for tests use shared no leader err, formatting rm bad comment remove redundant variable
This commit is contained in:
@@ -205,6 +205,11 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
|
||||
PlainText: plainText,
|
||||
}
|
||||
|
||||
// if node and server were requested return error
|
||||
if args.NodeID != "" && args.ServerID != "" {
|
||||
return nil, CodedError(400, "Cannot target node and server simultaneously")
|
||||
}
|
||||
|
||||
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
||||
|
||||
// Make the RPC
|
||||
|
||||
@@ -60,27 +60,25 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
|
||||
}
|
||||
|
||||
// Targeting a node, forward request to node
|
||||
if args.NodeID != "" && args.NodeID != "leader" {
|
||||
if args.NodeID != "" {
|
||||
m.forwardMonitorClient(conn, args, encoder, decoder)
|
||||
// forwarded request has ended, return
|
||||
return
|
||||
}
|
||||
|
||||
if args.NodeID == "leader" {
|
||||
isLeader, remoteServer := m.srv.getLeader()
|
||||
if !isLeader && remoteServer != nil {
|
||||
m.forwardMonitorLeader(remoteServer, conn, args, encoder, decoder)
|
||||
return
|
||||
}
|
||||
if !isLeader && remoteServer == nil {
|
||||
err := fmt.Errorf("no leader")
|
||||
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
|
||||
return
|
||||
}
|
||||
currentServer := m.srv.serf.LocalMember().Name
|
||||
var forwardServer bool
|
||||
// Targeting a remote server which is not the leader and not this server
|
||||
if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
|
||||
forwardServer = true
|
||||
}
|
||||
|
||||
// targeting a specific server, forward to that server
|
||||
if args.ServerID != "" {
|
||||
// Targeting leader and this server is not current leader
|
||||
if args.ServerID == "leader" && !m.srv.IsLeader() {
|
||||
forwardServer = true
|
||||
}
|
||||
|
||||
if forwardServer {
|
||||
m.forwardMonitorServer(conn, args, encoder, decoder)
|
||||
return
|
||||
}
|
||||
@@ -256,62 +254,52 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Agent) forwardMonitorLeader(leader *serverParts, conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
|
||||
var leaderConn net.Conn
|
||||
|
||||
localConn, err := m.srv.streamingRpc(leader, "Agent.Monitor")
|
||||
if err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
|
||||
leaderConn = localConn
|
||||
defer leaderConn.Close()
|
||||
|
||||
// Send the Request
|
||||
outEncoder := codec.NewEncoder(leaderConn, structs.MsgpackHandle)
|
||||
if err := outEncoder.Encode(args); err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
|
||||
structs.Bridge(conn, leaderConn)
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
|
||||
var target *serverParts
|
||||
serverID := args.ServerID
|
||||
|
||||
// empty ServerID to prevent forwarding loop
|
||||
args.ServerID = ""
|
||||
|
||||
serfMembers := m.srv.Members()
|
||||
var target *serverParts
|
||||
for _, mem := range serfMembers {
|
||||
if mem.Name == serverID {
|
||||
ok, srv := isNomadServer(mem)
|
||||
if !ok {
|
||||
err := fmt.Errorf("unknown nomad server %s", serverID)
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
if serverID == "leader" {
|
||||
isLeader, remoteServer := m.srv.getLeader()
|
||||
if !isLeader && remoteServer != nil {
|
||||
target = remoteServer
|
||||
}
|
||||
if !isLeader && remoteServer == nil {
|
||||
handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// See if the server ID is a known member
|
||||
serfMembers := m.srv.Members()
|
||||
for _, mem := range serfMembers {
|
||||
if mem.Name == serverID {
|
||||
if ok, srv := isNomadServer(mem); ok {
|
||||
target = srv
|
||||
}
|
||||
}
|
||||
target = srv
|
||||
}
|
||||
}
|
||||
|
||||
var serverConn net.Conn
|
||||
localConn, err := m.srv.streamingRpc(target, "Agent.Monitor")
|
||||
if err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
// Unable to find a server
|
||||
if target == nil {
|
||||
err := fmt.Errorf("unknown nomad server %s", serverID)
|
||||
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
serverConn = localConn
|
||||
serverConn, err := m.srv.streamingRpc(target, "Agent.Monitor")
|
||||
if err != nil {
|
||||
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
|
||||
return
|
||||
}
|
||||
defer serverConn.Close()
|
||||
|
||||
// Send the Request
|
||||
outEncoder := codec.NewEncoder(serverConn, structs.MsgpackHandle)
|
||||
if err := outEncoder.Encode(args); err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"github.com/hashicorp/nomad/client"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
@@ -22,7 +23,7 @@ import (
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
func TestMonitor_Monitor_Remote_Server(t *testing.T) {
|
||||
func TestMonitor_Monitor_Remote_Client(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
@@ -117,9 +118,8 @@ OUTER:
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitor_MonitorRemoteLeader(t *testing.T) {
|
||||
func TestMonitor_Monitor_RemoteServer(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
// start servers
|
||||
s1 := TestServer(t, nil)
|
||||
@@ -132,6 +132,7 @@ func TestMonitor_MonitorRemoteLeader(t *testing.T) {
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
testutil.WaitForLeader(t, s2.RPC)
|
||||
|
||||
// determine leader and nonleader
|
||||
servers := []*Server{s1, s2}
|
||||
var nonLeader *Server
|
||||
var leader *Server
|
||||
@@ -143,78 +144,127 @@ func TestMonitor_MonitorRemoteLeader(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
leader.logger.Warn("leader log")
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
||||
// No node ID to monitor the remote server
|
||||
req := cstructs.MonitorRequest{
|
||||
LogLevel: "warn",
|
||||
NodeID: "leader",
|
||||
cases := []struct {
|
||||
desc string
|
||||
serverID string
|
||||
expectedLog string
|
||||
logger hclog.InterceptLogger
|
||||
origin *Server
|
||||
}{
|
||||
{
|
||||
desc: "remote leader",
|
||||
serverID: "leader",
|
||||
expectedLog: "leader log",
|
||||
logger: leader.logger,
|
||||
origin: nonLeader,
|
||||
},
|
||||
{
|
||||
desc: "remote server",
|
||||
serverID: nonLeader.serf.LocalMember().Name,
|
||||
expectedLog: "nonleader log",
|
||||
logger: nonLeader.logger,
|
||||
origin: leader,
|
||||
},
|
||||
{
|
||||
desc: "serverID is current leader",
|
||||
serverID: "leader",
|
||||
expectedLog: "leader log",
|
||||
logger: leader.logger,
|
||||
origin: leader,
|
||||
},
|
||||
{
|
||||
desc: "serverID is current server",
|
||||
serverID: nonLeader.serf.LocalMember().Name,
|
||||
expectedLog: "non leader log",
|
||||
logger: nonLeader.logger,
|
||||
origin: nonLeader,
|
||||
},
|
||||
}
|
||||
|
||||
handler, err := nonLeader.StreamingRpcHandler("Agent.Monitor")
|
||||
require.Nil(err)
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
// create pipe
|
||||
p1, p2 := net.Pipe()
|
||||
defer p1.Close()
|
||||
defer p2.Close()
|
||||
|
||||
errCh := make(chan error)
|
||||
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
||||
|
||||
go handler(p2)
|
||||
|
||||
// Start decoder
|
||||
go func() {
|
||||
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
||||
for {
|
||||
var msg cstructs.StreamErrWrapper
|
||||
if err := decoder.Decode(&msg); err != nil {
|
||||
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
||||
return
|
||||
// send some specific logs
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
default:
|
||||
tc.logger.Warn(tc.expectedLog)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
errCh <- fmt.Errorf("error decoding: %v", err)
|
||||
}()
|
||||
|
||||
req := cstructs.MonitorRequest{
|
||||
LogLevel: "warn",
|
||||
ServerID: tc.serverID,
|
||||
}
|
||||
|
||||
streamMsg <- &msg
|
||||
}
|
||||
}()
|
||||
handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor")
|
||||
require.Nil(err)
|
||||
|
||||
// send request
|
||||
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
||||
require.Nil(encoder.Encode(req))
|
||||
// create pipe
|
||||
p1, p2 := net.Pipe()
|
||||
defer p1.Close()
|
||||
defer p2.Close()
|
||||
|
||||
timeout := time.After(2 * time.Second)
|
||||
expected := "leader log"
|
||||
received := ""
|
||||
errCh := make(chan error)
|
||||
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
||||
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-timeout:
|
||||
t.Fatal("timeout waiting for logs")
|
||||
case err := <-errCh:
|
||||
t.Fatal(err)
|
||||
case msg := <-streamMsg:
|
||||
if msg.Error != nil {
|
||||
t.Fatalf("Got error: %v", msg.Error.Error())
|
||||
go handler(p2)
|
||||
|
||||
// Start decoder
|
||||
go func() {
|
||||
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
||||
for {
|
||||
var msg cstructs.StreamErrWrapper
|
||||
if err := decoder.Decode(&msg); err != nil {
|
||||
if err == io.EOF || strings.Contains(err.Error(), "closed") {
|
||||
return
|
||||
}
|
||||
errCh <- fmt.Errorf("error decoding: %v", err)
|
||||
}
|
||||
|
||||
streamMsg <- &msg
|
||||
}
|
||||
}()
|
||||
|
||||
// send request
|
||||
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
||||
require.Nil(encoder.Encode(req))
|
||||
|
||||
timeout := time.After(2 * time.Second)
|
||||
received := ""
|
||||
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-timeout:
|
||||
t.Fatal("timeout waiting for logs")
|
||||
case err := <-errCh:
|
||||
t.Fatal(err)
|
||||
case msg := <-streamMsg:
|
||||
if msg.Error != nil {
|
||||
t.Fatalf("Got error: %v", msg.Error.Error())
|
||||
}
|
||||
|
||||
var frame sframer.StreamFrame
|
||||
err := json.Unmarshal(msg.Payload, &frame)
|
||||
assert.NoError(t, err)
|
||||
|
||||
received += string(frame.Data)
|
||||
if strings.Contains(received, tc.expectedLog) {
|
||||
close(doneCh)
|
||||
require.Nil(p2.Close())
|
||||
break OUTER
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var frame sframer.StreamFrame
|
||||
err := json.Unmarshal(msg.Payload, &frame)
|
||||
assert.NoError(t, err)
|
||||
|
||||
received += string(frame.Data)
|
||||
if strings.Contains(received, expected) {
|
||||
require.Nil(p2.Close())
|
||||
break OUTER
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -518,16 +518,20 @@ The table below shows this endpoint's support for
|
||||
|
||||
### Parameters
|
||||
|
||||
- `log-level` `(string: "info")` - Specifies a text string containing a log level
|
||||
- `log_level` `(string: "info")` - Specifies a text string containing a log level
|
||||
to filter on, such as `info`. Possible values include `trace`, `debug`,
|
||||
`info`, `warn`, `error`
|
||||
|
||||
- `json` `(bool: false)` - Specifies if the log format for streamed logs
|
||||
should be JSON.
|
||||
|
||||
- `node-id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text
|
||||
- `node_id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text
|
||||
string containing a node-id to target for streaming.
|
||||
|
||||
- `server_id` `(string: "server1.global")` - Specifies a text
|
||||
string containing a server name or "leader" to target a specific remote server
|
||||
or leader for streaming.
|
||||
|
||||
- `plain` `(bool: false)` - Specifies if the response should be JSON or
|
||||
plaintext
|
||||
|
||||
@@ -535,7 +539,10 @@ The table below shows this endpoint's support for
|
||||
|
||||
```text
|
||||
$ curl \
|
||||
https://localhost:4646/v1/agent/monitor?log-level=debug&node-id=a57b2adb-1a30-2dda-8df0-25abb0881952
|
||||
https://localhost:4646/v1/agent/monitor?log_level=debug&server_id=leader
|
||||
|
||||
$ curl \
|
||||
https://localhost:4646/v1/agent/monitor?log_level=debug&node_id=a57b2adb-1a30-2dda-8df0-25abb0881952
|
||||
```
|
||||
|
||||
### Sample Response
|
||||
|
||||
@@ -37,6 +37,10 @@ The monitor command also allows you to specify a single client node id to follow
|
||||
- `-node-id`: Specifies the client node-id to stream logs from. If no
|
||||
node-id is given the nomad server from the -address flag will be used.
|
||||
|
||||
- `-server-id`: Specifies the nomad server id to stream logs from. Accepts
|
||||
server names from `nomad server members` and also a special `leader` option
|
||||
which will target the current leader.
|
||||
|
||||
- `-json`: Stream logs in json format
|
||||
|
||||
## Examples
|
||||
|
||||
Reference in New Issue
Block a user