mirror of https://github.com/BOINC/boinc.git
- multi_thread: suspend/resume didn't work in the Unix implementation.
REMINDER TO SELF: you can't suspend or resume pthreads - period. So I changed things so that in the Unix implementation, we fork a second process, in which the threads run. The first process handles suspend/resume messages by signaling the 2nd process. The 2nd process generates system messages (frac done, CPU time) svn path=/trunk/boinc_samples/; revision=14977
This commit is contained in:
parent
cb3da20d48
commit
cfc5b5a230
|
@ -653,3 +653,15 @@ Charlie 28 Mar 2008
|
|||
|
||||
example_app/
|
||||
MakeMacExample.sh
|
||||
|
||||
David 28 Mar 2008
|
||||
- multi_thread: suspend/resume didn't work in the Unix implementation.
|
||||
REMINDER TO SELF: you can't suspend or resume pthreads - period.
|
||||
So I changed things so that in the Unix implementation,
|
||||
we fork a second process, in which the threads run.
|
||||
The first process handles suspend/resume messages
|
||||
by signaling the 2nd process.
|
||||
The 2nd process generates system messages (frac done, CPU time)
|
||||
|
||||
multi_thread/
|
||||
multi_thread.C
|
||||
|
|
|
@ -18,23 +18,40 @@
|
|||
// 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
|
||||
// Example multi-thread BOINC application.
|
||||
// It does 64 "blocks" of computation, where each block is about 1 GFLOP.
|
||||
// It divides this among a number N of "worker" threads.
|
||||
// N is passed in through init_data.xml, and defaults to 4.
|
||||
//
|
||||
// Doesn't do checkpointing.
|
||||
// It does 64 "units" of computation, where each units is about 1 GFLOP.
|
||||
// It divides this among N "worker" threads.
|
||||
// N is passed in the command line, and defaults to 1.
|
||||
//
|
||||
// The main issue is how to suspend/resume the threads.
|
||||
// The standard BOINC API doesn't work - it assumes that
|
||||
// the initial thread is the only one.
|
||||
// What we do instead is to have our initial thread launch the worker threads,
|
||||
// then go into a polling loop where it checks for suspend/resume messages
|
||||
// from the BOINC client, and handles them itself.
|
||||
// On Linux, there's no API to suspend/resume threads.
|
||||
// All you can do is SIGSTOP/SIGCONT, which affects the whole process.
|
||||
// So we use the following process/thread structure:
|
||||
//
|
||||
// Windows:
|
||||
// Initial thread:
|
||||
// - launches worker threads,
|
||||
// - in polling loop, checks for suspend/resume messages
|
||||
// from the BOINC client, and handles them itself.
|
||||
// Unix:
|
||||
// Initial process
|
||||
// - forks worker process
|
||||
// - in polling loop, checks for worker process completion
|
||||
// - doesn't send status msgs
|
||||
// Worker process
|
||||
// Initial thread:
|
||||
// - forks worker threads, wait for them to finish, exit
|
||||
// - uses BOINC runtime to send status messages (frac done, CPU time)
|
||||
//
|
||||
// Doesn't do checkpointing.
|
||||
|
||||
#include <stdio.h>
|
||||
#include <vector>
|
||||
#ifdef _WIN32
|
||||
#else
|
||||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <signal.h>
|
||||
#include <pthread.h>
|
||||
#endif
|
||||
|
@ -46,9 +63,9 @@
|
|||
using std::vector;
|
||||
|
||||
#define DEFAULT_NTHREADS 1
|
||||
#define TOTAL_GFLOPS 64
|
||||
#define TOTAL_UNITS 64
|
||||
|
||||
int gflops_per_thread;
|
||||
int units_per_thread;
|
||||
|
||||
#ifdef _WIN32
|
||||
typedef HANDLE THREAD_ID;
|
||||
|
@ -69,77 +86,75 @@ struct THREAD {
|
|||
int index;
|
||||
int units_done;
|
||||
|
||||
void start(THREAD_FUNC);
|
||||
void suspend(bool);
|
||||
THREAD(THREAD_FUNC func, int i) {
|
||||
index = i;
|
||||
units_done = 0;
|
||||
#ifdef _WIN32
|
||||
id = (HANDLE) _beginthreadex(
|
||||
NULL,
|
||||
16384,
|
||||
func,
|
||||
this,
|
||||
0,
|
||||
NULL
|
||||
);
|
||||
if (!id) {
|
||||
fprintf(stderr, "Can't start thread\n");
|
||||
exit(1);
|
||||
}
|
||||
#else
|
||||
int retval;
|
||||
retval = pthread_create(&id, 0, func, (void*)this);
|
||||
if (retval) {
|
||||
fprintf(stderr, "can't start thread\n");
|
||||
exit(1);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef _WIN32
|
||||
void suspend(bool if_susp) {
|
||||
if (if_susp) {
|
||||
SuspendThread(id);
|
||||
} else {
|
||||
ResumeThread(id);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
};
|
||||
|
||||
struct THREAD_SET {
|
||||
vector<THREAD*> threads;
|
||||
#ifdef _WIN32
|
||||
void suspend(bool if_susp) {
|
||||
for (unsigned int i=0; i<threads.size(); i++) {
|
||||
THREAD* t = threads[i];
|
||||
if (t->id != THREAD_ID_NULL) t->suspend(if_susp);
|
||||
}
|
||||
fprintf(stderr, "suspended all\n");
|
||||
}
|
||||
bool done() {
|
||||
#endif
|
||||
bool all_done() {
|
||||
for (unsigned int i=0; i<threads.size(); i++) {
|
||||
THREAD* t = threads[i];
|
||||
if (t->id != THREAD_ID_NULL) return false;
|
||||
if (threads[i]->id != THREAD_ID_NULL) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
int units_done() {
|
||||
int count = 0;
|
||||
for (unsigned int i=0; i<threads.size(); i++) {
|
||||
THREAD* t = threads[i];
|
||||
count += t->units_done;
|
||||
count += threads[i]->units_done;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
};
|
||||
|
||||
void THREAD::start(THREAD_FUNC func) {
|
||||
units_done = 0;
|
||||
#ifdef _WIN32
|
||||
id = (HANDLE) _beginthreadex(
|
||||
NULL,
|
||||
16384,
|
||||
func,
|
||||
this,
|
||||
0,
|
||||
NULL
|
||||
);
|
||||
if (!id) {
|
||||
fprintf(stderr, "Can't start thread\n");
|
||||
exit(1);
|
||||
}
|
||||
#else
|
||||
int retval;
|
||||
retval = pthread_create(&id, 0, func, (void*)this);
|
||||
if (retval) {
|
||||
fprintf(stderr, "can't start thread\n");
|
||||
exit(1);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
void THREAD::suspend(bool if_susp) {
|
||||
#ifdef _WIN32
|
||||
if (if_susp) {
|
||||
SuspendThread(id);
|
||||
} else {
|
||||
ResumeThread(id);
|
||||
}
|
||||
#else
|
||||
pthread_kill(id, if_susp?SIGSTOP:SIGCONT);
|
||||
#endif
|
||||
}
|
||||
|
||||
// do a billion floating-point ops
|
||||
// (note: I needed to add an arg to this;
|
||||
// otherwise the MS C++ compiler optimizes away
|
||||
// all but the first call to it!)
|
||||
//
|
||||
static double giga_flop(int foo) {
|
||||
static double do_a_giga_flop(int foo) {
|
||||
double x = 3.14159*foo;
|
||||
int i;
|
||||
for (i=0; i<500000000; i++) {
|
||||
|
@ -155,8 +170,8 @@ UINT WINAPI worker(void* p) {
|
|||
void* worker(void* p) {
|
||||
#endif
|
||||
THREAD* t = (THREAD*)p;
|
||||
for (int i=0; i<gflops_per_thread; i++) {
|
||||
double x = giga_flop(i);
|
||||
for (int i=0; i<units_per_thread; i++) {
|
||||
double x = do_a_giga_flop(i);
|
||||
t->units_done++;
|
||||
fprintf(stderr, "thread %d finished %d: %f\n", t->index, i, x);
|
||||
}
|
||||
|
@ -166,21 +181,44 @@ void* worker(void* p) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
void main_thread(int nthreads) {
|
||||
int i;
|
||||
THREAD_SET thread_set;
|
||||
BOINC_OPTIONS options;
|
||||
#ifdef _WIN32
|
||||
BOINC_STATUS status;
|
||||
APP_INIT_DATA aid;
|
||||
#endif
|
||||
THREAD_SET thread_set;
|
||||
for (i=0; i<nthreads; i++) {
|
||||
thread_set.threads.push_back(new THREAD(worker, i));
|
||||
}
|
||||
while (1) {
|
||||
double f = thread_set.units_done()/((double)TOTAL_UNITS);
|
||||
boinc_fraction_done(f);
|
||||
if (thread_set.all_done()) break;
|
||||
#ifdef _WIN32
|
||||
bool old_susp = status.suspended;
|
||||
boinc_get_status(&status);
|
||||
if (status.quit_request || status.abort_request || status.no_heartbeat) {
|
||||
exit(0);
|
||||
}
|
||||
if (status.suspended != old_susp) {
|
||||
thread_set.suspend(status.suspended);
|
||||
}
|
||||
boinc_sleep(0.1);
|
||||
#else
|
||||
boinc_sleep(1.0);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
BOINC_OPTIONS options;
|
||||
int nthreads = DEFAULT_NTHREADS;
|
||||
double start_time = dtime();
|
||||
|
||||
boinc_options_defaults(options);
|
||||
options.direct_process_action = false;
|
||||
boinc_init_options(&options);
|
||||
boinc_get_status(&status);
|
||||
options.direct_process_action = 0;
|
||||
|
||||
for (i=1; i<argc; i++) {
|
||||
for (int i=1; i<argc; i++) {
|
||||
if (!strcmp(argv[i], "--nthreads")) {
|
||||
nthreads = atoi(argv[++i]);
|
||||
} else {
|
||||
|
@ -188,26 +226,42 @@ int main(int argc, char** argv) {
|
|||
}
|
||||
}
|
||||
|
||||
gflops_per_thread = TOTAL_GFLOPS/nthreads;
|
||||
units_per_thread = TOTAL_UNITS/nthreads;
|
||||
|
||||
for (i=0; i<nthreads; i++) {
|
||||
THREAD* t = new THREAD;
|
||||
t->index = i;
|
||||
t->start(worker);
|
||||
thread_set.threads.push_back(t);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
double f = thread_set.units_done()/((double)TOTAL_GFLOPS);
|
||||
boinc_fraction_done(f);
|
||||
if (thread_set.done()) break;
|
||||
bool old_susp = status.suspended;
|
||||
#ifdef _WIN32
|
||||
boinc_init_options(&options);
|
||||
main_thread(nthreads);
|
||||
#else
|
||||
options.send_status_msgs = 0;
|
||||
boinc_init_options(&options);
|
||||
int pid = fork();
|
||||
if (pid) { // parent
|
||||
BOINC_STATUS status;
|
||||
boinc_get_status(&status);
|
||||
if (status.suspended != old_susp) {
|
||||
thread_set.suspend(status.suspended);
|
||||
int exit_status;
|
||||
while (1) {
|
||||
bool old_susp = status.suspended;
|
||||
boinc_get_status(&status);
|
||||
if (status.quit_request || status.abort_request || status.no_heartbeat) {
|
||||
kill(pid, SIGKILL);
|
||||
exit(0);
|
||||
}
|
||||
if (status.suspended != old_susp) {
|
||||
kill(pid, status.suspended?SIGSTOP:SIGCONT);
|
||||
}
|
||||
if (waitpid(pid, &exit_status, WNOHANG) == pid) {
|
||||
break;
|
||||
}
|
||||
boinc_sleep(0.1);
|
||||
}
|
||||
boinc_sleep(0.1);
|
||||
} else { // child (worker)
|
||||
memset(&options, 0, sizeof(options));
|
||||
options.send_status_msgs = 1;
|
||||
boinc_init_options(&options);
|
||||
main_thread(nthreads);
|
||||
}
|
||||
#endif
|
||||
|
||||
double elapsed_time = dtime()-start_time;
|
||||
fprintf(stderr,
|
||||
"All done. Used %d threads. Elapsed time %f\n",
|
||||
|
|
Loading…
Reference in New Issue