Merge "cmd/camtool: (re)index command"

This commit is contained in:
Brad Fitzpatrick 2013-12-26 17:29:03 +00:00 committed by Gerrit Code Review
commit 9fd329df80
3 changed files with 31 additions and 8 deletions

View File

@ -42,6 +42,7 @@ type syncCmd struct {
verbose bool verbose bool
all bool all bool
removeSrc bool removeSrc bool
wipe bool
logger *log.Logger logger *log.Logger
} }
@ -55,6 +56,7 @@ func init() {
flags.BoolVar(&cmd.loop, "loop", false, "Create an associate a new permanode for the uploaded file or directory.") flags.BoolVar(&cmd.loop, "loop", false, "Create an associate a new permanode for the uploaded file or directory.")
flags.BoolVar(&cmd.verbose, "verbose", false, "Be verbose.") flags.BoolVar(&cmd.verbose, "verbose", false, "Be verbose.")
flags.BoolVar(&cmd.wipe, "wipe", false, "If dest is an index, drop it and repopulate it from scratch. NOOP for now.")
flags.BoolVar(&cmd.all, "all", false, "Discover all sync destinations configured on the source server and run them.") flags.BoolVar(&cmd.all, "all", false, "Discover all sync destinations configured on the source server and run them.")
flags.BoolVar(&cmd.removeSrc, "removesrc", false, "Remove each blob from the source after syncing to the destination; for queue processing.") flags.BoolVar(&cmd.removeSrc, "removesrc", false, "Remove each blob from the source after syncing to the destination; for queue processing.")
@ -283,6 +285,12 @@ func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStat
return return
} }
if c.wipe {
// TODO(mpl): dest is a client. make it send a "wipe" request?
// upon reception its server then wipes itself if it is a wiper.
log.Print("Index wiping not yet supported.")
}
go func() { go func() {
destErr <- enumerateAllBlobs(ctx, dest, destBlobs) destErr <- enumerateAllBlobs(ctx, dest, destBlobs)
}() }()

View File

@ -347,8 +347,9 @@ func (c *Client) StorageGeneration() (string, error) {
// SyncInfo holds the data that were acquired with a discovery // SyncInfo holds the data that were acquired with a discovery
// and that are relevant to a syncHandler. // and that are relevant to a syncHandler.
type SyncInfo struct { type SyncInfo struct {
From string From string
To string To string
ToIndex bool // whether this sync is from a blob storage to an index
} }
// SyncHandlers returns the server's sync handlers "from" and // SyncHandlers returns the server's sync handlers "from" and
@ -640,8 +641,12 @@ func (c *Client) doDiscovery() {
c.discoErr = fmt.Errorf("client: invalid %q \"to\" sync; failed to resolve", to) c.discoErr = fmt.Errorf("client: invalid %q \"to\" sync; failed to resolve", to)
return return
} }
c.syncHandlers = append(c.syncHandlers, toIndex, _ := vmap["toIndex"].(bool)
&SyncInfo{From: ufrom.String(), To: uto.String()}) c.syncHandlers = append(c.syncHandlers, &SyncInfo{
From: ufrom.String(),
To: uto.String(),
ToIndex: toIndex,
})
} }
} }
} }

View File

@ -31,6 +31,7 @@ import (
"camlistore.org/pkg/blob" "camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context" "camlistore.org/pkg/context"
"camlistore.org/pkg/index"
"camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/readerutil" "camlistore.org/pkg/readerutil"
"camlistore.org/pkg/sorted" "camlistore.org/pkg/sorted"
@ -59,6 +60,7 @@ type SyncHandler struct {
from blobserver.Storage from blobserver.Storage
to blobserver.BlobReceiver to blobserver.BlobReceiver
queue sorted.KeyValue queue sorted.KeyValue
toIndex bool // whether this sync is from a blob storage to an index
idle bool // if true, the handler does nothing other than providing the discovery. idle bool // if true, the handler does nothing other than providing the discovery.
@ -123,6 +125,7 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
return nil, err return nil, err
} }
isToIndex := false
fromBs, err := ld.GetStorage(from) fromBs, err := ld.GetStorage(from)
if err != nil { if err != nil {
return nil, err return nil, err
@ -131,8 +134,13 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
if err != nil { if err != nil {
return nil, err return nil, err
} }
if _, ok := fromBs.(*index.Index); !ok {
if _, ok := toBs.(*index.Index); ok {
isToIndex = true
}
}
sh, err := createSyncHandler(from, to, fromBs, toBs, q) sh, err := createSyncHandler(from, to, fromBs, toBs, q, isToIndex)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -180,7 +188,7 @@ type timestampedError struct {
func createSyncHandler(fromName, toName string, func createSyncHandler(fromName, toName string,
from blobserver.Storage, to blobserver.BlobReceiver, from blobserver.Storage, to blobserver.BlobReceiver,
queue sorted.KeyValue) (*SyncHandler, error) { queue sorted.KeyValue, isToIndex bool) (*SyncHandler, error) {
h := &SyncHandler{ h := &SyncHandler{
copierPoolSize: 3, copierPoolSize: 3,
@ -189,6 +197,7 @@ func createSyncHandler(fromName, toName string,
fromName: fromName, fromName: fromName,
toName: toName, toName: toName,
queue: queue, queue: queue,
toIndex: isToIndex,
blobc: make(chan blob.SizedRef, 8), blobc: make(chan blob.SizedRef, 8),
status: "not started", status: "not started",
blobStatus: make(map[string]fmt.Stringer), blobStatus: make(map[string]fmt.Stringer),
@ -209,8 +218,9 @@ func createIdleSyncHandler(fromName, toName string) (*SyncHandler, error) {
func (sh *SyncHandler) discoveryMap() map[string]interface{} { func (sh *SyncHandler) discoveryMap() map[string]interface{} {
// TODO(mpl): more status info // TODO(mpl): more status info
return map[string]interface{}{ return map[string]interface{}{
"from": sh.fromName, "from": sh.fromName,
"to": sh.toName, "to": sh.toName,
"toIndex": sh.toIndex,
} }
} }