- example multi-thread app: change to use boinc_init_parallel().

It's way simpler now.


svn path=/trunk/boinc/; revision=20058
This commit is contained in:
David Anderson 2010-01-01 06:03:08 +00:00
parent e50bca6ca5
commit 1c127cd122
2 changed files with 20 additions and 109 deletions

View File

@ -10796,3 +10796,9 @@ David 31 Dec 2009
graphics_lib.cpp
lib/
win_util.cpp
David 31 Dec 2009
- example multi-thread app: change to use boinc_init_parallel().
It's way simpler now.
samples/multi_thread/
multi_thread.cpp

View File

@ -20,28 +20,6 @@
// It divides this among N "worker" threads.
// N is passed in the command line, and defaults to 1.
//
// The main issue is how to suspend/resume the threads.
// The standard BOINC API doesn't work - it assumes that
// the initial thread is the only one.
// On Linux, there's no API to suspend/resume threads.
// All you can do is SIGSTOP/SIGCONT, which affects the whole process.
// So we use the following process/thread structure:
//
// Windows:
// Initial thread:
// - launches worker threads,
// - in polling loop, checks for suspend/resume messages
// from the BOINC client, and handles them itself.
// Unix:
// Initial process
// - forks worker process
// - in polling loop, checks for worker process completion
// - doesn't send status msgs
// Worker process
// Initial thread:
// - forks worker threads, wait for them to finish, exit
// - uses BOINC runtime to send status messages (frac done, CPU time)
//
// Doesn't do checkpointing.
#include <stdio.h>
@ -61,8 +39,8 @@
using std::vector;
#define DEFAULT_NTHREADS 1
#define TOTAL_UNITS 64
#define DEFAULT_NTHREADS 4
#define TOTAL_UNITS 16
int units_per_thread;
@ -110,29 +88,10 @@ struct THREAD {
}
#endif
}
#ifdef _WIN32
void suspend(bool if_susp) {
if (if_susp) {
SuspendThread(id);
} else {
ResumeThread(id);
}
}
#endif
};
struct THREAD_SET {
vector<THREAD*> threads;
#ifdef _WIN32
void suspend(bool if_susp) {
for (unsigned int i=0; i<threads.size(); i++) {
THREAD* t = threads[i];
if (t->id != THREAD_ID_NULL) t->suspend(if_susp);
}
fprintf(stderr, "%s suspended all\n", boinc_msg_prefix());
}
#endif
bool all_done() {
for (unsigned int i=0; i<threads.size(); i++) {
if (threads[i]->id != THREAD_ID_NULL) return false;
@ -182,44 +141,13 @@ void* worker(void* p) {
#endif
}
void main_thread(int nthreads) {
int i;
#ifdef _WIN32
static BOINC_STATUS status;
#endif
THREAD_SET thread_set;
for (i=0; i<nthreads; i++) {
thread_set.threads.push_back(new THREAD(worker, i));
}
while (1) {
double f = thread_set.units_done()/((double)TOTAL_UNITS);
boinc_fraction_done(f);
if (thread_set.all_done()) break;
#ifdef _WIN32
int old_susp = status.suspended;
boinc_get_status(&status);
if (status.quit_request || status.abort_request || status.no_heartbeat) {
exit(0);
}
if (status.suspended != old_susp) {
thread_set.suspend(status.suspended != 0);
}
boinc_sleep(0.1);
#else
boinc_sleep(1.0);
#endif
}
}
int main(int argc, char** argv) {
BOINC_OPTIONS options;
int nthreads = DEFAULT_NTHREADS;
int i, nthreads = DEFAULT_NTHREADS;
double start_time = dtime();
boinc_options_defaults(options);
options.direct_process_action = 0;
boinc_init_parallel();
for (int i=1; i<argc; i++) {
for (i=1; i<argc; i++) {
if (!strcmp(argv[i], "--nthreads")) {
nthreads = atoi(argv[++i]);
} else {
@ -231,39 +159,16 @@ int main(int argc, char** argv) {
units_per_thread = TOTAL_UNITS/nthreads;
#ifdef _WIN32
boinc_init_options(&options);
main_thread(nthreads);
#else
options.send_status_msgs = 0;
boinc_init_options(&options);
int pid = fork();
if (pid) { // parent
BOINC_STATUS status;
boinc_get_status(&status);
int exit_status;
while (1) {
bool old_susp = status.suspended;
boinc_get_status(&status);
if (status.quit_request || status.abort_request || status.no_heartbeat) {
kill(pid, SIGKILL);
exit(0);
}
if (status.suspended != old_susp) {
kill(pid, status.suspended?SIGSTOP:SIGCONT);
}
if (waitpid(pid, &exit_status, WNOHANG) == pid) {
break;
}
boinc_sleep(0.1);
}
} else { // child (worker)
memset(&options, 0, sizeof(options));
options.send_status_msgs = 1;
boinc_init_options(&options);
main_thread(nthreads);
THREAD_SET thread_set;
for (i=0; i<nthreads; i++) {
thread_set.threads.push_back(new THREAD(worker, i));
}
while (1) {
double f = thread_set.units_done()/((double)TOTAL_UNITS);
boinc_fraction_done(f);
if (thread_set.all_done()) break;
boinc_sleep(1.0);
}
#endif
double elapsed_time = dtime()-start_time;
fprintf(stderr,