From 5743cde13379b100eaf6762d98bb5abccf36609a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 6 Jun 2015 00:22:05 +0200 Subject: [PATCH] nomad: adding raftApply RPC --- nomad/rpc.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/nomad/rpc.go b/nomad/rpc.go index d0e2f4f60..28ad38fd2 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -2,12 +2,15 @@ package nomad import ( "crypto/tls" + "fmt" "io" "net" "strings" + "time" "github.com/armon/go-metrics" "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/yamux" ) @@ -30,6 +33,18 @@ const ( rpcHTTPMagic = 0x48 ) +const ( + // Warn if the Raft command is larger than this. + // If it's over 1MB something is probably being abusive. + raftWarnSize = 1024 * 1024 + + // enqueueLimit caps how long we will wait to enqueue + // a new Raft command. Something is probably wrong if this + // value is ever reached. However, it prevents us from blocking + // the requesting goroutine forever. + enqueueLimit = 30 * time.Second +) + // listen is used to listen for incoming RPC connections func (s *Server) listen() { for { @@ -136,3 +151,24 @@ func (s *Server) handleNomadConn(conn net.Conn) { metrics.IncrCounter([]string{"nomad", "rpc", "request"}, 1) } } + +// raftApply is used to encode a message, run it through raft, and return +// the FSM response along with any errors +func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) { + buf, err := structs.Encode(t, msg) + if err != nil { + return nil, fmt.Errorf("Failed to encode request: %v", err) + } + + // Warn if the command is very large + if n := len(buf); n > raftWarnSize { + s.logger.Printf("[WARN] nomad: Attempting to apply large raft entry (type %d) (%d bytes)", t, n) + } + + future := s.raft.Apply(buf, enqueueLimit) + if err := future.Error(); err != nil { + return nil, err + } + + return future.Response(), nil +}