mirror of https://github.com/perkeep/perkeep.git
384 lines
9.9 KiB
Go
384 lines
9.9 KiB
Go
/*
|
|
Copyright 2011 Google Inc.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package schema
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"crypto"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"strings"
|
|
|
|
"camlistore.org/pkg/blobref"
|
|
"camlistore.org/pkg/blobserver"
|
|
"camlistore.org/pkg/rollsum"
|
|
)
|
|
|
|
const (
	// maxBlobSize caps the size, in bytes, of any single data blob
	// produced when a file is cut into chunks.
	maxBlobSize = 1 << 20

	// firstChunkSize is the target size of a file's first chunk.
	// It is deliberately small so that tools which only look at the
	// head of a file are served by one blob: file(1) reads about
	// 96 kB on Linux and 256 kB on OS X, and EXIF/ID3 extractors and
	// file managers such as Nautilus or the OS X Finder behave
	// similarly. The first chunk can end up larger than this when
	// the end of the input is already in sight, because cutting
	// there would leave a tiny trailing chunk (e.g. for a file one
	// byte larger than firstChunkSize).
	firstChunkSize = 256 << 10

	// bufioReaderSize is the explicit buffer size given to our
	// bufio.Reader rather than relying on NewReader's default.
	// The size matters because it determines how far ahead EOF can
	// be noticed on an io.Reader of unknown length; seeing EOF
	// bufioReaderSize bytes early lets us plan the final chunk.
	bufioReaderSize = 32 << 10

	// tooSmallThreshold is the minimum size the chunk currently
	// being built must exceed before a rolling checksum boundary
	// is honored; boundaries hit earlier are ignored.
	tooSmallThreshold = 64 << 10
)
|
|
|
|
// Reference log.Printf so that the "log" import counts as used even
// when this file contains no logging calls (handy for ad-hoc debugging).
var _ = log.Printf
|
|
|
|
// WriteFileFromReader creates and uploads a "file" JSON schema
|
|
// composed of chunks of r, also uploading the chunks. The returned
|
|
// BlobRef is of the JSON file schema blob.
|
|
func WriteFileFromReader(bs blobserver.StatReceiver, filename string, r io.Reader) (*blobref.BlobRef, error) {
|
|
m := NewFileMap(filename)
|
|
return WriteFileMap(bs, m, r)
|
|
}
|
|
|
|
// WriteFileMap uploads chunks of r to bs while populating fileMap and
|
|
// finally uploading fileMap. The returned blobref is of fileMap's
|
|
// JSON blob.
|
|
func WriteFileMap(bs blobserver.StatReceiver, fileMap Map, r io.Reader) (*blobref.BlobRef, error) {
|
|
return writeFileMapRolling(bs, fileMap, r)
|
|
}
|
|
|
|
// This is the simple 1MB chunk version. The rolling checksum version is below.
|
|
func writeFileMapOld(bs blobserver.StatReceiver, fileMap Map, r io.Reader) (*blobref.BlobRef, error) {
|
|
parts, size := []BytesPart{}, int64(0)
|
|
|
|
var buf bytes.Buffer
|
|
for {
|
|
buf.Reset()
|
|
n, err := io.Copy(&buf, io.LimitReader(r, maxBlobSize))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if n == 0 {
|
|
break
|
|
}
|
|
|
|
hash := crypto.SHA1.New()
|
|
io.Copy(hash, bytes.NewBuffer(buf.Bytes()))
|
|
br := blobref.FromHash("sha1", hash)
|
|
hasBlob, err := serverHasBlob(bs, br)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !hasBlob {
|
|
sb, err := bs.ReceiveBlob(br, &buf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if expect := (blobref.SizedBlobRef{br, n}); !expect.Equal(sb) {
|
|
return nil, fmt.Errorf("schema/filewriter: wrote %s bytes, got %s ack'd", expect, sb)
|
|
}
|
|
}
|
|
|
|
size += n
|
|
parts = append(parts, BytesPart{
|
|
BlobRef: br,
|
|
Size: uint64(n),
|
|
Offset: 0, // into BlobRef to read from (not of dest)
|
|
})
|
|
}
|
|
|
|
err := PopulateParts(fileMap, size, parts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
json, err := fileMap.JSON()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
br := blobref.SHA1FromString(json)
|
|
sb, err := bs.ReceiveBlob(br, strings.NewReader(json))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if expect := (blobref.SizedBlobRef{br, int64(len(json))}); !expect.Equal(sb) {
|
|
return nil, fmt.Errorf("schema/filewriter: wrote %s bytes, got %s ack'd", expect, sb)
|
|
}
|
|
|
|
return br, nil
|
|
}
|
|
|
|
func serverHasBlob(bs blobserver.BlobStatter, br *blobref.BlobRef) (have bool, err error) {
|
|
ch := make(chan blobref.SizedBlobRef, 1)
|
|
go func() {
|
|
err = bs.StatBlobs(ch, []*blobref.BlobRef{br}, 0)
|
|
close(ch)
|
|
}()
|
|
for _ = range ch {
|
|
have = true
|
|
}
|
|
return
|
|
}
|
|
|
|
type span struct {
|
|
from, to int64
|
|
bits int
|
|
br *blobref.BlobRef
|
|
children []span
|
|
}
|
|
|
|
func (s *span) isSingleBlob() bool {
|
|
return len(s.children) == 0
|
|
}
|
|
|
|
func (s *span) size() int64 {
|
|
size := s.to - s.from
|
|
for _, cs := range s.children {
|
|
size += cs.size()
|
|
}
|
|
return size
|
|
}
|
|
|
|
// noteEOFReader wraps an io.Reader and remembers whether that reader
// has ever returned io.EOF. Apart from recording that fact it passes
// every Read straight through to r.
type noteEOFReader struct {
	r      io.Reader
	sawEOF bool // set once r.Read has returned io.EOF
}

// Read calls the wrapped reader's Read with p and returns its results
// unchanged, setting sawEOF when the returned error is io.EOF.
func (r *noteEOFReader) Read(p []byte) (int, error) {
	n, err := r.r.Read(p)
	if err == io.EOF {
		r.sawEOF = true
	}
	return n, err
}
|
|
|
|
func uploadString(bs blobserver.StatReceiver, s string) (*blobref.BlobRef, error) {
|
|
br := blobref.SHA1FromString(s)
|
|
hasIt, err := serverHasBlob(bs, br)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if hasIt {
|
|
return br, nil
|
|
}
|
|
_, err = bs.ReceiveBlob(br, strings.NewReader(s))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return br, nil
|
|
}
|
|
|
|
// uploadBytes gets a map from mapSource (of type either "bytes" or
|
|
// "file", which is a superset of "bytes"), sets it to the provided
|
|
// size, and populates with provided spans. The bytes or file schema
|
|
// blob is uploaded and its blobref is returned.
|
|
func uploadBytes(bs blobserver.StatReceiver, mapSource func() Map, size int64, s []span) (*blobref.BlobRef, error) {
|
|
parts := []BytesPart{}
|
|
err := addBytesParts(bs, &parts, s)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m := mapSource()
|
|
err = PopulateParts(m, size, parts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
json, err := m.JSON()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return uploadString(bs, json)
|
|
}
|
|
|
|
// addBytesParts uploads the provided spans to bs, appending elements to *dst.
|
|
func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span) error {
|
|
for _, sp := range spans {
|
|
if len(sp.children) == 1 && sp.children[0].isSingleBlob() {
|
|
// Remove an occasional useless indirection of
|
|
// what would become a bytes schema blob
|
|
// pointing to a single blobref. Just promote
|
|
// the blobref child instead.
|
|
child := sp.children[0]
|
|
*dst = append(*dst, BytesPart{
|
|
BlobRef: child.br,
|
|
Size: uint64(child.size()),
|
|
})
|
|
sp.children = nil
|
|
}
|
|
if len(sp.children) > 0 {
|
|
childrenSize := int64(0)
|
|
for _, cs := range sp.children {
|
|
childrenSize += cs.size()
|
|
}
|
|
br, err := uploadBytes(bs, newBytes, childrenSize, sp.children)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*dst = append(*dst, BytesPart{
|
|
BytesRef: br,
|
|
Size: uint64(childrenSize),
|
|
})
|
|
}
|
|
if sp.from == sp.to {
|
|
panic("Shouldn't happen. " + fmt.Sprintf("weird span with same from & to: %#v", sp))
|
|
}
|
|
*dst = append(*dst, BytesPart{
|
|
BlobRef: sp.br,
|
|
Size: uint64(sp.to - sp.from),
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// writeFileMap uploads chunks of r to bs while populating fileMap and
|
|
// finally uploading fileMap. The returned blobref is of fileMap's
|
|
// JSON blob. It uses rolling checksum for the chunks sizes.
|
|
func writeFileMapRolling(bs blobserver.StatReceiver, fileMap Map, r io.Reader) (outbr *blobref.BlobRef, outerr error) {
|
|
rootFile := func() Map { return fileMap }
|
|
n, spans, err := writeFileChunks(bs, fileMap, r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// The top-level content parts
|
|
return uploadBytes(bs, rootFile, n, spans)
|
|
}
|
|
|
|
// WriteFileChunks uploads chunks of r to bs while populating fileMap.
|
|
// It does not upload fileMap.
|
|
func WriteFileChunks(bs blobserver.StatReceiver, fileMap Map, r io.Reader) error {
|
|
rootFile := func() Map { return fileMap }
|
|
|
|
n, spans, err := writeFileChunks(bs, fileMap, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
topLevel := func(mapSource func() Map, size int64, s []span) error {
|
|
parts := []BytesPart{}
|
|
err := addBytesParts(bs, &parts, s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m := mapSource()
|
|
err = PopulateParts(m, size, parts)
|
|
return err
|
|
}
|
|
|
|
// The top-level content parts
|
|
return topLevel(rootFile, n, spans)
|
|
}
|
|
|
|
func writeFileChunks(bs blobserver.StatReceiver, fileMap Map, r io.Reader) (n int64, spans []span, outerr error) {
|
|
src := ¬eEOFReader{r: r}
|
|
blobSize := 0 // of the next blob being built, should be same as buf.Len()
|
|
bufr := bufio.NewReaderSize(src, bufioReaderSize)
|
|
spans = []span{} // the tree of spans, cut on interesting rollsum boundaries
|
|
rs := rollsum.New()
|
|
last := n
|
|
buf := new(bytes.Buffer)
|
|
|
|
// TODO: keep multiple of these in-flight at a time.
|
|
uploadLastSpan := func() bool {
|
|
defer buf.Reset()
|
|
br, err := uploadString(bs, buf.String())
|
|
if err != nil {
|
|
outerr = err
|
|
return false
|
|
}
|
|
spans[len(spans)-1].br = br
|
|
return true
|
|
}
|
|
|
|
for {
|
|
c, err := bufr.ReadByte()
|
|
if err == io.EOF {
|
|
if n != last {
|
|
spans = append(spans, span{from: last, to: n})
|
|
if !uploadLastSpan() {
|
|
return
|
|
}
|
|
}
|
|
break
|
|
}
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
|
|
buf.WriteByte(c)
|
|
n++
|
|
blobSize++
|
|
rs.Roll(c)
|
|
|
|
var bits int
|
|
onRollSplit := rs.OnSplit()
|
|
switch {
|
|
case blobSize == maxBlobSize:
|
|
bits = 20 // arbitrary node weight; 1<<20 == 1MB
|
|
case src.sawEOF:
|
|
// Don't split. End is coming soon enough.
|
|
continue
|
|
case onRollSplit && n > firstChunkSize && blobSize > tooSmallThreshold:
|
|
bits = rs.Bits()
|
|
case n == firstChunkSize:
|
|
bits = 18 // 1 << 18 == 256KB
|
|
default:
|
|
// Don't split.
|
|
continue
|
|
}
|
|
blobSize = 0
|
|
|
|
// Take any spans from the end of the spans slice that
|
|
// have a smaller 'bits' score and make them children
|
|
// of this node.
|
|
var children []span
|
|
childrenFrom := len(spans)
|
|
for childrenFrom > 0 && spans[childrenFrom-1].bits < bits {
|
|
childrenFrom--
|
|
}
|
|
if nCopy := len(spans) - childrenFrom; nCopy > 0 {
|
|
children = make([]span, nCopy)
|
|
copy(children, spans[childrenFrom:])
|
|
spans = spans[:childrenFrom]
|
|
}
|
|
|
|
spans = append(spans, span{from: last, to: n, bits: bits, children: children})
|
|
last = n
|
|
if !uploadLastSpan() {
|
|
return
|
|
}
|
|
}
|
|
|
|
return n, spans, nil
|
|
|
|
}
|