- API: add support for multi-process apps,

i.e. those that create subprocesses.
    Previously, the client's job control options (suspend/resume/quit)
    would not work for subprocesses.
    Multiprocess apps must initialize with something like:
        BOINC_OPTIONS options;
        boinc_options_defaults(options);
        options.multi_process = true;
        boinc_init_options(&options);
    Note: an application can be both multi-thread and multi-process.
    In this case set options.multi_thread as well.
- wrapper: add support for multi-process apps.
    Previously, suspend/resume operations did not work for subprocesses.
    If a task is multi-process, you must include
        <multi_process>1</multi_process>
    in its descriptor.


svn path=/trunk/boinc/; revision=23369
This commit is contained in:
David Anderson 2011-04-14 22:25:38 +00:00
parent 2cf2d19d0a
commit 7cb3400459
6 changed files with 231 additions and 135 deletions

View File

@ -80,17 +80,18 @@
#endif
#endif
#include "app_ipc.h"
#include "common_defs.h"
#include "diagnostics.h"
#include "parse.h"
#include "shmem.h"
#include "util.h"
#include "str_util.h"
#include "str_replace.h"
#include "error_numbers.h"
#include "filesys.h"
#include "mem_usage.h"
#include "error_numbers.h"
#include "common_defs.h"
#include "app_ipc.h"
#include "parse.h"
#include "procinfo.h"
#include "shmem.h"
#include "str_util.h"
#include "str_replace.h"
#include "util.h"
#include "boinc_api.h"
@ -119,7 +120,6 @@ static volatile bool ready_to_checkpoint = false;
static volatile int in_critical_section = 0;
static volatile double last_wu_cpu_time;
static volatile bool standalone = false;
static volatile bool is_parallel = false;
static volatile double initial_wu_cpu_time;
static volatile bool have_new_trickle_up = false;
static volatile bool have_trickle_down = true;
@ -347,6 +347,65 @@ static bool update_app_progress(double cpu_t, double cp_cpu_t) {
return app_client_shm->shm->app_status.send_msg(msg_buf);
}
static void handle_heartbeat_msg() {
char buf[MSG_CHANNEL_SIZE];
double dtemp;
bool btemp;
if (app_client_shm->shm->heartbeat.get_msg(buf)) {
boinc_status.network_suspended = false;
if (match_tag(buf, "<heartbeat/>")) {
heartbeat_giveup_time = interrupt_count + HEARTBEAT_GIVEUP_COUNT;
}
if (parse_double(buf, "<wss>", dtemp)) {
boinc_status.working_set_size = dtemp;
}
if (parse_double(buf, "<max_wss>", dtemp)) {
boinc_status.max_working_set_size = dtemp;
}
if (parse_bool(buf, "suspend_network", btemp)) {
boinc_status.network_suspended = btemp;
}
}
}
#ifndef _WIN32
// For multithread apps on Unix, the main process executes the following.
//
static void parallel_master(int child_pid) {
char buf[MSG_CHANNEL_SIZE];
int exit_status;
while (1) {
boinc_sleep(TIMER_PERIOD);
interrupt_count++;
if (app_client_shm) {
handle_heartbeat_msg();
if (app_client_shm->shm->process_control_request.get_msg(buf)) {
if (match_tag(buf, "<suspend/>")) {
kill(child_pid, SIGSTOP);
} else if (match_tag(buf, "<resume/>")) {
kill(child_pid, SIGCONT);
} else if (match_tag(buf, "<quit/>")) {
kill(child_pid, SIGKILL);
exit(0);
} else if (match_tag(buf, "<abort/>")) {
kill(child_pid, SIGKILL);
exit(EXIT_ABORTED_BY_CLIENT);
}
}
if (heartbeat_giveup_time < interrupt_count) {
kill(child_pid, SIGKILL);
exit(0);
}
}
if (interrupt_count % TIMERS_PER_SEC) continue;
if (waitpid(child_pid, &exit_status, WNOHANG) == child_pid) break;
}
boinc_finish(exit_status);
}
#endif
int boinc_init() {
int retval;
if (!diagnostics_is_initialized()) {
@ -359,6 +418,30 @@ int boinc_init() {
int boinc_init_options(BOINC_OPTIONS* opt) {
int retval;
#ifndef _WIN32
if (options.multi_thread) {
int child_pid = fork();
if (child_pid) {
// original process - master
//
options.send_status_msgs = false;
int retval = boinc_init_options_general(options);
if (retval) {
kill(child_pid, SIGKILL);
return retval;
}
parallel_master(child_pid);
}
// new process - slave
//
options.main_program = false;
options.check_heartbeat = false;
options.handle_process_control = false;
options.multi_thread = false;
options.multi_process = false;
return boinc_init_options(&options);
}
#endif
retval = boinc_init_options_general(*opt);
if (retval) return retval;
retval = start_timer_thread();
@ -370,6 +453,13 @@ int boinc_init_options(BOINC_OPTIONS* opt) {
return 0;
}
// Convenience initializer for multi-thread apps:
// start from the default options, set multi_thread,
// and initialize via boinc_init_options().
// Returns whatever boinc_init_options() returns.
//
int boinc_init_parallel() {
    BOINC_OPTIONS options;
    boinc_options_defaults(options);
    options.multi_thread = true;
    // the result must be propagated; flowing off the end of a
    // non-void function is undefined behavior
    //
    return boinc_init_options(&options);
}
int boinc_init_options_general(BOINC_OPTIONS& opt) {
int retval;
char buf[256];
@ -549,6 +639,12 @@ void boinc_exit(int status) {
}
}
// kill any processes the app may have created
//
if (options.multi_process) {
kill_descendants(0);
}
boinc_finish_diag();
// various platforms have problems shutting down a process
@ -683,12 +779,16 @@ int suspend_activities() {
static DWORD pid;
if (!pid) pid = GetCurrentProcessId();
if (options.direct_process_action) {
if (is_parallel) {
if (options.multi_thread) {
suspend_or_resume_threads(pid, timer_thread_id, false);
} else {
SuspendThread(worker_thread_handle);
}
}
#else
if (options.multi_process) {
suspend_or_resume_descendants(0, false);
}
#endif
return 0;
}
@ -700,12 +800,16 @@ int resume_activities() {
static DWORD pid;
if (!pid) pid = GetCurrentProcessId();
if (options.direct_process_action) {
if (is_parallel) {
if (options.multi_thread) {
suspend_or_resume_threads(pid, timer_thread_id, true);
} else {
ResumeThread(worker_thread_handle);
}
}
#else
if (options.multi_process) {
suspend_or_resume_descendants(0, true);
}
#endif
return 0;
}
@ -720,28 +824,6 @@ int restore_activities() {
return retval;
}
static void handle_heartbeat_msg() {
char buf[MSG_CHANNEL_SIZE];
double dtemp;
bool btemp;
if (app_client_shm->shm->heartbeat.get_msg(buf)) {
boinc_status.network_suspended = false;
if (match_tag(buf, "<heartbeat/>")) {
heartbeat_giveup_time = interrupt_count + HEARTBEAT_GIVEUP_COUNT;
}
if (parse_double(buf, "<wss>", dtemp)) {
boinc_status.working_set_size = dtemp;
}
if (parse_double(buf, "<max_wss>", dtemp)) {
boinc_status.max_working_set_size = dtemp;
}
if (parse_bool(buf, "suspend_network", btemp)) {
boinc_status.network_suspended = btemp;
}
}
}
static void handle_upload_file_status() {
char path[256], buf[256], log_name[256], *p, log_buf[256];
std::string filename;
@ -1321,68 +1403,3 @@ double boinc_get_fraction_done() {
// Elapsed time, computed as running_interrupt_count
// multiplied by TIMER_PERIOD.
//
double boinc_elapsed_time() {
    const double elapsed = running_interrupt_count*TIMER_PERIOD;
    return elapsed;
}
#ifndef _WIN32
static void parallel_master(int child_pid) {
char buf[MSG_CHANNEL_SIZE];
int exit_status;
while (1) {
boinc_sleep(TIMER_PERIOD);
interrupt_count++;
if (app_client_shm) {
handle_heartbeat_msg();
if (app_client_shm->shm->process_control_request.get_msg(buf)) {
if (match_tag(buf, "<suspend/>")) {
kill(child_pid, SIGSTOP);
} else if (match_tag(buf, "<resume/>")) {
kill(child_pid, SIGCONT);
} else if (match_tag(buf, "<quit/>")) {
kill(child_pid, SIGKILL);
exit(0);
} else if (match_tag(buf, "<abort/>")) {
kill(child_pid, SIGKILL);
exit(EXIT_ABORTED_BY_CLIENT);
}
}
if (heartbeat_giveup_time < interrupt_count) {
kill(child_pid, SIGKILL);
exit(0);
}
}
if (interrupt_count % TIMERS_PER_SEC) continue;
if (waitpid(child_pid, &exit_status, WNOHANG) == child_pid) break;
}
boinc_finish(exit_status);
}
#endif
// Initialize a parallel (multi-thread) application.
//
// Windows: mark the app as parallel and do a normal boinc_init().
//
// Unix: fork.  The original process becomes the master: it initializes
// with status messages turned off and then runs parallel_master(),
// which relays client requests to the child.
// The child continues as the application proper, initialized as a
// non-main program with heartbeat checking and process-control
// handling turned off (the master does those).
//
// Returns nonzero on failure.
//
int boinc_init_parallel() {
#ifdef _WIN32
    is_parallel = true;
    return boinc_init();
#else
    BOINC_OPTIONS options;
    boinc_options_defaults(options);
    int child_pid = fork();
    if (child_pid < 0) {
        // fork() failed and there is no child.
        // Must not fall into the master branch: it would hand -1 to kill(),
        // which signals every process the caller is permitted to signal.
        // NOTE(review): error_numbers.h presumably defines a dedicated
        // fork-failure code; use it here if so.
        //
        return -1;
    }
    if (child_pid) {
        // original process - master
        //
        options.send_status_msgs = false;
        int retval = boinc_init_options_general(options);
        if (retval) {
            kill(child_pid, SIGKILL);
            return retval;
        }
        parallel_master(child_pid);
    }
    // new process - slave
    //
    options.main_program = false;
    options.check_heartbeat = false;
    options.handle_process_control = false;
    return boinc_init_options(&options);
#endif
}

View File

@ -62,6 +62,10 @@ typedef struct BOINC_OPTIONS {
// if heartbeat fail, or get process control msg, take
// direct action (exit, suspend, resume).
// Otherwise just set flag in BOINC status
int multi_thread;
// set this if application creates threads in main process
int multi_process;
// set this if application creates subprocesses.
} BOINC_OPTIONS;
typedef struct BOINC_STATUS {
@ -108,7 +112,6 @@ extern int boinc_get_status(BOINC_STATUS*);
extern double boinc_get_fraction_done();
extern void boinc_register_timer_callback(FUNC_PTR);
extern double boinc_worker_thread_cpu_time();
extern int boinc_init_parallel();
#ifdef __APPLE__
extern int setMacPList(void);
@ -154,6 +157,8 @@ inline void boinc_options_defaults(BOINC_OPTIONS& b) {
b.send_status_msgs = 1;
b.direct_process_action = 1;
b.normal_thread_priority = 0;
b.multi_thread = 0;
b.multi_process = 0;
}

View File

@ -2247,3 +2247,28 @@ Rom 14 Apr 2011
clientgui/
sg_DlgMessages.cpp
David 14 Apr 2011
- API: add support for multi-process apps,
i.e. those that create subprocesses.
Previously, the client's job control options (suspend/resume/quit)
would not work for subprocesses.
Multiprocess apps must initialize with something like:
BOINC_OPTIONS options;
boinc_options_defaults(options);
options.multi_process = true;
boinc_init_options(&options);
Note: an application can be both multi-thread and multi-process.
In this case set options.multi_thread as well.
- wrapper: add support for multi-process apps.
Previously, suspend/resume operations did not work for subprocesses.
If a task is multi-process, you must include
<multi_process>1</multi_process>
in its descriptor.
lib/
procinfo.cpp,h
samples/wrapper/
wrapper.cpp
api/
boinc_api.cpp,h

View File

@ -24,6 +24,7 @@
#else
#include "config.h"
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
#include <string.h>
#include <signal.h>
@ -89,5 +90,71 @@ void kill_all(vector<int>& pids) {
kill(pids[i], SIGTERM);
}
}
#endif
// Kill the descendants of the calling process.
// If child_pid is nonzero, give it a chance to exit gracefully on Unix
//
void kill_descendants(int child_pid) {
vector<int> descendants;
#ifdef _WIN32
// on Win, kill descendants directly
//
get_descendants(GetCurrentProcessId(), descendants);
kill_all(descendants);
#else
// on Unix, ask main process nicely.
// it descendants still exist after 10 sec, use the nuclear option
//
get_descendants(getpid(), descendants);
if (child_pid) {
::kill(child_pid, SIGTERM);
for (int i=0; i<10; i++) {
if (!any_process_exists(descendants)) {
return;
}
sleep(1);
}
kill_all(descendants);
// kill any processes that might have been created
// in the last 10 secs
get_descendants(getpid(), descendants);
}
kill_all(descendants);
#endif
}
// Suspend or resume every process in a list.
//   pids:   IDs of the processes to act on
//   resume: true to resume, false to suspend
// On Windows this goes through suspend_or_resume_threads();
// elsewhere it sends SIGCONT (resume) or SIGSTOP (suspend).
//
void suspend_or_resume_all(vector<int>& pids, bool resume) {
    for (unsigned int i=0; i<pids.size(); i++) {
#ifdef _WIN32
        // suspend_or_resume_threads() takes (pid, thread ID, resume);
        // pass 0 for the thread ID, as suspend_or_resume_process() does
        //
        suspend_or_resume_threads(pids[i], 0, resume);
#else
        kill(pids[i], resume?SIGCONT:SIGSTOP);
#endif
    }
}
// suspend/resume the descendants of the given process
// (or if pid==0, the calling process)
//
void suspend_or_resume_descendants(int pid, bool resume) {
vector<int> descendants;
if (!pid) {
#ifdef _WIN32
pid = GetCurrentProcessId();
#else
pid = getpid();
#endif
}
get_descendants(pid, descendants);
suspend_or_resume_all(descendants, resume);
}
// Suspend (resume == false) or resume (resume == true) a single process.
// Windows: via suspend_or_resume_threads() with thread ID 0.
// Elsewhere: by sending SIGSTOP or SIGCONT.
//
void suspend_or_resume_process(int pid, bool resume) {
#ifdef _WIN32
    suspend_or_resume_threads(pid, 0, resume);
#else
    const int signo = resume ? SIGCONT : SIGSTOP;
    ::kill(pid, signo);
#endif
}

View File

@ -48,5 +48,8 @@ extern void procinfo_other(PROCINFO&, std::vector<PROCINFO>&);
extern void get_descendants(int pid, std::vector<int>& pids);
extern bool any_process_exists(std::vector<int>& pids);
extern void kill_all(std::vector<int>& pids);
extern void kill_descendants(int child_pid);
extern void suspend_or_resume_descendants(int pid, bool resume);
extern void suspend_or_resume_process(int pid, bool resume);
#endif

View File

@ -81,6 +81,7 @@ struct TASK {
// contribution of this task to overall fraction done
bool is_daemon;
bool append_cmdline_args;
bool multi_process;
// dynamic stuff follows
double final_cpu_time;
@ -209,6 +210,7 @@ int TASK::parse(XML_PARSER& xp) {
stat_first = true;
pid = 0;
is_daemon = false;
multi_process = false;
append_cmdline_args = false;
while (!xp.get(tag, sizeof(tag), is_tag)) {
@ -244,6 +246,7 @@ int TASK::parse(XML_PARSER& xp) {
else if (xp.parse_string(tag, "fraction_done_filename", fraction_done_filename)) continue;
else if (xp.parse_double(tag, "weight", weight)) continue;
else if (xp.parse_bool(tag, "daemon", is_daemon)) continue;
else if (xp.parse_bool(tag, "multi_process", multi_process)) continue;
else if (xp.parse_bool(tag, "append_cmdline_args", append_cmdline_args)) continue;
}
return ERR_XML_PARSE;
@ -567,48 +570,24 @@ bool TASK::poll(int& status) {
//
void TASK::kill() {
    kill_daemons();

    // The platform-specific termination sequence lives in
    // kill_descendants(): on Windows it kills our descendants directly;
    // on Unix it sends SIGTERM to "pid", waits up to 10 sec for the
    // descendants to exit, then kills whatever remains.
    // Keeping an inline copy of that sequence here as well
    // would run it twice.
    //
    kill_descendants(pid);
}
// Suspend the task.
// A multi-process task has all descendants of the calling process
// suspended; otherwise only the task's own process is.
// The platform-specific work is done by the suspend_or_resume_*()
// helpers, so the process is not also suspended inline here.
//
void TASK::stop() {
    if (multi_process) {
        suspend_or_resume_descendants(0, false);
    } else {
        suspend_or_resume_process(pid, false);
    }
    suspended = true;
}
// Resume the task.
// A multi-process task has all descendants of the calling process
// resumed; otherwise only the task's own process is.
// The platform-specific work is done by the suspend_or_resume_*()
// helpers, so the process is not also resumed inline here.
//
void TASK::resume() {
    if (multi_process) {
        suspend_or_resume_descendants(0, true);
    } else {
        suspend_or_resume_process(pid, true);
    }
    suspended = false;
}