From ea114741b9b153daf087447d38963330ac8c49bf Mon Sep 17 00:00:00 2001 From: David Anderson Date: Thu, 9 May 2013 14:53:26 -0700 Subject: [PATCH] volunteer data archival: various fixes --- vda/ssim.cpp | 164 +++++++++++++++++++++++++++++++++------------ vda/stats.cpp | 14 ++-- vda/stats.h | 3 +- vda/vda_lib.cpp | 74 +++++++++++--------- vda/vda_lib.h | 3 + vda/vda_policy.cpp | 2 + vda/vda_policy.h | 2 + 7 files changed, 177 insertions(+), 85 deletions(-) diff --git a/vda/ssim.cpp b/vda/ssim.cpp index bf0a4bc600..06cc0b3eb5 100644 --- a/vda/ssim.cpp +++ b/vda/ssim.cpp @@ -30,6 +30,14 @@ // default: 200000 // --file_size x // default: 1e12 +// --debug_status +// --debug_ft +// write recovery details to stdout +// --log_actions +// write actions to stdout +// --sim_duration_years; +// --random +// srand to pid // // outputs: // stdout: log info @@ -49,6 +57,8 @@ using std::set; +bool log_actions = false; + // We simulate policies based on coding and replication. // // Coding means that data is divided into M = N+K units, @@ -107,9 +117,6 @@ struct PARAMS { // These are measured starting from the time when the file's // initial downloads have all succeeded or failed -#define EVENT_DEBUG -//#define RECOVERY_DEBUG - SIMULATOR sim; int next_file_id=0; int next_host_id=0; @@ -127,7 +134,9 @@ char* now_str() { } void show_msg(char* msg) { - printf("%s: %s", time_str(sim.now), msg); + if (log_actions) { + printf("%s: %s", time_str(sim.now), msg); + } } struct CHUNK; @@ -214,13 +223,13 @@ struct SIM_FILE : VDA_FILE_AUX, EVENT { meta_chunk->data_needed = true; } else { meta_chunk = new META_CHUNK(this, NULL, size, 0, id); -#ifdef EVENT_DEBUG - printf("created file %d: size %f GB encoded size %f GB\n", - id, size/1e9, disk_usage.value/1e9 - ); + if (log_actions) { + printf("created file %d: size %f GB encoded size %f GB\n", + id, size/1e9, disk_usage.value/1e9 + ); + } t = sim.now + 500.*86400; sim.insert(this); -#endif } meta_chunk->recovery_plan(); meta_chunk->recovery_action(sim.now); @@ -230,15 +239,28 @@ struct SIM_FILE : VDA_FILE_AUX, EVENT { } void recover() { - printf("recovery_plan():\n"); + if (debug_status) { + printf("recovery_plan():\n"); + } meta_chunk->recovery_plan(); - printf("decide_reconstruct():\n"); + if (meta_chunk->status == UNRECOVERABLE) { + printf("FILE IS LOST!!\n"); + exit(0); + } + if (debug_status) { + printf("decide_reconstruct():\n"); + } meta_chunk->decide_reconstruct(); - printf("reconstruct_and_cleanup():\n"); + if (debug_status) { + printf("reconstruct_and_cleanup():\n"); + } meta_chunk->reconstruct_and_cleanup(); - printf("recovery_action():\n"); + if (debug_status) { + printf("recovery_action():\n"); + } meta_chunk->recovery_action(sim.now); meta_chunk->compute_min_failures(); + printf(" Min failures: %d\n", meta_chunk->min_failures); fault_tolerance.sample( meta_chunk->min_failures-1, collecting_stats(), sim.now ); @@ -270,9 +292,9 @@ void CHUNK_ON_HOST::start_upload() { transfer_in_progress = true; transfer_wait = true; t = sim.now + drand()*params.connect_interval; -#ifdef EVENT_DEBUG - //printf("%s: waiting to start upload of %s\n", now_str(), physical_file_name); -#endif + if (log_actions) { + printf("%s: waiting to start upload of %s\n", now_str(), physical_file_name); + } sim.insert(this); } @@ -280,9 +302,9 @@ void CHUNK_ON_HOST::start_download() { transfer_in_progress = true; transfer_wait = true; t = sim.now + drand()*params.connect_interval; -#ifdef EVENT_DEBUG - //printf("%s: waiting to start download of %s\n", now_str(), physical_file_name); -#endif + if (log_actions) { + printf("%s: waiting to start download of %s\n", now_str(), physical_file_name); + } sim.insert(this); } @@ -293,18 +315,22 @@ void CHUNK_ON_HOST::handle() { if (transfer_wait) { transfer_wait = false; if (present_on_host) { -#ifdef EVENT_DEBUG - printf("%s: starting upload of %s\n", now_str(), physical_file_name); -#endif + if (log_actions) { + printf("%s: starting upload of %s\n", + now_str(), physical_file_name + ); + } chunk->parent->dfile->upload_rate.sample_inc( host->transfer_rate, chunk->parent->dfile->collecting_stats(), sim.now ); } else { -#ifdef EVENT_DEBUG - printf("%s: starting download of %s\n", now_str(), physical_file_name); -#endif + if (log_actions) { + printf("%s: starting download of %s\n", + now_str(), physical_file_name + ); + } chunk->parent->dfile->download_rate.sample_inc( host->transfer_rate, chunk->parent->dfile->collecting_stats(), @@ -318,9 +344,11 @@ void CHUNK_ON_HOST::handle() { transfer_in_progress = false; if (present_on_host) { // it was an upload -#ifdef EVENT_DEBUG - printf("%s: upload of %s completed\n", now_str(), physical_file_name); -#endif + if (log_actions) { + printf("%s: upload of %s completed\n", + now_str(), physical_file_name + ); + } chunk->parent->dfile->upload_rate.sample_inc( -host->transfer_rate, chunk->parent->dfile->collecting_stats(), @@ -329,9 +357,11 @@ void CHUNK_ON_HOST::handle() { chunk->upload_complete(); } else { present_on_host = true; -#ifdef EVENT_DEBUG - printf("%s: download of %s completed\n", now_str(), physical_file_name); -#endif + if (log_actions) { + printf("%s: download of %s completed\n", + now_str(), physical_file_name + ); + } chunk->parent->dfile->download_rate.sample_inc( -host->transfer_rate, chunk->parent->dfile->collecting_stats(), @@ -368,9 +398,9 @@ void SIM_HOST::handle() { set::iterator i = hosts.find(this); hosts.erase(i); -#ifdef EVENT_DEBUG - printf("%s: host %d failed\n", now_str(), id); -#endif + if (log_actions) { + printf("%s: host %d failed\n", now_str(), id); + } set::iterator p; for (p = chunks.begin(); p != chunks.end(); p++) { CHUNK_ON_HOST* c = *p; @@ -387,7 +417,7 @@ CHUNK::CHUNK(META_CHUNK* mc, double s, int index) { sprintf(name, "%s.%d", parent->name, index); VDA_FILE_AUX* fp = parent->dfile; fp->pending_init_downloads += fp->policy.replication; - fp->disk_usage.sample_inc(size, false, sim.now); + fp->disk_usage.sample_inc(size, false, sim.now, "init"); } // if there aren't enough replicas of this chunk, @@ -410,9 +440,11 @@ int CHUNK::assign() { #endif CHUNK_ON_HOST *c = new CHUNK_ON_HOST(); sprintf(c->physical_file_name, "chunk %s on host %d", name, h->id); -#ifdef EVENT_DEBUG - printf("%s: assigning chunk %s to host %d\n", now_str(), name, h->id); -#endif + if (log_actions) { + printf("%s: assigning chunk %s to host %d\n", + now_str(), name, h->id + ); + } c->host = h; c->chunk = this; h->chunks.insert(c); @@ -440,9 +472,9 @@ int CHUNK::start_upload() { void CHUNK::host_failed(VDA_CHUNK_HOST* p) { set::iterator i = hosts.find(p); hosts.erase(i); -#ifdef EVENT_DEBUG - printf("%s: handling loss of %s\n", now_str(), p->physical_file_name); -#endif + if (log_actions) { + printf("%s: handling loss of %s\n", now_str(), p->physical_file_name); + } SIM_FILE* sfp = (SIM_FILE*)parent->dfile; sfp->recover(); } @@ -453,7 +485,8 @@ void CHUNK::upload_complete() { parent->dfile->disk_usage.sample_inc( size, parent->dfile->collecting_stats(), - sim.now + sim.now, + "upload_complete" ); } SIM_FILE* sfp = (SIM_FILE*)parent->dfile; @@ -477,12 +510,29 @@ int META_CHUNK::upload_all() { } int META_CHUNK::encode(bool) { - printf("%s: encoding metachunk %s\n", now_str(), name); + if (log_actions) { + printf("%s: encoding metachunk %s\n", now_str(), name); + } + + // new chunks count toward server disk usage + // + if (bottom_level) { + for (unsigned int i=0; idisk_usage.sample_inc( + c.size, dfile->collecting_stats(), sim.now, "encode" + ); + } + } + } return 0; } int META_CHUNK::decode() { - printf("%s: decoding metachunk %s\n", now_str(), name); + if (log_actions) { + printf("%s: decoding metachunk %s\n", now_str(), name); + } return 0; } @@ -494,6 +544,10 @@ set dfiles; int main(int argc, char** argv) { POLICY policy; + bool log_disk_usage = false; + bool log_fault_tolerance = false; + bool log_download = false; + bool log_upload = false; // default policy // @@ -528,6 +582,24 @@ int main(int argc, char** argv) { params.mean_xfer_rate = atof(argv[++i]); } else if (!strcmp(argv[i], "--file_size")) { params.file_size = atof(argv[++i]); + } else if (!strcmp(argv[i], "--debug_status")) { + debug_status = true; + } else if (!strcmp(argv[i], "--debug_ft")) { + debug_ft = true; + } else if (!strcmp(argv[i], "--log_actions")) { + log_actions = true; + } else if (!strcmp(argv[i], "--log_disk_usage")) { + log_disk_usage = true; + } else if (!strcmp(argv[i], "--log_fault_tolerance")) { + log_fault_tolerance = true; + } else if (!strcmp(argv[i], "--log_download")) { + log_download = true; + } else if (!strcmp(argv[i], "--log_upload")) { + log_upload = true; + } else if (!strcmp(argv[i], "--sim_duration_years")) { + params.sim_duration = atof(argv[++i])*86400*365; + } else if (!strcmp(argv[i], "--random")) { + srand(getpid()); } else { printf("bad arg %s\n", argv[i]); exit(1); @@ -546,6 +618,10 @@ int main(int argc, char** argv) { #endif SIM_FILE* dfile = new SIM_FILE(params.file_size); dfile->policy = policy; + if (log_disk_usage) dfile->disk_usage.log_changes = true; + if (log_fault_tolerance) dfile->fault_tolerance.log_changes = true; + if (log_download) dfile->download_rate.log_changes = true; + if (log_upload) dfile->upload_rate.log_changes = true; sim.insert(dfile); sim.simulate(params.sim_duration); diff --git a/vda/stats.cpp b/vda/stats.cpp index 5f68e308a6..d4b0361cfc 100644 --- a/vda/stats.cpp +++ b/vda/stats.cpp @@ -39,23 +39,20 @@ void STATS_ITEM::init(const char* n, const char* filename, STATS_KIND k) { } extreme_val_time = 0; first = true; + log_changes = false; } void STATS_ITEM::sample(double v, bool collecting_stats, double now) { - char buf[256]; - if (value != v) { + if (value != v && log_changes) { + char buf[256]; switch (kind) { case DISK: -#if 0 sprintf(buf, "%s: %f GB -> %f GB\n", name, value/1e9, v/1e9); show_msg(buf); -#endif break; case NETWORK: -#if 0 sprintf(buf, "%s: %f Mbps -> %f Mbps\n", name, value/1e6, v/1e6); show_msg(buf); -#endif break; case FAULT_TOLERANCE: sprintf(buf, "%s: %.0f -> %.0f\n", name, value, v); @@ -94,8 +91,11 @@ void STATS_ITEM::sample(double v, bool collecting_stats, double now) { fprintf(f, "%f %f\n", now, v); } -void STATS_ITEM::sample_inc(double inc, bool collecting_stats, double now) { +void STATS_ITEM::sample_inc(double inc, bool collecting_stats, double now, const char* reason) { sample(value+inc, collecting_stats, now); + if (reason) { + printf(" reason: %s\n", reason); + } } void STATS_ITEM::print(double now) { diff --git a/vda/stats.h b/vda/stats.h index 9de8c86619..4b6a35b340 100644 --- a/vda/stats.h +++ b/vda/stats.h @@ -37,10 +37,11 @@ struct STATS_ITEM { bool first; char name[256]; FILE* f; + bool log_changes; void init(const char* n, const char* filename, STATS_KIND k); void sample(double v, bool collecting_stats, double now); - void sample_inc(double inc, bool collecting_stats, double now); + void sample_inc(double inc, bool collecting_stats, double now, const char* reason=NULL); void print(double now); void print_summary(FILE* f, double now); }; diff --git a/vda/vda_lib.cpp b/vda/vda_lib.cpp index bf3454f26b..963ef97daf 100644 --- a/vda/vda_lib.cpp +++ b/vda/vda_lib.cpp @@ -29,7 +29,8 @@ using std::vector; using std::set; -#define DEBUG_RECOVERY +bool debug_status = false; +bool debug_ft = false; ///////////////// Utility functions /////////////////////// @@ -47,14 +48,13 @@ bool compare_min_failures(const DATA_UNIT* d1, const DATA_UNIT* d2) { char* time_str(double t) { static char buf[256]; - int n = (int)t; - int nsec = n % 60; - n /= 60; - int nmin = n % 60; - n /= 60; - int nhour = n % 24; - n /= 24; - sprintf(buf, "%4d days %02d:%02d:%02d", n, nhour, nmin, nsec); + int nsec = (int)fmod(t, 60); + t /= 60; + int nmin = (int)fmod(t, 60); + t /= 60; + int nhour = (int)fmod(t, 24); + t /= 24; + sprintf(buf, "%4d days %02d:%02d:%02d", (int)t, nhour, nmin, nsec); return buf; } @@ -199,18 +199,18 @@ int META_CHUNK::recovery_action(double now) { if (data_now_present) { status = PRESENT; } -#ifdef DEBUG_RECOVERY - printf(" meta chunk %s: status %s have_unrec_children %d\n", - name, status_str(status), have_unrecoverable_children - ); -#endif + if (debug_status) { + printf(" meta chunk %s: status %s have_unrec_children %d\n", + name, status_str(status), have_unrecoverable_children + ); + } for (i=0; iname, status_str(c->status), c->in_recovery_set - ); -#endif + if (debug_status) { + printf(" child %s status %s in rec set %d\n", + c->name, status_str(c->status), c->in_recovery_set + ); + } switch (status) { case PRESENT: if (c->status == UNRECOVERABLE) { @@ -261,7 +261,7 @@ int META_CHUNK::compute_min_failures() { } if ((int)(present.size()) >= coding.n) { status = PRESENT; - min_failures = INT_MAX; + min_failures = dfile->policy.max_ft; } else if ((int)(present.size() + recoverable.size()) >= coding.n) { status = RECOVERABLE; @@ -280,10 +280,14 @@ int META_CHUNK::compute_min_failures() { // for (i=0; iname, c->min_failures); + if (debug_ft) { + printf(" Min failures of %s: %d\n", c->name, c->min_failures); + }; min_failures += c->min_failures; } - //printf(" our min failures: %d\n", min_failures); + if (debug_ft) { + printf(" our min failures: %d\n", min_failures); + } } return 0; } @@ -464,14 +468,14 @@ void CHUNK::recovery_plan() { } else { status = UNRECOVERABLE; } -#ifdef DEBUG_RECOVERY - printf(" chunk %s: status %s\n", name, status_str(status)); -#endif + if (debug_status) { + printf(" chunk %s: status %s\n", name, status_str(status)); + } } int CHUNK::compute_min_failures() { if (present_on_server) { - min_failures = INT_MAX; + min_failures = parent->dfile->policy.max_ft; return 0; } int nreplicas = 0; @@ -502,11 +506,14 @@ int CHUNK::recovery_action(double now) { VDA_FILE_AUX* fp = parent->dfile; if (data_now_present) { present_on_server = true; +#if 0 fp->disk_usage.sample_inc( size, fp->collecting_stats(), - now + now, + "recovery_action: now present" ); +#endif status = PRESENT; } if (status == PRESENT && (int)(hosts.size()) < fp->policy.replication) { @@ -517,11 +524,11 @@ int CHUNK::recovery_action(double now) { if (download_in_progress()) { keep_present = true; } -#ifdef DEBUG_RECOVERY - printf(" chunk %s: data_needed %d present_on_server %d keep_present %d\n", - name, data_needed, present_on_server, keep_present - ); -#endif + if (debug_status) { + printf(" chunk %s: data_needed %d present_on_server %d keep_present %d\n", + name, data_needed, present_on_server, keep_present + ); + } if (present_on_server) { if (!keep_present) { sprintf(buf, @@ -536,7 +543,8 @@ int CHUNK::recovery_action(double now) { parent->dfile->disk_usage.sample_inc( -size, fp->collecting_stats(), - now + now, + "recovery_action: don't need" ); } } else { diff --git a/vda/vda_lib.h b/vda/vda_lib.h index 565293da47..7a127c9334 100644 --- a/vda/vda_lib.h +++ b/vda/vda_lib.h @@ -30,6 +30,9 @@ #include "stats.h" #include "vda_policy.h" +extern bool debug_status; +extern bool debug_ft; + // a host with rpc_time < now-HOST_TIMEOUT is considered dead. // Make sure you set next_rpc_delay accordingly (e.g., to 86400) // diff --git a/vda/vda_policy.cpp b/vda/vda_policy.cpp index 5c5f23c6af..c601a37bab 100644 --- a/vda/vda_policy.cpp +++ b/vda/vda_policy.cpp @@ -37,6 +37,7 @@ int POLICY::parse(const char* filename) { fclose(f); return -1; } + max_ft = replication; n = fscanf(f, "%d", &coding_levels); if (n != 1) { printf("parse error in %s\n", filename); @@ -55,6 +56,7 @@ int POLICY::parse(const char* filename) { sprintf(buf, "(%d %d %d) ", c.n, c.k, c.n_upload); strcat(description, buf); + max_ft *= c.n; } fclose(f); sprintf(buf, "X%d", replication); diff --git a/vda/vda_policy.h b/vda/vda_policy.h index 506899dbf1..4cd60892eb 100644 --- a/vda/vda_policy.h +++ b/vda/vda_policy.h @@ -34,6 +34,8 @@ struct CODING { struct POLICY { int replication; int coding_levels; + int max_ft; + // max fault tolerance given this policy CODING codings[10]; double chunk_sizes[10]; double chunk_size() {