From ce52c9cf3e725f3267c1de0f8d3702135e905f83 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Fri, 24 Feb 2012 22:55:11 +0000 Subject: [PATCH] - storage stuff svn path=/trunk/boinc/; revision=25341 --- checkin_notes | 19 +++++ db/boinc_db.h | 4 ++ db/schema_vda.sql | 4 +- sched/delete_file.cpp | 2 +- sched/get_file.cpp | 2 +- sched/put_file.cpp | 2 +- tools/backend_lib.cpp | 92 ++++++++++++++++-------- tools/backend_lib.h | 37 ++++++++-- vda/Makefile.am | 6 +- vda/sched_vda.cpp | 22 ++++++ vda/ssim.cpp | 17 +---- vda/vda_lib.cpp | 52 ++++---------- vda/vda_lib.h | 27 +------ vda/vda_lib2.cpp | 159 ---------------------------------------- vda/vda_policy.cpp | 59 +++++++++++++++ vda/vda_policy.h | 45 ++++++++++++ vda/vdad.cpp | 163 ++++++++++++++++++++++++++++++++++++++++++ 17 files changed, 431 insertions(+), 281 deletions(-) delete mode 100644 vda/vda_lib2.cpp create mode 100644 vda/vda_policy.cpp create mode 100644 vda/vda_policy.h diff --git a/checkin_notes b/checkin_notes index dcd796f200..a928bb20bb 100644 --- a/checkin_notes +++ b/checkin_notes @@ -2165,3 +2165,22 @@ David 24 Feb 2012 vda.cpp vda_lib2.cpp vda_lib.h + +David 24 Feb 2012 + - storage stuff + + db/ + boinc_db.h + schema_vda.sql + sched/ + delete_file.cpp + put_file.cpp + get_file.cpp + tools/ + backend_lib.cpp,h + vda/ + vda.cpp + vda_policy.cpp,h (new) + vda_lib.cpp,h + Makefile.am + ssim.cpp diff --git a/db/boinc_db.h b/db/boinc_db.h index 61fb05039a..54f06406f2 100644 --- a/db/boinc_db.h +++ b/db/boinc_db.h @@ -1160,6 +1160,10 @@ struct VDA_CHUNK_HOST { bool found; void clear(); + inline bool download_in_progress() { + return (transfer_in_progress && !present_on_host); + } + }; struct DB_VDA_FILE : public DB_BASE, public VDA_FILE { diff --git a/db/schema_vda.sql b/db/schema_vda.sql index b63deef26a..76ecc3d42a 100644 --- a/db/schema_vda.sql +++ b/db/schema_vda.sql @@ -24,8 +24,8 @@ create table vda_chunk_host ( ) engine = InnoDB; alter table vda_chunk_host - add index vch_file (vda_file_id), - add index vch_host (host_id); + add index vch_file (vda_file_id), + add index vch_host (host_id); alter table host add index host_cpu_eff (cpu_efficiency); diff --git a/sched/delete_file.cpp b/sched/delete_file.cpp index 106c897211..18048d5ba1 100644 --- a/sched/delete_file.cpp +++ b/sched/delete_file.cpp @@ -109,7 +109,7 @@ int main(int argc, char** argv) { exit(1); } - retval = delete_host_file(host_id, file_name); + retval = create_delete_file_msg(host_id, file_name); boinc_db.close(); return retval; } diff --git a/sched/get_file.cpp b/sched/get_file.cpp index b2bedfb153..f33c3aef7a 100644 --- a/sched/get_file.cpp +++ b/sched/get_file.cpp @@ -131,7 +131,7 @@ int main(int argc, char** argv) { } } - retval = get_file( + retval = create_get_file_msg( host_id, file_name, urls, max_nbytes, dtime() + max_latency, generate_upload_certificate, key diff --git a/sched/put_file.cpp b/sched/put_file.cpp index 5063e498b0..71995dc69a 100644 --- a/sched/put_file.cpp +++ b/sched/put_file.cpp @@ -129,7 +129,7 @@ int main(int argc, char** argv) { } } - retval = put_file( + retval = create_put_file_msg( host_id, file_name, urls, md5, nbytes, dtime()+max_latency ); diff --git a/tools/backend_lib.cpp b/tools/backend_lib.cpp index ff9bbdaa76..e5b53b2f5f 100644 --- a/tools/backend_lib.cpp +++ b/tools/backend_lib.cpp @@ -334,23 +334,16 @@ int create_work( // STUFF RELATED TO FILE UPLOAD/DOWNLOAD -int get_file( - int host_id, const char* file_name, vector urls, +int get_file_xml( + const char* file_name, vector urls, double max_nbytes, double report_deadline, bool generate_upload_certificate, - R_RSA_PRIVATE_KEY& key -) {; + R_RSA_PRIVATE_KEY& key, + char* out +) { char buf[8192]; - DB_MSG_TO_HOST mth; - int retval; - - mth.clear(); - mth.create_time = time(0); - mth.hostid = host_id; - strcpy(mth.variety, "file_xfer"); - mth.handled = false; - sprintf(mth.xml, + sprintf(out, "\n" " file_xfer\n" "\n" @@ -366,7 +359,7 @@ int get_file( ); for (unsigned int i=0; i%s\n", urls[i]); - strcat(mth.xml, buf); + strcat(out, buf); } sprintf(buf, "\n" @@ -388,11 +381,33 @@ int get_file( file_name, report_deadline ); - strcat(mth.xml, buf); + strcat(out, buf); if (generate_upload_certificate) { - add_signatures(mth.xml, key); + add_signatures(out, key); } + return 0; +} +int create_get_file_msg( + int host_id, const char* file_name, vector urls, + double max_nbytes, + double report_deadline, + bool generate_upload_certificate, + R_RSA_PRIVATE_KEY& key +) { + DB_MSG_TO_HOST mth; + int retval; + + mth.clear(); + mth.create_time = time(0); + mth.hostid = host_id; + strcpy(mth.variety, "file_xfer"); + mth.handled = false; + get_file_xml( + file_name, urls, max_nbytes, report_deadline, + generate_upload_certificate, key, + mth.xml + ); retval = mth.insert(); if (retval) { fprintf(stderr, "msg_to_host.insert(): %s\n", boincerror(retval)); @@ -401,20 +416,14 @@ int get_file( return 0; } -int put_file( - int host_id, const char* file_name, +int put_file_xml( + const char* file_name, vector urls, const char* md5, double nbytes, - double report_deadline + double report_deadline, + char* out ) { char buf[8192]; - DB_MSG_TO_HOST mth; - int retval; - mth.clear(); - mth.create_time = time(0); - mth.hostid = host_id; - strcpy(mth.variety, "file_xfer"); - mth.handled = false; - sprintf(mth.xml, + sprintf(out, "\n" " file_xfer\n" "\n" @@ -428,7 +437,7 @@ int put_file( ); for (unsigned int i=0; i%s\n", urls[i]); - strcat(mth.xml, buf); + strcat(out, buf); } sprintf(buf, " %s\n" @@ -455,7 +464,23 @@ int put_file( file_name, report_deadline ); - strcat(mth.xml, buf); + strcat(out, buf); + return 0; +} + +int create_put_file_msg( + int host_id, const char* file_name, + vector urls, const char* md5, double nbytes, + double report_deadline +) { + DB_MSG_TO_HOST mth; + int retval; + mth.clear(); + mth.create_time = time(0); + mth.hostid = host_id; + strcpy(mth.variety, "file_xfer"); + mth.handled = false; + put_file_xml(file_name, urls, md5, nbytes, report_deadline, mth.xml); retval = mth.insert(); if (retval) { fprintf(stderr, "msg_to_host.insert(): %s\n", boincerror(retval)); @@ -464,14 +489,19 @@ int put_file( return 0; } -int delete_host_file(int host_id, const char* file_name) { +int delete_file_xml(const char* file_name, char* out) { + sprintf(out, "%s\n", file_name); + return 0; +} + +int create_delete_file_msg(int host_id, const char* file_name) { DB_MSG_TO_HOST mth; int retval; mth.clear(); mth.create_time = time(0); mth.hostid = host_id; mth.handled = false; - sprintf(mth.xml, "%s\n", file_name); + delete_file_xml(file_name, mth.xml); sprintf(mth.variety, "delete_file"); retval = mth.insert(); if (retval) { diff --git a/tools/backend_lib.h b/tools/backend_lib.h index 3f9868262b..c223956a8b 100644 --- a/tools/backend_lib.h +++ b/tools/backend_lib.h @@ -73,16 +73,38 @@ extern int create_work( extern int stage_file(const char*, bool); -// the following 3 functions are for programs other than the schedule -// to do file operations. -// They work by creating MSG_TO_HOST records in the DB +// the following functions return XML that can be put in +// scheduler replies to do file operations // -extern int put_file( +extern int put_file_xml( + const char* file_name, vector urls, + const char* md5, double nbytes, double report_deadline, + char* buf +); +extern int get_file_xml( + const char* file_name, vector urls, + double max_nbytes, + double report_deadline, + bool generate_upload_certificate, + R_RSA_PRIVATE_KEY& key, + char* buf +); +extern int delete_file_xml( + const char* file_name, + char* buf +); + +// the following 3 functions are for programs other than the scheduler +// to do file operations. +// They work by creating MSG_TO_HOST records in the DB, +// containing the needed XML +// +extern int create_put_file_msg( int host_id, const char* file_name, vector urls, const char* md5, double nbytes, double report_deadline ); -extern int get_file( +extern int create_get_file_msg( int host_id, const char* file_name, vector urls, double max_nbytes, double report_deadline, @@ -90,7 +112,10 @@ extern int get_file( R_RSA_PRIVATE_KEY& key ); -extern int delete_host_file(int host_id, const char* file_name); +extern int create_delete_file_msg( + int host_id, + const char* file_name +); // cancel jobs from min_id to max_id inclusive // diff --git a/vda/Makefile.am b/vda/Makefile.am index c5f27461ec..86daafea0d 100644 --- a/vda/Makefile.am +++ b/vda/Makefile.am @@ -5,11 +5,11 @@ bin_PROGRAMS = vda vdad ssim AM_CXXFLAGS += $(MYSQL_CFLAGS) AM_LDFLAGS += -static -vda_SOURCES = vda.cpp vda_lib.cpp vda_lib2.cpp +vda_SOURCES = vda.cpp vda_policy.cpp vda_LDADD = $(SERVERLIBS) -vdad_SOURCES = vdad.cpp vda_lib.cpp vda_lib2.cpp +vdad_SOURCES = vdad.cpp vda_lib.cpp vda_policy.cpp vdad_LDADD = $(SERVERLIBS) -ssim_SOURCES = ssim.cpp vda_lib.cpp +ssim_SOURCES = ssim.cpp vda_lib.cpp vda_policy.cpp ssim_LDADD = $(SERVERLIBS) diff --git a/vda/sched_vda.cpp b/vda/sched_vda.cpp index bd8c69d89c..2146835914 100644 --- a/vda/sched_vda.cpp +++ b/vda/sched_vda.cpp @@ -34,6 +34,14 @@ static int mark_for_update(int vda_file_id) { return f.update_field("need_update=1"); } +// handle a scheduler request: +// - handle completed uploads +// - handle set of files present on client +// (update or create VDA_CHUNK_HOST record) +// - handle files expected but not present +// - issue delete commands if needed to enforce share +// - issue upload or download commands to client +// // relevant fields of SCHEDULER_REQUEST // file_infos: list of sticky files // file_xfer_results: list of completed file xfers @@ -117,4 +125,18 @@ void handle_vda() { it++; } } + + // issue upload and download commands + // + it = chunks.begin(); + while (it != chunks.end()) { + DB_VDA_CHUNK_HOST& ch = (*it).second; + if (!ch.transfer_in_progress) continue; + if (!ch.transfer_wait) continue; + if (ch.present_on_host) { + // upload + } else { + // download + } + } } diff --git a/vda/ssim.cpp b/vda/ssim.cpp index 8a3a0de0ae..6b443b808a 100644 --- a/vda/ssim.cpp +++ b/vda/ssim.cpp @@ -169,9 +169,6 @@ struct CHUNK_ON_HOST : VDA_CHUNK_HOST, EVENT { virtual void handle(); void start_upload(); void start_download(); - inline bool download_in_progress() { - return (transfer_in_progress && !present_on_host); - } void remove(); }; @@ -397,16 +394,7 @@ int CHUNK::assign() { return 0; } -bool CHUNK::download_in_progress() { - set::iterator i; - for (i=hosts.begin(); i!=hosts.end(); i++) { - CHUNK_ON_HOST* c = (CHUNK_ON_HOST*)*i; - if (c->download_in_progress()) return true; - } - return false; -} - -void CHUNK::start_upload() { +int CHUNK::start_upload() { // if no upload of this chunk is in progress, start one. // NOTE: all instances are inherently present_on_host, // since this is only called if chunk is not present on server @@ -414,10 +402,11 @@ void CHUNK::start_upload() { set::iterator i; for (i=hosts.begin(); i!=hosts.end(); i++) { CHUNK_ON_HOST* c = (CHUNK_ON_HOST*)*i; - if (c->transfer_in_progress) return; + if (c->transfer_in_progress) return 0; } CHUNK_ON_HOST* c = (CHUNK_ON_HOST*)*(hosts.begin()); c->start_upload(); + return 0; } void CHUNK::host_failed(VDA_CHUNK_HOST* p) { diff --git a/vda/vda_lib.cpp b/vda/vda_lib.cpp index 816b6d14b9..a78d71e813 100644 --- a/vda/vda_lib.cpp +++ b/vda/vda_lib.cpp @@ -19,52 +19,16 @@ #include #include +#include #include #include #include using std::vector; +using std::set; #include "vda_lib.h" -int POLICY::parse(const char* filename) { - int n; - char buf[256]; - - strcpy(description, ""); - - FILE* f = fopen(filename, "r"); - if (!f) { - fprintf(stderr, "No policy file %s\n", filename); - return -1; - } - n = fscanf(f, "%d", &replication); - if (n != 1) { - fprintf(stderr, "parse error in %s\n", filename); - return -1; - } - n = fscanf(f, "%d", &coding_levels); - if (n != 1) { - fprintf(stderr, "parse error in %s\n", filename); - return -1; - } - for (int i=0; i::iterator i; + for (i=hosts.begin(); i!=hosts.end(); i++) { + VDA_CHUNK_HOST* ch = *i; + if (ch->download_in_progress()) return true; + } + return false; +} + int CHUNK::recovery_action(double now) { int retval; VDA_FILE_AUX* fp = parent->dfile; @@ -442,7 +415,8 @@ int CHUNK::recovery_action(double now) { #endif if (data_needed) { if (!present_on_server) { - start_upload(); + retval = start_upload(); + if (retval) return retval; } } else { if (present_on_server) { diff --git a/vda/vda_lib.h b/vda/vda_lib.h index d1893323a8..607999c99e 100644 --- a/vda/vda_lib.h +++ b/vda/vda_lib.h @@ -25,34 +25,13 @@ #include "boinc_db.h" +#include "vda_policy.h" + // a host with rpc_time < now-HOST_TIMEOUT is considered dead. // Make sure you set next_rpc_delay accordingly (e.g., to 86400) // #define VDA_HOST_TIMEOUT (86400*4) -// parameters of 1 level of coding -// -struct CODING { - int n; - int k; - int m; // n + k - int n_upload; -}; - -// description of overall coding -// (possibly w/ multiple coding levels and replication) -// -struct POLICY { - int replication; - int coding_levels; - CODING codings[10]; - double chunk_sizes[10]; - - char description[256]; // derived from the above - - int parse(const char*); -}; - // keeps track of a time-varying property of a file // (server disk usage, up/download rate, fault tolerance level) // @@ -163,7 +142,7 @@ struct CHUNK : DATA_UNIT { CHUNK(META_CHUNK* mc, double s, int index); - void start_upload(); + int start_upload(); void host_failed(VDA_CHUNK_HOST* p); bool download_in_progress(); void upload_complete(); diff --git a/vda/vda_lib2.cpp b/vda/vda_lib2.cpp deleted file mode 100644 index 67230c2df9..0000000000 --- a/vda/vda_lib2.cpp +++ /dev/null @@ -1,159 +0,0 @@ -// This file is part of BOINC. -// http://boinc.berkeley.edu -// Copyright (C) 2012 University of California -// -// BOINC is free software; you can redistribute it and/or modify it -// under the terms of the GNU Lesser General Public License -// as published by the Free Software Foundation, -// either version 3 of the License, or (at your option) any later version. -// -// BOINC is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -// See the GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with BOINC. If not, see . - -// code used in vda and vdad, but not in the simulator - -#include "error_numbers.h" -#include "sched_msgs.h" -#include "util.h" - -#include "vda_lib.h" - -CHUNK::CHUNK(META_CHUNK* mc, double s, int index) { - parent = mc; - present_on_server = true; - size = s; - if (strlen(parent->name)) { - sprintf(name, "%s.%d", parent->name, index); - } else { - sprintf(name, "%d", index); - } -} - -// assign this chunk to a host -// -int CHUNK::assign() { - int host_id = parent->dfile->choose_host(); - if (!host_id) { - return ERR_NOT_FOUND; - } - DB_VDA_CHUNK_HOST ch; - ch.create_time = dtime(); - ch.vda_file_id = parent->dfile->id; - strcpy(ch.name, name); - ch.host_id = host_id; - ch.present_on_host = 0; - ch.transfer_in_progress = true; - ch.transfer_wait = true; - ch.transfer_request_time = ch.create_time; - ch.transfer_send_time = 0; - int retval = ch.insert(); - if (retval) { - log_messages.printf(MSG_CRITICAL, "ch.insert() failed\n"); - return retval; - } - return 0; -} - -bool CHUNK::download_in_progress() { - return false; -} - -void CHUNK::start_upload() { -} - -inline bool alive(DB_HOST& h) { - return (h.rpc_time > dtime()-VDA_HOST_TIMEOUT); -} - -char* host_alive_clause() { - static char buf[256]; - sprintf(buf, "rpc_time > %f", dtime() - VDA_HOST_TIMEOUT); - return buf; -} - -// Pick a host to send a chunk of this file to. -// The host must: -// 1) be alive (recent RPC time) -// 2) not have any chunks of this file -// -// We maintain a cache of such hosts -// The policy is: -// -// - scan the cache, removing hosts that are no longer alive; -// return if find a live host -// - pick a random starting point in host ID space, -// and enumerate 100 live hosts; wrap around if needed. -// Return one and put the rest in cache -// -int VDA_FILE_AUX::choose_host() { - int retval; - DB_HOST host; - - // replenish cache if needed - // - if (!available_hosts.size()) { - int nhosts_scanned = 0; - int rand_id; - for (int i=0; i<2; i++) { - char buf[256]; - if (i == 0) { - retval = host.max_id(rand_id, ""); - if (retval) { - log_messages.printf(MSG_CRITICAL, "host.max_id() failed\n"); - return 0; - } - rand_id = (int)(((double)id)*drand()); - sprintf(buf, - "where %s and id>=%d order by id limit 100", - host_alive_clause(), rand_id - ); - } else { - sprintf(buf, - "where %s and id<%d order by id limit %d", - host_alive_clause(), rand_id, 100-nhosts_scanned - ); - } - while (1) { - retval = host.enumerate(buf); - if (retval == ERR_DB_NOT_FOUND) break; - if (retval) { - log_messages.printf(MSG_CRITICAL, "host enum failed\n"); - return 0; - } - nhosts_scanned++; - DB_VDA_CHUNK_HOST ch; - char buf2[256]; - int count; - sprintf(buf2, "where vda_file_id=%d and host_id=%d", id, host.id); - retval = ch.count(count, buf2); - if (retval) { - log_messages.printf(MSG_CRITICAL, "ch.count failed\n"); - return 0; - } - if (count == 0) { - available_hosts.push_back(host.id); - } - if (nhosts_scanned == 100) break; - } - if (nhosts_scanned == 100) break; - } - } - - while (available_hosts.size()) { - int hostid = available_hosts.back(); - available_hosts.pop_back(); - retval = host.lookup_id(hostid); - if (retval || !alive(host)) { - continue; - } - return hostid; - } - - log_messages.printf(MSG_CRITICAL, "No hosts available\n"); - return 0; -} diff --git a/vda/vda_policy.cpp b/vda/vda_policy.cpp new file mode 100644 index 0000000000..5aa0705bb4 --- /dev/null +++ b/vda/vda_policy.cpp @@ -0,0 +1,59 @@ +// This file is part of BOINC. +// http://boinc.berkeley.edu +// Copyright (C) 2012 University of California +// +// BOINC is free software; you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, +// either version 3 of the License, or (at your option) any later version. +// +// BOINC is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with BOINC. If not, see . + +#include +#include + +#include "vda_policy.h" + +int POLICY::parse(const char* filename) { + int n; + char buf[256]; + + strcpy(description, ""); + + FILE* f = fopen(filename, "r"); + if (!f) { + fprintf(stderr, "No policy file %s\n", filename); + return -1; + } + n = fscanf(f, "%d", &replication); + if (n != 1) { + fprintf(stderr, "parse error in %s\n", filename); + return -1; + } + n = fscanf(f, "%d", &coding_levels); + if (n != 1) { + fprintf(stderr, "parse error in %s\n", filename); + return -1; + } + for (int i=0; i. + +#ifndef _VDA_POLICY_ +#define _VDA_POLICY_ + +// parameters of 1 level of coding +// + +struct CODING { + int n; + int k; + int m; // n + k + int n_upload; +}; + +// description of overall coding +// (possibly w/ multiple coding levels and replication) +// +struct POLICY { + int replication; + int coding_levels; + CODING codings[10]; + double chunk_sizes[10]; + + char description[256]; // derived from the above + + int parse(const char*); +}; + +#endif diff --git a/vda/vdad.cpp b/vda/vdad.cpp index 4506050855..886b9bff47 100644 --- a/vda/vdad.cpp +++ b/vda/vdad.cpp @@ -26,8 +26,10 @@ #include #include #include +#include using std::vector; +using std::set; #include "boinc_db.h" #include "sched_config.h" @@ -117,6 +119,167 @@ int encode(const char* dir, CODING& c, double& size) { return 0; } +CHUNK::CHUNK(META_CHUNK* mc, double s, int index) { + parent = mc; + present_on_server = true; + size = s; + if (strlen(parent->name)) { + sprintf(name, "%s.%d", parent->name, index); + } else { + sprintf(name, "%d", index); + } +} + +// assign this chunk to a host +// +int CHUNK::assign() { + int host_id = parent->dfile->choose_host(); + if (!host_id) { + return ERR_NOT_FOUND; + } + DB_VDA_CHUNK_HOST ch; + ch.create_time = dtime(); + ch.vda_file_id = parent->dfile->id; + strcpy(ch.name, name); + ch.host_id = host_id; + ch.present_on_host = 0; + ch.transfer_in_progress = true; + ch.transfer_wait = true; + ch.transfer_request_time = ch.create_time; + ch.transfer_send_time = 0; + int retval = ch.insert(); + if (retval) { + log_messages.printf(MSG_CRITICAL, "ch.insert() failed\n"); + return retval; + } + return 0; +} + +int CHUNK::start_upload() { + // if no upload of this chunk is in progress, start one. + // NOTE: all instances are inherently present_on_host, + // since this is only called if chunk is not present on server + // + VDA_CHUNK_HOST* chp; + set::iterator i; + for (i=hosts.begin(); i!=hosts.end(); i++) { + chp = *i; + if (chp->transfer_in_progress) return 0; + } + chp = *(hosts.begin()); + DB_VDA_CHUNK_HOST dch; + char set_clause[256], where_clause[256]; + sprintf(set_clause, + "transfer_in_progress=1, transfer_wait=1, transfer_request_time=%f", + dtime() + ); + sprintf(where_clause, + "where vda_file_id=%d and host_id=%d and name='%s'", + chp->vda_file_id, + chp->host_id, + name + ); + int retval = dch.update_fields_noid(set_clause, where_clause); + return retval; +} + + +inline bool alive(DB_HOST& h) { + return (h.rpc_time > dtime()-VDA_HOST_TIMEOUT); +} + +char* host_alive_clause() { + static char buf[256]; + sprintf(buf, "rpc_time > %f", dtime() - VDA_HOST_TIMEOUT); + return buf; +} + +// Pick a host to send a chunk of this file to. +// The host must: +// 1) be alive (recent RPC time) +// 2) not have any chunks of this file +// +// We maintain a cache of such hosts +// The policy is: +// +// - scan the cache, removing hosts that are no longer alive; +// return if find a live host +// - pick a random starting point in host ID space, +// and enumerate 100 live hosts; wrap around if needed. +// Return one and put the rest in cache +// +int VDA_FILE_AUX::choose_host() { + int retval; + DB_HOST host; + + // replenish cache if needed + // + if (!available_hosts.size()) { + int nhosts_scanned = 0; + int rand_id; + for (int i=0; i<2; i++) { + char buf[256]; + if (i == 0) { + retval = host.max_id(rand_id, ""); + if (retval) { + log_messages.printf(MSG_CRITICAL, "host.max_id() failed\n"); + return 0; + } + rand_id = (int)(((double)id)*drand()); + sprintf(buf, + "where %s and id>=%d order by id limit 100", + host_alive_clause(), rand_id + ); + } else { + sprintf(buf, + "where %s and id<%d order by id limit %d", + host_alive_clause(), rand_id, 100-nhosts_scanned + ); + } + while (1) { + retval = host.enumerate(buf); + if (retval == ERR_DB_NOT_FOUND) break; + if (retval) { + log_messages.printf(MSG_CRITICAL, "host enum failed\n"); + return 0; + } + nhosts_scanned++; + DB_VDA_CHUNK_HOST ch; + char buf2[256]; + int count; + sprintf(buf2, "where vda_file_id=%d and host_id=%d", id, host.id); +#if 0 + retval = ch.count(count, buf2); + if (retval) { + log_messages.printf(MSG_CRITICAL, "ch.count failed\n"); + return 0; + } +#else + count = 0; +#endif + if (count == 0) { + available_hosts.push_back(host.id); + } + if (nhosts_scanned == 100) break; + } + if (nhosts_scanned == 100) break; + } + } + + while (available_hosts.size()) { + int hostid = available_hosts.back(); + available_hosts.pop_back(); + retval = host.lookup_id(hostid); + if (retval || !alive(host)) { + continue; + } + return hostid; + } + + log_messages.printf(MSG_CRITICAL, "No hosts available\n"); + return 0; +} + META_CHUNK::META_CHUNK(VDA_FILE_AUX* d, META_CHUNK* p, int index) { dfile = d; parent = p;