MGR: Async RPCs: RPC thread is now a joinable thread which does one RPC and exits. Main thread creates a new RPC thread for each RPC request.

svn path=/trunk/boinc/; revision=16423
This commit is contained in:
Charlie Fenton 2008-11-05 06:03:13 +00:00
parent 90a0ebe69d
commit 5a86e7529e
4 changed files with 102 additions and 145 deletions

View File

@ -9184,4 +9184,15 @@ Rom 4 Nov 2008
clientgui/
BOINCGUIApp.cpp
Charlie 4 Nov 2008
- MGR: Async RPCs: RPC thread is now a joinable thread which does
one RPC and exits. Main thread creates a new RPC thread for
each RPC request after waiting for any previous RPC thread to
exit. This simplifies the logic, eliminates Yield(), Sleep()
and nanosleep() calls from RPC threadm and will hopefully
eliminate exess CPU usage on Fedora.
clientgui/
AsyncRPC.cpp
MainDocument.cpp, .h

View File

@ -97,76 +97,46 @@ int AsyncRPC::RPC_Wait(RPC_SELECTOR which_rpc, void *arg1, void *arg2,
}
RPCThread::RPCThread(CMainDocument *pDoc) : wxThread() {
RPCThread::RPCThread(CMainDocument *pDoc) : wxThread(wxTHREAD_JOINABLE) {
m_pDoc = pDoc;
}
void RPCThread::OnExit() {
// Tell CMainDocument that thread has gracefully ended
m_pDoc->m_RPCThread = NULL;
}
// We don't need critical sections because:
// 1. CMainDocument never modifies mDoc->current_rpc_request while the
// async RPC thread is using it.
// 2. The async RPC thread never modifies either mDoc->current_rpc_request
// or the vector of requests mDoc->RPC_requests.
void *RPCThread::Entry() {
int retval;
int retval = 0;
CRPCFinishedEvent RPC_done_event( wxEVT_RPC_FINISHED );
ASYNC_RPC_REQUEST *current_request = m_pDoc->GetCurrentRPCRequest();
// check if we were asked to exit
while(!TestDestroy()) {
if (! m_pDoc->IsConnected()) {
#ifdef __WXMSW__
Sleep(1);
#else
timespec ts = {0, 1000000}; /// 1 imllisecond
nanosleep(&ts, NULL); /// 1 imllisecond or less
#endif
// Do nothing if no active RPC request (should never happen)
if (m_pDoc->GetCurrentRPCRequest()->isActive) {
// check if we were asked to exit
if(! TestDestroy()) {
retval = ProcessRPCRequest();
}
// Wait until CMainDocument issues next RPC request
if (!m_pDoc->GetCurrentRPCRequest()->isActive) {
#ifdef __WXMSW__ // Until we can suspend the thread without Deadlock on Windows
Sleep(1);
#elif defined(__WXMAC__)
Yield();
#else
// Some Linux systems may not support POSIX sched_yield(),
// in which case wxThread::Yield() returns immediately.
timespec ts = {0, 1}; /// 1 nanosecond
nanosleep(&ts, NULL); /// 1 nanosecond or less
#endif
continue;
}
retval = ProcessRPCRequest();
wxPostEvent( wxTheApp, RPC_done_event );
}
#ifndef __WXMSW__ // Deadlocks on Windows
// Use a critical section to prevent a crash during
// manager shutdown due to a rare race condition
m_pDoc->m_critsect.Enter();
m_pDoc->m_critsect.Leave();
#endif // !!__WXMSW__ // Deadlocks on Windows
// We don't need critical sections because the RPC thread is
// joinable and the main thread calls Wait().
// wxCriticalSectionLocker(m_pDoc->m_critsect); // Deactivation is an atomic operation
current_request->retval = retval;
current_request->isActive = false;
wxPostEvent( wxTheApp, RPC_done_event );
// Tell CMainDocument that thread has gracefully ended
m_pDoc->m_RPCThread = NULL;
Exit();
return NULL;
}
int RPCThread::ProcessRPCRequest() {
int retval = 0;
ASYNC_RPC_REQUEST *current_request;
ASYNC_RPC_REQUEST *current_request = m_pDoc->GetCurrentRPCRequest();
current_request = m_pDoc->GetCurrentRPCRequest();
switch (current_request->which_rpc) {
// RPC_SELECTORS with no arguments
case RPC_RUN_BENCHMARKS:
@ -391,20 +361,10 @@ int RPCThread::ProcessRPCRequest() {
break;
}
// Deactivation is an atomic operation
current_request->retval = retval;
current_request->isActive = false;
return retval;
}
// We don't need critical sections (except when exiting Manager) because:
// 1. CMainDocument never modifies mDoc->current_rpc_request while the
// async RPC thread is using it.
// 2. The async RPC thread never modifies either mDoc->current_rpc_request
// or the vector of requests mDoc->RPC_requests.
// TODO: combine RPC requests for different buffers, then just copy the buffer.
int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
@ -445,16 +405,29 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
// Start this RPC if no other RPC is already in progress.
if (RPC_requests.size() == 1) {
// Make sure activation is an atomic operation
if (m_RPCThread) {
m_RPCThread->Wait();
}
// We don't need critical sections because the RPC thread is
// joinable and the main thread calls Wait().
// m_critsect.Enter(); // Make sure activation is an atomic operation
request.isActive = false;
current_rpc_request = request;
current_rpc_request.isActive = true;
wxASSERT(m_RPCThread == NULL);
m_RPCThread = new RPCThread(this);
wxASSERT(m_RPCThread);
retval2 = m_RPCThread->Create();
wxASSERT(!retval2);
retval2 = m_RPCThread->Run();
wxASSERT(!retval2);
// m_critsect.Leave(); // Make sure activation is an atomic operation
}
#ifndef __WXMSW__ // Deadlocks on Windows
if (current_rpc_request.isActive && m_RPCThread->IsPaused()) {
m_RPCThread->Resume();
}
#endif // !!__WXMSW__ // Deadlocks on Windows
// If this is a user-initiated event wait for completion but show
// a dialog allowing the user to cancel.
@ -494,13 +467,6 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
// Allow RPC thread to run while we wait for it.
if (!current_rpc_request.isActive) {
HandleCompletedRPC();
#ifndef __WXMSW__ // Deadlocks on Windows
} else {
// for safety
if (m_RPCThread->IsPaused()) {
m_RPCThread->Resume();
}
#endif // __WXMSW__ // Deadlocks on Windows
}
} else {
// RPC_WAIT_DLG_DELAY has expired; check if Manager is minimized.
@ -512,6 +478,8 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
// NOTE: CBOINCGUIApp::FilterEvent() blocks those events
// which might cause posting of more RPC requests while
// we are in this loop, to prevent undesirable recursion.
// It does allow wxEVT_RPC_FINISHED so we don't need to
// explicity call HandleCompletedRPC() here.
//
keepLooping = true;
if (wxGetApp().Pending()) {
@ -547,29 +515,11 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
// This is ugly but necessary. We must then reconnect and
// start a new RPC thread.
if (current_rpc_request.isActive) {
current_rpc_request.isActive = false;
m_RPCThread->Pause(); // Needed on Windows
rpcClient.close();
m_RPCThread->Kill();
#ifdef __WXMSW__
m_RPCThread->Delete(); // Needed on Windows, crashes on Mac/Linux
#endif
m_RPCThread = NULL;
RPC_requests.clear();
current_rpc_request.clear();
m_bNeedRefresh = false;
m_bNeedTaskBarRefresh = false;
KillRPCThread();
// We will be reconnected to the same client (if possible) by
// CBOINCDialUpManager::OnPoll() and CNetworkConnection::Poll().
m_pNetworkConnection->SetStateDisconnected();
m_RPCThread = new RPCThread(this);
wxASSERT(m_RPCThread);
retval2 = m_RPCThread->Create();
wxASSERT(!retval2);
retval2 = m_RPCThread->Run();
wxASSERT(!retval2);
// m_RPCThread->Pause();
}
}
if (m_RPCWaitDlg) {
@ -583,32 +533,45 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
}
void CMainDocument::KillRPCThread() {
// We don't need critical sections because the RPC thread is
// joinable and the main thread calls Wait().
// wxCriticalSectionLocker(this->m_critsect);
if (!m_RPCThread) {
return;
}
current_rpc_request.isActive = false;
m_RPCThread->Pause(); // May be needed on Windows ??
rpcClient.close();
m_RPCThread->Kill();
#ifdef __WXMSW__
m_RPCThread->Delete(); // My be needed on Windows, but may crash on Mac/Linux ??
#endif
m_RPCThread = NULL;
RPC_requests.clear();
current_rpc_request.clear();
m_bNeedRefresh = false;
m_bNeedTaskBarRefresh = false;
}
void CMainDocument::OnRPCComplete(CRPCFinishedEvent&) {
HandleCompletedRPC();
}
void CMainDocument::HandleCompletedRPC() {
int retval;
int retval = 0, retval2 = 0;
int i, n, requestIndex = -1;
bool stillWaitingForPendingRequests = false;
if(current_rpc_request.isActive) return;
if (current_rpc_request.isActive) return;
// We can get here either via a CRPCFinishedEvent event posted
// by the RPC thread or by a call from RequestRPC. If we were
// called from RequestRPC, the CRPCFinishedEvent will still be
// on the event queue, so we get called twice. Check for this here.
if (current_rpc_request.which_rpc == 0) return; // already handled by a call from RequestRPC
#ifndef __WXMSW__ // Deadlocks on Windows
m_RPCThread->Pause();
// Pause() does not take effect until the RPC thread calls TestDestroy()
// We must wait for that to happen to avoid a possible race condition.
do {
// TODO: is there a way for main UNIX thread to yield wih no minimum delay?
timespec ts = {0, 1}; /// 1 nanosecond
nanosleep(&ts, NULL); /// 1 nanosecond or less
} while (!m_RPCThread->IsPaused());
#endif
// Find our completed request in the queue
n = RPC_requests.size();
@ -826,15 +789,25 @@ void CMainDocument::HandleCompletedRPC() {
// We can't start this until finished processing the previous RPC's
// event because the two requests may write into the same buffer.
if (RPC_requests.size() > 0) {
// Make sure activation is an atomic operation
if (m_RPCThread) {
m_RPCThread->Wait();
}
// We don't need critical sections because the RPC thread is
// joinable and the main thread calls Wait().
// wxCriticalSectionLocker(this->m_critsect); // Make sure activation is an atomic operation
RPC_requests[0].isActive = false;
current_rpc_request = RPC_requests[0];
current_rpc_request.isActive = true;
#ifndef __WXMSW__ // Deadlocks on Windows
if (m_RPCThread->IsPaused()) {
m_RPCThread->Resume();
}
#endif // ! __WXMSW__ // Deadlocks on Windows
wxASSERT(m_RPCThread == NULL);
m_RPCThread = new RPCThread(this);
wxASSERT(m_RPCThread);
retval2 = m_RPCThread->Create();
wxASSERT(!retval2);
retval2 = m_RPCThread->Run();
wxASSERT(!retval2);
}
}

View File

@ -431,23 +431,13 @@ int CMainDocument::OnInit() {
m_pClientManager = new CBOINCClientManager();
wxASSERT(m_pClientManager);
m_RPCThread = NULL;
m_RPCWaitDlg = NULL;
m_bWaitingForRPC = false;
m_bNeedRefresh = false;
m_bNeedTaskBarRefresh = false;
current_rpc_request.clear();
m_RPCThread = new RPCThread(this);
wxASSERT(m_RPCThread);
iRetVal = m_RPCThread->Create();
wxASSERT(!iRetVal);
m_RPCThread->Run();
#ifndef __WXMSW__
m_RPCThread->Pause();
#endif
return iRetVal;
}
@ -466,25 +456,7 @@ int CMainDocument::OnExit() {
}
if (m_RPCThread) {
// Use a critical section to prevent a crash during
// manager shutdown due to a rare race condition
#ifndef __WXMSW__
m_critsect.Enter();
m_RPCThread->Delete();
// On some platforms, Delete() takes effect only when thread calls TestDestroy()
m_RPCThread->Resume();
m_critsect.Leave();
#endif
wxStopWatch ThreadDeleteTimer = wxStopWatch();
// RPC thread sets m_RPCThread to NULL when it exits
while (m_RPCThread) {
// Allow 5 seconds for RPC thread to exit gracefully
if (ThreadDeleteTimer.Time() > 5000) {
m_RPCThread->Pause(); // Needed on Windows
m_RPCThread->Kill();
break;
}
}
KillRPCThread();
m_RPCThread = NULL;
}

View File

@ -177,22 +177,23 @@ public:
public:
int RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority = false);
void OnRPCComplete(CRPCFinishedEvent& event);
void HandleCompletedRPC();
ASYNC_RPC_REQUEST* GetCurrentRPCRequest() { return &current_rpc_request; }
bool WaitingForRPC() { return m_bWaitingForRPC; }
wxDialog* GetRPCWaitDialog() { return m_RPCWaitDlg; }
// void TestAsyncRPC(); // For testing Async RPCs
RPCThread* m_RPCThread;
wxCriticalSection m_critsect;
bool m_bNeedRefresh;
bool m_bNeedTaskBarRefresh;
private:
void HandleCompletedRPC();
void KillRPCThread();
int CopyProjectsToStateFile(PROJECTS& p, CC_STATE& state);
ASYNC_RPC_REQUEST current_rpc_request;
AsyncRPCDlg* m_RPCWaitDlg;
std::vector<ASYNC_RPC_REQUEST> RPC_requests;
bool m_bWaitingForRPC;
bool m_bNeedRefresh;
bool m_bNeedTaskBarRefresh;
//
// Project Tab