mirror of https://github.com/perkeep/perkeep.git
Merge "blobserver/cloudstorage: use cloud/storage Client"
commit 5545134529
@@ -19,6 +19,7 @@ package cloudstorage
 import (
 	"encoding/json"
 	"flag"
+	"io"
 	"io/ioutil"
 	"log"
 	"path"
@@ -29,12 +30,13 @@ import (
 	"camlistore.org/pkg/blobserver"
 	"camlistore.org/pkg/blobserver/storagetest"
 	"camlistore.org/pkg/constants/google"
-	"camlistore.org/pkg/googlestorage"
 	"go4.org/jsonconfig"
-	"golang.org/x/net/context"
 
 	"go4.org/oauthutil"
+	"golang.org/x/net/context"
 	"golang.org/x/oauth2"
+	"google.golang.org/cloud/compute/metadata"
+	"google.golang.org/cloud/storage"
 )
 
 var (
@@ -87,34 +89,42 @@ func testStorage(t *testing.T, bucketDir string) {
 	if *bucket == "" {
 		t.Fatal("bucket not provided in config file or as a flag.")
 	}
-	if *clientID == "" || *clientSecret == "" {
-		t.Fatal("client ID and client secret required. Obtain from https://console.developers.google.com/ > Project > APIs & Auth > Credentials. Should be a 'native' or 'Installed application'")
+	if *clientID == "" {
+		if !metadata.OnGCE() {
+			if *clientSecret == "" {
+				t.Fatal("client ID and client secret required. Obtain from https://console.developers.google.com/ > Project > APIs & Auth > Credentials. Should be a 'native' or 'Installed application'")
+			}
+		} else {
+			*clientID = "auto"
+		}
 	}
 	if *configFile == "" {
 		config := &oauth2.Config{
-			Scopes:       []string{googlestorage.Scope},
+			Scopes:       []string{storage.ScopeReadWrite},
 			Endpoint:     google.Endpoint,
 			ClientID:     *clientID,
 			ClientSecret: *clientSecret,
 			RedirectURL:  oauthutil.TitleBarRedirectURL,
 		}
-		token, err := oauth2.ReuseTokenSource(nil,
-			&oauthutil.TokenSource{
-				Config:    config,
-				CacheFile: *tokenCache,
-				AuthCode: func() string {
-					if *authCode == "" {
-						t.Skipf("Re-run using --auth_code= with the value obtained from %s",
-							config.AuthCodeURL("", oauth2.AccessTypeOffline, oauth2.ApprovalForce))
-						return ""
-					}
-					return *authCode
-				},
-			}).Token()
-		if err != nil {
-			t.Fatalf("could not acquire token: %v", err)
+		if !metadata.OnGCE() {
+			token, err := oauth2.ReuseTokenSource(nil,
+				&oauthutil.TokenSource{
+					Config:    config,
+					CacheFile: *tokenCache,
+					AuthCode: func() string {
+						if *authCode == "" {
+							t.Skipf("Re-run using --auth_code= with the value obtained from %s",
+								config.AuthCodeURL("", oauth2.AccessTypeOffline, oauth2.ApprovalForce))
+							return ""
+						}
+						return *authCode
+					},
+				}).Token()
+			if err != nil {
+				t.Fatalf("could not acquire token: %v", err)
+			}
+			refreshToken = token.RefreshToken
 		}
-		refreshToken = token.RefreshToken
 	}
 
 	bucketWithDir := path.Join(*bucket, bucketDir)
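
A condensed sketch of the non-GCE token flow the test now uses (not part of the commit; the parameter names mirror the test's flags, and it assumes the oauth2/oauthutil/storage imports added above):

	// Obtain a refresh token interactively, reusing a cached token when possible.
	func testToken(clientID, clientSecret, tokenCache, authCode string) (string, error) {
		config := &oauth2.Config{
			Scopes:       []string{storage.ScopeReadWrite}, // narrowed from the old googlestorage.Scope
			Endpoint:     google.Endpoint,
			ClientID:     clientID,
			ClientSecret: clientSecret,
			RedirectURL:  oauthutil.TitleBarRedirectURL,
		}
		token, err := oauth2.ReuseTokenSource(nil, &oauthutil.TokenSource{
			Config:    config,
			CacheFile: tokenCache,                        // cached token from an earlier run
			AuthCode:  func() string { return authCode }, // code pasted back by the user
		}).Token()
		if err != nil {
			return "", err
		}
		return token.RefreshToken, nil
	}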
@@ -136,21 +146,24 @@ func testStorage(t *testing.T, bucketDir string) {
 		log.Printf("Warning: this test does many serial operations. Without the go test -short flag, this test will be very slow.")
 	}
 	// Bail if bucket is not empty
-	objs, err := sto.(*Storage).client.EnumerateObjects(*bucket, "", 1)
+	ctx := context.Background()
+	stor := sto.(*Storage)
+	objs, err := stor.client.Bucket(stor.bucket).List(ctx, nil)
 	if err != nil {
 		t.Fatalf("Error checking if bucket is empty: %v", err)
 	}
-	if len(objs) != 0 {
+	if len(objs.Results) != 0 {
 		t.Fatalf("Refusing to run test: bucket %v is not empty", *bucket)
 	}
 	if bucketWithDir != *bucket {
 		// Adding "a", and "c" objects in the bucket to make sure objects out of the
 		// "directory" are not touched and have no influence.
 		for _, key := range []string{"a", "c"} {
-			err := sto.(*Storage).client.PutObject(
-				&googlestorage.Object{Bucket: sto.(*Storage).bucket, Key: key},
-				strings.NewReader(key))
-			if err != nil {
+			w := stor.client.Bucket(stor.bucket).Object(key).NewWriter(ctx)
+			if _, err := io.Copy(w, strings.NewReader(key)); err != nil {
 				t.Fatalf("could not insert object %s in bucket %v: %v", key, sto.(*Storage).bucket, err)
 			}
+			if err := w.Close(); err != nil {
+				t.Fatalf("could not insert object %s in bucket %v: %v", key, sto.(*Storage).bucket, err)
+			}
 		}
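
The old client's EnumerateObjects(bucket, prefix, limit) becomes Bucket(...).List(ctx, query) on the new client, which returns a *storage.ObjectList whose Results field holds one page of objects. A minimal sketch of the emptiness check above (an assumption-labeled helper, using the google.golang.org/cloud/storage package this commit pins; a non-nil *storage.Query could cap the page via its MaxResults field):

	// Report whether the bucket contains no objects.
	func bucketIsEmpty(ctx context.Context, cl *storage.Client, bucket string) (bool, error) {
		objs, err := cl.Bucket(bucket).List(ctx, nil) // nil query: defaults, first page
		if err != nil {
			return false, err
		}
		return len(objs.Results) == 0, nil
	}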
@@ -173,11 +186,14 @@ func testStorage(t *testing.T, bucketDir string) {
 	if bucketWithDir != *bucket {
 		// checking that "a" and "c" at the root were left untouched.
 		for _, key := range []string{"a", "c"} {
-			if _, _, err := sto.(*Storage).client.GetObject(&googlestorage.Object{Bucket: sto.(*Storage).bucket,
-				Key: key}); err != nil {
+			rc, err := stor.client.Bucket(stor.bucket).Object(key).NewReader(ctx)
+			if err != nil {
 				t.Fatalf("could not find object %s after tests: %v", key, err)
 			}
-			if err := sto.(*Storage).client.DeleteObject(&googlestorage.Object{Bucket: sto.(*Storage).bucket, Key: key}); err != nil {
+			if _, err := io.Copy(ioutil.Discard, rc); err != nil {
+				t.Fatalf("could not find object %s after tests: %v", key, err)
+			}
+			if err := stor.client.Bucket(stor.bucket).Object(key).Delete(ctx); err != nil {
 				t.Fatalf("could not remove object %s after tests: %v", key, err)
 			}
 
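
GetObject and DeleteObject on the old client map to NewReader and Delete on an object handle. A sketch of the read-back-then-delete step above, with the reader explicitly drained and closed (the hunk itself leaves rc unclosed):

	// Verify an object is readable, then remove it.
	func readAndDelete(ctx context.Context, cl *storage.Client, bucket, key string) error {
		rc, err := cl.Bucket(bucket).Object(key).NewReader(ctx)
		if err != nil {
			return err
		}
		if _, err := io.Copy(ioutil.Discard, rc); err != nil {
			rc.Close()
			return err
		}
		if err := rc.Close(); err != nil {
			return err
		}
		return cl.Bucket(bucket).Object(key).Delete(ctx)
	}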
@@ -17,7 +17,7 @@ limitations under the License.
 // Package cloudstorage registers the "googlecloudstorage" blob storage type, storing blobs
 // on Google Cloud Storage (not Google Drive).
 // See https://cloud.google.com/products/cloud-storage
-package cloudstorage // import "camlistore.org/pkg/blobserver/google/cloudstorage"
+package cloudstorage
 
 import (
 	"bytes"
@@ -27,6 +27,8 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	"net/http"
+	"os"
 	"path"
 	"strings"
 	"time"
@@ -35,14 +37,18 @@ import (
 	"camlistore.org/pkg/blobserver"
 	"camlistore.org/pkg/blobserver/memory"
 	"camlistore.org/pkg/constants"
-	"camlistore.org/pkg/constants/google"
-	"camlistore.org/pkg/googlestorage"
-	"go4.org/jsonconfig"
-	"golang.org/x/net/context"
 
+	"go4.org/cloud/google/gcsutil"
+	"go4.org/ctxutil"
+	"go4.org/jsonconfig"
 	"go4.org/oauthutil"
 	"go4.org/syncutil"
+	"golang.org/x/net/context"
 	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+	"google.golang.org/cloud"
+	"google.golang.org/cloud/compute/metadata"
+	"google.golang.org/cloud/storage"
 )
 
 type Storage struct {
@@ -53,9 +59,13 @@ type Storage struct {
 	// If non empty, it should be a slash separated path with a trailing slash and no starting
 	// slash.
 	dirPrefix string
-	client    *googlestorage.Client
+	client    *storage.Client
 	cache     *memory.Storage // or nil for no cache
 
+	// an OAuth-authenticated HTTP client, for methods that can't yet use a
+	// *storage.Client
+	baseHTTPClient *http.Client
+
 	// For blobserver.Generationer:
 	genTime   time.Time
 	genRandom string
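
The struct now holds two views of the same credentials: the typed *storage.Client for JSON-API calls, and a bare *http.Client for the go4.org/cloud/google/gcsutil helpers (enumeration, ranged reads) that still speak the XML API. Both can be derived from one token source, roughly as newFromConfig does below (a sketch, not the commit's code):

	// Derive both clients from a single oauth2.TokenSource.
	func newClients(ctx context.Context, ts oauth2.TokenSource) (*storage.Client, *http.Client, error) {
		cl, err := storage.NewClient(ctx, cloud.WithTokenSource(ts))
		if err != nil {
			return nil, nil, err
		}
		return cl, oauth2.NewClient(ctx, ts), nil // same credentials, raw HTTP
	}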
@@ -104,9 +114,22 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
 		bucket:    bucket,
 		dirPrefix: dirPrefix,
 	}
+
+	var (
+		ctx = context.Background()
+		ts  oauth2.TokenSource
+		cl  *storage.Client
+		err error
+	)
 	if clientID == "auto" {
-		var err error
-		gs.client, err = googlestorage.NewServiceClient()
+		if !metadata.OnGCE() {
+			return nil, errors.New(`Cannot use "auto" client_id when not running on GCE`)
+		}
+		ts, err = google.DefaultTokenSource(ctx, storage.ScopeReadWrite)
 		if err != nil {
 			return nil, err
 		}
+		cl, err = storage.NewClient(ctx)
+		if err != nil {
+			return nil, err
+		}
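
On GCE the "auto" path needs no OAuth configuration at all: metadata.OnGCE probes the metadata server, and google.DefaultTokenSource (from golang.org/x/oauth2/google) picks up the instance's service-account credentials. The shape of that path as a standalone sketch, under the same imports as the hunk above:

	// Build a client from GCE instance credentials.
	func autoClient(ctx context.Context) (*storage.Client, oauth2.TokenSource, error) {
		if !metadata.OnGCE() {
			return nil, nil, errors.New(`"auto" client_id requires running on GCE`)
		}
		ts, err := google.DefaultTokenSource(ctx, storage.ScopeReadWrite)
		if err != nil {
			return nil, nil, err
		}
		cl, err := storage.NewClient(ctx) // default credentials on GCE
		if err != nil {
			return nil, nil, err
		}
		return cl, ts, nil
	}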
@@ -117,41 +140,52 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
 		if refreshToken == "" {
 			return nil, errors.New("missing required parameter 'refresh_token'")
 		}
-		oAuthClient := oauth2.NewClient(oauth2.NoContext, oauthutil.NewRefreshTokenSource(&oauth2.Config{
-			Scopes:       []string{googlestorage.Scope},
+		ts = oauthutil.NewRefreshTokenSource(&oauth2.Config{
+			Scopes:       []string{storage.ScopeReadWrite},
 			Endpoint:     google.Endpoint,
 			ClientID:     clientID,
 			ClientSecret: clientSecret,
 			RedirectURL:  oauthutil.TitleBarRedirectURL,
-		}, refreshToken))
-		gs.client = googlestorage.NewClient(oAuthClient)
+		}, refreshToken)
+		cl, err = storage.NewClient(ctx, cloud.WithTokenSource(ts))
+		if err != nil {
+			return nil, err
+		}
 	}
 
+	gs.baseHTTPClient = oauth2.NewClient(ctx, ts)
+	gs.client = cl
+
 	if cacheSize != 0 {
 		gs.cache = memory.NewCache(cacheSize)
 	}
 
-	bi, err := gs.client.BucketInfo(bucket)
+	ba, err := gs.client.Bucket(gs.bucket).Attrs(ctx)
 	if err != nil {
-		return nil, fmt.Errorf("error statting bucket %q: %v", bucket, err)
+		return nil, fmt.Errorf("error statting bucket %q: %v", gs.bucket, err)
 	}
 	hash := sha1.New()
-	fmt.Fprintf(hash, "%v%v", bi.TimeCreated, bi.Metageneration)
+	fmt.Fprintf(hash, "%v%v", ba.Created, ba.MetaGeneration)
 	gs.genRandom = fmt.Sprintf("%x", hash.Sum(nil))
-	gs.genTime, _ = time.Parse(time.RFC3339, bi.TimeCreated)
+	gs.genTime = ba.Created
 
 	return gs, nil
 }
 
+// TODO(mpl, bradfitz): use a *storage.Client in EnumerateBlobs, instead of hitting the
+// XML API, once we have an efficient replacement for the "marker" from the XML API. See
+// https://github.com/GoogleCloudPlatform/gcloud-golang/issues/197
+
 func (s *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
 	defer close(dest)
-	objs, err := s.client.EnumerateObjects(s.bucket, s.dirPrefix+after, limit)
+	ectx := context.WithValue(ctx, ctxutil.HTTPClient, s.baseHTTPClient)
+	objs, err := gcsutil.EnumerateObjects(ectx, s.bucket, s.dirPrefix+after, limit)
 	if err != nil {
 		log.Printf("gstorage EnumerateObjects: %v", err)
 		return err
 	}
 	for _, obj := range objs {
-		dir, file := path.Split(obj.Key)
+		dir, file := path.Split(obj.Name)
 		if dir != s.dirPrefix {
 			continue
 		}
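
Note how EnumerateBlobs hands credentials to the XML-API helper: gcsutil.EnumerateObjects takes no client argument and instead looks one up in the context under the ctxutil.HTTPClient key, which is why Storage grew the baseHTTPClient field. The pattern reduces to the two lines from the hunk (restated here as a snippet, not new code):

	// gcsutil finds its authenticated *http.Client via a context value.
	ectx := context.WithValue(ctx, ctxutil.HTTPClient, s.baseHTTPClient)
	objs, err := gcsutil.EnumerateObjects(ectx, s.bucket, s.dirPrefix+after, limit)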
@@ -175,12 +209,15 @@ func (s *Storage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, err
 		return blob.SizedRef{}, err
 	}
 
-	err = s.client.PutObject(
-		&googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()},
-		ioutil.NopCloser(bytes.NewReader(buf.Bytes())))
-	if err != nil {
+	// TODO(mpl): use context from caller, once one is available (issue 733)
+	w := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).NewWriter(context.TODO())
+	if _, err := io.Copy(w, ioutil.NopCloser(bytes.NewReader(buf.Bytes()))); err != nil {
 		return blob.SizedRef{}, err
 	}
+	if err := w.Close(); err != nil {
+		return blob.SizedRef{}, err
+	}
+
 	if s.cache != nil {
 		// NoHash because it's already verified if we read it
 		// without errors on the io.Copy above.
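
With the new API an upload is only committed when Writer.Close returns nil; io.Copy can succeed while the final flush fails, which is why the hunk checks both errors. The idiom in isolation (a sketch under the same imports, not the commit's code):

	// Write data to an object; Close performs the final commit.
	func writeObject(ctx context.Context, cl *storage.Client, bucket, key string, data []byte) error {
		w := cl.Bucket(bucket).Object(key).NewWriter(ctx)
		if _, err := w.Write(data); err != nil {
			w.Close() // best effort; the write already failed
			return err
		}
		return w.Close() // upload is not durable until this returns nil
	}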
@@ -191,6 +228,8 @@ func (s *Storage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, err
 
 func (s *Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
 	// TODO: use cache
+	// TODO(mpl): use context from caller, once one is available (issue 733)
+	ctx := context.TODO()
 	var grp syncutil.Group
 	gate := syncutil.NewGate(20) // arbitrary cap
 	for i := range blobs {
@@ -198,14 +237,14 @@ func (s *Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
 		gate.Start()
 		grp.Go(func() error {
 			defer gate.Done()
-			size, exists, err := s.client.StatObject(
-				&googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()})
+			attrs, err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).Attrs(ctx)
+			if err == storage.ErrObjectNotExist {
+				return nil
+			}
 			if err != nil {
 				return err
 			}
-			if !exists {
-				return nil
-			}
+			size := attrs.Size
 			if size > constants.MaxBlobSize {
 				return fmt.Errorf("blob %s stat size too large (%d)", br, size)
 			}
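
The old StatObject returned a (size, exists, err) triple; the new API folds existence into the storage.ErrObjectNotExist sentinel returned by Attrs. Equivalent stat logic as a hedged sketch:

	// Object size, with "missing" reported separately from real errors.
	func objectSize(ctx context.Context, cl *storage.Client, bucket, key string) (size int64, exists bool, err error) {
		attrs, err := cl.Bucket(bucket).Object(key).Attrs(ctx)
		if err == storage.ErrObjectNotExist {
			return 0, false, nil // absent, but not an error for stat
		}
		if err != nil {
			return 0, false, err
		}
		return attrs.Size, true, nil
	}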
@@ -222,21 +261,45 @@ func (s *Storage) Fetch(br blob.Ref) (rc io.ReadCloser, size uint32, err error)
 			return
 		}
 	}
-	rc, sz, err := s.client.GetObject(&googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()})
-	if err != nil && sz > constants.MaxBlobSize {
-		err = errors.New("object too big")
+	// TODO(mpl): use context from caller, once one is available (issue 733)
+	r, err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).NewReader(context.TODO())
+	if r.Size() >= 1<<32 {
+		return nil, 0, errors.New("object larger than a uint32")
 	}
-	return rc, uint32(sz), err
+	size = uint32(r.Size())
+	if err != nil {
+		if err == storage.ErrObjectNotExist {
+			return nil, size, os.ErrNotExist
+		}
+		if size > constants.MaxBlobSize {
+			return nil, size, errors.New("object too big")
+		}
+	}
+	return r, size, err
 }
 
 func (s *Storage) SubFetch(br blob.Ref, offset, length int64) (rc io.ReadCloser, err error) {
-	return s.client.GetPartialObject(googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}, offset, length)
+	if offset < 0 || length < 0 {
+		return nil, blob.ErrNegativeSubFetch
+	}
+	// TODO(mpl): use context from caller, once one is available (issue 733)
+	ctx := context.WithValue(context.TODO(), ctxutil.HTTPClient, s.baseHTTPClient)
+	rc, err = gcsutil.GetPartialObject(ctx, gcsutil.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}, offset, length)
+	if err == gcsutil.ErrInvalidRange {
+		return nil, blob.ErrOutOfRangeOffsetSubFetch
+	}
+	if err == storage.ErrObjectNotExist {
+		return nil, os.ErrNotExist
+	}
+	return rc, err
 }
 
 func (s *Storage) RemoveBlobs(blobs []blob.Ref) error {
 	if s.cache != nil {
 		s.cache.RemoveBlobs(blobs)
 	}
+	// TODO(mpl): use context from caller, once one is available (issue 733)
+	ctx := context.TODO()
 	gate := syncutil.NewGate(50) // arbitrary
 	var grp syncutil.Group
 	for i := range blobs {
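
All three methods above normalize storage.ErrObjectNotExist: Fetch and SubFetch translate it to os.ErrNotExist for callers, and RemoveBlobs (next hunk) swallows it so deletes stay idempotent. The translation in isolation, as a sketch:

	// Open a blob, mapping the storage sentinel to the os error callers expect.
	func openBlob(ctx context.Context, cl *storage.Client, bucket, key string) (io.ReadCloser, error) {
		r, err := cl.Bucket(bucket).Object(key).NewReader(ctx)
		if err == storage.ErrObjectNotExist {
			return nil, os.ErrNotExist // callers test with os.IsNotExist
		}
		return r, err
	}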
@@ -244,7 +307,11 @@ func (s *Storage) RemoveBlobs(blobs []blob.Ref) error {
 		br := blobs[i]
 		grp.Go(func() error {
 			defer gate.Done()
-			return s.client.DeleteObject(&googlestorage.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()})
+			err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).Delete(ctx)
+			if err == storage.ErrObjectNotExist {
+				return nil
+			}
+			return err
 		})
 	}
 	return grp.Err()
 
@@ -172,15 +172,23 @@ func TestOpt(t *testing.T, opt Opts) {
 
 func (r *run) testRemove(blobRefs []blob.Ref) {
 	t, sto := r.t, r.sto
+	implemented := true
 	t.Logf("Testing Remove")
 	if err := sto.RemoveBlobs(blobRefs); err != nil {
-		t.Fatalf("RemoveBlobs: %v", err)
+		if strings.Contains(err.Error(), "not implemented") {
+			implemented = false
+			t.Logf("RemoveBlobs: %v", err)
+		} else {
+			t.Fatalf("RemoveBlobs: %v", err)
+		}
 	}
 	r.testEnumerate(nil) // verify they're all gone
-	if len(blobRefs) > 0 {
+	if len(blobRefs) > 0 && implemented {
 		t.Logf("Testing double-delete")
 		if err := sto.RemoveBlobs([]blob.Ref{blobRefs[0]}); err != nil {
 			t.Fatalf("Double RemoveBlobs: %v", err)
 		}
 	}
 }
 
 func (r *run) testSubFetcher() {
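
This storagetest change lets blobservers without delete support pass the suite: a RemoveBlobs error containing "not implemented" downgrades to a log line and disables the double-delete check. A hypothetical conforming implementation (readOnlyStorage is an illustrative type, not in the commit):

	type readOnlyStorage struct{}

	// A read-only storage satisfies the suite by saying so; the message
	// deliberately contains "not implemented" so strings.Contains matches.
	func (readOnlyStorage) RemoveBlobs(blobs []blob.Ref) error {
		return errors.New("RemoveBlobs: not implemented")
	}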