#include <gadget/Node.h>
Inheritance diagram for gadget::Node:


Public Types | |
| enum | Status { DISCONNECTED = 0, PENDING = 1, NEWCONNECTION = 2, CONNECTED = 3 } |
Public Member Functions | |
| Node (const std::string &name, const std::string &host_name, const vpr::Uint16 &port, vpr::SocketStream *socket_stream, AbstractNetworkManager *net_mgr) | |
| Create a Node with the given attributes. | |
| virtual | ~Node () |
| Shutdown the update thread and close the SocketStream. | |
| void | debugDump (int debug_level=vprDBG_CONFIG_LVL) |
| Display all relevant information about Node. | |
| virtual void | printStats (int debug_level=1) |
| Display Bandwidth statistics for the given SocketStream. | |
| std::string | getName () |
| Return the name given to this node during configuration. | |
| void | setName (const std::string &name) |
| Change the name of this node to the given value. | |
| std::string | getHostname () |
| Return the hostname of the node. | |
| vpr::Uint16 | getPort () |
| Return the port number that we are connected to. | |
| vpr::SocketStream * | getSockStream () |
| Get a pointer to the SocketStream used to communicate with this node. | |
| void | setSockStream (vpr::SocketStream *stream) |
| Set the SocketStream used to communicate with this node. | |
| bool | isConnected () |
| Return whether we are connected to this node. | |
| void | setStatus (int connect) |
| Set the current connection status to this machine. | |
| int | getStatus () |
| Get the current connection state for this node. | |
| bool | isUpdated () |
| Return whether this node has been updated. | |
| void | setUpdated (bool update) |
| Set the update status for this node. | |
| void | start () |
| Starts the control loop. | |
| void | controlLoop (void *nullParam) |
| Control loop executed by this node's update thread. | |
| void | signalUpdate () |
| Signal a semaphore to let the update thread fall into the code to update the UserData structures. | |
| void | sync () |
| Blocks until the end of the frame. | |
| void | shutdown () |
| Kill the update thread. | |
| vpr::Uint64 * | getDelta () |
| Get the time delta between the remote and local clock. | |
| vpr::ReturnStatus | send (cluster::Packet *out_packet) |
| Send the given packet to this node. | |
| cluster::Packet * | recvPacket () |
| Receive a packet from the network. | |
Protected Member Functions | |
| void | update () |
| Update this cluster node. | |
Protected Attributes | |
| std::string | mName |
| Node name. | |
| std::string | mHostname |
| Host that it is connected to. | |
| vpr::Uint16 | mPort |
| Port that it is connected to. | |
| bool | mRunning |
| Thread is running the control loop. | |
| vpr::SocketStream * | mSockStream |
| Socket used for communication to this node. | |
| vpr::Mutex | mSockWriteLock |
| Lock writing to the SocketStream. | |
| vpr::Mutex | mSockReadLock |
| Lock reading from the SocketStream. | |
| vpr::Mutex | mStatusLock |
| Lock protecting the connection status (mStatus). | |
| int | mStatus |
| Current connection status of this node (one of the Status values). | |
| vpr::Mutex | mUpdatedLock |
| Lock the isUpdated value. | |
| bool | mUpdated |
| States if this node is updated. | |
| vpr::Semaphore | mUpdateTriggerSema |
| Semaphore trigger for UserData update. | |
| vpr::Semaphore | mNodeDoneSema |
| Semaphore trigger for completion. | |
| vpr::Thread * | mControlThread |
| Update thread for this node. | |
| bool | mThreadActive |
| Has the update thread started? | |
| vpr::Uint64 | mDelta |
| Time delta between remote and local clocks. | |
| AbstractNetworkManager * | mNetworkManager |
| Network that should handle incoming packets. | |
Definition at line 56 of file Node.h.
| enum gadget::Node::Status |
Definition at line 59 of file Node.h.
00060 { 00061 DISCONNECTED = 0, 00062 PENDING = 1, 00063 NEWCONNECTION = 2, 00064 CONNECTED = 3 00065 };
| gadget::Node::Node | ( | const std::string & | name, | |
| const std::string & | host_name, | |||
| const vpr::Uint16 & | port, | |||
| vpr::SocketStream * | socket_stream, | |||
| AbstractNetworkManager * | net_mgr | |||
| ) |
Create a Node with the given attributes.
| name | Name of the Cluster Node from the config file | |
| host_name | Hostname of the remote machine | |
| port | The socket port that we should connect to | |
| socket_stream | SocketStream used to communicate with remote machine | |
| net_mgr | The network manager. |
Definition at line 60 of file Node.cpp.
References gadgetDBG_RIM(), mHostname, mName, mPort, mSockStream, mThreadActive, and mUpdated.
00063 : mRunning(false), mStatus(DISCONNECTED), mUpdateTriggerSema(0), 00064 mNodeDoneSema(0), mControlThread(NULL), mNetworkManager(net_mgr) 00065 { 00066 mThreadActive = false; 00067 mUpdated = false; 00068 00069 mName = name; 00070 mHostname = host_name; 00071 mPort = port; 00072 mSockStream = socket_stream; 00073 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 00074 << clrOutBOLD(clrBLUE,"[Node]") 00075 << " Created a Node: " << name << " - " << host_name 00076 << std::endl << vprDEBUG_FLUSH; 00077 }
| gadget::Node::~Node | ( | ) | [virtual] |
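Shut down the update thread. Note that the code that would close and delete the SocketStream is currently commented out in shutdown(), so the SocketStream is left untouched.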
Definition at line 79 of file Node.cpp.
References shutdown().
00080 { 00081 shutdown(); 00082 }
| void gadget::Node::debugDump | ( | int | debug_level = vprDBG_CONFIG_LVL |
) |
Display all relevant information about Node.
Definition at line 108 of file Node.cpp.
References CONNECTED, gadgetDBG_NET_MGR(), getStatus(), mHostname, mName, mPort, mSockStream, and NEWCONNECTION.
Referenced by controlLoop(), and cluster::ClusterNode::controlLoop().
00109 { 00110 00111 vpr::DebugOutputGuard dbg_output(gadgetDBG_NET_MGR, debug_level, 00112 std::string("-------------- Node --------------\n"), 00113 std::string("-----------------------------------------\n")); 00114 00115 vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "Node Name: " 00116 << mName << std::endl << vprDEBUG_FLUSH; 00117 vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "Hostname: " 00118 << mHostname << std::endl << vprDEBUG_FLUSH; 00119 vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "Port: " 00120 << mPort << std::endl << vprDEBUG_FLUSH; 00121 vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "SockStream " 00122 << (NULL == mSockStream ? "is NULL" : "is NOT NULL") << std::endl << vprDEBUG_FLUSH; 00123 if (CONNECTED == getStatus()) 00124 { 00125 vprDEBUG(gadgetDBG_NET_MGR, debug_level) << clrOutBOLD(clrGREEN,"CONNECTED") << std::endl << vprDEBUG_FLUSH; 00126 } 00127 else if (NEWCONNECTION == getStatus()) 00128 { 00129 vprDEBUG(gadgetDBG_NET_MGR, debug_level) << clrOutBOLD(clrRED,"NEW CONNECTION") << std::endl << vprDEBUG_FLUSH; 00130 } 00131 else 00132 { 00133 vprDEBUG(gadgetDBG_NET_MGR, debug_level) << clrOutBOLD(clrRED,"DISCONNECTED") << std::endl << vprDEBUG_FLUSH; 00134 } 00135 }
| void gadget::Node::printStats | ( | int | debug_level = 1 |
) | [virtual] |
Display Bandwidth statistics for the given SocketStream.
Definition at line 137 of file Node.cpp.
References gadgetDBG_RIM(), and mSockStream.
00138 { 00139 vpr::BaseIOStatsStrategy* stats = mSockStream->getIOStatStrategy(); 00140 vpr::BandwidthIOStatsStrategy* bw_interface = dynamic_cast<vpr::BandwidthIOStatsStrategy*>(stats ); 00141 00142 if(bw_interface != NULL) 00143 { 00144 // Dump out write stats 00145 vprDEBUG(gadgetDBG_RIM,debug_level) << "Socket Write bandwidth stats ---" << std::endl << vprDEBUG_FLUSH; 00146 vprDEBUG(gadgetDBG_RIM,debug_level) << "stats type: " << typeid(stats).name() << std::endl << vprDEBUG_FLUSH; 00147 vprDEBUG(gadgetDBG_RIM,debug_level) << " sent bytes: " << bw_interface->writeStats().getTotal() << std::endl << vprDEBUG_FLUSH; 00148 vprDEBUG(gadgetDBG_RIM,debug_level) << " av send: " << bw_interface->writeStats().getMean()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH; 00149 vprDEBUG(gadgetDBG_RIM,debug_level) << " STA send: " << bw_interface->writeStats().getSTA()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH; 00150 vprDEBUG(gadgetDBG_RIM,debug_level) << " Inst send: " << bw_interface->writeStats().getInstAverage()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH; 00151 vprDEBUG(gadgetDBG_RIM,debug_level) << " Max STA send: " << bw_interface->writeStats().getMaxSTA()/1024.0f << " k/s" << std::endl << std::endl << vprDEBUG_FLUSH; 00152 00153 vprDEBUG(gadgetDBG_RIM,debug_level) << " read bytes: " << bw_interface->readStats().getTotal() << std::endl << vprDEBUG_FLUSH; 00154 vprDEBUG(gadgetDBG_RIM,debug_level) << " av read: " << bw_interface->readStats().getMean()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH; 00155 vprDEBUG(gadgetDBG_RIM,debug_level) << " STA read: " << bw_interface->readStats().getSTA()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH; 00156 vprDEBUG(gadgetDBG_RIM,debug_level) << " Inst read: " << bw_interface->readStats().getInstAverage()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH; 00157 vprDEBUG(gadgetDBG_RIM,debug_level) << " Max STA read: " << bw_interface->readStats().getMaxSTA()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH; 00158 
00159 } 00160 else 00161 { 00162 vprDEBUG(gadgetDBG_RIM,debug_level) << "SocketBWTest: Don't have BW Stats on stats. type is: " << typeid(stats).name() << std::endl << vprDEBUG_FLUSH; 00163 } 00164 00165 }
| std::string gadget::Node::getName | ( | ) | [inline] |
Return the name given to this node during configuration.
Definition at line 98 of file Node.h.
Referenced by gadget::AbstractNetworkManager::addNode(), gadget::Connector::attemptConnect(), controlLoop(), cluster::ClusterNode::controlLoop(), cluster::StartBarrierPlugin::postPostFrame(), and start().
00099 { 00100 return mName; 00101 }
| void gadget::Node::setName | ( | const std::string & | name | ) | [inline] |
| std::string gadget::Node::getHostname | ( | ) | [inline] |
Return the hostname of the node.
Definition at line 114 of file Node.h.
Referenced by gadget::Connector::attemptConnect(), cluster::StartBarrierPlugin::handlePacket(), and gadget::RemoteInputManager::recoverFromLostNode().
00115 { 00116 return mHostname; 00117 }
| vpr::Uint16 gadget::Node::getPort | ( | ) | [inline] |
Return the port number that we are connected to.
Definition at line 122 of file Node.h.
Referenced by gadget::Connector::attemptConnect(), controlLoop(), and cluster::ClusterNode::controlLoop().
00123 { 00124 return mPort; 00125 }
| vpr::SocketStream* gadget::Node::getSockStream | ( | ) | [inline] |
Get a pointer to the SocketStream used to communicate with this node.
Definition at line 130 of file Node.h.
Referenced by gadget::Connector::attemptConnect().
00131 { 00132 return mSockStream; 00133 }
| void gadget::Node::setSockStream | ( | vpr::SocketStream * | stream | ) | [inline] |
Set the SocketStream used to communicate with this node.
Definition at line 138 of file Node.h.
Referenced by gadget::Connector::attemptConnect().
00139 { 00140 mSockStream = stream; 00141 }
| bool gadget::Node::isConnected | ( | ) | [inline] |
| void gadget::Node::setStatus | ( | int | connect | ) |
Set the current connection status to this machine.
Definition at line 167 of file Node.cpp.
References CONNECTED, DISCONNECTED, mStatus, and mStatusLock.
Referenced by gadget::Connector::attemptConnect(), controlLoop(), cluster::ClusterNode::controlLoop(), cluster::StartBarrierPlugin::postPostFrame(), and shutdown().
00168 { 00169 vpr::Guard<vpr::Mutex> guard(mStatusLock); 00170 00171 if (mStatus == CONNECTED && connect == DISCONNECTED) 00172 { 00173 //TODO: ADD This back in SOON 00174 //ClusterManager::instance()->recoverFromLostNode(this); 00175 } 00176 00177 mStatus = connect; 00178 }
| int gadget::Node::getStatus | ( | ) | [inline] |
Get the current connection state for this node.
Definition at line 159 of file Node.h.
Referenced by gadget::Connector::attemptConnect(), debugDump(), and cluster::StartBarrierPlugin::postPostFrame().
00160 { 00161 return mStatus; 00162 }
| bool gadget::Node::isUpdated | ( | ) | [inline] |
Return whether this node has been updated.
Reimplemented in cluster::ClusterNode.
Definition at line 167 of file Node.h.
00168 { 00169 return mUpdated; 00170 }
| void gadget::Node::setUpdated | ( | bool | update | ) | [inline] |
Set the update status for this node.
Reimplemented in cluster::ClusterNode.
Definition at line 175 of file Node.h.
Referenced by gadget::AbstractNetworkManager::handlePacket().
| void gadget::Node::start | ( | ) |
Starts the control loop.
Definition at line 269 of file Node.cpp.
References controlLoop(), gadgetDBG_RIM(), getName(), mControlThread, mRunning, and mThreadActive.
00270 { 00271 // --- Setup Multi-Process stuff --- // 00272 // Create a new thread to handle the control 00273 00274 if (NULL != mControlThread && mControlThread->valid()) 00275 { 00276 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 00277 << "Node " << getName() << " already running..." 00278 << std::endl << vprDEBUG_FLUSH; 00279 return; 00280 } 00281 00282 mRunning = true; 00283 00284 vpr::ThreadMemberFunctor<Node>* memberFunctor = 00285 new vpr::ThreadMemberFunctor<Node>(this, &Node::controlLoop, NULL); 00286 00287 mControlThread = new vpr::Thread(memberFunctor); 00288 00289 if (mControlThread->valid()) 00290 { 00291 mThreadActive = true; 00292 } 00293 vprDEBUG(gadgetDBG_RIM, vprDBG_CONFIG_LVL) 00294 << "Node " << getName() << " started. thread: " 00295 << mControlThread << std::endl << vprDEBUG_FLUSH; 00296 }
| void gadget::Node::controlLoop | ( | void * | nullParam | ) |
Control loop executed by this node's update thread.
Reimplemented in cluster::ClusterNode.
Definition at line 207 of file Node.cpp.
References debugDump(), DISCONNECTED, gadgetDBG_RIM(), cluster::ClusterException::getMessage(), getName(), getPort(), mNodeDoneSema, mRunning, mUpdated, mUpdateTriggerSema, setStatus(), and update().
Referenced by start().
00208 { 00209 // - Block on an update call 00210 // - Update Local Data 00211 // - Send 00212 // - Signal Sync 00213 00214 boost::ignore_unused_variable_warning(nullParam); 00215 00216 while( mRunning ) 00217 { 00218 // Wait for trigger 00219 if( mRunning ) 00220 { 00221 mUpdateTriggerSema.acquire(); 00222 } 00223 00224 mUpdated = false; 00225 while ( mRunning && !mUpdated ) 00226 { 00227 try 00228 { 00229 update(); 00230 } 00231 catch(cluster::ClusterException cluster_exception) 00232 { 00233 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED) 00234 << cluster_exception.getMessage() << clrRESET 00235 << std::endl << vprDEBUG_FLUSH; 00236 00237 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << 00238 "Node::update() We have lost our connection to: " << getName() << ":" << getPort() 00239 << std::endl << vprDEBUG_FLUSH; 00240 00241 debugDump(vprDBG_CONFIG_LVL); 00242 00243 // Set the Node as disconnected since we have lost the connection 00244 setStatus(DISCONNECTED); 00245 00246 // Shut down manually instead of calling shutdown since 00247 // we are in the control thread. 00248 mRunning = false; 00249 //if (NULL != mSockStream) 00250 //{ 00251 //if(mSockStream->isOpen()) 00252 //{ 00253 // mSockStream->close(); 00254 //} 00255 //delete mSockStream; 00256 //mSockStream = NULL; 00257 //} 00258 } 00259 } 00260 00261 // Signal done with Update 00262 mNodeDoneSema.release(); 00263 } 00264 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "Node: " << getName() << " is stopping." 00265 << std::endl << vprDEBUG_FLUSH; 00266 }
| void gadget::Node::signalUpdate | ( | ) |
Signal a semaphore to let the update thread fall into the code to update the UserData structures.
Reimplemented in cluster::ClusterNode.
Definition at line 298 of file Node.cpp.
References gadgetDBG_RIM(), mThreadActive, and mUpdateTriggerSema.
00299 { 00300 while(!mThreadActive) 00301 { 00302 vprDEBUG(gadgetDBG_RIM, vprDBG_HVERB_LVL) << "Waiting for thread to start ClusterNode::start().\n" << vprDEBUG_FLUSH; 00303 vpr::Thread::yield(); 00304 } 00305 //vprDEBUG(gadgetDBG_RIM,/*vprDBG_HVERB_LVL*/1) << getName() << "Signaling ClusterNode\n" << vprDEBUG_FLUSH; 00306 mUpdateTriggerSema.release(); 00307 }
| void gadget::Node::sync | ( | ) |
Blocks until the end of the frame.
Reimplemented in cluster::ClusterNode.
Definition at line 313 of file Node.cpp.
References mNodeDoneSema, and mThreadActive.
00314 { 00315 vprASSERT(mThreadActive == true); 00316 mNodeDoneSema.acquire(); 00317 }
| void gadget::Node::shutdown | ( | ) |
Kill the update thread.
Definition at line 84 of file Node.cpp.
References DISCONNECTED, mRunning, mSockStream, mUpdateTriggerSema, and setStatus().
Referenced by ~Node().
00085 { 00086 setStatus(DISCONNECTED); 00087 // This may break the accept code since we might not want to delete the Socket. 00088 // We may be able to just use a smart pointer to point to the SocketStream. 00089 mRunning = false; 00090 00091 // Make sure that the conrtol loop exits naturally. 00092 mUpdateTriggerSema.release(); 00093 00094 //mNodeDoneSema.acquire(); 00095 00096 if (NULL != mSockStream) 00097 { 00098 /* 00099 if(mSockStream->isOpen()) 00100 { 00101 mSockStream->close(); 00102 } 00103 delete mSockStream; 00104 */ 00105 } 00106 }
| vpr::Uint64* gadget::Node::getDelta | ( | ) | [inline] |
Get the time delta between the remote and local clock.
Definition at line 211 of file Node.h.
Referenced by gadget::RemoteInputManager::handlePacket().
00212 { 00213 return &mDelta; 00214 }
| vpr::ReturnStatus gadget::Node::send | ( | cluster::Packet * | out_packet | ) |
Send the given packet to this node.
Definition at line 319 of file Node.cpp.
References gadgetDBG_RIM(), cluster::Packet::getData(), cluster::DataPacket::getDeviceData(), cluster::Packet::getHeader(), cluster::Header::getPacketLength(), cluster::Packet::getPacketType(), mSockStream, mSockWriteLock, cluster::Header::RIM_DATA_PACKET, cluster::Header::RIM_PACKET_HEAD_SIZE, and cluster::Header::send().
Referenced by gadget::RemoteInputManager::handlePacket(), cluster::ApplicationDataManager::handlePacket(), and cluster::StartBarrierPlugin::postPostFrame().
00320 { 00321 vprASSERT(NULL != out_packet && "Can not send a NULL packet."); 00322 00323 vpr::Guard<vpr::Mutex> guard(mSockWriteLock); 00324 00325 // -Send header data 00326 // -Send packet data 00327 00328 if (mSockStream == NULL) 00329 { 00330 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 00331 << clrOutBOLD(clrRED, "ERROR:") 00332 << " SocketSteam is NULL" << std::endl << vprDEBUG_FLUSH; 00333 throw cluster::ClusterException("Node::send() - SocketStream is NULL!"); 00334 } 00335 00336 cluster::Header* mHeader = out_packet->getHeader(); 00337 00338 if (mHeader == NULL) 00339 { 00340 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 00341 << clrOutBOLD(clrRED, "ERROR:") 00342 << " Packet Header is NULL" << std::endl << vprDEBUG_FLUSH; 00343 throw cluster::ClusterException("Node::send() - Packet Header is NULL!"); 00344 } 00345 00346 if (!mHeader->send(mSockStream).success()) 00347 { 00348 throw cluster::ClusterException("Packet::recv() - Sending Header Data failed!"); 00349 } 00350 00351 vpr::Uint32 bytes_written; 00352 00353 if(mHeader->getPacketLength() == cluster::Header::RIM_PACKET_HEAD_SIZE) 00354 { 00355 return(vpr::ReturnStatus::Succeed); 00356 } 00357 00358 // If we have a data packet we need to also send the raw data 00359 if (out_packet->getPacketType() != cluster::Header::RIM_DATA_PACKET) 00360 { 00361 std::vector<vpr::Uint8>* packet_data = out_packet->getData(); 00362 00363 vpr::ReturnStatus status = mSockStream->send(*packet_data, 00364 mHeader->getPacketLength() - cluster::Header::RIM_PACKET_HEAD_SIZE, 00365 bytes_written); 00366 if (!status.success()) 00367 { 00368 throw cluster::ClusterException("Packet::recv() - Sending data packet failed!!"); 00369 } 00370 00371 return(status); 00372 } 00373 else 00374 { 00375 std::vector<vpr::Uint8>* packet_data = out_packet->getData(); 00376 00377 // Since we are sending a DataPacket we are not actually sending all data here. 
We are only sending 2 GUIDs here 00378 int size = 32; 00379 00380 vpr::ReturnStatus status = mSockStream->send(*packet_data, size ,bytes_written); 00381 if (!status.success()) 00382 { 00383 throw cluster::ClusterException("Packet::recv() - Sending packet failed!!"); 00384 } 00385 00386 00387 cluster::DataPacket* temp_data_packet = dynamic_cast<cluster::DataPacket*>(out_packet); 00388 vprASSERT(NULL != temp_data_packet && "Dynamic cast failed!"); 00389 00390 // Testing GUIDs 00391 /*vpr::BufferObjectReader* testing = new vpr::BufferObjectReader(packet_data); 00392 vpr::GUID test; 00393 test.readObject(testing); 00394 vpr::GUID test2; 00395 test2.readObject(testing); 00396 00397 std::cout << "1: " << test.toString() << " 2: " << test2.toString() << std::endl; 00398 00399 delete testing; 00400 00401 // Testing ID 00402 testing = new vpr::BufferObjectReader(temp_data_packet->getDeviceData()); 00403 std::cout << "ID: " << (int)testing->readUint16() << std::endl; 00404 00405 delete testing; 00406 */ 00407 00408 status = mSockStream->send(*(temp_data_packet->getDeviceData()),temp_data_packet->getDeviceData()->size(),bytes_written); 00409 if (!status.success()) 00410 { 00411 throw cluster::ClusterException("Packet::recv() - Sending Packet Data failed!!"); 00412 } 00413 00414 return(status); 00415 } 00416 }
| cluster::Packet * gadget::Node::recvPacket | ( | ) |
Receive a packet from the network.
Definition at line 418 of file Node.cpp.
References gadgetDBG_RIM(), cluster::Header::getPacketLength(), cluster::Header::getPacketType(), mSockReadLock, mSockStream, cluster::Packet::parse(), cluster::Header::readData(), cluster::Header::RIM_PACKET_HEAD_SIZE, and cluster::Packet::setHeader().
Referenced by gadget::Connector::attemptConnect(), and update().
00419 { 00420 // - Read in header 00421 // - Get Constructor for correct PacketType 00422 // - Call constructor 00423 // - Read in Packet data 00424 // - Parse data into new packet 00425 // - Return finished packet 00426 00427 vpr::Guard<vpr::Mutex> guard(mSockReadLock); 00428 00429 cluster::Header* packet_head = new cluster::Header(); 00430 00431 try 00432 { 00433 packet_head->readData(mSockStream); 00434 } 00435 catch (cluster::ClusterException& ex) 00436 { 00437 vprDEBUG( gadgetDBG_RIM, vprDBG_HVERB_LVL ) 00438 << clrOutNORM(clrRED, "ERROR: ") 00439 << "Node::recvPacket() Could not read the header from the socket." << std::endl 00440 << ex.what() << std::endl << vprDEBUG_FLUSH; 00441 throw ex; 00442 } 00443 00444 vprDEBUG( gadgetDBG_RIM, vprDBG_HVERB_LVL ) 00445 << "Node::recvPacket() PacketFactory is trying to make a packet type: " 00446 << packet_head->getPacketType() 00447 << std::endl << vprDEBUG_FLUSH; 00448 00449 // Get Packet from factory 00450 cluster::Packet* new_packet = 00451 cluster::PacketFactory::instance()->createObject( packet_head->getPacketType() ); 00452 00453 // Verify that the packet has been made 00454 if ( NULL == new_packet ) 00455 { 00456 throw cluster::ClusterException( "Node::recvPacket() - Packet was not found in Factory." ); 00457 } 00458 00459 // - Recv the packet data 00460 // - Copy over pointer to header 00461 // - Continue reading packet from socket 00462 00463 // Set the header for the new packet. 00464 new_packet->setHeader( packet_head ); 00465 // Allocate memory for incoming packet. 00466 std::vector<vpr::Uint8> incoming_data; 00467 00468 // Make sure that we are connected. 00469 if ( NULL == mSockStream ) 00470 { 00471 vprDEBUG( gadgetDBG_RIM, vprDBG_CRITICAL_LVL ) 00472 << clrOutBOLD( clrRED, "ERROR:" ) 00473 << " mSockSteam is NULL" << std::endl << vprDEBUG_FLUSH; 00474 throw cluster::ClusterException( "Node::recvPacket::recv() - mSocketStream is NULL!" 
); 00475 } 00476 //else if (!mSockStream->isConnected()) 00477 //{ 00478 // vprDEBUG( gadgetDBG_RIM, vprDBG_CRITICAL_LVL ) 00479 // << clrOutBOLD( clrRED, "ERROR:" ) 00480 // << " mSockSteam is not connected." 00481 // << std::endl << vprDEBUG_FLUSH; 00482 // throw cluster::ClusterException( "Node::recvPacket::recv() - ClusterNode is not connected!" ); 00483 //} 00484 else 00485 { 00486 vpr::Uint32 bytes_read; 00487 00488 // Get packet data. 00489 vpr::ReturnStatus status = 00490 mSockStream->recvn( incoming_data, 00491 packet_head->getPacketLength() - 00492 cluster::Header::RIM_PACKET_HEAD_SIZE, 00493 bytes_read ); 00494 00495 if ( !status.success() ) 00496 { 00497 vprDEBUG( gadgetDBG_RIM, vprDBG_CONFIG_LVL ) 00498 << clrOutBOLD( clrRED, "ERROR:" ) 00499 << " Reading packet data failed. Expecting: " 00500 << packet_head->getPacketLength() - cluster::Header::RIM_PACKET_HEAD_SIZE 00501 << " But got: " << bytes_read << " ReturnStatus Code: " 00502 << (int)status.code() << std::endl << vprDEBUG_FLUSH; 00503 00504 throw cluster::ClusterException( "Node::recvPacket() - Reading packet data failed!" ); 00505 } 00506 } 00507 00508 vpr::BufferObjectReader* reader = new vpr::BufferObjectReader( &incoming_data ); 00509 00510 // Parse Packet with new data 00511 new_packet->parse( reader ); 00512 00513 //NOTE: incoming_data goes out of scope here which means that we are left with only the data that we parsed. 00514 //TODO: We could save memory by not parsing the raw DataPacket but just passing the location of the memory that we want to use. 00515 00516 //parse_data_length = DataPacket::ParsedDataLength 00517 //recvn(incoming_parse_data, ...) 00518 //reader = new reader(incoming_parse_data); 00519 //new_packet->parse(reader); 00520 //recvn(incoming_raw_data, ...) 00521 //new_packet->setRawData(incoming_raw_data); 00522 00523 return new_packet; 00524 }
| void gadget::Node::update | ( | ) | [protected] |
Update this cluster node.
Definition at line 180 of file Node.cpp.
References gadget::AbstractNetworkManager::handlePacket(), isConnected(), mNetworkManager, cluster::Packet::printData(), and recvPacket().
Referenced by controlLoop(), and cluster::ClusterNode::controlLoop().
00181 { 00182 // - If connected() && !updated() 00183 // - try recvPacket() 00184 // - Catch ClusterException 00185 // - set not connected 00186 // - add node to connection pending list 00187 // - set reconfig needed on reconnect 00188 // - If no Exception 00189 // - Print Packet Data 00190 // - Take the action of the packet 00191 00192 vprASSERT(isConnected() && "Node is not connected, we can not update!\nWe must not be calling update from the correct location."); 00193 cluster::Packet* temp_packet = NULL; 00194 00195 temp_packet = recvPacket(); 00196 00197 // Print Packet Information 00198 temp_packet->printData(vprDBG_CONFIG_LVL); 00199 00200 // Handle the packet correctly 00201 mNetworkManager->handlePacket(temp_packet,this); 00202 00203 // Clean up after ourselves 00204 delete temp_packet; 00205 }
std::string gadget::Node::mName [protected] |
std::string gadget::Node::mHostname [protected] |
Host that it is connected to.
Definition at line 234 of file Node.h.
Referenced by debugDump(), and Node().
vpr::Uint16 gadget::Node::mPort [protected] |
Port that it is connected to.
Definition at line 235 of file Node.h.
Referenced by debugDump(), and Node().
bool gadget::Node::mRunning [protected] |
Thread is running the control loop.
Definition at line 236 of file Node.h.
Referenced by controlLoop(), cluster::ClusterNode::controlLoop(), shutdown(), and start().
vpr::SocketStream* gadget::Node::mSockStream [protected] |
Socket used for communication to this node.
Definition at line 238 of file Node.h.
Referenced by debugDump(), Node(), printStats(), recvPacket(), send(), and shutdown().
vpr::Mutex gadget::Node::mSockWriteLock [protected] |
vpr::Mutex gadget::Node::mSockReadLock [protected] |
Lock reading from the SocketStream.
Definition at line 240 of file Node.h.
Referenced by recvPacket().
vpr::Mutex gadget::Node::mStatusLock [protected] |
int gadget::Node::mStatus [protected] |
vpr::Mutex gadget::Node::mUpdatedLock [protected] |
bool gadget::Node::mUpdated [protected] |
States if this node is updated.
Definition at line 246 of file Node.h.
Referenced by controlLoop(), and Node().
vpr::Semaphore gadget::Node::mUpdateTriggerSema [protected] |
Semaphore trigger for UserData update.
Definition at line 248 of file Node.h.
Referenced by controlLoop(), shutdown(), and signalUpdate().
vpr::Semaphore gadget::Node::mNodeDoneSema [protected] |
Semaphore trigger for completion.
Definition at line 249 of file Node.h.
Referenced by controlLoop(), and sync().
vpr::Thread* gadget::Node::mControlThread [protected] |
bool gadget::Node::mThreadActive [protected] |
Has the update thread started?
Definition at line 252 of file Node.h.
Referenced by Node(), signalUpdate(), cluster::ClusterNode::signalUpdate(), start(), sync(), and cluster::ClusterNode::sync().
vpr::Uint64 gadget::Node::mDelta [protected] |
AbstractNetworkManager* gadget::Node::mNetworkManager [protected] |
1.5.1