Initial persistent file transfer implementation.

svn path=/trunk/boinc/; revision=274
This commit is contained in:
Eric Heien 2002-08-07 22:52:10 +00:00
parent bf9b213f18
commit e9aa34829e
20 changed files with 529 additions and 143 deletions

View File

@ -1443,6 +1443,7 @@ David A.
doc/
api.html
Barry Luong August 6, 2002
- Added team pages in html_user/
@ -1471,3 +1472,37 @@ Barry Luong August 6, 2002
team_remove_inactive_action.php
team_remove_inactive_form.php
Eric Heien August 7, 2002
- Added PERS_FILE_XFER (persistent file transfer)
functionality. This includes the notions of retrying
when unable to connect, exponential backoff, and giving
up after a period of time. Giving up is currently
not fully implemented. Includes initial work for
supporting upload/download resumption. All features
still need to be thoroughly tested.
- Added initial functionality to calculate allowable disk
usage.
client/
Makefile.in
client_state.C
client_state.h
client_types.C
client_types.h
cs_files.C
cs_scheduler.C
file_xfer.C
hostinfo_unix.C
http.C
http.h
pers_file_xfer.C
pers_file_xfer.h
scheduler_op.C
test_file_xfer.C
test_http.C
util.C
util.h
sched/
file_upload_handler.C

View File

@ -41,6 +41,7 @@ OBJS = \
log_flags.o \
net_stats.o \
net_xfer.o \
pers_file_xfer.o \
prefs.o \
scheduler_op.o \
speed_stats.o \

View File

@ -25,6 +25,7 @@
#include "error_numbers.h"
#include "file_names.h"
#include "filesys.h"
#include "hostinfo.h"
#include "log_flags.h"
#include "parse.h"
@ -40,10 +41,12 @@ CLIENT_STATE::CLIENT_STATE() {
net_xfers = new NET_XFER_SET;
http_ops = new HTTP_OP_SET(net_xfers);
file_xfers = new FILE_XFER_SET(http_ops);
pers_xfers = new PERS_FILE_XFER_SET(file_xfers);
scheduler_op = new SCHEDULER_OP(http_ops);
client_state_dirty = false;
exit_when_idle = false;
run_time_test = true;
giveup_after = PERS_GIVEUP;
contacted_sched_server = false;
activities_suspended = false;
version = VERSION;
@ -61,6 +64,9 @@ int CLIENT_STATE::init(PREFS* p) {
}
prefs = p;
// Initialize the random number generator
srand( clock() );
// copy all PROJECTs from the prefs to the client state.
//
for (i=0; i<p->projects.size(); i++) {
@ -108,6 +114,31 @@ int CLIENT_STATE::time_tests() {
return 0;
}
// Update the net_stats object, since it's private
//
void CLIENT_STATE::update_net_stats(bool is_upload, double nbytes, double nsecs) {
net_stats.update( is_upload, nbytes, nsecs );
}
// Insert an object into the file_xfers object, since it's private
//
int CLIENT_STATE::insert_file_xfer( FILE_XFER *fxp ) {
return file_xfers->insert(fxp);
}
// Return the maximum allowed disk usage as determined by user preferences.
// Since there are three different settings in the prefs, it returns the least
// of the three.
double CLIENT_STATE::allowed_disk_usage() {
double percent_space,min_val;
// Calculate allowed disk usage based on % pref
percent_space = host_info.d_total*prefs->disk_max_used_pct/100.0;
// Return the minimum of the three
return min(min(prefs->disk_max_used_gb, percent_space),min_val);
}
// See if (on the basis of user prefs) we should suspend activities.
// If so, suspend tasks
//
@ -136,6 +167,7 @@ int CLIENT_STATE::check_suspend_activities() {
// finite state machine abstraction of the client. Each of the key
// elements of the client is given a chance to perform work here.
// return true if something happened
// TODO: handle errors passed back up to here?
//
bool CLIENT_STATE::do_something() {
int nbytes;
@ -145,6 +177,12 @@ bool CLIENT_STATE::do_something() {
if (!activities_suspended) {
net_xfers->poll(999999, nbytes);
if (nbytes) action = true;
// If pers_xfers returns true, we've made a change to a
// persistent transfer which must be recorded in the
// client_state.xml file
if (pers_xfers->poll()) {
action = client_state_dirty = true;
}
action |= http_ops->poll();
action |= file_xfers->poll();
action |= active_tasks.poll();
@ -205,6 +243,11 @@ int CLIENT_STATE::parse_state_file() {
if (project) {
retval = link_file_info(project, fip);
if (!retval) file_infos.push_back(fip);
// Init PERS_FILE_XFER and push it onto pers_file_xfer stack
if (fip->pers_file_xfer) {
fip->pers_file_xfer->init( fip, fip->upload_when_present );
pers_xfers->insert( fip->pers_file_xfer );
}
} else {
delete fip;
}
@ -572,9 +615,11 @@ void CLIENT_STATE::print_counts() {
//
bool CLIENT_STATE::garbage_collect() {
unsigned int i;
PERS_FILE_XFER* pfxp;
FILE_INFO* fip;
RESULT* rp;
WORKUNIT* wup;
vector<PERS_FILE_XFER*>::iterator pers_iter;
vector<RESULT*>::iterator result_iter;
vector<WORKUNIT*>::iterator wu_iter;
vector<FILE_INFO*>::iterator fi_iter;
@ -590,6 +635,32 @@ bool CLIENT_STATE::garbage_collect() {
fip->ref_cnt = 0;
}
// delete PERS_FILE_XFERs that have finished and their
// associated FILE_INFO and FILE_XFER objects
//
pers_iter = pers_xfers->pers_file_xfers.begin();
while (pers_iter != pers_xfers->pers_file_xfers.end()) {
pfxp = *pers_iter;
if (pfxp->xfer_done) {
// TODO: *** Set the exit status of the related result
// to ERR_GIVEUP. The failure will be reported to the
// server and related file infos, results, and workunits
// will be deleted if necessary
pers_iter = pers_xfers->pers_file_xfers.erase(pers_iter);
if (pfxp->fip) {
pfxp->fip->pers_file_xfer = NULL;
}
delete pfxp;
// Update the client_state file
client_state_dirty = true;
action = true;
} else {
pers_iter++;
}
}
// delete RESULTs that have been finished and reported;
// reference-count files referred to by other results
//
@ -655,6 +726,7 @@ bool CLIENT_STATE::garbage_collect() {
}
// TODO: write no more often than X seconds
// Write the client_state.xml file if necessary
//
int CLIENT_STATE::write_state_file_if_needed() {
int retval;
@ -674,16 +746,21 @@ void CLIENT_STATE::parse_cmdline(int argc, char** argv) {
for (i=1; i<argc; i++) {
if (!strcmp(argv[i], "-exit_when_idle")) {
exit_when_idle = true;
continue;
}
continue;
}
if (!strcmp(argv[i], "-no_time_test")) {
run_time_test = false;
continue;
};
if (!strcmp(argv[i], "-exit_after")) {
exit_after = atoi(argv[i+1]);
continue;
};
exit_after = atoi(argv[i+1]);
continue;
};
// Give up on file transfers after x seconds. Default value is 1209600 (2 weeks)
if (!strcmp(argv[i], "-giveup_after")) {
giveup_after = atoi(argv[i+1]);
continue;
};
}
}

View File

@ -30,6 +30,7 @@
#include "http.h"
#include "net_stats.h"
#include "net_xfer.h"
#include "pers_file_xfer.h"
#include "prefs.h"
#include "scheduler_op.h"
#include "time_stats.h"
@ -45,6 +46,11 @@ public:
bool time_to_exit();
bool run_time_tests();
int time_tests();
double current_disk_usage();
double allowed_disk_usage();
void update_net_stats(bool is_upload, double nbytes, double nsecs);
int insert_file_xfer( FILE_XFER *fxp );
int giveup_after;
private:
vector<PROJECT*> projects;
@ -57,6 +63,7 @@ private:
int version;
char* platform_name;
NET_XFER_SET* net_xfers;
PERS_FILE_XFER_SET* pers_xfers;
HTTP_OP_SET* http_ops;
FILE_XFER_SET* file_xfers;
ACTIVE_TASK_SET active_tasks;
@ -100,6 +107,7 @@ private:
bool contacted_sched_server;
void compute_resource_debts();
public:
bool start_new_file_xfer();
PROJECT* next_project(PROJECT*);
PROJECT* next_project_master_pending();
double work_needed_secs();

View File

@ -23,6 +23,7 @@
#include "file_names.h"
#include "filesys.h"
#include "parse.h"
#include "pers_file_xfer.h"
#include "client_types.h"
@ -176,7 +177,9 @@ void PROJECT::copy_state_fields(PROJECT& p) {
hostid = p.hostid;
exp_avg_cpu = p.exp_avg_cpu;
exp_avg_mod_time = p.exp_avg_mod_time;
code_sign_key = strdup(p.code_sign_key);
if( p.code_sign_key ) {
code_sign_key = strdup(p.code_sign_key);
}
nrpc_failures = p.nrpc_failures;
min_rpc_time = p.min_rpc_time;
}
@ -231,6 +234,8 @@ FILE_INFO::~FILE_INFO() {
int FILE_INFO::parse(FILE* in, bool from_server) {
char buf[256];
STRING256 url;
PERS_FILE_XFER *pfxp;
int retval;
if(in==NULL) {
fprintf(stderr, "error: FILE_INFO.parse: unexpected NULL pointer in\n");
return ERR_NULL;
@ -246,10 +251,12 @@ int FILE_INFO::parse(FILE* in, bool from_server) {
upload_when_present = false;
sticky = false;
signature_required = false;
file_xfer = NULL;
pers_file_xfer = NULL;
result = NULL;
project = NULL;
urls.clear();
start_url = -1;
current_url = -1;
if (from_server) {
signed_xml = strdup("");
} else {
@ -285,6 +292,15 @@ int FILE_INFO::parse(FILE* in, bool from_server) {
else if (match_tag(buf, "<upload_when_present/>")) upload_when_present = true;
else if (match_tag(buf, "<sticky/>")) sticky = true;
else if (match_tag(buf, "<signature_required/>")) signature_required = true;
else if (match_tag(buf, "<persistent_file_xfer>")) {
pfxp = new PERS_FILE_XFER;
retval = pfxp->parse(in);
if (!retval) {
pers_file_xfer = pfxp;
} else {
delete pfxp;
}
}
else if (!from_server && match_tag(buf, "<signed_xml>")) {
dup_element_contents(in, "</signed_xml>", &signed_xml);
continue;
@ -322,6 +338,9 @@ int FILE_INFO::write(FILE* out, bool to_server) {
for (i=0; i<urls.size(); i++) {
fprintf(out, "<url>%s</url>\n", urls[i].text);
}
if (!to_server && pers_file_xfer) {
pers_file_xfer->write(out);
}
if (!to_server) {
if (signed_xml) {
fprintf(out, "<signed_xml>\n%s</signed_xml>\n", signed_xml);
@ -343,6 +362,24 @@ int FILE_INFO::delete_file() {
return file_delete(path);
}
// get the currently selected url to download/upload file, or
// select one if none is chosen yet
//
char* FILE_INFO::get_url() {
double temp;
if( current_url < 0 ) {
temp = rand();
temp *= urls.size();
temp /= RAND_MAX;
current_url = (int)temp;
start_url = current_url;
}
return urls[current_url].text;
}
// Parse XML based app_version information, usually from client_state.xml
//
int APP_VERSION::parse(FILE* in) {
char buf[256];
FILE_REF file_ref;

View File

@ -34,7 +34,7 @@
#define STDERR_MAX_LEN 4096
class FILE_XFER;
class PERS_FILE_XFER;
struct RESULT;
struct STRING256 {
@ -106,11 +106,13 @@ public:
bool upload_when_present;
bool sticky; // don't delete unless instructed to do so
bool signature_required; // true iff associated with app version
FILE_XFER* file_xfer; // nonzero if in the process of being up/downloaded
PERS_FILE_XFER* pers_file_xfer; // nonzero if in the process of being up/downloaded
RESULT* result; // for upload files (to authenticate)
PROJECT* project;
int ref_cnt;
vector<STRING256> urls;
int start_url;
int current_url;
char* signed_xml;
char* xml_signature;
char* file_signature;
@ -120,6 +122,7 @@ public:
int parse(FILE*, bool from_server);
int write(FILE*, bool to_server);
int delete_file(); // attempt to delete the underlying file
char* get_url();
};
// Describes a connection between a file and a workunit, result, or application.
@ -192,4 +195,6 @@ struct RESULT {
bool is_upload_done(); // files uploaded?
};
int verify_downloaded_file(char* pathname, FILE_INFO& file_info);
#endif

View File

@ -82,104 +82,36 @@ int verify_downloaded_file(char* pathname, FILE_INFO& file_info) {
// scan all FILE_INFOs.
// start downloads and uploads as needed.
// check for completion of existing transfers
//
bool CLIENT_STATE::start_file_xfers() {
unsigned int i;
FILE_INFO* fip;
FILE_XFER* fxp;
char pathname[256];
int retval;
PERS_FILE_XFER *pfx;
bool action = false;
for (i=0; i<file_infos.size(); i++) {
fip = file_infos[i];
fxp = fip->file_xfer;
if (!fip->generated_locally && !fip->file_present && !fxp) {
fxp = new FILE_XFER;
fxp->init_download(*fip);
retval = file_xfers->insert(fxp);
if (retval) {
fprintf(stderr,
"couldn't start download for %s: error %d\n",
fip->urls[0].text, retval
);
} else {
fip->file_xfer = fxp;
if (log_flags.file_xfer) {
printf(
"started download of %s\n",
fip->urls[0].text
);
}
}
pfx = fip->pers_file_xfer;
if (!fip->generated_locally && !fip->file_present && !pfx) {
// Set up the persistent file transfer object. This will start
// the download when there is available bandwidth
//
pfx = new PERS_FILE_XFER;
pfx->init( fip, false );
fip->pers_file_xfer = pfx;
// Pop PERS_FILE_XFER onto pers_file_xfer stack
if (fip->pers_file_xfer) pers_xfers->insert( fip->pers_file_xfer );
action = true;
} else if (
fip->upload_when_present && fip->file_present
&& !fip->uploaded && !fxp
) {
fxp = new FILE_XFER;
fxp->init_upload(*fip);
retval = file_xfers->insert(fxp);
if (retval) {
fprintf(stderr,
"couldn't start upload for %s: error %d\n",
fip->urls[0].text, retval
);
} else {
fip->file_xfer = fxp;
if (log_flags.file_xfer) {
printf("started upload to %s\n", fip->urls[0].text);
}
}
} else if ( fip->upload_when_present && fip->file_present && !fip->uploaded && !pfx ) {
// Set up the persistent file transfer object. This will start
// the upload when there is available bandwidth
//
pfx = new PERS_FILE_XFER;
pfx->init( fip, true );
fip->pers_file_xfer = pfx;
// Pop PERS_FILE_XFER onto pers_file_xfer stack
if (fip->pers_file_xfer) pers_xfers->insert( fip->pers_file_xfer );
action = true;
} else if (fxp) {
if (fxp->file_xfer_done) {
action = true;
if (log_flags.file_xfer) {
printf(
"file transfer done for %s; retval %d\n",
fip->urls[0].text, fxp->file_xfer_retval
);
}
file_xfers->remove(fxp);
fip->file_xfer = 0;
if (fip->generated_locally) {
if (fxp->file_xfer_retval == 0) {
net_stats.update(true, fip->nbytes, fxp->elapsed_time());
// file has been uploaded - delete if not sticky
if (!fip->sticky) {
fip->delete_file();
}
fip->uploaded = true;
}
} else {
if (fxp->file_xfer_retval == 0) {
net_stats.update(false, fip->nbytes, fxp->elapsed_time());
get_pathname(fip, pathname);
retval = verify_downloaded_file(pathname, *fip);
if (retval) {
fprintf(stderr,
"checksum or signature error for %s\n", fip->name
);
} else {
if (log_flags.file_xfer_debug) {
printf("MD5 checksum validated for %s\n", pathname);
}
if (fip->executable) {
retval = chmod(pathname, S_IEXEC|S_IREAD|S_IWRITE);
} else {
get_pathname(fip, pathname);
retval = chmod(pathname, S_IREAD|S_IWRITE);
}
fip->file_present = true;
}
}
}
client_state_dirty = true;
delete fxp;
}
}
}
return action;

View File

@ -47,6 +47,13 @@
#define EXP_DECAY_RATE (1./(3600*24*7))
#define SECONDS_IN_DAY 86400
// Decide whether to start a new file transfer
//
bool CLIENT_STATE::start_new_file_xfer() {
// **** this should do a little more than this
return true;
}
// estimate the days of work remaining
//
double CLIENT_STATE::current_water_days() {

View File

@ -44,6 +44,8 @@ FILE_XFER::~FILE_XFER() {
#if 0
// Is there any reason to keep this around?
int FILE_XFER::init_download(char* url, char* outfile) {
int file_size;
if(url==NULL) {
fprintf(stderr, "error: FILE_XFER.init_download: unexpected NULL pointer url\n");
return ERR_NULL;
@ -52,7 +54,11 @@ int FILE_XFER::init_download(char* url, char* outfile) {
fprintf(stderr, "error: FILE_XFER.init_download: unexpected NULL pointer outfile\n");
return ERR_NULL;
}
return HTTP_OP::init_get(url, outfile);
if (file_size(outfile, &file_size)) {
file_size = 0;
}
return HTTP_OP::init_get(url, outfile, file_size);
}
// Is there any reason to keep this around?
@ -72,7 +78,7 @@ int FILE_XFER::init_upload(char* url, char* infile) {
int FILE_XFER::init_download(FILE_INFO& file_info) {
fip = &file_info;
get_pathname(fip, pathname);
return HTTP_OP::init_get((char*)(&fip->urls[0]), pathname);
return HTTP_OP::init_get(fip->get_url(), pathname, false);
}
// for uploads, we need to build a header with xml_signature etc.
@ -96,7 +102,7 @@ int FILE_XFER::init_upload(FILE_INFO& file_info) {
file_info.xml_signature,
file_info.nbytes
);
return HTTP_OP::init_post2((char*)(&fip->urls[0]), header, pathname, 0);
return HTTP_OP::init_post2(fip->get_url(), header, pathname, 0);
}
// Returns the total time that the file xfer has taken
@ -153,7 +159,6 @@ int FILE_XFER_SET::remove(FILE_XFER* fxp) {
}
fprintf(stderr, "FILE_XFER_SET::remove(): not found\n");
return 1;
}
// Run through the FILE_XFER_SET and determine if any of the file
@ -176,7 +181,11 @@ bool FILE_XFER_SET::poll() {
if (fxp->http_op_retval == 200) {
fxp->file_xfer_retval = 0;
} else {
// Remove the transfer from the set. The actual object
// will be removed later by it's associated PERS_FILE_XFER
remove(fxp);
fxp->file_xfer_retval = fxp->http_op_retval;
i--;
}
}
}

View File

@ -232,9 +232,7 @@ int get_host_info2(HOST_INFO &host);
// General function to get all relevant host information
//
int get_host_info(HOST_INFO& host) {
int timezone; // seconds added to local time to get UTC
host.timezone = 0;
host.timezone = 0; // seconds added to local time to get UTC
strcpy(host.domain_name,"");
strcpy(host.serialnum,"");
strcpy(host.ip_addr,"");

View File

@ -76,7 +76,7 @@ static void parse_url(char* url, char* host, char* file) {
// Prints an HTTP 1.0 GET request header into buf
//
static void http_get_request_header(
char* buf, char* host, char* file, int offset
char* buf, char* host, char* file, double offset
) {
if(buf==NULL) {
fprintf(stderr, "error: http_get_request_header: unexpected NULL pointer buf\n");
@ -92,7 +92,7 @@ static void http_get_request_header(
}
if (offset) {
sprintf(buf,
"GET /%s;byte-range %d- HTTP/1.0\015\012"
"GET /%s;byte-range %12.0f- HTTP/1.0\015\012"
"User-Agent: BOINC client\015\012"
"Host: %s:80\015\012"
"Accept: */*\015\012"
@ -314,7 +314,7 @@ int HTTP_OP::init_head(char* url) {
// Initialize HTTP GET operation to url
//
int HTTP_OP::init_get(char* url, char* out, int off) {
int HTTP_OP::init_get(char* url, char* out, bool del_old_file, double off) {
if(url==NULL) {
fprintf(stderr, "error: HTTP_OP.init_get: unexpected NULL pointer url\n");
return ERR_NULL;
@ -327,6 +327,9 @@ int HTTP_OP::init_get(char* url, char* out, int off) {
fprintf(stderr, "error: HTTP_OP.init_get: negative off\n");
return ERR_NEG;
}
if (del_old_file) {
unlink(out);
}
file_offset = off;
parse_url(url, hostname, filename);
NET_XFER::init(hostname, 80, HTTP_BLOCKSIZE);
@ -556,7 +559,8 @@ bool HTTP_OP_SET::poll() {
htp->init_head( htp->hrh.redirect_location );
break;
case HTTP_OP_GET:
htp->init_get( htp->hrh.redirect_location, htp->outfile );
// *** Not sure if delete_old_file should be true
htp->init_get( htp->hrh.redirect_location, htp->outfile, true );
break;
case HTTP_OP_POST:
htp->init_post( htp->hrh.redirect_location, htp->infile, htp->outfile );
@ -580,13 +584,15 @@ bool HTTP_OP_SET::poll() {
htp->http_op_state = HTTP_STATE_DONE;
htp->http_op_retval = 0;
break;
case HTTP_OP_GET:
case HTTP_OP_POST:
htp->http_op_state = HTTP_STATE_REPLY_BODY;
// KLUDGE - should check for file first
unlink(htp->outfile);
case HTTP_OP_GET:
htp->http_op_state = HTTP_STATE_REPLY_BODY;
// TODO:
// Append to a file if it already exists, otherwise
// create a new one. init_get should have already
// deleted the file if necessary
htp->file = fopen(htp->outfile, "w");
if (!htp->file) {
fprintf(stderr,

View File

@ -62,7 +62,7 @@ public:
int http_op_retval;
int init_head(char* url);
int init_get(char* url, char* outfile, int offset=0);
int init_get(char* url, char* outfile, bool del_old_file, double offset=0);
int init_post(char* url, char* infile, char* outfile);
int init_post2(
char* url, char* req1, char* infile, double offset

View File

@ -8,7 +8,7 @@
// License for the specific language governing rights and limitations
// under the License.
//
// The Original Code is the Berkeley Open Network Computing.
// The Original Code is the Berkeley Open Infrastructure for Network Computing.
//
// The Initial Developer of the Original Code is the SETI@home project.
// Portions created by the SETI@home project are Copyright (C) 2002
@ -17,6 +17,20 @@
// Contributor(s):
//
#include <math.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "client_state.h"
#include "client_types.h"
#include "error_numbers.h"
#include "file_names.h"
#include "log_flags.h"
#include "parse.h"
#include "util.h"
// PERS_FILE_XFER represents a persistent file transfer.
// A set of URL is given.
@ -27,47 +41,223 @@
// For upload, try to upload the file to the first URL;
// if that fails try the others.
//
int PERS_FILE_XFER::init(FILE_INFO* the_file, bool is_file_upload) {
fxp = NULL;
this->fip = the_file;
nretry = 0;
first_request_time = time(NULL);
next_request_time = first_request_time;
is_upload = is_file_upload;
pers_xfer_retval = -1;
xfer_done = false;
int PERS_FILE_XFER::init(FILE_INFO&, bool is_upload) {
return 0;
}
//
void PERS_FILE_XFER::try() {
// Try to start the file transfer associated with this persistent file transfer.
// Returns true if transfer was successfully started, false otherwise
//
bool PERS_FILE_XFER::start_xfer() {
FILE_XFER *file_xfer;
int retval;
// Decide whether to start a new file transfer
if(gstate.start_new_file_xfer()) {
// Create a new FILE_XFER object and initialize a
// download or upload for the persistent file transfer
file_xfer = new FILE_XFER;
if (is_upload) {
retval = file_xfer->init_upload(*fip);
} else {
retval = file_xfer->init_download(*fip);
}
if (!retval) {
retval = gstate.insert_file_xfer(file_xfer);
}
if (retval) {
fprintf(stderr, "couldn't start %s for %s: error %d\n",
(is_upload ? "upload" : "download"), fip->get_url(), retval);
} else {
fxp = file_xfer;
if (log_flags.file_xfer) {
printf("started %s of %s\n", (is_upload ? "upload" : "download"), fip->get_url());
}
return true;
}
}
return false;
}
void PERS_FILE_XFER::poll(unsigned int now) {
// Poll the status of this persistent file transfer. If it's time to start it, then
// attempt to start it. If it has finished or failed, then deal with it appropriately
//
int PERS_FILE_XFER::poll(unsigned int now) {
double exp_backoff;
int retval;
char pathname[256];
if (fxp) {
if (fxp->file_xfer_done) {
if (fxp->file_xfer_retval == 0) {
// The transfer finished with no errors. We will clean up the
// PERS_FILE_XFER object in garbage_collect() later
//
if (log_flags.file_xfer) {
printf( "file transfer done for %s; retval %d\n",
fip->get_url(), fxp->file_xfer_retval );
}
if (fip->generated_locally) {
// If the file was generated locally (for upload), update stats
// and delete the local copy of the file if needed
//
gstate.update_net_stats(true, fip->nbytes, fxp->elapsed_time());
// file has been uploaded - delete if not sticky
if (!fip->sticky) {
fip->delete_file();
}
fip->uploaded = true;
} else {
// Otherwise we downloaded the file. Update stats, verify
// the file with RSA or MD5, and change permissions
gstate.update_net_stats(false, fip->nbytes, fxp->elapsed_time());
get_pathname(fip, pathname);
retval = verify_downloaded_file(pathname, *fip);
if (retval) {
fprintf(stderr, "checksum or signature error for %s\n", fip->name);
} else {
if (log_flags.file_xfer_debug) {
printf("MD5 checksum validated for %s\n", pathname);
}
// Set the appropriate permissions depending on whether
// it's an executable or normal file
if (fip->executable) {
retval = chmod(pathname, S_IEXEC|S_IREAD|S_IWRITE);
} else {
get_pathname(fip, pathname);
retval = chmod(pathname, S_IREAD|S_IWRITE);
}
fip->file_present = true;
}
}
xfer_done = true;
pers_xfer_retval = 0;
} else {
// file xfer failed.
// Cycle to the next URL to try
fip->current_url = (fip->current_url + 1)%fip->urls.size();
// If we reach the URL that we started at, then we have tried all
// servers without success
if( fip->current_url == fip->start_url ) {
nretry++;
exp_backoff = exp(((double)rand()/(double)RAND_MAX)*nretry);
// Do an exponential backoff of e^nretry seconds, keeping within the bounds
// of PERS_RETRY_DELAY_MIN and PERS_RETRY_DELAY_MAX
next_request_time = now+(int)max(PERS_RETRY_DELAY_MIN,min(PERS_RETRY_DELAY_MAX,exp_backoff));
}
// See if it's time to give up on the persistent file xfer
//
diff = now - fip->last_xfer_time;
if (diff > PERS_GIVEUP) {
pers_xfer_done = true;
pers_file_xfer_retval = ERR_GIVEUP;
if ((now - first_request_time) > gstate.giveup_after) {
xfer_done = true;
pers_xfer_retval = ERR_GIVEUP;
}
}
// Disassociate the finished file transfer
fxp = NULL;
return true;
}
} else {
// No file xfer is active.
// We must be waiting after a failure.
// See if it's time to try again.
//
if (now > retry_time) {
try();
if (now >= (unsigned int)next_request_time) {
return start_xfer();
}
}
return false;
}
// Parse XML information about a single persistent file transfer
//
int PERS_FILE_XFER::parse(FILE* fin) {
char buf[256];
if(fin==NULL) {
fprintf(stderr, "error: PERS_FILE_XFER_SET.parse: unexpected NULL pointer fin\n");
return ERR_NULL;
}
while (fgets(buf, 256, fin)) {
if (match_tag(buf, "</persistent_file_xfer>")) return 0;
else if (parse_int(buf, "<num_retries>", nretry)) continue;
else if (parse_int(buf, "<first_request_time>", first_request_time)) continue;
else if (parse_int(buf, "<next_request_time>", next_request_time)) continue;
else fprintf(stderr, "PERS_FILE_XFER::parse(): unrecognized: %s\n", buf);
}
return -1;
}
// Write XML information about a particular persistent file transfer
//
int PERS_FILE_XFER::write(FILE* fout) {
if(fout==NULL) {
fprintf(stderr, "error: PERS_FILE_XFER_SET.write: unexpected NULL pointer fout\n");
return ERR_NULL;
}
fprintf(fout,
" <persistent_file_xfer>\n"
" <num_retries>%d</num_retries>\n"
" <first_request_time>%d</first_request_time>\n"
" <next_request_time>%d</next_request_time>\n"
" </persistent_file_xfer>\n",
nretry, first_request_time, next_request_time);
return 0;
}
PERS_FILE_XFER_SET::PERS_FILE_XFER_SET(FILE_XFER_SET* p) {
if(p==NULL) {
fprintf(stderr, "error: PERS_FILE_XFER_SET: unexpected NULL pointer p\n");
}
file_xfers = p;
}
int PERS_FILE_XFER_SET::poll() {
unsigned int ;
PERS_FILE_XFER* pfxp;
unsigned int i;
bool action = false;
int now = time(0);
int now = time(NULL);
for (i=0; i<pers_file_xfers.size(); i++) {
pers_file_xfers[i]->poll(now);
action |= pers_file_xfers[i]->poll(now);
}
return action;
}
// Insert a PERS_FILE_XFER object into the set. We will decide which ones to start
// when we hit the polling loop
//
int PERS_FILE_XFER_SET::insert(PERS_FILE_XFER* pfx) {
pers_file_xfers.push_back(pfx);
return 0;
}
// Remove a PERS_FILE_XFER object from the set. What should the action here be?
//
int PERS_FILE_XFER_SET::remove(PERS_FILE_XFER* pfx) {
vector<PERS_FILE_XFER*>::iterator iter;
iter = pers_file_xfers.begin();
while (iter != pers_file_xfers.end()) {
if (*iter == pfx) {
pers_file_xfers.erase(iter);
return 0;
}
iter++;
}
fprintf(stderr, "PERS_FILE_XFER_SET::remove(): not found\n");
return 1;
}

View File

@ -17,6 +17,10 @@
// Contributor(s):
//
#include <time.h>
#include "client_types.h"
#include "file_xfer.h"
// PERS_FILE_XFER represents a persistent file transfer.
// A set of URL is given in the FILE_INFO.
@ -28,26 +32,35 @@
// For upload, try to upload the file to the first URL;
// if that fails try the others.
#define PERS_RETRY_DELAY_MIN 60
#define PERS_RETRY_DELAY_MAX (256*60)
#define PERS_GIVEUP (3600*24*7)
#define PERS_RETRY_DELAY_MIN 60 // 1 minute
#define PERS_RETRY_DELAY_MAX (60*60*4) // 4 hours
#define PERS_GIVEUP (60*60*24*7*2) // 2 weeks
// give up on xfer if this time elapses since last byte xferred
class PERS_FILE_XFER {
int url_index; // which URL to try next
int nretry; // # of retries so far
FILE_INFO* fip;
int first_request_time; // UNIX time of first file request
int next_request_time; // UNIX time to next retry the file request
bool is_upload;
FILE_XFER* fxp; // nonzero if file xfer in progress
int retry_time; // don't retry until this time
int init(FILE_INFO&, bool is_upload);
public:
int pers_xfer_retval;
bool xfer_done;
FILE_XFER* fxp; // nonzero if file xfer in progress
FILE_INFO* fip;
int init(FILE_INFO*, bool is_file_upload);
int poll(unsigned int now);
int write(FILE* fout);
int parse(FILE* fin);
bool start_xfer();
};
class PERS_FILE_XFER_SET {
vector<PERS_FILE_XFER>pers_file_xfers;
FILE_XFER_SET* file_xfers;
public:
vector<PERS_FILE_XFER*>pers_file_xfers;
PERS_FILE_XFER_SET(FILE_XFER_SET*);
int insert(PERS_FILE_XFER*);
int remove(PERS_FILE_XFER*);

View File

@ -148,7 +148,7 @@ int SCHEDULER_OP::init_master_fetch(PROJECT* p) {
if (log_flags.sched_op_debug) {
printf("Fetching master file for %s\n", project->master_url);
}
retval = http_op.init_get(project->master_url, MASTER_FILE_NAME);
retval = http_op.init_get(project->master_url, MASTER_FILE_NAME, true);
if (retval) return retval;
retval = http_ops->insert(&http_op);
if (retval) return retval;

View File

@ -27,6 +27,13 @@
#define DOWNLOAD_URL "http://localhost.localdomain/download/input"
#define UPLOAD_URL "http://localhost.localdomain/boinc-cgi/file_upload_handler"
// Skeleton class to force compilation
class PERS_FILE_XFER {
public:
int write(FILE* fout);
int parse(FILE* fin);
};
int main() {
NET_XFER_SET nxs;
@ -107,3 +114,14 @@ int main() {
}
printf("all done\n");
}
// Skeleton function to force compilation
int PERS_FILE_XFER::parse(FILE* fin) {
return 0;
}
// Skeleton function to force compilation
int PERS_FILE_XFER::write(FILE* fout) {
return 0;
}

View File

@ -47,7 +47,7 @@ int main() {
#if 0
op1 = new HTTP_OP;
retval = op1->init_get("http://localhost.localdomain/my_index.html", "test_out1");
retval = op1->init_get("http://localhost.localdomain/my_index.html", "test_out1", true);
if (retval) {
printf("init_post: %d\n", retval);
exit(1);

View File

@ -44,6 +44,24 @@ void gettimeofday(timeval *t, void *tz) {
#include "util.h"
// return minimum of two double precision numbers
double min( double a, double b ) {
if (a<b) {
return a;
} else {
return b;
}
}
// return maximum of two double precision numbers
double max( double a, double b ) {
if (a>b) {
return a;
} else {
return b;
}
}
// return time of day as a double
//
double dtime() {

View File

@ -18,4 +18,7 @@
//
extern double dtime();
extern double min( double a, double b );
extern double max( double a, double b );
extern void boinc_sleep( int seconds );

View File

@ -1,4 +1,5 @@
// The input to this program looks like this:
// There are two possible inputs to this program.
// One is for uploading a completed result, this looks as follows:
//
// <file_info>
// ...
@ -11,17 +12,34 @@
// <data>
// ... (data)
//
// The return looks like
// The return for an uploaded result looks like
//
// <status>0</status>
// or
// <status>2</status>
// <error>bad file size</error>
//
// The other kind of input is a file size request. This is used
// to determine how much of a file has been uploaded already.
// This kind of request should always be made before starting
// a file upload.
// The input for this looks as follows:
// <file_size_req>result_1234_file</file_size_req>
//
// The return for this information request looks like
//
// <nbytes>1234</nbytes>
//
// Where nbytes will be 0 if the file doesn't exist
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>
#include "parse.h"
#include "crypt.h"
@ -117,7 +135,7 @@ int copy_socket_to_file(FILE* in, char* path, double offset, double nbytes) {
int handle_request(FILE* in, R_RSA_PUBLIC_KEY& key) {
char buf[256];
double nbytes=0, offset=0;
char path[256];
char path[256],file_name[256];
FILE_INFO file_info;
int retval;
bool is_valid;
@ -139,6 +157,17 @@ int handle_request(FILE* in, R_RSA_PUBLIC_KEY& key) {
}
continue;
}
// Handle a file size request
else if (parse_str(buf, "<file_size_req>", file_name)) {
struct stat sbuf;
// TODO: put checking here to ensure path doesn't point somewhere bad
sprintf( path, "%s/%s", BOINC_UPLOAD_DIR, file_name );
retval = stat( path, &sbuf );
if (retval && errno != ENOENT) print_status( -1, "cannot open file" );
else if (retval) printf("Content-type: text/plain\n\n<nbytes>0</nbytes>\n");
else printf("Content-type: text/plain\n\n<nbytes>%d</nbytes>\n", (int)sbuf.st_size);
exit(0);
}
else if (parse_double(buf, "<offset>", offset)) continue;
else if (parse_double(buf, "<nbytes>", nbytes)) continue;
else if (match_tag(buf, "<data>")) {