From ecc969a35104dbe1aa0d470d709cb727f1dc90cb Mon Sep 17 00:00:00 2001 From: David Anderson Date: Tue, 21 Nov 2006 00:45:22 +0000 Subject: [PATCH] *** empty log message *** svn path=/trunk/boinc/; revision=11558 --- api/boinc_api.C | 4 ++++ checkin_notes | 55 ++++++++++++++++++++++++++++++++++++++++++++ client/app.C | 42 +++++++++++++++++++++++++++++---- client/app_control.C | 32 ++++++++++++++++++-------- client/log_flags.C | 6 +++-- client/log_flags.h | 3 ++- lib/app_ipc.h | 1 + 7 files changed, 126 insertions(+), 17 deletions(-) diff --git a/api/boinc_api.C b/api/boinc_api.C index 6b5b862804..258561d75a 100644 --- a/api/boinc_api.C +++ b/api/boinc_api.C @@ -534,6 +534,7 @@ int boinc_wu_cpu_time(double& cpu_t) { // int suspend_activities() { BOINCINFO("Received Suspend Message"); + //fprintf(stderr, "suspending; %f %d\n", last_wu_cpu_time, options.direct_process_action); #ifdef _WIN32 if (options.direct_process_action) { // in Windows this is called from a separate "timer thread", @@ -550,6 +551,7 @@ int suspend_activities() { int resume_activities() { BOINCINFO("Received Resume Message"); #ifdef _WIN32 + //fprintf(stderr, "resuming; %f %d\n", last_wu_cpu_time, options.direct_process_action); if (options.direct_process_action) { // in Windows this is called from a separate "timer thread", // and Windows lets us resume the worker thread @@ -644,6 +646,7 @@ static void handle_trickle_down_msg() { static void handle_process_control_msg() { char buf[MSG_CHANNEL_SIZE]; if (app_client_shm->shm->process_control_request.get_msg(buf)) { + //fprintf(stderr, "%f: got %s\n", dtime(), buf); if (match_tag(buf, "")) { boinc_status.suspended = true; suspend_activities(); @@ -693,6 +696,7 @@ static void worker_timer(int /*a*/) { } } + //fprintf(stderr, "worker_timer: in_critical_section %d\n", in_critical_section); // handle messages from the core client // if (app_client_shm) { diff --git a/checkin_notes b/checkin_notes index a384fde8f2..90a293e3d6 100755 --- a/checkin_notes +++ b/checkin_notes @@ -12752,3 +12752,58 @@ Rom 20 Nov 2006 BOINCTaskBar.cpp sg_CustomControls.cpp, .h sg_ProjectsComponent.cpp + +David 20 Nov 2006 + - core client: fix a bug in the shared-memory message passing code. + Each message channel has a one-message buffer in shared mem. + The send_msg() function checks if this is full. + If not it puts the message there. + If so it stores the message in a queue. + Once a second, a poll function moves a message + from the queue to the buffer (if it's empty). + + What's wrong with this? (let's not always see the same hands). + Well, the send_msg() needs to handle the situation + where there's a message in the queue but none in the buffer. + + This results in a bug in the CPU throttling mechanism + that can cause a task to sleep forever. + - core client: a SECOND significant bug in the shmem msg passing code. + In general, the API library consumes at most one message per second + from a given channel. + That means that if you write more than one message/sec + (even for a short period) it creates a queue that never goes away. + If you write more than one message/sec indefinitely, + it creates an unbounded queue. + + At this point, the only channel that uses queuing and can + have more than one msg/sec is process control + (because of CPU throttling). + I put in kludge that handles this case: + if we want to send a and there's already + a in the queue, + remove the and don't send anything. + And conversely. + - core client: to debug all the above, added + and log flags. + Also added some commented-out printfs in the API library. + + NOTE: in the course of debugging this I realized that, in the Windows + version of the API, and message result + in calls to SuspendThread() and ResumeThread(), + which inc and dec a "suspend counter". + ResumeThread() decrements the counter, and resumes the thread + only if the counter is then zero. + This makes the process-control message-passing scheme fragile. + If for some reason you send two s and a , + the net result is suspended. + At some point we should change to API to do the right thing. + + api/ + boinc_api.C + client/ + app.C + app_control.C + log_flags.C,h + lib/ + app_ipc.h diff --git a/client/app.C b/client/app.C index 45ff53b69a..26e39ba340 100644 --- a/client/app.C +++ b/client/app.C @@ -577,22 +577,56 @@ int ACTIVE_TASK_SET::parse(MIOFILE& fin) { } void MSG_QUEUE::msg_queue_send(const char* msg, MSG_CHANNEL& channel) { - if (channel.send_msg(msg)) { - //msg_printf(NULL, MSG_INFO, "sent %s to %s", msg, name); + if ((msgs.size()==0) && channel.send_msg(msg)) { + if (log_flags.app_msg_send) { + msg_printf(NULL, MSG_INFO, "[app_msg_send] sent %s to %s", msg, name); + } return; } + if (log_flags.app_msg_send) { + msg_printf(NULL, MSG_INFO, "[app_msg_send] deferred %s to %s", msg, name); + } msgs.push_back(std::string(msg)); } void MSG_QUEUE::msg_queue_poll(MSG_CHANNEL& channel) { if (msgs.size() > 0) { + if (log_flags.app_msg_send) { + msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: %d msgs queued", msgs.size()); + } if (channel.send_msg(msgs[0].c_str())) { - //msg_printf(NULL, MSG_INFO, "sent %s to %s (delayed)", (msgs[0].c_str()), name); + if (log_flags.app_msg_send) { + msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: delayed sent %s to %s", (msgs[0].c_str()), name); + } msgs.erase(msgs.begin()); - } + } else { + if (log_flags.app_msg_send) { + msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: still deferred: %s to %s", (msgs[0].c_str()), name); + } + } } } +// delete any queued messages with the given string +// +int MSG_QUEUE::msg_queue_purge(const char* msg) { + vector::iterator iter = msgs.begin(); + int count = 0; + while (iter != msgs.end()) { + if (!strcmp(msg, iter->c_str())) { + if (log_flags.app_msg_send) { + msg_printf(NULL, MSG_INFO, "[app_msg_send] purged %s", msg); + } + iter = msgs.erase(iter); + count++; + } else { + iter++; + } + } + return count; +} + + void ACTIVE_TASK_SET::report_overdue() { unsigned int i; ACTIVE_TASK* atp; diff --git a/client/app_control.C b/client/app_control.C index e18552b88b..62c2fa4f16 100644 --- a/client/app_control.C +++ b/client/app_control.C @@ -800,10 +800,16 @@ void ACTIVE_TASK_SET::kill_tasks(PROJECT* proj) { // int ACTIVE_TASK::suspend() { if (!app_client_shm.shm) return 0; - process_control_queue.msg_queue_send( - "", - app_client_shm.shm->process_control_request - ); + if (task_state != PROCESS_EXECUTING) { + msg_printf(0, MSG_INFO, "Internal error: expected process to be executing"); + } + int n = process_control_queue.msg_queue_purge(""); + if (n == 0) { + process_control_queue.msg_queue_send( + "", + app_client_shm.shm->process_control_request + ); + } task_state = PROCESS_SUSPENDED; return 0; } @@ -812,13 +818,19 @@ int ACTIVE_TASK::suspend() { // int ACTIVE_TASK::unsuspend() { if (!app_client_shm.shm) return 0; + if (task_state != PROCESS_SUSPENDED) { + msg_printf(0, MSG_INFO, "Internal error: expected process to be suspended"); + } if (log_flags.cpu_sched) { msg_printf(0, MSG_INFO, "[cpu_sched] Resuming %s", result->name); } - process_control_queue.msg_queue_send( - "", - app_client_shm.shm->process_control_request - ); + int n = process_control_queue.msg_queue_purge(""); + if (n == 0) { + process_control_queue.msg_queue_send( + "", + app_client_shm.shm->process_control_request + ); + } task_state = PROCESS_EXECUTING; return 0; } @@ -849,9 +861,9 @@ bool ACTIVE_TASK::get_app_status_msg() { if (!app_client_shm.shm->app_status.get_msg(msg_buf)) { return false; } - if (log_flags.app_msg_debug) { + if (log_flags.app_msg_receive) { msg_printf(NULL, MSG_INFO, - "[app_msg_debug] slot %d msg: %s", slot, msg_buf + "[app_msg_receive] got msg from slot %d: %s", slot, msg_buf ); } want_network = 0; diff --git a/client/log_flags.C b/client/log_flags.C index 3e2f69a17e..c230830bb8 100644 --- a/client/log_flags.C +++ b/client/log_flags.C @@ -87,7 +87,8 @@ int LOG_FLAGS::parse(XML_PARSER& xp) { else if (xp.parse_bool(tag, "poll_debug", poll_debug)) continue; else if (xp.parse_bool(tag, "guirpc_debug", guirpc_debug)) continue; else if (xp.parse_bool(tag, "scrsave_debug", scrsave_debug)) continue; - else if (xp.parse_bool(tag, "app_msg_debug", app_msg_debug)) continue; + else if (xp.parse_bool(tag, "app_msg_send", app_msg_send)) continue; + else if (xp.parse_bool(tag, "app_msg_receive", app_msg_receive)) continue; else if (xp.parse_bool(tag, "mem_usage_debug", mem_usage_debug)) continue; else if (xp.parse_bool(tag, "network_status_debug", network_status_debug)) continue; else { @@ -138,7 +139,8 @@ void LOG_FLAGS::show() { show_flag(buf, poll_debug, "poll_debug"); show_flag(buf, guirpc_debug, "guirpc_debug"); show_flag(buf, scrsave_debug, "scrsave_debug"); - show_flag(buf, app_msg_debug, "app_msg_debug"); + show_flag(buf, app_msg_send, "app_msg_send"); + show_flag(buf, app_msg_receive, "app_msg_receive"); show_flag(buf, mem_usage_debug, "mem_usage_debug"); show_flag(buf, network_status_debug, "network_status_debug"); if (strlen(buf)) { diff --git a/client/log_flags.h b/client/log_flags.h index 1d9855cd13..7e613c8b73 100644 --- a/client/log_flags.h +++ b/client/log_flags.h @@ -64,7 +64,8 @@ struct LOG_FLAGS { bool poll_debug; // show what polls are responding bool guirpc_debug; bool scrsave_debug; - bool app_msg_debug; // show shared-mem message from apps + bool app_msg_send; // show shared-mem message to apps + bool app_msg_receive; // show shared-mem message from apps bool mem_usage_debug; // memory usage bool network_status_debug; diff --git a/lib/app_ipc.h b/lib/app_ipc.h index 87dad4eb1c..c67ac92e4e 100755 --- a/lib/app_ipc.h +++ b/lib/app_ipc.h @@ -109,6 +109,7 @@ struct MSG_QUEUE { char name[256]; void msg_queue_send(const char*, MSG_CHANNEL& channel); void msg_queue_poll(MSG_CHANNEL& channel); + int msg_queue_purge(const char*); }; #define DEFAULT_FRACTION_DONE_UPDATE_PERIOD 1