From 9e9d5e0bf24ed87fc44fcf60d64e3de8534bbf83 Mon Sep 17 00:00:00 2001 From: mpl Date: Wed, 27 Apr 2016 11:18:54 -0700 Subject: [PATCH] blobserver/cloudstorage: use cloud/storage Client Using go4.org/cloud/google/gcsutil for the things we can't yet do efficiently with the cloud/storage Client. Fixes issue #652 Change-Id: I8a43a754c4e0762629ce2a21bb95d9991f6f4771 --- .../google/cloudstorage/cloudstorage_test.go | 76 ++++++---- pkg/blobserver/google/cloudstorage/storage.go | 133 +++++++++++++----- pkg/blobserver/storagetest/storagetest.go | 8 ++ 3 files changed, 154 insertions(+), 63 deletions(-) diff --git a/pkg/blobserver/google/cloudstorage/cloudstorage_test.go b/pkg/blobserver/google/cloudstorage/cloudstorage_test.go index 88c47083b..3773be778 100644 --- a/pkg/blobserver/google/cloudstorage/cloudstorage_test.go +++ b/pkg/blobserver/google/cloudstorage/cloudstorage_test.go @@ -19,6 +19,7 @@ package cloudstorage import ( "encoding/json" "flag" + "io" "io/ioutil" "log" "path" @@ -29,12 +30,13 @@ import ( "camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver/storagetest" "camlistore.org/pkg/constants/google" - "camlistore.org/pkg/googlestorage" "go4.org/jsonconfig" - "golang.org/x/net/context" "go4.org/oauthutil" + "golang.org/x/net/context" "golang.org/x/oauth2" + "google.golang.org/cloud/compute/metadata" + "google.golang.org/cloud/storage" ) var ( @@ -87,34 +89,42 @@ func testStorage(t *testing.T, bucketDir string) { if *bucket == "" { t.Fatal("bucket not provided in config file or as a flag.") } - if *clientID == "" || *clientSecret == "" { - t.Fatal("client ID and client secret required. Obtain from https://console.developers.google.com/ > Project > APIs & Auth > Credentials. Should be a 'native' or 'Installed application'") + if *clientID == "" { + if !metadata.OnGCE() { + if *clientSecret == "" { + t.Fatal("client ID and client secret required. Obtain from https://console.developers.google.com/ > Project > APIs & Auth > Credentials. Should be a 'native' or 'Installed application'") + } + } else { + *clientID = "auto" + } } if *configFile == "" { config := &oauth2.Config{ - Scopes: []string{googlestorage.Scope}, + Scopes: []string{storage.ScopeReadWrite}, Endpoint: google.Endpoint, ClientID: *clientID, ClientSecret: *clientSecret, RedirectURL: oauthutil.TitleBarRedirectURL, } - token, err := oauth2.ReuseTokenSource(nil, - &oauthutil.TokenSource{ - Config: config, - CacheFile: *tokenCache, - AuthCode: func() string { - if *authCode == "" { - t.Skipf("Re-run using --auth_code= with the value obtained from %s", - config.AuthCodeURL("", oauth2.AccessTypeOffline, oauth2.ApprovalForce)) - return "" - } - return *authCode - }, - }).Token() - if err != nil { - t.Fatalf("could not acquire token: %v", err) + if !metadata.OnGCE() { + token, err := oauth2.ReuseTokenSource(nil, + &oauthutil.TokenSource{ + Config: config, + CacheFile: *tokenCache, + AuthCode: func() string { + if *authCode == "" { + t.Skipf("Re-run using --auth_code= with the value obtained from %s", + config.AuthCodeURL("", oauth2.AccessTypeOffline, oauth2.ApprovalForce)) + return "" + } + return *authCode + }, + }).Token() + if err != nil { + t.Fatalf("could not acquire token: %v", err) + } + refreshToken = token.RefreshToken } - refreshToken = token.RefreshToken } bucketWithDir := path.Join(*bucket, bucketDir) @@ -136,21 +146,24 @@ func testStorage(t *testing.T, bucketDir string) { log.Printf("Warning: this test does many serial operations. Without the go test -short flag, this test will be very slow.") } // Bail if bucket is not empty - objs, err := sto.(*Storage).client.EnumerateObjects(*bucket, "", 1) + ctx := context.Background() + stor := sto.(*Storage) + objs, err := stor.client.Bucket(stor.bucket).List(ctx, nil) if err != nil { t.Fatalf("Error checking if bucket is empty: %v", err) } - if len(objs) != 0 { + if len(objs.Results) != 0 { t.Fatalf("Refusing to run test: bucket %v is not empty", *bucket) } if bucketWithDir != *bucket { // Adding "a", and "c" objects in the bucket to make sure objects out of the // "directory" are not touched and have no influence. for _, key := range []string{"a", "c"} { - err := sto.(*Storage).client.PutObject( - &googlestorage.Object{Bucket: sto.(*Storage).bucket, Key: key}, - strings.NewReader(key)) - if err != nil { + w := stor.client.Bucket(stor.bucket).Object(key).NewWriter(ctx) + if _, err := io.Copy(w, strings.NewReader(key)); err != nil { + t.Fatalf("could not insert object %s in bucket %v: %v", key, sto.(*Storage).bucket, err) + } + if err := w.Close(); err != nil { t.Fatalf("could not insert object %s in bucket %v: %v", key, sto.(*Storage).bucket, err) } } @@ -173,11 +186,14 @@ func testStorage(t *testing.T, bucketDir string) { if bucketWithDir != *bucket { // checking that "a" and "c" at the root were left untouched. for _, key := range []string{"a", "c"} { - if _, _, err := sto.(*Storage).client.GetObject(&googlestorage.Object{Bucket: sto.(*Storage).bucket, - Key: key}); err != nil { + rc, err := stor.client.Bucket(stor.bucket).Object(key).NewReader(ctx) + if err != nil { t.Fatalf("could not find object %s after tests: %v", key, err) } - if err := sto.(*Storage).client.DeleteObject(&googlestorage.Object{Bucket: sto.(*Storage).bucket, Key: key}); err != nil { + if _, err := io.Copy(ioutil.Discard, rc); err != nil { + t.Fatalf("could not find object %s after tests: %v", key, err) + } + if err := stor.client.Bucket(stor.bucket).Object(key).Delete(ctx); err != nil { t.Fatalf("could not remove object %s after tests: %v", key, err) } diff --git a/pkg/blobserver/google/cloudstorage/storage.go b/pkg/blobserver/google/cloudstorage/storage.go index fbffcc38e..7a5bedce4 100644 --- a/pkg/blobserver/google/cloudstorage/storage.go +++ b/pkg/blobserver/google/cloudstorage/storage.go @@ -17,7 +17,7 @@ limitations under the License. // Package cloudstorage registers the "googlecloudstorage" blob storage type, storing blobs // on Google Cloud Storage (not Google Drive). // See https://cloud.google.com/products/cloud-storage -package cloudstorage // import "camlistore.org/pkg/blobserver/google/cloudstorage" +package cloudstorage import ( "bytes" @@ -27,6 +27,8 @@ import ( "io" "io/ioutil" "log" + "net/http" + "os" "path" "strings" "time" @@ -35,14 +37,18 @@ import ( "camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver/memory" "camlistore.org/pkg/constants" - "camlistore.org/pkg/constants/google" - "camlistore.org/pkg/googlestorage" - "go4.org/jsonconfig" - "golang.org/x/net/context" + "go4.org/cloud/google/gcsutil" + "go4.org/ctxutil" + "go4.org/jsonconfig" "go4.org/oauthutil" "go4.org/syncutil" + "golang.org/x/net/context" "golang.org/x/oauth2" + "golang.org/x/oauth2/google" + "google.golang.org/cloud" + "google.golang.org/cloud/compute/metadata" + "google.golang.org/cloud/storage" ) type Storage struct { @@ -53,9 +59,13 @@ type Storage struct { // If non empty, it should be a slash separated path with a trailing slash and no starting // slash. dirPrefix string - client *googlestorage.Client + client *storage.Client cache *memory.Storage // or nil for no cache + // an OAuth-authenticated HTTP client, for methods that can't yet use a + // *storage.Client + baseHTTPClient *http.Client + // For blobserver.Generationer: genTime time.Time genRandom string @@ -104,9 +114,22 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora bucket: bucket, dirPrefix: dirPrefix, } + + var ( + ctx = context.Background() + ts oauth2.TokenSource + cl *storage.Client + err error + ) if clientID == "auto" { - var err error - gs.client, err = googlestorage.NewServiceClient() + if !metadata.OnGCE() { + return nil, errors.New(`Cannot use "auto" client_id when not running on GCE`) + } + ts, err = google.DefaultTokenSource(ctx, storage.ScopeReadWrite) + if err != nil { + return nil, err + } + cl, err = storage.NewClient(ctx) if err != nil { return nil, err } @@ -117,41 +140,52 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora if refreshToken == "" { return nil, errors.New("missing required parameter 'refresh_token'") } - oAuthClient := oauth2.NewClient(oauth2.NoContext, oauthutil.NewRefreshTokenSource(&oauth2.Config{ - Scopes: []string{googlestorage.Scope}, + ts = oauthutil.NewRefreshTokenSource(&oauth2.Config{ + Scopes: []string{storage.ScopeReadWrite}, Endpoint: google.Endpoint, ClientID: clientID, ClientSecret: clientSecret, RedirectURL: oauthutil.TitleBarRedirectURL, - }, refreshToken)) - gs.client = googlestorage.NewClient(oAuthClient) + }, refreshToken) + cl, err = storage.NewClient(ctx, cloud.WithTokenSource(ts)) + if err != nil { + return nil, err + } } + gs.baseHTTPClient = oauth2.NewClient(ctx, ts) + gs.client = cl + if cacheSize != 0 { gs.cache = memory.NewCache(cacheSize) } - bi, err := gs.client.BucketInfo(bucket) + ba, err := gs.client.Bucket(gs.bucket).Attrs(ctx) if err != nil { - return nil, fmt.Errorf("error statting bucket %q: %v", bucket, err) + return nil, fmt.Errorf("error statting bucket %q: %v", gs.bucket, err) } hash := sha1.New() - fmt.Fprintf(hash, "%v%v", bi.TimeCreated, bi.Metageneration) + fmt.Fprintf(hash, "%v%v", ba.Created, ba.MetaGeneration) gs.genRandom = fmt.Sprintf("%x", hash.Sum(nil)) - gs.genTime, _ = time.Parse(time.RFC3339, bi.TimeCreated) + gs.genTime = ba.Created return gs, nil } +// TODO(mpl, bradfitz): use a *storage.Client in EnumerateBlobs, instead of hitting the +// XML API, once we have an efficient replacement for the "marker" from the XML API. See +// https://github.com/GoogleCloudPlatform/gcloud-golang/issues/197 + func (s *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) - objs, err := s.client.EnumerateObjects(s.bucket, s.dirPrefix+after, limit) + ectx := context.WithValue(ctx, ctxutil.HTTPClient, s.baseHTTPClient) + objs, err := gcsutil.EnumerateObjects(ectx, s.bucket, s.dirPrefix+after, limit) if err != nil { log.Printf("gstorage EnumerateObjects: %v", err) return err } for _, obj := range objs { - dir, file := path.Split(obj.Key) + dir, file := path.Split(obj.Name) if dir != s.dirPrefix { continue } @@ -175,12 +209,15 @@ func (s *Storage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, err return blob.SizedRef{}, err } - err = s.client.PutObject( - &googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}, - ioutil.NopCloser(bytes.NewReader(buf.Bytes()))) - if err != nil { + // TODO(mpl): use context from caller, once one is available (issue 733) + w := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).NewWriter(context.TODO()) + if _, err := io.Copy(w, ioutil.NopCloser(bytes.NewReader(buf.Bytes()))); err != nil { return blob.SizedRef{}, err } + if err := w.Close(); err != nil { + return blob.SizedRef{}, err + } + if s.cache != nil { // NoHash because it's already verified if we read it // without errors on the io.Copy above. @@ -191,6 +228,8 @@ func (s *Storage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, err func (s *Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error { // TODO: use cache + // TODO(mpl): use context from caller, once one is available (issue 733) + ctx := context.TODO() var grp syncutil.Group gate := syncutil.NewGate(20) // arbitrary cap for i := range blobs { @@ -198,14 +237,14 @@ func (s *Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error { gate.Start() grp.Go(func() error { defer gate.Done() - size, exists, err := s.client.StatObject( - &googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}) + attrs, err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).Attrs(ctx) + if err == storage.ErrObjectNotExist { + return nil + } if err != nil { return err } - if !exists { - return nil - } + size := attrs.Size if size > constants.MaxBlobSize { return fmt.Errorf("blob %s stat size too large (%d)", br, size) } @@ -222,21 +261,45 @@ func (s *Storage) Fetch(br blob.Ref) (rc io.ReadCloser, size uint32, err error) return } } - rc, sz, err := s.client.GetObject(&googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}) - if err != nil && sz > constants.MaxBlobSize { - err = errors.New("object too big") + // TODO(mpl): use context from caller, once one is available (issue 733) + r, err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).NewReader(context.TODO()) + if r.Size() >= 1<<32 { + return nil, 0, errors.New("object larger than a uint32") } - return rc, uint32(sz), err + size = uint32(r.Size()) + if err != nil { + if err == storage.ErrObjectNotExist { + return nil, size, os.ErrNotExist + } + if size > constants.MaxBlobSize { + return nil, size, errors.New("object too big") + } + } + return r, size, err } func (s *Storage) SubFetch(br blob.Ref, offset, length int64) (rc io.ReadCloser, err error) { - return s.client.GetPartialObject(googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}, offset, length) + if offset < 0 || length < 0 { + return nil, blob.ErrNegativeSubFetch + } + // TODO(mpl): use context from caller, once one is available (issue 733) + ctx := context.WithValue(context.TODO(), ctxutil.HTTPClient, s.baseHTTPClient) + rc, err = gcsutil.GetPartialObject(ctx, gcsutil.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}, offset, length) + if err == gcsutil.ErrInvalidRange { + return nil, blob.ErrOutOfRangeOffsetSubFetch + } + if err == storage.ErrObjectNotExist { + return nil, os.ErrNotExist + } + return rc, err } func (s *Storage) RemoveBlobs(blobs []blob.Ref) error { if s.cache != nil { s.cache.RemoveBlobs(blobs) } + // TODO(mpl): use context from caller, once one is available (issue 733) + ctx := context.TODO() gate := syncutil.NewGate(50) // arbitrary var grp syncutil.Group for i := range blobs { @@ -244,7 +307,11 @@ func (s *Storage) RemoveBlobs(blobs []blob.Ref) error { br := blobs[i] grp.Go(func() error { defer gate.Done() - return s.client.DeleteObject(&googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}) + err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).Delete(ctx) + if err == storage.ErrObjectNotExist { + return nil + } + return err }) } return grp.Err() diff --git a/pkg/blobserver/storagetest/storagetest.go b/pkg/blobserver/storagetest/storagetest.go index 1cd1c143a..d74f1e6a7 100644 --- a/pkg/blobserver/storagetest/storagetest.go +++ b/pkg/blobserver/storagetest/storagetest.go @@ -172,15 +172,23 @@ func TestOpt(t *testing.T, opt Opts) { func (r *run) testRemove(blobRefs []blob.Ref) { t, sto := r.t, r.sto + implemented := true t.Logf("Testing Remove") if err := sto.RemoveBlobs(blobRefs); err != nil { if strings.Contains(err.Error(), "not implemented") { + implemented = false t.Logf("RemoveBlobs: %v", err) } else { t.Fatalf("RemoveBlobs: %v", err) } } r.testEnumerate(nil) // verify they're all gone + if len(blobRefs) > 0 && implemented { + t.Logf("Testing double-delete") + if err := sto.RemoveBlobs([]blob.Ref{blobRefs[0]}); err != nil { + t.Fatalf("Double RemoveBlobs: %v", err) + } + } } func (r *run) testSubFetcher() {