pkg/wkfs/gcs: use google.golang.org/cloud/storage

Use the new Client provided by cloud/storage, instead of our own
pkg/googlestorage

Change-Id: Ic7ebf7059894c72a36a6e935a15bcda1d9dd7598
This commit is contained in:
mpl 2015-12-03 16:04:54 +01:00
parent d9a3bbc73a
commit 2715ebd067
2 changed files with 152 additions and 77 deletions

View File

@ -17,45 +17,75 @@ limitations under the License.
// Package gcs registers a Google Cloud Storage filesystem at the
// well-known /gcs/ filesystem path if the current machine is running
// on Google Compute Engine.
//
// It was initially only meant for small files, and as such, it can only
// read files smaller than 1MB for now.
package gcs
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"time"
"camlistore.org/pkg/googlestorage"
"go4.org/wkfs"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/cloud"
"google.golang.org/cloud/compute/metadata"
"google.golang.org/cloud/storage"
)
// Max size for all files read or written. This filesystem is only
// supposed to be for configuration data only, so this is very
// generous.
// Max size for all files read, because we use a bytes.Reader as our file
// reader, instead of storage.NewReader. This is because we get all wkfs.File
// methods for free by embedding a bytes.Reader. This filesystem was only supposed
// to be for configuration data only, so this is ok for now.
const maxSize = 1 << 20
func init() {
if !metadata.OnGCE() {
return
}
client, err := googlestorage.NewServiceClient()
wkfs.RegisterFS("/gcs/", &gcsFS{client, err})
hc, err := google.DefaultClient(oauth2.NoContext)
if err != nil {
registerBrokenFS(fmt.Errorf("could not get http client for context: %v", err))
return
}
projID, err := metadata.ProjectID()
if projID == "" || err != nil {
registerBrokenFS(fmt.Errorf("could not get GCE project ID: %v", err))
return
}
ctx := cloud.NewContext(projID, hc)
sc, err := storage.NewClient(ctx)
if err != nil {
registerBrokenFS(fmt.Errorf("could not get cloud storage client: %v", err))
return
}
wkfs.RegisterFS("/gcs/", &gcsFS{
ctx: ctx,
sc: sc,
})
}
type gcsFS struct {
client *googlestorage.Client
err error // sticky error
ctx context.Context
sc *storage.Client
err error // sticky error
}
func (fs *gcsFS) parseName(name string) (bucket, key string, err error) {
func registerBrokenFS(err error) {
wkfs.RegisterFS("/gcs/", &gcsFS{
err: err,
})
}
func (fs *gcsFS) parseName(name string) (bucket, fileName string, err error) {
if fs.err != nil {
return "", "", fs.err
}
@ -67,22 +97,28 @@ func (fs *gcsFS) parseName(name string) (bucket, key string, err error) {
return name[:i], name[i+1:], nil
}
// Open opens the named file for reading. It returns an error if the file size
// is larger than 1 << 20.
func (fs *gcsFS) Open(name string) (wkfs.File, error) {
bucket, key, err := fs.parseName(name)
bucket, fileName, err := fs.parseName(name)
if err != nil {
return nil, fs.err
return nil, err
}
rc, size, err := fs.client.GetObject(&googlestorage.Object{
Bucket: bucket,
Key: key,
})
obj := fs.sc.Bucket(bucket).Object(fileName)
attrs, err := obj.Attrs(fs.ctx)
if err != nil {
return nil, err
}
size := attrs.Size
if size > maxSize {
return nil, fmt.Errorf("file %s too large (%d bytes) for /gcs/ filesystem", name, size)
}
rc, err := obj.NewReader(fs.ctx)
if err != nil {
return nil, err
}
defer rc.Close()
if size > maxSize {
return nil, fmt.Errorf("file %s too large (%d bytes) for /gcs/ filesystem", name, size)
}
slurp, err := ioutil.ReadAll(io.LimitReader(rc, size))
if err != nil {
return nil, err
@ -95,30 +131,27 @@ func (fs *gcsFS) Open(name string) (wkfs.File, error) {
func (fs *gcsFS) Stat(name string) (os.FileInfo, error) { return fs.Lstat(name) }
func (fs *gcsFS) Lstat(name string) (os.FileInfo, error) {
bucket, key, err := fs.parseName(name)
bucket, fileName, err := fs.parseName(name)
if err != nil {
return nil, err
}
size, exists, err := fs.client.StatObject(&googlestorage.Object{
Bucket: bucket,
Key: key,
})
if err != nil {
return nil, err
}
if !exists {
attrs, err := fs.sc.Bucket(bucket).Object(fileName).Attrs(fs.ctx)
if err == storage.ErrObjectNotExist {
return nil, os.ErrNotExist
}
if err != nil {
return nil, err
}
return &statInfo{
name: name,
size: size,
name: attrs.Name,
size: attrs.Size,
}, nil
}
func (fs *gcsFS) MkdirAll(path string, perm os.FileMode) error { return nil }
func (fs *gcsFS) OpenFile(name string, flag int, perm os.FileMode) (wkfs.FileWriter, error) {
bucket, key, err := fs.parseName(name)
bucket, fileName, err := fs.parseName(name)
if err != nil {
return nil, err
}
@ -133,50 +166,8 @@ func (fs *gcsFS) OpenFile(name string, flag int, perm os.FileMode) (wkfs.FileWri
return nil, os.ErrExist
}
}
return &fileWriter{
fs: fs,
name: name,
bucket: bucket,
key: key,
flag: flag,
perm: perm,
}, nil
}
type fileWriter struct {
fs *gcsFS
name, bucket, key string
flag int
perm os.FileMode
buf bytes.Buffer
mu sync.Mutex
closed bool
}
func (w *fileWriter) Write(p []byte) (n int, err error) {
if len(p)+w.buf.Len() > maxSize {
return 0, &os.PathError{
Op: "Write",
Path: w.name,
Err: errors.New("file too large"),
}
}
return w.buf.Write(p)
}
func (w *fileWriter) Close() (err error) {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return nil
}
w.closed = true
return w.fs.client.PutObject(&googlestorage.Object{
Bucket: w.bucket,
Key: w.key,
}, ioutil.NopCloser(bytes.NewReader(w.buf.Bytes())))
// TODO(mpl): consider adding perm to the object's ObjectAttrs.Metadata
return fs.sc.Bucket(bucket).Object(fileName).NewWriter(fs.ctx), nil
}
type statInfo struct {

84
pkg/wkfs/gcs/gcs_test.go Normal file
View File

@ -0,0 +1,84 @@
/*
Copyright 2015 The Camlistore Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcs
import (
"bytes"
"flag"
"io"
"strings"
"testing"
"go4.org/wkfs"
"golang.org/x/net/context"
"google.golang.org/cloud/compute/metadata"
"google.golang.org/cloud/storage"
)
var flagBucket = flag.String("bucket", "", "Google Cloud Storage bucket where to run the tests. It should be empty.")
func TestWriteRead(t *testing.T) {
if !metadata.OnGCE() {
t.Skipf("Not testing on GCE")
}
if *flagBucket == "" {
t.Skipf("No bucket specified")
}
ctx := context.Background()
cl, err := storage.NewClient(ctx)
list, err := cl.Bucket(*flagBucket).List(ctx, nil)
if err != nil {
t.Fatal(err)
}
if len(list.Results) > 0 {
t.Fatalf("Bucket %v is not empty, aborting test.", *flagBucket)
}
filename := "camli-gcs_test.txt"
defer func() {
if err := cl.Bucket(*flagBucket).Object(filename).Delete(ctx); err != nil {
t.Fatalf("error while cleaning up: %v", err)
}
}()
// Write to camli-gcs_test.txt
gcsPath := "/gcs/" + *flagBucket + "/" + filename
f, err := wkfs.Create(gcsPath)
if err != nil {
t.Fatalf("error creating %v: %v", gcsPath, err)
}
data := "Hello World"
if _, err := io.Copy(f, strings.NewReader(data)); err != nil {
t.Fatalf("error writing to %v: %v", gcsPath, err)
}
if err := f.Close(); err != nil {
t.Fatalf("error closing %v: %v", gcsPath, err)
}
// Read back from camli-gcs_test.txt
g, err := wkfs.Open(gcsPath)
if err != nil {
t.Fatalf("error opening %v: %v", gcsPath, err)
}
defer g.Close()
var buf bytes.Buffer
if _, err := io.Copy(&buf, g); err != nil {
t.Fatalf("error reading %v: %v", gcsPath, err)
}
if buf.String() != data {
t.Fatalf("error with %v contents: got %v, wanted %v", gcsPath, buf.String(), data)
}
}