mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
nomad: adding raftApply RPC
This commit is contained in:
36
nomad/rpc.go
36
nomad/rpc.go
@@ -2,12 +2,15 @@ package nomad
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/yamux"
|
||||
)
|
||||
|
||||
@@ -30,6 +33,18 @@ const (
|
||||
rpcHTTPMagic = 0x48
|
||||
)
|
||||
|
||||
const (
|
||||
// Warn if the Raft command is larger than this.
|
||||
// If it's over 1MB something is probably being abusive.
|
||||
raftWarnSize = 1024 * 1024
|
||||
|
||||
// enqueueLimit caps how long we will wait to enqueue
|
||||
// a new Raft command. Something is probably wrong if this
|
||||
// value is ever reached. However, it prevents us from blocking
|
||||
// the requesting goroutine forever.
|
||||
enqueueLimit = 30 * time.Second
|
||||
)
|
||||
|
||||
// listen is used to listen for incoming RPC connections
|
||||
func (s *Server) listen() {
|
||||
for {
|
||||
@@ -136,3 +151,24 @@ func (s *Server) handleNomadConn(conn net.Conn) {
|
||||
metrics.IncrCounter([]string{"nomad", "rpc", "request"}, 1)
|
||||
}
|
||||
}
|
||||
|
||||
// raftApply is used to encode a message, run it through raft, and return
|
||||
// the FSM response along with any errors
|
||||
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||
buf, err := structs.Encode(t, msg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to encode request: %v", err)
|
||||
}
|
||||
|
||||
// Warn if the command is very large
|
||||
if n := len(buf); n > raftWarnSize {
|
||||
s.logger.Printf("[WARN] nomad: Attempting to apply large raft entry (type %d) (%d bytes)", t, n)
|
||||
}
|
||||
|
||||
future := s.raft.Apply(buf, enqueueLimit)
|
||||
if err := future.Error(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return future.Response(), nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user