Mirror of https://github.com/BOINC/boinc.git
*** empty log message ***
svn path=/trunk/boinc/; revision=11640
parent 6c9cc128de
commit 3d43d682bf
api/boinc_api.C

@@ -80,6 +80,9 @@ using namespace std;
 // Unless otherwise noted, "CPU time" refers to the sum over all episodes
 // (not counting the part after the last checkpoint in an episode).

+// All variables that are accessed by two threads (i.e. worker and timer)
+// MUST be declared volatile.
+
 static APP_INIT_DATA aid;
 static FILE_LOCK file_lock;
 APP_CLIENT_SHM* app_client_shm = 0;
@@ -87,13 +90,13 @@ static volatile int time_until_checkpoint;
     // time until enable checkpoint
 static volatile int time_until_fraction_done_update;
     // time until report fraction done to core client
-static double fraction_done;
-static double last_checkpoint_cpu_time;
-static bool ready_to_checkpoint = false;
-static bool in_critical_section = false;
+static volatile double fraction_done;
+static volatile double last_checkpoint_cpu_time;
+static volatile bool ready_to_checkpoint = false;
+static volatile bool in_critical_section = false;
 static volatile double last_wu_cpu_time;
-static bool standalone = false;
-static double initial_wu_cpu_time;
+static volatile bool standalone = false;
+static volatile double initial_wu_cpu_time;
 static volatile bool have_new_trickle_up = false;
 static volatile bool have_trickle_down = true;
     // on first call, scan slot dir for msgs
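An editor's aside on the comment added above: every variable shared between the worker thread and the timer thread is now volatile-qualified, so each read goes back to memory instead of a cached register value. The sketch below illustrates that pattern only; the thread setup and names are invented, not BOINC code, and modern C++ would normally reach for std::atomic<bool> rather than volatile.

    // Illustration only (not from the commit): a flag written by a worker
    // thread and polled by a timer thread. Without "volatile" the compiler
    // may keep the flag's value in a register inside the polling loop and
    // never observe the worker's write.
    #include <chrono>
    #include <cstdio>
    #include <thread>

    static volatile bool work_done = false;   // hypothetical shared worker/timer flag

    void worker() {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));  // simulate computation
        work_done = true;                     // written by the worker thread
    }

    void timer() {
        while (!work_done) {                  // polled by the timer thread
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::printf("timer thread observed work_done\n");
    }

    int main() {
        std::thread w(worker), t(timer);
        w.join();
        t.join();
        return 0;
    }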
@@ -13212,3 +13212,39 @@ Rom 7 Dec 2006
    clientgui/
        sg_DlgPreferences.cpp, .h

David 8 Dec 2006
    - API: fixed a nasty bug that can result in an application
        being both suspended (worker thread not running)
        and in a critical section (timer thread
        ignores messages to wake up the worker thread).
        This is a deadlock; the app will never progress.
        The problem: bool in_critical_section
        needs to be declared volatile because it's used by both threads.
        Why didn't I listen to Bruce Allen when
        he told me to do this a long time ago?
    - Core client: deal with apps that stop accepting
        process control messages (due to the above bug).
        Several parts to this:
        - Add a timeout to the process control message queue.
          If 180 seconds elapse with an unread process control
          message in the send buffer, kill and restart the app.
          Note: when a process is checkpointing it doesn't
          handle process control messages,
          so this timeout needs to be large enough to cover
          the longest possible checkpoint.
          I think 180 seconds should be large enough.
        - Initialize message queues on app (re)start.
        - MSG_QUEUE::msg_queue_purge() was conceptually messed up.
          We don't want to purge ALL the messages of the opposite type,
          just the one at the tail of the queue.
        Whew! This one was exhausting.

    api/
        boinc_api.C
    client/
        app.C
        app_control.C
        app_start.C
    lib/
        app_ipc.h
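As an editor's illustration of the deadlock described in the 8 Dec 2006 entry above: the worker thread raises in_critical_section around checkpointing, and the timer thread is only supposed to suspend the worker while that flag is clear. If the flag is not volatile, the timer can keep acting on a stale cached value, which is how an app ended up suspended and "in a critical section" at the same time. The model below uses invented names and a simplified suspend mechanism; it shows the corrected (volatile) pattern, not the real boinc_api.C logic, and modern C++ would use std::atomic<bool> for these flags.

    // Simplified two-thread model (hypothetical names; not the real boinc_api.C code).
    #include <chrono>
    #include <cstdio>
    #include <thread>

    static volatile bool in_critical_section = false;  // set by worker around checkpoints
    static volatile bool suspend_requested   = false;  // set on behalf of the core client
    static volatile bool worker_suspended    = false;  // timer's decision to park the worker

    void worker() {
        for (int episode = 0; episode < 5; episode++) {
            while (worker_suspended) {                  // honor suspension between episodes
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
            in_critical_section = true;                 // e.g. while writing a checkpoint
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            in_critical_section = false;
        }
    }

    void timer() {
        for (int tick = 0; tick < 100; tick++) {
            // Suspend only outside critical sections. If in_critical_section
            // were not volatile, this read could be stale and the worker could
            // be parked while the flag is really set -- the deadlock described
            // in the changelog entry above.
            if (suspend_requested && !in_critical_section) {
                worker_suspended = true;
            } else if (!suspend_requested) {
                worker_suspended = false;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    int main() {
        std::thread w(worker), t(timer);
        suspend_requested = true;                       // ask for a suspension...
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        suspend_requested = false;                      // ...then a resume
        w.join();
        t.join();
        std::printf("worker completed all episodes\n");
        return 0;
    }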
client/app.C
@@ -171,8 +171,6 @@ int ACTIVE_TASK::init(RESULT* rp) {
     max_cpu_time = rp->wup->rsc_fpops_bound/gstate.host_info.p_fpops;
     max_disk_usage = rp->wup->rsc_disk_bound;
     max_mem_usage = rp->wup->rsc_memory_bound;
-    strcpy(process_control_queue.name, rp->name);
-    strcpy(graphics_request_queue.name, rp->name);
     get_slot_dir(slot, slot_dir);
     return 0;
 }
@@ -575,56 +573,82 @@ int ACTIVE_TASK_SET::parse(MIOFILE& fin) {
     return ERR_XML_PARSE;
 }

+void MSG_QUEUE::init(char* n) {
+    strcpy(name, n);
+    last_block = 0;
+    msgs.clear();
+}
+
 void MSG_QUEUE::msg_queue_send(const char* msg, MSG_CHANNEL& channel) {
     if ((msgs.size()==0) && channel.send_msg(msg)) {
         if (log_flags.app_msg_send) {
             msg_printf(NULL, MSG_INFO, "[app_msg_send] sent %s to %s", msg, name);
         }
-        return;
+        last_block = 0;
+        return;
     }
     if (log_flags.app_msg_send) {
         msg_printf(NULL, MSG_INFO, "[app_msg_send] deferred %s to %s", msg, name);
     }
     msgs.push_back(std::string(msg));
+    if (!last_block) last_block = gstate.now;
 }

 void MSG_QUEUE::msg_queue_poll(MSG_CHANNEL& channel) {
     if (msgs.size() > 0) {
         if (log_flags.app_msg_send) {
-            msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: %d msgs queued", (int)msgs.size());
+            msg_printf(NULL, MSG_INFO,
+                "[app_msg_send] poll: %d msgs queued for %s:",
+                (int)msgs.size(), name
+            );
         }
         if (channel.send_msg(msgs[0].c_str())) {
             if (log_flags.app_msg_send) {
-                msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: delayed sent %s to %s", (msgs[0].c_str()), name);
+                msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: delayed sent %s", (msgs[0].c_str()));
             }
             msgs.erase(msgs.begin());
+            last_block = 0;
         }
         for (unsigned int i=0; i<msgs.size(); i++) {
             if (log_flags.app_msg_send) {
-                msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: still deferred: %s to %s", (msgs[0].c_str()), name);
+                msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: deferred: %s", (msgs[0].c_str()));
             }
         }
     }
 }

-// delete any queued messages with the given string
+// if the last message in the buffer is "msg", remove it and return 1
 //
 int MSG_QUEUE::msg_queue_purge(const char* msg) {
+    int count = msgs.size();
+    if (!count) return 0;
     vector<string>::iterator iter = msgs.begin();
-    int count = 0;
-    while (iter != msgs.end()) {
-        if (!strcmp(msg, iter->c_str())) {
-            if (log_flags.app_msg_send) {
-                msg_printf(NULL, MSG_INFO, "[app_msg_send] purged %s", msg);
-            }
-            iter = msgs.erase(iter);
-            count++;
-        } else {
-            iter++;
-        }
+    for (int i=0; i<count-1; i++) {
+        iter++;
     }
-    return count;
+    if (log_flags.app_msg_send) {
+        msg_printf(NULL, MSG_INFO,
+            "[app_msg_send] purge: wanted %s last msg is %s in %s",
+            msg, iter->c_str(), name
+        );
+    }
+    if (!strcmp(msg, iter->c_str())) {
+        if (log_flags.app_msg_send) {
+            msg_printf(NULL, MSG_INFO, "[app_msg_send] purged %s from %s", msg, name);
+        }
+        iter = msgs.erase(iter);
+        return 1;
+    }
+    return 0;
 }

+bool MSG_QUEUE::timeout(double diff) {
+    if (!last_block) return false;
+    if (gstate.now - last_block > diff) {
+        return true;
+    }
+    return false;
+}
+
 void ACTIVE_TASK_SET::report_overdue() {
     unsigned int i;
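A rough, self-contained model of the new queue mechanics follows. This is an editor's sketch with toy types; MSG_CHANNEL, gstate, and the real signatures are not reproduced. The idea it illustrates: last_block remembers when a message first failed to go out, timeout() reports whether that condition has persisted past a limit, and purging now only touches the message at the tail of the queue.

    // Editor's toy model of the new MSG_QUEUE behavior (illustrative only).
    #include <cstdio>
    #include <string>
    #include <vector>

    struct ToyChannel {                      // stands in for MSG_CHANNEL: holds one unread message
        std::string buf;
        bool send(const std::string& m) {
            if (!buf.empty()) return false;  // the app hasn't read the previous one
            buf = m;
            return true;
        }
    };

    struct ToyQueue {                        // stands in for MSG_QUEUE
        std::vector<std::string> msgs;
        double last_block = 0;               // when a message first failed to go out

        void send(const std::string& m, ToyChannel& ch, double now) {
            if (msgs.empty() && ch.send(m)) { last_block = 0; return; }
            msgs.push_back(m);
            if (!last_block) last_block = now;
        }
        void poll(ToyChannel& ch) {
            if (msgs.empty()) return;
            if (ch.send(msgs.front())) {     // channel finally accepted it
                msgs.erase(msgs.begin());
                last_block = 0;
            }
        }
        bool timeout(double now, double limit) {
            if (!last_block) return false;
            return now - last_block > limit;
        }
        int purge_tail(const std::string& m) {   // remove only the tail message if it matches
            if (!msgs.empty() && msgs.back() == m) { msgs.pop_back(); return 1; }
            return 0;
        }
    };

    int main() {
        ToyChannel ch;
        ToyQueue q;
        q.send("<suspend/>", ch, 0);         // goes straight into the channel
        q.send("<quit/>", ch, 1);            // channel still unread: deferred, last_block = 1
        for (double now = 2; now < 200; now += 1) {
            if (q.timeout(now, 180)) {       // unread for more than 180 s: restart the app
                std::printf("timeout at t=%.0f; the client would kill and restart the task\n", now);
                break;
            }
            q.poll(ch);                      // the hung app never reads, so this keeps failing
        }
        std::printf("purged %d tail message(s)\n", q.purge_tail("<quit/>"));
        return 0;
    }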
client/app_control.C

@@ -152,22 +152,22 @@ int ACTIVE_TASK::preempt(bool quit_task) {
     // (accommodate apps that never checkpoint)
     //
     if (quit_task && (checkpoint_cpu_time>0)) {
-        pending_suspend_via_quit = true;
-        retval = request_exit();
         if (log_flags.cpu_sched) {
             msg_printf(result->project, MSG_INFO,
                 "[cpu_sched] Preempting %s (removed from memory)",
                 result->name
             );
         }
+        pending_suspend_via_quit = true;
+        retval = request_exit();
     } else {
-        retval = suspend();
         if (log_flags.cpu_sched) {
             msg_printf(result->project, MSG_INFO,
                 "[cpu_sched] Preempting %s (left in memory)",
                 result->name
             );
         }
+        retval = suspend();
     }
     return 0;
 }
@@ -380,9 +380,18 @@ void ACTIVE_TASK_SET::process_control_poll() {
         atp = active_tasks[i];
         if (!atp->process_exists()) continue;
         if (!atp->app_client_shm.shm) continue;
-        atp->process_control_queue.msg_queue_poll(
-            atp->app_client_shm.shm->process_control_request
-        );
+
+        // if the app has had the same message in its send buffer for 180 seconds,
+        // assume it's hung and restart it
+        //
+        if (atp->process_control_queue.timeout(180)) {
+            msg_printf(NULL, MSG_INFO, "Restarting %s - message timeout", atp->result->name);
+            atp->kill_task(true);
+        } else {
+            atp->process_control_queue.msg_queue_poll(
+                atp->app_client_shm.shm->process_control_request
+            );
+        }
     }
 }
client/app_start.C

@@ -347,6 +347,9 @@ int ACTIVE_TASK::start(bool first_time) {
     episode_start_cpu_time = checkpoint_cpu_time;
     debt_interval_start_cpu_time = checkpoint_cpu_time;

+    graphics_request_queue.init(result->name);    // reset message queues
+    process_control_queue.init(result->name);
+
     if (!app_client_shm.shm) {
         retval = get_shmem_seg_name();
         if (retval) {
@@ -358,7 +361,7 @@ int ACTIVE_TASK::start(bool first_time) {
         }
     }

-    // this must go AFTER creating shmem,
+    // this must go AFTER creating shmem name,
     // since the shmem name is part of the file
     //
     retval = write_app_init_file();
lib/app_ipc.h

@@ -107,9 +107,12 @@ struct SHARED_MEM {
 struct MSG_QUEUE {
     std::vector<std::string> msgs;
     char name[256];
+    double last_block;    // last time we found message channel full
+    void init(char*);
     void msg_queue_send(const char*, MSG_CHANNEL& channel);
     void msg_queue_poll(MSG_CHANNEL& channel);
     int msg_queue_purge(const char*);
+    bool timeout(double);
 };

 #define DEFAULT_FRACTION_DONE_UPDATE_PERIOD 1