blobserver: simplify interface more, add NewMultiBlobStreamer, storagetest func

This is round two of simplifying the BlobStreamer interface. The
continuation tokens are now sent per-item. This permits the following
item (NewMultiBlobStreamer) but also simplifies the return value (only
returns an error now) and permits the use of buffered channels as the
destination without getting out of sync.

The new NewMultiBlobStreamer is like io.MultiReader, but for
BlobStreamers, letting them be stitched together. This will be used by
the blobpacked storage layer to simplify its code, so it doesn't need
to deal with the handoff between loose and packed blobs itself. This
MultiBlobStreamer needs a 1-element buffer, which wasn't compatible
with the old BlobStreamer interface, hence the change to include the
continuation tokens with the blob on the channel.

Finally, add a new storagetest function for testing any BlobStreamer
and use it for NewMultiBlobStreamer, diskpacked, and later in
blobpacked.

Updates #532

Change-Id: Iccffed289adec93ca5100c7ef8b0a8d57e05833c
2014-12-24 01:44:05 +00:00

Rename import paths from camlistore.org to perkeep.org.

Part of the project renaming, issue #981.

After this, users will need to mv their $GOPATH/src/camlistore.org to
$GOPATH/src/perkeep.org. Sorry.

This doesn't yet rename the tools like camlistored, camput, camget,
camtool, etc.

Also, this only moves the lru package to internal. More will move to
internal later.

Also, this doesn't yet remove the "/pkg/" directory. That'll likely
happen later.

This updates some docs, but not all.

devcam test now passes again, even with Go 1.10 (which requires vet
checks are clean too). So a bunch of vet tests are fixed in this CL
too, and a bunch of other broken tests are now fixed (introduced from
the past week of merging the CL backlog).

Change-Id: If580db1691b5b99f8ed6195070789b1f44877dd4
2018-01-01 22:41:41 +00:00

/*
Copyright 2014 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package blobserver

import (
	"context"
	"errors"
	"regexp"
	"strconv"
	"strings"
)

// NewMultiBlobStreamer concatenates multiple BlobStreamers into one.
func NewMultiBlobStreamer(streamers ...BlobStreamer) BlobStreamer {
	return multiStreamer{s: streamers}
}

type multiStreamer struct {
	s []BlobStreamer
}

var msTokenPrefixRx = regexp.MustCompile(`^(\d+):`)

func (ms multiStreamer) StreamBlobs(ctx context.Context, dest chan<- BlobAndToken, contToken string) error {
	defer close(dest)
	part := 0
	if contToken != "" {
		pfx := msTokenPrefixRx.FindString(contToken)
		var err error
		part, err = strconv.Atoi(strings.TrimSuffix(pfx, ":"))
		if pfx == "" || err != nil || part >= len(ms.s) {
			return errors.New("invalid continuation token")
		}
		contToken = contToken[len(pfx):]
	}
	srcs := ms.s[part:]
	for len(srcs) > 0 {
		bs := srcs[0]
		subDest := make(chan BlobAndToken, 16)
		errc := make(chan error, 1)
		go func() {
			errc <- bs.StreamBlobs(ctx, subDest, contToken)
		}()
		partStr := strconv.Itoa(part)
		for bt := range subDest {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case dest <- BlobAndToken{Blob: bt.Blob, Token: partStr + ":" + bt.Token}:
			}
		}
		if err := <-errc; err != nil {
			return err
		}
		// Advance to the next part:
		part++
		srcs = srcs[1:]
		contToken = ""
	}
	return nil
}
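
The continuation tokens that StreamBlobs emits are composite: the index of the sub-streamer, a colon, then that sub-streamer's own token. The standalone sketch below illustrates only that encoding and how a resumed call could split it again. The helpers splitToken and joinToken are hypothetical names introduced for illustration and are not part of the blobserver package, and the sub-token value is made up.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// tokenPrefixRx matches the leading "<part>:" of a composite token,
// using the same pattern as msTokenPrefixRx in the listing above.
var tokenPrefixRx = regexp.MustCompile(`^(\d+):`)

// splitToken is a hypothetical helper. It splits a composite continuation
// token into the index of the sub-streamer to resume and the token to pass
// to that sub-streamer. An empty token means "start from the first part".
func splitToken(tok string, numParts int) (part int, sub string, err error) {
	if tok == "" {
		return 0, "", nil
	}
	pfx := tokenPrefixRx.FindString(tok)
	if pfx == "" {
		return 0, "", errors.New("invalid continuation token")
	}
	part, err = strconv.Atoi(strings.TrimSuffix(pfx, ":"))
	if err != nil || part >= numParts {
		return 0, "", errors.New("invalid continuation token")
	}
	return part, tok[len(pfx):], nil
}

// joinToken is a hypothetical helper that builds the composite token
// sent alongside each blob: "<part>:<sub-streamer token>".
func joinToken(part int, sub string) string {
	return strconv.Itoa(part) + ":" + sub
}

func main() {
	// The sub-token format here is invented; real sub-streamers define their own.
	tok := joinToken(1, "pack-0007:4096")
	part, sub, err := splitToken(tok, 3)
	fmt.Println(tok, part, sub, err)
}
```

Because only the first "<digits>:" prefix is stripped, a sub-streamer's token may itself contain colons without confusing the outer layer.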