cmd/camtool: (re)index command

http://camlistore.org/issue/193

Change-Id: I498f92bdc153f44dc84d4b47f03c47a8e7b54ad9
This commit is contained in:
mpl 2013-12-17 18:02:10 +01:00
parent fb933a84cd
commit 2d85e017ff
3 changed files with 31 additions and 8 deletions

View File

@ -42,6 +42,7 @@ type syncCmd struct {
verbose bool
all bool
removeSrc bool
wipe bool
logger *log.Logger
}
@ -55,6 +56,7 @@ func init() {
flags.BoolVar(&cmd.loop, "loop", false, "Create an associate a new permanode for the uploaded file or directory.")
flags.BoolVar(&cmd.verbose, "verbose", false, "Be verbose.")
flags.BoolVar(&cmd.wipe, "wipe", false, "If dest is an index, drop it and repopulate it from scratch. NOOP for now.")
flags.BoolVar(&cmd.all, "all", false, "Discover all sync destinations configured on the source server and run them.")
flags.BoolVar(&cmd.removeSrc, "removesrc", false, "Remove each blob from the source after syncing to the destination; for queue processing.")
@ -283,6 +285,12 @@ func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStat
return
}
if c.wipe {
// TODO(mpl): dest is a client. make it send a "wipe" request?
// upon reception its server then wipes itself if it is a wiper.
log.Print("Index wiping not yet supported.")
}
go func() {
destErr <- enumerateAllBlobs(ctx, dest, destBlobs)
}()

View File

@ -349,6 +349,7 @@ func (c *Client) StorageGeneration() (string, error) {
type SyncInfo struct {
From string
To string
ToIndex bool // whether this sync is from a blob storage to an index
}
// SyncHandlers returns the server's sync handlers "from" and
@ -640,8 +641,12 @@ func (c *Client) doDiscovery() {
c.discoErr = fmt.Errorf("client: invalid %q \"to\" sync; failed to resolve", to)
return
}
c.syncHandlers = append(c.syncHandlers,
&SyncInfo{From: ufrom.String(), To: uto.String()})
toIndex, _ := vmap["toIndex"].(bool)
c.syncHandlers = append(c.syncHandlers, &SyncInfo{
From: ufrom.String(),
To: uto.String(),
ToIndex: toIndex,
})
}
}
}

View File

@ -31,6 +31,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/index"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/readerutil"
"camlistore.org/pkg/sorted"
@ -59,6 +60,7 @@ type SyncHandler struct {
from blobserver.Storage
to blobserver.BlobReceiver
queue sorted.KeyValue
toIndex bool // whether this sync is from a blob storage to an index
idle bool // if true, the handler does nothing other than providing the discovery.
@ -123,6 +125,7 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
return nil, err
}
isToIndex := false
fromBs, err := ld.GetStorage(from)
if err != nil {
return nil, err
@ -131,8 +134,13 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
if err != nil {
return nil, err
}
if _, ok := fromBs.(*index.Index); !ok {
if _, ok := toBs.(*index.Index); ok {
isToIndex = true
}
}
sh, err := createSyncHandler(from, to, fromBs, toBs, q)
sh, err := createSyncHandler(from, to, fromBs, toBs, q, isToIndex)
if err != nil {
return nil, err
}
@ -180,7 +188,7 @@ type timestampedError struct {
func createSyncHandler(fromName, toName string,
from blobserver.Storage, to blobserver.BlobReceiver,
queue sorted.KeyValue) (*SyncHandler, error) {
queue sorted.KeyValue, isToIndex bool) (*SyncHandler, error) {
h := &SyncHandler{
copierPoolSize: 3,
@ -189,6 +197,7 @@ func createSyncHandler(fromName, toName string,
fromName: fromName,
toName: toName,
queue: queue,
toIndex: isToIndex,
blobc: make(chan blob.SizedRef, 8),
status: "not started",
blobStatus: make(map[string]fmt.Stringer),
@ -211,6 +220,7 @@ func (sh *SyncHandler) discoveryMap() map[string]interface{} {
return map[string]interface{}{
"from": sh.fromName,
"to": sh.toName,
"toIndex": sh.toIndex,
}
}