pkg/schema: break static-sets into subsets for large directories

The current maximum size for a schema blob is 1MB. For a large enough
directory (~20000 children), the resulting static-set JSON schema is
over that maximum size.

We could increase that maximum, but we would eventually hit the maximum
blob size (16MB), which would only allow for ~300000 children. Even
though directories that large are uncommon, they are technically
possible, so I don't think it would be reasonable to restrict users to
such a limit. Raising the maximum therefore does not seem like enough of
a solution.

The solution proposed in this CL is to spread the children of a
directory (when they number more than a given maximum, here set to
10000) over several static-sets, recursively if needed. These
static-sets (subsets of the whole set of children) are stored in the new
"mergeSets" field of their parent static-set schema. The actual fileRefs
or dirRefs are still stored in the "members" field of the subset they
were spread into. The "mergeSets" and "members" fields of a static-set
are therefore mutually exclusive.
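
For illustration, here is a minimal sketch of how a caller uses the new
builder API; it mirrors the uploader and camtool changes below. The
uploadDir name and its arguments are hypothetical, and the usual
context, blob, client, and schema imports are assumed:

// uploadDir stores the static-set schema(s) describing a directory's children
// and returns the blobref of the top static-set.
func uploadDir(ctx context.Context, cl *client.Client, children []blob.Ref) (blob.Ref, error) {
	ss := schema.NewStaticSet()
	// For a small directory, subsets is empty; for a large one it holds the
	// "mergeSets" blobs, which must be uploaded along with the top static-set.
	subsets := ss.SetStaticSetMembers(children)
	for _, sub := range subsets {
		if _, err := cl.UploadBlob(ctx, sub); err != nil {
			return blob.Ref{}, err
		}
	}
	top := ss.Blob()
	if _, err := cl.UploadBlob(ctx, top); err != nil {
		return blob.Ref{}, err
	}
	return top.BlobRef(), nil
}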

Fixes #924

Change-Id: Ibe47b50795d5288fe904d3cce0cc7f780d313408
mpl 2017-05-02 17:39:48 +02:00
parent 4d65ed951b
commit db2604f981
10 changed files with 351 additions and 49 deletions

View File

@ -352,16 +352,24 @@ func (up *Uploader) open(path string) (*os.File, error) {
return os.Open(path)
}
func (n *node) directoryStaticSet() (*schema.StaticSet, error) {
ss := new(schema.StaticSet)
// directoryStaticSet returns the set of static-set schemas that represent the
// children of n. If the number of children is small (<10000), they are all members
// of the first (and only) static-set returned. Otherwise they are split and spread
// over several subsets (i.e. "mergeSets"), which are all returned.
func (n *node) directoryStaticSet() ([]*schema.Blob, error) {
var members []blob.Ref
ss := schema.NewStaticSet()
for _, c := range n.children {
pr, err := c.PutResult()
if err != nil {
return nil, fmt.Errorf("Error populating directory static set for child %q: %v", c.fullPath, err)
}
ss.Add(pr.BlobRef)
members = append(members, pr.BlobRef)
}
return ss, nil
subsets := ss.SetStaticSetMembers(members)
allSets := []*schema.Blob{ss.Blob()}
allSets = append(allSets, subsets...)
return allSets, nil
}
func (up *Uploader) uploadNode(ctx context.Context, n *node) (*client.PutResult, error) {
@ -393,7 +401,16 @@ func (up *Uploader) uploadNode(ctx context.Context, n *node) (*client.PutResult,
if err != nil {
return nil, err
}
sspr, err := up.UploadBlob(ctxbg, ss)
if len(ss) > 1 {
// large directory, so the top static-set is divided into subsets that we have to upload too
for _, v := range ss[1:] {
if _, err := up.UploadBlob(ctxbg, v); err != nil {
return nil, err
}
}
}
// upload the top static-set
sspr, err := up.UploadBlob(ctxbg, ss[0])
if err != nil {
return nil, err
}

View File

@ -87,7 +87,7 @@ func (c *makeStaticCmd) RunCommand(args []string) error {
return m.CamliType
}
var ss schema.StaticSet
ss := schema.NewStaticSet()
pnDes, ok := res.Meta[pn.String()]
if !ok {
return fmt.Errorf("permanode %v not described", pn)
@ -99,6 +99,7 @@ func (c *makeStaticCmd) RunCommand(args []string) error {
if len(members) == 0 {
return fmt.Errorf("permanode %v has no camliMember attributes", pn)
}
var memberRefs []blob.Ref
for _, fileRefStr := range members {
if camliType(fileRefStr) != "permanode" {
continue
@ -108,10 +109,17 @@ func (c *makeStaticCmd) RunCommand(args []string) error {
continue
}
if camliType(contentRef) == "file" {
ss.Add(blob.MustParse(contentRef))
memberRefs = append(memberRefs, blob.MustParse(contentRef))
}
}
subsets := ss.SetStaticSetMembers(memberRefs)
// Large directories may have subsets. Upload any of those too:
for _, v := range subsets {
if _, err := cl.UploadBlob(context.Background(), v); err != nil {
return err
}
}
b := ss.Blob()
_, err = cl.UploadBlob(ctxbg, b)
if err != nil {

View File

@ -5,8 +5,15 @@ Example:
{"camliVersion": 1,
"camliType": "static-set",
// Required.
// May be ordered or unordered, depending on context/needs. If unordered,
// Either one of members or mergeSets is required, and they are mutually exclusive.
// If a directory has enough children that the resulting static-set blob
// would be larger than the maximum schema blob size, then the children are
// actually spread (recursively, if needed) onto several static-sets. When that is
// the case, these subsets are stored in mergeSets instead of members. Members
// stores the actual file or directory schemas (the actual members of the top
// static-set entity).
//
// Members can be ordered or unordered, depending on context/needs. If unordered,
// it's recommended but not required to sort the blobrefs.
"members": [
"digalg-blobref-item1", // maybe a file?
@ -17,7 +24,13 @@ Example:
"digalg-blobref-item6", // ... and what's valid depends on context
"digalg-blobref-item7", // ... a permanode in a directory would
"digalg-blobref-item8" // ... be invalid, for instance.
]
],
"mergeSets": [
"digalg-blobref-subset1", // another static-set, with either members or subsets
"digalg-blobref-subset2", // ''
"digalg-blobref-subset3", // ''
"digalg-blobref-subset4", // ''
]
}
Note: dynamic sets are structured differently, using a permanode and

View File

@ -221,10 +221,8 @@ func (id *IndexDeps) UploadFile(fileName string, contents string, modTime time.T
// If modTime is zero, it's not used.
func (id *IndexDeps) UploadDir(dirName string, children []blob.Ref, modTime time.Time) blob.Ref {
// static-set entries blob
ss := new(schema.StaticSet)
for _, child := range children {
ss.Add(child)
}
ss := schema.NewStaticSet()
ss.SetStaticSetMembers(children)
ssjson := ss.Blob().JSON()
ssb := &test.Blob{Contents: ssjson}
id.BlobSource.AddBlob(ssb)

View File

@ -150,10 +150,14 @@ func (b *Blob) DirectoryEntries() (br blob.Ref, ok bool) {
return b.ss.Entries, true
}
// StaticSetMembers returns the refs in the "members" field if b is a valid
// "static-set" schema. Note that if it is a large static-set, the members are
// actually spread as subsets in "mergeSets". See StaticSetMergeSets.
func (b *Blob) StaticSetMembers() []blob.Ref {
if b.Type() != "static-set" {
return nil
}
s := make([]blob.Ref, 0, len(b.ss.Members))
for _, ref := range b.ss.Members {
if ref.Valid() {
@ -163,6 +167,22 @@ func (b *Blob) StaticSetMembers() []blob.Ref {
return s
}
// StaticSetMergeSets returns the refs of the static-sets in "mergeSets". These
// are the subsets of all the static-set members in the case of a large directory.
func (b *Blob) StaticSetMergeSets() []blob.Ref {
if b.Type() != "static-set" {
return nil
}
s := make([]blob.Ref, 0, len(b.ss.MergeSets))
for _, ref := range b.ss.MergeSets {
if ref.Valid() {
s = append(s, ref)
}
}
return s
}
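For illustration, a minimal sketch of how a caller might combine these two
accessors to resolve a static-set back into its leaf members; fetchSet, a
helper that fetches and parses a static-set schema blob into a *Blob, is
hypothetical:

// resolveMembers returns the leaf members (fileRefs/dirRefs) of the static-set
// at ref, following "mergeSets" recursively when present.
func resolveMembers(ref blob.Ref, fetchSet func(blob.Ref) (*Blob, error)) ([]blob.Ref, error) {
	b, err := fetchSet(ref)
	if err != nil {
		return nil, err
	}
	// A static-set has either "members" (leaf refs) or "mergeSets" (sub static-sets).
	if m := b.StaticSetMembers(); len(m) > 0 {
		return m, nil
	}
	var all []blob.Ref
	for _, sub := range b.StaticSetMergeSets() {
		leaves, err := resolveMembers(sub, fetchSet)
		if err != nil {
			return nil, err
		}
		all = append(all, leaves...)
	}
	return all, nil
}

The DirReader change below performs essentially the same walk, directly on the
parsed superset.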
func (b *Blob) ShareAuthType() string {
s, ok := b.AsShare()
if !ok {

View File

@ -93,7 +93,16 @@ func (dr *DirReader) StaticSet(ctx context.Context) ([]blob.Ref, error) {
if !staticSetBlobref.Valid() {
return nil, errors.New("schema/dirreader: Invalid blobref")
}
rsc, _, err := dr.fetcher.Fetch(ctx, staticSetBlobref)
members, err := staticSet(ctx, staticSetBlobref, dr.fetcher)
if err != nil {
return nil, err
}
dr.staticSet = members
return dr.staticSet, nil
}
func staticSet(ctx context.Context, staticSetBlobref blob.Ref, fetcher blob.Fetcher) ([]blob.Ref, error) {
rsc, _, err := fetcher.Fetch(ctx, staticSetBlobref)
if err != nil {
return nil, fmt.Errorf("schema/dirreader: fetching schema blob %s: %v", staticSetBlobref, err)
}
@ -105,13 +114,32 @@ func (dr *DirReader) StaticSet(ctx context.Context) ([]blob.Ref, error) {
if ss.Type != "static-set" {
return nil, fmt.Errorf("schema/dirreader: expected \"static-set\" schema blob for %s, got %q", staticSetBlobref, ss.Type)
}
for _, member := range ss.Members {
if !member.Valid() {
return nil, fmt.Errorf("schema/dirreader: invalid (static-set member) blobref referred by \"static-set\" schema blob %v", staticSetBlobref)
var members []blob.Ref
if len(ss.Members) > 0 {
// We have fileRefs or dirRefs in ss.Members, so we are either in the static-set
// of a small directory, or in one of the "leaf" subsets of a large directory.
for _, member := range ss.Members {
if !member.Valid() {
return nil, fmt.Errorf("schema/dirreader: invalid (static-set member) blobref referred by \"static-set\" schema blob %v", staticSetBlobref)
}
members = append(members, member)
}
dr.staticSet = append(dr.staticSet, member)
return members, nil
}
return dr.staticSet, nil
// We are either at the top static-set of a large directory, or in a "non-leaf"
// subset of a large directory.
for _, toMerge := range ss.MergeSets {
if !toMerge.Valid() {
return nil, fmt.Errorf("schema/dirreader: invalid (static-set subset) blobref referred by \"static-set\" schema blob %v", staticSetBlobref)
}
// TODO(mpl): do it concurrently
subset, err := staticSet(ctx, toMerge, fetcher)
if err != nil {
return nil, fmt.Errorf("schema/dirreader: could not get members of %q, subset of %v: %v", toMerge, staticSetBlobref, err)
}
members = append(members, subset...)
}
return members, nil
}
// Readdir implements the Directory interface.

View File

@ -447,3 +447,71 @@ func (s summary) String() string {
}
return fmt.Sprintf("%d bytes, starting with %q", len(s), []byte(s[:plen]))
}
func TestReadDirs(t *testing.T) {
oldMaxStaticSetMembers := maxStaticSetMembers
maxStaticSetMembers = 10
defer func() {
maxStaticSetMembers = oldMaxStaticSetMembers
}()
// small directory, no splitting needed.
testReadDir(t, []*test.Blob{
&test.Blob{"AAAAAaaaaa"},
&test.Blob{"BBBBBbbbbb"},
&test.Blob{"CCCCCccccc"},
})
// large (over maxStaticSetMembers) directory. splitting, but no recursion needed.
var members []*test.Blob
for i := 0; i < maxStaticSetMembers+3; i++ {
members = append(members, &test.Blob{fmt.Sprintf("sha1-%2d", i)})
}
testReadDir(t, members)
// very large (over maxStaticSetMembers^2) directory. splitting with recursion.
members = nil
for i := 0; i < maxStaticSetMembers*maxStaticSetMembers+3; i++ {
members = append(members, &test.Blob{fmt.Sprintf("sha1-%3d", i)})
}
testReadDir(t, members)
}
func testReadDir(t *testing.T, members []*test.Blob) {
fetcher := &test.Fetcher{}
for _, v := range members {
fetcher.AddBlob(v)
}
var membersRefs []blob.Ref
for _, v := range members {
membersRefs = append(membersRefs, v.BlobRef())
}
ssb := NewStaticSet()
subsets := ssb.SetStaticSetMembers(membersRefs)
for _, v := range subsets {
fetcher.AddBlob(&test.Blob{v.str})
}
fetcher.AddBlob(&test.Blob{ssb.Blob().str})
dir := NewDirMap("whatever").PopulateDirectoryMap(ssb.Blob().BlobRef())
dirBlob := dir.Blob()
fetcher.AddBlob(&test.Blob{dirBlob.str})
dr, err := NewDirReader(context.Background(), fetcher, dirBlob.BlobRef())
if err != nil {
t.Fatal(err)
}
children, err := dr.StaticSet(context.Background())
if err != nil {
t.Fatal(err)
}
asMap := make(map[blob.Ref]bool)
for _, v := range children {
asMap[v] = true
}
for _, v := range membersRefs {
if _, ok := asMap[v]; !ok {
t.Errorf("%q not found among directory's children", v.String())
}
}
}

View File

@ -280,8 +280,9 @@ type superset struct {
// See doc/schema/bytes.txt and doc/schema/files/file.txt.
Parts []*BytesPart `json:"parts"`
Entries blob.Ref `json:"entries"` // for directories, a blobref to a static-set
Members []blob.Ref `json:"members"` // for static sets (for directory static-sets: blobrefs to child dirs/files)
Entries blob.Ref `json:"entries"` // for directories, a blobref to a static-set
Members []blob.Ref `json:"members"` // for static sets (for directory static-sets: blobrefs to child dirs/files)
MergeSets []blob.Ref `json:"mergeSets"` // each is a "sub static-set", that has either Members or MergeSets. For large dirs.
// Search allows a "share" blob to share an entire search. Contrast with "target".
Search SearchQuery `json:"search"`
@ -568,15 +569,84 @@ func (d *defaultStatHasher) Hash(fileName string) (blob.Ref, error) {
return blob.RefFromHash(h), nil
}
type StaticSet struct {
l sync.Mutex
refs []blob.Ref
// maxStaticSetMembers is the maximum number of members in a static-set schema. As
// noted in https://github.com/camlistore/camlistore/issues/924, 33k members result in
// a 1.7MB blob, so 10k members seems reasonable to stay under the MaxSchemaBlobSize
// (1MB) limit. This is not a const, so tests can lower it and exercise the splitting
// logic without having to create thousands of blobs.
var maxStaticSetMembers = 10000
// NewStaticSet returns the "static-set" schema for a directory. Its members
// should be populated with SetStaticSetMembers.
func NewStaticSet() *Builder {
return base(1, "static-set")
}
func (ss *StaticSet) Add(ref blob.Ref) {
ss.l.Lock()
defer ss.l.Unlock()
ss.refs = append(ss.refs, ref)
// SetStaticSetMembers sets the given members as the static-set members of this
// builder. If the members are so numerous that they would not fit in a schema
// blob, they are spread (recursively, if needed) onto sub static-sets. In that
// case, these subsets are set as the "mergeSets" of this builder. All the created
// subsets are returned, so the caller can upload them along with the top
// static-set created from this builder.
// SetStaticSetMembers panics if bb is not a "static-set" builder.
func (bb *Builder) SetStaticSetMembers(members []blob.Ref) []*Blob {
if bb.Type() != "static-set" {
panic("called SetStaticSetMembers on non static-set")
}
if len(members) <= maxStaticSetMembers {
ms := make([]string, len(members))
for i := range members {
ms[i] = members[i].String()
}
bb.m["members"] = ms
return nil
}
// too many members to fit in one static-set, so we spread them over
// several sub static-sets.
subsetsNumber := len(members) / maxStaticSetMembers
var perSubset int
if subsetsNumber < maxStaticSetMembers {
// this means we can fill each subset up to maxStaticSetMembers,
// and stash the rest in one last subset.
perSubset = maxStaticSetMembers
} else {
// otherwise we need to divide the members evenly in
// (maxStaticSetMembers - 1) subsets, and each of these subsets
// will also (recursively) have subsets of its own. There might
// also be a rest in one last subset, as above.
subsetsNumber = maxStaticSetMembers - 1
perSubset = len(members) / subsetsNumber
}
// only the subsets at this level
subsets := make([]*Blob, 0, subsetsNumber)
// subsets at this level, plus all the children subsets.
allSubsets := make([]*Blob, 0, subsetsNumber)
for i := 0; i < subsetsNumber; i++ {
ss := NewStaticSet()
subss := ss.SetStaticSetMembers(members[i*perSubset : (i+1)*perSubset])
subsets = append(subsets, ss.Blob())
allSubsets = append(allSubsets, ss.Blob())
for _, v := range subss {
allSubsets = append(allSubsets, v)
}
}
// Deal with the remainder of the Euclidean division
if perSubset*subsetsNumber < len(members) {
ss := NewStaticSet()
ss.SetStaticSetMembers(members[perSubset*subsetsNumber:])
allSubsets = append(allSubsets, ss.Blob())
subsets = append(subsets, ss.Blob())
}
mss := make([]string, len(subsets))
for i := range subsets {
mss[i] = subsets[i].BlobRef().String()
}
bb.m["mergeSets"] = mss
return allSubsets
}
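To illustrate the division above with maxStaticSetMembers lowered to 10, as in
the tests: 13 members give one subset of 10 plus a final subset holding the
remaining 3; 103 members hit the second branch (103/10 = 10 is not < 10),
giving 9 subsets of 103/9 = 11 each, which recurse once more, plus a final
subset of 4. With the real default of 10000, the ~20000-child directory from
the commit message yields just two subsets of 10000. A standalone sketch of
that arithmetic (hypothetical helper, not part of the package):

// splitCounts mirrors the subset sizing above: it returns how many subsets of
// perSubset members are made at one level, plus the remainder that goes into
// one last subset.
func splitCounts(n, max int) (subsets, perSubset, rest int) {
	subsets = n / max
	perSubset = max
	if subsets >= max {
		// Too many subsets to reference from a single "mergeSets" list:
		// use max-1 larger subsets instead, which will themselves recurse.
		subsets = max - 1
		perSubset = n / subsets
	}
	rest = n - subsets*perSubset
	return
}

// splitCounts(13, 10)  == (1, 10, 3)
// splitCounts(103, 10) == (9, 11, 4)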
func base(version int, ctype string) *Builder {
@ -615,23 +685,6 @@ func NewHashPlannedPermanode(h hash.Hash) *Builder {
return NewPlannedPermanode(blob.RefFromHash(h).String())
}
// Blob returns a Camli map of camliType "static-set"
// TODO: delete this method
func (ss *StaticSet) Blob() *Blob {
bb := base(1, "static-set")
ss.l.Lock()
defer ss.l.Unlock()
members := make([]string, 0, len(ss.refs))
if ss.refs != nil {
for _, ref := range ss.refs {
members = append(members, ref.String())
}
}
bb.m["members"] = members
return bb.Blob()
}
// JSON returns the map m encoded as JSON in its
// recommended canonical form. The canonical form is readable with newlines and indentation,
// and always starts with the header bytes:

View File

@ -18,6 +18,7 @@ package schema
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
@ -30,6 +31,7 @@ import (
"perkeep.org/internal/osutil"
"perkeep.org/internal/testhooks"
"perkeep.org/pkg/blob"
"perkeep.org/pkg/test"
. "perkeep.org/pkg/test/asserts"
)
@ -708,3 +710,94 @@ func TestTimezoneEXIFCorrection(t *testing.T) {
}
}
}
func TestLargeDirs(t *testing.T) {
oldMaxStaticSetMembers := maxStaticSetMembers
maxStaticSetMembers = 10
defer func() {
maxStaticSetMembers = oldMaxStaticSetMembers
}()
// small directory, no splitting needed.
testLargeDir(t, []blob.Ref{
(&test.Blob{"AAAAAaaaaa"}).BlobRef(),
(&test.Blob{"BBBBBbbbbb"}).BlobRef(),
(&test.Blob{"CCCCCccccc"}).BlobRef(),
})
// large (over maxStaticSetMembers) directory. splitting, but no recursion needed.
var members []blob.Ref
for i := 0; i < maxStaticSetMembers+3; i++ {
members = append(members, (&test.Blob{fmt.Sprintf("%2d", i)}).BlobRef())
}
testLargeDir(t, members)
// very large (over maxStaticSetMembers^2) directory. splitting with recursion.
members = nil
for i := 0; i < maxStaticSetMembers*maxStaticSetMembers+3; i++ {
members = append(members, (&test.Blob{fmt.Sprintf("%3d", i)}).BlobRef())
}
testLargeDir(t, members)
}
func testLargeDir(t *testing.T, members []blob.Ref) {
ssb := NewStaticSet()
subsets := ssb.SetStaticSetMembers(members)
refToBlob := make(map[string]*Blob, len(subsets))
for _, v := range subsets {
refToBlob[v.BlobRef().String()] = v
}
var findMember func(blob.Ref, []blob.Ref) bool
findMember = func(member blob.Ref, entries []blob.Ref) bool {
for _, v := range entries {
if member == v {
return true
}
subsetBlob, ok := refToBlob[v.String()]
if !ok {
continue
}
children := subsetBlob.StaticSetMembers()
if len(children) == 0 {
children = subsetBlob.StaticSetMergeSets()
}
if findMember(member, children) {
return true
}
}
return false
}
var membersOrSubsets []string
if ssb.m["members"] != nil && len(ssb.m["members"].([]string)) > 0 {
membersOrSubsets = ssb.m["members"].([]string)
} else {
membersOrSubsets = ssb.m["mergeSets"].([]string)
}
for _, mb := range members {
var found bool
for _, v := range membersOrSubsets {
if mb.String() == v {
found = true
break
}
subsetBlob, ok := refToBlob[v]
if !ok {
continue
}
children := subsetBlob.StaticSetMembers()
if len(children) == 0 {
children = subsetBlob.StaticSetMergeSets()
}
if findMember(mb, children) {
found = true
break
}
}
if !found {
t.Errorf("member %q not found while following the subset schemas", mb)
}
}
}

View File

@ -228,9 +228,13 @@ func mkdir(am auth.AuthMode, children []blob.Ref) (blob.Ref, error) {
return blob.Ref{}, err
}
var newdir blob.Ref
var ss schema.StaticSet
for _, br := range children {
ss.Add(br)
ss := schema.NewStaticSet()
subsets := ss.SetStaticSetMembers(children)
for _, v := range subsets {
// TODO(mpl): make them concurrent
if _, err := cl.UploadBlob(context.TODO(), v); err != nil {
return newdir, err
}
}
ssb := ss.Blob()
if _, err := cl.UploadBlob(context.TODO(), ssb); err != nil {