throughput limit and measurement

svn path=/trunk/boinc/; revision=973
This commit is contained in:
David Anderson 2003-02-26 00:47:57 +00:00
parent 049b42c00e
commit af0224bf6e
21 changed files with 358 additions and 216 deletions

View File

@ -3411,8 +3411,9 @@ David Feb 22
test.inc
Seth Feb 24
- changed windows graphics handling. client now has a child window in which all graphics
are displayed. when an app is running, this window sends a message to the app telling
- changed windows graphics handling.
client now has a child window in which all graphics are displayed.
when an app is running, this window sends a message to the app telling
it to draw on the window.
api/
@ -3421,3 +3422,48 @@ Seth Feb 24
wingui.C,h
wingui_mainwindow.C,h
wingui_sswindow.C,h (new)
David Feb 25 2003
- Implemented network throughput limits, part of global prefs.
There are separate limits for upload and download.
Implementation: the NET_XFER_SET objects now has fields for
1) the max # bytes/sec up and down (from global prefs)
2) the limit on bytes that can be xferred this second
Can go negative. If it's negative, don't include
sockets in select().
Replenish every second.
This replaces an implementation that didn't work
- Reimplemented the way network throughput is measured.
The old way measured throughput for each file xfer, and kept an average.
This doesn't work with multiple simultaneous xfers.
The new way uses a NET_STATS::poll() function.
If there has been an active file xfer during the last poll period
(new logic in NET_XFER_SET lets you find this out)
then you add the duration of the polling period,
and the number of bytes transferred.
The reported throughput is averaged with throughput
from previous sessions of the core client.
- Increased file xfer blocksize from 4KB to 16KB
- Got rid of dynamic buffer allocation in NET_XFER
- Renamed "water days" to "work buf days" everywhere
NOTE:
clock() measure calling process CPU time, not time of day.
Use time(0) if you want time of day.
client/
client_state.C,h
cs_scheduler.C
file_xfer.C
http.C
main.C
net_stats.C,h
net_xfer.C,h
pers_file_xfer.C
prefs.C,h
scheduler_op.C
test_file_xfer.C
test_http.C
test_net_xfer.C
test/
test.inc
test_uc.php

View File

@ -76,8 +76,6 @@ CLIENT_STATE::CLIENT_STATE() {
platform_name = HOST;
exit_after_app_start_secs = 0;
app_started = 0;
max_transfer_rate = 999999999;
max_bytes = 0;
user_idle = true;
use_http_proxy = false;
use_socks_proxy = false;
@ -92,6 +90,13 @@ CLIENT_STATE::CLIENT_STATE() {
time_tests_id = 0;
}
void CLIENT_STATE::install_global_prefs() {
net_xfers->max_bytes_sec_up = global_prefs.max_bytes_sec_up;
net_xfers->max_bytes_sec_down = global_prefs.max_bytes_sec_down;
net_xfers->bytes_left_up = global_prefs.max_bytes_sec_up;
net_xfers->bytes_left_down = global_prefs.max_bytes_sec_down;
}
int CLIENT_STATE::init() {
int retval;
unsigned int i;
@ -104,6 +109,7 @@ int CLIENT_STATE::init() {
if (retval) {
printf("No global preferences file; will use defaults.\n");
}
install_global_prefs();
// parse account files.
// If there are none, prompt user for project URL and create file
@ -397,7 +403,6 @@ int CLIENT_STATE::net_sleep(double x) {
// (in which case should call this again immediately)
//
bool CLIENT_STATE::do_something() {
int nbytes=0;
bool action = false, x;
if (check_time_tests() == TIME_TESTS_RUNNING) return false;
@ -405,20 +410,15 @@ bool CLIENT_STATE::do_something() {
check_suspend_activities();
print_log("Polling; active layers:\n");
net_stats.poll(*net_xfers);
if (activities_suspended) {
print_log("None (suspended)\n");
} else {
// Call these functions in bottom to top order with
// respect to the FSM hierarchy
if (max_bytes > 0) {
net_xfers->poll(max_bytes, nbytes);
}
if (nbytes) {
max_bytes -= nbytes;
action=true;
print_log("net_xfers\n");
}
x = net_xfers->poll();
if (x) { action=true; print_log("net_xfers\n"); }
x = http_ops->poll();
if (x) {action=true; print_log("http_ops\n"); }
@ -454,13 +454,12 @@ bool CLIENT_STATE::do_something() {
if (x) {action=true; print_log("update_results\n"); }
if (write_state_file_if_needed()) {
fprintf(stderr, "CLIENT_STATE::do_something(): could not write state file");
fprintf(stderr, "Couldn't write state file");
}
}
print_log("End poll\n");
if (!action) {
time_stats.update(true, !activities_suspended);
max_bytes = max_transfer_rate;
}
return action;
}
@ -1120,48 +1119,25 @@ void CLIENT_STATE::parse_cmdline(int argc, char** argv) {
for (i=1; i<argc; i++) {
if (!strcmp(argv[i], "-exit_when_idle")) {
exit_when_idle = true;
continue;
}
if (!strcmp(argv[i], "-no_time_test")) {
} else if (!strcmp(argv[i], "-no_time_test")) {
run_time_test = false;
continue;
}
if (!strcmp(argv[i], "-exit_after_app_start")) {
} else if (!strcmp(argv[i], "-exit_after_app_start")) {
exit_after_app_start_secs = atoi(argv[++i]);
continue;
}
if (!strcmp(argv[i], "-giveup_after")) {
} else if (!strcmp(argv[i], "-giveup_after")) {
giveup_after = atoi(argv[++i]);
continue;
}
if (!strcmp(argv[i], "-limit_transfer_rate")) {
max_transfer_rate = atoi(argv[++i]);
continue;
}
if (!strcmp(argv[i], "-min")) {
} else if (!strcmp(argv[i], "-min")) {
global_prefs.run_minimized = true;
continue;
}
// the above options are private (i.e. not shown by -help)
if (!strcmp(argv[i], "-update_prefs")) {
} else if (!strcmp(argv[i], "-update_prefs")) {
update_prefs = true;
}
if (!strcmp(argv[i], "-add_new_project")) {
} else if (!strcmp(argv[i], "-add_new_project")) {
add_new_project();
}
if (!strcmp(argv[i], "-version")) {
} else if (!strcmp(argv[i], "-version")) {
printf( "%.2f %s\n", MAJOR_VERSION+(MINOR_VERSION/100.0), HOST );
exit(0);
}
if (!strcmp(argv[i], "-help")) {
} else if (!strcmp(argv[i], "-help")) {
printf(
"Usage: %s [options]\n"
" -version show version info\n"
@ -1169,6 +1145,9 @@ void CLIENT_STATE::parse_cmdline(int argc, char** argv) {
argv[0]
);
exit(0);
} else {
printf("Unknown option: %s\n", argv[i]);
exit(1);
}
}
}

View File

@ -114,7 +114,6 @@ private:
// if nonzero, exit this many seconds after starting an app
time_t app_started;
// when the most recent app was started
int max_transfer_rate, max_bytes;
int parse_account_files();
int parse_state_file();
@ -139,6 +138,7 @@ private:
void print_summary();
bool garbage_collect();
bool update_results();
void install_global_prefs();
// stuff related to scheduler RPCs
//
@ -165,7 +165,7 @@ private:
bool scheduler_rpc_poll();
void update_avg_cpu(PROJECT*);
double estimate_duration(WORKUNIT*);
double current_water_days();
double current_work_buf_days();
// the following could be eliminated by using map instead of vector
//

View File

@ -47,7 +47,7 @@
// estimate the days of work remaining
//
double CLIENT_STATE::current_water_days() {
double CLIENT_STATE::current_work_buf_days() {
unsigned int i;
RESULT* rp;
double seconds_remaining=0;
@ -62,12 +62,12 @@ double CLIENT_STATE::current_water_days() {
return (seconds_remaining / SECONDS_PER_DAY);
}
// seconds of work needed to come up to high-water mark
// seconds of work needed to come up to the max buffer level
//
double CLIENT_STATE::work_needed_secs() {
double x = current_water_days();
if (x > global_prefs.high_water_days) return 0;
return (global_prefs.high_water_days - x)*SECONDS_PER_DAY;
double x = current_work_buf_days();
if (x > global_prefs.work_buf_max_days) return 0;
return (global_prefs.work_buf_max_days - x)*SECONDS_IN_DAY;
}
// update exponentially-averaged CPU times of all projects
@ -267,15 +267,15 @@ bool CLIENT_STATE::some_project_rpc_ok() {
bool CLIENT_STATE::scheduler_rpc_poll() {
double work_secs;
PROJECT* p;
bool action=false, below_low_water, should_get_work;
bool action=false, below_work_buf_min, should_get_work;
switch(scheduler_op->state) {
case SCHEDULER_OP_STATE_IDLE:
if (exit_when_idle && contacted_sched_server) {
should_get_work = false;
} else {
below_low_water = (current_water_days() <= global_prefs.low_water_days);
should_get_work = below_low_water && some_project_rpc_ok();
below_work_buf_min = (current_work_buf_days() <= global_prefs.work_buf_min_days);
should_get_work = below_work_buf_min && some_project_rpc_ok();
}
if (should_get_work) {
compute_resource_debts();
@ -381,6 +381,7 @@ int CLIENT_STATE::handle_scheduler_reply(
);
fclose(f);
global_prefs.parse_file();
install_global_prefs();
}
// deal with project preferences (should always be there)

View File

@ -176,10 +176,11 @@ bool FILE_XFER_SET::poll() {
printf("http op done; retval %d\n", fxp->http_op_retval);
}
if (fxp->http_op_retval == 0) {
// If this was a file size query, restart the transfer
// using the remote file size information
//
if (fxp->file_size_query) {
// Parse the server's response.
retval = fxp->parse_server_response(fxp->fip->upload_offset);
if (retval) {

View File

@ -39,7 +39,7 @@
#include "log_flags.h"
#include "http.h"
#define HTTP_BLOCKSIZE 4096
#define HTTP_BLOCKSIZE 16384
// Breaks a url down into its server and file path components
// TODO: deal with alternate protocols (ftp, gopher, etc) or disallow

View File

@ -74,6 +74,7 @@ int add_new_project() {
int main(int argc, char** argv) {
int retval;
double dt;
setbuf(stdout, 0);
if (lock_file(LOCK_FILE_NAME)) {
@ -87,11 +88,10 @@ int main(int argc, char** argv) {
if (retval) exit(retval);
while (1) {
if (!gstate.do_something()) {
double x;
gstate.net_sleep(1.);
x = 1;
dt = 1;
//boinc_sleep(1);
if (log_flags.time_debug) printf("SLEPT %f SECONDS\n", x);
if (log_flags.time_debug) printf("SLEPT %f SECONDS\n", dt);
fflush(stdout);
}

View File

@ -17,55 +17,70 @@
// Contributor(s):
//
// NET_STATS estimates average network throughput,
// i.e. the average total throughput in both the up and down directions.
// Here's how it works: NET_STATS::poll() is called every second or so.
// If there are any file transfers active,
// it increments elapsed time and byte counts,
// and maintains an exponential average of throughput.
#include <string.h>
#include "math.h"
#include "parse.h"
#include "time.h"
#include "util.h"
#include "error_numbers.h"
#include "net_stats.h"
// Is this a reasonable cutoff?
#define SMALL_FILE_CUTOFF 32000
#define EXP_DECAY_RATE (1./SECONDS_PER_DAY)
NET_STATS::NET_STATS() {
last_update_up = 0;
last_update_down = 0;
bwup = 0;
bwdown = 0;
last_time = 0;
memset(&up, 0, sizeof(up));
memset(&down, 0, sizeof(down));
}
// Update network statistics
void NET_STATS::update(bool is_upload, double nbytes, double nsecs) {
double w1, w2, bw;
// ignore small files since their transfer times don't
// reflect steady-state behavior
if (nbytes < SMALL_FILE_CUTOFF) return;
// The weight of the new item is a function of its size
w1 = 1 - exp(-nbytes/1.e7);
w2 = 1 - w1;
bw = nbytes/nsecs;
if (is_upload) {
if (!last_update_up) {
bwup = bw;
} else {
bwup = w1*bw + w2*bwup;
}
last_update_up = time(0);
} else {
if (!last_update_down) {
bwdown = bw;
} else {
bwdown = w1*bw + w2*bwdown;
}
last_update_down = time(0);
void NET_INFO::update(double dt, double nb, bool active) {
if (active) {
delta_t += dt;
delta_nbytes += nb-last_bytes;
}
last_bytes = nb;
}
double NET_INFO::throughput() {
double x, tp;
if (starting_throughput > 0) {
if (delta_t > 0) {
x = exp(delta_t*EXP_DECAY_RATE);
tp = delta_nbytes/delta_t;
return x*starting_throughput + (1-x)*tp;
} else {
return starting_throughput;
}
} else if (delta_t > 0) {
return delta_nbytes/delta_t;
}
return 0;
}
void NET_STATS::poll(NET_XFER_SET& nxs) {
double t, dt;
bool upload_active, download_active;
t = dtime();
if (last_time == 0) {
dt = 0;
} else {
dt = t - last_time;
}
last_time = t;
nxs.check_active(upload_active, download_active);
up.update(dt, nxs.bytes_up, upload_active);
down.update(dt, nxs.bytes_down, download_active);
}
// Write XML based network statistics
@ -74,18 +89,11 @@ int NET_STATS::write(FILE* out, bool to_server) {
fprintf(out,
"<net_stats>\n"
" <bwup>%f</bwup>\n"
" <bwdown>%f</bwdown>\n",
bwup,
bwdown
" <bwdown>%f</bwdown>\n"
"</net_stats>\n",
up.throughput(),
down.throughput()
);
if (!to_server) {
fprintf(out,
" <last_update_up>%d</last_update_up>\n"
" <last_update_down>%d</last_update_down>\n",
last_update_up, last_update_down
);
}
fprintf(out, "</net_stats>\n");
return 0;
}
@ -93,14 +101,19 @@ int NET_STATS::write(FILE* out, bool to_server) {
//
int NET_STATS::parse(FILE* in) {
char buf[256];
double bwup, bwdown;
memset(this, 0, sizeof(NET_STATS));
while (fgets(buf, 256, in)) {
if (match_tag(buf, "</net_stats>")) return 0;
else if (parse_double(buf, "<bwup>", bwup)) continue;
else if (parse_double(buf, "<bwdown>", bwdown)) continue;
else if (parse_int(buf, "<last_update_up>", last_update_up)) continue;
else if (parse_int(buf, "<last_update_down>", last_update_down)) continue;
else if (parse_double(buf, "<bwup>", bwup)) {
up.starting_throughput = bwup;
continue;
}
else if (parse_double(buf, "<bwdown>", bwdown)) {
down.starting_throughput = bwdown;
continue;
}
else fprintf(stderr, "NET_STATS::parse(): unrecognized: %s\n", buf);
}
return 1;

View File

@ -22,17 +22,29 @@
//
#include <stdio.h>
#include "net_xfer.h"
// there's one of these each for upload and download
//
struct NET_INFO {
double delta_t; // elapsed time of file transfer activity
// in this session of client
double delta_nbytes; // bytes transferred in this session
double last_bytes;
double starting_throughput; // throughput at start of session
void update(double dt, double nb, bool active);
double throughput();
};
class NET_STATS {
int last_update_up;
int last_update_down;
public:
double bwup;
double bwdown;
double last_time;
NET_INFO up;
NET_INFO down;
NET_STATS();
void update(bool is_upload, double nbytes, double nsecs);
// report a file transfer
void poll(NET_XFER_SET&);
int write(FILE*, bool to_server);
int parse(FILE*);

View File

@ -104,7 +104,7 @@ int NET_XFER::open_server() {
NetClose();
#endif
return -1;
}
}
#ifdef _WIN32
unsigned long one = 1;
@ -128,13 +128,13 @@ int NET_XFER::open_server() {
NetClose();
return -1;
}
if (WSAAsyncSelect( fd, g_myWnd->GetSafeHwnd(), WM_TIMER, FD_READ|FD_WRITE )) {
errno = WSAGetLastError();
if (errno != WSAEINPROGRESS && errno != WSAEWOULDBLOCK) {
closesocket(fd);
NetClose();
return -1;
}
if (WSAAsyncSelect( fd, g_myWnd->GetSafeHwnd(), WM_TIMER, FD_READ|FD_WRITE )) {
errno = WSAGetLastError();
if (errno != WSAEINPROGRESS && errno != WSAEWOULDBLOCK) {
closesocket(fd);
NetClose();
return -1;
}
}
#else
if (errno != EINPROGRESS) {
@ -152,7 +152,7 @@ int NET_XFER::open_server() {
void NET_XFER::close_socket() {
#ifdef _WIN32
NetClose();
NetClose();
if (socket) closesocket(socket);
#else
if (socket) close(socket);
@ -176,6 +176,17 @@ void NET_XFER::init(char* host, int p, int b) {
last_speed_update = 0;
}
NET_XFER_SET::NET_XFER_SET() {
max_bytes_sec_up = 0;
max_bytes_sec_down = 0;
bytes_left_up = 0;
bytes_left_down = 0;
bytes_up = 0;
bytes_down = 0;
up_active = false;
down_active = false;
}
// Insert a NET_XFER object into the set
//
int NET_XFER_SET::insert(NET_XFER* nxp) {
@ -205,46 +216,45 @@ int NET_XFER_SET::remove(NET_XFER* nxp) {
}
// Transfer data to/from active streams
// Nonblocking; keep doing I/O until would block.
// Transfer at most max_bytes bytes.
// TODO: implement other bandwidth constraints (ul/dl ratio, time of day)
// Nonblocking; keep doing I/O until would block, or we hit rate limits,
// or about .5 second goes by
//
int NET_XFER_SET::poll(int max_bytes, int& bytes_transferred) {
int n, retval;
bool NET_XFER_SET::poll() {
double bytes_xferred;
int retval;
struct timeval timeout;
time_t t = time(0);
bool action = false;
bytes_transferred = 0;
while (1) {
timeout.tv_sec = timeout.tv_usec = 0;
retval = do_select(max_bytes, n, timeout);
if (retval) return retval;
if (n == 0) break;
max_bytes -= n;
bytes_transferred += n;
if (max_bytes < 0) break;
timeout.tv_sec = timeout.tv_usec = 0;
retval = do_select(bytes_xferred, timeout);
if (retval) break;
if (bytes_xferred == 0) break;
action = true;
if (time(0) != t) break;
}
return 0;
return action;
}
// Wait at most x seconds for network I/O to become possible,
// then do some of it.
// then do a limited amount (one block per socket) of it.
//
int NET_XFER_SET::net_sleep(double x) {
int n, retval;
int retval;
double bytes_xferred;
struct timeval timeout;
timeout.tv_sec = (int)x;
timeout.tv_usec = (int)(1000000*(x - (int)x));
retval = do_select(100000000, n, timeout);
retval = do_select(bytes_xferred, timeout);
return retval;
}
// do a select and do I/O on as many sockets as possible.
// do a select and do I/O on as many sockets as possible,
// subject to rate limits
//
int NET_XFER_SET::do_select(
int max_bytes, int& bytes_transferred, timeval& timeout
) {
struct timeval zeros;
int NET_XFER_SET::do_select(double& bytes_transferred, timeval& timeout) {
int n, fd, retval;
socklen_t i;
NET_XFER *nxp;
@ -253,14 +263,23 @@ int NET_XFER_SET::do_select(
#elif GETSOCKOPT_SOCKLEN_T
socklen_t intsize = sizeof(int);
#else
//int intsize = sizeof(int);
socklen_t intsize = sizeof(int);
#endif
time_t t = time(0);
if (t != last_time) {
last_time = t;
if (bytes_left_up < max_bytes_sec_up) {
bytes_left_up += max_bytes_sec_up;
}
if (bytes_left_down < max_bytes_sec_down) {
bytes_left_down += max_bytes_sec_down;
}
}
bytes_transferred = 0;
fd_set read_fds, write_fds, error_fds;
memset(&zeros, 0, sizeof(zeros));
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
@ -273,13 +292,20 @@ int NET_XFER_SET::do_select(
if (!nxp->is_connected) {
FD_SET(net_xfers[i]->socket, &write_fds);
} else if (net_xfers[i]->want_download) {
FD_SET(net_xfers[i]->socket, &read_fds);
if (bytes_left_down > 0) {
FD_SET(net_xfers[i]->socket, &read_fds);
} else {
if (log_flags.net_xfer_debug) printf("Throttling download\n");
}
} else if (net_xfers[i]->want_upload) {
FD_SET(net_xfers[i]->socket, &write_fds);
if (bytes_left_up > 0) {
FD_SET(net_xfers[i]->socket, &write_fds);
} else {
if (log_flags.net_xfer_debug) printf("Throttling upload\n");
}
}
FD_SET(net_xfers[i]->socket, &error_fds);
}
//n = select(FD_SETSIZE, &read_fds, &write_fds, &error_fds, &zeros);
n = select(FD_SETSIZE, &read_fds, &write_fds, &error_fds, &timeout);
if (log_flags.net_xfer_debug) printf("select returned %d\n", n);
if (n == 0) return 0;
@ -287,6 +313,8 @@ int NET_XFER_SET::do_select(
// if got a descriptor, find the first one in round-robin order
// and do I/O on it
// TODO: use round-robin order
//
for (i=0; i<net_xfers.size(); i++) {
nxp = net_xfers[i];
fd = nxp->socket;
@ -313,11 +341,17 @@ int NET_XFER_SET::do_select(
bytes_transferred += 1;
}
} else if (nxp->do_file_io) {
if (max_bytes > 0) {
retval = nxp->do_xfer(n);
max_bytes -= n;
bytes_transferred += n;
nxp->update_speed(n);
retval = nxp->do_xfer(n);
nxp->update_speed(n);
bytes_transferred += n;
if (nxp->want_download) {
down_active = true;
bytes_left_down -= n;
bytes_down += n;
} else {
up_active = true;
bytes_left_up -= n;
bytes_up += n;
}
} else {
nxp->io_ready = true;
@ -325,7 +359,11 @@ int NET_XFER_SET::do_select(
} else if (FD_ISSET(fd, &error_fds)) {
if (log_flags.net_xfer_debug) printf("got error on socket %d\n", fd);
nxp = lookup_fd(fd);
nxp->got_error();
if (nxp) {
nxp->got_error();
} else {
fprintf(stderr, "do_select(): nxp not found\n");
}
}
}
return 0;
@ -346,11 +384,10 @@ NET_XFER* NET_XFER_SET::lookup_fd(int fd) {
//
int NET_XFER::do_xfer(int& nbytes_transferred) {
int n, m, nleft, offset;
char* buf = (char*)malloc(blocksize);
char buf[MAX_BLOCKSIZE];
nbytes_transferred = 0;
if (!buf) return ERR_MALLOC;
if (want_download) {
#ifdef _WIN32
n = recv(socket, buf, blocksize, 0);
@ -363,11 +400,9 @@ int NET_XFER::do_xfer(int& nbytes_transferred) {
if (n == 0) {
io_done = true;
want_download = false;
goto done;
} else if (n < 0) {
io_done = true;
error = ERR_READ;
goto done;
} else {
nbytes_transferred += n;
m = fwrite(buf, 1, n, file);
@ -375,7 +410,6 @@ int NET_XFER::do_xfer(int& nbytes_transferred) {
fprintf(stdout, "Error: incomplete disk write\n");
io_done = true;
error = ERR_FWRITE;
goto done;
}
}
} else if (want_upload) {
@ -383,11 +417,11 @@ int NET_XFER::do_xfer(int& nbytes_transferred) {
if (m == 0) {
want_upload = false;
io_done = true;
goto done;
return 0;
} else if (m < 0) {
io_done = true;
error = ERR_FREAD;
goto done;
return 0;
}
nleft = m;
offset = 0;
@ -403,19 +437,17 @@ int NET_XFER::do_xfer(int& nbytes_transferred) {
if (n < 0) {
error = ERR_WRITE;
io_done = true;
goto done;
break;
} else if (n < nleft) {
fseek( file, n+nbytes_transferred-blocksize, SEEK_CUR );
nbytes_transferred += n;
goto done;
break;
}
nleft -= n;
offset += n;
nbytes_transferred += n;
}
}
done:
free(buf);
return 0;
}
@ -423,10 +455,10 @@ done:
// Decay speed by 1/e every second
//
void NET_XFER::update_speed(int nbytes) {
clock_t now,delta_t;
time_t now, delta_t;
double x;
now = clock();
now = time(0);
if (last_speed_update==0) last_speed_update = now;
delta_t = now-last_speed_update;
if (delta_t<=0) delta_t = 0;
@ -442,3 +474,23 @@ void NET_XFER::got_error() {
printf("IO error on socket %d\n", socket);
}
}
// return true if an upload is currently in progress
// or has been since the last call to this.
// Similar for download.
//
void NET_XFER_SET::check_active(bool& up, bool& down) {
unsigned int i;
NET_XFER* nxp;
up = up_active;
down = down_active;
for (i=0; i<net_xfers.size(); i++) {
nxp = net_xfers[i];
if (nxp->is_connected && nxp->do_file_io) {
nxp->want_download?down=true:up=true;
}
}
up_active = false;
down_active = false;
}

View File

@ -27,6 +27,8 @@
// The following classes provide an interface for polling
// (non-blocking) network I/O.
#define MAX_BLOCKSIZE 16384
// represents a network connection, either being accessed directly
// or being transferred to/from a file
//
@ -62,15 +64,31 @@ public:
void got_error();
};
// bandwidth limitation is implemented at this level, as follows:
// There are limits max_bytes_sec_up and max_bytes_sec_down.
// We keep track of the last time and bytes_left_up and bytes_left_down;
// Each second we reset these to zero.
class NET_XFER_SET {
vector<NET_XFER*> net_xfers;
public:
NET_XFER_SET();
double max_bytes_sec_up, max_bytes_sec_down;
// user-specified limits on throughput
double bytes_left_up, bytes_left_down;
// bytes left to transfer in the current second
double bytes_up, bytes_down;
// total bytes transferred
bool up_active, down_active;
// has there been transfer activity since last call to check_active()?
time_t last_time;
int insert(NET_XFER*);
int remove(NET_XFER*);
int poll(int max_bytes, int& bytes_transferred);
bool poll();
int net_sleep(double);
int do_select(int max_bytes, int& bytes_transferred, struct timeval& timeout);
int do_select(double& bytes_transferred, struct timeval& timeout);
NET_XFER* lookup_fd(int); // lookup by fd
void check_active(bool&, bool&);
};
#endif

View File

@ -32,15 +32,14 @@
// PERS_FILE_XFER represents a persistent file transfer.
// A set of URLs is given.
//
// For download, the object attempts to download the file
// from any of the URLs.
// If one fails or is not available, try another,
// using an exponential backoff policy to avoid flooding servers.
//
// For upload, try to upload the file to the first URL;
// if that fails try the others.
//
PERS_FILE_XFER::PERS_FILE_XFER() {
nretry = 0;
@ -65,11 +64,9 @@ bool PERS_FILE_XFER::start_xfer() {
FILE_XFER *file_xfer;
int retval;
struct tm *newtime;
time_t aclock;
time_t now;
time( &aclock ); /* Get time in seconds */
newtime = localtime( &aclock ); /* Convert time to struct */
// Decide whether to start a new file transfer
//
if (!gstate.start_new_file_xfer()) {
@ -103,14 +100,17 @@ bool PERS_FILE_XFER::start_xfer() {
printf( "file_xfer insert failed\n" );
}
fxp->file_xfer_retval = retval;
handle_xfer_failure(aclock);
handle_xfer_failure(now);
fxp = NULL;
return false;
}
if (log_flags.file_xfer) {
now = time(0);
newtime = localtime(&now);
printf(
"started %s of %s to %s at time: %s\n",
(is_upload ? "upload" : "download"), fip->name, fip->get_url(),asctime( newtime )
(is_upload ? "upload" : "download"), fip->name, fip->get_url(),
asctime(newtime)
);
}
return true;
@ -143,7 +143,7 @@ bool PERS_FILE_XFER::poll(unsigned int now) {
}
}
if(dtime() - last_time <= 2) {
if (dtime() - last_time <= 2) {
time_so_far += dtime() - last_time;
}
last_time = dtime();
@ -157,25 +157,23 @@ bool PERS_FILE_XFER::poll(unsigned int now) {
}
if (fxp->file_xfer_retval == 0) {
// The transfer finished with no errors.
//
if (fip->generated_locally) {
// If the file was generated locally (for upload), update stats
// and delete the local copy of the file if needed
gstate.net_stats.update(true, fip->nbytes, time_so_far);
// file has been uploaded - delete if not sticky
//
if (!fip->sticky) {
fip->delete_file();
}
fip->uploaded = true;
xfer_done = true;
} else {
// Otherwise we downloaded the file. Update stats, verify
// the file with RSA or MD5, and change permissions
gstate.net_stats.update(false, fip->nbytes, time_so_far);
// verify the file with RSA or MD5, and change permissions
//
get_pathname(fip, pathname);
retval = verify_downloaded_file(pathname, *fip);
if (retval) {
fprintf(stdout, "checksum or signature error for %s\n", fip->name);
printf("checksum or signature error for %s\n", fip->name);
fip->status = retval;
} else {
if (log_flags.file_xfer_debug) {
@ -183,6 +181,7 @@ bool PERS_FILE_XFER::poll(unsigned int now) {
}
// Set the appropriate permissions depending on whether
// it's an executable or normal file
//
retval = fip->set_permissions();
fip->status = FILE_PRESENT;
}
@ -191,7 +190,8 @@ bool PERS_FILE_XFER::poll(unsigned int now) {
} else {
handle_xfer_failure(now);
}
// remove fxp from file_xfer_set here and deallocate it
// remove fxp from file_xfer_set and deallocate it
//
gstate.file_xfers->remove(fxp);
delete fxp;
fxp = NULL;
@ -215,11 +215,11 @@ void PERS_FILE_XFER::handle_xfer_failure(unsigned int cur_time) {
// See if it's time to give up on the persistent file xfer
//
if ((cur_time - first_request_time) > gstate.giveup_after) {
// Set the associated files status to a ERR_GIVEUP_DOWNLOAD and ERR_GIVEUP_UPLOAD failure
if(is_upload)
if (is_upload) {
fip->status = ERR_GIVEUP_UPLOAD;
else
} else {
fip->status = ERR_GIVEUP_DOWNLOAD;
}
xfer_done = true;
}
if (log_flags.file_xfer_debug) {
@ -238,23 +238,29 @@ int PERS_FILE_XFER::retry_and_backoff(unsigned int cur_time) {
time( &aclock ); /* Get time in seconds */
newtime = localtime( &aclock ); /* Convert time to struct */
// Cycle to the next URL to try
//
fip->current_url = (fip->current_url + 1)%fip->urls.size();
// If we reach the URL that we started at, then we have tried all
// servers without success
//
if (fip->current_url == fip->start_url) {
nretry++;
exp_backoff = exp(((double)rand()/(double)RAND_MAX)*nretry);
// Do an exponential backoff of e^nretry seconds, keeping
// within the bounds of PERS_RETRY_DELAY_MIN and
// Do an exponential backoff of e^nretry seconds,
// keeping within the bounds of PERS_RETRY_DELAY_MIN and
// PERS_RETRY_DELAY_MAX
//
next_request_time = cur_time+(int)max(PERS_RETRY_DELAY_MIN,min(PERS_RETRY_DELAY_MAX,exp_backoff));
}
if (log_flags.file_xfer_debug) {
printf(
"exponential back off is %d, current_time is %s\n", (int) exp_backoff,asctime( newtime )
);
printf(
"exponential back off is %d, current_time is %s\n",
(int) exp_backoff,asctime(newtime)
);
}
return 0;
}

View File

@ -40,12 +40,14 @@ GLOBAL_PREFS::GLOBAL_PREFS() {
run_on_startup = false;
confirm_before_connecting = false;
hangup_if_dialed = false;
high_water_days = 3;
low_water_days = 1;
work_buf_max_days = 3;
work_buf_min_days = 1;
disk_max_used_gb = 1;
disk_max_used_pct = 0.5;
disk_min_free_gb = 0.1;
idle_time_to_run = 0;
max_bytes_sec_up = 1e9;
max_bytes_sec_down = 1e9;
};
// Parse XML global prefs
@ -71,9 +73,9 @@ int GLOBAL_PREFS::parse(FILE* in) {
} else if (match_tag(buf, "<run_on_startup/>")) {
run_on_startup = true;
continue;
} else if (parse_double(buf, "<high_water_days>", high_water_days)) {
} else if (parse_double(buf, "<work_buf_max_days>", work_buf_max_days)) {
continue;
} else if (parse_double(buf, "<low_water_days>", low_water_days)) {
} else if (parse_double(buf, "<work_buf_min_days>", work_buf_min_days)) {
continue;
} else if (parse_double(buf, "<disk_max_used_gb>", disk_max_used_gb)) {
continue;
@ -83,6 +85,10 @@ int GLOBAL_PREFS::parse(FILE* in) {
continue;
} else if (parse_double(buf, "<idle_time_to_run>", idle_time_to_run)) {
continue;
} else if (parse_double(buf, "<max_bytes_sec_up>", max_bytes_sec_up)) {
continue;
} else if (parse_double(buf, "<max_bytes_sec_down>", max_bytes_sec_down)) {
continue;
}
}
return ERR_XML_PARSE;

View File

@ -40,12 +40,14 @@ struct GLOBAL_PREFS {
bool run_minimized;
bool run_on_startup;
bool hangup_if_dialed;
double high_water_days;
double low_water_days;
double work_buf_max_days;
double work_buf_min_days;
double disk_max_used_gb;
double disk_max_used_pct;
double disk_min_free_gb;
double idle_time_to_run;
double max_bytes_sec_up;
double max_bytes_sec_down;
GLOBAL_PREFS();
int parse(FILE*);

View File

@ -38,7 +38,7 @@ SCHEDULER_OP::SCHEDULER_OP(HTTP_OP_SET* h) {
http_ops = h;
}
// try to get enough work to bring us up to high-water mark
// try to get enough work to bring us up to max buffer level
//
int SCHEDULER_OP::init_get_work() {
int retval;
@ -69,7 +69,7 @@ int SCHEDULER_OP::init_get_work() {
}
// report results for a particular project.
// also get work from that project if below high-water mark
// also get work from that project if below max buffer level
//
int SCHEDULER_OP::init_return_results(PROJECT* p, double ns) {
must_get_work = false;

View File

@ -44,7 +44,7 @@ int main() {
NET_XFER_SET nxs;
HTTP_OP_SET hos(&nxs);
FILE_XFER_SET fxs(&hos);
int retval, n;
int retval;
FILE_XFER* fx1=0, *fx2 = 0;
bool do_upload = true;
bool do_download = false;
@ -102,7 +102,7 @@ int main() {
}
while (1) {
nxs.poll(100000, n);
nxs.poll();
hos.poll();
fxs.poll();
if (fx1 && fx1->file_xfer_done) {

View File

@ -37,7 +37,7 @@ int main() {
NET_XFER_SET nxs;
HTTP_OP_SET hos(&nxs);
HTTP_OP *op1=0, *op2=0, *op3=0;
int retval, n;
int retval;
#ifdef _WIN32
NetOpen();
@ -71,7 +71,7 @@ int main() {
hos.insert(op3);
while (1) {
nxs.poll(100000, n);
nxs.poll();
hos.poll();
if (op1 && op1->http_op_done()) {
printf("op1 done; status %d\n", op1->hrh.status);

View File

@ -57,7 +57,7 @@ int main() {
"\015\012";
while (1) {
nxs.poll(100000, n);
nxs.poll();
switch(nxp->net_xfer_state) {
case UNCONNECTED:
if(nxp->is_connected) {

View File

@ -10,4 +10,5 @@
<time_debug/>
<http_debug/>
<measurement_debug/>
<net_xfer_debug/>
</log_flags>

View File

@ -243,7 +243,11 @@ class Project {
if ($user->project_prefs) {
$pp = "<project_preferences>\n$user->project_prefs</project_preferences>\n";
}
db_query("insert into user values (0, $now, '$user->email_addr', '$user->name', 'foobar', '$user->authenticator', 'Peru', '12345', 0, 0, 0, '$user->global_prefs', '$pp', 0, 'home')");
$gp = null;
if ($user->global_prefs) {
$gp = "<global_preferences>\n$user->global_prefs</global_preferences>\n";
}
db_query("insert into user values (0, $now, '$user->email_addr', '$user->name', 'foobar', '$user->authenticator', 'Peru', '12345', 0, 0, 0, '$gp', '$pp', 0, 'home')");
}
echo "adding apps\n";

View File

@ -21,6 +21,7 @@
$user = new User();
$user->project_prefs = "<project_specific>\nfoobar\n</project_specific>\n";
$user->global_prefs = "<max_bytes_sec_down>400000</max_bytes_sec_down>\n";
$project->add_user($user);
$project->install(); // must install projects before adding to hosts