#include <vpr/Thread/ThreadPool.h>
Collaboration diagram for vpr::ThreadPool:

Public Member Functions | |
| ThreadPool (int numToStartWith=1) | |
| Constructor. | |
| ~ThreadPool () | |
| void | startFunc (thread_func_t func, void *arg=NULL) |
| Gives a function to the processes. | |
| void | startFunc (BaseThreadFunctor *theFunctor, void *argument) |
| void | startFunc (BaseThreadFunctor *theFunctor, const bool deleteFunctor=false) |
| OneThread * | addThread () |
| void | threadLoop (void *theThreadAsVoid) |
| Body of a general-purpose child process. | |
| void | threadSleep (OneThread *theThread) |
| Waits for work to do. | |
| OneThread * | getThread () |
| Gets a process to run. | |
| void | wait () |
| Waits until all threads are done doing their work. | |
| void | printList () |
Outline of thread pool:
|-Need to initialize all threads, given a constructor parameter specifying the number to use
|-Need a function to block until all work is done that has started
|-Just repetitively add work to the process
Definition at line 104 of file ThreadPool.h.
| vpr::ThreadPool::ThreadPool | ( | int | numToStartWith = 1 |
) |
Constructor.
Definition at line 74 of file ThreadPool.cpp.
References addThread(), vpr::MutexPosix::release(), vprDBG_ALL(), vprDBG_DETAILED_LVL, vprDBG_HVERB_LVL, vprDEBUG, and vprDEBUG_FLUSH.
00074 : readyThreads(0) 00075 { 00076 //DebugLock.acquire(); 00077 vprDEBUG(vprDBG_ALL, vprDBG_DETAILED_LVL) 00078 << "vprThreadPool::vprThreadPool: Entering.\n" << vprDEBUG_FLUSH; 00079 vprDEBUG(vprDBG_ALL, vprDBG_HVERB_LVL) 00080 << "\tvprThreadPool::vprThreadPool: Number threads: " << numToStartWith 00081 << std::endl << vprDEBUG_FLUSH; 00082 //DebugLock.release(); 00083 00084 listHead = NULL; 00085 workingCount = 0; 00086 listLock.release(); // release threadList 00087 finishedLock.release(); // Initialize if to threads being done 00088 00089 //-- Start the initial # of threads ---// 00090 for ( int index=0;index < numToStartWith;index++ ) 00091 { 00092 addThread(); 00093 } 00094 }
| vpr::ThreadPool::~ThreadPool | ( | ) |
Definition at line 96 of file ThreadPool.cpp.
References vpr::OneThread::next, and vpr::OneThread::thread.
00097 { 00098 OneThread* cur_thread = listHead; 00099 while ( cur_thread != NULL ) 00100 { 00101 delete cur_thread->thread; 00102 cur_thread->thread = NULL; 00103 OneThread* old_thread = cur_thread; 00104 cur_thread = cur_thread->next; 00105 delete old_thread; 00106 } 00107 }
| void vpr::ThreadPool::startFunc | ( | thread_func_t | func, | |
| void * | arg = NULL | |||
| ) | [inline] |
Gives a function to the processes.
Start a function going asynchronously. Called by master process.
Definition at line 116 of file ThreadPool.h.
Referenced by startFunc().
00117 { 00118 this->startFunc(new ThreadNonMemberFunctor(func, arg), true); 00119 }
| void vpr::ThreadPool::startFunc | ( | BaseThreadFunctor * | theFunctor, | |
| void * | argument | |||
| ) | [inline] |
Definition at line 121 of file ThreadPool.h.
References vpr::BaseThreadFunctor::setArg(), and startFunc().
00122 { 00123 theFunctor->setArg(argument); 00124 this->startFunc(theFunctor); 00125 }
| void vpr::ThreadPool::startFunc | ( | BaseThreadFunctor * | theFunctor, | |
| const bool | deleteFunctor = false | |||
| ) | [inline] |
Definition at line 127 of file ThreadPool.h.
References vpr::OneThread::deleteFunctor, vpr::OneThread::functor, getThread(), vpr::SemaphorePosix::release(), and vpr::OneThread::threadWait.
00129 { 00130 OneThread* theThread = getThread(); 00131 00132 theThread->deleteFunctor = deleteFunctor; 00133 theThread->functor = theFunctor; /* set address of func to exec */ 00134 theThread->threadWait.release(); /* wake up sleeping process */ 00135 //delete theFunctor; 00136 }
| OneThread * vpr::ThreadPool::addThread | ( | ) |
Definition at line 222 of file ThreadPool.cpp.
References vpr::OneThread::next, printList(), vpr::ThreadPosix::self(), vpr::OneThread::thread, threadLoop(), vprDBG_ALL(), vprDBG_DETAILED_LVL, vprDBG_HVERB_LVL, vprDEBUG, and vprDEBUG_FLUSH.
Referenced by ThreadPool().
00223 { 00224 static int numTimes = 0; 00225 // DebugLock.acquire(); 00226 vprDEBUG(vprDBG_ALL, vprDBG_DETAILED_LVL) << Thread::self() 00227 << " vpr::ThreadPool::addThread: Entering: " << ++numTimes << std::endl 00228 << vprDEBUG_FLUSH; 00229 // DebugLock.release(); 00230 00231 Guard<Mutex> guard(listLock); // Protect the head 00232 00233 //OneThread* newThread = new (this->getMyMemPool()->allocate(sizeof(OneThread))) OneThread; // Used placement new 00234 OneThread* newThread = new OneThread; 00235 newThread->next = NULL; 00236 00237 // ThreadMemberFunctor<ThreadPool>* memberFunctor = new ThreadMemberFunctor<ThreadPool>(this, ThreadPool::threadLoop, (void*)newThread); 00238 ThreadMemberFunctor<ThreadPool>* memberFunctor = new ThreadMemberFunctor<ThreadPool>(this, &ThreadPool::threadLoop, (void*)newThread); 00239 00240 newThread->thread = new Thread(memberFunctor); 00241 00242 // DebugLock.acquire(); 00243 vprDEBUG(vprDBG_ALL, vprDBG_HVERB_LVL) << newThread->thread 00244 << " vprThreadPool::addThread: List at end\n" << vprDEBUG_FLUSH; 00245 printList(); 00246 // DebugLock.release(); 00247 00248 return listHead; 00249 }
| void vpr::ThreadPool::threadLoop | ( | void * | theThreadAsVoid | ) |
Body of a general-purpose child process.
The argument, which must be declared void* to match the func prototype, is the vpr::OneThread structure that represents this process. The contents of that struct, in particular its threadWait member, MUST be initialized by the parent.
Definition at line 118 of file ThreadPool.cpp.
References vpr::MutexPosix::acquire(), vpr::OneThread::functor, vpr::MutexPosix::release(), vpr::ThreadPosix::self(), threadSleep(), vprDBG_ALL(), vprDBG_DETAILED_LVL, vprDEBUG, and vprDEBUG_FLUSH.
Referenced by addThread().
00119 { 00120 // DebugLock.acquire(); 00121 vprDEBUG(vprDBG_ALL, vprDBG_DETAILED_LVL) << Thread::self() 00122 << " vpr::ThreadPool::threadLoop: Entering." 00123 << std::endl << vprDEBUG_FLUSH; 00124 // vprDEBUG(vprDBG_ALL, vprDBG_HVERB_LVL) << Thread::self() 00125 // << " vpr::ThreadPool::threadLoop: theThreadAsVoid:" 00126 // << theThreadAsVoid << endl << vprDEBUG_FLUSH; 00127 // DebugLock.release(); 00128 00129 listLock.acquire(); 00130 listLock.release(); // Do this to make sure addThread is done 00131 00132 OneThread* myThread = (OneThread*)theThreadAsVoid; 00133 00134 #ifdef VPR_USE_IRIX_SPROC 00135 00136 // --- SIGNAL Handlers ---- // 00137 prctl(PR_TERMCHILD); // What should I do with. FIX - Allen 00138 #endif 00139 00140 for ( ;; ) 00141 { 00142 // --- WAIT FOR WORK --- // 00143 threadSleep(myThread); 00144 // ASSERT: We now have work to do... 00145 // --- PROCESS ENTRY OVERHEAD --- // 00146 workingCountLock.acquire(); // Get access to the working thread count 00147 if ( workingCount == 0 ) 00148 { 00149 finishedLock.acquire(); // Now there are threads working 00150 } 00151 workingCount = workingCount + 1; // Update thread count 00152 workingCountLock.release(); 00153 00154 // --- DO THE WORK --- // 00155 myThread->functor->operator()(); 00156 00157 // --- PROCESS EXIT OVERHEAD --- // 00158 workingCountLock.acquire(); // Get access to the working count 00159 workingCount = workingCount - 1; 00160 if ( workingCount == 0 ) 00161 { 00162 finishedLock.release(); // Now there are no threads working 00163 } 00164 workingCountLock.release(); 00165 } 00166 }
| void vpr::ThreadPool::threadSleep | ( | OneThread * | theThread | ) |
Waits for work to do.
Put a vpr::OneThread structure on the ready list and sleep on it. Called by a child process when its work is done.
Definition at line 172 of file ThreadPool.cpp.
References vpr::SemaphorePosix::acquire(), vpr::MutexPosix::acquire(), vpr::OneThread::next, vpr::SemaphorePosix::release(), vpr::MutexPosix::release(), and vpr::OneThread::threadWait.
Referenced by threadLoop().
00173 { 00174 listLock.acquire(); // acquire exclusive rights to threadList 00175 theThread->next = listHead; // put self on head of the list 00176 listHead = theThread; 00177 listLock.release(); // release threadList 00178 00179 readyThreads.release(); // notify master, at least 1 on the list 00180 00181 theThread->threadWait.acquire(); // sleep until master needs/releases me 00182 }
| OneThread * vpr::ThreadPool::getThread | ( | ) |
Gets a process to run.
Acquire a vpr::OneThread structure from the ready list, waiting if necessary. Called by the master process as part of dispatching a thread.
Definition at line 189 of file ThreadPool.cpp.
References vpr::MutexPosix::acquire(), vpr::SemaphorePosix::acquire(), vpr::OneThread::next, and vpr::MutexPosix::release().
Referenced by startFunc().
00190 { 00191 OneThread* theThread; 00192 00193 readyThreads.acquire(); // wait until at least 1 thread is free 00194 00195 listLock.acquire(); // acquire exclusive rights to threadList 00196 theThread = listHead; // get address of first free OneThread 00197 listHead = theThread->next; // make next in list, the head of list 00198 listLock.release(); // release threadList 00199 00200 return theThread; 00201 }
| void vpr::ThreadPool::wait | ( | ) | [inline] |
Waits until all threads are done doing their work.
Note: This method was renamed from barrier() in version 0.92.1 to deal with a case where that name is a preprocessor macro.
Definition at line 169 of file ThreadPool.h.
References vpr::MutexPosix::acquire(), and vpr::MutexPosix::release().
00170 { 00171 finishedLock.acquire(); // Get the lock that means threads done 00172 finishedLock.release(); // Reset it to done 00173 }
| void vpr::ThreadPool::printList | ( | ) |
Definition at line 205 of file ThreadPool.cpp.
References vpr::OneThread::next.
Referenced by addThread().
00206 { 00207 OneThread* curThread = listHead; 00208 int counter = 0; 00209 00210 std::cerr << "----- Thread List -----\n"; 00211 00212 while ( curThread != NULL ) 00213 { 00214 std::cerr << "Thread: " << counter++ << std::endl; 00215 std::cerr << "\tpid: " << *curThread << std::endl; 00216 curThread = curThread->next; 00217 } 00218 }
1.5.1