Added ul/dl resumption, result states

svn path=/trunk/boinc/; revision=299
This commit is contained in:
Eric Heien 2002-08-12 21:54:19 +00:00
parent 6faddfaf49
commit ae95eb61c4
14 changed files with 310 additions and 133 deletions

View File

@ -1569,3 +1569,32 @@ Barry August 12, 2002
html_user/
index.php
Eric Heien August 12, 2002
- Added concept of result state. This replaces the old boolean
flags and represents what phase of the computation pipeline
each result is currently at (downloading, computing, uploading,
etc).
- Added file upload/download resumption. For downloads, this
involves checking how big the local file is, then asking
the server for the remainder of it. For uploads, this
involves asking the file_upload_handler how much of a certain
file it already has, then sending the remainder of it.
Both ul and dl resumption need to be thoroughly tested.
client/
client_state.C
client_state.h
client_types.C
client_types.h
cs_apps.C
cs_files.C
cs_scheduler.C
file_xfer.C
http.C
net_xfer.C
pers_file_xfer.C
scheduler_op.C
sched/
file_upload_handler.C

View File

@ -193,6 +193,7 @@ bool CLIENT_STATE::do_something() {
action |= start_apps();
action |= handle_running_apps();
action |= start_file_xfers();
action |= update_results();
write_state_file_if_needed();
}
if (!action) time_stats.update(true, !activities_suspended);
@ -244,6 +245,12 @@ int CLIENT_STATE::parse_state_file() {
if (project) {
retval = link_file_info(project, fip);
if (!retval) file_infos.push_back(fip);
// If the file had a failure before, there's no reason
// to start another file transfer
if (fip->status == FILE_FAILURE) {
if (fip->pers_file_xfer) delete fip->pers_file_xfer;
fip->pers_file_xfer = NULL;
}
// Init PERS_FILE_XFER and push it onto pers_file_xfer stack
if (fip->pers_file_xfer) {
fip->pers_file_xfer->init( fip, fip->upload_when_present );
@ -276,6 +283,7 @@ int CLIENT_STATE::parse_state_file() {
if (project) {
retval = link_result(project, rp);
if (!retval) results.push_back(rp);
rp->state = RESULT_NEW;
} else {
fprintf(stderr, "error: link_result failed\n");
delete rp;
@ -643,15 +651,16 @@ bool CLIENT_STATE::garbage_collect() {
while (pers_iter != pers_xfers->pers_file_xfers.end()) {
pfxp = *pers_iter;
if (pfxp->xfer_done) {
// TODO: *** Set the exit status of the related result
// to ERR_GIVEUP. The failure will be reported to the
// Set the status of the related file info to
// FILE_FAILURE. The failure will be reported to the
// server and related file infos, results, and workunits
// will be deleted if necessary
if (pfxp->pers_xfer_retval == ERR_GIVEUP) {
pfxp->fip->status = FILE_FAILURE;
}
pers_iter = pers_xfers->pers_file_xfers.erase(pers_iter);
if (pfxp->fip) {
pfxp->fip->pers_file_xfer = NULL;
}
pfxp->fip->pers_file_xfer = NULL;
delete pfxp;
// Update the client_state file
@ -668,7 +677,7 @@ bool CLIENT_STATE::garbage_collect() {
result_iter = results.begin();
while (result_iter != results.end()) {
rp = *result_iter;
if (rp->is_server_ack) {
if (rp->state == RESULT_SERVER_ACK) {
if (log_flags.state_debug) printf("deleting result %s\n", rp->name);
delete rp;
result_iter = results.erase(result_iter);
@ -676,7 +685,16 @@ bool CLIENT_STATE::garbage_collect() {
} else {
rp->wup->ref_cnt++;
for (i=0; i<rp->output_files.size(); i++) {
rp->output_files[i].file_info->ref_cnt++;
// If one of the file infos had a failure, mark the result
// as done and report the error. The result, workunits, and
// file infos will be cleaned up after the server is notified
if (rp->output_files[i].file_info->status == FILE_FAILURE) {
if (rp->state < RESULT_READY_TO_ACK) {
rp->state = RESULT_READY_TO_ACK;
}
} else {
rp->output_files[i].file_info->ref_cnt++;
}
}
result_iter++;
}
@ -726,6 +744,56 @@ bool CLIENT_STATE::garbage_collect() {
return action;
}
// update the state of results
//
bool CLIENT_STATE::update_results() {
RESULT* rp;
vector<RESULT*>::iterator result_iter;
bool action = false;
// delete RESULTs that have been finished and reported;
// reference-count files referred to by other results
//
result_iter = results.begin();
while (result_iter != results.end()) {
rp = *result_iter;
switch (rp->state) {
case RESULT_NEW:
if (input_files_available(rp)) {
rp->state = RESULT_FILES_DOWNLOADED;
action = true;
}
break;
case RESULT_FILES_DOWNLOADED:
// The transition to COMPUTE_DONE is performed
// in app_finished()
break;
case RESULT_COMPUTE_DONE:
// Once the computation has been done, check
// that the necessary files have been uploaded
// before moving on
if (rp->is_upload_done()) {
rp->state = RESULT_READY_TO_ACK;
action = true;
}
break;
case RESULT_READY_TO_ACK:
// The transition to SERVER_ACK is performed in
// handle_scheduler_reply()
break;
case RESULT_SERVER_ACK:
// The result has been received by the scheduling
// server. It will be deleted on the next
// garbage collection, which we trigger by
// setting action to true
action = true;
break;
}
result_iter++;
}
return action;
}
// TODO: write no more often than X seconds
// Write the client_state.xml file if necessary
//

View File

@ -100,6 +100,7 @@ private:
bool start_file_xfers();
void print_counts();
bool garbage_collect();
bool update_results();
// stuff related to scheduler RPCs
//

View File

@ -228,6 +228,16 @@ FILE_INFO::FILE_INFO() {
FILE_INFO::~FILE_INFO() {
}
int FILE_INFO::parse_server_response(char *buf) {
int status = -1;
parse_double(buf, "<nbytes>", upload_offset);
parse_int(buf, "<status>", status);
// TODO: decide what to do with error string
//if (!parse_str(buf, "<error>", upload_offset) ) return -1;
return status;
}
// If from server, make an exact copy of everything
// except the start/end tags and the <xml_signature> element.
//
@ -244,8 +254,9 @@ int FILE_INFO::parse(FILE* in, bool from_server) {
strcpy(md5_cksum, "");
max_nbytes = 0;
nbytes = 0;
upload_offset = -1;
generated_locally = false;
file_present = false;
status = FILE_NOT_PRESENT;
executable = false;
uploaded = false;
upload_when_present = false;
@ -286,7 +297,7 @@ int FILE_INFO::parse(FILE* in, bool from_server) {
else if (parse_double(buf, "<nbytes>", nbytes)) continue;
else if (parse_double(buf, "<max_nbytes>", max_nbytes)) continue;
else if (match_tag(buf, "<generated_locally/>")) generated_locally = true;
else if (match_tag(buf, "<file_present/>")) file_present = true;
else if (parse_int(buf, "<status>", status)) continue;
else if (match_tag(buf, "<executable/>")) executable = true;
else if (match_tag(buf, "<uploaded/>")) uploaded = true;
else if (match_tag(buf, "<upload_when_present/>")) upload_when_present = true;
@ -328,7 +339,7 @@ int FILE_INFO::write(FILE* out, bool to_server) {
);
if (!to_server) {
if (generated_locally) fprintf(out, " <generated_locally/>\n");
if (file_present) fprintf(out, " <file_present/>\n");
fprintf(out, " <status>%d</status>\n", status);
if (executable) fprintf(out, " <executable/>\n");
if (uploaded) fprintf(out, " <uploaded/>\n");
if (upload_when_present) fprintf(out, " <upload_when_present/>\n");
@ -547,8 +558,7 @@ void RESULT::clear() {
report_deadline = 0;
output_files.clear();
is_active = false;
is_compute_done = false;
is_server_ack = false;
state = RESULT_NEW;
final_cpu_time = 0;
exit_status = 0;
strcpy(stderr_out, "");
@ -602,6 +612,7 @@ int RESULT::parse_state(FILE* in) {
}
else if (parse_double(buf, "<final_cpu_time>", final_cpu_time)) continue;
else if (parse_int(buf, "<exit_status>", exit_status)) continue;
else if (parse_int(buf, "<state>", state)) continue;
else if (match_tag(buf, "<stderr_out>" )) {
while (fgets(buf, 256, in)) {
if (match_tag(buf, "</stderr_out>")) break;
@ -645,8 +656,10 @@ int RESULT::write(FILE* out, bool to_server) {
}
if (!to_server) {
fprintf(out,
" <state>%d</state>\n"
" <wu_name>%s</wu_name>\n"
" <report_deadline>%d</report_deadline>\n",
state,
wu_name,
report_deadline
);
@ -665,9 +678,9 @@ int RESULT::write(FILE* out, bool to_server) {
return 0;
}
// this is called only after is_compute_done is true.
// returns true if the result and it's associated files
// were successfully uploaded
// this is called only after the result state reaches
// COMPUTE_DONE is true. Returns true if the result
// and it's associated files were successfully uploaded
//
bool RESULT::is_upload_done() {
unsigned int i;

View File

@ -91,14 +91,19 @@ struct APP {
int write(FILE*);
};
#define FILE_NOT_PRESENT 0
#define FILE_PRESENT 1
#define FILE_FAILURE 2
class FILE_INFO {
public:
char name[256];
char md5_cksum[33];
double max_nbytes;
double nbytes;
double upload_offset;
bool generated_locally; // file is produced by app
bool file_present;
int status;
bool executable; // change file protections to make executable
bool uploaded; // file has been uploaded
bool upload_when_present;
@ -117,6 +122,7 @@ public:
FILE_INFO();
~FILE_INFO();
int parse_server_response(char*);
int parse(FILE*, bool from_server);
int write(FILE*, bool to_server);
int delete_file(); // attempt to delete the underlying file
@ -170,16 +176,21 @@ struct WORKUNIT {
int write(FILE*);
};
#define RESULT_NEW 0 // New result, files still need to be downloaded
#define RESULT_FILES_DOWNLOADED 1 // Files are downloaded, result can be computed
#define RESULT_COMPUTE_DONE 2 // Computation is done, files to be uploaded
#define RESULT_READY_TO_ACK 3 // Files are uploaded, notify scheduling server
#define RESULT_SERVER_ACK 4 // Received an ack from server, result is done
struct RESULT {
char name[256];
char wu_name[256];
int report_deadline;
vector<FILE_REF> output_files;
bool is_active; // an app is currently running for this
bool is_compute_done; // computation finished
bool is_server_ack; // ack received from scheduling server
double final_cpu_time;
int exit_status;
int state; // status of this result
int exit_status; // return value from the application
char stderr_out[STDERR_MAX_LEN];
APP* app;
WORKUNIT* wup;

View File

@ -38,7 +38,7 @@ int CLIENT_STATE::app_finished(ACTIVE_TASK& at) {
for (i=0; i<rp->output_files.size(); i++) {
fip = rp->output_files[i].file_info;
fip->file_present = true;
fip->status = FILE_PRESENT;
if (!fip->upload_when_present && !fip->sticky) {
fip->delete_file();
} else {
@ -48,7 +48,7 @@ int CLIENT_STATE::app_finished(ACTIVE_TASK& at) {
}
at.result->is_active = false;
at.result->is_compute_done = true;
at.result->state = RESULT_COMPUTE_DONE;
update_avg_cpu(at.result->project);
at.result->project->exp_avg_cpu += at.result->final_cpu_time;
return 0;
@ -95,12 +95,12 @@ bool CLIENT_STATE::input_files_available(RESULT* rp) {
avp = wup->avp;
for (i=0; i<avp->app_files.size(); i++) {
fip = avp->app_files[i].file_info;
if (!fip->file_present) return false;
if (fip->status != FILE_PRESENT) return false;
}
for (i=0; i<wup->input_files.size(); i++) {
fip = wup->input_files[i].file_info;
if (!fip->file_present) return false;
if (fip->status != FILE_PRESENT) return false;
}
return true;
}
@ -127,7 +127,7 @@ bool CLIENT_STATE::start_apps() {
// isn't done yet, the application isn't currently computing
// the result, and all the input files for the result are
// locally available
if (!rp->is_compute_done && !rp->is_active && input_files_available(rp)) {
if (rp->state == RESULT_FILES_DOWNLOADED && !rp->is_active ) {
if (log_flags.task_debug) {
printf("starting application for result %s\n", rp->name);
}

View File

@ -92,7 +92,7 @@ bool CLIENT_STATE::start_file_xfers() {
for (i=0; i<file_infos.size(); i++) {
fip = file_infos[i];
pfx = fip->pers_file_xfer;
if (!fip->generated_locally && !fip->file_present && !pfx) {
if (!fip->generated_locally && fip->status == FILE_NOT_PRESENT && !pfx) {
// Set up the persistent file transfer object. This will start
// the download when there is available bandwidth
//
@ -102,7 +102,7 @@ bool CLIENT_STATE::start_file_xfers() {
// Pop PERS_FILE_XFER onto pers_file_xfer stack
if (fip->pers_file_xfer) pers_xfers->insert( fip->pers_file_xfer );
action = true;
} else if ( fip->upload_when_present && fip->file_present && !fip->uploaded && !pfx ) {
} else if ( fip->upload_when_present && fip->status == FILE_PRESENT && !fip->uploaded && !pfx ) {
// Set up the persistent file transfer object. This will start
// the upload when there is available bandwidth
//

View File

@ -63,7 +63,7 @@ double CLIENT_STATE::current_water_days() {
for (i=0; i<results.size(); i++) {
rp = results[i];
if (rp->is_compute_done) continue;
if (rp->state > RESULT_COMPUTE_DONE) continue;
// TODO: subtract time already finished for WUs in progress
seconds_remaining += rp->wup->seconds_to_complete;
}
@ -244,10 +244,8 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, double work_req) {
host_info.write(f);
for (i=0; i<results.size(); i++) {
rp = results[i];
if (rp->project == p && !rp->is_server_ack) {
if (rp->is_upload_done()) {
rp->write(f, true);
}
if (rp->project == p && rp->state == RESULT_READY_TO_ACK) {
rp->write(f, true);
}
}
fprintf(f, "</scheduler_request>\n");
@ -265,7 +263,9 @@ PROJECT* CLIENT_STATE::find_project_with_overdue_results() {
for (i=0; i<results.size(); i++) {
r = results[i];
if (r->is_compute_done && r->is_upload_done() && !r->is_server_ack) {
// If we've completed computation but haven't finished reporting the
// results to the server, return the project for this result
if (r->state == RESULT_READY_TO_ACK) {
if (r->project->min_rpc_time < now) {
return r->project;
}
@ -561,6 +561,7 @@ void CLIENT_STATE::handle_scheduler_reply(
*rp = sr.results[i];
retval = link_result(project, rp);
if (!retval) results.push_back(rp);
rp->state = RESULT_NEW;
}
}
@ -572,7 +573,7 @@ void CLIENT_STATE::handle_scheduler_reply(
printf("got ack for result %s\n", sr.result_acks[i].name);
}
if (rp) {
rp->is_server_ack = true;
rp->state = RESULT_SERVER_ACK;
} else {
fprintf(stderr,
"ERROR: got ack for result %s, can't find\n",

View File

@ -21,6 +21,7 @@
#include "util.h"
#include "file_names.h"
#include "filesys.h"
#include "log_flags.h"
#include "file_xfer.h"
#include "error_numbers.h"
@ -44,8 +45,6 @@ FILE_XFER::~FILE_XFER() {
#if 0
// Is there any reason to keep this around?
int FILE_XFER::init_download(char* url, char* outfile) {
int file_size;
if(url==NULL) {
fprintf(stderr, "error: FILE_XFER.init_download: unexpected NULL pointer url\n");
return ERR_NULL;
@ -55,9 +54,6 @@ int FILE_XFER::init_download(char* url, char* outfile) {
return ERR_NULL;
}
if (file_size(outfile, &file_size)) {
file_size = 0;
}
return HTTP_OP::init_get(url, outfile, file_size);
}
@ -76,9 +72,14 @@ int FILE_XFER::init_upload(char* url, char* infile) {
#endif
int FILE_XFER::init_download(FILE_INFO& file_info) {
int f_size;
fip = &file_info;
get_pathname(fip, pathname);
return HTTP_OP::init_get(fip->get_url(), pathname, false);
if (file_size(pathname, f_size)) {
f_size = 0;
}
return HTTP_OP::init_get(fip->get_url(), pathname, false, f_size);
}
// for uploads, we need to build a header with xml_signature etc.
@ -86,23 +87,38 @@ int FILE_XFER::init_download(FILE_INFO& file_info) {
// Do this in memory.
//
int FILE_XFER::init_upload(FILE_INFO& file_info) {
// If upload_offset < 0, we need to query the upload handler
// for the offset information
// TODO: give priority to URL of unfinished upload if there
// are multiple choices
//
fip = &file_info;
get_pathname(fip, pathname);
sprintf(header,
"<file_info>\n"
"%s"
"<xml_signature>\n"
"%s"
"</xml_signature>\n"
"</file_info>\n"
"<nbytes>%f</nbytes>\n"
"<offset>0</offset>\n"
"<data>\n",
file_info.signed_xml,
file_info.xml_signature,
file_info.nbytes
);
return HTTP_OP::init_post2(fip->get_url(), header, pathname, 0);
if (file_info.upload_offset < 0) {
sprintf(header,
"<file_size_req>%s</file_size_req>\n",
file_info.name
);
return HTTP_OP::init_post2(fip->get_url(), header, NULL, 0);
} else {
sprintf(header,
"<file_info>\n"
"%s"
"<xml_signature>\n"
"%s"
"</xml_signature>\n"
"</file_info>\n"
"<nbytes>%f</nbytes>\n"
"<offset>%.0f</offset>\n"
"<data>\n",
file_info.signed_xml,
file_info.xml_signature,
file_info.nbytes,
file_info.upload_offset
);
return HTTP_OP::init_post2(fip->get_url(), header, pathname, fip->upload_offset);
}
}
// Returns the total time that the file xfer has taken

View File

@ -69,11 +69,7 @@ static void parse_url(char* url, char* host, char* file) {
strcpy(host, buf);
}
// Note: HTTP 1.1 keeps connection open.
// We use 1.0 so we don't have to count bytes.
//
// Prints an HTTP 1.0 GET request header into buf
// Prints an HTTP 1.1 GET request header into buf
//
static void http_get_request_header(
char* buf, char* host, char* file, double offset
@ -92,19 +88,21 @@ static void http_get_request_header(
}
if (offset) {
sprintf(buf,
"GET /%s;byte-range %12.0f- HTTP/1.0\015\012"
"GET /%s HTTP/1.1\015\012"
"User-Agent: BOINC client\015\012"
"Host: %s:80\015\012"
"Range: bytes=%.0f-\015\012"
"Connection: close\015\012"
"Accept: */*\015\012"
"\015\012",
file, offset,
host
file, host, offset
);
} else {
sprintf(buf,
"GET /%s HTTP/1.0\015\012"
"GET /%s HTTP/1.1\015\012"
"User-Agent: BOINC client\015\012"
"Host: %s:80\015\012"
"Connection: close\015\012"
"Accept: */*\015\012"
"\015\012",
file,
@ -113,7 +111,7 @@ static void http_get_request_header(
}
}
// Prints an HTTP 1.0 HEAD request header into buf
// Prints an HTTP 1.1 HEAD request header into buf
//
static void http_head_request_header(char* buf, char* host, char* file) {
if(buf==NULL) {
@ -126,16 +124,17 @@ static void http_head_request_header(char* buf, char* host, char* file) {
fprintf(stderr, "error: http_head_request_header: unexpected NULL pointer file\n");
}
sprintf(buf,
"HEAD /%s HTTP/1.0\015\012"
"HEAD /%s HTTP/1.1\015\012"
"User-Agent: BOINC client\015\012"
"Host: %s:80\015\012"
"Connection: close\015\012"
"Accept: */*\015\012"
"\015\012",
file, host
);
}
// Prints an HTTP 1.0 POST request header into buf
// Prints an HTTP 1.1 POST request header into buf
//
static void http_post_request_header(
char* buf, char* host, char* file, int size
@ -153,10 +152,11 @@ static void http_post_request_header(
fprintf(stderr, "error: http_post_request_header: negative size\n");
}
sprintf(buf,
"POST /%s HTTP/1.0\015\012"
"POST /%s HTTP/1.1\015\012"
"Pragma: no-cache\015\012"
"Cache-Control: no-cache\015\012"
"Host: %s:80\015\012"
//"Connection: close\015\012"
"Content-Type: application/octet-stream\015\012"
"Content-Length: %d\015\012"
"\015\012",
@ -187,22 +187,24 @@ void http_put_request_header(
}
if (offset) {
sprintf(buf,
"PUT /%s;byte-range %d- HTTP/1.0\015\012"
"PUT /%s HTTP/1.1\015\012"
"Pragma: no-cache\015\012"
"Cache-Control: no-cache\015\012"
"Host: %s:80\015\012"
"Range: bytes=%d-\015\012"
"Connection: close\015\012"
"Content-Type: application/octet-stream\015\012"
"Content-Length: %d\015\012"
"\015\012",
file, offset,
host, size
file, host, offset, size
);
} else {
sprintf(buf,
"PUT /%s HTTP/1.0\015\012"
"PUT /%s HTTP/1.1\015\012"
"Pragma: no-cache\015\012"
"Cache-Control: no-cache\015\012"
"Host: %s:80\015\012"
"Connection: close\015\012"
"Content-Type: application/octet-stream\015\012"
"Content-Length: %d\015\012"
"\015\012",
@ -384,10 +386,6 @@ int HTTP_OP::init_post2(
fprintf(stderr, "error: HTTP_OP.init_post2: unexpected NULL pointer r1\n");
return ERR_NULL;
}
if(in==NULL) {
fprintf(stderr, "error: HTTP_OP.init_post2: unexpected NULL pointer in\n");
return ERR_NULL;
}
if(offset<0) {
fprintf(stderr, "error: HTTP_OP.init_post2: negative offset\n");
return ERR_NEG;
@ -395,11 +393,13 @@ int HTTP_OP::init_post2(
parse_url(url, hostname, filename);
NET_XFER::init(hostname, 80, HTTP_BLOCKSIZE);
req1 = r1;
strcpy(infile, in);
file_offset = offset;
retval = file_size(infile, content_length);
if (retval) return retval;
content_length -= (int)offset;
if (in) {
strcpy(infile, in);
file_offset = offset;
retval = file_size(infile, content_length);
if (retval) return retval;
content_length -= (int)offset;
}
content_length += strlen(req1);
http_op_type = HTTP_OP_POST2;
http_op_state = HTTP_STATE_CONNECTING;
@ -517,17 +517,24 @@ bool HTTP_OP_SET::poll() {
action = true;
n = send(htp->socket, htp->req1, strlen(htp->req1), 0);
htp->http_op_state = HTTP_STATE_REQUEST_BODY;
htp->file = fopen(htp->infile, "r");
if (!htp->file) {
fprintf(stderr, "HTTP_OP: no input file %s\n", htp->infile);
// If there's a file we also want to send, then start transferring
// it, otherwise, go on to the next step
if (htp->infile && strlen(htp->infile) > 0) {
htp->file = fopen(htp->infile, "r");
if (!htp->file) {
fprintf(stderr, "HTTP_OP: no input2 file %s\n", htp->infile);
htp->io_done = true;
htp->http_op_retval = ERR_FOPEN;
htp->http_op_state = HTTP_STATE_DONE;
break;
}
fseek(htp->file, (long)htp->file_offset, SEEK_SET);
htp->do_file_io = true;
} else {
htp->io_done = true;
htp->http_op_retval = ERR_FOPEN;
htp->http_op_state = HTTP_STATE_DONE;
break;
htp->do_file_io = false;
}
fseek(htp->file, (long)htp->file_offset, SEEK_SET);
htp->io_ready = false;
htp->do_file_io = true;
}
break;
case HTTP_STATE_REQUEST_BODY:
@ -551,7 +558,6 @@ bool HTTP_OP_SET::poll() {
read_http_reply_header(htp->socket, htp->hrh);
// TODO: handle all kinds of redirects here
if (htp->hrh.status == 301 || htp->hrh.status == 302) {
fprintf( stderr, "Redirect to %s\n", htp->hrh.redirect_location );
// Close the old socket
htp->close_socket();
switch (htp->http_op_type) {
@ -567,7 +573,7 @@ bool HTTP_OP_SET::poll() {
break;
case HTTP_OP_POST2:
// TODO: Change offset to correct value
htp->init_post2( htp->hrh.redirect_location, htp->req1, htp->infile,0 );
htp->init_post2( htp->hrh.redirect_location, htp->req1, htp->infile, 0 );
break;
}
// Open connection to the redirected server
@ -593,7 +599,7 @@ bool HTTP_OP_SET::poll() {
// Append to a file if it already exists, otherwise
// create a new one. init_get should have already
// deleted the file if necessary
htp->file = fopen(htp->outfile, "w");
htp->file = fopen(htp->outfile, "a");
if (!htp->file) {
fprintf(stderr,
"HTTP_OP: can't open output file %s\n",
@ -623,6 +629,7 @@ bool HTTP_OP_SET::poll() {
switch(htp->http_op_type) {
case HTTP_OP_POST2:
read_reply(htp->socket, htp->req1, 256);
// parse reply here?
break;
default:
action = true;

View File

@ -308,26 +308,26 @@ int NET_XFER::do_xfer(int& nbytes_transferred) {
#else
n = read(socket, buf, blocksize);
#endif
if (log_flags.net_xfer_debug) {
printf("read %d bytes from socket %d\n", n, socket);
}
if (n == 0) {
io_done = true;
want_download = false;
goto done;
} else if (n < 0) {
io_done = true;
error = ERR_READ;
goto done;
} else {
nbytes_transferred += n;
m = fwrite(buf, 1, n, file);
if (n != m) {
io_done = true;
error = ERR_FWRITE;
goto done;
}
if (log_flags.net_xfer_debug) {
printf("read %d bytes from socket %d\n", n, socket);
}
if (n == 0) {
io_done = true;
want_download = false;
goto done;
} else if (n < 0) {
io_done = true;
error = ERR_READ;
goto done;
} else {
nbytes_transferred += n;
m = fwrite(buf, 1, n, file);
if (n != m) {
io_done = true;
error = ERR_FWRITE;
goto done;
}
}
} else if (want_upload) {
m = fread(buf, 1, blocksize, file);
if (m == 0) {

View File

@ -110,16 +110,31 @@ bool PERS_FILE_XFER::poll(unsigned int now) {
fip->get_url(), fxp->file_xfer_retval );
}
if (fip->generated_locally) {
// If the file was generated locally (for upload), update stats
// and delete the local copy of the file if needed
//
gstate.update_net_stats(true, fip->nbytes, fxp->elapsed_time());
// If this was a file size query, redo the transfer with
// the information
if (fip->upload_offset<0) {
// Parse the server's response
// TODO: handle error response
fip->parse_server_response( fxp->req1 );
// file has been uploaded - delete if not sticky
if (!fip->sticky) {
fip->delete_file();
// Reset the file transfer so we start the actual transfer
fxp = NULL;
} else {
// Parse the server's response
// TODO: handle error response
fip->parse_server_response( fxp->req1 );
// If the file was generated locally (for upload), update stats
// and delete the local copy of the file if needed
//
gstate.update_net_stats(true, fip->nbytes, fxp->elapsed_time());
// file has been uploaded - delete if not sticky
if (!fip->sticky) {
fip->delete_file();
}
fip->uploaded = true;
}
fip->uploaded = true;
} else {
// Otherwise we downloaded the file. Update stats, verify
// the file with RSA or MD5, and change permissions
@ -144,24 +159,33 @@ bool PERS_FILE_XFER::poll(unsigned int now) {
retval = chmod(pathname, S_IREAD|S_IWRITE);
#endif
}
fip->file_present = true;
fip->status = FILE_PRESENT;
}
}
xfer_done = true;
pers_xfer_retval = 0;
} else {
// file xfer failed.
// Cycle to the next URL to try
fip->current_url = (fip->current_url + 1)%fip->urls.size();
// If it was a bad range request, delete the file and start over
if (fxp->file_xfer_retval == 416) {
fip->delete_file();
// TODO: remove fxp from file_xfer_set here and at other
// necessary places
fxp = NULL;
} else {
// Cycle to the next URL to try
fip->current_url = (fip->current_url + 1)%fip->urls.size();
// If we reach the URL that we started at, then we have tried all
// servers without success
if( fip->current_url == fip->start_url ) {
nretry++;
exp_backoff = exp(((double)rand()/(double)RAND_MAX)*nretry);
// Do an exponential backoff of e^nretry seconds, keeping within the bounds
// of PERS_RETRY_DELAY_MIN and PERS_RETRY_DELAY_MAX
next_request_time = now+(int)max(PERS_RETRY_DELAY_MIN,min(PERS_RETRY_DELAY_MAX,exp_backoff));
// If we reach the URL that we started at, then we have tried all
// servers without success
if( fip->current_url == fip->start_url ) {
nretry++;
exp_backoff = exp(((double)rand()/(double)RAND_MAX)*nretry);
// Do an exponential backoff of e^nretry seconds, keeping
// within the bounds of PERS_RETRY_DELAY_MIN and
// PERS_RETRY_DELAY_MAX
next_request_time = now+(int)max(PERS_RETRY_DELAY_MIN,min(PERS_RETRY_DELAY_MAX,exp_backoff));
}
}
// See if it's time to give up on the persistent file xfer

View File

@ -369,12 +369,16 @@ int SCHEDULER_REPLY::parse(FILE* in) {
code_sign_key_signature = 0;
p = fgets(buf, 256, in);
if (!match_tag(buf, "<scheduler_reply>")) {
// First part of content should either be tag (HTTP 1.0) or
// hex length of response (HTTP 1.1)
if (!match_tag(buf, "<scheduler_reply>") && !sscanf(buf,"%x",&retval)) {
fprintf(stderr, "SCHEDULER_REPLY::parse(): bad first tag %s\n", buf);
return ERR_XML_PARSE;
}
while (fgets(buf, 256, in)) {
if (match_tag(buf, "</scheduler_reply>")) {
if (match_tag(buf, "<scheduler_reply>")) {
// Do nothing
} else if (match_tag(buf, "</scheduler_reply>")) {
return 0;
} else if (parse_int(buf, "<hostid>", hostid)) {
continue;

View File

@ -28,6 +28,7 @@
//
// The return for this information request looks like
//
// <status>0</status>
// <nbytes>1234</nbytes>
//
// Where nbytes will be 0 if the file doesn't exist
@ -164,8 +165,10 @@ int handle_request(FILE* in, R_RSA_PUBLIC_KEY& key) {
sprintf( path, "%s/%s", BOINC_UPLOAD_DIR, file_name );
retval = stat( path, &sbuf );
if (retval && errno != ENOENT) print_status( -1, "cannot open file" );
else if (retval) printf("Content-type: text/plain\n\n<nbytes>0</nbytes>\n");
else printf("Content-type: text/plain\n\n<nbytes>%d</nbytes>\n", (int)sbuf.st_size);
else if (retval) printf("Content-type: text/plain\n\n<nbytes>0</nbytes>\n"
"<status>0</status>\n");
else printf("Content-type: text/plain\n\n<nbytes>%d</nbytes>\n"
"<status>0</status>\n", (int)sbuf.st_size);
exit(0);
}
else if (parse_double(buf, "<offset>", offset)) continue;