From e9aa34829e77786e6487bc7d38aa943df9ee622d Mon Sep 17 00:00:00 2001 From: Eric Heien Date: Wed, 7 Aug 2002 22:52:10 +0000 Subject: [PATCH] Initial persistent file transfer implementation. svn path=/trunk/boinc/; revision=274 --- checkin_notes | 35 ++++++ client/Makefile.in | 1 + client/client_state.C | 87 +++++++++++++- client/client_state.h | 8 ++ client/client_types.C | 41 ++++++- client/client_types.h | 9 +- client/cs_files.C | 108 ++++-------------- client/cs_scheduler.C | 7 ++ client/file_xfer.C | 17 ++- client/hostinfo_unix.C | 4 +- client/http.C | 24 ++-- client/http.h | 2 +- client/pers_file_xfer.C | 220 +++++++++++++++++++++++++++++++++--- client/pers_file_xfer.h | 31 +++-- client/scheduler_op.C | 2 +- client/test_file_xfer.C | 18 +++ client/test_http.C | 2 +- client/util.C | 18 +++ client/util.h | 3 + sched/file_upload_handler.C | 35 +++++- 20 files changed, 529 insertions(+), 143 deletions(-) diff --git a/checkin_notes b/checkin_notes index 9fbccabf6c..bf5220508c 100755 --- a/checkin_notes +++ b/checkin_notes @@ -1443,6 +1443,7 @@ David A. doc/ api.html + Barry Luong August 6, 2002 - Added team pages in html_user/ @@ -1471,3 +1472,37 @@ Barry Luong August 6, 2002 team_remove_inactive_action.php team_remove_inactive_form.php + +Eric Heien August 7, 2002 + - Added PERS_FILE_XFER (persistent file transfer) + functionality. This includes the notions of retrying + when unable to connect, exponential backoff, and giving + up after a period of time. Giving up is currently + not fully implemented. Includes initial work for + supporting upload/download resumption. All features + still need to be thoroughly tested. + - Added initial functionality to calculate allowable disk + usage. + + client/ + Makefile.in + client_state.C + client_state.h + client_types.C + client_types.h + cs_files.C + cs_scheduler.C + file_xfer.C + hostinfo_unix.C + http.C + http.h + pers_file_xfer.C + pers_file_xfer.h + scheduler_op.C + test_file_xfer.C + test_http.C + util.C + util.h + sched/ + file_upload_handler.C + diff --git a/client/Makefile.in b/client/Makefile.in index e9b40177c2..8c131cf62c 100644 --- a/client/Makefile.in +++ b/client/Makefile.in @@ -41,6 +41,7 @@ OBJS = \ log_flags.o \ net_stats.o \ net_xfer.o \ + pers_file_xfer.o \ prefs.o \ scheduler_op.o \ speed_stats.o \ diff --git a/client/client_state.C b/client/client_state.C index c0889c884d..b8624031df 100644 --- a/client/client_state.C +++ b/client/client_state.C @@ -25,6 +25,7 @@ #include "error_numbers.h" #include "file_names.h" +#include "filesys.h" #include "hostinfo.h" #include "log_flags.h" #include "parse.h" @@ -40,10 +41,12 @@ CLIENT_STATE::CLIENT_STATE() { net_xfers = new NET_XFER_SET; http_ops = new HTTP_OP_SET(net_xfers); file_xfers = new FILE_XFER_SET(http_ops); + pers_xfers = new PERS_FILE_XFER_SET(file_xfers); scheduler_op = new SCHEDULER_OP(http_ops); client_state_dirty = false; exit_when_idle = false; run_time_test = true; + giveup_after = PERS_GIVEUP; contacted_sched_server = false; activities_suspended = false; version = VERSION; @@ -61,6 +64,9 @@ int CLIENT_STATE::init(PREFS* p) { } prefs = p; + // Initialize the random number generator + srand( clock() ); + // copy all PROJECTs from the prefs to the client state. // for (i=0; iprojects.size(); i++) { @@ -108,6 +114,31 @@ int CLIENT_STATE::time_tests() { return 0; } +// Update the net_stats object, since it's private +// +void CLIENT_STATE::update_net_stats(bool is_upload, double nbytes, double nsecs) { + net_stats.update( is_upload, nbytes, nsecs ); +} + +// Insert an object into the file_xfers object, since it's private +// +int CLIENT_STATE::insert_file_xfer( FILE_XFER *fxp ) { + return file_xfers->insert(fxp); +} + +// Return the maximum allowed disk usage as determined by user preferences. +// Since there are three different settings in the prefs, it returns the least +// of the three. +double CLIENT_STATE::allowed_disk_usage() { + double percent_space,min_val; + + // Calculate allowed disk usage based on % pref + percent_space = host_info.d_total*prefs->disk_max_used_pct/100.0; + + // Return the minimum of the three + return min(min(prefs->disk_max_used_gb, percent_space),min_val); +} + // See if (on the basis of user prefs) we should suspend activities. // If so, suspend tasks // @@ -136,6 +167,7 @@ int CLIENT_STATE::check_suspend_activities() { // finite state machine abstraction of the client. Each of the key // elements of the client is given a chance to perform work here. // return true if something happened +// TODO: handle errors passed back up to here? // bool CLIENT_STATE::do_something() { int nbytes; @@ -145,6 +177,12 @@ bool CLIENT_STATE::do_something() { if (!activities_suspended) { net_xfers->poll(999999, nbytes); if (nbytes) action = true; + // If pers_xfers returns true, we've made a change to a + // persistent transfer which must be recorded in the + // client_state.xml file + if (pers_xfers->poll()) { + action = client_state_dirty = true; + } action |= http_ops->poll(); action |= file_xfers->poll(); action |= active_tasks.poll(); @@ -205,6 +243,11 @@ int CLIENT_STATE::parse_state_file() { if (project) { retval = link_file_info(project, fip); if (!retval) file_infos.push_back(fip); + // Init PERS_FILE_XFER and push it onto pers_file_xfer stack + if (fip->pers_file_xfer) { + fip->pers_file_xfer->init( fip, fip->upload_when_present ); + pers_xfers->insert( fip->pers_file_xfer ); + } } else { delete fip; } @@ -572,9 +615,11 @@ void CLIENT_STATE::print_counts() { // bool CLIENT_STATE::garbage_collect() { unsigned int i; + PERS_FILE_XFER* pfxp; FILE_INFO* fip; RESULT* rp; WORKUNIT* wup; + vector::iterator pers_iter; vector::iterator result_iter; vector::iterator wu_iter; vector::iterator fi_iter; @@ -590,6 +635,32 @@ bool CLIENT_STATE::garbage_collect() { fip->ref_cnt = 0; } + // delete PERS_FILE_XFERs that have finished and their + // associated FILE_INFO and FILE_XFER objects + // + pers_iter = pers_xfers->pers_file_xfers.begin(); + while (pers_iter != pers_xfers->pers_file_xfers.end()) { + pfxp = *pers_iter; + if (pfxp->xfer_done) { + // TODO: *** Set the exit status of the related result + // to ERR_GIVEUP. The failure will be reported to the + // server and related file infos, results, and workunits + // will be deleted if necessary + + pers_iter = pers_xfers->pers_file_xfers.erase(pers_iter); + if (pfxp->fip) { + pfxp->fip->pers_file_xfer = NULL; + } + delete pfxp; + + // Update the client_state file + client_state_dirty = true; + action = true; + } else { + pers_iter++; + } + } + // delete RESULTs that have been finished and reported; // reference-count files referred to by other results // @@ -655,6 +726,7 @@ bool CLIENT_STATE::garbage_collect() { } // TODO: write no more often than X seconds +// Write the client_state.xml file if necessary // int CLIENT_STATE::write_state_file_if_needed() { int retval; @@ -674,16 +746,21 @@ void CLIENT_STATE::parse_cmdline(int argc, char** argv) { for (i=1; i projects; @@ -57,6 +63,7 @@ private: int version; char* platform_name; NET_XFER_SET* net_xfers; + PERS_FILE_XFER_SET* pers_xfers; HTTP_OP_SET* http_ops; FILE_XFER_SET* file_xfers; ACTIVE_TASK_SET active_tasks; @@ -100,6 +107,7 @@ private: bool contacted_sched_server; void compute_resource_debts(); public: + bool start_new_file_xfer(); PROJECT* next_project(PROJECT*); PROJECT* next_project_master_pending(); double work_needed_secs(); diff --git a/client/client_types.C b/client/client_types.C index fb1b5d3af3..b45384cbe5 100644 --- a/client/client_types.C +++ b/client/client_types.C @@ -23,6 +23,7 @@ #include "file_names.h" #include "filesys.h" #include "parse.h" +#include "pers_file_xfer.h" #include "client_types.h" @@ -176,7 +177,9 @@ void PROJECT::copy_state_fields(PROJECT& p) { hostid = p.hostid; exp_avg_cpu = p.exp_avg_cpu; exp_avg_mod_time = p.exp_avg_mod_time; - code_sign_key = strdup(p.code_sign_key); + if( p.code_sign_key ) { + code_sign_key = strdup(p.code_sign_key); + } nrpc_failures = p.nrpc_failures; min_rpc_time = p.min_rpc_time; } @@ -231,6 +234,8 @@ FILE_INFO::~FILE_INFO() { int FILE_INFO::parse(FILE* in, bool from_server) { char buf[256]; STRING256 url; + PERS_FILE_XFER *pfxp; + int retval; if(in==NULL) { fprintf(stderr, "error: FILE_INFO.parse: unexpected NULL pointer in\n"); return ERR_NULL; @@ -246,10 +251,12 @@ int FILE_INFO::parse(FILE* in, bool from_server) { upload_when_present = false; sticky = false; signature_required = false; - file_xfer = NULL; + pers_file_xfer = NULL; result = NULL; project = NULL; urls.clear(); + start_url = -1; + current_url = -1; if (from_server) { signed_xml = strdup(""); } else { @@ -285,6 +292,15 @@ int FILE_INFO::parse(FILE* in, bool from_server) { else if (match_tag(buf, "")) upload_when_present = true; else if (match_tag(buf, "")) sticky = true; else if (match_tag(buf, "")) signature_required = true; + else if (match_tag(buf, "")) { + pfxp = new PERS_FILE_XFER; + retval = pfxp->parse(in); + if (!retval) { + pers_file_xfer = pfxp; + } else { + delete pfxp; + } + } else if (!from_server && match_tag(buf, "")) { dup_element_contents(in, "", &signed_xml); continue; @@ -322,6 +338,9 @@ int FILE_INFO::write(FILE* out, bool to_server) { for (i=0; i%s\n", urls[i].text); } + if (!to_server && pers_file_xfer) { + pers_file_xfer->write(out); + } if (!to_server) { if (signed_xml) { fprintf(out, "\n%s\n", signed_xml); @@ -343,6 +362,24 @@ int FILE_INFO::delete_file() { return file_delete(path); } +// get the currently selected url to download/upload file, or +// select one if none is chosen yet +// +char* FILE_INFO::get_url() { + double temp; + if( current_url < 0 ) { + temp = rand(); + temp *= urls.size(); + temp /= RAND_MAX; + current_url = (int)temp; + start_url = current_url; + } + + return urls[current_url].text; +} + +// Parse XML based app_version information, usually from client_state.xml +// int APP_VERSION::parse(FILE* in) { char buf[256]; FILE_REF file_ref; diff --git a/client/client_types.h b/client/client_types.h index dc9335a600..6d785cc656 100644 --- a/client/client_types.h +++ b/client/client_types.h @@ -34,7 +34,7 @@ #define STDERR_MAX_LEN 4096 -class FILE_XFER; +class PERS_FILE_XFER; struct RESULT; struct STRING256 { @@ -106,11 +106,13 @@ public: bool upload_when_present; bool sticky; // don't delete unless instructed to do so bool signature_required; // true iff associated with app version - FILE_XFER* file_xfer; // nonzero if in the process of being up/downloaded + PERS_FILE_XFER* pers_file_xfer; // nonzero if in the process of being up/downloaded RESULT* result; // for upload files (to authenticate) PROJECT* project; int ref_cnt; vector urls; + int start_url; + int current_url; char* signed_xml; char* xml_signature; char* file_signature; @@ -120,6 +122,7 @@ public: int parse(FILE*, bool from_server); int write(FILE*, bool to_server); int delete_file(); // attempt to delete the underlying file + char* get_url(); }; // Describes a connection between a file and a workunit, result, or application. @@ -192,4 +195,6 @@ struct RESULT { bool is_upload_done(); // files uploaded? }; +int verify_downloaded_file(char* pathname, FILE_INFO& file_info); + #endif diff --git a/client/cs_files.C b/client/cs_files.C index db5c1b13d2..1e6c5deac0 100644 --- a/client/cs_files.C +++ b/client/cs_files.C @@ -82,104 +82,36 @@ int verify_downloaded_file(char* pathname, FILE_INFO& file_info) { // scan all FILE_INFOs. // start downloads and uploads as needed. -// check for completion of existing transfers // bool CLIENT_STATE::start_file_xfers() { unsigned int i; FILE_INFO* fip; - FILE_XFER* fxp; - char pathname[256]; - int retval; + PERS_FILE_XFER *pfx; bool action = false; for (i=0; ifile_xfer; - if (!fip->generated_locally && !fip->file_present && !fxp) { - fxp = new FILE_XFER; - fxp->init_download(*fip); - retval = file_xfers->insert(fxp); - if (retval) { - fprintf(stderr, - "couldn't start download for %s: error %d\n", - fip->urls[0].text, retval - ); - } else { - fip->file_xfer = fxp; - if (log_flags.file_xfer) { - printf( - "started download of %s\n", - fip->urls[0].text - ); - } - } + pfx = fip->pers_file_xfer; + if (!fip->generated_locally && !fip->file_present && !pfx) { + // Set up the persistent file transfer object. This will start + // the download when there is available bandwidth + // + pfx = new PERS_FILE_XFER; + pfx->init( fip, false ); + fip->pers_file_xfer = pfx; + // Pop PERS_FILE_XFER onto pers_file_xfer stack + if (fip->pers_file_xfer) pers_xfers->insert( fip->pers_file_xfer ); action = true; - } else if ( - fip->upload_when_present && fip->file_present - && !fip->uploaded && !fxp - ) { - fxp = new FILE_XFER; - fxp->init_upload(*fip); - retval = file_xfers->insert(fxp); - if (retval) { - fprintf(stderr, - "couldn't start upload for %s: error %d\n", - fip->urls[0].text, retval - ); - } else { - fip->file_xfer = fxp; - if (log_flags.file_xfer) { - printf("started upload to %s\n", fip->urls[0].text); - } - } + } else if ( fip->upload_when_present && fip->file_present && !fip->uploaded && !pfx ) { + // Set up the persistent file transfer object. This will start + // the upload when there is available bandwidth + // + pfx = new PERS_FILE_XFER; + pfx->init( fip, true ); + fip->pers_file_xfer = pfx; + // Pop PERS_FILE_XFER onto pers_file_xfer stack + if (fip->pers_file_xfer) pers_xfers->insert( fip->pers_file_xfer ); action = true; - } else if (fxp) { - if (fxp->file_xfer_done) { - action = true; - if (log_flags.file_xfer) { - printf( - "file transfer done for %s; retval %d\n", - fip->urls[0].text, fxp->file_xfer_retval - ); - } - file_xfers->remove(fxp); - fip->file_xfer = 0; - if (fip->generated_locally) { - if (fxp->file_xfer_retval == 0) { - net_stats.update(true, fip->nbytes, fxp->elapsed_time()); - - // file has been uploaded - delete if not sticky - if (!fip->sticky) { - fip->delete_file(); - } - fip->uploaded = true; - } - } else { - if (fxp->file_xfer_retval == 0) { - net_stats.update(false, fip->nbytes, fxp->elapsed_time()); - get_pathname(fip, pathname); - retval = verify_downloaded_file(pathname, *fip); - if (retval) { - fprintf(stderr, - "checksum or signature error for %s\n", fip->name - ); - } else { - if (log_flags.file_xfer_debug) { - printf("MD5 checksum validated for %s\n", pathname); - } - if (fip->executable) { - retval = chmod(pathname, S_IEXEC|S_IREAD|S_IWRITE); - } else { - get_pathname(fip, pathname); - retval = chmod(pathname, S_IREAD|S_IWRITE); - } - fip->file_present = true; - } - } - } - client_state_dirty = true; - delete fxp; - } } } return action; diff --git a/client/cs_scheduler.C b/client/cs_scheduler.C index 7cfd1bd7e5..760f61ada0 100644 --- a/client/cs_scheduler.C +++ b/client/cs_scheduler.C @@ -47,6 +47,13 @@ #define EXP_DECAY_RATE (1./(3600*24*7)) #define SECONDS_IN_DAY 86400 +// Decide whether to start a new file transfer +// +bool CLIENT_STATE::start_new_file_xfer() { + // **** this should do a little more than this + return true; +} + // estimate the days of work remaining // double CLIENT_STATE::current_water_days() { diff --git a/client/file_xfer.C b/client/file_xfer.C index cb8910d113..0b5ecbc8fb 100644 --- a/client/file_xfer.C +++ b/client/file_xfer.C @@ -44,6 +44,8 @@ FILE_XFER::~FILE_XFER() { #if 0 // Is there any reason to keep this around? int FILE_XFER::init_download(char* url, char* outfile) { + int file_size; + if(url==NULL) { fprintf(stderr, "error: FILE_XFER.init_download: unexpected NULL pointer url\n"); return ERR_NULL; @@ -52,7 +54,11 @@ int FILE_XFER::init_download(char* url, char* outfile) { fprintf(stderr, "error: FILE_XFER.init_download: unexpected NULL pointer outfile\n"); return ERR_NULL; } - return HTTP_OP::init_get(url, outfile); + + if (file_size(outfile, &file_size)) { + file_size = 0; + } + return HTTP_OP::init_get(url, outfile, file_size); } // Is there any reason to keep this around? @@ -72,7 +78,7 @@ int FILE_XFER::init_upload(char* url, char* infile) { int FILE_XFER::init_download(FILE_INFO& file_info) { fip = &file_info; get_pathname(fip, pathname); - return HTTP_OP::init_get((char*)(&fip->urls[0]), pathname); + return HTTP_OP::init_get(fip->get_url(), pathname, false); } // for uploads, we need to build a header with xml_signature etc. @@ -96,7 +102,7 @@ int FILE_XFER::init_upload(FILE_INFO& file_info) { file_info.xml_signature, file_info.nbytes ); - return HTTP_OP::init_post2((char*)(&fip->urls[0]), header, pathname, 0); + return HTTP_OP::init_post2(fip->get_url(), header, pathname, 0); } // Returns the total time that the file xfer has taken @@ -153,7 +159,6 @@ int FILE_XFER_SET::remove(FILE_XFER* fxp) { } fprintf(stderr, "FILE_XFER_SET::remove(): not found\n"); return 1; - } // Run through the FILE_XFER_SET and determine if any of the file @@ -176,7 +181,11 @@ bool FILE_XFER_SET::poll() { if (fxp->http_op_retval == 200) { fxp->file_xfer_retval = 0; } else { + // Remove the transfer from the set. The actual object + // will be removed later by it's associated PERS_FILE_XFER + remove(fxp); fxp->file_xfer_retval = fxp->http_op_retval; + i--; } } } diff --git a/client/hostinfo_unix.C b/client/hostinfo_unix.C index de392d2057..326d29c03f 100644 --- a/client/hostinfo_unix.C +++ b/client/hostinfo_unix.C @@ -232,9 +232,7 @@ int get_host_info2(HOST_INFO &host); // General function to get all relevant host information // int get_host_info(HOST_INFO& host) { - int timezone; // seconds added to local time to get UTC - - host.timezone = 0; + host.timezone = 0; // seconds added to local time to get UTC strcpy(host.domain_name,""); strcpy(host.serialnum,""); strcpy(host.ip_addr,""); diff --git a/client/http.C b/client/http.C index ea54fc32b2..035fb46a8f 100644 --- a/client/http.C +++ b/client/http.C @@ -76,7 +76,7 @@ static void parse_url(char* url, char* host, char* file) { // Prints an HTTP 1.0 GET request header into buf // static void http_get_request_header( - char* buf, char* host, char* file, int offset + char* buf, char* host, char* file, double offset ) { if(buf==NULL) { fprintf(stderr, "error: http_get_request_header: unexpected NULL pointer buf\n"); @@ -92,7 +92,7 @@ static void http_get_request_header( } if (offset) { sprintf(buf, - "GET /%s;byte-range %d- HTTP/1.0\015\012" + "GET /%s;byte-range %12.0f- HTTP/1.0\015\012" "User-Agent: BOINC client\015\012" "Host: %s:80\015\012" "Accept: */*\015\012" @@ -314,7 +314,7 @@ int HTTP_OP::init_head(char* url) { // Initialize HTTP GET operation to url // -int HTTP_OP::init_get(char* url, char* out, int off) { +int HTTP_OP::init_get(char* url, char* out, bool del_old_file, double off) { if(url==NULL) { fprintf(stderr, "error: HTTP_OP.init_get: unexpected NULL pointer url\n"); return ERR_NULL; @@ -327,6 +327,9 @@ int HTTP_OP::init_get(char* url, char* out, int off) { fprintf(stderr, "error: HTTP_OP.init_get: negative off\n"); return ERR_NEG; } + if (del_old_file) { + unlink(out); + } file_offset = off; parse_url(url, hostname, filename); NET_XFER::init(hostname, 80, HTTP_BLOCKSIZE); @@ -556,7 +559,8 @@ bool HTTP_OP_SET::poll() { htp->init_head( htp->hrh.redirect_location ); break; case HTTP_OP_GET: - htp->init_get( htp->hrh.redirect_location, htp->outfile ); + // *** Not sure if delete_old_file should be true + htp->init_get( htp->hrh.redirect_location, htp->outfile, true ); break; case HTTP_OP_POST: htp->init_post( htp->hrh.redirect_location, htp->infile, htp->outfile ); @@ -580,13 +584,15 @@ bool HTTP_OP_SET::poll() { htp->http_op_state = HTTP_STATE_DONE; htp->http_op_retval = 0; break; - case HTTP_OP_GET: case HTTP_OP_POST: - htp->http_op_state = HTTP_STATE_REPLY_BODY; - - // KLUDGE - should check for file first unlink(htp->outfile); - + case HTTP_OP_GET: + htp->http_op_state = HTTP_STATE_REPLY_BODY; + + // TODO: + // Append to a file if it already exists, otherwise + // create a new one. init_get should have already + // deleted the file if necessary htp->file = fopen(htp->outfile, "w"); if (!htp->file) { fprintf(stderr, diff --git a/client/http.h b/client/http.h index 12afe1a608..8e85703438 100644 --- a/client/http.h +++ b/client/http.h @@ -62,7 +62,7 @@ public: int http_op_retval; int init_head(char* url); - int init_get(char* url, char* outfile, int offset=0); + int init_get(char* url, char* outfile, bool del_old_file, double offset=0); int init_post(char* url, char* infile, char* outfile); int init_post2( char* url, char* req1, char* infile, double offset diff --git a/client/pers_file_xfer.C b/client/pers_file_xfer.C index ffc252dfd7..00b5f4681a 100644 --- a/client/pers_file_xfer.C +++ b/client/pers_file_xfer.C @@ -8,7 +8,7 @@ // License for the specific language governing rights and limitations // under the License. // -// The Original Code is the Berkeley Open Network Computing. +// The Original Code is the Berkeley Open Infrastructure for Network Computing. // // The Initial Developer of the Original Code is the SETI@home project. // Portions created by the SETI@home project are Copyright (C) 2002 @@ -17,6 +17,20 @@ // Contributor(s): // +#include +#include + +#include +#include + +#include "client_state.h" +#include "client_types.h" +#include "error_numbers.h" +#include "file_names.h" +#include "log_flags.h" +#include "parse.h" +#include "util.h" + // PERS_FILE_XFER represents a persistent file transfer. // A set of URL is given. @@ -27,47 +41,223 @@ // For upload, try to upload the file to the first URL; // if that fails try the others. +// +int PERS_FILE_XFER::init(FILE_INFO* the_file, bool is_file_upload) { + fxp = NULL; + this->fip = the_file; + nretry = 0; + first_request_time = time(NULL); + next_request_time = first_request_time; + is_upload = is_file_upload; + pers_xfer_retval = -1; + xfer_done = false; -int PERS_FILE_XFER::init(FILE_INFO&, bool is_upload) { + return 0; } -// -void PERS_FILE_XFER::try() { +// Try to start the file transfer associated with this persistent file transfer. +// Returns true if transfer was successfully started, false otherwise +// +bool PERS_FILE_XFER::start_xfer() { + FILE_XFER *file_xfer; + int retval; + + // Decide whether to start a new file transfer + if(gstate.start_new_file_xfer()) { + // Create a new FILE_XFER object and initialize a + // download or upload for the persistent file transfer + file_xfer = new FILE_XFER; + if (is_upload) { + retval = file_xfer->init_upload(*fip); + } else { + retval = file_xfer->init_download(*fip); + } + if (!retval) { + retval = gstate.insert_file_xfer(file_xfer); + } + if (retval) { + fprintf(stderr, "couldn't start %s for %s: error %d\n", + (is_upload ? "upload" : "download"), fip->get_url(), retval); + } else { + fxp = file_xfer; + if (log_flags.file_xfer) { + printf("started %s of %s\n", (is_upload ? "upload" : "download"), fip->get_url()); + } + return true; + } + } + return false; } -void PERS_FILE_XFER::poll(unsigned int now) { +// Poll the status of this persistent file transfer. If it's time to start it, then +// attempt to start it. If it has finished or failed, then deal with it appropriately +// +int PERS_FILE_XFER::poll(unsigned int now) { + double exp_backoff; + int retval; + char pathname[256]; + if (fxp) { if (fxp->file_xfer_done) { if (fxp->file_xfer_retval == 0) { + // The transfer finished with no errors. We will clean up the + // PERS_FILE_XFER object in garbage_collect() later + // + if (log_flags.file_xfer) { + printf( "file transfer done for %s; retval %d\n", + fip->get_url(), fxp->file_xfer_retval ); + } + if (fip->generated_locally) { + // If the file was generated locally (for upload), update stats + // and delete the local copy of the file if needed + // + gstate.update_net_stats(true, fip->nbytes, fxp->elapsed_time()); + + // file has been uploaded - delete if not sticky + if (!fip->sticky) { + fip->delete_file(); + } + fip->uploaded = true; + } else { + // Otherwise we downloaded the file. Update stats, verify + // the file with RSA or MD5, and change permissions + gstate.update_net_stats(false, fip->nbytes, fxp->elapsed_time()); + get_pathname(fip, pathname); + retval = verify_downloaded_file(pathname, *fip); + if (retval) { + fprintf(stderr, "checksum or signature error for %s\n", fip->name); + } else { + if (log_flags.file_xfer_debug) { + printf("MD5 checksum validated for %s\n", pathname); + } + // Set the appropriate permissions depending on whether + // it's an executable or normal file + if (fip->executable) { + retval = chmod(pathname, S_IEXEC|S_IREAD|S_IWRITE); + } else { + get_pathname(fip, pathname); + retval = chmod(pathname, S_IREAD|S_IWRITE); + } + fip->file_present = true; + } + } + xfer_done = true; + pers_xfer_retval = 0; } else { // file xfer failed. + // Cycle to the next URL to try + fip->current_url = (fip->current_url + 1)%fip->urls.size(); + + // If we reach the URL that we started at, then we have tried all + // servers without success + if( fip->current_url == fip->start_url ) { + nretry++; + exp_backoff = exp(((double)rand()/(double)RAND_MAX)*nretry); + // Do an exponential backoff of e^nretry seconds, keeping within the bounds + // of PERS_RETRY_DELAY_MIN and PERS_RETRY_DELAY_MAX + next_request_time = now+(int)max(PERS_RETRY_DELAY_MIN,min(PERS_RETRY_DELAY_MAX,exp_backoff)); + } + // See if it's time to give up on the persistent file xfer // - diff = now - fip->last_xfer_time; - if (diff > PERS_GIVEUP) { - pers_xfer_done = true; - pers_file_xfer_retval = ERR_GIVEUP; + if ((now - first_request_time) > gstate.giveup_after) { + xfer_done = true; + pers_xfer_retval = ERR_GIVEUP; } } + // Disassociate the finished file transfer + fxp = NULL; + + return true; } } else { // No file xfer is active. // We must be waiting after a failure. // See if it's time to try again. // - if (now > retry_time) { - try(); + if (now >= (unsigned int)next_request_time) { + return start_xfer(); } } + + return false; +} + +// Parse XML information about a single persistent file transfer +// +int PERS_FILE_XFER::parse(FILE* fin) { + char buf[256]; + if(fin==NULL) { + fprintf(stderr, "error: PERS_FILE_XFER_SET.parse: unexpected NULL pointer fin\n"); + return ERR_NULL; + } + while (fgets(buf, 256, fin)) { + if (match_tag(buf, "")) return 0; + else if (parse_int(buf, "", nretry)) continue; + else if (parse_int(buf, "", first_request_time)) continue; + else if (parse_int(buf, "", next_request_time)) continue; + else fprintf(stderr, "PERS_FILE_XFER::parse(): unrecognized: %s\n", buf); + } + return -1; +} + +// Write XML information about a particular persistent file transfer +// +int PERS_FILE_XFER::write(FILE* fout) { + if(fout==NULL) { + fprintf(stderr, "error: PERS_FILE_XFER_SET.write: unexpected NULL pointer fout\n"); + return ERR_NULL; + } + fprintf(fout, + " \n" + " %d\n" + " %d\n" + " %d\n" + " \n", + nretry, first_request_time, next_request_time); + return 0; +} + +PERS_FILE_XFER_SET::PERS_FILE_XFER_SET(FILE_XFER_SET* p) { + if(p==NULL) { + fprintf(stderr, "error: PERS_FILE_XFER_SET: unexpected NULL pointer p\n"); + } + file_xfers = p; } int PERS_FILE_XFER_SET::poll() { - unsigned int ; - PERS_FILE_XFER* pfxp; + unsigned int i; bool action = false; - int now = time(0); + int now = time(NULL); for (i=0; ipoll(now); + action |= pers_file_xfers[i]->poll(now); } + + return action; +} + +// Insert a PERS_FILE_XFER object into the set. We will decide which ones to start +// when we hit the polling loop +// +int PERS_FILE_XFER_SET::insert(PERS_FILE_XFER* pfx) { + pers_file_xfers.push_back(pfx); + return 0; +} + +// Remove a PERS_FILE_XFER object from the set. What should the action here be? +// +int PERS_FILE_XFER_SET::remove(PERS_FILE_XFER* pfx) { + vector::iterator iter; + + iter = pers_file_xfers.begin(); + while (iter != pers_file_xfers.end()) { + if (*iter == pfx) { + pers_file_xfers.erase(iter); + return 0; + } + iter++; + } + fprintf(stderr, "PERS_FILE_XFER_SET::remove(): not found\n"); + return 1; } diff --git a/client/pers_file_xfer.h b/client/pers_file_xfer.h index fdad8d1735..6398069016 100644 --- a/client/pers_file_xfer.h +++ b/client/pers_file_xfer.h @@ -17,6 +17,10 @@ // Contributor(s): // +#include +#include "client_types.h" +#include "file_xfer.h" + // PERS_FILE_XFER represents a persistent file transfer. // A set of URL is given in the FILE_INFO. @@ -28,26 +32,35 @@ // For upload, try to upload the file to the first URL; // if that fails try the others. -#define PERS_RETRY_DELAY_MIN 60 -#define PERS_RETRY_DELAY_MAX (256*60) -#define PERS_GIVEUP (3600*24*7) +#define PERS_RETRY_DELAY_MIN 60 // 1 minute +#define PERS_RETRY_DELAY_MAX (60*60*4) // 4 hours +#define PERS_GIVEUP (60*60*24*7*2) // 2 weeks // give up on xfer if this time elapses since last byte xferred class PERS_FILE_XFER { - int url_index; // which URL to try next int nretry; // # of retries so far - FILE_INFO* fip; + int first_request_time; // UNIX time of first file request + int next_request_time; // UNIX time to next retry the file request bool is_upload; - FILE_XFER* fxp; // nonzero if file xfer in progress - int retry_time; // don't retry until this time - int init(FILE_INFO&, bool is_upload); +public: + int pers_xfer_retval; + bool xfer_done; + FILE_XFER* fxp; // nonzero if file xfer in progress + FILE_INFO* fip; + + int init(FILE_INFO*, bool is_file_upload); + int poll(unsigned int now); + int write(FILE* fout); + int parse(FILE* fin); + bool start_xfer(); }; class PERS_FILE_XFER_SET { - vectorpers_file_xfers; FILE_XFER_SET* file_xfers; public: + vectorpers_file_xfers; + PERS_FILE_XFER_SET(FILE_XFER_SET*); int insert(PERS_FILE_XFER*); int remove(PERS_FILE_XFER*); diff --git a/client/scheduler_op.C b/client/scheduler_op.C index 4ca73c2d36..609305bd19 100644 --- a/client/scheduler_op.C +++ b/client/scheduler_op.C @@ -148,7 +148,7 @@ int SCHEDULER_OP::init_master_fetch(PROJECT* p) { if (log_flags.sched_op_debug) { printf("Fetching master file for %s\n", project->master_url); } - retval = http_op.init_get(project->master_url, MASTER_FILE_NAME); + retval = http_op.init_get(project->master_url, MASTER_FILE_NAME, true); if (retval) return retval; retval = http_ops->insert(&http_op); if (retval) return retval; diff --git a/client/test_file_xfer.C b/client/test_file_xfer.C index f64071cbfa..6bed368b49 100644 --- a/client/test_file_xfer.C +++ b/client/test_file_xfer.C @@ -27,6 +27,13 @@ #define DOWNLOAD_URL "http://localhost.localdomain/download/input" #define UPLOAD_URL "http://localhost.localdomain/boinc-cgi/file_upload_handler" +// Skeleton class to force compilation +class PERS_FILE_XFER { +public: + int write(FILE* fout); + int parse(FILE* fin); +}; + int main() { NET_XFER_SET nxs; @@ -107,3 +114,14 @@ int main() { } printf("all done\n"); } + +// Skeleton function to force compilation +int PERS_FILE_XFER::parse(FILE* fin) { + return 0; +} + +// Skeleton function to force compilation +int PERS_FILE_XFER::write(FILE* fout) { + return 0; +} + diff --git a/client/test_http.C b/client/test_http.C index 3733a5bb2b..7b1d46034b 100644 --- a/client/test_http.C +++ b/client/test_http.C @@ -47,7 +47,7 @@ int main() { #if 0 op1 = new HTTP_OP; - retval = op1->init_get("http://localhost.localdomain/my_index.html", "test_out1"); + retval = op1->init_get("http://localhost.localdomain/my_index.html", "test_out1", true); if (retval) { printf("init_post: %d\n", retval); exit(1); diff --git a/client/util.C b/client/util.C index b03ede6df5..3890a23951 100644 --- a/client/util.C +++ b/client/util.C @@ -44,6 +44,24 @@ void gettimeofday(timeval *t, void *tz) { #include "util.h" +// return minimum of two double precision numbers +double min( double a, double b ) { + if (ab) { + return a; + } else { + return b; + } +} + // return time of day as a double // double dtime() { diff --git a/client/util.h b/client/util.h index cab119426f..b9a0d9193a 100644 --- a/client/util.h +++ b/client/util.h @@ -18,4 +18,7 @@ // extern double dtime(); +extern double min( double a, double b ); +extern double max( double a, double b ); extern void boinc_sleep( int seconds ); + diff --git a/sched/file_upload_handler.C b/sched/file_upload_handler.C index 8032050622..436b28a4c9 100644 --- a/sched/file_upload_handler.C +++ b/sched/file_upload_handler.C @@ -1,4 +1,5 @@ -// The input to this program looks like this: +// There are two possible inputs to this program. +// One is for uploading a completed result, this looks as follows: // // // ... @@ -11,17 +12,34 @@ // // ... (data) // -// The return looks like +// The return for an uploaded result looks like // // 0 // or // 2 // bad file size +// +// The other kind of input is a file size request. This is used +// to determine how much of a file has been uploaded already. +// This kind of request should always be made before starting +// a file upload. +// The input for this looks as follows: +// result_1234_file +// +// The return for this information request looks like +// +// 1234 +// +// Where nbytes will be 0 if the file doesn't exist +// #include #include #include #include +#include +#include +#include #include "parse.h" #include "crypt.h" @@ -117,7 +135,7 @@ int copy_socket_to_file(FILE* in, char* path, double offset, double nbytes) { int handle_request(FILE* in, R_RSA_PUBLIC_KEY& key) { char buf[256]; double nbytes=0, offset=0; - char path[256]; + char path[256],file_name[256]; FILE_INFO file_info; int retval; bool is_valid; @@ -139,6 +157,17 @@ int handle_request(FILE* in, R_RSA_PUBLIC_KEY& key) { } continue; } + // Handle a file size request + else if (parse_str(buf, "", file_name)) { + struct stat sbuf; + // TODO: put checking here to ensure path doesn't point somewhere bad + sprintf( path, "%s/%s", BOINC_UPLOAD_DIR, file_name ); + retval = stat( path, &sbuf ); + if (retval && errno != ENOENT) print_status( -1, "cannot open file" ); + else if (retval) printf("Content-type: text/plain\n\n0\n"); + else printf("Content-type: text/plain\n\n%d\n", (int)sbuf.st_size); + exit(0); + } else if (parse_double(buf, "", offset)) continue; else if (parse_double(buf, "", nbytes)) continue; else if (match_tag(buf, "")) {