diff --git a/api/boinc_api.C b/api/boinc_api.C
index 6b5b862804..258561d75a 100644
--- a/api/boinc_api.C
+++ b/api/boinc_api.C
@@ -534,6 +534,7 @@ int boinc_wu_cpu_time(double& cpu_t) {
//
int suspend_activities() {
BOINCINFO("Received Suspend Message");
+ //fprintf(stderr, "suspending; %f %d\n", last_wu_cpu_time, options.direct_process_action);
#ifdef _WIN32
if (options.direct_process_action) {
// in Windows this is called from a separate "timer thread",
@@ -550,6 +551,7 @@ int suspend_activities() {
int resume_activities() {
BOINCINFO("Received Resume Message");
#ifdef _WIN32
+ //fprintf(stderr, "resuming; %f %d\n", last_wu_cpu_time, options.direct_process_action);
if (options.direct_process_action) {
// in Windows this is called from a separate "timer thread",
// and Windows lets us resume the worker thread
@@ -644,6 +646,7 @@ static void handle_trickle_down_msg() {
static void handle_process_control_msg() {
char buf[MSG_CHANNEL_SIZE];
if (app_client_shm->shm->process_control_request.get_msg(buf)) {
+ //fprintf(stderr, "%f: got %s\n", dtime(), buf);
if (match_tag(buf, "")) {
boinc_status.suspended = true;
suspend_activities();
@@ -693,6 +696,7 @@ static void worker_timer(int /*a*/) {
}
}
+ //fprintf(stderr, "worker_timer: in_critical_section %d\n", in_critical_section);
// handle messages from the core client
//
if (app_client_shm) {
diff --git a/checkin_notes b/checkin_notes
index a384fde8f2..90a293e3d6 100755
--- a/checkin_notes
+++ b/checkin_notes
@@ -12752,3 +12752,58 @@ Rom 20 Nov 2006
BOINCTaskBar.cpp
sg_CustomControls.cpp, .h
sg_ProjectsComponent.cpp
+
+David 20 Nov 2006
+ - core client: fix a bug in the shared-memory message passing code.
+ Each message channel has a one-message buffer in shared mem.
+ The send_msg() function checks if this is full.
+ If not it puts the message there.
+ If so it stores the message in a queue.
+ Once a second, a poll function moves a message
+ from the queue to the buffer (if it's empty).
+
+ What's wrong with this? (let's not always see the same hands).
+ Well, the send_msg() needs to handle the situation
+ where there's a message in the queue but none in the buffer.
+
+ This results in a bug in the CPU throttling mechanism
+ that can cause a task to sleep forever.
+ - core client: a SECOND significant bug in the shmem msg passing code.
+ In general, the API library consumes at most one message per second
+ from a given channel.
+ That means that if you write more than one message/sec
+ (even for a short period) it creates a queue that never goes away.
+ If you write more than one message/sec indefinitely,
+ it creates an unbounded queue.
+
+ At this point, the only channel that uses queuing and can
+ have more than one msg/sec is process control
+ (because of CPU throttling).
+ I put in a kludge that handles this case:
+ if we want to send a <resume/> and there's already
+ a <suspend/> in the queue,
+ remove the <suspend/> and don't send anything.
+ And conversely.
+ - core client: to debug all the above, added
+ <app_msg_send> and <app_msg_receive> log flags.
+ Also added some commented-out printfs in the API library.
+
+ NOTE: in the course of debugging this I realized that, in the Windows
+ version of the API, <suspend/> and <resume/> messages result
+ in calls to SuspendThread() and ResumeThread(),
+ which inc and dec a "suspend counter".
+ ResumeThread() decrements the counter, and resumes the thread
+ only if the counter is then zero.
+ This makes the process-control message-passing scheme fragile.
+ If for some reason you send two <suspend/>s and a <resume/>,
+ the net result is suspended.
+ At some point we should change the API to do the right thing.
+
+ api/
+ boinc_api.C
+ client/
+ app.C
+ app_control.C
+ log_flags.C,h
+ lib/
+ app_ipc.h
diff --git a/client/app.C b/client/app.C
index 45ff53b69a..26e39ba340 100644
--- a/client/app.C
+++ b/client/app.C
@@ -577,22 +577,56 @@ int ACTIVE_TASK_SET::parse(MIOFILE& fin) {
}
void MSG_QUEUE::msg_queue_send(const char* msg, MSG_CHANNEL& channel) {
- if (channel.send_msg(msg)) {
- //msg_printf(NULL, MSG_INFO, "sent %s to %s", msg, name);
+ if ((msgs.size()==0) && channel.send_msg(msg)) { // try the channel directly only when nothing is already queued
+ if (log_flags.app_msg_send) {
+ msg_printf(NULL, MSG_INFO, "[app_msg_send] sent %s to %s", msg, name);
+ }
return;
}
+ if (log_flags.app_msg_send) {
+ msg_printf(NULL, MSG_INFO, "[app_msg_send] deferred %s to %s", msg, name);
+ }
msgs.push_back(std::string(msg)); // otherwise append to the queue; msg_queue_poll() sends it later
}
void MSG_QUEUE::msg_queue_poll(MSG_CHANNEL& channel) { // try to hand the oldest queued message to the channel
if (msgs.size() > 0) {
+ if (log_flags.app_msg_send) {
+ msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: %d msgs queued", static_cast<int>(msgs.size())); // %d expects int, size() returns size_t
+ }
if (channel.send_msg(msgs[0].c_str())) {
- //msg_printf(NULL, MSG_INFO, "sent %s to %s (delayed)", (msgs[0].c_str()), name);
+ if (log_flags.app_msg_send) {
+ msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: delayed sent %s to %s", (msgs[0].c_str()), name);
+ }
msgs.erase(msgs.begin()); // sent: drop it from the front of the queue
- }
+ } else {
+ if (log_flags.app_msg_send) {
+ msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: still deferred: %s to %s", (msgs[0].c_str()), name);
+ }
+ }
}
}
+// delete all queued messages equal to the given string; return the number deleted
+//
+int MSG_QUEUE::msg_queue_purge(const char* msg) {
+ std::vector<std::string>::iterator iter = msgs.begin();
+ int count = 0;
+ while (iter != msgs.end()) {
+ if (!strcmp(msg, iter->c_str())) {
+ if (log_flags.app_msg_send) {
+ msg_printf(NULL, MSG_INFO, "[app_msg_send] purged %s", msg);
+ }
+ iter = msgs.erase(iter); // erase() returns the iterator following the removed element
+ count++;
+ } else {
+ ++iter;
+ }
+ }
+ return count;
+}
+
+
void ACTIVE_TASK_SET::report_overdue() {
unsigned int i;
ACTIVE_TASK* atp;
diff --git a/client/app_control.C b/client/app_control.C
index e18552b88b..62c2fa4f16 100644
--- a/client/app_control.C
+++ b/client/app_control.C
@@ -800,10 +800,16 @@ void ACTIVE_TASK_SET::kill_tasks(PROJECT* proj) {
//
int ACTIVE_TASK::suspend() {
if (!app_client_shm.shm) return 0;
- process_control_queue.msg_queue_send(
- "<suspend/>",
- app_client_shm.shm->process_control_request
- );
+ if (task_state != PROCESS_EXECUTING) {
+ msg_printf(0, MSG_INFO, "Internal error: expected process to be executing");
+ }
+ int n = process_control_queue.msg_queue_purge("<resume/>"); // a still-queued resume and this suspend cancel out
+ if (n == 0) {
+ process_control_queue.msg_queue_send(
+ "<suspend/>",
+ app_client_shm.shm->process_control_request
+ );
+ }
task_state = PROCESS_SUSPENDED;
return 0;
}
@@ -812,13 +818,19 @@ int ACTIVE_TASK::suspend() {
//
int ACTIVE_TASK::unsuspend() {
if (!app_client_shm.shm) return 0;
+ if (task_state != PROCESS_SUSPENDED) {
+ msg_printf(0, MSG_INFO, "Internal error: expected process to be suspended");
+ }
if (log_flags.cpu_sched) {
msg_printf(0, MSG_INFO, "[cpu_sched] Resuming %s", result->name);
}
- process_control_queue.msg_queue_send(
- "<resume/>",
- app_client_shm.shm->process_control_request
- );
+ int n = process_control_queue.msg_queue_purge("<suspend/>"); // a still-queued suspend and this resume cancel out
+ if (n == 0) {
+ process_control_queue.msg_queue_send(
+ "<resume/>",
+ app_client_shm.shm->process_control_request
+ );
+ }
task_state = PROCESS_EXECUTING;
return 0;
}
@@ -849,9 +861,9 @@ bool ACTIVE_TASK::get_app_status_msg() {
if (!app_client_shm.shm->app_status.get_msg(msg_buf)) {
return false;
}
- if (log_flags.app_msg_debug) {
+ if (log_flags.app_msg_receive) {
msg_printf(NULL, MSG_INFO,
- "[app_msg_debug] slot %d msg: %s", slot, msg_buf
+ "[app_msg_receive] got msg from slot %d: %s", slot, msg_buf
);
}
want_network = 0;
diff --git a/client/log_flags.C b/client/log_flags.C
index 3e2f69a17e..c230830bb8 100644
--- a/client/log_flags.C
+++ b/client/log_flags.C
@@ -87,7 +87,8 @@ int LOG_FLAGS::parse(XML_PARSER& xp) {
else if (xp.parse_bool(tag, "poll_debug", poll_debug)) continue;
else if (xp.parse_bool(tag, "guirpc_debug", guirpc_debug)) continue;
else if (xp.parse_bool(tag, "scrsave_debug", scrsave_debug)) continue;
- else if (xp.parse_bool(tag, "app_msg_debug", app_msg_debug)) continue;
+ else if (xp.parse_bool(tag, "app_msg_send", app_msg_send)) continue;
+ else if (xp.parse_bool(tag, "app_msg_receive", app_msg_receive)) continue;
else if (xp.parse_bool(tag, "mem_usage_debug", mem_usage_debug)) continue;
else if (xp.parse_bool(tag, "network_status_debug", network_status_debug)) continue;
else {
@@ -138,7 +139,8 @@ void LOG_FLAGS::show() {
show_flag(buf, poll_debug, "poll_debug");
show_flag(buf, guirpc_debug, "guirpc_debug");
show_flag(buf, scrsave_debug, "scrsave_debug");
- show_flag(buf, app_msg_debug, "app_msg_debug");
+ show_flag(buf, app_msg_send, "app_msg_send");
+ show_flag(buf, app_msg_receive, "app_msg_receive");
show_flag(buf, mem_usage_debug, "mem_usage_debug");
show_flag(buf, network_status_debug, "network_status_debug");
if (strlen(buf)) {
diff --git a/client/log_flags.h b/client/log_flags.h
index 1d9855cd13..7e613c8b73 100644
--- a/client/log_flags.h
+++ b/client/log_flags.h
@@ -64,7 +64,8 @@ struct LOG_FLAGS {
bool poll_debug; // show what polls are responding
bool guirpc_debug;
bool scrsave_debug;
- bool app_msg_debug; // show shared-mem message from apps
+ bool app_msg_send; // show shared-mem message to apps
+ bool app_msg_receive; // show shared-mem message from apps
bool mem_usage_debug; // memory usage
bool network_status_debug;
diff --git a/lib/app_ipc.h b/lib/app_ipc.h
index 87dad4eb1c..c67ac92e4e 100755
--- a/lib/app_ipc.h
+++ b/lib/app_ipc.h
@@ -109,6 +109,7 @@ struct MSG_QUEUE {
char name[256];
void msg_queue_send(const char*, MSG_CHANNEL& channel);
void msg_queue_poll(MSG_CHANNEL& channel);
+ int msg_queue_purge(const char*);
};
#define DEFAULT_FRACTION_DONE_UPDATE_PERIOD 1