vpr::ThreadPool Class Reference

A pool of threads to process user jobs.

#include <vpr/Thread/ThreadPool.h>

Public Member Functions

 ThreadPool (int numToStartWith=1)
 Constructor.
 ~ThreadPool ()
void startFunc (thread_func_t func, void *arg=NULL)
 Gives a function to the processes.
void startFunc (BaseThreadFunctor *theFunctor, void *argument)
void startFunc (BaseThreadFunctor *theFunctor, const bool deleteFunctor=false)
OneThread * addThread ()
void threadLoop (void *theThreadAsVoid)
 Body of a general-purpose child process.
void threadSleep (OneThread *theThread)
 Waits for work to do.
OneThread * getThread ()
 Gets a process to run.
void wait ()
 Waits until all threads are done doing their work.
void printList ()

Detailed Description

A pool of threads to process user jobs.

Outline of thread pool:

|-The constructor takes the number of threads to use and initializes all of them
|-A function is needed to block until all work that has been started is done
|-Work is added to the pool repeatedly

Date:
February 5, 1997

Definition at line 104 of file ThreadPool.h.
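
The outline above amounts to a construct, submit, wait pattern. The following is a minimal sketch of that pattern in standard C++ only, since the vpr headers are not assumed to be available here. MiniPool and sumWithPool are hypothetical names used for illustration; MiniPool is not vpr::ThreadPool and uses a job queue with condition variables rather than the ready list shown later on this page.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for illustration only; this is not vpr::ThreadPool.
class MiniPool
{
public:
   // Starts the requested number of worker threads.
   explicit MiniPool(int numToStartWith = 1)
   {
      assert(numToStartWith > 0);
      for ( int i = 0; i < numToStartWith; ++i )
      {
         mWorkers.emplace_back([this] { workerLoop(); });
      }
   }

   // Lets queued jobs finish, then stops and joins every worker.
   ~MiniPool()
   {
      {
         std::lock_guard<std::mutex> guard(mLock);
         mStopping = true;
      }
      mWorkReady.notify_all();
      for ( std::thread& t : mWorkers ) { t.join(); }
   }

   // Queues a job and returns immediately.
   void startFunc(std::function<void()> job)
   {
      {
         std::lock_guard<std::mutex> guard(mLock);
         mJobs.push(std::move(job));
         ++mPending;
      }
      mWorkReady.notify_one();
   }

   // Blocks until every job started so far has finished.
   void wait()
   {
      std::unique_lock<std::mutex> guard(mLock);
      mAllDone.wait(guard, [this] { return mPending == 0; });
   }

private:
   void workerLoop()
   {
      for ( ;; )
      {
         std::function<void()> job;
         {
            std::unique_lock<std::mutex> guard(mLock);
            mWorkReady.wait(guard, [this] { return mStopping || !mJobs.empty(); });
            if ( mJobs.empty() ) { return; }   // stopping and nothing left to run
            job = std::move(mJobs.front());
            mJobs.pop();
         }
         job();
         std::lock_guard<std::mutex> guard(mLock);
         if ( --mPending == 0 ) { mAllDone.notify_all(); }
      }
   }

   std::mutex mLock;
   std::condition_variable mWorkReady;
   std::condition_variable mAllDone;
   std::queue<std::function<void()>> mJobs;
   std::vector<std::thread> mWorkers;
   int mPending = 0;
   bool mStopping = false;
};

// Runs 100 jobs on 4 threads and returns the sum 1 + 2 + ... + 100.
int sumWithPool()
{
   std::atomic<int> total(0);
   MiniPool pool(4);
   for ( int i = 1; i <= 100; ++i )
   {
      pool.startFunc([&total, i] { total += i; });
   }
   pool.wait();
   return total.load();
}
```

With the class documented here, the same three steps would presumably map to the ThreadPool constructor, repeated startFunc() calls, and wait().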


Constructor & Destructor Documentation

vpr::ThreadPool::ThreadPool ( int  numToStartWith = 1  ) 

Constructor.

Definition at line 74 of file ThreadPool.cpp.

References addThread(), vpr::MutexPosix::release(), vprDBG_ALL(), vprDBG_DETAILED_LVL, vprDBG_HVERB_LVL, vprDEBUG, and vprDEBUG_FLUSH.

00074                                           : readyThreads(0)
00075 {
00076    //DebugLock.acquire();
00077    vprDEBUG(vprDBG_ALL, vprDBG_DETAILED_LVL)
00078       << "vprThreadPool::vprThreadPool: Entering.\n" << vprDEBUG_FLUSH;
00079    vprDEBUG(vprDBG_ALL, vprDBG_HVERB_LVL)
00080       << "\tvprThreadPool::vprThreadPool: Number threads: " << numToStartWith
00081       << std::endl << vprDEBUG_FLUSH;
00082    //DebugLock.release();
00083 
00084    listHead = NULL;
00085    workingCount = 0;
00086    listLock.release();             // release threadList
00087    finishedLock.release();         // Initialize it to threads being done
00088 
00089    //-- Start the initial # of threads ---//
00090    for ( int index=0;index < numToStartWith;index++ )
00091    {
00092       addThread();
00093    }
00094 }

vpr::ThreadPool::~ThreadPool (  ) 

Definition at line 96 of file ThreadPool.cpp.

References vpr::OneThread::next, and vpr::OneThread::thread.

00097 {
00098    OneThread* cur_thread = listHead;
00099    while ( cur_thread != NULL )
00100    {
00101       delete cur_thread->thread;
00102       cur_thread->thread = NULL;
00103       OneThread* old_thread = cur_thread;
00104       cur_thread = cur_thread->next;
00105       delete old_thread;
00106    }
00107 }


Member Function Documentation

void vpr::ThreadPool::startFunc ( thread_func_t  func,
void *  arg = NULL 
) [inline]

Gives a function to the processes.

Starts a function running asynchronously. Called by the master process.

Definition at line 116 of file ThreadPool.h.

Referenced by startFunc().

00117    {
00118       this->startFunc(new ThreadNonMemberFunctor(func, arg), true);
00119    }
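
This overload wraps a plain function pointer and its argument in a heap-allocated functor and passes true for deleteFunctor. The sketch below illustrates that adapter idea in isolation. FunctorBase, NonMemberFunctor, and runJob are hypothetical stand-ins, the thread_func_t typedef is an assumed shape, and the point at which the real pool consumes the deleteFunctor flag is not shown on this page, so the delete inside runJob is an assumption.

```cpp
#include <cassert>

typedef void (*thread_func_t)(void*);   // assumed shape of the callback type

// Hypothetical base class, standing in for vpr::BaseThreadFunctor.
struct FunctorBase
{
   virtual ~FunctorBase() {}
   virtual void operator()() = 0;
};

// Hypothetical adapter, standing in for ThreadNonMemberFunctor.
class NonMemberFunctor : public FunctorBase
{
public:
   NonMemberFunctor(thread_func_t func, void* arg) : mFunc(func), mArg(arg)
   {
      assert(func != nullptr);
   }

   // Calls the wrapped function with the stored argument.
   void operator()() override { mFunc(mArg); }

private:
   thread_func_t mFunc;
   void* mArg;
};

// Runs the functor once and frees it only if the caller handed over ownership.
void runJob(FunctorBase* functor, const bool deleteFunctor)
{
   (*functor)();
   if ( deleteFunctor )
   {
      delete functor;
   }
}

// Example callback: increments the int that arg points to.
void increment(void* arg)
{
   ++(*static_cast<int*>(arg));
}

// Mirrors the convenience overload: wrap the function, then pass ownership
// along with `true`.
int runIncrementTwice()
{
   int counter = 0;
   runJob(new NonMemberFunctor(&increment, &counter), true);
   runJob(new NonMemberFunctor(&increment, &counter), true);
   return counter;
}
```

Passing ownership with a flag lets the same entry point accept both caller-owned functors (flag false, the default) and temporaries created by the wrapper (flag true).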

void vpr::ThreadPool::startFunc ( BaseThreadFunctor *  theFunctor,
void *  argument 
) [inline]

Definition at line 121 of file ThreadPool.h.

References vpr::BaseThreadFunctor::setArg(), and startFunc().

00122    {
00123       theFunctor->setArg(argument);
00124       this->startFunc(theFunctor);
00125    }

void vpr::ThreadPool::startFunc ( BaseThreadFunctor *  theFunctor,
const bool  deleteFunctor = false 
) [inline]

Definition at line 127 of file ThreadPool.h.

References vpr::OneThread::deleteFunctor, vpr::OneThread::functor, getThread(), vpr::SemaphorePosix::release(), and vpr::OneThread::threadWait.

00129    {
00130       OneThread* theThread = getThread();
00131 
00132       theThread->deleteFunctor = deleteFunctor;
00133       theThread->functor = theFunctor;     /* set address of func to exec */
00134       theThread->threadWait.release();     /* wake up sleeping process */
00135       //delete theFunctor;
00136    }

OneThread * vpr::ThreadPool::addThread (  ) 

Definition at line 222 of file ThreadPool.cpp.

References vpr::OneThread::next, printList(), vpr::ThreadPosix::self(), vpr::OneThread::thread, threadLoop(), vprDBG_ALL(), vprDBG_DETAILED_LVL, vprDBG_HVERB_LVL, vprDEBUG, and vprDEBUG_FLUSH.

Referenced by ThreadPool().

00223 {
00224    static int numTimes = 0;
00225 //    DebugLock.acquire();
00226    vprDEBUG(vprDBG_ALL, vprDBG_DETAILED_LVL) << Thread::self()
00227       << " vpr::ThreadPool::addThread: Entering: " << ++numTimes << std::endl
00228       << vprDEBUG_FLUSH;
00229 //    DebugLock.release();
00230 
00231    Guard<Mutex> guard(listLock);   // Protect the head
00232 
00233    //OneThread* newThread = new (this->getMyMemPool()->allocate(sizeof(OneThread))) OneThread;    // Used placement new
00234    OneThread* newThread = new OneThread;
00235    newThread->next = NULL;
00236 
00237 //    ThreadMemberFunctor<ThreadPool>* memberFunctor = new ThreadMemberFunctor<ThreadPool>(this, ThreadPool::threadLoop, (void*)newThread);
00238    ThreadMemberFunctor<ThreadPool>* memberFunctor = new ThreadMemberFunctor<ThreadPool>(this, &ThreadPool::threadLoop, (void*)newThread);
00239 
00240    newThread->thread = new Thread(memberFunctor);
00241 
00242 //    DebugLock.acquire();
00243    vprDEBUG(vprDBG_ALL, vprDBG_HVERB_LVL) << newThread->thread
00244       << " vprThreadPool::addThread: List at end\n" << vprDEBUG_FLUSH;
00245    printList();
00246 //    DebugLock.release();
00247 
00248    return listHead;
00249 }

void vpr::ThreadPool::threadLoop ( void *  theThreadAsVoid  ) 

Body of a general-purpose child process.

The argument, which must be declared void* to match the thread function prototype, is the vpr::OneThread structure that represents this process. The contents of that structure, in particular threadWait, MUST be initialized by the parent.

Definition at line 118 of file ThreadPool.cpp.

References vpr::MutexPosix::acquire(), vpr::OneThread::functor, vpr::MutexPosix::release(), vpr::ThreadPosix::self(), threadSleep(), vprDBG_ALL(), vprDBG_DETAILED_LVL, vprDEBUG, and vprDEBUG_FLUSH.

Referenced by addThread().

00119 {
00120 //   DebugLock.acquire();
00121    vprDEBUG(vprDBG_ALL, vprDBG_DETAILED_LVL) << Thread::self()
00122       << " vpr::ThreadPool::threadLoop: Entering."
00123       << std::endl << vprDEBUG_FLUSH;
00124 //      vprDEBUG(vprDBG_ALL, vprDBG_HVERB_LVL) << Thread::self()
00125 //      << " vpr::ThreadPool::threadLoop: theThreadAsVoid:"
00126 //      << theThreadAsVoid << endl << vprDEBUG_FLUSH;
00127 //   DebugLock.release();
00128 
00129    listLock.acquire();
00130    listLock.release();     // Do this to make sure addThread is done
00131 
00132    OneThread* myThread = (OneThread*)theThreadAsVoid;
00133 
00134 #ifdef VPR_USE_IRIX_SPROC
00135 
00136    // --- SIGNAL Handlers ---- //
00137    prctl(PR_TERMCHILD);       // What should I do with. FIX - Allen
00138 #endif
00139 
00140    for ( ;; )
00141    {
00142       // --- WAIT FOR WORK --- //
00143       threadSleep(myThread);
00144       // ASSERT:  We now have work to do...
00145       // --- PROCESS ENTRY OVERHEAD --- //
00146       workingCountLock.acquire();     // Get access to the working thread count
00147       if ( workingCount == 0 )
00148       {
00149          finishedLock.acquire();       // Now there are threads working
00150       }
00151       workingCount = workingCount + 1;    // Update thread count
00152       workingCountLock.release();
00153 
00154       // --- DO THE WORK --- //
00155       myThread->functor->operator()();
00156 
00157       // --- PROCESS EXIT OVERHEAD --- //
00158       workingCountLock.acquire();     // Get access to the working count
00159       workingCount = workingCount - 1;
00160       if ( workingCount == 0 )
00161       {
00162          finishedLock.release();       // Now there are no threads working
00163       }
00164       workingCountLock.release();
00165    }
00166 }
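
The entry and exit overhead in the loop above tracks how many threads are working: the first thread in takes finishedLock and the last thread out releases it, which is what wait() blocks on. The sketch below shows the same bookkeeping in standard C++. It deliberately swaps in a condition variable, because a std::mutex must be unlocked by the thread that locked it and so cannot be handed between threads the way finishedLock is. WorkTracker and countFinishedJobs are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper; the names are illustrative and not part of vpr.
class WorkTracker
{
public:
   // Entry overhead: one more thread is working.
   void enter()
   {
      std::lock_guard<std::mutex> guard(mCountLock);
      ++mWorkingCount;
   }

   // Exit overhead: the last thread out wakes anyone blocked in wait().
   void leave()
   {
      std::lock_guard<std::mutex> guard(mCountLock);
      assert(mWorkingCount > 0);
      --mWorkingCount;
      if ( mWorkingCount == 0 )
      {
         mFinished.notify_all();
      }
   }

   // Blocks until no thread is working.
   void wait()
   {
      std::unique_lock<std::mutex> guard(mCountLock);
      mFinished.wait(guard, [this] { return mWorkingCount == 0; });
   }

private:
   std::mutex mCountLock;
   std::condition_variable mFinished;
   int mWorkingCount = 0;
};

// Starts numJobs threads that each add 1 to a shared total, waits for all of
// them to leave, and returns the total.
int countFinishedJobs(int numJobs)
{
   WorkTracker tracker;
   std::mutex totalLock;
   int total = 0;
   std::vector<std::thread> threads;

   for ( int i = 0; i < numJobs; ++i )
   {
      tracker.enter();   // counted before the thread starts, so wait() sees it
      threads.emplace_back([&] {
         {
            std::lock_guard<std::mutex> guard(totalLock);
            ++total;
         }
         tracker.leave();
      });
   }

   tracker.wait();
   const int result = total;   // every leave() has happened by now
   for ( std::thread& t : threads ) { t.join(); }
   return result;
}
```

One difference from the listing is a choice made for the sketch: the count is incremented on the submitting side rather than by the worker after it wakes, so a wait() issued right after submission always observes the pending work.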

void vpr::ThreadPool::threadSleep ( OneThread *  theThread  ) 

Waits for work to do.

Put a vpr::OneThread structure on the ready list and sleep on it. Called by a child process when its work is done.

Definition at line 172 of file ThreadPool.cpp.

References vpr::SemaphorePosix::acquire(), vpr::MutexPosix::acquire(), vpr::OneThread::next, vpr::SemaphorePosix::release(), vpr::MutexPosix::release(), and vpr::OneThread::threadWait.

Referenced by threadLoop().

00173 {
00174    listLock.acquire();               // acquire exclusive rights to threadList
00175    theThread->next = listHead;   // put self on head of the list
00176    listHead = theThread;
00177    listLock.release();               // release threadList
00178 
00179    readyThreads.release();           // notify master, at least 1 on the list
00180 
00181    theThread->threadWait.acquire();  // sleep until master needs/releases me
00182 }

OneThread * vpr::ThreadPool::getThread (  ) 

Gets a process to run.

Acquire a vpr::OneThread structure from the ready list, waiting if necessary. Called by the master process as part of dispatching a thread.

Definition at line 189 of file ThreadPool.cpp.

References vpr::MutexPosix::acquire(), vpr::SemaphorePosix::acquire(), vpr::OneThread::next, and vpr::MutexPosix::release().

Referenced by startFunc().

00190 {
00191    OneThread* theThread;
00192 
00193    readyThreads.acquire();           // wait until at least 1 thread is free
00194 
00195    listLock.acquire();               // acquire exclusive rights to threadList
00196    theThread = listHead;         // get address of first free OneThread
00197    listHead = theThread->next;   // make next in list, the head of list
00198    listLock.release();               // release threadList
00199 
00200    return theThread;
00201 }
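
threadSleep() and getThread() together form a handshake: a worker pushes its record onto the ready list, signals readyThreads, and sleeps on its own threadWait, while the master waits on readyThreads and pops the head of the list. The sketch below reproduces that handshake with a single worker in standard C++. SimpleSemaphore, Slot, ReadyList, and doubleViaHandshake are hypothetical names; the semaphore is built from a mutex and a condition variable as a stand-in for the vpr semaphore type, and the done semaphore is an addition of the sketch so the caller can collect a result.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical counting semaphore, standing in for the vpr semaphore type.
class SimpleSemaphore
{
public:
   explicit SimpleSemaphore(int initial = 0) : mCount(initial) {}

   void release()
   {
      {
         std::lock_guard<std::mutex> guard(mLock);
         ++mCount;
      }
      mCond.notify_one();
   }

   void acquire()
   {
      std::unique_lock<std::mutex> guard(mLock);
      mCond.wait(guard, [this] { return mCount > 0; });
      --mCount;
   }

private:
   std::mutex mLock;
   std::condition_variable mCond;
   int mCount;
};

// Hypothetical per-thread record, loosely modelled on vpr::OneThread.
struct Slot
{
   Slot* next = nullptr;
   int job = 0;
   int result = 0;
   SimpleSemaphore wake;   // plays the role of threadWait
   SimpleSemaphore done;   // sketch-only: signals that result is ready
};

struct ReadyList
{
   std::mutex listLock;
   Slot* head = nullptr;
   SimpleSemaphore ready;  // plays the role of readyThreads

   // Worker side: put self on the list, notify the master, sleep until woken.
   void sleepOn(Slot* slot)
   {
      {
         std::lock_guard<std::mutex> guard(listLock);
         slot->next = head;
         head = slot;
      }
      ready.release();
      slot->wake.acquire();
   }

   // Master side: wait for a free slot, then pop it from the head.
   Slot* take()
   {
      ready.acquire();
      std::lock_guard<std::mutex> guard(listLock);
      Slot* slot = head;
      assert(slot != nullptr);
      head = slot->next;
      return slot;
   }
};

// One worker doubles one job handed over by the master; returns the result.
int doubleViaHandshake(int value)
{
   ReadyList list;
   Slot slot;
   std::thread worker([&] {
      list.sleepOn(&slot);
      slot.result = slot.job * 2;
      slot.done.release();
   });

   Slot* freeSlot = list.take();
   freeSlot->job = value;        // set the work before waking the worker
   freeSlot->wake.release();
   freeSlot->done.acquire();
   worker.join();
   return freeSlot->result;
}
```

Because the counting semaphore is released only after the record is on the list, the master never pops from an empty list, which matches the ordering used in threadSleep().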

void vpr::ThreadPool::wait (  )  [inline]

Waits until all threads are done doing their work.

Note:
This was renamed from barrier() in version 0.92.1 to deal with a case where that name is a preprocessor macro.
Since:
0.92.1

Definition at line 169 of file ThreadPool.h.

References vpr::MutexPosix::acquire(), and vpr::MutexPosix::release().

00170    {
00171       finishedLock.acquire();      // Get the lock that means threads done
00172       finishedLock.release();      // Reset it to done
00173    }

void vpr::ThreadPool::printList (  ) 

Definition at line 205 of file ThreadPool.cpp.

References vpr::OneThread::next.

Referenced by addThread().

00206 {
00207    OneThread* curThread = listHead;
00208    int counter = 0;
00209 
00210    std::cerr << "----- Thread List -----\n";
00211 
00212    while ( curThread != NULL )
00213    {
00214       std::cerr << "Thread: " << counter++ << std::endl;
00215       std::cerr << "\tpid: " << *curThread << std::endl;
00216       curThread = curThread->next;
00217    }
00218 }


The documentation for this class was generated from the following files:
ThreadPool.h
ThreadPool.cpp

Generated on Thu Jan 4 10:55:58 2007 for VR Juggler Portable Runtime by  doxygen 1.5.1