From 2715ebd067b3678ae285d57ed42c1ea89ad09cae Mon Sep 17 00:00:00 2001
From: mpl
Date: Thu, 3 Dec 2015 16:04:54 +0100
Subject: [PATCH] pkg/wkfs/gcs: use google.golang.org/cloud/storage

Use the new Client provided by cloud/storage, instead of our own
pkg/googlestorage

Change-Id: Ic7ebf7059894c72a36a6e935a15bcda1d9dd7598
---
 pkg/wkfs/gcs/gcs.go      | 145 ++++++++++++++++++---------------------
 pkg/wkfs/gcs/gcs_test.go |  84 +++++++++++++++++++++++
 2 files changed, 152 insertions(+), 77 deletions(-)
 create mode 100644 pkg/wkfs/gcs/gcs_test.go

diff --git a/pkg/wkfs/gcs/gcs.go b/pkg/wkfs/gcs/gcs.go
index 186d0d099..bf0b9e7d8 100644
--- a/pkg/wkfs/gcs/gcs.go
+++ b/pkg/wkfs/gcs/gcs.go
@@ -17,45 +17,75 @@ limitations under the License.
 // Package gcs registers a Google Cloud Storage filesystem at the
 // well-known /gcs/ filesystem path if the current machine is running
 // on Google Compute Engine.
+//
+// It was initially only meant for small files, and as such, it can only
+// read files smaller than 1MB for now.
 package gcs
 
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
 	"os"
 	"path"
 	"strings"
-	"sync"
 	"time"
 
-	"camlistore.org/pkg/googlestorage"
-
 	"go4.org/wkfs"
+	"golang.org/x/net/context"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+	"google.golang.org/cloud"
 	"google.golang.org/cloud/compute/metadata"
+	"google.golang.org/cloud/storage"
 )
 
-// Max size for all files read or written. This filesystem is only
-// supposed to be for configuration data only, so this is very
-// generous.
+// Max size for all files read, because we use a bytes.Reader as our file
+// reader, instead of storage.NewReader. This is because we get all wkfs.File
+// methods for free by embedding a bytes.Reader. This filesystem was only
+// meant for configuration data, so this is ok for now.
 const maxSize = 1 << 20
 
 func init() {
 	if !metadata.OnGCE() {
 		return
 	}
-	client, err := googlestorage.NewServiceClient()
-	wkfs.RegisterFS("/gcs/", &gcsFS{client, err})
+	hc, err := google.DefaultClient(oauth2.NoContext)
+	if err != nil {
+		registerBrokenFS(fmt.Errorf("could not get http client for context: %v", err))
+		return
+	}
+	projID, err := metadata.ProjectID()
+	if projID == "" || err != nil {
+		registerBrokenFS(fmt.Errorf("could not get GCE project ID: %v", err))
+		return
+	}
+	ctx := cloud.NewContext(projID, hc)
+	sc, err := storage.NewClient(ctx)
+	if err != nil {
+		registerBrokenFS(fmt.Errorf("could not get cloud storage client: %v", err))
+		return
+	}
+	wkfs.RegisterFS("/gcs/", &gcsFS{
+		ctx: ctx,
+		sc:  sc,
+	})
 }
 
 type gcsFS struct {
-	client *googlestorage.Client
-	err    error // sticky error
+	ctx context.Context
+	sc  *storage.Client
+	err error // sticky error
 }
 
-func (fs *gcsFS) parseName(name string) (bucket, key string, err error) {
+func registerBrokenFS(err error) {
+	wkfs.RegisterFS("/gcs/", &gcsFS{
+		err: err,
+	})
+}
+
+func (fs *gcsFS) parseName(name string) (bucket, fileName string, err error) {
 	if fs.err != nil {
 		return "", "", fs.err
 	}
@@ -67,22 +97,28 @@ func (fs *gcsFS) parseName(name string) (bucket, key string, err error) {
 	return name[:i], name[i+1:], nil
 }
 
+// Open opens the named file for reading. It returns an error if the file size
+// is larger than 1 << 20.
 func (fs *gcsFS) Open(name string) (wkfs.File, error) {
-	bucket, key, err := fs.parseName(name)
+	bucket, fileName, err := fs.parseName(name)
 	if err != nil {
-		return nil, fs.err
+		return nil, err
 	}
-	rc, size, err := fs.client.GetObject(&googlestorage.Object{
-		Bucket: bucket,
-		Key:    key,
-	})
+	obj := fs.sc.Bucket(bucket).Object(fileName)
+	attrs, err := obj.Attrs(fs.ctx)
+	if err != nil {
+		return nil, err
+	}
+	size := attrs.Size
+	if size > maxSize {
+		return nil, fmt.Errorf("file %s too large (%d bytes) for /gcs/ filesystem", name, size)
+	}
+	rc, err := obj.NewReader(fs.ctx)
 	if err != nil {
 		return nil, err
 	}
 	defer rc.Close()
-	if size > maxSize {
-		return nil, fmt.Errorf("file %s too large (%d bytes) for /gcs/ filesystem", name, size)
-	}
+
 	slurp, err := ioutil.ReadAll(io.LimitReader(rc, size))
 	if err != nil {
 		return nil, err
@@ -95,30 +131,27 @@ func (fs *gcsFS) Open(name string) (wkfs.File, error) {
 func (fs *gcsFS) Stat(name string) (os.FileInfo, error) { return fs.Lstat(name) }
 
 func (fs *gcsFS) Lstat(name string) (os.FileInfo, error) {
-	bucket, key, err := fs.parseName(name)
+	bucket, fileName, err := fs.parseName(name)
 	if err != nil {
 		return nil, err
 	}
-	size, exists, err := fs.client.StatObject(&googlestorage.Object{
-		Bucket: bucket,
-		Key:    key,
-	})
-	if err != nil {
-		return nil, err
-	}
-	if !exists {
+	attrs, err := fs.sc.Bucket(bucket).Object(fileName).Attrs(fs.ctx)
+	if err == storage.ErrObjectNotExist {
 		return nil, os.ErrNotExist
 	}
+	if err != nil {
+		return nil, err
+	}
 	return &statInfo{
-		name: name,
-		size: size,
+		name: attrs.Name,
+		size: attrs.Size,
 	}, nil
 }
 
 func (fs *gcsFS) MkdirAll(path string, perm os.FileMode) error { return nil }
 
 func (fs *gcsFS) OpenFile(name string, flag int, perm os.FileMode) (wkfs.FileWriter, error) {
-	bucket, key, err := fs.parseName(name)
+	bucket, fileName, err := fs.parseName(name)
 	if err != nil {
 		return nil, err
 	}
@@ -133,50 +166,8 @@ func (fs *gcsFS) OpenFile(name string, flag int, perm os.FileMode) (wkfs.FileWriter, error) {
 		return nil, os.ErrExist
 	}
 	}
-	return &fileWriter{
-		fs:     fs,
-		name:   name,
-		bucket: bucket,
-		key:    key,
-		flag:   flag,
-		perm:   perm,
-	}, nil
-}
-
-type fileWriter struct {
-	fs                *gcsFS
-	name, bucket, key string
-	flag              int
-	perm              os.FileMode
-
-	buf bytes.Buffer
-
-	mu     sync.Mutex
-	closed bool
-}
-
-func (w *fileWriter) Write(p []byte) (n int, err error) {
-	if len(p)+w.buf.Len() > maxSize {
-		return 0, &os.PathError{
-			Op:   "Write",
-			Path: w.name,
-			Err:  errors.New("file too large"),
-		}
-	}
-	return w.buf.Write(p)
-}
-
-func (w *fileWriter) Close() (err error) {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	if w.closed {
-		return nil
-	}
-	w.closed = true
-	return w.fs.client.PutObject(&googlestorage.Object{
-		Bucket: w.bucket,
-		Key:    w.key,
-	}, ioutil.NopCloser(bytes.NewReader(w.buf.Bytes())))
+	// TODO(mpl): consider adding perm to the object's ObjectAttrs.Metadata
+	return fs.sc.Bucket(bucket).Object(fileName).NewWriter(fs.ctx), nil
 }
 
 type statInfo struct {
diff --git a/pkg/wkfs/gcs/gcs_test.go b/pkg/wkfs/gcs/gcs_test.go
new file mode 100644
index 000000000..600554f27
--- /dev/null
+++ b/pkg/wkfs/gcs/gcs_test.go
@@ -0,0 +1,84 @@
+/*
+Copyright 2015 The Camlistore Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package gcs
+
+import (
+	"bytes"
+	"flag"
+	"io"
+	"strings"
+	"testing"
+
+	"go4.org/wkfs"
+	"golang.org/x/net/context"
+	"google.golang.org/cloud/compute/metadata"
+	"google.golang.org/cloud/storage"
+)
+
+var flagBucket = flag.String("bucket", "", "Google Cloud Storage bucket where to run the tests. It should be empty.")
+
+func TestWriteRead(t *testing.T) {
+	if !metadata.OnGCE() {
+		t.Skipf("Not testing on GCE")
+	}
+	if *flagBucket == "" {
+		t.Skipf("No bucket specified")
+	}
+	ctx := context.Background()
+	cl, err := storage.NewClient(ctx)
+	list, err := cl.Bucket(*flagBucket).List(ctx, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(list.Results) > 0 {
+		t.Fatalf("Bucket %v is not empty, aborting test.", *flagBucket)
+	}
+	filename := "camli-gcs_test.txt"
+	defer func() {
+		if err := cl.Bucket(*flagBucket).Object(filename).Delete(ctx); err != nil {
+			t.Fatalf("error while cleaning up: %v", err)
+		}
+	}()
+
+	// Write to camli-gcs_test.txt
+	gcsPath := "/gcs/" + *flagBucket + "/" + filename
+	f, err := wkfs.Create(gcsPath)
+	if err != nil {
+		t.Fatalf("error creating %v: %v", gcsPath, err)
+	}
+	data := "Hello World"
+	if _, err := io.Copy(f, strings.NewReader(data)); err != nil {
+		t.Fatalf("error writing to %v: %v", gcsPath, err)
+	}
+	if err := f.Close(); err != nil {
+		t.Fatalf("error closing %v: %v", gcsPath, err)
+	}
+
+	// Read back from camli-gcs_test.txt
+	g, err := wkfs.Open(gcsPath)
+	if err != nil {
+		t.Fatalf("error opening %v: %v", gcsPath, err)
+	}
+	defer g.Close()
+	var buf bytes.Buffer
+	if _, err := io.Copy(&buf, g); err != nil {
+		t.Fatalf("error reading %v: %v", gcsPath, err)
+	}
+	if buf.String() != data {
+		t.Fatalf("error with %v contents: got %v, wanted %v", gcsPath, buf.String(), data)
+	}
+}