MGR: Async RPCs: Use wxCondition to block RPC thread when it is idle instead of creating and a new RPC thread for each RPC

svn path=/trunk/boinc/; revision=16497
This commit is contained in:
Charlie Fenton 2008-11-15 13:21:08 +00:00
parent 98d6931d63
commit c219928655
5 changed files with 137 additions and 86 deletions

View File

@ -9448,3 +9448,14 @@ David 14 Nov 2008
rr_sim.cpp
lib/
util.h
Charlie 15 Nov 08
- MGR: Async RPCs: RPC thread is again a detached thread which stays
running until Manager exits. Use wxCondition to block RPC thread
when it is idle. This still avoids any Yield(), Sleep() and
calls from RPC thread and reduces CPU usage from creating a new
RPC thread for each RPC operation.
clientgui/
AsyncRPC.cpp, .h
MainDocument.cpp, .h

View File

@ -99,39 +99,66 @@ int AsyncRPC::RPC_Wait(RPC_SELECTOR which_rpc, void *arg1, void *arg2,
}
RPCThread::RPCThread(CMainDocument *pDoc) : wxThread(wxTHREAD_JOINABLE) {
RPCThread::RPCThread(CMainDocument *pDoc, wxMutex* pRPC_Thread_Mutex, wxCondition* pRPC_Thread_Condition) : wxThread() {
m_pDoc = pDoc;
m_pRPC_Thread_Mutex = pRPC_Thread_Mutex;
m_pRPC_Thread_Condition = pRPC_Thread_Condition;
}
void RPCThread::OnExit() {
// Tell CMainDocument that thread has gracefully ended
m_pDoc->m_RPCThread = NULL;
}
void *RPCThread::Entry() {
int retval = 0;
CRPCFinishedEvent RPC_done_event( wxEVT_RPC_FINISHED );
ASYNC_RPC_REQUEST *current_request = m_pDoc->GetCurrentRPCRequest();
ASYNC_RPC_REQUEST *current_request;
wxCondError condErr = wxCOND_NO_ERROR;
m_pRPC_Thread_Mutex->Lock();
// check if we were asked to exit
if(TestDestroy()) {
while(true) {
// Wait for main thread to wake us
// This does the following:
// (1) Unlocks the Mutex an puts the thread to sleep as an atomic operation.
// (2) On Signal from main thread, wakes and then automatically locks m_pRPC_Thread_Mutex again.
if (!m_pDoc->m_bShutDownRPCThread) {
condErr = m_pRPC_Thread_Condition->Wait();
wxASSERT(condErr == wxCOND_NO_ERROR);
}
current_request = m_pDoc->GetCurrentRPCRequest();
// check if we were asked to exit
if (TestDestroy()) {
current_request->retval = retval;
current_request->isActive = false;
#if 0 //ndef __WXMSW__ // Deadlocks on Windows
// Use a critical section to prevent a crash during
// manager shutdown due to a rare race condition
m_pDoc->m_critsect.Enter();
m_pDoc->m_critsect.Leave();
#endif // !!__WXMSW__ // Deadlocks on Windows
return NULL;
}
if (m_pDoc->m_bShutDownRPCThread) continue;
if (!current_request->isActive) continue; // Should never happen
retval = ProcessRPCRequest();
current_request->retval = retval;
current_request->isActive = false;
return NULL;
wxPostEvent( wxTheApp, RPC_done_event );
}
// Do nothing if no active RPC request (should never happen)
if (m_pDoc->GetCurrentRPCRequest()->isActive) {
retval = ProcessRPCRequest();
}
// We don't need critical sections because the RPC thread is
// joinable and the main thread calls Wait().
// wxCriticalSectionLocker(m_pDoc->m_critsect); // Deactivation is an atomic operation
current_request->retval = retval;
current_request->isActive = false;
wxPostEvent( wxTheApp, RPC_done_event );
return NULL;
}
@ -372,7 +399,8 @@ int RPCThread::ProcessRPCRequest() {
int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
std::vector<ASYNC_RPC_REQUEST>::iterator iter;
int retval = 0, retval2 = 0;
int retval = 0;
wxMutexError mutexErr = wxMUTEX_NO_ERROR;
bool keepLooping = true;
if ( (request.rpcType < RPC_TYPE_WAIT_FOR_COMPLETION) ||
@ -408,30 +436,19 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
// Start this RPC if no other RPC is already in progress.
if (RPC_requests.size() == 1) {
if (m_RPCThread) {
m_RPCThread->Wait();
delete m_RPCThread;
m_RPCThread = NULL;
}
// We don't need critical sections because the RPC thread is
// joinable and the main thread calls Wait().
// m_critsect.Enter(); // Make sure activation is an atomic operation
// Make sure activation is an atomic operation
request.isActive = false;
current_rpc_request = request;
current_rpc_request.isActive = true;
wxASSERT(m_RPCThread == NULL);
m_RPCThread = new RPCThread(this);
wxASSERT(m_RPCThread);
retval2 = m_RPCThread->Create();
wxASSERT(!retval2);
retval2 = m_RPCThread->Run();
wxASSERT(!retval2);
// m_critsect.Leave(); // Make sure activation is an atomic operation
// Wait for thread to unlock mutex with m_pRPC_Thread_Condition->Wait()
mutexErr = m_pRPC_Thread_Mutex->Lock(); // Blocks until thread unlocks the mutex
wxASSERT(mutexErr == wxMUTEX_NO_ERROR);
mutexErr = m_pRPC_Thread_Mutex->Unlock(); // Release the mutex so thread can lock it
wxASSERT(mutexErr == wxMUTEX_NO_ERROR);
m_pRPC_Thread_Condition->Signal(); // Unblock the thread
}
// If this is a user-initiated event wait for completion but show
@ -520,7 +537,12 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
// This is ugly but necessary. We must then reconnect and
// start a new RPC thread.
if (current_rpc_request.isActive) {
KillRPCThread();
current_rpc_request.isActive = false;
rpcClient.close();
RPC_requests.clear();
current_rpc_request.clear();
m_bNeedRefresh = false;
m_bNeedTaskBarRefresh = false;
// We will be reconnected to the same client (if possible) by
// CBOINCDialUpManager::OnPoll() and CNetworkConnection::Poll().
@ -539,38 +561,39 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) {
void CMainDocument::KillRPCThread() {
// We don't need critical sections because the RPC thread is
// joinable and the main thread calls Wait().
// wxCriticalSectionLocker(this->m_critsect);
wxMutexError mutexErr = wxMUTEX_NO_ERROR;
if (!m_RPCThread) {
return;
}
rpcClient.close();
// Wait up to RPC_KILL_DELAY for thread to exit on its own
wxStopWatch threadDelay = wxStopWatch();
while (threadDelay.Time() < RPC_KILL_DELAY) {
if (!current_rpc_request.isActive) {
m_RPCThread->Wait();
delete m_RPCThread;
m_RPCThread = NULL;
}
}
if (m_RPCThread) {
// If thread did not exit, kill it
m_RPCThread->Kill();
delete m_RPCThread;
m_RPCThread = NULL;
}
current_rpc_request.isActive = false;
RPC_requests.clear();
current_rpc_request.clear();
m_bNeedRefresh = false;
m_bNeedTaskBarRefresh = false;
m_bShutDownRPCThread = true;
// On some platforms, Delete() takes effect only when thread calls TestDestroy()
// Wait for thread to unlock mutex with m_pRPC_Thread_Condition->Wait()
mutexErr = m_pRPC_Thread_Mutex->Lock(); // Blocks until thread unlocks the mutex
wxASSERT(mutexErr == wxMUTEX_NO_ERROR);
mutexErr = m_pRPC_Thread_Mutex->Unlock(); // Release the mutex so thread can lock it
wxASSERT(mutexErr == wxMUTEX_NO_ERROR);
m_pRPC_Thread_Condition->Signal(); // Unblock the thread
wxStopWatch ThreadDeleteTimer = wxStopWatch();
// RPC thread sets m_RPCThread to NULL when it exits
while (m_RPCThread) {
// Wait up to RPC_KILL_DELAY for thread to exit on its own
if (ThreadDeleteTimer.Time() > RPC_KILL_DELAY) {
m_RPCThread->Kill();
break;
}
}
}
@ -579,11 +602,13 @@ void CMainDocument::OnRPCComplete(CRPCFinishedEvent&) {
}
void CMainDocument::HandleCompletedRPC() {
int retval = 0, retval2 = 0;
int retval = 0;
wxMutexError mutexErr = wxMUTEX_NO_ERROR;
int i, n, requestIndex = -1;
bool stillWaitingForPendingRequests = false;
if (current_rpc_request.isActive) return;
// We can get here either via a CRPCFinishedEvent event posted
// by the RPC thread or by a call from RequestRPC. If we were
// called from RequestRPC, the CRPCFinishedEvent will still be
@ -802,32 +827,22 @@ void CMainDocument::HandleCompletedRPC() {
current_rpc_request.clear();
if (m_RPCThread) {
m_RPCThread->Wait();
delete m_RPCThread;
m_RPCThread = NULL;
}
// Start the next RPC request.
// We can't start this until finished processing the previous RPC's
// event because the two requests may write into the same buffer.
if (RPC_requests.size() > 0) {
// We don't need critical sections because the RPC thread is
// joinable and the main thread calls Wait().
// wxCriticalSectionLocker(this->m_critsect); // Make sure activation is an atomic operation
// Make sure activation is an atomic operation
RPC_requests[0].isActive = false;
current_rpc_request = RPC_requests[0];
current_rpc_request.isActive = true;
wxASSERT(m_RPCThread == NULL);
m_RPCThread = new RPCThread(this);
wxASSERT(m_RPCThread);
retval2 = m_RPCThread->Create();
wxASSERT(!retval2);
retval2 = m_RPCThread->Run();
wxASSERT(!retval2);
// Wait for thread to unlock mutex with m_pRPC_Thread_Condition->Wait()
mutexErr = m_pRPC_Thread_Mutex->Lock(); // Blocks until thread unlocks the mutex
wxASSERT(mutexErr == wxMUTEX_NO_ERROR);
mutexErr = m_pRPC_Thread_Mutex->Unlock(); // Release the mutex so thread can lock it
wxASSERT(mutexErr == wxMUTEX_NO_ERROR);
m_pRPC_Thread_Condition->Signal(); // Unblock the thread
}
}

View File

@ -287,13 +287,15 @@ private:
class RPCThread : public wxThread
{
public:
RPCThread(CMainDocument *pDoc);
RPCThread(CMainDocument *pDoc, wxMutex* pRPC_Thread_Mutex, wxCondition* pRPC_Thread_Condition);
virtual void *Entry();
virtual void OnExit();
private:
int ProcessRPCRequest();
CMainDocument* m_pDoc;
wxMutex* m_pRPC_Thread_Mutex;
wxCondition* m_pRPC_Thread_Condition;
};

View File

@ -431,13 +431,27 @@ int CMainDocument::OnInit() {
m_pClientManager = new CBOINCClientManager();
wxASSERT(m_pClientManager);
m_RPCThread = NULL;
m_RPCWaitDlg = NULL;
m_bWaitingForRPC = false;
m_bNeedRefresh = false;
m_bNeedTaskBarRefresh = false;
m_bShutDownRPCThread = false;
current_rpc_request.clear();
m_pRPC_Thread_Mutex = new wxMutex();
wxASSERT(m_pRPC_Thread_Mutex);
m_pRPC_Thread_Condition = new wxCondition(*m_pRPC_Thread_Mutex);
m_RPCThread = new RPCThread(this, m_pRPC_Thread_Mutex, m_pRPC_Thread_Condition);
wxASSERT(m_RPCThread);
iRetVal = m_RPCThread->Create();
wxASSERT(!iRetVal);
m_RPCThread->Run();
return iRetVal;
}
@ -460,6 +474,12 @@ int CMainDocument::OnExit() {
m_RPCThread = NULL;
}
delete m_pRPC_Thread_Mutex;
m_pRPC_Thread_Mutex = NULL;
delete m_pRPC_Thread_Condition;
m_pRPC_Thread_Condition = NULL;
rpcClient.close();
if (m_pNetworkConnection) {

View File

@ -183,6 +183,7 @@ public:
// void TestAsyncRPC(); // For testing Async RPCs
RPCThread* m_RPCThread;
wxCriticalSection m_critsect;
bool m_bShutDownRPCThread;
private:
void HandleCompletedRPC();
@ -194,6 +195,8 @@ private:
bool m_bWaitingForRPC;
bool m_bNeedRefresh;
bool m_bNeedTaskBarRefresh;
wxMutex* m_pRPC_Thread_Mutex;
wxCondition* m_pRPC_Thread_Condition;
//
// Project Tab