go blobserver: queue partition support, via flag.

This commit is contained in:
Brad Fitzpatrick 2011-02-01 22:48:12 -08:00
parent 39f1ebb895
commit 38b4f153e0
2 changed files with 34 additions and 3 deletions

View File

@ -50,18 +50,32 @@ func BlobFileBaseName(b *blobref.BlobRef) string {
return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest())
}
func BlobDirectoryName(b *blobref.BlobRef) string {
func blobPartitionDirName(partitionDirSlash string, b *blobref.BlobRef) string {
d := b.Digest()
if len(d) < 6 {
d = d + "______"
}
return fmt.Sprintf("%s/%s/%s/%s", *flagStorageRoot, b.HashName(), d[0:3], d[3:6])
return fmt.Sprintf("%s/%s%s/%s/%s",
*flagStorageRoot, partitionDirSlash,
b.HashName(), d[0:3], d[3:6])
}
func BlobDirectoryName(b *blobref.BlobRef) string {
return blobPartitionDirName("", b)
}
func BlobFileName(b *blobref.BlobRef) string {
return fmt.Sprintf("%s/%s-%s.dat", BlobDirectoryName(b), b.HashName(), b.Digest())
}
func BlobPartitionDirectoryName(partition string, b *blobref.BlobRef) string {
return blobPartitionDirName("partition/" + partition + "/", b)
}
func PartitionBlobFileName(partition string, b *blobref.BlobRef) string {
return fmt.Sprintf("%s/%s-%s.dat", BlobPartitionDirectoryName(partition, b), b.HashName(), b.Digest())
}
func BlobFromUrlPath(path string) *blobref.BlobRef {
return blobref.FromPattern(kGetPutPattern, path)
}

View File

@ -28,6 +28,7 @@ import (
"log"
"mime"
"os"
"strings"
)
type receivedBlob struct {
@ -35,7 +36,9 @@ type receivedBlob struct {
size int64
}
var flagOpenImages *bool = flag.Bool("showimages", false, "Show images on receiving them with eog.")
var flagOpenImages = flag.Bool("showimages", false, "Show images on receiving them with eog.")
var flagQueuePartitions = flag.String("queue-partitions", "", "Comma-separated list of queue partitions to reference uploaded blobs into. Typically one for your indexer and one per mirror full syncer.")
var CorruptBlobError = os.NewError("corrupt blob; digest doesn't match")
@ -193,6 +196,20 @@ func receiveBlob(blobRef *blobref.BlobRef, source io.Reader) (blobGot *receivedB
return
}
if p := *flagQueuePartitions; p != "" {
for _, part := range strings.Split(p, ",", -1) {
partitionDir := BlobPartitionDirectoryName(part, blobRef)
if err = os.MkdirAll(partitionDir, 0700); err != nil {
return
}
partitionFileName := PartitionBlobFileName(part, blobRef)
if err = os.Link(fileName, partitionFileName); err != nil {
return
}
log.Printf("Mirrored to partition %q", part)
}
}
blobGot = &receivedBlob{blobRef: blobRef, size: stat.Size}
success = true