From 38b4f153e0c77cef308464fad8f9dd12a493163e Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 1 Feb 2011 22:48:12 -0800 Subject: [PATCH] go blobserver: queue partition support, via flag. --- server/go/blobserver/localdisk.go | 18 ++++++++++++++++-- server/go/blobserver/upload.go | 19 ++++++++++++++++++- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/server/go/blobserver/localdisk.go b/server/go/blobserver/localdisk.go index 0de7f92d6..d544e618b 100644 --- a/server/go/blobserver/localdisk.go +++ b/server/go/blobserver/localdisk.go @@ -50,18 +50,32 @@ func BlobFileBaseName(b *blobref.BlobRef) string { return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest()) } -func BlobDirectoryName(b *blobref.BlobRef) string { +func blobPartitionDirName(partitionDirSlash string, b *blobref.BlobRef) string { d := b.Digest() if len(d) < 6 { d = d + "______" } - return fmt.Sprintf("%s/%s/%s/%s", *flagStorageRoot, b.HashName(), d[0:3], d[3:6]) + return fmt.Sprintf("%s/%s%s/%s/%s", + *flagStorageRoot, partitionDirSlash, + b.HashName(), d[0:3], d[3:6]) +} + +func BlobDirectoryName(b *blobref.BlobRef) string { + return blobPartitionDirName("", b) } func BlobFileName(b *blobref.BlobRef) string { return fmt.Sprintf("%s/%s-%s.dat", BlobDirectoryName(b), b.HashName(), b.Digest()) } +func BlobPartitionDirectoryName(partition string, b *blobref.BlobRef) string { + return blobPartitionDirName("partition/" + partition + "/", b) +} + +func PartitionBlobFileName(partition string, b *blobref.BlobRef) string { + return fmt.Sprintf("%s/%s-%s.dat", BlobPartitionDirectoryName(partition, b), b.HashName(), b.Digest()) +} + func BlobFromUrlPath(path string) *blobref.BlobRef { return blobref.FromPattern(kGetPutPattern, path) } diff --git a/server/go/blobserver/upload.go b/server/go/blobserver/upload.go index 34fbe401e..63c6c28fa 100644 --- a/server/go/blobserver/upload.go +++ b/server/go/blobserver/upload.go @@ -28,6 +28,7 @@ import ( "log" "mime" "os" + "strings" ) type receivedBlob struct { @@ -35,7 +36,9 @@ type receivedBlob struct { size int64 } -var flagOpenImages *bool = flag.Bool("showimages", false, "Show images on receiving them with eog.") +var flagOpenImages = flag.Bool("showimages", false, "Show images on receiving them with eog.") + +var flagQueuePartitions = flag.String("queue-partitions", "", "Comma-separated list of queue partitions to reference uploaded blobs into. Typically one for your indexer and one per mirror full syncer.") var CorruptBlobError = os.NewError("corrupt blob; digest doesn't match") @@ -193,6 +196,20 @@ func receiveBlob(blobRef *blobref.BlobRef, source io.Reader) (blobGot *receivedB return } + if p := *flagQueuePartitions; p != "" { + for _, part := range strings.Split(p, ",", -1) { + partitionDir := BlobPartitionDirectoryName(part, blobRef) + if err = os.MkdirAll(partitionDir, 0700); err != nil { + return + } + partitionFileName := PartitionBlobFileName(part, blobRef) + if err = os.Link(fileName, partitionFileName); err != nil { + return + } + log.Printf("Mirrored to partition %q", part) + } + } + blobGot = &receivedBlob{blobRef: blobRef, size: stat.Size} success = true