David 11 Dec 2012

- Win process control (affects API and wrapper):
		Since Win doesn't have an API for process suspend/resume,
		we were suspending processes by
			1) enumerating all the threads in the system (typically several thousand)
			2) suspending those belonging to the given process
		The problem: for each thread, the code was calling a function
		in diagnostics_win.cpp to see if the thread was exempted from suspension.
		This check (which is unnecessary anyway if we're suspending another process)
		was surrounded by a semaphore acquire/release.
		The result: performance problems.
		It could take a minute to suspend the threads.
		Solution:
			1) do the check for exemption only if we're suspending threads
				in our own process (i.e. from the API)
			2) if we're suspending multiple processes, enumerate the threads
				only once, and see if each one belongs to any of the processes
			3) have the wrapper elevate itself to normal priority.
				Otherwise it can get preempted for long periods,
				sometimes in the middle of scanning the threads.
		Note: post-9x versions of Win have a process group API that includes suspend/resume.
		We'll switch to this soon.
This commit is contained in:
David Anderson 2012-12-11 16:05:40 -08:00 committed by Oliver Bock
parent 7f36658d4b
commit 17a4ab8db9
7 changed files with 104 additions and 77 deletions

View File

@ -860,10 +860,10 @@ int boinc_wu_cpu_time(double& cpu_t) {
int suspend_activities() {
#ifdef _WIN32
static DWORD pid;
if (!pid) pid = GetCurrentProcessId();
static vector<int> pids;
if (options.multi_thread) {
suspend_or_resume_threads(pid, timer_thread_id, false);
if (pids.size() == 0) pids.push_back(GetCurrentProcessId());
suspend_or_resume_threads(pids, timer_thread_id, false, true);
} else {
SuspendThread(worker_thread_handle);
}
@ -872,7 +872,7 @@ int suspend_activities() {
// suspension is done by signal handler in worker thread
//
if (options.multi_process) {
suspend_or_resume_descendants(0, false);
suspend_or_resume_descendants(false);
}
#endif
return 0;
@ -880,16 +880,16 @@ int suspend_activities() {
int resume_activities() {
#ifdef _WIN32
static DWORD pid;
if (!pid) pid = GetCurrentProcessId();
static vector<int> pids;
if (options.multi_thread) {
suspend_or_resume_threads(pid, timer_thread_id, true);
if (pids.size() == 0) pids.push_back(GetCurrentProcessId());
suspend_or_resume_threads(pids, timer_thread_id, true, true);
} else {
ResumeThread(worker_thread_handle);
}
#else
if (options.multi_process) {
suspend_or_resume_descendants(0, true);
suspend_or_resume_descendants(true);
}
#endif
return 0;

View File

@ -47,13 +47,6 @@
using std::string;
using std::vector;
bool in_vector(int n, vector<int>& v) {
for (unsigned int i=0; i<v.size(); i++) {
if (v[i] == n) return true;
}
return false;
}
#ifndef _WIN32
jmp_buf resume;

View File

@ -40,11 +40,15 @@
#endif
#include "procinfo.h"
#include "str_util.h"
#include "util.h"
#include "proc_control.h"
using std::vector;
//#define DEBUG
static void get_descendants_aux(PROC_MAP& pm, int pid, vector<int>& pids) {
PROC_MAP::iterator i = pm.find(pid);
if (i == pm.end()) return;
@ -67,64 +71,81 @@ void get_descendants(int pid, vector<int>& pids) {
retval = procinfo_setup(pm);
if (retval) return;
get_descendants_aux(pm, pid, pids);
#ifdef DEBUG
fprintf(stderr, "descendants of %d:\n", pid);
for (unsigned int i=0; i<pids.size(); i++) {
fprintf(stderr, " %d\n", pids[i]);
}
#endif
}
#ifdef _WIN32
// signature of OpenThread()
//
typedef HANDLE (WINAPI *tOT)(
DWORD dwDesiredAccess, BOOL bInheritHandle, DWORD dwThreadId
);
// Suspend or resume the threads in a given process,
// Suspend or resume the threads in a set of processes,
// but don't suspend 'calling_thread'.
//
// The only way to do this on Windows is to enumerate
// all the threads in the entire system,
// and find those belonging to the process (ugh!!)
// and find those belonging to one of the process (ugh!!)
//
int suspend_or_resume_threads(
DWORD pid, DWORD calling_thread_id, bool resume
vector<int>pids, DWORD calling_thread_id, bool resume, bool check_exempt
) {
HANDLE threads, thread;
static HMODULE hKernel32Lib = NULL;
THREADENTRY32 te = {0};
static tOT pOT = NULL;
// Dynamically link to the proper function pointers.
if (!hKernel32Lib) {
hKernel32Lib = GetModuleHandleA("kernel32.dll");
}
if (!pOT) {
pOT = (tOT) GetProcAddress( hKernel32Lib, "OpenThread" );
}
if (!pOT) {
return -1;
#ifdef DEBUG
fprintf(stderr, "start: check_exempt %d %s\n", check_exempt, precision_time_to_string(dtime()));
fprintf(stderr, "%s processes", resume?"resume":"suspend");
for (unsigned int i=0; i<pids.size(); i++) {
fprintf(stderr, " %d", pids[i]);
}
fprintf(stderr, "\n");
#endif
threads = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0);
if (threads == INVALID_HANDLE_VALUE) return -1;
if (threads == INVALID_HANDLE_VALUE) {
fprintf(stderr, "CreateToolhelp32Snapshot failed\n");
return -1;
}
te.dwSize = sizeof(THREADENTRY32);
if (!Thread32First(threads, &te)) {
fprintf(stderr, "Thread32First failed\n");
CloseHandle(threads);
return -1;
}
do {
if (!diagnostics_is_thread_exempt_suspend(te.th32ThreadID)) continue;
if (check_exempt && !diagnostics_is_thread_exempt_suspend(te.th32ThreadID)) {
#ifdef DEBUG
fprintf(stderr, "thread is exempt\n");
#endif
continue;
}
//fprintf(stderr, "thread %d PID %d %s\n", te.th32ThreadID, te.th32OwnerProcessID, precision_time_to_string(dtime()));
if (te.th32ThreadID == calling_thread_id) continue;
if (te.th32OwnerProcessID == pid) {
thread = pOT(THREAD_SUSPEND_RESUME, FALSE, te.th32ThreadID);
resume ? ResumeThread(thread) : SuspendThread(thread);
CloseHandle(thread);
}
if (!in_vector(te.th32OwnerProcessID, pids)) continue;
thread = OpenThread(THREAD_SUSPEND_RESUME, FALSE, te.th32ThreadID);
if (resume) {
DWORD n = ResumeThread(thread);
#ifdef DEBUG
fprintf(stderr, "ResumeThread returns %d\n", n);
#endif
} else {
DWORD n = SuspendThread(thread);
#ifdef DEBUG
fprintf(stderr, "SuspendThread returns %d\n", n);
#endif
}
CloseHandle(thread);
} while (Thread32Next(threads, &te));
CloseHandle (threads);
#ifdef DEBUG
fprintf(stderr, "end: %s\n", precision_time_to_string(dtime()));
#endif
return 0;
}
@ -192,39 +213,32 @@ void kill_descendants(int child_pid) {
}
#endif
void suspend_or_resume_all(vector<int>& pids, bool resume) {
for (unsigned int i=0; i<pids.size(); i++) {
#ifdef _WIN32
suspend_or_resume_threads(pids[i], 0, resume);
#else
kill(pids[i], resume?SIGCONT:SIGSTOP);
#endif
}
}
// suspend/resume the descendants of the given process
// suspend/resume the descendants of the calling process
// (or if pid==0, the calling process)
//
void suspend_or_resume_descendants(int pid, bool resume) {
void suspend_or_resume_descendants(bool resume) {
vector<int> descendants;
if (!pid) {
#ifdef _WIN32
pid = GetCurrentProcessId();
#else
pid = getpid();
#endif
}
int pid = GetCurrentProcessId();
get_descendants(pid, descendants);
suspend_or_resume_all(descendants, resume);
suspend_or_resume_threads(descendants, 0, resume, false);
#else
int pid = getpid();
get_descendants(pid, descendants);
for (unsigned int i=0; i<descendants.size(); i++) {
kill(descendants[i], resume?SIGCONT:SIGSTOP);
}
#endif
}
// used by the wrapper
//
void suspend_or_resume_process(int pid, bool resume) {
#ifdef _WIN32
suspend_or_resume_threads(pid, 0, resume);
vector<int> pids;
pids.push_back(pid);
suspend_or_resume_threads(pids, 0, resume, false);
#else
::kill(pid, resume?SIGCONT:SIGSTOP);
#endif
}

View File

@ -28,11 +28,13 @@ extern bool any_process_exists(std::vector<int>& pids);
extern void kill_all(std::vector<int>& pids);
#ifdef _WIN32
extern void kill_descendants();
extern int suspend_or_resume_threads(DWORD pid, DWORD threadid, bool resume);
extern int suspend_or_resume_threads(
std::vector<int> pids, DWORD threadid, bool resume, bool check_exempt
);
#else
extern void kill_descendants(int child_pid=0);
#endif
extern void suspend_or_resume_descendants(int pid, bool resume);
extern void suspend_or_resume_descendants(bool resume);
extern void suspend_or_resume_process(int pid, bool resume);
#endif

View File

@ -31,6 +31,8 @@
#include <signal.h>
#endif
#include "util.h"
#include "procinfo.h"
using std::vector;
@ -64,13 +66,6 @@ void add_child_totals(PROCINFO& pi, PROC_MAP& pm, PROC_MAP::iterator i) {
}
}
static inline bool in_vector(int n, vector<int>& v) {
for (unsigned int i=0; i<v.size(); i++) {
if (n == v[i]) return true;
}
return false;
}
// Fill in the given PROCINFO (initially zero except for id)
// with totals from that process and all its descendants.
// Set PROCINFO.is_boinc_app for all of them.

View File

@ -61,6 +61,13 @@ extern void update_average(double, double, double, double, double&, double&);
extern int boinc_calling_thread_cpu_time(double&);
inline bool in_vector(int n, std::vector<int>& v) {
for (unsigned int i=0; i<v.size(); i++) {
if (n == v[i]) return true;
}
return false;
}
// fake a crash
//
extern void boinc_crash();

View File

@ -66,6 +66,12 @@
#include "regexp.h"
inline void debug_msg(const char* x) {
#if 0
fprintf(stderr, "%s\n", x);
#endif
}
#define JOB_FILENAME "job.xml"
#define CHECKPOINT_FILENAME "wrapper_checkpoint.txt"
@ -792,7 +798,7 @@ void TASK::kill() {
void TASK::stop() {
if (multi_process) {
suspend_or_resume_descendants(0, false);
suspend_or_resume_descendants(false);
} else {
suspend_or_resume_process(pid, false);
}
@ -801,7 +807,7 @@ void TASK::stop() {
void TASK::resume() {
if (multi_process) {
suspend_or_resume_descendants(0, true);
suspend_or_resume_descendants(true);
} else {
suspend_or_resume_process(pid, true);
}
@ -820,24 +826,30 @@ double TASK::cpu_time() {
void poll_boinc_messages(TASK& task) {
BOINC_STATUS status;
boinc_get_status(&status);
//fprintf(stderr, "wrapper: polling\n");
if (status.no_heartbeat) {
debug_msg("wrapper: kill");
task.kill();
exit(0);
}
if (status.quit_request) {
debug_msg("wrapper: quit");
task.kill();
exit(0);
}
if (status.abort_request) {
debug_msg("wrapper: abort");
task.kill();
exit(0);
}
if (status.suspended) {
if (!task.suspended) {
debug_msg("wrapper: suspend");
task.stop();
}
} else {
if (task.suspended) {
debug_msg("wrapper: resume");
task.resume();
}
}
@ -880,6 +892,10 @@ int main(int argc, char** argv) {
double checkpoint_cpu_time;
// total CPU time at last checkpoint
#ifdef _WIN32
SetPriorityClass(GetCurrentProcess(), NORMAL_PRIORITY_CLASS);
#endif
for (int j=1; j<argc; j++) {
if (!strcmp(argv[j], "--nthreads")) {
nthreads = atoi(argv[++j]);