From ef4bb882d282c7b2099d197df33ac10516747ee5 Mon Sep 17 00:00:00 2001 From: Brian Boshes Date: Tue, 6 Jul 2004 17:37:58 +0000 Subject: [PATCH] *** empty log message *** svn path=/trunk/boinc/; revision=3787 --- checkin_notes | 60 +++++++++++++- client/client_types.C | 119 ++++++++++++++++++++++++--- client/client_types.h | 10 ++- client/cs_apps.C | 21 +++++ client/cs_files.C | 2 +- client/cs_scheduler.C | 27 ++++++- client/file_xfer.C | 8 +- client/pers_file_xfer.C | 175 ++++++++++++++++++++++++---------------- client/pers_file_xfer.h | 2 +- client/scheduler_op.C | 5 +- client/scheduler_op.h | 1 + 11 files changed, 337 insertions(+), 93 deletions(-) diff --git a/checkin_notes b/checkin_notes index 56d4c056f1..e660acc3ae 100755 --- a/checkin_notes +++ b/checkin_notes @@ -14614,7 +14614,7 @@ David 4 July 2004 main.C sched_shmem.C,h -David 7 July 2004 +David 6 July 2004 - Completed (more or less) the implementation of trickle messages. Some changes: - Trickle-up messages now have a "variety", @@ -14647,3 +14647,61 @@ David 7 July 2004 server_types.C,h trickle_handler.C +Brian 6 July 2004 + - Changed client so that when a file info is compared to + an existing file info with the same name, the differences + in whether it is an upload or download are updated and + any urls that are not present in the file info are added to the url list. + - When a url is requested for a file info, the file info + checks whether it is an upload, and then looks for the + substring "file_upload_handler" in any url it is trying to + use to upload the file. The opposite is true for downloads. + + - Changed get_url into three separate functions: + get_current_url(bool), returns the current good url + get_init_url(bool), returns the initial good url and sets the start url + get_next_url(bool), increments current_url and finds the next good url + + - Changed client so that a file_info will save its signed xml + even if it doesn't have an xml signature. 
+ + - Updated the client so that the check to see if the url is + valid for the type of network request is a private function of FILE_INFO + + - Modified the retry_or_backoff function in persistent_file_xfers + so that it uses the updated get_next_url that checks if the url + is correct for the type of transfer. Otherwise it could + increment the current_url index to point to an invalid url + + - If a file_info doesn't have a max number of bytes and a new + file_info with the same name does, update the max number + of bytes of the existing file_info and add it to the signed_xml. + + - when file_xfer is complete, the client checks if the file + was meant to be uploaded when present OR generated locally + before setting it to ready to report, instead of only the latter + + - With the addition of multiple urls, there is the issue of urls + that don't communicate well or return garbage. Whenever a file + gets a server error, it calls check_giveup, which only gives + up on the file if there are no more urls to try. It is also called + if the downloaded file is garbage or has the same name but not the + correct size. 
+ + - added functionality for send_file_list but does not do anything yet + + + client/ + client_types.C,h + client_state.h + cs_apps.C + cs_file.C + cs_scheduler.C + file_xfer.C + pers_file_xfer.C,h + scheduler_op.C,h + + + + + diff --git a/client/client_types.C b/client/client_types.C index 8173a95456..2ea7c56564 100644 --- a/client/client_types.C +++ b/client/client_types.C @@ -82,6 +82,7 @@ void PROJECT::init() { work_done_this_period = 0; next_runnable_result = NULL; work_request = 0; + send_file_list = false; } PROJECT::~PROJECT() { @@ -291,6 +292,7 @@ int PROJECT::parse_state(MIOFILE& in) { nrpc_failures = 0; master_url_fetch_pending = false; sched_rpc_pending = false; + send_file_list = false; scheduler_urls.clear(); while (in.fgets(buf, 256)) { if (match_tag(buf, "")) return 0; @@ -327,6 +329,7 @@ int PROJECT::parse_state(MIOFILE& in) { else if (parse_int(buf, "", (int&)min_rpc_time)) continue; else if (match_tag(buf, "")) master_url_fetch_pending = true; else if (match_tag(buf, "")) sched_rpc_pending = true; + else if (match_tag(buf, "")) send_file_list = true; else if (parse_double(buf, "", debt)) continue; else scope_messages.printf("PROJECT::parse_state(): unrecognized: %s\n", buf); } @@ -395,7 +398,8 @@ int PROJECT::write_state(MIOFILE& out) { (int)min_rpc_time, debt, master_url_fetch_pending?" \n":"", - sched_rpc_pending?" \n":"" + sched_rpc_pending?" \n":"", + send_file_list?" 
\n":"" ); if (strlen(code_sign_key)) { out.printf( @@ -656,7 +660,7 @@ int FILE_INFO::write(MIOFILE& out, bool to_server) { if (retval) return retval; } if (!to_server) { - if (strlen(signed_xml) && strlen(xml_signature)) { + if (strlen(signed_xml)) { out.printf(" \n%s \n", signed_xml); } if (strlen(xml_signature)) { @@ -684,20 +688,96 @@ int FILE_INFO::delete_file() { return retval; } -// get the currently selected url to download/upload file, or -// select one if none is chosen yet +// If a file has multiple replicas, we want to choose +// a random one to try first, and then cycle through others +// if transfers fail. +// Call this to get the initial url, // -char* FILE_INFO::get_url() { +// Files may have URLs for both upload and download. +// The is_upload arg says which kind you want. +// NULL return means there is no URL of the requested type +// +char* FILE_INFO::get_init_url(bool is_upload) { double temp; - if (current_url < 0) { - temp = rand(); - temp *= urls.size(); - temp /= RAND_MAX; - current_url = (int)temp; - start_url = current_url; - } + temp = rand(); + temp *= urls.size(); + temp /= RAND_MAX; + current_url = (int)temp; + start_url = current_url; + while(1) { + if(!is_correct_url_type(is_upload, urls[current_url])) { + current_url = (current_url + 1)%urls.size(); + if (current_url == start_url) { + msg_printf(project, MSG_ERROR, "Couldn't find suitable url for %s\n", name); + return NULL; + } + } else { + start_url = current_url; + return urls[current_url].text; + } + } +} - return urls[current_url].text; +// Call this to get the next URL of the indicated type. +// NULL return means you've tried them all. 
+// +char* FILE_INFO::get_next_url(bool is_upload) { + while(1) { + current_url = (current_url + 1)%urls.size(); + if (current_url == start_url) { + return NULL; + } + if(is_correct_url_type(is_upload, urls[current_url])) { + return urls[current_url].text; + } + } +} + +char* FILE_INFO::get_current_url(bool is_upload) { + if (current_url < 0) { + return get_init_url(is_upload); + } + return urls[current_url].text; +} + +// Checks if the url includes the phrase "file_upload_handler" +// The inclusion of this phrase indicates the url is an upload url +// +bool FILE_INFO::is_correct_url_type(bool is_upload, STRING256 url) { + if(is_upload && !strstr(url.text, "file_upload_handler") || + !is_upload && strstr(url.text, "file_upload_handler")) { + return false; + } else { + return true; + } +} + +// merges information from a new FILE_INFO that has the same name as a +// FILE_INFO that is already present in the client state +// Potentially changes upload_when_present, max_nbytes, and signed_xml +// +int FILE_INFO::merge_info(FILE_INFO& new_info) { + char buf[256]; + bool has_url; + unsigned int i, j; + upload_when_present = new_info.upload_when_present; + if(max_nbytes <= 0 && new_info.max_nbytes) { + max_nbytes = new_info.max_nbytes; + sprintf(buf, " %.0f\n", new_info.max_nbytes); + strcat(signed_xml, buf); + } + for(i = 0; i < new_info.urls.size(); i++) { + has_url = false; + for(j = 0; j < urls.size(); j++) { + if(!strcmp(urls[j].text, new_info.urls[i].text)) { + has_url = true; + } + } + if(!has_url) { + urls.push_back(new_info.urls[i]); + } + } + return 0; } // Returns true if the file had an unrecoverable error @@ -1088,3 +1168,16 @@ void RESULT::get_app_version_string(string& str) { sprintf(buf, " %.2f", wup->version_num/100.); str = app->name + string(buf); } + +// resets all FILE_INFO's in result to uploaded = false +// used in file_xfers when an uploaded file is required +// without calling this before sending result to be uploaded, +// upload would terminate 
without sending files + +void RESULT::reset_result_files() { + unsigned int i; + + for (i=0; iuploaded = false; + } +} diff --git a/client/client_types.h b/client/client_types.h index d341b872a9..42f9c8e9a9 100644 --- a/client/client_types.h +++ b/client/client_types.h @@ -89,9 +89,13 @@ public: int parse(MIOFILE&, bool from_server); int write(MIOFILE&, bool to_server); int delete_file(); // attempt to delete the underlying file - char* get_url(); + char* get_init_url(bool); + char* get_next_url(bool); + char* get_current_url(bool); + bool is_correct_url_type(bool, STRING256); bool had_failure(int& failnum); bool verify_existing_file(); + int merge_info(FILE_INFO&); }; // Describes a connection between a file and a workunit, result, or application. @@ -164,6 +168,9 @@ public: bool tentative; // master URL and account ID not confirmed bool anonymous_platform; // app_versions.xml file found in project dir; // use those apps rather then getting from server + bool send_file_list; + // send the list of permanent files associated with the + // project in the next scheduler reply for the project char code_sign_key[MAX_BLOB_LEN]; std::vector user_files; int parse_preferences_for_user_files(); @@ -296,6 +303,7 @@ struct RESULT { int write(MIOFILE&, bool to_server); bool is_upload_done(); // files uploaded? void get_app_version_string(std::string&); + void reset_result_files(); }; int verify_downloaded_file(char* pathname, FILE_INFO& file_info); diff --git a/client/cs_apps.C b/client/cs_apps.C index 1280d48ea1..c6bed2c8fa 100644 --- a/client/cs_apps.C +++ b/client/cs_apps.C @@ -233,6 +233,12 @@ bool CLIENT_STATE::have_free_cpu() { // void CLIENT_STATE::assign_results_to_projects() { + // Before assigning a result to an active task, check if that result is a file xfer + // this will be appearent by the lack of files associated with the workunit's app + // Running this function will find these results and mark them as completed. 
+ + handle_file_xfer_apps(); + for (unsigned int i=0; iresult->already_selected) continue; @@ -496,3 +502,18 @@ int CLIENT_STATE::choose_version_num(char* app_name, SCHEDULER_REPLY& sr) { return best; } +// goes through results and checks if the associated apps has no app files +// then there is nothing to do, never start the app, close the result + +void CLIENT_STATE::handle_file_xfer_apps() { + for(vector ::const_iterator i = results.begin(); + i!=results.end(); ++i) + { + RESULT* rp = *i; + if(rp->wup->avp->app_files.size() == 0 && rp->state == RESULT_FILES_DOWNLOADED) { + rp->state = RESULT_FILES_UPLOADING; + rp->reset_result_files(); + } + } +} + diff --git a/client/cs_files.C b/client/cs_files.C index f15dead052..83f777b881 100644 --- a/client/cs_files.C +++ b/client/cs_files.C @@ -201,7 +201,7 @@ bool CLIENT_STATE::handle_pers_file_xfers() { // if (pfx->xfer_done) { fip = pfx->fip; - if (fip->generated_locally) { + if (fip->generated_locally || fip->upload_when_present) { // file has been uploaded - delete if not sticky // if (!fip->sticky) { diff --git a/client/cs_scheduler.C b/client/cs_scheduler.C index c2008dbf3b..16f10c31fe 100644 --- a/client/cs_scheduler.C +++ b/client/cs_scheduler.C @@ -280,6 +280,7 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, double work_req) { MIOFILE mf; unsigned int i; RESULT* rp; + FILE_INFO* fip; int retval; double size; char cross_project_id[MD5_LEN]; @@ -379,9 +380,22 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, double work_req) { rp->write(mf, true); } } + if (p->send_file_list) { + fprintf(f, " \n"); + for(i=0; iproject == p && fip->sticky == true) { + fip->write(mf, true); + } + } + fprintf(f, " \n"); + p->send_file_list = false; + } + read_trickle_files(p, f); fprintf(f, "\n"); - fclose(f); + + fclose(f); return 0; } @@ -698,9 +712,13 @@ int CLIENT_STATE::handle_scheduler_reply( if (!retval) apps.push_back(app); } } + FILE_INFO* fip; for (i=0; imerge_info(sr.file_infos[i]); + } else { + fip = new 
FILE_INFO; *fip = sr.file_infos[i]; retval = link_file_info(project, fip); if (!retval) file_infos.push_back(fip); @@ -761,6 +779,9 @@ int CLIENT_STATE::handle_scheduler_reply( if (sr.message_ack) { remove_trickle_files(project); } + if (sr.send_file_list) { + project->send_file_list = true; + } project->sched_rpc_pending = false; set_client_state_dirty("handle_scheduler_reply"); scope_messages.printf("CLIENT_STATE::handle_scheduler_reply(): State after handle_scheduler_reply():\n"); diff --git a/client/file_xfer.C b/client/file_xfer.C index d68f55fea4..c5a460b1f3 100644 --- a/client/file_xfer.C +++ b/client/file_xfer.C @@ -60,7 +60,7 @@ int FILE_XFER::init_download(FILE_INFO& file_info) { } bytes_xferred = f_size; - return HTTP_OP::init_get(fip->get_url(), pathname, false, (int)f_size); + return HTTP_OP::init_get(fip->get_current_url(is_upload), pathname, false, (int)f_size); } // for uploads, we need to build a header with xml_signature etc. @@ -88,7 +88,7 @@ int FILE_XFER::init_upload(FILE_INFO& file_info) { file_info.name ); file_size_query = true; - return HTTP_OP::init_post2(fip->get_url(), header, NULL, 0); + return HTTP_OP::init_post2(fip->get_current_url(is_upload), header, NULL, 0); } else { bytes_xferred = file_info.upload_offset; sprintf(header, @@ -113,7 +113,7 @@ int FILE_XFER::init_upload(FILE_INFO& file_info) { ); file_size_query = false; return HTTP_OP::init_post2( - fip->get_url(), header, pathname, fip->upload_offset + fip->get_current_url(is_upload), header, pathname, fip->upload_offset ); } } @@ -241,7 +241,7 @@ bool FILE_XFER_SET::poll() { } } } - } else if (fxp->file_xfer_retval == HTTP_STATUS_RANGE_REQUEST_ERROR) { + } else if (fxp->file_xfer_retval == HTTP_STATUS_RANGE_REQUEST_ERROR) { fxp->fip->error_msg = "Existing file too large; can't resume"; } } diff --git a/client/pers_file_xfer.C b/client/pers_file_xfer.C index da3af60750..70233fd30b 100644 --- a/client/pers_file_xfer.C +++ b/client/pers_file_xfer.C @@ -73,7 +73,11 @@ int 
PERS_FILE_XFER::init(FILE_INFO* f, bool is_file_upload) { fip = f; is_upload = is_file_upload; xfer_done = false; - + char* p = f->get_init_url(is_file_upload); + if (!p) { + msg_printf(NULL, MSG_ERROR, "No URL for file transfer of %s", f->name); + return ERR_NULL; + } return 0; } @@ -134,7 +138,7 @@ int PERS_FILE_XFER::start_xfer() { if (retval) { msg_printf( fip->project, MSG_ERROR, "Couldn't start %s for %s: error %d", - (is_upload ? "upload" : "download"), fip->get_url(), retval + (is_upload ? "upload" : "download"), fip->get_current_url(is_upload), retval ); handle_xfer_failure(); delete fxp; @@ -146,7 +150,7 @@ int PERS_FILE_XFER::start_xfer() { if (retval) { msg_printf( fip->project, MSG_ERROR, "Couldn't start %s for %s: error %d", - (is_upload ? "upload" : "download"), fip->get_url(), retval + (is_upload ? "upload" : "download"), fip->get_current_url(is_upload), retval ); fxp->file_xfer_retval = retval; handle_xfer_failure(); @@ -160,7 +164,7 @@ int PERS_FILE_XFER::start_xfer() { (is_upload ? "upload" : "download"), fip->name ); } - scope_messages.printf("PERS_FILE_XFER::start_xfer(): URL: %s\n",fip->get_url()); + scope_messages.printf("PERS_FILE_XFER::start_xfer(): URL: %s\n",fip->get_current_url(is_upload)); return 0; } @@ -170,6 +174,8 @@ int PERS_FILE_XFER::start_xfer() { // bool PERS_FILE_XFER::poll(time_t now) { int retval; + char pathname[256]; + double existing_size = 0; SCOPE_MSG_LOG scope_messages(log_messages, CLIENT_MSG_LOG::DEBUG_FILE_XFER); @@ -197,69 +203,105 @@ bool PERS_FILE_XFER::poll(time_t now) { last_time = dtime(); if (fxp->file_xfer_done) { - scope_messages.printf( - "PERS_FILE_XFER::poll(): file transfer status %d", - fxp->file_xfer_retval - ); - if (fxp->file_xfer_retval == 0) { - // The transfer finished with no errors. 
- // - if (log_flags.file_xfer) { - msg_printf( - fip->project, MSG_INFO, "Finished %s of %s", - is_upload?"upload":"download", fip->name - ); - if (fxp->xfer_speed < 0) { - msg_printf(fip->project, MSG_INFO, "No data transferred"); - } else { - msg_printf( - fip->project, MSG_INFO, "Approximate throughput %f bytes/sec", - fxp->xfer_speed - ); - } - } - xfer_done = true; - } else if (fxp->file_xfer_retval == ERR_UPLOAD_PERMANENT) { - if (log_flags.file_xfer) { - msg_printf( - fip->project, MSG_INFO, "Permanently failed %s of %s", - is_upload?"upload":"download", fip->name - ); - } - giveup("server rejected file"); - } else { - if (log_flags.file_xfer) { - msg_printf( - fip->project, MSG_INFO, "Temporarily failed %s of %s", - is_upload?"upload":"download", fip->name - ); - } - handle_xfer_failure(); - } - // remove fxp from file_xfer_set and deallocate it - // - gstate.file_xfers->remove(fxp); - delete fxp; - fxp = NULL; + // check if the file was actually downloaded, if not check if there are more urls to try + // if there are no bytes downloaded, than the was probably a wrong url downloaded + get_pathname(fip, pathname); + if (!file_size(pathname, existing_size) && existing_size == fip->nbytes) { + retval = verify_downloaded_file(pathname, *fip); + if (!retval) { + scope_messages.printf( + "PERS_FILE_XFER::poll(): file transfer status %d", + fxp->file_xfer_retval + ); + if (fxp->file_xfer_retval == 0) { + // The transfer finished with no errors. 
+ // + if (log_flags.file_xfer) { + msg_printf( + fip->project, MSG_INFO, "Finished %s of %s", + is_upload?"upload":"download", fip->name + ); + if (fxp->xfer_speed < 0) { + msg_printf(fip->project, MSG_INFO, "No data transferred"); + } else { + msg_printf( + fip->project, MSG_INFO, "Approximate throughput %f bytes/sec", + fxp->xfer_speed + ); + } + } + xfer_done = true; + } else if (fxp->file_xfer_retval == ERR_UPLOAD_PERMANENT) { + if (log_flags.file_xfer) { + msg_printf( + fip->project, MSG_INFO, "Permanently failed %s of %s", + is_upload?"upload":"download", fip->name + ); + } + check_giveup("server rejected file"); + } else { + if (log_flags.file_xfer) { + msg_printf( + fip->project, MSG_INFO, "Temporarily failed %s of %s", + is_upload?"upload":"download", fip->name + ); + } + handle_xfer_failure(); + } + // remove fxp from file_xfer_set and deallocate it + // + gstate.file_xfers->remove(fxp); + delete fxp; + fxp = NULL; - return true; + return true; + } + } + check_giveup("File downloaded was not the correct file or was garbage from bad URL"); + return false; } - return false; } -void PERS_FILE_XFER::giveup(char* why) { - if (is_upload) { - fip->status = ERR_GIVEUP_UPLOAD; - } else { - fip->status = ERR_GIVEUP_DOWNLOAD; - } - xfer_done = true; - msg_printf( - fip->project, MSG_ERROR, "Giving up on %s of %s: %s", - is_upload?"upload":"download", fip->name, why - ); - fip->error_msg = why; +// Takes a reason why a transfer has failed. Checks to see if there are no more valid URLs +// listed in the file_info. If no more urls are present, the file is then given up on, the reason is +// listed in the error_msg field, and the appropriate status code is given. If there are more +// URLs to try, the file_xfer is restarted with these new urls until a good transfer is made +// or it completely gives up. 
+// + +void PERS_FILE_XFER::check_giveup(char* why) { + if(fip->get_next_url(fip->upload_when_present) == NULL) { + // the file has no appropriate download location + // remove the file from the directory and delete the file xfer object + gstate.file_xfers->remove(fxp); + delete fxp; + fxp = NULL; + // apply the correct error code + if (is_upload) { + fip->status = ERR_GIVEUP_UPLOAD; + } else { + fip->status = ERR_GIVEUP_DOWNLOAD; + } + // end the xfer so it will be deleted + xfer_done = true; + msg_printf( + fip->project, MSG_ERROR, "Giving up on %s of %s: %s", + is_upload?"upload":"download", fip->name, why + ); + fip->error_msg = why; + // delete the associated file in the project directory + fip->delete_file(); + } else { + if (is_upload) { + if (gstate.exit_before_upload) { + exit(0); + } + fxp->init_upload(*fip); + } else { + fxp->init_download(*fip); + } + } } // Handle a transfer failure @@ -277,7 +319,7 @@ void PERS_FILE_XFER::handle_xfer_failure() { // If it is uploading and receives a HTTP_STATUS_NOT_FOUND then // the file upload handler could not be found. 
if (!fxp->is_upload) { - giveup("file was not found on server"); + check_giveup("file was not found on server"); return; } else { retry_or_backoff(); @@ -287,7 +329,7 @@ void PERS_FILE_XFER::handle_xfer_failure() { // See if it's time to give up on the persistent file xfer // if ((now - first_request_time) > gstate.file_xfer_giveup_period) { - giveup("too much elapsed time"); + check_giveup("too much elapsed time"); } else { retry_or_backoff(); } @@ -307,13 +349,10 @@ void PERS_FILE_XFER::retry_or_backoff() { // newtime = localtime(&now); // Cycle to the next URL to try - // - fip->current_url = (fip->current_url + 1)%fip->urls.size(); - // If we reach the URL that we started at, then we have tried all // servers without success // - if (fip->current_url == fip->start_url) { + if (fip->get_next_url(is_upload) == NULL) { nretry++; // Do an exponential backoff of e^nretry seconds, diff --git a/client/pers_file_xfer.h b/client/pers_file_xfer.h index 980d772974..d860b4f42a 100644 --- a/client/pers_file_xfer.h +++ b/client/pers_file_xfer.h @@ -69,7 +69,7 @@ public: bool poll(time_t now); void handle_xfer_failure(); void retry_or_backoff(); - void giveup(char*); + void check_giveup(char*); int write(MIOFILE& fout); int parse(MIOFILE& fin); int start_xfer(); diff --git a/client/scheduler_op.C b/client/scheduler_op.C index 533a50524c..fb4dd1371e 100644 --- a/client/scheduler_op.C +++ b/client/scheduler_op.C @@ -585,6 +585,7 @@ int SCHEDULER_REPLY::parse(FILE* in, PROJECT* project) { code_sign_key_signature = 0; message_ack = false; project_is_down = false; + send_file_list = false; p = fgets(buf, 256, in); if (!p) { @@ -693,7 +694,9 @@ int SCHEDULER_REPLY::parse(FILE* in, PROJECT* project) { continue; } else if (match_tag(buf, "")) { continue; - } else if (strlen(buf)>1){ + } else if (match_tag(buf, "")) { + send_file_list = true; + } else if (strlen(buf)>1){ scope_messages.printf("SCHEDULER_REPLY::parse(): unrecognized %s\n", buf); } } diff --git a/client/scheduler_op.h 
b/client/scheduler_op.h index 5bc414f16e..b3c3fa7bd4 100644 --- a/client/scheduler_op.h +++ b/client/scheduler_op.h @@ -113,6 +113,7 @@ struct SCHEDULER_REPLY { char* code_sign_key_signature; bool message_ack; bool project_is_down; + bool send_file_list; SCHEDULER_REPLY(); ~SCHEDULER_REPLY();