issue 23: Fix google storage blobserver as sync dest

Change-Id: I33d88dc5028ebb76d4c17d2a00d8f8346114f876
This commit is contained in:
Michael Zuffoletti 2013-07-06 15:29:17 -05:00
parent 574b5feb96
commit c2669a6dfc
15 changed files with 353 additions and 93 deletions

View File

@ -24,7 +24,7 @@ import (
"os"
"strings"
"camlistore.org/pkg/blobserver/google"
"camlistore.org/pkg/googlestorage"
"camlistore.org/pkg/cmdmain"
"camlistore.org/third_party/code.google.com/p/goauth2/oauth"
)
@ -55,7 +55,7 @@ func (c *gsinitCmd) RunCommand(args []string) error {
if clientId, clientSecret, err = getClientInfo(); err != nil {
return err
}
transport := google.MakeOauthTransport(clientId, clientSecret, "")
transport := googlestorage.MakeOauthTransport(clientId, clientSecret, "")
var accessCode string
if accessCode, err = getAccessCode(transport.Config); err != nil {
@ -90,7 +90,7 @@ func prompt(promptText string) (string, error) {
// Provide the authorization link, then prompt for the resulting access code
func getAccessCode(config *oauth.Config) (string, error) {
fmt.Printf("In order to obtain a storage access code, you will need to navigate to the following URL:\n\n")
fmt.Printf("https://accounts.google.com/o/oauth2/auth?client_id=%s&redirect_uri=urn:ietf:wg:oauth:2.0:oob&scope=%s&response_type=code\n\n",
config.ClientId, config.Scope)
return prompt("Please enter the access code provided by that page:")

View File

@ -19,8 +19,10 @@ limitations under the License.
package google
import (
"errors"
"bytes"
"io"
"io/ioutil"
"log"
"time"
"camlistore.org/pkg/blobref"
@ -35,16 +37,20 @@ type Storage struct {
client *googlestorage.Client
}
var _ blobserver.MaxEnumerateConfig = (*Storage)(nil)
func (gs *Storage) MaxEnumerate() int { return 1000 }
func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
auth := config.RequiredObject("auth")
gs := &Storage{
&blobserver.SimpleBlobHub{},
config.RequiredString("bucket"),
googlestorage.NewClient(MakeOauthTransport(
googlestorage.NewClient(googlestorage.MakeOauthTransport(
auth.RequiredString("client_id"),
auth.RequiredString("client_secret"),
auth.RequiredString("refresh_token"),
)),
auth.RequiredString("refresh_token"))),
}
if err := config.Validate(); err != nil {
return nil, err
@ -56,30 +62,83 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
}
// EnumerateBlobs sends, on dest, a SizedBlobRef for each object in
// gs.bucket, starting after the blobref named by after and sending at
// most limit entries. dest is always closed before returning, even on
// error. Objects whose keys don't parse as blobrefs are skipped.
func (gs *Storage) EnumerateBlobs(dest chan<- blobref.SizedBlobRef, after string, limit int, wait time.Duration) error {
	defer close(dest)
	objs, err := gs.client.EnumerateObjects(gs.bucket, after, limit)
	if err != nil {
		log.Printf("gstorage EnumerateObjects: %v", err)
		return err
	}
	for _, obj := range objs {
		br := blobref.Parse(obj.Key)
		if br == nil {
			// Not a blobref-named object (e.g. a "test-" file); ignore it.
			continue
		}
		dest <- blobref.SizedBlobRef{BlobRef: br, Size: obj.Size}
	}
	return nil
}
// ReceiveBlob buffers the blob from source, verifies that its content
// matches blob's digest, then uploads it to gs.bucket keyed by the
// blobref string. The upload is attempted a second time when the first
// attempt signals that refreshed OAuth credentials may succeed.
func (gs *Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (blobref.SizedBlobRef, error) {
	buf := &bytes.Buffer{}
	hash := blob.Hash()
	// Tee into the hash while buffering, so the digest can be checked
	// before anything is sent to Google Storage.
	size, err := io.Copy(io.MultiWriter(hash, buf), source)
	if err != nil {
		return blobref.SizedBlobRef{}, err
	}
	if !blob.HashMatches(hash) {
		return blobref.SizedBlobRef{}, blobserver.ErrCorruptBlob
	}
	for tries, shouldRetry := 0, true; tries < 2 && shouldRetry; tries++ {
		// A fresh reader over the buffered bytes each attempt, since the
		// body is consumed by PutObject.
		shouldRetry, err = gs.client.PutObject(
			&googlestorage.Object{Bucket: gs.bucket, Key: blob.String()},
			ioutil.NopCloser(bytes.NewReader(buf.Bytes())))
	}
	if err != nil {
		return blobref.SizedBlobRef{}, err
	}
	return blobref.SizedBlobRef{BlobRef: blob, Size: size}, nil
}
// StatBlobs stats each blob in blobs against Google Storage, sending a
// SizedBlobRef on dest for each successful stat. Stat errors do not
// abort the loop; the last error encountered is returned. dest is not
// closed here (the caller owns it).
// NOTE(review): the StatObject exists flag is discarded, so a missing
// object may be reported with size 0 — confirm intended.
func (gs *Storage) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs []*blobref.BlobRef, wait time.Duration) error {
	var reterr error
	// TODO: do a batch API call, or at least keep N of these in flight at a time. No need to do them all serially.
	for _, br := range blobs {
		size, _, err := gs.client.StatObject(
			&googlestorage.Object{Bucket: gs.bucket, Key: br.String()})
		if err == nil {
			dest <- blobref.SizedBlobRef{BlobRef: br, Size: size}
		} else {
			reterr = err
		}
	}
	return reterr
}
func (gs *Storage) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, error) {
// TODO: Implement stub
return nil, 0, errors.New("TODO: implement FetchStreaming")
func (gs *Storage) FetchStreaming(blob *blobref.BlobRef) (file io.ReadCloser, size int64, err error) {
file, size, err = gs.client.GetObject(&googlestorage.Object{Bucket: gs.bucket, Key: blob.String()})
return
}
// RemoveBlobs deletes each named blob from gs.bucket. Deletion errors
// do not abort the loop; the last error encountered is returned.
func (gs *Storage) RemoveBlobs(blobs []*blobref.BlobRef) error {
	var reterr error
	// TODO: do a batch API call, or at least keep N of these in flight at a time. No need to do them all serially.
	for _, br := range blobs {
		err := gs.client.DeleteObject(&googlestorage.Object{Bucket: gs.bucket, Key: br.String()})
		if err != nil {
			reterr = err
		}
	}
	return reterr
}
// GetBlobHub returns the storage's blob hub (set at construction as a
// SimpleBlobHub), satisfying the blobserver notification interface.
func (gs *Storage) GetBlobHub() blobserver.BlobHub {
	return gs.hub
}
// init registers this package's constructor under the "google" storage
// type, making it available to server config loading.
func init() {
	blobserver.RegisterStorageConstructor("google", blobserver.StorageConstructor(newFromConfig))
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package s3
import (
"log"
"time"
"camlistore.org/pkg/blobref"
@ -27,7 +26,6 @@ func (sto *s3Storage) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs []*blobr
// TODO: do n stats in parallel
for _, br := range blobs {
size, err := sto.s3Client.Stat(br.String(), sto.bucket)
log.Printf("stat of %s: %d, %v", br.String(), size, err)
if err == nil {
dest <- blobref.SizedBlobRef{BlobRef: br, Size: size}
} else {

View File

@ -2,9 +2,18 @@ Implements the Storage interface for Google Storage.
A GoogleStorage instance stores blobs in a single Google Storage bucket, with
each blob keyed by its blobref.
Testing:
Server configuration
=====================
gsapi_test.go contains integration tests which run against Google Storage.
High-level server config is formatted like:
"google": "clientId:clientSecret:refreshToken:bucketName"
Testing
========
googlestorage_test.go contains integration tests which run against Google Storage.
In order to run these tests properly, you will need to:
1. Set up google storage. See:
@ -14,17 +23,19 @@ In order to run these tests properly, you will need to:
that all these files begin with 'test-': such files will be ignored when
the bucket is used as blobserver storage.
3. Create the config file '~/.camli/gstestconfig.json'. The file should look
something like this:
3. Create the config file '~/.config/camlistore/gstestconfig.json'. The
file should look something like this:
{
"gsconf": {
"auth": {
"client_id": "your client id",
"client_secret": "your client secret",
"refresh_token": "a refresh token"
},
"bucket": "bucket.example.com"
"bucket": "bucketName"
}
}
You can use camgsinit to help obtain the auth config object.
You can use 'camtool gsinit' to help obtain the auth config object.

View File

@ -1,5 +1,5 @@
/*
Copyright 2011 Google Inc.
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package google
package googlestorage
import (
"time"

View File

@ -22,7 +22,6 @@ import (
"encoding/xml"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strconv"
@ -32,7 +31,7 @@ import (
)
const (
gsAccessURL = "https://commondatastorage.googleapis.com"
gsAccessURL = "https://storage.googleapis.com"
)
type Client struct {
@ -63,11 +62,11 @@ func (sgso SizedObject) String() string {
}
// A close relative to http.Client.Do(), helping with token refresh logic.
// If canResend is true and the initial request's response is an auth error
// (401 or 403), oauth credentials will be refreshed and the request sent
// If canResend is true and the initial request's response is an auth error
// (401 or 403), oauth credentials will be refreshed and the request sent
// again. This should only be done for requests with empty bodies, since the
// Body will be consumed on the first attempt if it exists.
// If canResend is false, and req would have been resent if canResend were
// If canResend is false, and req would have been resent if canResend were
// true, then shouldRetry will be true.
// One of resp or err will always be nil.
func (gsa *Client) doRequest(req *http.Request, canResend bool) (resp *http.Response, err error, shouldRetry bool) {
@ -109,7 +108,6 @@ func (gsa *Client) simpleRequest(method, url_ string) (resp *http.Response, err
// Bucket and Key fields are trusted to be valid.
// Returns (object reader, object size, err). Reader must be closed.
func (gsa *Client) GetObject(obj *Object) (io.ReadCloser, int64, error) {
log.Printf("Fetching object from Google Storage: %s/%s\n", obj.Bucket, obj.Key)
resp, err := gsa.simpleRequest("GET", gsAccessURL+"/"+obj.Bucket+"/"+obj.Key)
if err != nil {
@ -127,8 +125,6 @@ func (gsa *Client) GetObject(obj *Object) (io.ReadCloser, int64, error) {
// Bucket and Key fields are trusted to be valid.
// err signals io / authz errors, a nonexistent file is not an error.
func (gsa *Client) StatObject(obj *Object) (size int64, exists bool, err error) {
log.Printf("Statting object in Google Storage: %s/%s\n", obj.Bucket, obj.Key)
resp, err := gsa.simpleRequest("HEAD", gsAccessURL+"/"+obj.Bucket+"/"+obj.Key)
if err != nil {
return
@ -154,8 +150,6 @@ func (gsa *Client) StatObject(obj *Object) (size int64, exists bool, err error)
// credentials have been refreshed and another attempt is likely to succeed.
// In this case, content will have been consumed.
func (gsa *Client) PutObject(obj *Object, content io.ReadCloser) (shouldRetry bool, err error) {
log.Printf("Putting object in Google Storage: %s/%s\n", obj.Bucket, obj.Key)
objURL := gsAccessURL + "/" + obj.Bucket + "/" + obj.Key
var req *http.Request
if req, err = http.NewRequest("PUT", objURL, content); err != nil {
@ -178,8 +172,6 @@ func (gsa *Client) PutObject(obj *Object, content io.ReadCloser) (shouldRetry bo
// Removes a GS object.
// Bucket and Key values are trusted to be valid.
func (gsa *Client) DeleteObject(obj *Object) (err error) {
log.Printf("Deleting %v/%v\n", obj.Bucket, obj.Key)
// bucketURL := gsAccessURL + "/" + obj.Bucket + "/" + obj.Key
resp, err := gsa.simpleRequest("DELETE", gsAccessURL+"/"+obj.Bucket+"/"+obj.Key)
if err != nil {
@ -199,9 +191,7 @@ type gsListResult struct {
// List the objects in a GS bucket.
// If after is nonempty, listing will begin with lexically greater object names
// If limit is nonzero, the length of the list will be limited to that number.
func (gsa *Client) EnumerateObjects(bucket, after string, limit uint) ([]SizedObject, error) {
log.Printf("Fetching from %v: after '%v', limit %v\n", bucket, after, limit)
func (gsa *Client) EnumerateObjects(bucket, after string, limit int) ([]SizedObject, error) {
// Build url, with query params
params := make([]string, 0, 2)
if after != "" {

View File

@ -23,12 +23,13 @@ import (
"bytes"
"fmt"
"os"
"path/filepath"
"testing"
"time"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/osutil"
"camlistore.org/pkg/test"
"camlistore.org/third_party/code.google.com/p/goauth2/oauth"
)
const testObjectContent = "Google Storage Test\n"
@ -44,11 +45,13 @@ func (b *BufferCloser) Close() error {
// Reads google storage config and creates a Client. Exits on error.
func doConfig(t *testing.T) (gsa *Client, bucket string) {
if _, err := os.Stat("gstestconfig.json"); os.IsNotExist(err) {
gsConfigPath := filepath.Join(osutil.CamliConfigDir(), "gstestconfig.json")
if _, err := os.Stat(gsConfigPath); os.IsNotExist(err) {
test.DependencyErrorOrSkip(t)
t.Fatalf("Missing config file: %v", err)
}
cf, err := jsonconfig.ReadFile("testconfig.json")
cf, err := jsonconfig.ReadFile(gsConfigPath)
if err != nil {
t.Fatalf("Failed to read config: %v", err)
}
@ -65,21 +68,9 @@ func doConfig(t *testing.T) (gsa *Client, bucket string) {
t.Fatalf("Invalid config: %v", err)
}
gsa = NewClient(&oauth.Transport{
&oauth.Config{
ClientId: auth.RequiredString("client_id"),
ClientSecret: auth.RequiredString("client_secret"),
Scope: "https://www.googleapis.com/auth/devstorage.read_write",
AuthURL: "https://accounts.google.com/o/oauth2/auth",
TokenURL: "https://accounts.google.com/o/oauth2/token",
RedirectURL: "urn:ietf:wg:oauth:2.0:oob",
},
&oauth.Token{
AccessToken: "",
RefreshToken: auth.RequiredString("refresh_token"),
},
nil,
})
gsa = NewClient(MakeOauthTransport(auth.RequiredString("client_id"),
auth.RequiredString("client_secret"),
auth.RequiredString("refresh_token")))
if err := auth.Validate(); err != nil {
t.Fatalf("Invalid config: %v", err)
@ -111,26 +102,28 @@ func TestStatObject(t *testing.T) {
// Stat a nonexistent file
size, exists, err := gs.StatObject(&Object{bucket, "test-shouldntexist"})
if err != nil {
t.Errorf("Stat failed: %v\n", err)
}
if exists {
t.Errorf("Test object exists!")
}
if size != 0 {
t.Errorf("Expected size to be 0, found %v\n", size)
t.Fatalf("Stat failed: %v\n", err)
} else {
if exists {
t.Errorf("Test object exists!")
}
if size != 0 {
t.Errorf("Expected size to be 0, found %v\n", size)
}
}
// Try statting an object which does exist
size, exists, err = gs.StatObject(&Object{bucket, "test-stat"})
if err != nil {
t.Errorf("Stat failed: %v\n", err)
}
if !exists {
t.Errorf("Test object doesn't exist!")
}
if size != int64(len(testObjectContent)) {
t.Errorf("Test object size is wrong: \nexpected: %v\nfound: %v\n",
len(testObjectContent), size)
t.Fatalf("Stat failed: %v\n", err)
} else {
if !exists {
t.Errorf("Test object doesn't exist!")
}
if size != int64(len(testObjectContent)) {
t.Errorf("Test object size is wrong: \nexpected: %v\nfound: %v\n",
len(testObjectContent), size)
}
}
}

View File

@ -1 +0,0 @@
{"gsconf": ["_fileobj", "gstestconfig.json"]}

View File

@ -248,6 +248,61 @@ func addS3Config(prefixes jsonconfig.Obj, s3 string) error {
return nil
}
// addGoogleConfig translates the high-level "google" config string
// ("client_id:client_secret:refresh_token:bucket") into low-level
// prefix handlers in prefixes. If no handler has claimed "/bs/" yet,
// Google Storage becomes the primary blob store (plus a local /cache/);
// otherwise it is mounted at /sto-google/ with a /sync-to-google/ sync
// handler mirroring /bs/ into it. Returns an error only on a malformed
// config string.
func addGoogleConfig(prefixes jsonconfig.Obj, highCfg string) error {
	// SplitN with 4 so a bucket name containing ':' would not be split
	// further — exactly 4 colon-separated fields are required.
	f := strings.SplitN(highCfg, ":", 4)
	if len(f) != 4 {
		return errors.New(`genconfig: expected "google" field to be of form "client_id:client_secret:refresh_token:bucket"`)
	}
	clientId, secret, refreshToken, bucket := f[0], f[1], f[2], f[3]
	// Primary iff nothing else has registered the "/bs/" prefix.
	isPrimary := false
	if _, ok := prefixes["/bs/"]; !ok {
		isPrimary = true
	}
	gsPrefix := ""
	if isPrimary {
		gsPrefix = "/bs/"
	} else {
		gsPrefix = "/sto-google/"
	}
	prefixes[gsPrefix] = map[string]interface{}{
		"handler": "storage-google",
		"handlerArgs": map[string]interface{}{
			"bucket": bucket,
			"auth": map[string]interface{}{
				"client_id":     clientId,
				"client_secret": secret,
				"refresh_token": refreshToken,
				// If high-level config is for the common user then fullSyncOnStart = true
				// Then the default just works.
				//"fullSyncOnStart": true,
				//"blockingFullSyncOnStart": false
			},
		},
	}
	if isPrimary {
		// TODO: cacheBucket like s3CacheBucket?
		prefixes["/cache/"] = map[string]interface{}{
			"handler": "storage-filesystem",
			"handlerArgs": map[string]interface{}{
				"path": filepath.Join(tempDir(), "camli-cache"),
			},
		}
	} else {
		prefixes["/sync-to-google/"] = map[string]interface{}{
			"handler": "sync",
			"handlerArgs": map[string]interface{}{
				"from": "/bs/",
				"to":   gsPrefix,
			},
		}
	}
	return nil
}
func genLowLevelPrefixes(params *configPrefixesParams, ownerName string) (m jsonconfig.Obj) {
m = make(jsonconfig.Obj)
@ -372,6 +427,7 @@ func genLowLevelConfig(conf *Config) (lowLevelConf *Config, err error) {
// Blob storage options
blobPath = conf.OptionalString("blobPath", "")
s3 = conf.OptionalString("s3", "") // "access_key_id:secret_access_key:bucket"
gstorage = conf.OptionalString("google", "") // "clientId:clientSecret:refreshToken:bucket"
shareHandler = conf.OptionalBool("shareHandler", true) // enable the share handler
// Index options
@ -463,8 +519,8 @@ func genLowLevelConfig(conf *Config) (lowLevelConf *Config, err error) {
}
nolocaldisk := blobPath == ""
if nolocaldisk && s3 == "" {
return nil, errors.New("You need at least one of blobPath (for localdisk) or s3 configured for a blobserver.")
if nolocaldisk && s3 == "" && gstorage == "" {
return nil, errors.New("You need at least one of blobPath (for localdisk) or s3 or google configured for a blobserver.")
}
prefixesParams := &configPrefixesParams{
@ -523,6 +579,11 @@ func genLowLevelConfig(conf *Config) (lowLevelConf *Config, err error) {
return nil, err
}
}
if gstorage != "" {
if err := addGoogleConfig(prefixes, gstorage); err != nil {
return nil, err
}
}
if indexerPath == "/index-mem/" {
addMemindexConfig(prefixes)
}

View File

@ -22,30 +22,30 @@
"scaledImage": "lrucache"
}
},
"/setup/": {
"/setup/": {
"handler": "setup"
},
"/status/": {
"/status/": {
"handler": "status"
},
"/share/": {
"/share/": {
"handler": "share",
"handlerArgs": {
"blobRoot": "/bs/"
}
},
"/sync/": {
"/sync/": {
"handler": "sync",
"handlerArgs": {
"from": "/bs/",
"to": "/index-mem/"
}
},
"/sighelper/": {
"handler": "jsonsign",
"handlerArgs": {
@ -54,14 +54,14 @@
"publicKeyDest": "/bs-and-index/"
}
},
"/bs-and-index/": {
"handler": "storage-replica",
"handlerArgs": {
"backends": ["/bs/", "/index-mem/"]
}
},
"/bs-and-maybe-also-index/": {
"handler": "storage-cond",
"handlerArgs": {
@ -73,28 +73,28 @@
"read": "/bs/"
}
},
"/bs/": {
"handler": "storage-filesystem",
"handlerArgs": {
"path": "/tmp/blobs"
}
},
"/cache/": {
"handler": "storage-filesystem",
"handlerArgs": {
"path": "/tmp/blobs/cache"
}
},
"/index-mem/": {
"handler": "storage-memory-only-dev-indexer",
"handlerArgs": {
"blobSource": "/bs/"
}
},
"/my-search/": {
"handler": "search",
"handlerArgs": {
@ -112,13 +112,34 @@
}
},
"/sync-to-s3/": {
"/sync-to-s3/": {
"handler": "sync",
"handlerArgs": {
"from": "/bs/",
"to": "/sto-s3/"
}
},
"/sto-google/": {
"handler": "storage-google",
"handlerArgs": {
"auth": {
"client_id": "clientId",
"client_secret": "clientSecret",
"refresh_token": "refreshToken"
},
"bucket": "bucketName"
}
},
"/sync-to-google/": {
"handler": "sync",
"handlerArgs": {
"from": "/bs/",
"to": "/sto-google/"
}
}
}
}

View File

@ -7,8 +7,9 @@
"identitySecretRing": "/path/to/secring",
"memIndex": true,
"s3": "key:secret:bucket",
"google": "clientId:clientSecret:refreshToken:bucketName",
"replicateTo": [],
"publish": {},
"ownerName": "Brad",
"ownerName": "Brad",
"shareHandler": true
}

View File

@ -0,0 +1,113 @@
{
"listen": "localhost:3179",
"auth": "userpass:camlistore:pass3179",
"https": false,
"prefixes": {
"/": {
"handler": "root",
"handlerArgs": {
"blobRoot": "/bs-and-maybe-also-index/",
"searchRoot": "/my-search/",
"statusRoot": "/status/",
"stealth": false
}
},
"/ui/": {
"handler": "ui",
"handlerArgs": {
"jsonSignRoot": "/sighelper/",
"cache": "/cache/",
"scaledImage": "lrucache"
}
},
"/setup/": {
"handler": "setup"
},
"/status/": {
"handler": "status"
},
"/share/": {
"handler": "share",
"handlerArgs": {
"blobRoot": "/bs/"
}
},
"/sync/": {
"handler": "sync",
"handlerArgs": {
"from": "/bs/",
"to": "/index-mem/"
}
},
"/sighelper/": {
"handler": "jsonsign",
"handlerArgs": {
"secretRing": "/path/to/secring",
"keyId": "26F5ABDA",
"publicKeyDest": "/bs-and-index/"
}
},
"/bs-and-index/": {
"handler": "storage-replica",
"handlerArgs": {
"backends": ["/bs/", "/index-mem/"]
}
},
"/bs-and-maybe-also-index/": {
"handler": "storage-cond",
"handlerArgs": {
"write": {
"if": "isSchema",
"then": "/bs-and-index/",
"else": "/bs/"
},
"read": "/bs/"
}
},
"/bs/": {
"handler": "storage-google",
"handlerArgs": {
"auth": {
"client_id": "clientId",
"client_secret": "clientSecret",
"refresh_token": "refreshToken"
},
"bucket": "bucketName"
}
},
"/cache/": {
"handler": "storage-filesystem",
"handlerArgs": {
"path": "/tmp/camli-cache"
}
},
"/index-mem/": {
"handler": "storage-memory-only-dev-indexer",
"handlerArgs": {
"blobSource": "/bs/"
}
},
"/my-search/": {
"handler": "search",
"handlerArgs": {
"index": "/index-mem/",
"owner": "sha1-f2b0b7da718b97ce8c31591d8ed4645c777f3ef4"
}
}
}
}

View File

@ -0,0 +1,12 @@
{
"listen": "localhost:3179",
"https": false,
"auth": "userpass:camlistore:pass3179",
"identity": "26F5ABDA",
"identitySecretRing": "/path/to/secring",
"memIndex": true,
"google": "clientId:clientSecret:refreshToken:bucketName",
"replicateTo": [],
"publish": {},
"shareHandler": true
}

View File

@ -53,6 +53,7 @@ import (
_ "camlistore.org/pkg/blobserver/remote"
_ "camlistore.org/pkg/blobserver/replica"
_ "camlistore.org/pkg/blobserver/s3"
_ "camlistore.org/pkg/blobserver/google"
_ "camlistore.org/pkg/blobserver/shard"
// Indexers: (also present themselves as storage targets)
_ "camlistore.org/pkg/index" // base indexer + in-memory dev index

View File

@ -39,10 +39,11 @@ web browser and restart the server.</p>
</ul>
<h2>Storage options</h2>
<p>Exactly one of these must be set:</p>
<p>At least one of these must be set:</p>
<ul>
<li><b><code>blobPath</code></b>: local disk path to store blobs</li>
<li><b><code>s3</code></b>: "<code>key:secret:bucket</code>" (with colons, but no quotes).</li>
<li><b><code>google</code></b>: "<code>clientId:clientSecret:refreshToken:bucketName</code>"</li>
</ul>
<p>Others aren't yet supported by the simple config mode. Patches
to <code>pkg/genconfig</code> welcome.</p>