diff --git a/api/api.go b/api/api.go new file mode 100644 index 000000000..48e4bdf5b --- /dev/null +++ b/api/api.go @@ -0,0 +1,368 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strconv" + "time" +) + +// QueryOptions are used to parameterize a query +type QueryOptions struct { + // Providing a datacenter overwrites the region provided + // by the Config + Region string + + // AllowStale allows any Nomad server (non-leader) to service + // a read. This allows for lower latency and higher throughput + AllowStale bool + + // WaitIndex is used to enable a blocking query. Waits + // until the timeout or the next index is reached + WaitIndex uint64 + + // WaitTime is used to bound the duration of a wait. + // Defaults to that of the Config, but can be overriden. + WaitTime time.Duration +} + +// WriteOptions are used to parameterize a write +type WriteOptions struct { + // Providing a datacenter overwrites the region provided + // by the Config + Region string +} + +// QueryMeta is used to return meta data about a query +type QueryMeta struct { + // LastIndex. This can be used as a WaitIndex to perform + // a blocking query + LastIndex uint64 + + // Time of last contact from the leader for the + // server servicing the request + LastContact time.Duration + + // Is there a known leader + KnownLeader bool + + // How long did the request take + RequestTime time.Duration +} + +// WriteMeta is used to return meta data about a write +type WriteMeta struct { + // LastIndex. This can be used as a WaitIndex to perform + // a blocking query + LastIndex uint64 + + // How long did the request take + RequestTime time.Duration +} + +// Config is used to configure the creation of a client +type Config struct { + // URL is the address of the Nomad agent + URL string + + // Region to use. If not provided, the default agent region is used. + Region string + + // HttpClient is the client to use. Default will be + // used if not provided. + HttpClient *http.Client + + // WaitTime limits how long a Watch will block. If not provided, + // the agent default values will be used. + WaitTime time.Duration +} + +// DefaultConfig returns a default configuration for the client +func DefaultConfig() *Config { + config := &Config{ + URL: "http://127.0.0.1:4646", + HttpClient: http.DefaultClient, + } + if url := os.Getenv("NOMAD_HTTP_URL"); url != "" { + config.URL = url + } + return config +} + +// Client provides a client to the Nomad API +type Client struct { + config Config +} + +// NewClient returns a new client +func NewClient(config *Config) (*Client, error) { + // bootstrap the config + defConfig := DefaultConfig() + + if config.URL == "" { + config.URL = defConfig.URL + } else if _, err := url.Parse(config.URL); err != nil { + return nil, fmt.Errorf("invalid url '%s': %v", config.URL, err) + } + + if config.HttpClient == nil { + config.HttpClient = defConfig.HttpClient + } + + client := &Client{ + config: *config, + } + return client, nil +} + +// request is used to help build up a request +type request struct { + config *Config + method string + url *url.URL + params url.Values + body io.Reader + obj interface{} +} + +// setQueryOptions is used to annotate the request with +// additional query options +func (r *request) setQueryOptions(q *QueryOptions) { + if q == nil { + return + } + if q.Region != "" { + r.params.Set("region", q.Region) + } + if q.AllowStale { + r.params.Set("stale", "") + } + if q.WaitIndex != 0 { + r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10)) + } + if q.WaitTime != 0 { + r.params.Set("wait", durToMsec(q.WaitTime)) + } +} + +// durToMsec converts a duration to a millisecond specified string +func durToMsec(dur time.Duration) string { + return fmt.Sprintf("%dms", dur/time.Millisecond) +} + +// setWriteOptions is used to annotate the request with +// additional write options +func (r *request) setWriteOptions(q *WriteOptions) { + if q == nil { + return + } + if q.Region != "" { + r.params.Set("region", q.Region) + } +} + +// toHTTP converts the request to an HTTP request +func (r *request) toHTTP() (*http.Request, error) { + // Encode the query parameters + r.url.RawQuery = r.params.Encode() + + // Check if we should encode the body + if r.body == nil && r.obj != nil { + if b, err := encodeBody(r.obj); err != nil { + return nil, err + } else { + r.body = b + } + } + + // Create the HTTP request + req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body) + if err != nil { + return nil, err + } + + req.URL.Host = r.url.Host + req.URL.Scheme = r.url.Scheme + req.Host = r.url.Host + return req, nil +} + +// newRequest is used to create a new request +func (c *Client) newRequest(method, path string) *request { + base, _ := url.Parse(c.config.URL) + r := &request{ + config: &c.config, + method: method, + url: &url.URL{ + Scheme: base.Scheme, + Host: base.Host, + Path: path, + }, + params: make(map[string][]string), + } + if c.config.Region != "" { + r.params.Set("region", c.config.Region) + } + if c.config.WaitTime != 0 { + r.params.Set("wait", durToMsec(r.config.WaitTime)) + } + return r +} + +// doRequest runs a request with our client +func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) { + req, err := r.toHTTP() + if err != nil { + return 0, nil, err + } + start := time.Now() + resp, err := c.config.HttpClient.Do(req) + diff := time.Now().Sub(start) + return diff, resp, err +} + +// Query is used to do a GET request against an endpoint +// and deserialize the response into an interface using +// standard Nomad conventions. +func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) { + r := c.newRequest("GET", endpoint) + r.setQueryOptions(q) + rtt, resp, err := requireOK(c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + if err := decodeBody(resp, out); err != nil { + return nil, err + } + return qm, nil +} + +// write is used to do a PUT request against an endpoint +// and serialize/deserialized using the standard Nomad conventions. +func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) { + r := c.newRequest("PUT", endpoint) + r.setWriteOptions(q) + r.obj = in + rtt, resp, err := requireOK(c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + wm := &WriteMeta{RequestTime: rtt} + parseWriteMeta(resp, wm) + + if out != nil { + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + } + return wm, nil +} + +// write is used to do a PUT request against an endpoint +// and serialize/deserialized using the standard Nomad conventions. +func (c *Client) delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) { + r := c.newRequest("DELETE", endpoint) + r.setWriteOptions(q) + rtt, resp, err := requireOK(c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + wm := &WriteMeta{RequestTime: rtt} + parseWriteMeta(resp, wm) + + if out != nil { + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + } + return wm, nil +} + +// parseQueryMeta is used to help parse query meta-data +func parseQueryMeta(resp *http.Response, q *QueryMeta) error { + header := resp.Header + + // Parse the X-Nomad-Index + index, err := strconv.ParseUint(header.Get("X-Nomad-Index"), 10, 64) + if err != nil { + return fmt.Errorf("Failed to parse X-Nomad-Index: %v", err) + } + q.LastIndex = index + + // Parse the X-Nomad-LastContact + last, err := strconv.ParseUint(header.Get("X-Nomad-LastContact"), 10, 64) + if err != nil { + return fmt.Errorf("Failed to parse X-Nomad-LastContact: %v", err) + } + q.LastContact = time.Duration(last) * time.Millisecond + + // Parse the X-Nomad-KnownLeader + switch header.Get("X-Nomad-KnownLeader") { + case "true": + q.KnownLeader = true + default: + q.KnownLeader = false + } + return nil +} + +// parseWriteMeta is used to help parse write meta-data +func parseWriteMeta(resp *http.Response, q *WriteMeta) error { + header := resp.Header + + // Parse the X-Nomad-Index + index, err := strconv.ParseUint(header.Get("X-Nomad-Index"), 10, 64) + if err != nil { + return fmt.Errorf("Failed to parse X-Nomad-Index: %v", err) + } + q.LastIndex = index + return nil +} + +// decodeBody is used to JSON decode a body +func decodeBody(resp *http.Response, out interface{}) error { + dec := json.NewDecoder(resp.Body) + return dec.Decode(out) +} + +// encodeBody is used to encode a request body +func encodeBody(obj interface{}) (io.Reader, error) { + buf := bytes.NewBuffer(nil) + enc := json.NewEncoder(buf) + if err := enc.Encode(obj); err != nil { + return nil, err + } + return buf, nil +} + +// requireOK is used to wrap doRequest and check for a 200 +func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) { + if e != nil { + if resp != nil { + resp.Body.Close() + } + return d, nil, e + } + if resp.StatusCode != 200 { + var buf bytes.Buffer + io.Copy(&buf, resp.Body) + resp.Body.Close() + return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes()) + } + return d, resp, nil +} diff --git a/api/api_test.go b/api/api_test.go new file mode 100644 index 000000000..466d6e011 --- /dev/null +++ b/api/api_test.go @@ -0,0 +1,157 @@ +package api + +import ( + "net/http" + "os" + "testing" + "time" + + "github.com/hashicorp/nomad/testutil" +) + +type configCallback func(c *Config) + +func makeClient(t *testing.T, cb1 configCallback, + cb2 testutil.ServerConfigCallback) (*Client, *testutil.TestServer) { + + // Make client config + conf := DefaultConfig() + if cb1 != nil { + cb1(conf) + } + + // Create server + server := testutil.NewTestServer(t, cb2) + conf.URL = server.HTTPAddr + + // Create client + client, err := NewClient(conf) + if err != nil { + t.Fatalf("err: %v", err) + } + + return client, server +} + +func TestDefaultConfig_env(t *testing.T) { + t.Parallel() + url := "http://1.2.3.4:5678" + + os.Setenv("NOMAD_HTTP_URL", url) + defer os.Setenv("NOMAD_HTTP_URL", "") + + config := DefaultConfig() + + if config.URL != url { + t.Errorf("expected %q to be %q", config.URL, url) + } +} + +func TestSetQueryOptions(t *testing.T) { + // TODO t.Parallel() + c, s := makeClient(t, nil, nil) + defer s.Stop() + + r := c.newRequest("GET", "/v1/jobs") + q := &QueryOptions{ + Region: "foo", + AllowStale: true, + WaitIndex: 1000, + WaitTime: 100 * time.Second, + } + r.setQueryOptions(q) + + if r.params.Get("region") != "foo" { + t.Fatalf("bad: %v", r.params) + } + if _, ok := r.params["stale"]; !ok { + t.Fatalf("bad: %v", r.params) + } + if r.params.Get("index") != "1000" { + t.Fatalf("bad: %v", r.params) + } + if r.params.Get("wait") != "100000ms" { + t.Fatalf("bad: %v", r.params) + } +} + +func TestSetWriteOptions(t *testing.T) { + // TODO t.Parallel() + c, s := makeClient(t, nil, nil) + defer s.Stop() + + r := c.newRequest("GET", "/v1/jobs") + q := &WriteOptions{ + Region: "foo", + } + r.setWriteOptions(q) + + if r.params.Get("region") != "foo" { + t.Fatalf("bad: %v", r.params) + } +} + +func TestRequestToHTTP(t *testing.T) { + // TODO t.Parallel() + c, s := makeClient(t, nil, nil) + defer s.Stop() + + r := c.newRequest("DELETE", "/v1/jobs/foo") + q := &QueryOptions{ + Region: "foo", + } + r.setQueryOptions(q) + req, err := r.toHTTP() + if err != nil { + t.Fatalf("err: %v", err) + } + + if req.Method != "DELETE" { + t.Fatalf("bad: %v", req) + } + if req.URL.RequestURI() != "/v1/jobs/foo?region=foo" { + t.Fatalf("bad: %v", req) + } +} + +func TestParseQueryMeta(t *testing.T) { + t.Parallel() + resp := &http.Response{ + Header: make(map[string][]string), + } + resp.Header.Set("X-Nomad-Index", "12345") + resp.Header.Set("X-Nomad-LastContact", "80") + resp.Header.Set("X-Nomad-KnownLeader", "true") + + qm := &QueryMeta{} + if err := parseQueryMeta(resp, qm); err != nil { + t.Fatalf("err: %v", err) + } + + if qm.LastIndex != 12345 { + t.Fatalf("Bad: %v", qm) + } + if qm.LastContact != 80*time.Millisecond { + t.Fatalf("Bad: %v", qm) + } + if !qm.KnownLeader { + t.Fatalf("Bad: %v", qm) + } +} + +func TestParseWriteMeta(t *testing.T) { + t.Parallel() + resp := &http.Response{ + Header: make(map[string][]string), + } + resp.Header.Set("X-Nomad-Index", "12345") + + wm := &WriteMeta{} + if err := parseWriteMeta(resp, wm); err != nil { + t.Fatalf("err: %v", err) + } + + if wm.LastIndex != 12345 { + t.Fatalf("Bad: %v", wm) + } +} diff --git a/api/raw.go b/api/raw.go new file mode 100644 index 000000000..fc9f5e61b --- /dev/null +++ b/api/raw.go @@ -0,0 +1,30 @@ +package api + +// Raw can be used to do raw queries against custom endpoints +type Raw struct { + c *Client +} + +// Raw returns a handle to query endpoints +func (c *Client) Raw() *Raw { + return &Raw{c} +} + +// Query is used to do a GET request against an endpoint +// and deserialize the response into an interface using +// standard Nomad conventions. +func (raw *Raw) Query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) { + return raw.c.query(endpoint, out, q) +} + +// Write is used to do a PUT request against an endpoint +// and serialize/deserialized using the standard Nomad conventions. +func (raw *Raw) Write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) { + return raw.c.write(endpoint, in, out, q) +} + +// Delete is used to do a DELETE request against an endpoint +// and serialize/deserialized using the standard Nomad conventions. +func (raw *Raw) Delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) { + return raw.c.delete(endpoint, out, q) +} diff --git a/testutil/server.go b/testutil/server.go new file mode 100644 index 000000000..81ded4649 --- /dev/null +++ b/testutil/server.go @@ -0,0 +1,254 @@ +package testutil + +// TestServer is a test helper. It uses a fork/exec model to create +// a test Nomad server instance in the background and initialize it +// with some data and/or services. The test server can then be used +// to run a unit test, and offers an easy API to tear itself down +// when the test has completed. The only prerequisite is to have a nomad +// binary available on the $PATH. +// +// This package does not use Nomad's official API client. This is +// because we use TestServer to test the API client, which would +// otherwise cause an import cycle. + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "os/exec" + "sync/atomic" + "testing" +) + +// offset is used to atomically increment the port numbers. +var offset uint64 + +// TestServerConfig is the main server configuration struct. +type TestServerConfig struct { + HTTPAddr string `json:"http_addr,omitempty"` + Bootstrap bool `json:"bootstrap,omitempty"` + DataDir string `json:"data_dir,omitempty"` + Region string `json:"region,omitempty"` + DisableCheckpoint bool `json:"disable_update_check"` + LogLevel string `json:"log_level,omitempty"` + Stdout, Stderr io.Writer `json:"-"` +} + +// ServerConfigCallback is a function interface which can be +// passed to NewTestServerConfig to modify the server config. +type ServerConfigCallback func(c *TestServerConfig) + +// defaultServerConfig returns a new TestServerConfig struct +// with all of the listen ports incremented by one. +func defaultServerConfig() *TestServerConfig { + idx := int(atomic.AddUint64(&offset, 1)) + + return &TestServerConfig{ + DisableCheckpoint: true, + Bootstrap: true, + LogLevel: "DEBUG", + HTTPAddr: fmt.Sprintf("127.0.0.1:%d", 20000+idx), + } +} + +// TestServer is the main server wrapper struct. +type TestServer struct { + PID int + Config *TestServerConfig + t *testing.T + + HTTPAddr string + HttpClient *http.Client +} + +// NewTestServerConfig creates a new TestServer, and makes a call to +// an optional callback function to modify the configuration. +func NewTestServer(t *testing.T, cb ServerConfigCallback) *TestServer { + if path, err := exec.LookPath("nomad"); err != nil || path == "" { + t.Skip("nomad not found on $PATH, skipping") + } + + dataDir, err := ioutil.TempDir("", "nomad") + if err != nil { + t.Fatalf("err: %s", err) + } + + configFile, err := ioutil.TempFile(dataDir, "nomad") + if err != nil { + defer os.RemoveAll(dataDir) + t.Fatalf("err: %s", err) + } + + nomadConfig := defaultServerConfig() + nomadConfig.DataDir = dataDir + + if cb != nil { + cb(nomadConfig) + } + + configContent, err := json.Marshal(nomadConfig) + if err != nil { + t.Fatalf("err: %s", err) + } + + if _, err := configFile.Write(configContent); err != nil { + t.Fatalf("err: %s", err) + } + configFile.Close() + + stdout := io.Writer(os.Stdout) + if nomadConfig.Stdout != nil { + stdout = nomadConfig.Stdout + } + + stderr := io.Writer(os.Stderr) + if nomadConfig.Stderr != nil { + stderr = nomadConfig.Stderr + } + + // Start the server + // TODO: Use "-config", configFile.Name() + cmd := exec.Command("nomad", "agent", "-dev") + cmd.Stdout = stdout + cmd.Stderr = stderr + if err := cmd.Start(); err != nil { + t.Fatalf("err: %s", err) + } + + var client *http.Client + client = http.DefaultClient + + server := &TestServer{ + Config: nomadConfig, + PID: cmd.Process.Pid, + t: t, + + HTTPAddr: "127.0.0.1:4646", // TODO nomadConfig.HTTPAddr, + HttpClient: client, + } + + // Wait for the server to be ready + if nomadConfig.Bootstrap { + server.waitForLeader() + } else { + server.waitForAPI() + } + return server +} + +// Stop stops the test Nomad server, and removes the Nomad data +// directory once we are done. +func (s *TestServer) Stop() { + defer os.RemoveAll(s.Config.DataDir) + + cmd := exec.Command("kill", "-9", fmt.Sprintf("%d", s.PID)) + if err := cmd.Run(); err != nil { + s.t.Errorf("err: %s", err) + } +} + +// waitForAPI waits for only the agent HTTP endpoint to start +// responding. This is an indication that the agent has started, +// but will likely return before a leader is elected. +func (s *TestServer) waitForAPI() { + WaitForResult(func() (bool, error) { + resp, err := s.HttpClient.Get(s.url("/v1/jobs?stale")) + if err != nil { + return false, err + } + defer resp.Body.Close() + if err := s.requireOK(resp); err != nil { + return false, err + } + return true, nil + }, func(err error) { + defer s.Stop() + s.t.Fatalf("err: %s", err) + }) +} + +// waitForLeader waits for the Nomad server's HTTP API to become +// available, and then waits for a known leader and an index of +// 1 or more to be observed to confirm leader election is done. +func (s *TestServer) waitForLeader() { + WaitForResult(func() (bool, error) { + // Query the API and check the status code + resp, err := s.HttpClient.Get(s.url("/v1/jobs")) + if err != nil { + return false, err + } + defer resp.Body.Close() + if err := s.requireOK(resp); err != nil { + return false, err + } + + // Ensure we have a leader and a node registeration + if leader := resp.Header.Get("X-Nomad-KnownLeader"); leader != "true" { + fmt.Println(leader) + return false, fmt.Errorf("Nomad leader status: %#v", leader) + } + return true, nil + }, func(err error) { + defer s.Stop() + s.t.Fatalf("err: %s", err) + }) +} + +// url is a helper function which takes a relative URL and +// makes it into a proper URL against the local Nomad server. +func (s *TestServer) url(path string) string { + return fmt.Sprintf("http://%s%s", s.HTTPAddr, path) +} + +// requireOK checks the HTTP response code and ensures it is acceptable. +func (s *TestServer) requireOK(resp *http.Response) error { + if resp.StatusCode != 200 { + return fmt.Errorf("Bad status code: %d", resp.StatusCode) + } + return nil +} + +// put performs a new HTTP PUT request. +func (s *TestServer) put(path string, body io.Reader) *http.Response { + req, err := http.NewRequest("PUT", s.url(path), body) + if err != nil { + s.t.Fatalf("err: %s", err) + } + resp, err := s.HttpClient.Do(req) + if err != nil { + s.t.Fatalf("err: %s", err) + } + if err := s.requireOK(resp); err != nil { + defer resp.Body.Close() + s.t.Fatal(err) + } + return resp +} + +// get performs a new HTTP GET request. +func (s *TestServer) get(path string) *http.Response { + resp, err := s.HttpClient.Get(s.url(path)) + if err != nil { + s.t.Fatalf("err: %s", err) + } + if err := s.requireOK(resp); err != nil { + defer resp.Body.Close() + s.t.Fatal(err) + } + return resp +} + +// encodePayload returns a new io.Reader wrapping the encoded contents +// of the payload, suitable for passing directly to a new request. +func (s *TestServer) encodePayload(payload interface{}) io.Reader { + var encoded bytes.Buffer + enc := json.NewEncoder(&encoded) + if err := enc.Encode(payload); err != nil { + s.t.Fatalf("err: %s", err) + } + return &encoded +}