mirror of https://github.com/perkeep/perkeep.git
camput: automatic blobroot path discovery
This commit is contained in:
parent
9abbbef45e
commit
db55ee77d9
|
@ -1,5 +1,5 @@
|
||||||
{
|
{
|
||||||
"blobServer": "http://localhost:3179/bs",
|
"blobServer": "http://localhost:3179/",
|
||||||
|
|
||||||
"auth": ["_env", "${CAMLI_AUTH}" ],
|
"auth": ["_env", "${CAMLI_AUTH}" ],
|
||||||
"selfPubKeyDir": ["_env", "${CAMLI_DEV_KEYBLOBS}" ],
|
"selfPubKeyDir": ["_env", "${CAMLI_DEV_KEYBLOBS}" ],
|
||||||
|
|
|
@ -30,7 +30,7 @@ $ENV{"CAMLI_SECRET_RING"} = "$Bin/pkg/jsonsign/testdata/test-secring.gpg";
|
||||||
$ENV{"CAMLI_KEYID"} = "26F5ABDA";
|
$ENV{"CAMLI_KEYID"} = "26F5ABDA";
|
||||||
$ENV{"CAMLI_DEV_KEYBLOBS"} = "$Bin/config/dev-client-dir/keyblobs";
|
$ENV{"CAMLI_DEV_KEYBLOBS"} = "$Bin/config/dev-client-dir/keyblobs";
|
||||||
$ENV{"CAMLI_AUTH"} = "userpass:camlistore:pass3179";
|
$ENV{"CAMLI_AUTH"} = "userpass:camlistore:pass3179";
|
||||||
my $blobserver = "http://localhost:3179/bs";
|
my $blobserver = "http://localhost:3179/";
|
||||||
if ($opt_tls) {
|
if ($opt_tls) {
|
||||||
$blobserver =~ s/^http/https/;
|
$blobserver =~ s/^http/https/;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,17 +17,32 @@ limitations under the License.
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"camlistore.org/pkg/auth"
|
"camlistore.org/pkg/auth"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
server string // URL prefix before "/camli/"
|
// server is the input from user, pre-discovery.
|
||||||
|
// For example "http://foo.com" or "foo.com:1234".
|
||||||
|
// It is the responsibility of initPrefix to parse
|
||||||
|
// server and set prefix, including doing discovery
|
||||||
|
// to figure out what the proper server-declared
|
||||||
|
// prefix is.
|
||||||
|
server string
|
||||||
|
|
||||||
|
prefixOnce sync.Once
|
||||||
|
prefixv string // URL prefix before "/camli/"
|
||||||
|
prefixErr error
|
||||||
|
|
||||||
authMode auth.AuthMode
|
authMode auth.AuthMode
|
||||||
|
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
|
@ -82,6 +97,64 @@ func (c *Client) Stats() Stats {
|
||||||
return c.stats // copy
|
return c.stats // copy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) prefix() (string, error) {
|
||||||
|
c.prefixOnce.Do(func() { c.initPrefix() })
|
||||||
|
if c.prefixErr != nil {
|
||||||
|
return "", c.prefixErr
|
||||||
|
}
|
||||||
|
return c.prefixv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) initPrefix() {
|
||||||
|
s := c.server
|
||||||
|
if !strings.HasPrefix(s, "http") {
|
||||||
|
s = "http://" + s
|
||||||
|
}
|
||||||
|
u, err := url.Parse(s)
|
||||||
|
if err != nil {
|
||||||
|
c.prefixErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(u.Path) > 1 {
|
||||||
|
c.prefixv = strings.TrimRight(s, "/")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// If the path is just "" or "/", do discovery against
|
||||||
|
// the URL to see which path we should actually use.
|
||||||
|
req, _ := http.NewRequest("GET", u.String(), nil)
|
||||||
|
req.Header.Set("Accept", "text/x-camli-configuration")
|
||||||
|
res, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
c.prefixErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != 200 {
|
||||||
|
c.prefixErr = fmt.Errorf("Got status %q from blobserver during configuration discovery", res.Status)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// TODO(bradfitz): little weird in retrospect that we request
|
||||||
|
// text/x-camli-configuration and expect to get back
|
||||||
|
// text/javascript. Make them consistent.
|
||||||
|
if ct := res.Header.Get("Content-Type"); ct != "text/javascript" {
|
||||||
|
c.prefixErr = fmt.Errorf("Blobserver returned unexpected type %q from discovery", ct)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m := make(map[string]interface{})
|
||||||
|
if err := json.NewDecoder(res.Body).Decode(&m); err != nil {
|
||||||
|
c.prefixErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
blobRoot, ok := m["blobRoot"].(string)
|
||||||
|
if !ok {
|
||||||
|
c.prefixErr = fmt.Errorf("No blobRoot in config discovery response")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
u.Path = blobRoot
|
||||||
|
c.prefixv = strings.TrimRight(u.String(), "/")
|
||||||
|
log.Printf("set prefix to %q", c.prefixv)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) newRequest(method, url string) *http.Request {
|
func (c *Client) newRequest(method, url string) *http.Request {
|
||||||
req, err := http.NewRequest(method, url, nil)
|
req, err := http.NewRequest(method, url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -44,6 +44,10 @@ func (c *Client) EnumerateBlobsOpts(ch chan<- blobref.SizedBlobRef, opts Enumera
|
||||||
if opts.After != "" && opts.MaxWait != 0 {
|
if opts.After != "" && opts.MaxWait != 0 {
|
||||||
return errors.New("client error: it's invalid to use enumerate After and MaxWaitSec together")
|
return errors.New("client error: it's invalid to use enumerate After and MaxWaitSec together")
|
||||||
}
|
}
|
||||||
|
pfx, err := c.prefix()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
error := func(msg string, e error) error {
|
error := func(msg string, e error) error {
|
||||||
err := errors.New(fmt.Sprintf("client enumerate error: %s: %v", msg, e))
|
err := errors.New(fmt.Sprintf("client enumerate error: %s: %v", msg, e))
|
||||||
|
@ -65,7 +69,7 @@ func (c *Client) EnumerateBlobsOpts(ch chan<- blobref.SizedBlobRef, opts Enumera
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
url_ := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d&maxwaitsec=%d",
|
url_ := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d&maxwaitsec=%d",
|
||||||
c.server, url.QueryEscape(after), enumerateBatchSize, waitSec)
|
pfx, url.QueryEscape(after), enumerateBatchSize, waitSec)
|
||||||
req := c.newRequest("GET", url_)
|
req := c.newRequest("GET", url_)
|
||||||
resp, err := c.httpClient.Do(req)
|
resp, err := c.httpClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -34,7 +34,11 @@ func (c *Client) FetchStreaming(b *blobref.BlobRef) (io.ReadCloser, int64, error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) FetchVia(b *blobref.BlobRef, v []*blobref.BlobRef) (io.ReadCloser, int64, error) {
|
func (c *Client) FetchVia(b *blobref.BlobRef, v []*blobref.BlobRef) (io.ReadCloser, int64, error) {
|
||||||
url := fmt.Sprintf("%s/camli/%s", c.server, b)
|
pfx, err := c.prefix()
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
url := fmt.Sprintf("%s/camli/%s", pfx, b)
|
||||||
|
|
||||||
if len(v) > 0 {
|
if len(v) > 0 {
|
||||||
buf := bytes.NewBufferString(url)
|
buf := bytes.NewBufferString(url)
|
||||||
|
|
|
@ -36,7 +36,11 @@ type removeResponse struct {
|
||||||
// Remove the list of blobs. An error is returned if the server failed to
|
// Remove the list of blobs. An error is returned if the server failed to
|
||||||
// remove a blob. Removing a non-existent blob isn't an error.
|
// remove a blob. Removing a non-existent blob isn't an error.
|
||||||
func (c *Client) RemoveBlobs(blobs []*blobref.BlobRef) error {
|
func (c *Client) RemoveBlobs(blobs []*blobref.BlobRef) error {
|
||||||
url_ := fmt.Sprintf("%s/camli/remove", c.server)
|
pfx, err := c.prefix()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
url_ := fmt.Sprintf("%s/camli/remove", pfx)
|
||||||
params := make(url.Values) // "blobN" -> BlobRefStr
|
params := make(url.Values) // "blobN" -> BlobRefStr
|
||||||
needsDelete := make(map[string]bool) // BlobRefStr -> true
|
needsDelete := make(map[string]bool) // BlobRefStr -> true
|
||||||
for n, b := range blobs {
|
for n, b := range blobs {
|
||||||
|
|
|
@ -196,7 +196,11 @@ func (c *Client) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs []*blobref.Bl
|
||||||
fmt.Fprintf(&buf, "&maxwaitsec=%d", secs)
|
fmt.Fprintf(&buf, "&maxwaitsec=%d", secs)
|
||||||
}
|
}
|
||||||
|
|
||||||
req := c.newRequest("POST", fmt.Sprintf("%s/camli/stat", c.server))
|
pfx, err := c.prefix()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req := c.newRequest("POST", fmt.Sprintf("%s/camli/stat", pfx))
|
||||||
bodyStr := buf.String()
|
bodyStr := buf.String()
|
||||||
req.Body = ioutil.NopCloser(strings.NewReader(bodyStr))
|
req.Body = ioutil.NopCloser(strings.NewReader(bodyStr))
|
||||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||||
|
@ -259,9 +263,13 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, error) {
|
||||||
|
|
||||||
blobrefStr := h.BlobRef.String()
|
blobrefStr := h.BlobRef.String()
|
||||||
|
|
||||||
// Pre-upload. Check whether the blob already exists on the
|
// Pre-upload. Check whether the blob already exists on the
|
||||||
// server and if not, the URL to upload it to.
|
// server and if not, the URL to upload it to.
|
||||||
url_ := fmt.Sprintf("%s/camli/stat", c.server)
|
pfx, err := c.prefix()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
url_ := fmt.Sprintf("%s/camli/stat", pfx)
|
||||||
requestBody := "camliversion=1&blob1=" + blobrefStr
|
requestBody := "camliversion=1&blob1=" + blobrefStr
|
||||||
req := c.newRequest("POST", url_)
|
req := c.newRequest("POST", url_)
|
||||||
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
||||||
|
|
Loading…
Reference in New Issue