2011-03-02 02:02:01 +00:00
/*
Copyright 2013 The Camlistore Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
Update from r60 to [almost] Go 1.
A lot is still broken, but most stuff at least compiles now.
The directory tree has been rearranged now too. Go libraries are now
under "pkg". Fully qualified, they are e.g. "camlistore.org/pkg/jsonsign".
The go tool cannot yet fetch from arbitrary domains, but discussion is
happening now on which mechanism to use to allow that.
For now, put the camlistore root under $GOPATH/src. Typically $GOPATH
is $HOME, so Camlistore should be at $HOME/src/camlistore.org.
Then you can:
$ go build ./server/camlistored
... etc
The build.pl script is currently disabled. It'll be resurrected at
some point, but with a very different role (helping create a fake
GOPATH and running the go build command, if things are installed at
the wrong place, and/or running fileembed generators).
Many things are certainly broken.
Many things are disabled. (MySQL, all indexing, etc).
Many things need to be moved into
camlistore.org/third_party/{code.google.com,github.com} and updated
from their r60 to Go 1 versions, where applicable.
The GoMySQL stuff should be updated to use database/sql and the ziutek
library implementing database/sql/driver.
Help wanted.
Change-Id: If71217dc5c8f0e70dbe46e9504ca5131c6eeacde
2012-02-19 05:53:06 +00:00
"errors"
2011-03-02 02:02:01 +00:00
"flag"
"fmt"
"log"
"os"
2013-06-09 09:36:26 +00:00
"strings"
2012-12-22 23:22:22 +00:00
"time"
2012-12-22 00:41:15 +00:00
2013-08-04 02:54:30 +00:00
"camlistore.org/pkg/blob"
2013-06-09 09:36:26 +00:00
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/localdisk"
2012-12-22 00:41:15 +00:00
"camlistore.org/pkg/client"
2013-02-18 23:35:43 +00:00
"camlistore.org/pkg/cmdmain"
2013-12-02 21:20:51 +00:00
"camlistore.org/pkg/context"
2011-03-02 02:02:01 +00:00
)
2013-02-18 23:35:43 +00:00
// syncCmd holds the parsed flags and runtime state of "camtool sync".
type syncCmd struct {
	// Storage specifications, as given on the command line.
	src   string // --src: source blobserver (URL, host[:port], path, or blank for the configured default)
	dest  string // --dest: destination blobserver, or "stdout" to only list source blobs
	third string // --thirdleg: optional intermediate store for sneakernet-style copies

	// Behavior switches.
	loop      bool // --loop: repeat passes until one fails; only allowed together with removeSrc
	verbose   bool // --verbose: enables logger and per-pass statistics
	all       bool // --all: sync every handler discovered on the source server
	removeSrc bool // --removesrc: delete each blob from the source once it has been copied
	wipe      bool // --wipe: currently only logs that index wiping is unsupported

	// logger is a stderr logger when verbose is set, nil otherwise.
	logger *log.Logger
}
func init ( ) {
cmdmain . RegisterCommand ( "sync" , func ( flags * flag . FlagSet ) cmdmain . CommandRunner {
cmd := new ( syncCmd )
2013-06-09 09:36:26 +00:00
flags . StringVar ( & cmd . src , "src" , "" , "Source blobserver is either a URL prefix (with optional path), a host[:port], a path (starting with /, ./, or ../), or blank to use the Camlistore client config's default host." )
flags . StringVar ( & cmd . dest , "dest" , "" , "Destination blobserver (same format as src), or 'stdout' to just enumerate the --src blobs to stdout." )
flags . StringVar ( & cmd . third , "thirdleg" , "" , "Copy blobs present in source but missing from destination to this 'third leg' blob store, instead of the destination. (same format as src)" )
2013-02-18 23:35:43 +00:00
flags . BoolVar ( & cmd . loop , "loop" , false , "Create an associate a new permanode for the uploaded file or directory." )
flags . BoolVar ( & cmd . verbose , "verbose" , false , "Be verbose." )
2013-12-17 17:02:10 +00:00
flags . BoolVar ( & cmd . wipe , "wipe" , false , "If dest is an index, drop it and repopulate it from scratch. NOOP for now." )
2013-02-18 23:35:43 +00:00
flags . BoolVar ( & cmd . all , "all" , false , "Discover all sync destinations configured on the source server and run them." )
flags . BoolVar ( & cmd . removeSrc , "removesrc" , false , "Remove each blob from the source after syncing to the destination; for queue processing." )
return cmd
} )
}
2013-02-25 20:40:11 +00:00
// Describe returns a one-line summary of what the sync command does.
func (c *syncCmd) Describe() string {
	const summary = "Synchronize blobs from a source to a destination."
	return summary
}
2013-02-18 23:35:43 +00:00
// Usage writes the sync command's usage line to standard error.
func (c *syncCmd) Usage() {
	// The text contains no formatting verbs, so Fprint emits it verbatim.
	fmt.Fprint(os.Stderr, "Usage: camtool [globalopts] sync [syncopts] \n")
}
// Examples returns sample flag combinations for the sync command.
func (c *syncCmd) Examples() []string {
	syncEverything := "--all"
	blobsToMemIndex := "--src http://localhost:3179/bs/ --dest http://localhost:3179/index-mem/"
	return []string{syncEverything, blobsToMemIndex}
}
func ( c * syncCmd ) RunCommand ( args [ ] string ) error {
if c . loop && ! c . removeSrc {
return cmdmain . UsageError ( "Can't use --loop without --removesrc" )
}
if c . verbose {
c . logger = log . New ( os . Stderr , "" , 0 ) // else nil
}
if c . all {
err := c . syncAll ( )
if err != nil {
return fmt . Errorf ( "sync all failed: %v" , err )
}
return nil
}
2011-03-05 17:53:51 +00:00
2013-06-09 09:36:26 +00:00
ss , err := c . storageFromParam ( "src" , c . src )
2013-02-18 23:35:43 +00:00
if err != nil {
2013-06-09 09:36:26 +00:00
return err
2013-02-18 23:35:43 +00:00
}
2013-06-09 09:36:26 +00:00
ds , err := c . storageFromParam ( "dest" , c . dest )
if err != nil {
return err
}
ts , err := c . storageFromParam ( "thirdleg" , c . third )
if err != nil {
return err
2013-03-09 06:56:21 +00:00
}
2013-02-18 23:35:43 +00:00
passNum := 0
for {
passNum ++
2013-06-09 09:36:26 +00:00
stats , err := c . doPass ( ss , ds , ts )
2013-02-18 23:35:43 +00:00
if c . verbose {
log . Printf ( "sync stats - pass: %d, blobs: %d, bytes %d\n" , passNum , stats . BlobsCopied , stats . BytesCopied )
}
if err != nil {
return fmt . Errorf ( "sync failed: %v" , err )
}
if ! c . loop {
break
}
}
return nil
}
2013-01-17 00:24:42 +00:00
2013-06-09 09:36:26 +00:00
// A storageType names the role a storage flag plays: "src", "dest", or
// "thirdleg". Each value is spelled exactly like its command-line flag.
type storageType string

// The three storage roles understood by storageFromParam.
const (
	storageSource storageType = "src"      // --src
	storageDest   storageType = "dest"     // --dest
	storageThird  storageType = "thirdleg" // --thirdleg
)
// which is one of "src", "dest", or "thirdleg"
func ( c * syncCmd ) storageFromParam ( which storageType , val string ) ( blobserver . Storage , error ) {
if val == "" {
switch which {
case storageThird :
return nil , nil
case storageSource :
discl := c . discoClient ( )
discl . SetLogger ( c . logger )
src , err := discl . BlobRoot ( )
if err != nil {
return nil , fmt . Errorf ( "Failed to discover source server's blob path: %v" , err )
}
val = src
}
if val == "" {
return nil , cmdmain . UsageError ( "No --" + string ( which ) + " flag value specified" )
}
}
if which == storageDest && val == "stdout" {
return nil , nil
}
if looksLikePath ( val ) {
disk , err := localdisk . New ( val )
if err != nil {
2013-10-05 01:44:40 +00:00
return nil , fmt . Errorf ( "Interpreted --%v=%q as a local disk path, but got error: %v" , which , val , err )
2013-06-09 09:36:26 +00:00
}
return disk , nil
}
cl := client . New ( val )
2013-06-22 23:35:48 +00:00
// TODO(mpl): probably needs the transport setup for trusted certs here.
2013-06-09 09:36:26 +00:00
cl . SetupAuth ( )
cl . SetLogger ( c . logger )
2013-08-21 20:57:28 +00:00
return cl , nil
2013-06-09 09:36:26 +00:00
}
// looksLikePath reports whether v should be interpreted as a local
// filesystem path, i.e. it is absolute ("/") or explicitly relative
// ("./" or "../"). Everything else is treated as a server address.
func looksLikePath(v string) bool {
	for _, p := range [...]string{"./", "/", "../"} {
		if strings.HasPrefix(v, p) {
			return true
		}
	}
	return false
}
2011-06-14 15:09:15 +00:00
// SyncStats summarizes the outcome of one sync pass.
type SyncStats struct {
	BlobsCopied int   // blobs successfully received by the first-hop destination
	BytesCopied int64 // sum of the sizes of those blobs
	ErrorCount  int   // per-blob failures plus source/destination size mismatches
}
2013-02-18 23:35:43 +00:00
func ( c * syncCmd ) syncAll ( ) error {
if c . loop {
2013-06-09 09:36:26 +00:00
return cmdmain . UsageError ( "--all can't be used with --loop" )
}
if c . third != "" {
return cmdmain . UsageError ( "--all can't be used with --thirdleg" )
}
if c . dest != "" {
return cmdmain . UsageError ( "--all can't be used with --dest" )
2011-03-02 02:02:01 +00:00
}
2013-02-18 23:35:43 +00:00
dc := c . discoClient ( )
dc . SetLogger ( c . logger )
2013-01-15 13:53:25 +00:00
syncHandlers , err := dc . SyncHandlers ( )
if err != nil {
2013-02-18 23:35:43 +00:00
return fmt . Errorf ( "sync handlers discovery failed: %v" , err )
2013-01-15 13:53:25 +00:00
}
2013-02-18 23:35:43 +00:00
if c . verbose {
2013-02-14 00:24:20 +00:00
log . Printf ( "To be synced:\n" )
for _ , sh := range syncHandlers {
log . Printf ( "%v -> %v" , sh . From , sh . To )
}
}
2013-01-15 13:53:25 +00:00
for _ , sh := range syncHandlers {
from := client . New ( sh . From )
2013-02-18 23:35:43 +00:00
from . SetLogger ( c . logger )
2013-01-15 13:53:25 +00:00
from . SetupAuth ( )
to := client . New ( sh . To )
2013-02-18 23:35:43 +00:00
to . SetLogger ( c . logger )
2013-01-15 13:53:25 +00:00
to . SetupAuth ( )
2013-02-18 23:35:43 +00:00
if c . verbose {
2013-02-14 00:24:20 +00:00
log . Printf ( "Now syncing: %v -> %v" , sh . From , sh . To )
}
2013-08-21 20:57:28 +00:00
stats , err := c . doPass ( from , to , nil )
2013-02-18 23:35:43 +00:00
if c . verbose {
2013-01-15 13:53:25 +00:00
log . Printf ( "sync stats, blobs: %d, bytes %d\n" , stats . BlobsCopied , stats . BytesCopied )
}
if err != nil {
return err
}
}
return nil
}
// discoClient returns a client initialized with a server
// based from --src or from the configuration file if --src
// is blank. The returned client can then be used to discover
// the blobRoot and syncHandlers.
2013-02-18 23:35:43 +00:00
func ( c * syncCmd ) discoClient ( ) * client . Client {
2013-01-15 13:53:25 +00:00
var cl * client . Client
2013-02-18 23:35:43 +00:00
if c . src == "" {
2013-01-15 13:53:25 +00:00
cl = client . NewOrFail ( )
} else {
2013-02-18 23:35:43 +00:00
cl = client . New ( c . src )
2013-01-15 13:53:25 +00:00
}
cl . SetupAuth ( )
return cl
}
2013-12-02 21:20:51 +00:00
func enumerateAllBlobs ( ctx * context . Context , s blobserver . Storage , destc chan <- blob . SizedRef ) error {
2013-06-09 09:36:26 +00:00
// Use *client.Client's support for enumerating all blobs if
// possible, since it could probably do a better job knowing
// HTTP boundaries and such.
2013-08-21 20:57:28 +00:00
if c , ok := s . ( * client . Client ) ; ok {
2013-12-02 21:20:51 +00:00
return c . SimpleEnumerateBlobs ( ctx , destc )
2013-06-09 09:36:26 +00:00
}
defer close ( destc )
2013-12-02 21:20:51 +00:00
return blobserver . EnumerateAll ( ctx , s , func ( sb blob . SizedRef ) error {
2013-06-18 04:56:25 +00:00
destc <- sb
return nil
} )
2013-06-09 09:36:26 +00:00
}
// src: non-nil source
// dest: non-nil destination
// thirdLeg: optional third-leg client. if not nil, anything on src
// but not on dest will instead be copied to thirdLeg, instead of
// directly to dest. (sneakernet mode, copying to a portable drive
// and transporting thirdLeg to dest)
func ( c * syncCmd ) doPass ( src , dest , thirdLeg blobserver . Storage ) ( stats SyncStats , retErr error ) {
2013-08-04 02:54:30 +00:00
srcBlobs := make ( chan blob . SizedRef , 100 )
destBlobs := make ( chan blob . SizedRef , 100 )
2013-06-09 09:36:26 +00:00
srcErr := make ( chan error , 1 )
destErr := make ( chan error , 1 )
2011-03-05 17:53:51 +00:00
2013-12-02 21:20:51 +00:00
ctx := context . TODO ( )
defer ctx . Cancel ( )
2011-03-02 02:02:01 +00:00
go func ( ) {
2013-12-02 21:20:51 +00:00
srcErr <- enumerateAllBlobs ( ctx , src , srcBlobs )
2011-03-02 02:02:01 +00:00
} ( )
2011-03-05 17:53:51 +00:00
checkSourceError := func ( ) {
if err := <- srcErr ; err != nil {
2012-12-22 23:22:22 +00:00
retErr = fmt . Errorf ( "Enumerate error from source: %v" , err )
2011-03-05 17:53:51 +00:00
}
}
2013-02-18 23:35:43 +00:00
if c . dest == "stdout" {
2011-03-03 04:03:09 +00:00
for sb := range srcBlobs {
2013-08-04 02:54:30 +00:00
fmt . Printf ( "%s %d\n" , sb . Ref , sb . Size )
2011-03-03 04:03:09 +00:00
}
2011-03-05 17:53:51 +00:00
checkSourceError ( )
return
2011-03-02 02:02:01 +00:00
}
2013-12-17 17:02:10 +00:00
if c . wipe {
// TODO(mpl): dest is a client. make it send a "wipe" request?
// upon reception its server then wipes itself if it is a wiper.
log . Print ( "Index wiping not yet supported." )
}
2011-03-05 17:53:51 +00:00
go func ( ) {
2013-12-02 21:20:51 +00:00
destErr <- enumerateAllBlobs ( ctx , dest , destBlobs )
2011-03-05 17:53:51 +00:00
} ( )
checkDestError := func ( ) {
2011-03-03 04:03:09 +00:00
if err := <- destErr ; err != nil {
Update from r60 to [almost] Go 1.
A lot is still broken, but most stuff at least compiles now.
The directory tree has been rearranged now too. Go libraries are now
under "pkg". Fully qualified, they are e.g. "camlistore.org/pkg/jsonsign".
The go tool cannot yet fetch from arbitrary domains, but discussion is
happening now on which mechanism to use to allow that.
For now, put the camlistore root under $GOPATH/src. Typically $GOPATH
is $HOME, so Camlistore should be at $HOME/src/camlistore.org.
Then you can:
$ go build ./server/camlistored
... etc
The build.pl script is currently disabled. It'll be resurrected at
some point, but with a very different role (helping create a fake
GOPATH and running the go build command, if things are installed at
the wrong place, and/or running fileembed generators).
Many things are certainly broken.
Many things are disabled. (MySQL, all indexing, etc).
Many things need to be moved into
camlistore.org/third_party/{code.google.com,github.com} and updated
from their r60 to Go 1 versions, where applicable.
The GoMySQL stuff should be updated to use database/sql and the ziutek
library implementing database/sql/driver.
Help wanted.
Change-Id: If71217dc5c8f0e70dbe46e9504ca5131c6eeacde
2012-02-19 05:53:06 +00:00
retErr = errors . New ( fmt . Sprintf ( "Enumerate error from destination: %v" , err ) )
2011-03-05 17:53:51 +00:00
}
}
2013-08-04 02:54:30 +00:00
destNotHaveBlobs := make ( chan blob . SizedRef )
sizeMismatch := make ( chan blob . Ref )
2012-12-22 23:22:22 +00:00
readSrcBlobs := srcBlobs
2013-02-18 23:35:43 +00:00
if c . verbose {
2012-12-22 23:22:22 +00:00
readSrcBlobs = loggingBlobRefChannel ( srcBlobs )
}
2013-08-04 02:54:30 +00:00
mismatches := [ ] blob . Ref { }
2012-12-23 17:22:25 +00:00
go client . ListMissingDestinationBlobs ( destNotHaveBlobs , sizeMismatch , readSrcBlobs , destBlobs )
2013-03-09 06:56:21 +00:00
// Handle three-legged mode if tc is provided.
checkThirdError := func ( ) { } // default nop
syncBlobs := destNotHaveBlobs
2013-06-09 09:36:26 +00:00
firstHopDest := dest
if thirdLeg != nil {
2013-08-04 02:54:30 +00:00
thirdBlobs := make ( chan blob . SizedRef , 100 )
2013-06-09 09:36:26 +00:00
thirdErr := make ( chan error , 1 )
2013-03-09 06:56:21 +00:00
go func ( ) {
2013-12-02 21:20:51 +00:00
thirdErr <- enumerateAllBlobs ( ctx , thirdLeg , thirdBlobs )
2013-03-09 06:56:21 +00:00
} ( )
checkThirdError = func ( ) {
if err := <- thirdErr ; err != nil {
retErr = fmt . Errorf ( "Enumerate error from third leg: %v" , err )
}
}
2013-08-04 02:54:30 +00:00
thirdNeedBlobs := make ( chan blob . SizedRef )
2013-03-09 06:56:21 +00:00
go client . ListMissingDestinationBlobs ( thirdNeedBlobs , sizeMismatch , destNotHaveBlobs , thirdBlobs )
syncBlobs = thirdNeedBlobs
2013-06-09 09:36:26 +00:00
firstHopDest = thirdLeg
2013-03-09 06:56:21 +00:00
}
2012-12-23 17:22:25 +00:00
For :
for {
select {
case br := <- sizeMismatch :
// TODO(bradfitz): check both sides and repair, carefully. For now, fail.
2013-08-02 23:32:50 +00:00
log . Printf ( "WARNING: blobref %v has differing sizes on source and dest" , br )
2011-06-14 15:09:15 +00:00
stats . ErrorCount ++
2012-12-23 17:22:25 +00:00
mismatches = append ( mismatches , br )
2013-03-09 06:56:21 +00:00
case sb , ok := <- syncBlobs :
2012-12-23 17:22:25 +00:00
if ! ok {
break For
}
fmt . Printf ( "Destination needs blob: %s\n" , sb )
2013-08-04 02:54:30 +00:00
blobReader , size , err := src . FetchStreaming ( sb . Ref )
2012-12-23 17:22:25 +00:00
if err != nil {
stats . ErrorCount ++
2013-08-04 02:54:30 +00:00
log . Printf ( "Error fetching %s: %v" , sb . Ref , err )
2012-12-23 17:22:25 +00:00
continue
}
if size != sb . Size {
2011-06-14 15:09:15 +00:00
stats . ErrorCount ++
2012-12-23 17:22:25 +00:00
log . Printf ( "Source blobserver's enumerate size of %d for blob %s doesn't match its Get size of %d" ,
2013-08-04 02:54:30 +00:00
sb . Size , sb . Ref , size )
2012-12-23 17:22:25 +00:00
continue
}
2013-06-09 09:36:26 +00:00
2013-09-16 14:57:14 +00:00
if _ , err := blobserver . Receive ( firstHopDest , sb . Ref , blobReader ) ; err != nil {
2012-12-23 17:22:25 +00:00
stats . ErrorCount ++
2013-08-04 02:54:30 +00:00
log . Printf ( "Upload of %s to destination blobserver failed: %v" , sb . Ref , err )
2012-12-23 17:22:25 +00:00
continue
}
2013-06-09 09:36:26 +00:00
stats . BlobsCopied ++
stats . BytesCopied += size
2013-02-18 23:35:43 +00:00
if c . removeSrc {
2013-08-04 02:54:30 +00:00
if err = src . RemoveBlobs ( [ ] blob . Ref { sb . Ref } ) ; err != nil {
2012-12-23 17:22:25 +00:00
stats . ErrorCount ++
2013-08-04 02:54:30 +00:00
log . Printf ( "Failed to delete %s from source: %v" , sb . Ref , err )
2012-12-23 17:22:25 +00:00
}
2011-03-05 19:34:12 +00:00
}
2011-03-05 17:53:51 +00:00
}
}
checkSourceError ( )
checkDestError ( )
2013-03-09 06:56:21 +00:00
checkThirdError ( )
2011-06-14 15:09:15 +00:00
if retErr == nil && stats . ErrorCount > 0 {
2013-02-18 23:35:43 +00:00
retErr = fmt . Errorf ( "%d errors during sync" , stats . ErrorCount )
2011-03-03 04:03:09 +00:00
}
2011-06-14 15:09:15 +00:00
return stats , retErr
2011-03-02 02:02:01 +00:00
}
2012-12-22 23:22:22 +00:00
2013-08-04 02:54:30 +00:00
func loggingBlobRefChannel ( ch <- chan blob . SizedRef ) chan blob . SizedRef {
ch2 := make ( chan blob . SizedRef )
2012-12-22 23:22:22 +00:00
go func ( ) {
defer close ( ch2 )
var last time . Time
2012-12-23 17:00:51 +00:00
var nblob , nbyte int64
2012-12-22 23:22:22 +00:00
for v := range ch {
ch2 <- v
2012-12-23 17:00:51 +00:00
nblob ++
nbyte += v . Size
2012-12-22 23:22:22 +00:00
now := time . Now ( )
if last . IsZero ( ) || now . After ( last . Add ( 1 * time . Second ) ) {
last = now
2013-08-04 02:54:30 +00:00
log . Printf ( "At source blob %v (%d blobs, %d bytes)" , v . Ref , nblob , nbyte )
2012-12-22 23:22:22 +00:00
}
}
2012-12-23 17:00:51 +00:00
log . Printf ( "Total blobs: %d, %d bytes" , nblob , nbyte )
2012-12-22 23:22:22 +00:00
} ( )
return ch2
}