From af0224bf6eaa4b9123bd855bdd47c1a2b90a5408 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Wed, 26 Feb 2003 00:47:57 +0000 Subject: [PATCH] throughput limit and measurement svn path=/trunk/boinc/; revision=973 --- checkin_notes | 50 ++++++++++++- client/client_state.C | 69 ++++++----------- client/client_state.h | 4 +- client/cs_scheduler.C | 17 +++-- client/file_xfer.C | 3 +- client/http.C | 2 +- client/main.C | 6 +- client/net_stats.C | 113 +++++++++++++++------------- client/net_stats.h | 24 ++++-- client/net_xfer.C | 162 ++++++++++++++++++++++++++-------------- client/net_xfer.h | 22 +++++- client/pers_file_xfer.C | 60 ++++++++------- client/prefs.C | 14 +++- client/prefs.h | 6 +- client/scheduler_op.C | 4 +- client/test_file_xfer.C | 4 +- client/test_http.C | 4 +- client/test_net_xfer.C | 2 +- test/log_flags.xml | 1 + test/test.inc | 6 +- test/test_uc.php | 1 + 21 files changed, 358 insertions(+), 216 deletions(-) diff --git a/checkin_notes b/checkin_notes index b735906037..43015bfe81 100755 --- a/checkin_notes +++ b/checkin_notes @@ -3411,8 +3411,9 @@ David Feb 22 test.inc Seth Feb 24 - - changed windows graphics handling. client now has a child window in which all graphics - are displayed. when an app is running, this window sends a message to the app telling + - changed windows graphics handling. + client now has a child window in which all graphics are displayed. + when an app is running, this window sends a message to the app telling it to draw on the window. api/ @@ -3421,3 +3422,48 @@ Seth Feb 24 wingui.C,h wingui_mainwindow.C,h wingui_sswindow.C,h (new) + +David Feb 25 2003 + - Implemented network throughput limits, part of global prefs. + There are separate limits for upload and download. + Implementation: the NET_XFER_SET objects now has fields for + 1) the max # bytes/sec up and down (from global prefs) + 2) the limit on bytes that can be xferred this second + Can go negative. If it's negative, don't include + sockets in select(). + Replenish every second. + This replaces an implementation that didn't work + - Reimplemented the way network throughput is measured. + The old way measured throughput for each file xfer, and kept an average. + This doesn't work with multiple simultaneous xfers. + The new way uses a NET_STATS::poll() function. + If there has been an active file xfer during the last poll period + (new logic in NET_XFER_SET lets you find this out) + then you add the duration of the polling period, + and the number of bytes transferred. + The reported throughput is averaged with throughput + from previous sessions of the core client. + - Increased file xfer blocksize from 4KB to 16KB + - Got rid of dynamic buffer allocation in NET_XFER + - Renamed "water days" to "work buf days" everywhere + NOTE: + clock() measure calling process CPU time, not time of day. + Use time(0) if you want time of day. + + client/ + client_state.C,h + cs_scheduler.C + file_xfer.C + http.C + main.C + net_stats.C,h + net_xfer.C,h + pers_file_xfer.C + prefs.C,h + scheduler_op.C + test_file_xfer.C + test_http.C + test_net_xfer.C + test/ + test.inc + test_uc.php diff --git a/client/client_state.C b/client/client_state.C index ef62cb268b..7fa3d7d646 100644 --- a/client/client_state.C +++ b/client/client_state.C @@ -76,8 +76,6 @@ CLIENT_STATE::CLIENT_STATE() { platform_name = HOST; exit_after_app_start_secs = 0; app_started = 0; - max_transfer_rate = 999999999; - max_bytes = 0; user_idle = true; use_http_proxy = false; use_socks_proxy = false; @@ -92,6 +90,13 @@ CLIENT_STATE::CLIENT_STATE() { time_tests_id = 0; } +void CLIENT_STATE::install_global_prefs() { + net_xfers->max_bytes_sec_up = global_prefs.max_bytes_sec_up; + net_xfers->max_bytes_sec_down = global_prefs.max_bytes_sec_down; + net_xfers->bytes_left_up = global_prefs.max_bytes_sec_up; + net_xfers->bytes_left_down = global_prefs.max_bytes_sec_down; +} + int CLIENT_STATE::init() { int retval; unsigned int i; @@ -104,6 +109,7 @@ int CLIENT_STATE::init() { if (retval) { printf("No global preferences file; will use defaults.\n"); } + install_global_prefs(); // parse account files. // If there are none, prompt user for project URL and create file @@ -397,7 +403,6 @@ int CLIENT_STATE::net_sleep(double x) { // (in which case should call this again immediately) // bool CLIENT_STATE::do_something() { - int nbytes=0; bool action = false, x; if (check_time_tests() == TIME_TESTS_RUNNING) return false; @@ -405,20 +410,15 @@ bool CLIENT_STATE::do_something() { check_suspend_activities(); print_log("Polling; active layers:\n"); + net_stats.poll(*net_xfers); if (activities_suspended) { print_log("None (suspended)\n"); } else { // Call these functions in bottom to top order with // respect to the FSM hierarchy - if (max_bytes > 0) { - net_xfers->poll(max_bytes, nbytes); - } - if (nbytes) { - max_bytes -= nbytes; - action=true; - print_log("net_xfers\n"); - } + x = net_xfers->poll(); + if (x) { action=true; print_log("net_xfers\n"); } x = http_ops->poll(); if (x) {action=true; print_log("http_ops\n"); } @@ -454,13 +454,12 @@ bool CLIENT_STATE::do_something() { if (x) {action=true; print_log("update_results\n"); } if (write_state_file_if_needed()) { - fprintf(stderr, "CLIENT_STATE::do_something(): could not write state file"); + fprintf(stderr, "Couldn't write state file"); } } print_log("End poll\n"); if (!action) { time_stats.update(true, !activities_suspended); - max_bytes = max_transfer_rate; } return action; } @@ -1120,48 +1119,25 @@ void CLIENT_STATE::parse_cmdline(int argc, char** argv) { for (i=1; i global_prefs.high_water_days) return 0; - return (global_prefs.high_water_days - x)*SECONDS_PER_DAY; + double x = current_work_buf_days(); + if (x > global_prefs.work_buf_max_days) return 0; + return (global_prefs.work_buf_max_days - x)*SECONDS_IN_DAY; } // update exponentially-averaged CPU times of all projects @@ -267,15 +267,15 @@ bool CLIENT_STATE::some_project_rpc_ok() { bool CLIENT_STATE::scheduler_rpc_poll() { double work_secs; PROJECT* p; - bool action=false, below_low_water, should_get_work; + bool action=false, below_work_buf_min, should_get_work; switch(scheduler_op->state) { case SCHEDULER_OP_STATE_IDLE: if (exit_when_idle && contacted_sched_server) { should_get_work = false; } else { - below_low_water = (current_water_days() <= global_prefs.low_water_days); - should_get_work = below_low_water && some_project_rpc_ok(); + below_work_buf_min = (current_work_buf_days() <= global_prefs.work_buf_min_days); + should_get_work = below_work_buf_min && some_project_rpc_ok(); } if (should_get_work) { compute_resource_debts(); @@ -381,6 +381,7 @@ int CLIENT_STATE::handle_scheduler_reply( ); fclose(f); global_prefs.parse_file(); + install_global_prefs(); } // deal with project preferences (should always be there) diff --git a/client/file_xfer.C b/client/file_xfer.C index 71cd61c7bb..d665884879 100644 --- a/client/file_xfer.C +++ b/client/file_xfer.C @@ -176,10 +176,11 @@ bool FILE_XFER_SET::poll() { printf("http op done; retval %d\n", fxp->http_op_retval); } if (fxp->http_op_retval == 0) { + // If this was a file size query, restart the transfer // using the remote file size information + // if (fxp->file_size_query) { - // Parse the server's response. retval = fxp->parse_server_response(fxp->fip->upload_offset); if (retval) { diff --git a/client/http.C b/client/http.C index f29a1dd3c5..2f01bba15a 100644 --- a/client/http.C +++ b/client/http.C @@ -39,7 +39,7 @@ #include "log_flags.h" #include "http.h" -#define HTTP_BLOCKSIZE 4096 +#define HTTP_BLOCKSIZE 16384 // Breaks a url down into its server and file path components // TODO: deal with alternate protocols (ftp, gopher, etc) or disallow diff --git a/client/main.C b/client/main.C index c236b520b8..dabc06667b 100644 --- a/client/main.C +++ b/client/main.C @@ -74,6 +74,7 @@ int add_new_project() { int main(int argc, char** argv) { int retval; + double dt; setbuf(stdout, 0); if (lock_file(LOCK_FILE_NAME)) { @@ -87,11 +88,10 @@ int main(int argc, char** argv) { if (retval) exit(retval); while (1) { if (!gstate.do_something()) { - double x; gstate.net_sleep(1.); - x = 1; + dt = 1; //boinc_sleep(1); - if (log_flags.time_debug) printf("SLEPT %f SECONDS\n", x); + if (log_flags.time_debug) printf("SLEPT %f SECONDS\n", dt); fflush(stdout); } diff --git a/client/net_stats.C b/client/net_stats.C index 4675c24e6d..42bfa40901 100644 --- a/client/net_stats.C +++ b/client/net_stats.C @@ -17,55 +17,70 @@ // Contributor(s): // +// NET_STATS estimates average network throughput, +// i.e. the average total throughput in both the up and down directions. +// Here's how it works: NET_STATS::poll() is called every second or so. +// If there are any file transfers active, +// it increments elapsed time and byte counts, +// and maintains an exponential average of throughput. + #include #include "math.h" #include "parse.h" #include "time.h" +#include "util.h" #include "error_numbers.h" #include "net_stats.h" -// Is this a reasonable cutoff? -#define SMALL_FILE_CUTOFF 32000 +#define EXP_DECAY_RATE (1./SECONDS_PER_DAY) NET_STATS::NET_STATS() { - last_update_up = 0; - last_update_down = 0; - bwup = 0; - bwdown = 0; + last_time = 0; + memset(&up, 0, sizeof(up)); + memset(&down, 0, sizeof(down)); } -// Update network statistics -void NET_STATS::update(bool is_upload, double nbytes, double nsecs) { - double w1, w2, bw; - - // ignore small files since their transfer times don't - // reflect steady-state behavior - - if (nbytes < SMALL_FILE_CUTOFF) return; - - // The weight of the new item is a function of its size - - w1 = 1 - exp(-nbytes/1.e7); - w2 = 1 - w1; - bw = nbytes/nsecs; - - if (is_upload) { - if (!last_update_up) { - bwup = bw; - } else { - bwup = w1*bw + w2*bwup; - } - last_update_up = time(0); - } else { - if (!last_update_down) { - bwdown = bw; - } else { - bwdown = w1*bw + w2*bwdown; - } - last_update_down = time(0); +void NET_INFO::update(double dt, double nb, bool active) { + if (active) { + delta_t += dt; + delta_nbytes += nb-last_bytes; } + last_bytes = nb; +} + +double NET_INFO::throughput() { + double x, tp; + if (starting_throughput > 0) { + if (delta_t > 0) { + x = exp(delta_t*EXP_DECAY_RATE); + tp = delta_nbytes/delta_t; + return x*starting_throughput + (1-x)*tp; + } else { + return starting_throughput; + } + } else if (delta_t > 0) { + return delta_nbytes/delta_t; + } + return 0; +} + +void NET_STATS::poll(NET_XFER_SET& nxs) { + double t, dt; + bool upload_active, download_active; + + t = dtime(); + if (last_time == 0) { + dt = 0; + } else { + dt = t - last_time; + } + last_time = t; + + nxs.check_active(upload_active, download_active); + up.update(dt, nxs.bytes_up, upload_active); + down.update(dt, nxs.bytes_down, download_active); } // Write XML based network statistics @@ -74,18 +89,11 @@ int NET_STATS::write(FILE* out, bool to_server) { fprintf(out, "\n" " %f\n" - " %f\n", - bwup, - bwdown + " %f\n" + "\n", + up.throughput(), + down.throughput() ); - if (!to_server) { - fprintf(out, - " %d\n" - " %d\n", - last_update_up, last_update_down - ); - } - fprintf(out, "\n"); return 0; } @@ -93,14 +101,19 @@ int NET_STATS::write(FILE* out, bool to_server) { // int NET_STATS::parse(FILE* in) { char buf[256]; + double bwup, bwdown; memset(this, 0, sizeof(NET_STATS)); while (fgets(buf, 256, in)) { if (match_tag(buf, "")) return 0; - else if (parse_double(buf, "", bwup)) continue; - else if (parse_double(buf, "", bwdown)) continue; - else if (parse_int(buf, "", last_update_up)) continue; - else if (parse_int(buf, "", last_update_down)) continue; + else if (parse_double(buf, "", bwup)) { + up.starting_throughput = bwup; + continue; + } + else if (parse_double(buf, "", bwdown)) { + down.starting_throughput = bwdown; + continue; + } else fprintf(stderr, "NET_STATS::parse(): unrecognized: %s\n", buf); } return 1; diff --git a/client/net_stats.h b/client/net_stats.h index b756b69186..5645bf4445 100644 --- a/client/net_stats.h +++ b/client/net_stats.h @@ -22,17 +22,29 @@ // #include +#include "net_xfer.h" + +// there's one of these each for upload and download +// +struct NET_INFO { + double delta_t; // elapsed time of file transfer activity + // in this session of client + double delta_nbytes; // bytes transferred in this session + double last_bytes; + double starting_throughput; // throughput at start of session + + void update(double dt, double nb, bool active); + double throughput(); +}; class NET_STATS { - int last_update_up; - int last_update_down; public: - double bwup; - double bwdown; + double last_time; + NET_INFO up; + NET_INFO down; NET_STATS(); - void update(bool is_upload, double nbytes, double nsecs); - // report a file transfer + void poll(NET_XFER_SET&); int write(FILE*, bool to_server); int parse(FILE*); diff --git a/client/net_xfer.C b/client/net_xfer.C index 51ed3344b9..85541598fa 100644 --- a/client/net_xfer.C +++ b/client/net_xfer.C @@ -104,7 +104,7 @@ int NET_XFER::open_server() { NetClose(); #endif return -1; - } + } #ifdef _WIN32 unsigned long one = 1; @@ -128,13 +128,13 @@ int NET_XFER::open_server() { NetClose(); return -1; } - if (WSAAsyncSelect( fd, g_myWnd->GetSafeHwnd(), WM_TIMER, FD_READ|FD_WRITE )) { - errno = WSAGetLastError(); - if (errno != WSAEINPROGRESS && errno != WSAEWOULDBLOCK) { - closesocket(fd); - NetClose(); - return -1; - } + if (WSAAsyncSelect( fd, g_myWnd->GetSafeHwnd(), WM_TIMER, FD_READ|FD_WRITE )) { + errno = WSAGetLastError(); + if (errno != WSAEINPROGRESS && errno != WSAEWOULDBLOCK) { + closesocket(fd); + NetClose(); + return -1; + } } #else if (errno != EINPROGRESS) { @@ -152,7 +152,7 @@ int NET_XFER::open_server() { void NET_XFER::close_socket() { #ifdef _WIN32 - NetClose(); + NetClose(); if (socket) closesocket(socket); #else if (socket) close(socket); @@ -176,6 +176,17 @@ void NET_XFER::init(char* host, int p, int b) { last_speed_update = 0; } +NET_XFER_SET::NET_XFER_SET() { + max_bytes_sec_up = 0; + max_bytes_sec_down = 0; + bytes_left_up = 0; + bytes_left_down = 0; + bytes_up = 0; + bytes_down = 0; + up_active = false; + down_active = false; +} + // Insert a NET_XFER object into the set // int NET_XFER_SET::insert(NET_XFER* nxp) { @@ -205,46 +216,45 @@ int NET_XFER_SET::remove(NET_XFER* nxp) { } // Transfer data to/from active streams -// Nonblocking; keep doing I/O until would block. -// Transfer at most max_bytes bytes. -// TODO: implement other bandwidth constraints (ul/dl ratio, time of day) +// Nonblocking; keep doing I/O until would block, or we hit rate limits, +// or about .5 second goes by // -int NET_XFER_SET::poll(int max_bytes, int& bytes_transferred) { - int n, retval; +bool NET_XFER_SET::poll() { + double bytes_xferred; + int retval; struct timeval timeout; + time_t t = time(0); + bool action = false; - bytes_transferred = 0; while (1) { - timeout.tv_sec = timeout.tv_usec = 0; - retval = do_select(max_bytes, n, timeout); - if (retval) return retval; - if (n == 0) break; - max_bytes -= n; - bytes_transferred += n; - if (max_bytes < 0) break; + timeout.tv_sec = timeout.tv_usec = 0; + retval = do_select(bytes_xferred, timeout); + if (retval) break; + if (bytes_xferred == 0) break; + action = true; + if (time(0) != t) break; } - return 0; + return action; } // Wait at most x seconds for network I/O to become possible, -// then do some of it. +// then do a limited amount (one block per socket) of it. // int NET_XFER_SET::net_sleep(double x) { - int n, retval; + int retval; + double bytes_xferred; struct timeval timeout; timeout.tv_sec = (int)x; timeout.tv_usec = (int)(1000000*(x - (int)x)); - retval = do_select(100000000, n, timeout); + retval = do_select(bytes_xferred, timeout); return retval; } -// do a select and do I/O on as many sockets as possible. +// do a select and do I/O on as many sockets as possible, +// subject to rate limits // -int NET_XFER_SET::do_select( - int max_bytes, int& bytes_transferred, timeval& timeout -) { - struct timeval zeros; +int NET_XFER_SET::do_select(double& bytes_transferred, timeval& timeout) { int n, fd, retval; socklen_t i; NET_XFER *nxp; @@ -253,14 +263,23 @@ int NET_XFER_SET::do_select( #elif GETSOCKOPT_SOCKLEN_T socklen_t intsize = sizeof(int); #else - //int intsize = sizeof(int); socklen_t intsize = sizeof(int); #endif + time_t t = time(0); + if (t != last_time) { + last_time = t; + if (bytes_left_up < max_bytes_sec_up) { + bytes_left_up += max_bytes_sec_up; + } + if (bytes_left_down < max_bytes_sec_down) { + bytes_left_down += max_bytes_sec_down; + } + } + bytes_transferred = 0; fd_set read_fds, write_fds, error_fds; - memset(&zeros, 0, sizeof(zeros)); FD_ZERO(&read_fds); FD_ZERO(&write_fds); @@ -273,13 +292,20 @@ int NET_XFER_SET::do_select( if (!nxp->is_connected) { FD_SET(net_xfers[i]->socket, &write_fds); } else if (net_xfers[i]->want_download) { - FD_SET(net_xfers[i]->socket, &read_fds); + if (bytes_left_down > 0) { + FD_SET(net_xfers[i]->socket, &read_fds); + } else { + if (log_flags.net_xfer_debug) printf("Throttling download\n"); + } } else if (net_xfers[i]->want_upload) { - FD_SET(net_xfers[i]->socket, &write_fds); + if (bytes_left_up > 0) { + FD_SET(net_xfers[i]->socket, &write_fds); + } else { + if (log_flags.net_xfer_debug) printf("Throttling upload\n"); + } } FD_SET(net_xfers[i]->socket, &error_fds); } - //n = select(FD_SETSIZE, &read_fds, &write_fds, &error_fds, &zeros); n = select(FD_SETSIZE, &read_fds, &write_fds, &error_fds, &timeout); if (log_flags.net_xfer_debug) printf("select returned %d\n", n); if (n == 0) return 0; @@ -287,6 +313,8 @@ int NET_XFER_SET::do_select( // if got a descriptor, find the first one in round-robin order // and do I/O on it + // TODO: use round-robin order + // for (i=0; isocket; @@ -313,11 +341,17 @@ int NET_XFER_SET::do_select( bytes_transferred += 1; } } else if (nxp->do_file_io) { - if (max_bytes > 0) { - retval = nxp->do_xfer(n); - max_bytes -= n; - bytes_transferred += n; - nxp->update_speed(n); + retval = nxp->do_xfer(n); + nxp->update_speed(n); + bytes_transferred += n; + if (nxp->want_download) { + down_active = true; + bytes_left_down -= n; + bytes_down += n; + } else { + up_active = true; + bytes_left_up -= n; + bytes_up += n; } } else { nxp->io_ready = true; @@ -325,7 +359,11 @@ int NET_XFER_SET::do_select( } else if (FD_ISSET(fd, &error_fds)) { if (log_flags.net_xfer_debug) printf("got error on socket %d\n", fd); nxp = lookup_fd(fd); - nxp->got_error(); + if (nxp) { + nxp->got_error(); + } else { + fprintf(stderr, "do_select(): nxp not found\n"); + } } } return 0; @@ -346,11 +384,10 @@ NET_XFER* NET_XFER_SET::lookup_fd(int fd) { // int NET_XFER::do_xfer(int& nbytes_transferred) { int n, m, nleft, offset; - char* buf = (char*)malloc(blocksize); + char buf[MAX_BLOCKSIZE]; nbytes_transferred = 0; - if (!buf) return ERR_MALLOC; if (want_download) { #ifdef _WIN32 n = recv(socket, buf, blocksize, 0); @@ -363,11 +400,9 @@ int NET_XFER::do_xfer(int& nbytes_transferred) { if (n == 0) { io_done = true; want_download = false; - goto done; } else if (n < 0) { io_done = true; error = ERR_READ; - goto done; } else { nbytes_transferred += n; m = fwrite(buf, 1, n, file); @@ -375,7 +410,6 @@ int NET_XFER::do_xfer(int& nbytes_transferred) { fprintf(stdout, "Error: incomplete disk write\n"); io_done = true; error = ERR_FWRITE; - goto done; } } } else if (want_upload) { @@ -383,11 +417,11 @@ int NET_XFER::do_xfer(int& nbytes_transferred) { if (m == 0) { want_upload = false; io_done = true; - goto done; + return 0; } else if (m < 0) { io_done = true; error = ERR_FREAD; - goto done; + return 0; } nleft = m; offset = 0; @@ -403,19 +437,17 @@ int NET_XFER::do_xfer(int& nbytes_transferred) { if (n < 0) { error = ERR_WRITE; io_done = true; - goto done; + break; } else if (n < nleft) { fseek( file, n+nbytes_transferred-blocksize, SEEK_CUR ); nbytes_transferred += n; - goto done; + break; } nleft -= n; offset += n; nbytes_transferred += n; } } -done: - free(buf); return 0; } @@ -423,10 +455,10 @@ done: // Decay speed by 1/e every second // void NET_XFER::update_speed(int nbytes) { - clock_t now,delta_t; + time_t now, delta_t; double x; - now = clock(); + now = time(0); if (last_speed_update==0) last_speed_update = now; delta_t = now-last_speed_update; if (delta_t<=0) delta_t = 0; @@ -442,3 +474,23 @@ void NET_XFER::got_error() { printf("IO error on socket %d\n", socket); } } + +// return true if an upload is currently in progress +// or has been since the last call to this. +// Similar for download. +// +void NET_XFER_SET::check_active(bool& up, bool& down) { + unsigned int i; + NET_XFER* nxp; + + up = up_active; + down = down_active; + for (i=0; iis_connected && nxp->do_file_io) { + nxp->want_download?down=true:up=true; + } + } + up_active = false; + down_active = false; +} diff --git a/client/net_xfer.h b/client/net_xfer.h index 141ea9b87b..00399367ea 100644 --- a/client/net_xfer.h +++ b/client/net_xfer.h @@ -27,6 +27,8 @@ // The following classes provide an interface for polling // (non-blocking) network I/O. +#define MAX_BLOCKSIZE 16384 + // represents a network connection, either being accessed directly // or being transferred to/from a file // @@ -62,15 +64,31 @@ public: void got_error(); }; +// bandwidth limitation is implemented at this level, as follows: +// There are limits max_bytes_sec_up and max_bytes_sec_down. +// We keep track of the last time and bytes_left_up and bytes_left_down; +// Each second we reset these to zero. + class NET_XFER_SET { vector net_xfers; public: + NET_XFER_SET(); + double max_bytes_sec_up, max_bytes_sec_down; + // user-specified limits on throughput + double bytes_left_up, bytes_left_down; + // bytes left to transfer in the current second + double bytes_up, bytes_down; + // total bytes transferred + bool up_active, down_active; + // has there been transfer activity since last call to check_active()? + time_t last_time; int insert(NET_XFER*); int remove(NET_XFER*); - int poll(int max_bytes, int& bytes_transferred); + bool poll(); int net_sleep(double); - int do_select(int max_bytes, int& bytes_transferred, struct timeval& timeout); + int do_select(double& bytes_transferred, struct timeval& timeout); NET_XFER* lookup_fd(int); // lookup by fd + void check_active(bool&, bool&); }; #endif diff --git a/client/pers_file_xfer.C b/client/pers_file_xfer.C index 15bf04a967..b05c97901e 100644 --- a/client/pers_file_xfer.C +++ b/client/pers_file_xfer.C @@ -32,15 +32,14 @@ // PERS_FILE_XFER represents a persistent file transfer. // A set of URLs is given. - +// // For download, the object attempts to download the file // from any of the URLs. // If one fails or is not available, try another, // using an exponential backoff policy to avoid flooding servers. - +// // For upload, try to upload the file to the first URL; // if that fails try the others. -// PERS_FILE_XFER::PERS_FILE_XFER() { nretry = 0; @@ -65,11 +64,9 @@ bool PERS_FILE_XFER::start_xfer() { FILE_XFER *file_xfer; int retval; struct tm *newtime; - time_t aclock; + time_t now; + - time( &aclock ); /* Get time in seconds */ - - newtime = localtime( &aclock ); /* Convert time to struct */ // Decide whether to start a new file transfer // if (!gstate.start_new_file_xfer()) { @@ -103,14 +100,17 @@ bool PERS_FILE_XFER::start_xfer() { printf( "file_xfer insert failed\n" ); } fxp->file_xfer_retval = retval; - handle_xfer_failure(aclock); + handle_xfer_failure(now); fxp = NULL; return false; } if (log_flags.file_xfer) { + now = time(0); + newtime = localtime(&now); printf( "started %s of %s to %s at time: %s\n", - (is_upload ? "upload" : "download"), fip->name, fip->get_url(),asctime( newtime ) + (is_upload ? "upload" : "download"), fip->name, fip->get_url(), + asctime(newtime) ); } return true; @@ -143,7 +143,7 @@ bool PERS_FILE_XFER::poll(unsigned int now) { } } - if(dtime() - last_time <= 2) { + if (dtime() - last_time <= 2) { time_so_far += dtime() - last_time; } last_time = dtime(); @@ -157,25 +157,23 @@ bool PERS_FILE_XFER::poll(unsigned int now) { } if (fxp->file_xfer_retval == 0) { // The transfer finished with no errors. + // if (fip->generated_locally) { - // If the file was generated locally (for upload), update stats - // and delete the local copy of the file if needed - gstate.net_stats.update(true, fip->nbytes, time_so_far); - // file has been uploaded - delete if not sticky + // if (!fip->sticky) { fip->delete_file(); } fip->uploaded = true; xfer_done = true; } else { - // Otherwise we downloaded the file. Update stats, verify - // the file with RSA or MD5, and change permissions - gstate.net_stats.update(false, fip->nbytes, time_so_far); + + // verify the file with RSA or MD5, and change permissions + // get_pathname(fip, pathname); retval = verify_downloaded_file(pathname, *fip); if (retval) { - fprintf(stdout, "checksum or signature error for %s\n", fip->name); + printf("checksum or signature error for %s\n", fip->name); fip->status = retval; } else { if (log_flags.file_xfer_debug) { @@ -183,6 +181,7 @@ bool PERS_FILE_XFER::poll(unsigned int now) { } // Set the appropriate permissions depending on whether // it's an executable or normal file + // retval = fip->set_permissions(); fip->status = FILE_PRESENT; } @@ -191,7 +190,8 @@ bool PERS_FILE_XFER::poll(unsigned int now) { } else { handle_xfer_failure(now); } - // remove fxp from file_xfer_set here and deallocate it + // remove fxp from file_xfer_set and deallocate it + // gstate.file_xfers->remove(fxp); delete fxp; fxp = NULL; @@ -215,11 +215,11 @@ void PERS_FILE_XFER::handle_xfer_failure(unsigned int cur_time) { // See if it's time to give up on the persistent file xfer // if ((cur_time - first_request_time) > gstate.giveup_after) { - // Set the associated files status to a ERR_GIVEUP_DOWNLOAD and ERR_GIVEUP_UPLOAD failure - if(is_upload) + if (is_upload) { fip->status = ERR_GIVEUP_UPLOAD; - else + } else { fip->status = ERR_GIVEUP_DOWNLOAD; + } xfer_done = true; } if (log_flags.file_xfer_debug) { @@ -238,23 +238,29 @@ int PERS_FILE_XFER::retry_and_backoff(unsigned int cur_time) { time( &aclock ); /* Get time in seconds */ newtime = localtime( &aclock ); /* Convert time to struct */ + // Cycle to the next URL to try + // fip->current_url = (fip->current_url + 1)%fip->urls.size(); // If we reach the URL that we started at, then we have tried all // servers without success + // if (fip->current_url == fip->start_url) { nretry++; exp_backoff = exp(((double)rand()/(double)RAND_MAX)*nretry); - // Do an exponential backoff of e^nretry seconds, keeping - // within the bounds of PERS_RETRY_DELAY_MIN and + + // Do an exponential backoff of e^nretry seconds, + // keeping within the bounds of PERS_RETRY_DELAY_MIN and // PERS_RETRY_DELAY_MAX + // next_request_time = cur_time+(int)max(PERS_RETRY_DELAY_MIN,min(PERS_RETRY_DELAY_MAX,exp_backoff)); } if (log_flags.file_xfer_debug) { - printf( - "exponential back off is %d, current_time is %s\n", (int) exp_backoff,asctime( newtime ) - ); + printf( + "exponential back off is %d, current_time is %s\n", + (int) exp_backoff,asctime(newtime) + ); } return 0; } diff --git a/client/prefs.C b/client/prefs.C index c809dca212..72fff48686 100644 --- a/client/prefs.C +++ b/client/prefs.C @@ -40,12 +40,14 @@ GLOBAL_PREFS::GLOBAL_PREFS() { run_on_startup = false; confirm_before_connecting = false; hangup_if_dialed = false; - high_water_days = 3; - low_water_days = 1; + work_buf_max_days = 3; + work_buf_min_days = 1; disk_max_used_gb = 1; disk_max_used_pct = 0.5; disk_min_free_gb = 0.1; idle_time_to_run = 0; + max_bytes_sec_up = 1e9; + max_bytes_sec_down = 1e9; }; // Parse XML global prefs @@ -71,9 +73,9 @@ int GLOBAL_PREFS::parse(FILE* in) { } else if (match_tag(buf, "")) { run_on_startup = true; continue; - } else if (parse_double(buf, "", high_water_days)) { + } else if (parse_double(buf, "", work_buf_max_days)) { continue; - } else if (parse_double(buf, "", low_water_days)) { + } else if (parse_double(buf, "", work_buf_min_days)) { continue; } else if (parse_double(buf, "", disk_max_used_gb)) { continue; @@ -83,6 +85,10 @@ int GLOBAL_PREFS::parse(FILE* in) { continue; } else if (parse_double(buf, "", idle_time_to_run)) { continue; + } else if (parse_double(buf, "", max_bytes_sec_up)) { + continue; + } else if (parse_double(buf, "", max_bytes_sec_down)) { + continue; } } return ERR_XML_PARSE; diff --git a/client/prefs.h b/client/prefs.h index f5fad1a939..b53faa70d4 100644 --- a/client/prefs.h +++ b/client/prefs.h @@ -40,12 +40,14 @@ struct GLOBAL_PREFS { bool run_minimized; bool run_on_startup; bool hangup_if_dialed; - double high_water_days; - double low_water_days; + double work_buf_max_days; + double work_buf_min_days; double disk_max_used_gb; double disk_max_used_pct; double disk_min_free_gb; double idle_time_to_run; + double max_bytes_sec_up; + double max_bytes_sec_down; GLOBAL_PREFS(); int parse(FILE*); diff --git a/client/scheduler_op.C b/client/scheduler_op.C index 13ce3dc4aa..d90f12aff0 100644 --- a/client/scheduler_op.C +++ b/client/scheduler_op.C @@ -38,7 +38,7 @@ SCHEDULER_OP::SCHEDULER_OP(HTTP_OP_SET* h) { http_ops = h; } -// try to get enough work to bring us up to high-water mark +// try to get enough work to bring us up to max buffer level // int SCHEDULER_OP::init_get_work() { int retval; @@ -69,7 +69,7 @@ int SCHEDULER_OP::init_get_work() { } // report results for a particular project. -// also get work from that project if below high-water mark +// also get work from that project if below max buffer level // int SCHEDULER_OP::init_return_results(PROJECT* p, double ns) { must_get_work = false; diff --git a/client/test_file_xfer.C b/client/test_file_xfer.C index 3091fa39f3..bbbd997ed7 100644 --- a/client/test_file_xfer.C +++ b/client/test_file_xfer.C @@ -44,7 +44,7 @@ int main() { NET_XFER_SET nxs; HTTP_OP_SET hos(&nxs); FILE_XFER_SET fxs(&hos); - int retval, n; + int retval; FILE_XFER* fx1=0, *fx2 = 0; bool do_upload = true; bool do_download = false; @@ -102,7 +102,7 @@ int main() { } while (1) { - nxs.poll(100000, n); + nxs.poll(); hos.poll(); fxs.poll(); if (fx1 && fx1->file_xfer_done) { diff --git a/client/test_http.C b/client/test_http.C index 7b1d46034b..210b5576ed 100644 --- a/client/test_http.C +++ b/client/test_http.C @@ -37,7 +37,7 @@ int main() { NET_XFER_SET nxs; HTTP_OP_SET hos(&nxs); HTTP_OP *op1=0, *op2=0, *op3=0; - int retval, n; + int retval; #ifdef _WIN32 NetOpen(); @@ -71,7 +71,7 @@ int main() { hos.insert(op3); while (1) { - nxs.poll(100000, n); + nxs.poll(); hos.poll(); if (op1 && op1->http_op_done()) { printf("op1 done; status %d\n", op1->hrh.status); diff --git a/client/test_net_xfer.C b/client/test_net_xfer.C index db807d6631..07416af1a2 100644 --- a/client/test_net_xfer.C +++ b/client/test_net_xfer.C @@ -57,7 +57,7 @@ int main() { "\015\012"; while (1) { - nxs.poll(100000, n); + nxs.poll(); switch(nxp->net_xfer_state) { case UNCONNECTED: if(nxp->is_connected) { diff --git a/test/log_flags.xml b/test/log_flags.xml index 762c40f921..8fbec299e7 100644 --- a/test/log_flags.xml +++ b/test/log_flags.xml @@ -10,4 +10,5 @@ + diff --git a/test/test.inc b/test/test.inc index 2795e59351..cf70d4bda7 100644 --- a/test/test.inc +++ b/test/test.inc @@ -243,7 +243,11 @@ class Project { if ($user->project_prefs) { $pp = "\n$user->project_prefs\n"; } - db_query("insert into user values (0, $now, '$user->email_addr', '$user->name', 'foobar', '$user->authenticator', 'Peru', '12345', 0, 0, 0, '$user->global_prefs', '$pp', 0, 'home')"); + $gp = null; + if ($user->global_prefs) { + $gp = "\n$user->global_prefs\n"; + } + db_query("insert into user values (0, $now, '$user->email_addr', '$user->name', 'foobar', '$user->authenticator', 'Peru', '12345', 0, 0, 0, '$gp', '$pp', 0, 'home')"); } echo "adding apps\n"; diff --git a/test/test_uc.php b/test/test_uc.php index ffe7c86bd8..d482461783 100644 --- a/test/test_uc.php +++ b/test/test_uc.php @@ -21,6 +21,7 @@ $user = new User(); $user->project_prefs = "\nfoobar\n\n"; + $user->global_prefs = "400000\n"; $project->add_user($user); $project->install(); // must install projects before adding to hosts