*** empty log message ***

svn path=/trunk/boinc/; revision=3787
This commit is contained in:
Brian Boshes 2004-07-06 17:37:58 +00:00
parent 6f599dfe87
commit ef4bb882d2
11 changed files with 337 additions and 93 deletions

View File

@ -14614,7 +14614,7 @@ David 4 July 2004
main.C
sched_shmem.C,h
David 7 July 2004
David 6 July 2004
- Completed (more or less) the implementation of trickle messages.
Some changes:
- Trickle-up messages now have a "variety",
@ -14647,3 +14647,61 @@ David 7 July 2004
server_types.C,h
trickle_handler.C
Brian 6 July 2004
- Changed cliet so that when a file info is compared to
an existing file info with the same name, the differences
in whether it is an upload or download are updated and
any urls that are not present in the file info are added to the url list.
- When a url is requested for a file info, the file info
checks whether it is an upload, and then looks for the
substring "file_upload_handler" any url it is trying to
use to upload the file. The opposite is true for downloads.
- Changed get_url into three separate functions:
get_current_url(bool), returns the current good url
get_init_url(bool), returns the initial good url and sets the start url
get_next_url(book), increments current_url and finds the next good url
- Changed client so that a file_info will save its signed xml
even if it doesn't include have xml signature.
- Updated the client so that the check to see if the url is
valid for the type of network request is a private function of FILE_INFO
- Modified the backoff_retry function in persistant_file_xfers
so that it uses the updated get_url that checks if the url
is correct for the type of transfer. Otherwise it could
increment the current_url index to point to an invalid url
- If a file_info doesn't have a max number of bytes and a new
file_info with the same name does, update the max number
of bytes of the existing file_info and add the <max_nbytes> to the signed_xml.
- when file_xfer is complete, the client checks if the file
was meant to be uploaded when present OR generated locally
before setting it to ready to report, instead of only the later
- With the addition of multiple urls, the issue of sending urls
that didn't communicate well or got garbage. Whenever a file
get's a server error, it calls, check_giveup, which only gives
up on the file if there are no more urls to try. Also is called
if the downloaded file is garbage or has the same name but not the
correct size.
- added functionality for send_file_list but does not do anything yet
client/
client_types.C,h
client_state.h
cs_apps.C
cs_file.C
cs_scheduler.C
file_xfer.C
pers_file_xfer.C,h
scheduler_op.C,h

View File

@ -82,6 +82,7 @@ void PROJECT::init() {
work_done_this_period = 0;
next_runnable_result = NULL;
work_request = 0;
send_file_list = false;
}
PROJECT::~PROJECT() {
@ -291,6 +292,7 @@ int PROJECT::parse_state(MIOFILE& in) {
nrpc_failures = 0;
master_url_fetch_pending = false;
sched_rpc_pending = false;
send_file_list = false;
scheduler_urls.clear();
while (in.fgets(buf, 256)) {
if (match_tag(buf, "</project>")) return 0;
@ -327,6 +329,7 @@ int PROJECT::parse_state(MIOFILE& in) {
else if (parse_int(buf, "<min_rpc_time>", (int&)min_rpc_time)) continue;
else if (match_tag(buf, "<master_url_fetch_pending/>")) master_url_fetch_pending = true;
else if (match_tag(buf, "<sched_rpc_pending/>")) sched_rpc_pending = true;
else if (match_tag(buf, "<send_file_list/>")) send_file_list = true;
else if (parse_double(buf, "<debt>", debt)) continue;
else scope_messages.printf("PROJECT::parse_state(): unrecognized: %s\n", buf);
}
@ -395,7 +398,8 @@ int PROJECT::write_state(MIOFILE& out) {
(int)min_rpc_time,
debt,
master_url_fetch_pending?" <master_url_fetch_pending/>\n":"",
sched_rpc_pending?" <sched_rpc_pending/>\n":""
sched_rpc_pending?" <sched_rpc_pending/>\n":"",
send_file_list?" <send_file_list/>\n":""
);
if (strlen(code_sign_key)) {
out.printf(
@ -656,7 +660,7 @@ int FILE_INFO::write(MIOFILE& out, bool to_server) {
if (retval) return retval;
}
if (!to_server) {
if (strlen(signed_xml) && strlen(xml_signature)) {
if (strlen(signed_xml)) {
out.printf(" <signed_xml>\n%s </signed_xml>\n", signed_xml);
}
if (strlen(xml_signature)) {
@ -684,20 +688,96 @@ int FILE_INFO::delete_file() {
return retval;
}
// get the currently selected url to download/upload file, or
// select one if none is chosen yet
// If a file has multiple replicas, we want to choose
// a random one to try first, and then cycle through others
// if transfers fail.
// Call this to get the initial url,
//
char* FILE_INFO::get_url() {
// Files may have URLs for both upload and download.
// The is_upload arg says which kind you want.
// NULL return means there is no URL of the requested type
//
char* FILE_INFO::get_init_url(bool is_upload) {
double temp;
if (current_url < 0) {
temp = rand();
temp *= urls.size();
temp /= RAND_MAX;
current_url = (int)temp;
start_url = current_url;
}
temp = rand();
temp *= urls.size();
temp /= RAND_MAX;
current_url = (int)temp;
start_url = current_url;
while(1) {
if(!is_correct_url_type(is_upload, urls[current_url])) {
current_url = (current_url + 1)%urls.size();
if (current_url == start_url) {
msg_printf(project, MSG_ERROR, "Couldn't find suitable url for %s\n", name);
return NULL;
}
} else {
start_url = current_url;
return urls[current_url].text;
}
}
}
return urls[current_url].text;
// Call this to get the next URL of the indicated type.
// NULL return means you've tried them all.
//
char* FILE_INFO::get_next_url(bool is_upload) {
while(1) {
current_url = (current_url + 1)%urls.size();
if (current_url == start_url) {
return NULL;
}
if(is_correct_url_type(is_upload, urls[current_url])) {
return urls[current_url].text;
}
}
}
char* FILE_INFO::get_current_url(bool is_upload) {
if (current_url < 0) {
return get_init_url(is_upload);
}
return urls[current_url].text;
}
// Checks if the url includes the phrase "file_upload_handler"
// The inclusion of this phrase indicates the url is an upload url
//
bool FILE_INFO::is_correct_url_type(bool is_upload, STRING256 url) {
if(is_upload && !strstr(url.text, "file_upload_handler") ||
!is_upload && strstr(url.text, "file_upload_handler")) {
return false;
} else {
return true;
}
}
// merges information from a new FILE_INFO that has the same name as a
// FILE_INFO that is already present in the client state
// Potentially changes upload_when_present, max_nbytes, and signed_xml
//
int FILE_INFO::merge_info(FILE_INFO& new_info) {
char buf[256];
bool has_url;
unsigned int i, j;
upload_when_present = new_info.upload_when_present;
if(max_nbytes <= 0 && new_info.max_nbytes) {
max_nbytes = new_info.max_nbytes;
sprintf(buf, " <max_nbytes>%.0f</max_nbytes>\n", new_info.max_nbytes);
strcat(signed_xml, buf);
}
for(i = 0; i < new_info.urls.size(); i++) {
has_url = false;
for(j = 0; j < urls.size(); j++) {
if(!strcmp(urls[j].text, new_info.urls[i].text)) {
has_url = true;
}
}
if(!has_url) {
urls.push_back(new_info.urls[i]);
}
}
return 0;
}
// Returns true if the file had an unrecoverable error
@ -1088,3 +1168,16 @@ void RESULT::get_app_version_string(string& str) {
sprintf(buf, " %.2f", wup->version_num/100.);
str = app->name + string(buf);
}
// resets all FILE_INFO's in result to uploaded = false
// used in file_xfers when an uploaded file is required
// without calling this before sending result to be uploaded,
// upload would terminate without sending files
void RESULT::reset_result_files() {
unsigned int i;
for (i=0; i<output_files.size(); i++) {
output_files[i].file_info->uploaded = false;
}
}

View File

@ -89,9 +89,13 @@ public:
int parse(MIOFILE&, bool from_server);
int write(MIOFILE&, bool to_server);
int delete_file(); // attempt to delete the underlying file
char* get_url();
char* get_init_url(bool);
char* get_next_url(bool);
char* get_current_url(bool);
bool is_correct_url_type(bool, STRING256);
bool had_failure(int& failnum);
bool verify_existing_file();
int merge_info(FILE_INFO&);
};
// Describes a connection between a file and a workunit, result, or application.
@ -164,6 +168,9 @@ public:
bool tentative; // master URL and account ID not confirmed
bool anonymous_platform; // app_versions.xml file found in project dir;
// use those apps rather then getting from server
bool send_file_list;
// send the list of permanent files associated with the
// project in the next scheduler reply for the project
char code_sign_key[MAX_BLOB_LEN];
std::vector<FILE_REF> user_files;
int parse_preferences_for_user_files();
@ -296,6 +303,7 @@ struct RESULT {
int write(MIOFILE&, bool to_server);
bool is_upload_done(); // files uploaded?
void get_app_version_string(std::string&);
void reset_result_files();
};
int verify_downloaded_file(char* pathname, FILE_INFO& file_info);

View File

@ -233,6 +233,12 @@ bool CLIENT_STATE::have_free_cpu() {
//
void CLIENT_STATE::assign_results_to_projects() {
// Before assigning a result to an active task, check if that result is a file xfer
// this will be appearent by the lack of files associated with the workunit's app
// Running this function will find these results and mark them as completed.
handle_file_xfer_apps();
for (unsigned int i=0; i<active_tasks.active_tasks.size(); ++i) {
ACTIVE_TASK *atp = active_tasks.active_tasks[i];
if (atp->result->already_selected) continue;
@ -496,3 +502,18 @@ int CLIENT_STATE::choose_version_num(char* app_name, SCHEDULER_REPLY& sr) {
return best;
}
// goes through results and checks if the associated apps has no app files
// then there is nothing to do, never start the app, close the result
void CLIENT_STATE::handle_file_xfer_apps() {
for(vector <RESULT*>::const_iterator i = results.begin();
i!=results.end(); ++i)
{
RESULT* rp = *i;
if(rp->wup->avp->app_files.size() == 0 && rp->state == RESULT_FILES_DOWNLOADED) {
rp->state = RESULT_FILES_UPLOADING;
rp->reset_result_files();
}
}
}

View File

@ -201,7 +201,7 @@ bool CLIENT_STATE::handle_pers_file_xfers() {
//
if (pfx->xfer_done) {
fip = pfx->fip;
if (fip->generated_locally) {
if (fip->generated_locally || fip->upload_when_present) {
// file has been uploaded - delete if not sticky
//
if (!fip->sticky) {

View File

@ -280,6 +280,7 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, double work_req) {
MIOFILE mf;
unsigned int i;
RESULT* rp;
FILE_INFO* fip;
int retval;
double size;
char cross_project_id[MD5_LEN];
@ -379,9 +380,22 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, double work_req) {
rp->write(mf, true);
}
}
if (p->send_file_list) {
fprintf(f, " <reply_file_list>\n");
for(i=0; i<file_infos.size(); i++) {
fip = file_infos[i];
if(fip->project == p && fip->sticky == true) {
fip->write(mf, true);
}
}
fprintf(f, " </reply_file_list>\n");
p->send_file_list = false;
}
read_trickle_files(p, f);
fprintf(f, "</scheduler_request>\n");
fclose(f);
fclose(f);
return 0;
}
@ -698,9 +712,13 @@ int CLIENT_STATE::handle_scheduler_reply(
if (!retval) apps.push_back(app);
}
}
FILE_INFO* fip;
for (i=0; i<sr.file_infos.size(); i++) {
if (!lookup_file_info(project, sr.file_infos[i].name)) {
FILE_INFO* fip = new FILE_INFO;
fip = lookup_file_info(project, sr.file_infos[i].name);
if(fip) {
fip->merge_info(sr.file_infos[i]);
} else {
fip = new FILE_INFO;
*fip = sr.file_infos[i];
retval = link_file_info(project, fip);
if (!retval) file_infos.push_back(fip);
@ -761,6 +779,9 @@ int CLIENT_STATE::handle_scheduler_reply(
if (sr.message_ack) {
remove_trickle_files(project);
}
if (sr.send_file_list) {
project->send_file_list = true;
}
project->sched_rpc_pending = false;
set_client_state_dirty("handle_scheduler_reply");
scope_messages.printf("CLIENT_STATE::handle_scheduler_reply(): State after handle_scheduler_reply():\n");

View File

@ -60,7 +60,7 @@ int FILE_XFER::init_download(FILE_INFO& file_info) {
}
bytes_xferred = f_size;
return HTTP_OP::init_get(fip->get_url(), pathname, false, (int)f_size);
return HTTP_OP::init_get(fip->get_current_url(is_upload), pathname, false, (int)f_size);
}
// for uploads, we need to build a header with xml_signature etc.
@ -88,7 +88,7 @@ int FILE_XFER::init_upload(FILE_INFO& file_info) {
file_info.name
);
file_size_query = true;
return HTTP_OP::init_post2(fip->get_url(), header, NULL, 0);
return HTTP_OP::init_post2(fip->get_current_url(is_upload), header, NULL, 0);
} else {
bytes_xferred = file_info.upload_offset;
sprintf(header,
@ -113,7 +113,7 @@ int FILE_XFER::init_upload(FILE_INFO& file_info) {
);
file_size_query = false;
return HTTP_OP::init_post2(
fip->get_url(), header, pathname, fip->upload_offset
fip->get_current_url(is_upload), header, pathname, fip->upload_offset
);
}
}
@ -241,7 +241,7 @@ bool FILE_XFER_SET::poll() {
}
}
}
} else if (fxp->file_xfer_retval == HTTP_STATUS_RANGE_REQUEST_ERROR) {
} else if (fxp->file_xfer_retval == HTTP_STATUS_RANGE_REQUEST_ERROR) {
fxp->fip->error_msg = "Existing file too large; can't resume";
}
}

View File

@ -73,7 +73,11 @@ int PERS_FILE_XFER::init(FILE_INFO* f, bool is_file_upload) {
fip = f;
is_upload = is_file_upload;
xfer_done = false;
char* p = f->get_init_url(is_file_upload);
if (!p) {
msg_printf(NULL, MSG_ERROR, "No URL for file transfer of %s", f->name);
return ERR_NULL;
}
return 0;
}
@ -134,7 +138,7 @@ int PERS_FILE_XFER::start_xfer() {
if (retval) {
msg_printf(
fip->project, MSG_ERROR, "Couldn't start %s for %s: error %d",
(is_upload ? "upload" : "download"), fip->get_url(), retval
(is_upload ? "upload" : "download"), fip->get_current_url(is_upload), retval
);
handle_xfer_failure();
delete fxp;
@ -146,7 +150,7 @@ int PERS_FILE_XFER::start_xfer() {
if (retval) {
msg_printf(
fip->project, MSG_ERROR, "Couldn't start %s for %s: error %d",
(is_upload ? "upload" : "download"), fip->get_url(), retval
(is_upload ? "upload" : "download"), fip->get_current_url(is_upload), retval
);
fxp->file_xfer_retval = retval;
handle_xfer_failure();
@ -160,7 +164,7 @@ int PERS_FILE_XFER::start_xfer() {
(is_upload ? "upload" : "download"), fip->name
);
}
scope_messages.printf("PERS_FILE_XFER::start_xfer(): URL: %s\n",fip->get_url());
scope_messages.printf("PERS_FILE_XFER::start_xfer(): URL: %s\n",fip->get_current_url(is_upload));
return 0;
}
@ -170,6 +174,8 @@ int PERS_FILE_XFER::start_xfer() {
//
bool PERS_FILE_XFER::poll(time_t now) {
int retval;
char pathname[256];
double existing_size = 0;
SCOPE_MSG_LOG scope_messages(log_messages, CLIENT_MSG_LOG::DEBUG_FILE_XFER);
@ -197,69 +203,105 @@ bool PERS_FILE_XFER::poll(time_t now) {
last_time = dtime();
if (fxp->file_xfer_done) {
scope_messages.printf(
"PERS_FILE_XFER::poll(): file transfer status %d",
fxp->file_xfer_retval
);
if (fxp->file_xfer_retval == 0) {
// The transfer finished with no errors.
//
if (log_flags.file_xfer) {
msg_printf(
fip->project, MSG_INFO, "Finished %s of %s",
is_upload?"upload":"download", fip->name
);
if (fxp->xfer_speed < 0) {
msg_printf(fip->project, MSG_INFO, "No data transferred");
} else {
msg_printf(
fip->project, MSG_INFO, "Approximate throughput %f bytes/sec",
fxp->xfer_speed
);
}
}
xfer_done = true;
} else if (fxp->file_xfer_retval == ERR_UPLOAD_PERMANENT) {
if (log_flags.file_xfer) {
msg_printf(
fip->project, MSG_INFO, "Permanently failed %s of %s",
is_upload?"upload":"download", fip->name
);
}
giveup("server rejected file");
} else {
if (log_flags.file_xfer) {
msg_printf(
fip->project, MSG_INFO, "Temporarily failed %s of %s",
is_upload?"upload":"download", fip->name
);
}
handle_xfer_failure();
}
// remove fxp from file_xfer_set and deallocate it
//
gstate.file_xfers->remove(fxp);
delete fxp;
fxp = NULL;
// check if the file was actually downloaded, if not check if there are more urls to try
// if there are no bytes downloaded, than the was probably a wrong url downloaded
get_pathname(fip, pathname);
if (!file_size(pathname, existing_size) && existing_size == fip->nbytes) {
retval = verify_downloaded_file(pathname, *fip);
if (!retval) {
scope_messages.printf(
"PERS_FILE_XFER::poll(): file transfer status %d",
fxp->file_xfer_retval
);
if (fxp->file_xfer_retval == 0) {
// The transfer finished with no errors.
//
if (log_flags.file_xfer) {
msg_printf(
fip->project, MSG_INFO, "Finished %s of %s",
is_upload?"upload":"download", fip->name
);
if (fxp->xfer_speed < 0) {
msg_printf(fip->project, MSG_INFO, "No data transferred");
} else {
msg_printf(
fip->project, MSG_INFO, "Approximate throughput %f bytes/sec",
fxp->xfer_speed
);
}
}
xfer_done = true;
} else if (fxp->file_xfer_retval == ERR_UPLOAD_PERMANENT) {
if (log_flags.file_xfer) {
msg_printf(
fip->project, MSG_INFO, "Permanently failed %s of %s",
is_upload?"upload":"download", fip->name
);
}
check_giveup("server rejected file");
} else {
if (log_flags.file_xfer) {
msg_printf(
fip->project, MSG_INFO, "Temporarily failed %s of %s",
is_upload?"upload":"download", fip->name
);
}
handle_xfer_failure();
}
// remove fxp from file_xfer_set and deallocate it
//
gstate.file_xfers->remove(fxp);
delete fxp;
fxp = NULL;
return true;
return true;
}
}
check_giveup("File downloaded was not the correct file or was garbage from bad URL");
return false;
}
return false;
}
void PERS_FILE_XFER::giveup(char* why) {
if (is_upload) {
fip->status = ERR_GIVEUP_UPLOAD;
} else {
fip->status = ERR_GIVEUP_DOWNLOAD;
}
xfer_done = true;
msg_printf(
fip->project, MSG_ERROR, "Giving up on %s of %s: %s",
is_upload?"upload":"download", fip->name, why
);
fip->error_msg = why;
// Takes a reason why a transfer has failed. Checks to see if there are no more valid URLs
// listed in the file_info. If no more urls are present, the file is then given up on, the reason is
// listed in the error_msg field, and the appropriate status code is given. If there are more
// URLs to try, the file_xfer is restarted with these new urls until a good transfer is made
// or it completely gives up.
//
void PERS_FILE_XFER::check_giveup(char* why) {
if(fip->get_next_url(fip->upload_when_present) == NULL) {
// the file has no appropriate download location
// remove the file from the directory and delete the file xfer object
gstate.file_xfers->remove(fxp);
delete fxp;
fxp = NULL;
// apply the correct error code
if (is_upload) {
fip->status = ERR_GIVEUP_UPLOAD;
} else {
fip->status = ERR_GIVEUP_DOWNLOAD;
}
// end the xfer so it will be deleted
xfer_done = true;
msg_printf(
fip->project, MSG_ERROR, "Giving up on %s of %s: %s",
is_upload?"upload":"download", fip->name, why
);
fip->error_msg = why;
// delete the associated file in the project directory
fip->delete_file();
} else {
if (is_upload) {
if (gstate.exit_before_upload) {
exit(0);
}
fxp->init_upload(*fip);
} else {
fxp->init_download(*fip);
}
}
}
// Handle a transfer failure
@ -277,7 +319,7 @@ void PERS_FILE_XFER::handle_xfer_failure() {
// If it is uploading and receives a HTTP_STATUS_NOT_FOUND then
// the file upload handler could not be found.
if (!fxp->is_upload) {
giveup("file was not found on server");
check_giveup("file was not found on server");
return;
} else {
retry_or_backoff();
@ -287,7 +329,7 @@ void PERS_FILE_XFER::handle_xfer_failure() {
// See if it's time to give up on the persistent file xfer
//
if ((now - first_request_time) > gstate.file_xfer_giveup_period) {
giveup("too much elapsed time");
check_giveup("too much elapsed time");
} else {
retry_or_backoff();
}
@ -307,13 +349,10 @@ void PERS_FILE_XFER::retry_or_backoff() {
// newtime = localtime(&now);
// Cycle to the next URL to try
//
fip->current_url = (fip->current_url + 1)%fip->urls.size();
// If we reach the URL that we started at, then we have tried all
// servers without success
//
if (fip->current_url == fip->start_url) {
if (fip->get_next_url(is_upload) == NULL) {
nretry++;
// Do an exponential backoff of e^nretry seconds,

View File

@ -69,7 +69,7 @@ public:
bool poll(time_t now);
void handle_xfer_failure();
void retry_or_backoff();
void giveup(char*);
void check_giveup(char*);
int write(MIOFILE& fout);
int parse(MIOFILE& fin);
int start_xfer();

View File

@ -585,6 +585,7 @@ int SCHEDULER_REPLY::parse(FILE* in, PROJECT* project) {
code_sign_key_signature = 0;
message_ack = false;
project_is_down = false;
send_file_list = false;
p = fgets(buf, 256, in);
if (!p) {
@ -693,7 +694,9 @@ int SCHEDULER_REPLY::parse(FILE* in, PROJECT* project) {
continue;
} else if (match_tag(buf, "<non_cpu_intensive/>")) {
continue;
} else if (strlen(buf)>1){
} else if (match_tag(buf, "<request_file_list/>")) {
send_file_list = true;
} else if (strlen(buf)>1){
scope_messages.printf("SCHEDULER_REPLY::parse(): unrecognized %s\n", buf);
}
}

View File

@ -113,6 +113,7 @@ struct SCHEDULER_REPLY {
char* code_sign_key_signature;
bool message_ack;
bool project_is_down;
bool send_file_list;
SCHEDULER_REPLY();
~SCHEDULER_REPLY();