volunteer data archival: various fixes

This commit is contained in:
David Anderson 2013-05-09 14:53:26 -07:00
parent 7a63c1bb71
commit ea114741b9
7 changed files with 177 additions and 85 deletions

View File

@ -30,6 +30,14 @@
// default: 200000
// --file_size x
// default: 1e12
// --debug_status
// --debug_ft
// write recovery details to stdout
// --log_actions
// write actions to stdout
// --sim_duration_years;
// --random
// srand to pid
//
// outputs:
// stdout: log info
@ -49,6 +57,8 @@
using std::set;
bool log_actions = false;
// We simulate policies based on coding and replication.
//
// Coding means that data is divided into M = N+K units,
@ -107,9 +117,6 @@ struct PARAMS {
// These are measured starting from the time when the file's
// initial downloads have all succeeded or failed
#define EVENT_DEBUG
//#define RECOVERY_DEBUG
SIMULATOR sim;
int next_file_id=0;
int next_host_id=0;
@ -127,7 +134,9 @@ char* now_str() {
}
void show_msg(char* msg) {
printf("%s: %s", time_str(sim.now), msg);
if (log_actions) {
printf("%s: %s", time_str(sim.now), msg);
}
}
struct CHUNK;
@ -214,13 +223,13 @@ struct SIM_FILE : VDA_FILE_AUX, EVENT {
meta_chunk->data_needed = true;
} else {
meta_chunk = new META_CHUNK(this, NULL, size, 0, id);
#ifdef EVENT_DEBUG
printf("created file %d: size %f GB encoded size %f GB\n",
id, size/1e9, disk_usage.value/1e9
);
if (log_actions) {
printf("created file %d: size %f GB encoded size %f GB\n",
id, size/1e9, disk_usage.value/1e9
);
}
t = sim.now + 500.*86400;
sim.insert(this);
#endif
}
meta_chunk->recovery_plan();
meta_chunk->recovery_action(sim.now);
@ -230,15 +239,28 @@ struct SIM_FILE : VDA_FILE_AUX, EVENT {
}
void recover() {
printf("recovery_plan():\n");
if (debug_status) {
printf("recovery_plan():\n");
}
meta_chunk->recovery_plan();
printf("decide_reconstruct():\n");
if (meta_chunk->status == UNRECOVERABLE) {
printf("FILE IS LOST!!\n");
exit(0);
}
if (debug_status) {
printf("decide_reconstruct():\n");
}
meta_chunk->decide_reconstruct();
printf("reconstruct_and_cleanup():\n");
if (debug_status) {
printf("reconstruct_and_cleanup():\n");
}
meta_chunk->reconstruct_and_cleanup();
printf("recovery_action():\n");
if (debug_status) {
printf("recovery_action():\n");
}
meta_chunk->recovery_action(sim.now);
meta_chunk->compute_min_failures();
printf(" Min failures: %d\n", meta_chunk->min_failures);
fault_tolerance.sample(
meta_chunk->min_failures-1, collecting_stats(), sim.now
);
@ -270,9 +292,9 @@ void CHUNK_ON_HOST::start_upload() {
transfer_in_progress = true;
transfer_wait = true;
t = sim.now + drand()*params.connect_interval;
#ifdef EVENT_DEBUG
//printf("%s: waiting to start upload of %s\n", now_str(), physical_file_name);
#endif
if (log_actions) {
printf("%s: waiting to start upload of %s\n", now_str(), physical_file_name);
}
sim.insert(this);
}
@ -280,9 +302,9 @@ void CHUNK_ON_HOST::start_download() {
transfer_in_progress = true;
transfer_wait = true;
t = sim.now + drand()*params.connect_interval;
#ifdef EVENT_DEBUG
//printf("%s: waiting to start download of %s\n", now_str(), physical_file_name);
#endif
if (log_actions) {
printf("%s: waiting to start download of %s\n", now_str(), physical_file_name);
}
sim.insert(this);
}
@ -293,18 +315,22 @@ void CHUNK_ON_HOST::handle() {
if (transfer_wait) {
transfer_wait = false;
if (present_on_host) {
#ifdef EVENT_DEBUG
printf("%s: starting upload of %s\n", now_str(), physical_file_name);
#endif
if (log_actions) {
printf("%s: starting upload of %s\n",
now_str(), physical_file_name
);
}
chunk->parent->dfile->upload_rate.sample_inc(
host->transfer_rate,
chunk->parent->dfile->collecting_stats(),
sim.now
);
} else {
#ifdef EVENT_DEBUG
printf("%s: starting download of %s\n", now_str(), physical_file_name);
#endif
if (log_actions) {
printf("%s: starting download of %s\n",
now_str(), physical_file_name
);
}
chunk->parent->dfile->download_rate.sample_inc(
host->transfer_rate,
chunk->parent->dfile->collecting_stats(),
@ -318,9 +344,11 @@ void CHUNK_ON_HOST::handle() {
transfer_in_progress = false;
if (present_on_host) {
// it was an upload
#ifdef EVENT_DEBUG
printf("%s: upload of %s completed\n", now_str(), physical_file_name);
#endif
if (log_actions) {
printf("%s: upload of %s completed\n",
now_str(), physical_file_name
);
}
chunk->parent->dfile->upload_rate.sample_inc(
-host->transfer_rate,
chunk->parent->dfile->collecting_stats(),
@ -329,9 +357,11 @@ void CHUNK_ON_HOST::handle() {
chunk->upload_complete();
} else {
present_on_host = true;
#ifdef EVENT_DEBUG
printf("%s: download of %s completed\n", now_str(), physical_file_name);
#endif
if (log_actions) {
printf("%s: download of %s completed\n",
now_str(), physical_file_name
);
}
chunk->parent->dfile->download_rate.sample_inc(
-host->transfer_rate,
chunk->parent->dfile->collecting_stats(),
@ -368,9 +398,9 @@ void SIM_HOST::handle() {
set<SIM_HOST*>::iterator i = hosts.find(this);
hosts.erase(i);
#ifdef EVENT_DEBUG
printf("%s: host %d failed\n", now_str(), id);
#endif
if (log_actions) {
printf("%s: host %d failed\n", now_str(), id);
}
set<CHUNK_ON_HOST*>::iterator p;
for (p = chunks.begin(); p != chunks.end(); p++) {
CHUNK_ON_HOST* c = *p;
@ -387,7 +417,7 @@ CHUNK::CHUNK(META_CHUNK* mc, double s, int index) {
sprintf(name, "%s.%d", parent->name, index);
VDA_FILE_AUX* fp = parent->dfile;
fp->pending_init_downloads += fp->policy.replication;
fp->disk_usage.sample_inc(size, false, sim.now);
fp->disk_usage.sample_inc(size, false, sim.now, "init");
}
// if there aren't enough replicas of this chunk,
@ -410,9 +440,11 @@ int CHUNK::assign() {
#endif
CHUNK_ON_HOST *c = new CHUNK_ON_HOST();
sprintf(c->physical_file_name, "chunk %s on host %d", name, h->id);
#ifdef EVENT_DEBUG
printf("%s: assigning chunk %s to host %d\n", now_str(), name, h->id);
#endif
if (log_actions) {
printf("%s: assigning chunk %s to host %d\n",
now_str(), name, h->id
);
}
c->host = h;
c->chunk = this;
h->chunks.insert(c);
@ -440,9 +472,9 @@ int CHUNK::start_upload() {
void CHUNK::host_failed(VDA_CHUNK_HOST* p) {
set<VDA_CHUNK_HOST*>::iterator i = hosts.find(p);
hosts.erase(i);
#ifdef EVENT_DEBUG
printf("%s: handling loss of %s\n", now_str(), p->physical_file_name);
#endif
if (log_actions) {
printf("%s: handling loss of %s\n", now_str(), p->physical_file_name);
}
SIM_FILE* sfp = (SIM_FILE*)parent->dfile;
sfp->recover();
}
@ -453,7 +485,8 @@ void CHUNK::upload_complete() {
parent->dfile->disk_usage.sample_inc(
size,
parent->dfile->collecting_stats(),
sim.now
sim.now,
"upload_complete"
);
}
SIM_FILE* sfp = (SIM_FILE*)parent->dfile;
@ -477,12 +510,29 @@ int META_CHUNK::upload_all() {
}
int META_CHUNK::encode(bool) {
printf("%s: encoding metachunk %s\n", now_str(), name);
if (log_actions) {
printf("%s: encoding metachunk %s\n", now_str(), name);
}
// new chunks count toward server disk usage
//
if (bottom_level) {
for (unsigned int i=0; i<children.size(); i++) {
CHUNK& c = *(CHUNK*)children[i];
if (!c.present_on_server) {
dfile->disk_usage.sample_inc(
c.size, dfile->collecting_stats(), sim.now, "encode"
);
}
}
}
return 0;
}
int META_CHUNK::decode() {
printf("%s: decoding metachunk %s\n", now_str(), name);
if (log_actions) {
printf("%s: decoding metachunk %s\n", now_str(), name);
}
return 0;
}
@ -494,6 +544,10 @@ set<SIM_FILE*> dfiles;
int main(int argc, char** argv) {
POLICY policy;
bool log_disk_usage = false;
bool log_fault_tolerance = false;
bool log_download = false;
bool log_upload = false;
// default policy
//
@ -528,6 +582,24 @@ int main(int argc, char** argv) {
params.mean_xfer_rate = atof(argv[++i]);
} else if (!strcmp(argv[i], "--file_size")) {
params.file_size = atof(argv[++i]);
} else if (!strcmp(argv[i], "--debug_status")) {
debug_status = true;
} else if (!strcmp(argv[i], "--debug_ft")) {
debug_ft = true;
} else if (!strcmp(argv[i], "--log_actions")) {
log_actions = true;
} else if (!strcmp(argv[i], "--log_disk_usage")) {
log_disk_usage = true;
} else if (!strcmp(argv[i], "--log_fault_tolerance")) {
log_fault_tolerance = true;
} else if (!strcmp(argv[i], "--log_download")) {
log_download = true;
} else if (!strcmp(argv[i], "--log_upload")) {
log_upload = true;
} else if (!strcmp(argv[i], "--sim_duration_years")) {
params.sim_duration = atof(argv[++i])*86400*365;
} else if (!strcmp(argv[i], "--random")) {
srand(getpid());
} else {
printf("bad arg %s\n", argv[i]);
exit(1);
@ -546,6 +618,10 @@ int main(int argc, char** argv) {
#endif
SIM_FILE* dfile = new SIM_FILE(params.file_size);
dfile->policy = policy;
if (log_disk_usage) dfile->disk_usage.log_changes = true;
if (log_fault_tolerance) dfile->fault_tolerance.log_changes = true;
if (log_download) dfile->download_rate.log_changes = true;
if (log_upload) dfile->upload_rate.log_changes = true;
sim.insert(dfile);
sim.simulate(params.sim_duration);

View File

@ -39,23 +39,20 @@ void STATS_ITEM::init(const char* n, const char* filename, STATS_KIND k) {
}
extreme_val_time = 0;
first = true;
log_changes = false;
}
void STATS_ITEM::sample(double v, bool collecting_stats, double now) {
char buf[256];
if (value != v) {
if (value != v && log_changes) {
char buf[256];
switch (kind) {
case DISK:
#if 0
sprintf(buf, "%s: %f GB -> %f GB\n", name, value/1e9, v/1e9);
show_msg(buf);
#endif
break;
case NETWORK:
#if 0
sprintf(buf, "%s: %f Mbps -> %f Mbps\n", name, value/1e6, v/1e6);
show_msg(buf);
#endif
break;
case FAULT_TOLERANCE:
sprintf(buf, "%s: %.0f -> %.0f\n", name, value, v);
@ -94,8 +91,11 @@ void STATS_ITEM::sample(double v, bool collecting_stats, double now) {
fprintf(f, "%f %f\n", now, v);
}
void STATS_ITEM::sample_inc(double inc, bool collecting_stats, double now) {
void STATS_ITEM::sample_inc(double inc, bool collecting_stats, double now, const char* reason) {
sample(value+inc, collecting_stats, now);
if (reason) {
printf(" reason: %s\n", reason);
}
}
void STATS_ITEM::print(double now) {

View File

@ -37,10 +37,11 @@ struct STATS_ITEM {
bool first;
char name[256];
FILE* f;
bool log_changes;
void init(const char* n, const char* filename, STATS_KIND k);
void sample(double v, bool collecting_stats, double now);
void sample_inc(double inc, bool collecting_stats, double now);
void sample_inc(double inc, bool collecting_stats, double now, const char* reason=NULL);
void print(double now);
void print_summary(FILE* f, double now);
};

View File

@ -29,7 +29,8 @@
using std::vector;
using std::set;
#define DEBUG_RECOVERY
bool debug_status = false;
bool debug_ft = false;
///////////////// Utility functions ///////////////////////
@ -47,14 +48,13 @@ bool compare_min_failures(const DATA_UNIT* d1, const DATA_UNIT* d2) {
char* time_str(double t) {
static char buf[256];
int n = (int)t;
int nsec = n % 60;
n /= 60;
int nmin = n % 60;
n /= 60;
int nhour = n % 24;
n /= 24;
sprintf(buf, "%4d days %02d:%02d:%02d", n, nhour, nmin, nsec);
int nsec = (int)fmod(t, 60);
t /= 60;
int nmin = (int)fmod(t, 60);
t /= 60;
int nhour = (int)fmod(t, 24);
t /= 24;
sprintf(buf, "%4d days %02d:%02d:%02d", (int)t, nhour, nmin, nsec);
return buf;
}
@ -199,18 +199,18 @@ int META_CHUNK::recovery_action(double now) {
if (data_now_present) {
status = PRESENT;
}
#ifdef DEBUG_RECOVERY
printf(" meta chunk %s: status %s have_unrec_children %d\n",
name, status_str(status), have_unrecoverable_children
);
#endif
if (debug_status) {
printf(" meta chunk %s: status %s have_unrec_children %d\n",
name, status_str(status), have_unrecoverable_children
);
}
for (i=0; i<children.size(); i++) {
DATA_UNIT* c = children[i];
#ifdef DEBUG_RECOVERY
printf(" child %s status %s in rec set %d\n",
c->name, status_str(c->status), c->in_recovery_set
);
#endif
if (debug_status) {
printf(" child %s status %s in rec set %d\n",
c->name, status_str(c->status), c->in_recovery_set
);
}
switch (status) {
case PRESENT:
if (c->status == UNRECOVERABLE) {
@ -261,7 +261,7 @@ int META_CHUNK::compute_min_failures() {
}
if ((int)(present.size()) >= coding.n) {
status = PRESENT;
min_failures = INT_MAX;
min_failures = dfile->policy.max_ft;
} else if ((int)(present.size() + recoverable.size()) >= coding.n) {
status = RECOVERABLE;
@ -280,10 +280,14 @@ int META_CHUNK::compute_min_failures() {
//
for (i=0; i<j; i++) {
DATA_UNIT* c = recoverable[i];
//printf(" Min failures of %s: %d\n", c->name, c->min_failures);
if (debug_ft) {
printf(" Min failures of %s: %d\n", c->name, c->min_failures);
};
min_failures += c->min_failures;
}
//printf(" our min failures: %d\n", min_failures);
if (debug_ft) {
printf(" our min failures: %d\n", min_failures);
}
}
return 0;
}
@ -464,14 +468,14 @@ void CHUNK::recovery_plan() {
} else {
status = UNRECOVERABLE;
}
#ifdef DEBUG_RECOVERY
printf(" chunk %s: status %s\n", name, status_str(status));
#endif
if (debug_status) {
printf(" chunk %s: status %s\n", name, status_str(status));
}
}
int CHUNK::compute_min_failures() {
if (present_on_server) {
min_failures = INT_MAX;
min_failures = parent->dfile->policy.max_ft;
return 0;
}
int nreplicas = 0;
@ -502,11 +506,14 @@ int CHUNK::recovery_action(double now) {
VDA_FILE_AUX* fp = parent->dfile;
if (data_now_present) {
present_on_server = true;
#if 0
fp->disk_usage.sample_inc(
size,
fp->collecting_stats(),
now
now,
"recovery_action: now present"
);
#endif
status = PRESENT;
}
if (status == PRESENT && (int)(hosts.size()) < fp->policy.replication) {
@ -517,11 +524,11 @@ int CHUNK::recovery_action(double now) {
if (download_in_progress()) {
keep_present = true;
}
#ifdef DEBUG_RECOVERY
printf(" chunk %s: data_needed %d present_on_server %d keep_present %d\n",
name, data_needed, present_on_server, keep_present
);
#endif
if (debug_status) {
printf(" chunk %s: data_needed %d present_on_server %d keep_present %d\n",
name, data_needed, present_on_server, keep_present
);
}
if (present_on_server) {
if (!keep_present) {
sprintf(buf,
@ -536,7 +543,8 @@ int CHUNK::recovery_action(double now) {
parent->dfile->disk_usage.sample_inc(
-size,
fp->collecting_stats(),
now
now,
"recovery_action: don't need"
);
}
} else {

View File

@ -30,6 +30,9 @@
#include "stats.h"
#include "vda_policy.h"
extern bool debug_status;
extern bool debug_ft;
// a host with rpc_time < now-HOST_TIMEOUT is considered dead.
// Make sure you set next_rpc_delay accordingly (e.g., to 86400)
//

View File

@ -37,6 +37,7 @@ int POLICY::parse(const char* filename) {
fclose(f);
return -1;
}
max_ft = replication;
n = fscanf(f, "%d", &coding_levels);
if (n != 1) {
printf("parse error in %s\n", filename);
@ -55,6 +56,7 @@ int POLICY::parse(const char* filename) {
sprintf(buf, "(%d %d %d) ", c.n, c.k, c.n_upload);
strcat(description, buf);
max_ft *= c.n;
}
fclose(f);
sprintf(buf, "X%d", replication);

View File

@ -34,6 +34,8 @@ struct CODING {
struct POLICY {
int replication;
int coding_levels;
int max_ft;
// max fault tolerance given this policy
CODING codings[10];
double chunk_sizes[10];
double chunk_size() {