From 9106ce829629773474c689b34aacd7d3aaa99426 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 29 Sep 2015 08:52:16 -0700 Subject: [PATCH] vendor: update cloud logging to c7e49e47d60 --- .../cloud/logging/logging.go | 132 +++++--- .../cloud/logging/logging_test.go | 308 ++++++++++++++++++ 2 files changed, 397 insertions(+), 43 deletions(-) create mode 100644 vendor/google.golang.org/cloud/logging/logging_test.go diff --git a/vendor/google.golang.org/cloud/logging/logging.go b/vendor/google.golang.org/cloud/logging/logging.go index 194230884..366fb84bc 100644 --- a/vendor/google.golang.org/cloud/logging/logging.go +++ b/vendor/google.golang.org/cloud/logging/logging.go @@ -13,11 +13,12 @@ // limitations under the License. // Package logging contains a Google Cloud Logging client. +// +// This package is experimental and subject to API changes. package logging // import "google.golang.org/cloud/logging" import ( "errors" - "fmt" "io" "log" "sync" @@ -43,16 +44,20 @@ const ( Warning Error Critical + Alert + Emergency nLevel ) var levelName = [nLevel]string{ - Default: "", - Debug: "DEBUG", - Info: "INFO", - Warning: "WARNING", - Error: "ERROR", - Critical: "CRITICAL", + Default: "", + Debug: "DEBUG", + Info: "INFO", + Warning: "WARNING", + Error: "ERROR", + Critical: "CRITICAL", + Alert: "ALERT", + Emergency: "EMERGENCY", } func (v Level) String() string { @@ -76,10 +81,13 @@ type Client struct { timerActive bool // whether flushTimer is armed inFlight int // number of log entries sent to API service but not yet ACKed + // For testing: + timeNow func() time.Time // optional + // ServiceName may be "appengine.googleapis.com", // "compute.googleapis.com" or "custom.googleapis.com". // - // The default is unspecified is "custom.googleapis.com". + // The default is "custom.googleapis.com". // // The service name is only used by the API server to // determine which of the labels are used to index the logs. @@ -89,7 +97,7 @@ type Client struct { // entries in this request, so that you don't have to repeat // them in each log entry's metadata.labels field. If any of // the log entries contains a (key, value) with the same key - // that is in commonLabels, then the entry's (key, value) + // that is in CommonLabels, then the entry's (key, value) // overrides the one in CommonLabels. CommonLabels map[string]string @@ -108,32 +116,33 @@ type Client struct { // BufferInterval is the maximum amount of time that an item // should remain buffered in memory before being flushed to // the logging service. + // The default is currently 1 second. BufferInterval time.Duration - // Overflow optionally specifies a function which is run - // when the Log function overflows its configured buffer - // limit. If nil, the log entry is dropped. The return - // value is returned by Log. + // Overflow is a function which runs when the Log function + // overflows its configured buffer limit. If nil, the log + // entry is dropped. The return value from Overflow is + // returned by Log. Overflow func(*Client, Entry) error } func (c *Client) flushAfter() int { - if c.FlushAfter > 0 { - return c.FlushAfter + if v := c.FlushAfter; v > 0 { + return v } return 10 } func (c *Client) bufferInterval() time.Duration { - if c.BufferInterval > 0 { - return c.BufferInterval + if v := c.BufferInterval; v > 0 { + return v } return time.Second } func (c *Client) bufferLimit() int { - if c.BufferLimit > 0 { - return c.BufferLimit + if v := c.BufferLimit; v > 0 { + return v } return 10000 } @@ -145,6 +154,13 @@ func (c *Client) serviceName() string { return "custom.googleapis.com" } +func (c *Client) now() time.Time { + if now := c.timeNow; now != nil { + return now() + } + return time.Now() +} + // Writer returns an io.Writer for the provided log level. // // Each Write call on the returned Writer generates a log entry. @@ -180,13 +196,12 @@ type Entry struct { Time time.Time // Level is log entry's severity level. - // The zero value means undefined. + // The zero value means no assigned severity level. Level Level - // Payload may be either a string or JSON object. - // For JSON objects, the type must be either map[string]interface{} - // or implement json.Marshaler and encode a JSON object (and not any other - // JSON value). + // Payload must be either a string, []byte, or something that + // marshals via the encoding/json package to a JSON object + // (and not any other type of JSON value). Payload interface{} // Labels optionally specifies key/value labels for the log entry. @@ -202,7 +217,7 @@ type Entry struct { func (c *Client) apiEntry(e Entry) (*api.LogEntry, error) { t := e.Time if t.IsZero() { - t = time.Now() + t = c.now() } ent := &api.LogEntry{ @@ -216,10 +231,10 @@ func (c *Client) apiEntry(e Entry) (*api.LogEntry, error) { switch p := e.Payload.(type) { case string: ent.TextPayload = p - case map[string]interface{}: - ent.StructPayload = p + case []byte: + ent.TextPayload = string(p) default: - return nil, fmt.Errorf("unhandled Log Payload type %T", p) + ent.StructPayload = api.LogEntryStructPayload(p) } return ent, nil } @@ -242,9 +257,11 @@ var ErrOverflow = errors.New("logging: log entry overflowed buffer limits") // Log queues an entry to be sent to the logging service, subject to the // Client's parameters. By default, the log will be flushed within -// a second. -// Log only returns an error if the entry is invalid, or ErrOverflow -// if the log entry overflows the buffer limit. +// one second. +// Log only returns an error if the entry is invalid or the queue is at +// capacity. If the queue is at capacity and the entry can't be added, +// Log returns either ErrOverflow when c.Overflow is nil, or the +// value returned by c.Overflow. func (c *Client) Log(e Entry) error { ent, err := c.apiEntry(e) if err != nil { @@ -252,14 +269,17 @@ func (c *Client) Log(e Entry) error { } c.mu.Lock() - defer c.mu.Unlock() buffered := len(c.queued) + c.inFlight + if buffered >= c.bufferLimit() { + c.mu.Unlock() if fn := c.Overflow; fn != nil { return fn(c, e) } return ErrOverflow } + defer c.mu.Unlock() + c.queued = append(c.queued, ent) if len(c.queued) >= c.flushAfter() { c.scheduleFlushLocked(0) @@ -303,6 +323,9 @@ func (c *Client) scheduleFlushLocked(d time.Duration) { // timeoutFlush runs in its own goroutine (from time.AfterFunc) and // flushes c.queued. func (c *Client) timeoutFlush() { + c.mu.Lock() + c.timerActive = false + c.mu.Unlock() if err := c.Flush(); err != nil { // schedule another try // TODO: smarter back-off? @@ -323,23 +346,44 @@ func (c *Client) Ping() error { // Flush flushes any buffered log entries. func (c *Client) Flush() error { + var numFlush int + c.mu.Lock() for { - // Easy and final case: nothing to flush. - c.mu.Lock() + // We're already flushing (or we just started flushing + // ourselves), so wait for it to finish. + if f := c.curFlush; f != nil { + wasEmpty := len(c.queued) == 0 + c.mu.Unlock() + <-f.donec // wait for it + numFlush++ + // Terminate whenever there's an error, we've + // already flushed twice (one that was already + // in-flight when flush was called, and then + // one we instigated), or the queue was empty + // when we released the locked (meaning this + // in-flight flush removes everything present + // when Flush was called, and we don't need to + // kick off a new flush for things arriving + // afterward) + if f.err != nil || numFlush == 2 || wasEmpty { + return f.err + } + // Otherwise, re-obtain the lock and loop, + // starting over with seeing if a flush is in + // progress, which might've been started by a + // different goroutine before aquiring this + // lock again. + c.mu.Lock() + continue + } + + // Terminal case: if len(c.queued) == 0 { c.mu.Unlock() return nil } - if f := c.curFlush; f != nil { - c.mu.Unlock() - <-f.donec // wait for it - if f.err != nil { - return f.err - } - } c.startFlushLocked() - c.mu.Unlock() } } @@ -358,6 +402,7 @@ func (c *Client) startFlushLocked() { flush := &flushCall{ donec: make(chan struct{}), } + c.curFlush = flush go func() { defer close(flush.donec) _, err := c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{ @@ -365,13 +410,14 @@ func (c *Client) startFlushLocked() { Entries: logEntries, }).Do() flush.err = err - log.Printf("Raw write of %d = %v", len(logEntries), err) c.mu.Lock() defer c.mu.Unlock() c.inFlight = 0 c.curFlush = nil if err != nil { c.queued = append(c.queued, logEntries...) + } else if len(c.queued) > 0 { + c.scheduleFlushLocked(c.bufferInterval()) } }() diff --git a/vendor/google.golang.org/cloud/logging/logging_test.go b/vendor/google.golang.org/cloud/logging/logging_test.go new file mode 100644 index 000000000..fe3234fce --- /dev/null +++ b/vendor/google.golang.org/cloud/logging/logging_test.go @@ -0,0 +1,308 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logging + +import ( + "errors" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "golang.org/x/net/context" + + "google.golang.org/cloud" +) + +func TestLogPayload(t *testing.T) { + lt := newLogTest(t) + defer lt.ts.Close() + + tests := []struct { + name string + entry Entry + want string + }{ + { + name: "string", + entry: Entry{ + Time: time.Unix(0, 0), + Payload: "some log string", + }, + want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"textPayload":"some log string"}]}`, + }, + { + name: "[]byte", + entry: Entry{ + Time: time.Unix(0, 0), + Payload: []byte("some log bytes"), + }, + want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"textPayload":"some log bytes"}]}`, + }, + { + name: "struct", + entry: Entry{ + Time: time.Unix(0, 0), + Payload: struct { + Foo string `json:"foo"` + Bar int `json:"bar,omitempty"` + }{ + Foo: "foovalue", + }, + }, + want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"structPayload":{"foo":"foovalue"}}]}`, + }, + { + name: "map[string]interface{}", + entry: Entry{ + Time: time.Unix(0, 0), + Payload: map[string]interface{}{ + "string": "foo", + "int": 42, + }, + }, + want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"structPayload":{"int":42,"string":"foo"}}]}`, + }, + { + name: "map[string]interface{}", + entry: Entry{ + Time: time.Unix(0, 0), + Payload: customJSONObject{}, + }, + want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"structPayload":{"custom":"json"}}]}`, + }, + } + for _, tt := range tests { + lt.startGetRequest() + if err := lt.c.LogSync(tt.entry); err != nil { + t.Errorf("%s: LogSync = %v", tt.name, err) + continue + } + got := lt.getRequest() + if got != tt.want { + t.Errorf("%s: mismatch\n got: %s\nwant: %s\n", tt.name, got, tt.want) + } + } +} + +func TestBufferInterval(t *testing.T) { + lt := newLogTest(t) + defer lt.ts.Close() + + lt.c.CommonLabels = map[string]string{ + "common1": "one", + "common2": "two", + } + lt.c.BufferInterval = 1 * time.Millisecond // immediately, basically. + lt.c.FlushAfter = 100 // but we'll only send 1 + + lt.startGetRequest() + lt.c.Logger(Debug).Printf("log line 1") + got := lt.getRequest() + want := `{"commonLabels":{"common1":"one","common2":"two"},"entries":[{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:01Z"},"textPayload":"log line 1\n"}]}` + if got != want { + t.Errorf(" got: %s\nwant: %s\n", got, want) + } +} + +func TestFlushAfter(t *testing.T) { + lt := newLogTest(t) + defer lt.ts.Close() + + lt.c.CommonLabels = map[string]string{ + "common1": "one", + "common2": "two", + } + lt.c.BufferInterval = getRequestTimeout * 2 + lt.c.FlushAfter = 2 + + lt.c.Logger(Debug).Printf("log line 1") + lt.startGetRequest() + lt.c.Logger(Debug).Printf("log line 2") + got := lt.getRequest() + want := `{"commonLabels":{"common1":"one","common2":"two"},"entries":[{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:01Z"},"textPayload":"log line 1\n"},{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:02Z"},"textPayload":"log line 2\n"}]}` + if got != want { + t.Errorf(" got: %s\nwant: %s\n", got, want) + } +} + +func TestFlush(t *testing.T) { + lt := newLogTest(t) + defer lt.ts.Close() + lt.c.BufferInterval = getRequestTimeout * 2 + lt.c.FlushAfter = 100 // but we'll only send 1, requiring a Flush + + lt.c.Logger(Debug).Printf("log line 1") + lt.startGetRequest() + if err := lt.c.Flush(); err != nil { + t.Fatal(err) + } + got := lt.getRequest() + want := `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:01Z"},"textPayload":"log line 1\n"}]}` + if got != want { + t.Errorf(" got: %s\nwant: %s\n", got, want) + } +} + +func TestOverflow(t *testing.T) { + lt := newLogTest(t) + defer lt.ts.Close() + + lt.c.FlushAfter = 1 + lt.c.BufferLimit = 5 + lt.c.BufferInterval = 1 * time.Millisecond // immediately, basically. + + someErr := errors.New("some specific error value") + lt.c.Overflow = func(c *Client, e Entry) error { + return someErr + } + + unblock := make(chan bool, 1) + inHandler := make(chan bool, 1) + lt.handlerc <- http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + inHandler <- true + <-unblock + ioutil.ReadAll(r.Body) + io.WriteString(w, "{}") // WriteLogEntriesResponse + }) + + lt.c.Logger(Debug).Printf("log line 1") + <-inHandler + lt.c.Logger(Debug).Printf("log line 2") + lt.c.Logger(Debug).Printf("log line 3") + lt.c.Logger(Debug).Printf("log line 4") + lt.c.Logger(Debug).Printf("log line 5") + + queued, inFlight := lt.c.stats() + if want := 4; queued != want { + t.Errorf("queued = %d; want %d", queued, want) + } + if want := 1; inFlight != want { + t.Errorf("inFlight = %d; want %d", inFlight, want) + } + + if err := lt.c.Log(Entry{Payload: "to overflow"}); err != someErr { + t.Errorf("Log(overflow Log entry) = %v; want someErr", err) + } + lt.startGetRequest() + unblock <- true + got := lt.getRequest() + want := `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:02Z"},"textPayload":"log line 2\n"},{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:03Z"},"textPayload":"log line 3\n"},{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:04Z"},"textPayload":"log line 4\n"},{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:05Z"},"textPayload":"log line 5\n"}]}` + if got != want { + t.Errorf(" got: %s\nwant: %s\n", got, want) + } + if err := lt.c.Flush(); err != nil { + t.Fatal(err) + } + queued, inFlight = lt.c.stats() + if want := 0; queued != want { + t.Errorf("queued = %d; want %d", queued, want) + } + if want := 0; inFlight != want { + t.Errorf("inFlight = %d; want %d", inFlight, want) + } +} + +func (c *Client) stats() (queued, inFlight int) { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.queued), c.inFlight +} + +type customJSONObject struct{} + +func (customJSONObject) MarshalJSON() ([]byte, error) { + return []byte(`{"custom":"json"}`), nil +} + +type logTest struct { + t *testing.T + ts *httptest.Server + c *Client + handlerc chan<- http.Handler + + bodyc chan string +} + +func newLogTest(t *testing.T) *logTest { + handlerc := make(chan http.Handler, 1) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case h := <-handlerc: + h.ServeHTTP(w, r) + default: + slurp, _ := ioutil.ReadAll(r.Body) + t.Errorf("Unexpected HTTP request received: %s", slurp) + w.WriteHeader(500) + io.WriteString(w, "unexpected HTTP request") + } + })) + c, err := NewClient(context.Background(), "PROJ-ID", "LOG-NAME", cloud.WithEndpoint(ts.URL)) + if err != nil { + t.Fatal(err) + } + var clock struct { + sync.Mutex + now time.Time + } + c.timeNow = func() time.Time { + clock.Lock() + defer clock.Unlock() + if clock.now.IsZero() { + clock.now = time.Unix(0, 0) + } + clock.now = clock.now.Add(1 * time.Second) + return clock.now + } + return &logTest{ + t: t, + ts: ts, + c: c, + handlerc: handlerc, + } +} + +func (lt *logTest) startGetRequest() { + bodyc := make(chan string, 1) + lt.bodyc = bodyc + lt.handlerc <- http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + slurp, err := ioutil.ReadAll(r.Body) + if err != nil { + bodyc <- "ERROR: " + err.Error() + } else { + bodyc <- string(slurp) + } + io.WriteString(w, "{}") // a complete WriteLogEntriesResponse JSON struct + }) +} + +const getRequestTimeout = 5 * time.Second + +func (lt *logTest) getRequest() string { + if lt.bodyc == nil { + lt.t.Fatalf("getRequest called without previous startGetRequest") + } + select { + case v := <-lt.bodyc: + return strings.TrimSpace(v) + case <-time.After(getRequestTimeout): + lt.t.Fatalf("timeout waiting for request") + panic("unreachable") + } +}