vendor: update cloud logging to c7e49e47d60

This commit is contained in:
Brad Fitzpatrick 2015-09-29 08:52:16 -07:00
parent 6dfe405666
commit 9106ce8296
2 changed files with 397 additions and 43 deletions

View File

@ -13,11 +13,12 @@
// limitations under the License. // limitations under the License.
// Package logging contains a Google Cloud Logging client. // Package logging contains a Google Cloud Logging client.
//
// This package is experimental and subject to API changes.
package logging // import "google.golang.org/cloud/logging" package logging // import "google.golang.org/cloud/logging"
import ( import (
"errors" "errors"
"fmt"
"io" "io"
"log" "log"
"sync" "sync"
@ -43,16 +44,20 @@ const (
Warning Warning
Error Error
Critical Critical
Alert
Emergency
nLevel nLevel
) )
var levelName = [nLevel]string{ var levelName = [nLevel]string{
Default: "", Default: "",
Debug: "DEBUG", Debug: "DEBUG",
Info: "INFO", Info: "INFO",
Warning: "WARNING", Warning: "WARNING",
Error: "ERROR", Error: "ERROR",
Critical: "CRITICAL", Critical: "CRITICAL",
Alert: "ALERT",
Emergency: "EMERGENCY",
} }
func (v Level) String() string { func (v Level) String() string {
@ -76,10 +81,13 @@ type Client struct {
timerActive bool // whether flushTimer is armed timerActive bool // whether flushTimer is armed
inFlight int // number of log entries sent to API service but not yet ACKed inFlight int // number of log entries sent to API service but not yet ACKed
// For testing:
timeNow func() time.Time // optional
// ServiceName may be "appengine.googleapis.com", // ServiceName may be "appengine.googleapis.com",
// "compute.googleapis.com" or "custom.googleapis.com". // "compute.googleapis.com" or "custom.googleapis.com".
// //
// The default is unspecified is "custom.googleapis.com". // The default is "custom.googleapis.com".
// //
// The service name is only used by the API server to // The service name is only used by the API server to
// determine which of the labels are used to index the logs. // determine which of the labels are used to index the logs.
@ -89,7 +97,7 @@ type Client struct {
// entries in this request, so that you don't have to repeat // entries in this request, so that you don't have to repeat
// them in each log entry's metadata.labels field. If any of // them in each log entry's metadata.labels field. If any of
// the log entries contains a (key, value) with the same key // the log entries contains a (key, value) with the same key
// that is in commonLabels, then the entry's (key, value) // that is in CommonLabels, then the entry's (key, value)
// overrides the one in CommonLabels. // overrides the one in CommonLabels.
CommonLabels map[string]string CommonLabels map[string]string
@ -108,32 +116,33 @@ type Client struct {
// BufferInterval is the maximum amount of time that an item // BufferInterval is the maximum amount of time that an item
// should remain buffered in memory before being flushed to // should remain buffered in memory before being flushed to
// the logging service. // the logging service.
// The default is currently 1 second.
BufferInterval time.Duration BufferInterval time.Duration
// Overflow optionally specifies a function which is run // Overflow is a function which runs when the Log function
// when the Log function overflows its configured buffer // overflows its configured buffer limit. If nil, the log
// limit. If nil, the log entry is dropped. The return // entry is dropped. The return value from Overflow is
// value is returned by Log. // returned by Log.
Overflow func(*Client, Entry) error Overflow func(*Client, Entry) error
} }
func (c *Client) flushAfter() int { func (c *Client) flushAfter() int {
if c.FlushAfter > 0 { if v := c.FlushAfter; v > 0 {
return c.FlushAfter return v
} }
return 10 return 10
} }
func (c *Client) bufferInterval() time.Duration { func (c *Client) bufferInterval() time.Duration {
if c.BufferInterval > 0 { if v := c.BufferInterval; v > 0 {
return c.BufferInterval return v
} }
return time.Second return time.Second
} }
func (c *Client) bufferLimit() int { func (c *Client) bufferLimit() int {
if c.BufferLimit > 0 { if v := c.BufferLimit; v > 0 {
return c.BufferLimit return v
} }
return 10000 return 10000
} }
@ -145,6 +154,13 @@ func (c *Client) serviceName() string {
return "custom.googleapis.com" return "custom.googleapis.com"
} }
func (c *Client) now() time.Time {
if now := c.timeNow; now != nil {
return now()
}
return time.Now()
}
// Writer returns an io.Writer for the provided log level. // Writer returns an io.Writer for the provided log level.
// //
// Each Write call on the returned Writer generates a log entry. // Each Write call on the returned Writer generates a log entry.
@ -180,13 +196,12 @@ type Entry struct {
Time time.Time Time time.Time
// Level is log entry's severity level. // Level is log entry's severity level.
// The zero value means undefined. // The zero value means no assigned severity level.
Level Level Level Level
// Payload may be either a string or JSON object. // Payload must be either a string, []byte, or something that
// For JSON objects, the type must be either map[string]interface{} // marshals via the encoding/json package to a JSON object
// or implement json.Marshaler and encode a JSON object (and not any other // (and not any other type of JSON value).
// JSON value).
Payload interface{} Payload interface{}
// Labels optionally specifies key/value labels for the log entry. // Labels optionally specifies key/value labels for the log entry.
@ -202,7 +217,7 @@ type Entry struct {
func (c *Client) apiEntry(e Entry) (*api.LogEntry, error) { func (c *Client) apiEntry(e Entry) (*api.LogEntry, error) {
t := e.Time t := e.Time
if t.IsZero() { if t.IsZero() {
t = time.Now() t = c.now()
} }
ent := &api.LogEntry{ ent := &api.LogEntry{
@ -216,10 +231,10 @@ func (c *Client) apiEntry(e Entry) (*api.LogEntry, error) {
switch p := e.Payload.(type) { switch p := e.Payload.(type) {
case string: case string:
ent.TextPayload = p ent.TextPayload = p
case map[string]interface{}: case []byte:
ent.StructPayload = p ent.TextPayload = string(p)
default: default:
return nil, fmt.Errorf("unhandled Log Payload type %T", p) ent.StructPayload = api.LogEntryStructPayload(p)
} }
return ent, nil return ent, nil
} }
@ -242,9 +257,11 @@ var ErrOverflow = errors.New("logging: log entry overflowed buffer limits")
// Log queues an entry to be sent to the logging service, subject to the // Log queues an entry to be sent to the logging service, subject to the
// Client's parameters. By default, the log will be flushed within // Client's parameters. By default, the log will be flushed within
// a second. // one second.
// Log only returns an error if the entry is invalid, or ErrOverflow // Log only returns an error if the entry is invalid or the queue is at
// if the log entry overflows the buffer limit. // capacity. If the queue is at capacity and the entry can't be added,
// Log returns either ErrOverflow when c.Overflow is nil, or the
// value returned by c.Overflow.
func (c *Client) Log(e Entry) error { func (c *Client) Log(e Entry) error {
ent, err := c.apiEntry(e) ent, err := c.apiEntry(e)
if err != nil { if err != nil {
@ -252,14 +269,17 @@ func (c *Client) Log(e Entry) error {
} }
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock()
buffered := len(c.queued) + c.inFlight buffered := len(c.queued) + c.inFlight
if buffered >= c.bufferLimit() { if buffered >= c.bufferLimit() {
c.mu.Unlock()
if fn := c.Overflow; fn != nil { if fn := c.Overflow; fn != nil {
return fn(c, e) return fn(c, e)
} }
return ErrOverflow return ErrOverflow
} }
defer c.mu.Unlock()
c.queued = append(c.queued, ent) c.queued = append(c.queued, ent)
if len(c.queued) >= c.flushAfter() { if len(c.queued) >= c.flushAfter() {
c.scheduleFlushLocked(0) c.scheduleFlushLocked(0)
@ -303,6 +323,9 @@ func (c *Client) scheduleFlushLocked(d time.Duration) {
// timeoutFlush runs in its own goroutine (from time.AfterFunc) and // timeoutFlush runs in its own goroutine (from time.AfterFunc) and
// flushes c.queued. // flushes c.queued.
func (c *Client) timeoutFlush() { func (c *Client) timeoutFlush() {
c.mu.Lock()
c.timerActive = false
c.mu.Unlock()
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
// schedule another try // schedule another try
// TODO: smarter back-off? // TODO: smarter back-off?
@ -323,23 +346,44 @@ func (c *Client) Ping() error {
// Flush flushes any buffered log entries. // Flush flushes any buffered log entries.
func (c *Client) Flush() error { func (c *Client) Flush() error {
var numFlush int
c.mu.Lock()
for { for {
// Easy and final case: nothing to flush. // We're already flushing (or we just started flushing
c.mu.Lock() // ourselves), so wait for it to finish.
if f := c.curFlush; f != nil {
wasEmpty := len(c.queued) == 0
c.mu.Unlock()
<-f.donec // wait for it
numFlush++
// Terminate whenever there's an error, we've
// already flushed twice (one that was already
// in-flight when flush was called, and then
// one we instigated), or the queue was empty
// when we released the locked (meaning this
// in-flight flush removes everything present
// when Flush was called, and we don't need to
// kick off a new flush for things arriving
// afterward)
if f.err != nil || numFlush == 2 || wasEmpty {
return f.err
}
// Otherwise, re-obtain the lock and loop,
// starting over with seeing if a flush is in
// progress, which might've been started by a
// different goroutine before aquiring this
// lock again.
c.mu.Lock()
continue
}
// Terminal case:
if len(c.queued) == 0 { if len(c.queued) == 0 {
c.mu.Unlock() c.mu.Unlock()
return nil return nil
} }
if f := c.curFlush; f != nil {
c.mu.Unlock()
<-f.donec // wait for it
if f.err != nil {
return f.err
}
}
c.startFlushLocked() c.startFlushLocked()
c.mu.Unlock()
} }
} }
@ -358,6 +402,7 @@ func (c *Client) startFlushLocked() {
flush := &flushCall{ flush := &flushCall{
donec: make(chan struct{}), donec: make(chan struct{}),
} }
c.curFlush = flush
go func() { go func() {
defer close(flush.donec) defer close(flush.donec)
_, err := c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{ _, err := c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{
@ -365,13 +410,14 @@ func (c *Client) startFlushLocked() {
Entries: logEntries, Entries: logEntries,
}).Do() }).Do()
flush.err = err flush.err = err
log.Printf("Raw write of %d = %v", len(logEntries), err)
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.inFlight = 0 c.inFlight = 0
c.curFlush = nil c.curFlush = nil
if err != nil { if err != nil {
c.queued = append(c.queued, logEntries...) c.queued = append(c.queued, logEntries...)
} else if len(c.queued) > 0 {
c.scheduleFlushLocked(c.bufferInterval())
} }
}() }()

308
vendor/google.golang.org/cloud/logging/logging_test.go generated vendored Normal file
View File

@ -0,0 +1,308 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logging
import (
"errors"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/cloud"
)
func TestLogPayload(t *testing.T) {
lt := newLogTest(t)
defer lt.ts.Close()
tests := []struct {
name string
entry Entry
want string
}{
{
name: "string",
entry: Entry{
Time: time.Unix(0, 0),
Payload: "some log string",
},
want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"textPayload":"some log string"}]}`,
},
{
name: "[]byte",
entry: Entry{
Time: time.Unix(0, 0),
Payload: []byte("some log bytes"),
},
want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"textPayload":"some log bytes"}]}`,
},
{
name: "struct",
entry: Entry{
Time: time.Unix(0, 0),
Payload: struct {
Foo string `json:"foo"`
Bar int `json:"bar,omitempty"`
}{
Foo: "foovalue",
},
},
want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"structPayload":{"foo":"foovalue"}}]}`,
},
{
name: "map[string]interface{}",
entry: Entry{
Time: time.Unix(0, 0),
Payload: map[string]interface{}{
"string": "foo",
"int": 42,
},
},
want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"structPayload":{"int":42,"string":"foo"}}]}`,
},
{
name: "map[string]interface{}",
entry: Entry{
Time: time.Unix(0, 0),
Payload: customJSONObject{},
},
want: `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","timestamp":"1970-01-01T00:00:00Z"},"structPayload":{"custom":"json"}}]}`,
},
}
for _, tt := range tests {
lt.startGetRequest()
if err := lt.c.LogSync(tt.entry); err != nil {
t.Errorf("%s: LogSync = %v", tt.name, err)
continue
}
got := lt.getRequest()
if got != tt.want {
t.Errorf("%s: mismatch\n got: %s\nwant: %s\n", tt.name, got, tt.want)
}
}
}
func TestBufferInterval(t *testing.T) {
lt := newLogTest(t)
defer lt.ts.Close()
lt.c.CommonLabels = map[string]string{
"common1": "one",
"common2": "two",
}
lt.c.BufferInterval = 1 * time.Millisecond // immediately, basically.
lt.c.FlushAfter = 100 // but we'll only send 1
lt.startGetRequest()
lt.c.Logger(Debug).Printf("log line 1")
got := lt.getRequest()
want := `{"commonLabels":{"common1":"one","common2":"two"},"entries":[{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:01Z"},"textPayload":"log line 1\n"}]}`
if got != want {
t.Errorf(" got: %s\nwant: %s\n", got, want)
}
}
func TestFlushAfter(t *testing.T) {
lt := newLogTest(t)
defer lt.ts.Close()
lt.c.CommonLabels = map[string]string{
"common1": "one",
"common2": "two",
}
lt.c.BufferInterval = getRequestTimeout * 2
lt.c.FlushAfter = 2
lt.c.Logger(Debug).Printf("log line 1")
lt.startGetRequest()
lt.c.Logger(Debug).Printf("log line 2")
got := lt.getRequest()
want := `{"commonLabels":{"common1":"one","common2":"two"},"entries":[{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:01Z"},"textPayload":"log line 1\n"},{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:02Z"},"textPayload":"log line 2\n"}]}`
if got != want {
t.Errorf(" got: %s\nwant: %s\n", got, want)
}
}
func TestFlush(t *testing.T) {
lt := newLogTest(t)
defer lt.ts.Close()
lt.c.BufferInterval = getRequestTimeout * 2
lt.c.FlushAfter = 100 // but we'll only send 1, requiring a Flush
lt.c.Logger(Debug).Printf("log line 1")
lt.startGetRequest()
if err := lt.c.Flush(); err != nil {
t.Fatal(err)
}
got := lt.getRequest()
want := `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:01Z"},"textPayload":"log line 1\n"}]}`
if got != want {
t.Errorf(" got: %s\nwant: %s\n", got, want)
}
}
func TestOverflow(t *testing.T) {
lt := newLogTest(t)
defer lt.ts.Close()
lt.c.FlushAfter = 1
lt.c.BufferLimit = 5
lt.c.BufferInterval = 1 * time.Millisecond // immediately, basically.
someErr := errors.New("some specific error value")
lt.c.Overflow = func(c *Client, e Entry) error {
return someErr
}
unblock := make(chan bool, 1)
inHandler := make(chan bool, 1)
lt.handlerc <- http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
inHandler <- true
<-unblock
ioutil.ReadAll(r.Body)
io.WriteString(w, "{}") // WriteLogEntriesResponse
})
lt.c.Logger(Debug).Printf("log line 1")
<-inHandler
lt.c.Logger(Debug).Printf("log line 2")
lt.c.Logger(Debug).Printf("log line 3")
lt.c.Logger(Debug).Printf("log line 4")
lt.c.Logger(Debug).Printf("log line 5")
queued, inFlight := lt.c.stats()
if want := 4; queued != want {
t.Errorf("queued = %d; want %d", queued, want)
}
if want := 1; inFlight != want {
t.Errorf("inFlight = %d; want %d", inFlight, want)
}
if err := lt.c.Log(Entry{Payload: "to overflow"}); err != someErr {
t.Errorf("Log(overflow Log entry) = %v; want someErr", err)
}
lt.startGetRequest()
unblock <- true
got := lt.getRequest()
want := `{"entries":[{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:02Z"},"textPayload":"log line 2\n"},{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:03Z"},"textPayload":"log line 3\n"},{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:04Z"},"textPayload":"log line 4\n"},{"metadata":{"serviceName":"custom.googleapis.com","severity":"DEBUG","timestamp":"1970-01-01T00:00:05Z"},"textPayload":"log line 5\n"}]}`
if got != want {
t.Errorf(" got: %s\nwant: %s\n", got, want)
}
if err := lt.c.Flush(); err != nil {
t.Fatal(err)
}
queued, inFlight = lt.c.stats()
if want := 0; queued != want {
t.Errorf("queued = %d; want %d", queued, want)
}
if want := 0; inFlight != want {
t.Errorf("inFlight = %d; want %d", inFlight, want)
}
}
func (c *Client) stats() (queued, inFlight int) {
c.mu.Lock()
defer c.mu.Unlock()
return len(c.queued), c.inFlight
}
type customJSONObject struct{}
func (customJSONObject) MarshalJSON() ([]byte, error) {
return []byte(`{"custom":"json"}`), nil
}
type logTest struct {
t *testing.T
ts *httptest.Server
c *Client
handlerc chan<- http.Handler
bodyc chan string
}
func newLogTest(t *testing.T) *logTest {
handlerc := make(chan http.Handler, 1)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case h := <-handlerc:
h.ServeHTTP(w, r)
default:
slurp, _ := ioutil.ReadAll(r.Body)
t.Errorf("Unexpected HTTP request received: %s", slurp)
w.WriteHeader(500)
io.WriteString(w, "unexpected HTTP request")
}
}))
c, err := NewClient(context.Background(), "PROJ-ID", "LOG-NAME", cloud.WithEndpoint(ts.URL))
if err != nil {
t.Fatal(err)
}
var clock struct {
sync.Mutex
now time.Time
}
c.timeNow = func() time.Time {
clock.Lock()
defer clock.Unlock()
if clock.now.IsZero() {
clock.now = time.Unix(0, 0)
}
clock.now = clock.now.Add(1 * time.Second)
return clock.now
}
return &logTest{
t: t,
ts: ts,
c: c,
handlerc: handlerc,
}
}
func (lt *logTest) startGetRequest() {
bodyc := make(chan string, 1)
lt.bodyc = bodyc
lt.handlerc <- http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slurp, err := ioutil.ReadAll(r.Body)
if err != nil {
bodyc <- "ERROR: " + err.Error()
} else {
bodyc <- string(slurp)
}
io.WriteString(w, "{}") // a complete WriteLogEntriesResponse JSON struct
})
}
const getRequestTimeout = 5 * time.Second
func (lt *logTest) getRequest() string {
if lt.bodyc == nil {
lt.t.Fatalf("getRequest called without previous startGetRequest")
}
select {
case v := <-lt.bodyc:
return strings.TrimSpace(v)
case <-time.After(getRequestTimeout):
lt.t.Fatalf("timeout waiting for request")
panic("unreachable")
}
}