00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include <vpr/vprConfig.h>
00043
00044 #include <string.h>
00045 #include <iomanip>
00046 #include <sys/types.h>
00047 #include <unistd.h>
00048 #include <pthread.h>
00049 #include <sched.h>
00050
00051 #ifdef HAVE_SYS_CAPABILITY_H
00052 # include <sys/capability.h>
00053 #endif
00054
00055 #include <boost/concept_check.hpp>
00056
00057 #include <vpr/Thread/ThreadManager.h>
00058 #include <vpr/Thread/ThreadFunctor.h>
00059 #include <vpr/Thread/Thread.h>
00060 #include <vpr/Util/Assert.h>
00061
00062
00063 #include <vpr/Util/Debug.h>
00064 #include <vpr/md/POSIX/Thread/ThreadPosix.h>
00065
00066
// Shorthand for the POSIX scheduling-parameter structure.  Used in this file
// wherever a priority is handed to or read back from the pthread API.
typedef struct sched_param sched_param_t;
00068
00069 namespace vpr
00070 {
00071
// Storage for the class-wide static data of ThreadPosix.  self() asserts that
// statics.mStaticsInitialized holds its marker value before it is used.
ThreadPosix::staticWrapper ThreadPosix::statics;
00073
00074
00075
00076 ThreadPosix::ThreadPosix(VPRThreadPriority priority, VPRThreadScope scope,
00077 VPRThreadState state, size_t stackSize)
00078 : mUserThreadFunctor(NULL)
00079 , mDeleteThreadFunctor(false)
00080 , mRunning(false)
00081 , mPriority(priority)
00082 , mScope(scope)
00083 , mState(state)
00084 , mStackSize(stackSize)
00085 , mThreadStartCompleted(false)
00086 , mStartFunctor(NULL)
00087 {
00088 ;
00089 }
00090
00091
00092
00093 ThreadPosix::ThreadPosix(thread_func_t func, void* arg,
00094 VPRThreadPriority priority, VPRThreadScope scope,
00095 VPRThreadState state, size_t stackSize)
00096 : mUserThreadFunctor(NULL)
00097 , mDeleteThreadFunctor(false)
00098 , mRunning(false)
00099 , mPriority(priority)
00100 , mScope(scope)
00101 , mState(state)
00102 , mStackSize(stackSize)
00103 , mThreadStartCompleted(false)
00104 , mStartFunctor(NULL)
00105 {
00106
00107
00108 setFunctor(new ThreadNonMemberFunctor(func, arg));
00109 mDeleteThreadFunctor = true;
00110 start();
00111 }
00112
00113
00114
00115 ThreadPosix::ThreadPosix(BaseThreadFunctor* functorPtr,
00116 VPRThreadPriority priority, VPRThreadScope scope,
00117 VPRThreadState state, size_t stackSize)
00118 : mUserThreadFunctor(NULL)
00119 , mDeleteThreadFunctor(false)
00120 , mRunning(false)
00121 , mPriority(priority)
00122 , mScope(scope)
00123 , mState(state)
00124 , mStackSize(stackSize)
00125 , mThreadStartCompleted(false)
00126 , mStartFunctor(NULL)
00127 {
00128 setFunctor(functorPtr);
00129 start();
00130 }
00131
00132
00133 ThreadPosix::~ThreadPosix()
00134 {
00135 if ( mDeleteThreadFunctor )
00136 {
00137 delete mUserThreadFunctor;
00138 mUserThreadFunctor = NULL;
00139 }
00140
00141 if ( NULL != mStartFunctor )
00142 {
00143 delete mStartFunctor;
00144 mStartFunctor = NULL;
00145 }
00146
00147
00148 ThreadManager::instance()->lock();
00149 {
00150 unregisterThread();
00151 }
00152 ThreadManager::instance()->unlock();
00153 }
00154
00155 void ThreadPosix::setFunctor(BaseThreadFunctor* functorPtr)
00156 {
00157 vprASSERT(! mRunning && "Thread already running.");
00158 vprASSERT(functorPtr->isValid() && "Invalid functor.");
00159
00160 mUserThreadFunctor = functorPtr;
00161 }
00162
00163 vpr::ReturnStatus ThreadPosix::start()
00164 {
00165 vpr::ReturnStatus status;
00166
00167 if ( mRunning )
00168 {
00169 vprASSERT(false && "Thread already started");
00170 status.setCode(vpr::ReturnStatus::Fail);
00171 }
00172 else if ( NULL == mUserThreadFunctor )
00173 {
00174 vprASSERT(false && "No functor set");
00175 status.setCode(vpr::ReturnStatus::Fail);
00176 }
00177 else
00178 {
00179 mStartFunctor =
00180 new ThreadMemberFunctor<ThreadPosix>(this, &ThreadPosix::startThread,
00181 NULL);
00182
00183
00184
00185 mThreadStartCompleted = false;
00186 status = spawn(mStartFunctor);
00187
00188
00189 if ( status.success() )
00190 {
00191
00192
00193 mThreadStartCondVar.acquire();
00194 {
00195 while ( ! mThreadStartCompleted )
00196 {
00197 mThreadStartCondVar.wait();
00198 }
00199 }
00200 mThreadStartCondVar.release();
00201
00202 mRunning = true;
00203 }
00204
00205 else
00206 {
00207 ThreadManager::instance()->lock();
00208 {
00209 registerThread(false);
00210 }
00211 ThreadManager::instance()->unlock();
00212 }
00213 }
00214
00215 return status;
00216 }
00217
00218
00219 vpr::ReturnStatus ThreadPosix::spawn(BaseThreadFunctor* functorPtr)
00220 {
00221 vpr::ReturnStatus status;
00222 int ret_val;
00223 pthread_attr_t thread_attrs;
00224
00225 int pthread_prio = vprThreadPriorityToPOSIX(mPriority);
00226
00227
00228
00229 sched_param_t prio_param;
00230
00231 pthread_attr_init(&thread_attrs);
00232 pthread_attr_setdetachstate(&thread_attrs, vprThreadStateToPOSIX(mState));
00233
00234
00235
00236 # ifdef _POSIX_THREAD_PRIORITY_SCHEDULING
00237 int thread_scope = vprThreadScopeToPOSIX(mScope);
00238
00239 # if defined(HAVE_SYS_CAPABILITY_H) && ! defined(VPR_OS_FreeBSD) && \
00240 ! defined(VPR_OS_Linux)
00241 cap_t capabilities = cap_get_proc();
00242
00243
00244
00245 if ( capabilities->cap_effective & CAP_SCHED_MGT )
00246 {
00247 thread_scope = PTHREAD_SCOPE_SYSTEM;
00248 }
00249 # endif
00250
00251 pthread_attr_setscope(&thread_attrs, thread_scope);
00252
00253 if ( pthread_prio > 0 )
00254 {
00255 prio_param.sched_priority = pthread_prio;
00256 pthread_attr_setschedparam(&thread_attrs, &prio_param);
00257 }
00258 # endif
00259
00260
00261
00262
00263
00264 #ifdef _POSIX_THREAD_ATTR_STACKSIZE
00265 if ( mStackSize > 0 )
00266 {
00267
00268
00269 pthread_attr_setstacksize(&thread_attrs, mStackSize);
00270 }
00271 #endif
00272
00273
00274 ret_val = pthread_create(&(mThread), &thread_attrs,
00275 vprThreadFunctorFunction, (void *) functorPtr);
00276
00277
00278 if ( ret_val != 0 )
00279 {
00280 status.setCode(vpr::ReturnStatus::Fail);
00281 std::cerr << "vpr::ThreadPosix::spawn() - Cannot create thread:"
00282 << strerror(ret_val) << std::endl;
00283 }
00284
00285 return status;
00286 }
00287
00288
00289 void ThreadPosix::startThread(void* nullParam)
00290 {
00291 boost::ignore_unused_variable_warning(nullParam);
00292
00293
00294
00295 ThreadManager::instance()->lock();
00296 {
00297 threadIdKey().setspecific((void*)this);
00298 registerThread(true);
00299 }
00300 ThreadManager::instance()->unlock();
00301
00302
00303 mThreadStartCondVar.acquire();
00304 {
00305 mThreadStartCompleted = true;
00306 mThreadStartCondVar.signal();
00307 }
00308 mThreadStartCondVar.release();
00309
00310
00311 (*mUserThreadFunctor)();
00312 }
00313
00314
00315 int ThreadPosix::getPrio(VPRThreadPriority* prio)
00316 {
00317 #ifdef _POSIX_THREAD_PRIORITY_SCHEDULING
00318 int policy, ret_val;
00319 sched_param_t fifo_sched_param;
00320
00321 ret_val = pthread_getschedparam(mThread, &policy, &fifo_sched_param);
00322 *prio = posixThreadPriorityToVPR(fifo_sched_param.sched_priority);
00323
00324 return ret_val;
00325 #else
00326 std::cerr << "vpr::ThreadPosix::getPrio(): Not supported\n";
00327
00328 return -1;
00329 #endif
00330 }
00331
00332
00333 int ThreadPosix::setPrio(VPRThreadPriority prio)
00334 {
00335 #ifdef _POSIX_THREAD_PRIORITY_SCHEDULING
00336 sched_param_t sched_param;
00337 sched_param.sched_priority = prio;
00338
00339 return pthread_setschedparam(mThread, SCHED_RR, &sched_param);
00340 #else
00341 boost::ignore_unused_variable_warning(prio);
00342 std::cerr << "vpr::ThreadPosix::setPrio(): Not supported\n";
00343
00344 return -1;
00345 #endif
00346 }
00347
00348 int ThreadPosix::setRunOn(int cpu)
00349 {
00350 #ifdef VPR_OS_IRIX
00351 int ret_val;
00352
00353 if ( mScope == PTHREAD_SCOPE_SYSTEM )
00354 {
00355 ret_val = pthread_setrunon_np(cpu);
00356 }
00357 else
00358 {
00359 std::cerr << "This thread is not a system-scope thread!\n";
00360 ret_val = -1;
00361 }
00362
00363 return ret_val;
00364 #else
00365 boost::ignore_unused_variable_warning(cpu);
00366 std::cerr << "vpr::ThreadPosix::setRunOn(): Not available on this system.\n";
00367
00368 return -1;
00369 #endif
00370 }
00371
00372 int ThreadPosix::getRunOn(int* cur_cpu)
00373 {
00374 #ifdef VPR_OS_IRIX
00375 int ret_val;
00376
00377 if ( mScope == PTHREAD_SCOPE_SYSTEM )
00378 {
00379 ret_val = pthread_getrunon_np(cur_cpu);
00380 }
00381 else
00382 {
00383 std::cerr << "This thread is not a system-scope thread!\n";
00384 ret_val = -1;
00385 }
00386
00387 return ret_val;
00388 #else
00389 boost::ignore_unused_variable_warning(cur_cpu);
00390 std::cerr << "vpr::ThreadPosix::getRunOn(): Not available on this system.\n";
00391
00392 return -1;
00393 #endif
00394 }
00395
00396 int ThreadPosix::kill(int signum)
00397 {
00398 return pthread_kill(mThread, signum);
00399 }
00400
00401 Thread* ThreadPosix::self()
00402 {
00403 vprASSERT((statics.mStaticsInitialized==1221) && "Trying to call vpr::ThreadPosix::self before statics are initialized. Don't do that");
00404
00405 Thread* my_thread;
00406 threadIdKey().getspecific((void**)&my_thread);
00407
00408 return my_thread;
00409 }
00410
00411
00412 std::ostream& ThreadPosix::outStream(std::ostream& out)
00413 {
00414 out.setf(std::ios::right);
00415 out << std::setw(7) << std::setfill('0') << getpid() << "/";
00416 out.unsetf(std::ios::right);
00417 BaseThread::outStream(out);
00418 out << std::setfill(' ');
00419 return out;
00420 }
00421
00422
00423
00424
00425
00426 int ThreadPosix::vprThreadPriorityToPOSIX(const VPRThreadPriority priority)
00427 {
00428 int posix_prio;
00429 int min_prio, max_prio;
00430
00431 min_prio = sched_get_priority_min(SCHED_OTHER);
00432 max_prio = sched_get_priority_max(SCHED_OTHER);
00433
00434 switch ( priority )
00435 {
00436 case VPR_PRIORITY_LOW:
00437 case VPR_PRIORITY_NORMAL:
00438 posix_prio = min_prio;
00439 break;
00440 case VPR_PRIORITY_HIGH:
00441 posix_prio = min_prio + 1;
00442 break;
00443 case VPR_PRIORITY_URGENT:
00444 posix_prio = max_prio;
00445 break;
00446 default:
00447 posix_prio = min_prio;
00448 break;
00449 };
00450
00451 return posix_prio;
00452 }
00453
00454 int ThreadPosix::vprThreadScopeToPOSIX(const VPRThreadScope scope)
00455 {
00456 int posix_scope;
00457
00458 switch ( scope )
00459 {
00460 case VPR_LOCAL_THREAD:
00461 posix_scope = PTHREAD_SCOPE_PROCESS;
00462 break;
00463 case VPR_GLOBAL_THREAD:
00464 posix_scope = PTHREAD_SCOPE_SYSTEM;
00465 break;
00466 default:
00467 posix_scope = VPR_THREAD_SCOPE;
00468 break;
00469 };
00470
00471 return posix_scope;
00472 }
00473
00474 int ThreadPosix::vprThreadStateToPOSIX(const VPRThreadState state)
00475 {
00476 int posix_state;
00477
00478 switch ( state )
00479 {
00480 case VPR_JOINABLE_THREAD:
00481 posix_state = PTHREAD_CREATE_JOINABLE;
00482 break;
00483 case VPR_UNJOINABLE_THREAD:
00484 posix_state = PTHREAD_CREATE_DETACHED;
00485 break;
00486 default:
00487 posix_state = PTHREAD_CREATE_JOINABLE;
00488 break;
00489 };
00490
00491 return posix_state;
00492 }
00493
00494 BaseThread::VPRThreadPriority ThreadPosix::posixThreadPriorityToVPR(const int priority)
00495 {
00496 VPRThreadPriority vpr_prio;
00497 int min_prio, max_prio;
00498
00499 min_prio = sched_get_priority_min(SCHED_OTHER);
00500 max_prio = sched_get_priority_max(SCHED_OTHER);
00501
00502 if ( priority == min_prio )
00503 {
00504 vpr_prio = VPR_PRIORITY_NORMAL;
00505 }
00506 else if ( priority == max_prio )
00507 {
00508 vpr_prio = VPR_PRIORITY_URGENT;
00509 }
00510 else
00511 {
00512 vpr_prio = VPR_PRIORITY_HIGH;
00513 }
00514
00515 return vpr_prio;
00516 }
00517
00518 BaseThread::VPRThreadScope ThreadPosix::posixThreadScopeToVPR(const int scope)
00519 {
00520 VPRThreadScope vpr_scope;
00521
00522 switch ( scope )
00523 {
00524 case PTHREAD_SCOPE_PROCESS:
00525 vpr_scope = VPR_LOCAL_THREAD;
00526 break;
00527 case PTHREAD_SCOPE_SYSTEM:
00528 vpr_scope = VPR_GLOBAL_THREAD;
00529 break;
00530 default:
00531 vprDEBUG(vprDBG_ALL, vprDBG_WARNING_LVL)
00532 << clrOutNORM(clrYELLOW, "WARNING:")
00533 << " Unexpected value " << scope
00534 << " in vpr::ThreadPosix::posixThreadScopeToVPR()" << std::endl
00535 << vprDEBUG_FLUSH;
00536 #if VPR_THREAD_SCOPE == PTHREAD_SCOPE_PROCESS
00537 vprDEBUG_NEXT(vprDBG_ALL, vprDBG_WARNING_LVL)
00538 << "Defaulting to VPR_LOCAL_THREAD for return value." << std::endl
00539 << vprDEBUG_FLUSH;
00540 vpr_scope = VPR_LOCAL_THREAD;
00541 #else
00542 vprDEBUG_NEXT(vprDBG_ALL, vprDBG_WARNING_LVL)
00543 << "Defaulting to VPR_GLOBAL_THREAD for return value." << std::endl
00544 << vprDEBUG_FLUSH;
00545 vpr_scope = VPR_GLOBAL_THREAD;
00546 #endif
00547 break;
00548 };
00549
00550 return vpr_scope;
00551 }
00552
00553 BaseThread::VPRThreadState ThreadPosix::posixThreadStateToVPR(const int state)
00554 {
00555 VPRThreadState vpr_state;
00556
00557 switch ( state )
00558 {
00559 case PTHREAD_CREATE_JOINABLE:
00560 vpr_state = VPR_JOINABLE_THREAD;
00561 break;
00562 case PTHREAD_CREATE_DETACHED:
00563 vpr_state = VPR_UNJOINABLE_THREAD;
00564 break;
00565 default:
00566 vprDEBUG(vprDBG_ALL, vprDBG_WARNING_LVL)
00567 << clrOutNORM(clrYELLOW, "WARNING:")
00568 << " Unexpected value " << state
00569 << " in vpr::ThreadPosix::posixThreadStateToVPR()" << std::endl
00570 << vprDEBUG_FLUSH;
00571 vprDEBUG_NEXT(vprDBG_ALL, vprDBG_WARNING_LVL)
00572 << "Defaulting to VPR_JOINABLE_THREAD for return value."
00573 << std::endl << vprDEBUG_FLUSH;
00574 vpr_state = VPR_JOINABLE_THREAD;
00575 break;
00576 };
00577
00578 return vpr_state;
00579 }
00580
00581
00582 }