gadget::Node Class Reference

Network node. More...

#include <gadget/Node.h>

Inheritance diagram for gadget::Node:

Inheritance graph
[legend]
Collaboration diagram for gadget::Node:

Collaboration graph
[legend]
List of all members.

Public Types

enum  Status { DISCONNECTED = 0, PENDING = 1, NEWCONNECTION = 2, CONNECTED = 3 }

Public Member Functions

 Node (const std::string &name, const std::string &host_name, const vpr::Uint16 &port, vpr::SocketStream *socket_stream, AbstractNetworkManager *net_mgr)
 Create a Node with the given attributes.
virtual ~Node ()
 Shutdown the update thread and close the SocketStream.
void debugDump (int debug_level=vprDBG_CONFIG_LVL)
 Display all relevant information about Node.
virtual void printStats (int debug_level=1)
 Display Bandwidth statistics for the given SocketStream.
std::string getName ()
 Return the name given to this node during configuration.
void setName (const std::string &name)
 Change the name of this node to the given value.
std::string getHostname ()
 Return the hostname of the node.
vpr::Uint16 getPort ()
 Return the port number that we are connected to.
vpr::SocketStream * getSockStream ()
 Get a pointer to the SocketStream used to communicate with this node.
void setSockStream (vpr::SocketStream *stream)
 Set the SocketStream used to communicate with this node.
bool isConnected ()
 Return if we are connected to this node.
void setStatus (int connect)
 Set the current connection status to this machine.
int getStatus ()
 Get the current connection state for this node.
bool isUpdated ()
 Return if this node is updated or not.
void setUpdated (bool update)
 Set the update status for this node.
void start ()
 Starts the control loop.
void controlLoop (void *nullParam)
 Control loop for updating this thread.
void signalUpdate ()
 Signal a semaphore to let the update thread fall into the code to update the UserData structures.
void sync ()
 Blocks until the end of the frame.
void shutdown ()
 Kill the update thread.
vpr::Uint64 * getDelta ()
 Get the time delta between the remote and local clock.
vpr::ReturnStatus send (cluster::Packet *out_packet)
 Send the given packet to this node.
cluster::PacketrecvPacket ()
 Receive a packet from the network.

Protected Member Functions

void update ()
 Update this cluster node.

Protected Attributes

std::string mName
 Node name.
std::string mHostname
 Host that it is connected to.
vpr::Uint16 mPort
 Port that it is connected to.
bool mRunning
 Thread is running the control loop.
vpr::SocketStream * mSockStream
 Socket used for communication to this node.
vpr::Mutex mSockWriteLock
 Lock writing to the SocketStream.
vpr::Mutex mSockReadLock
 Lock reading from the SocketStream.
vpr::Mutex mStatusLock
 Lock the isConnected value.
int mStatus
 States if this node is connected.
vpr::Mutex mUpdatedLock
 Lock the isUpdated value.
bool mUpdated
 States if this node is updated.
vpr::Semaphore mUpdateTriggerSema
 Semaphore trigger for UserData update.
vpr::Semaphore mNodeDoneSema
 Semaphore trigger for completion.
vpr::Thread * mControlThread
 Update thread for this node.
bool mThreadActive
 Has the update thread started?
vpr::Uint64 mDelta
 Time delta between remote and local clocks.
AbstractNetworkManagermNetworkManager
 Network that should handle incoming packets.

Detailed Description

Network node.

Definition at line 56 of file Node.h.


Member Enumeration Documentation

enum gadget::Node::Status

Enumerator:
DISCONNECTED 
PENDING 
NEWCONNECTION 
CONNECTED 

Definition at line 59 of file Node.h.

00060    {
00061       DISCONNECTED   = 0,
00062       PENDING        = 1,
00063       NEWCONNECTION  = 2,
00064       CONNECTED      = 3
00065    };


Constructor & Destructor Documentation

gadget::Node::Node ( const std::string &  name,
const std::string &  host_name,
const vpr::Uint16 &  port,
vpr::SocketStream *  socket_stream,
AbstractNetworkManager net_mgr 
)

Create a Node with the given attributes.

Parameters:
name Name of the Cluster Node from the config file
host_name Hostname of the remote machine
port The scoket port that we should connect to
socket_stream SocketStream used to communicate with remote machine
net_mgr The network manager.

Definition at line 60 of file Node.cpp.

References gadgetDBG_RIM(), mHostname, mName, mPort, mSockStream, mThreadActive, and mUpdated.

00063    : mRunning(false), mStatus(DISCONNECTED), mUpdateTriggerSema(0),
00064      mNodeDoneSema(0), mControlThread(NULL), mNetworkManager(net_mgr)
00065 {
00066    mThreadActive = false;
00067    mUpdated = false;
00068 
00069    mName = name;
00070    mHostname = host_name;
00071    mPort = port;
00072    mSockStream = socket_stream;
00073    vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00074       << clrOutBOLD(clrBLUE,"[Node]")
00075       << " Created a Node: " << name << " - " << host_name
00076       << std::endl << vprDEBUG_FLUSH;
00077 }

gadget::Node::~Node (  )  [virtual]

Shutdown the update thread and close the SocketStream.

Definition at line 79 of file Node.cpp.

References shutdown().

00080 {
00081    shutdown();
00082 }


Member Function Documentation

void gadget::Node::debugDump ( int  debug_level = vprDBG_CONFIG_LVL  ) 

Display all relevant information about Node.

Definition at line 108 of file Node.cpp.

References CONNECTED, gadgetDBG_NET_MGR(), getStatus(), mHostname, mName, mPort, mSockStream, and NEWCONNECTION.

Referenced by controlLoop(), and cluster::ClusterNode::controlLoop().

00109 {
00110 
00111    vpr::DebugOutputGuard dbg_output(gadgetDBG_NET_MGR, debug_level,
00112                               std::string("-------------- Node --------------\n"),
00113                               std::string("-----------------------------------------\n"));
00114 
00115    vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "Node Name: " 
00116       << mName << std::endl << vprDEBUG_FLUSH;
00117    vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "Hostname:  " 
00118       << mHostname << std::endl << vprDEBUG_FLUSH;
00119    vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "Port:      " 
00120       << mPort << std::endl << vprDEBUG_FLUSH;
00121    vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "SockStream " 
00122       << (NULL == mSockStream ? "is NULL" : "is NOT NULL") << std::endl << vprDEBUG_FLUSH;
00123    if (CONNECTED == getStatus())
00124    {
00125       vprDEBUG(gadgetDBG_NET_MGR, debug_level) << clrOutBOLD(clrGREEN,"CONNECTED") << std::endl << vprDEBUG_FLUSH;
00126    }
00127    else if (NEWCONNECTION == getStatus())
00128    {
00129       vprDEBUG(gadgetDBG_NET_MGR, debug_level) << clrOutBOLD(clrRED,"NEW CONNECTION") << std::endl << vprDEBUG_FLUSH;
00130    }
00131    else
00132    {
00133       vprDEBUG(gadgetDBG_NET_MGR, debug_level) << clrOutBOLD(clrRED,"DISCONNECTED") << std::endl << vprDEBUG_FLUSH;
00134    }
00135 }

void gadget::Node::printStats ( int  debug_level = 1  )  [virtual]

Display Bandwidth statistics for the given SocketStream.

Definition at line 137 of file Node.cpp.

References gadgetDBG_RIM(), and mSockStream.

00138 {
00139    vpr::BaseIOStatsStrategy* stats = mSockStream->getIOStatStrategy();
00140    vpr::BandwidthIOStatsStrategy* bw_interface = dynamic_cast<vpr::BandwidthIOStatsStrategy*>(stats );
00141 
00142    if(bw_interface != NULL)
00143    {
00144       // Dump out write stats
00145       vprDEBUG(gadgetDBG_RIM,debug_level) << "Socket Write bandwidth stats ---" << std::endl << vprDEBUG_FLUSH;
00146       vprDEBUG(gadgetDBG_RIM,debug_level) << "stats type: " << typeid(stats).name() << std::endl << vprDEBUG_FLUSH;
00147       vprDEBUG(gadgetDBG_RIM,debug_level) << "      sent bytes: " << bw_interface->writeStats().getTotal() << std::endl << vprDEBUG_FLUSH;
00148       vprDEBUG(gadgetDBG_RIM,debug_level) << "         av send: " << bw_interface->writeStats().getMean()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
00149       vprDEBUG(gadgetDBG_RIM,debug_level) << "        STA send: " << bw_interface->writeStats().getSTA()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
00150       vprDEBUG(gadgetDBG_RIM,debug_level) << "       Inst send: " << bw_interface->writeStats().getInstAverage()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
00151       vprDEBUG(gadgetDBG_RIM,debug_level) << "    Max STA send: " << bw_interface->writeStats().getMaxSTA()/1024.0f << " k/s" << std::endl << std::endl << vprDEBUG_FLUSH;
00152 
00153       vprDEBUG(gadgetDBG_RIM,debug_level) << "      read bytes: " << bw_interface->readStats().getTotal() << std::endl << vprDEBUG_FLUSH;
00154       vprDEBUG(gadgetDBG_RIM,debug_level) << "         av read: " << bw_interface->readStats().getMean()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
00155       vprDEBUG(gadgetDBG_RIM,debug_level) << "        STA read: " << bw_interface->readStats().getSTA()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
00156       vprDEBUG(gadgetDBG_RIM,debug_level) << "       Inst read: " << bw_interface->readStats().getInstAverage()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
00157       vprDEBUG(gadgetDBG_RIM,debug_level) << "    Max STA read: " << bw_interface->readStats().getMaxSTA()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
00158 
00159    }
00160    else
00161    {
00162       vprDEBUG(gadgetDBG_RIM,debug_level) << "SocketBWTest: Don't have BW Stats on stats. type is: " << typeid(stats).name() << std::endl << vprDEBUG_FLUSH;
00163    }
00164 
00165 }

std::string gadget::Node::getName (  )  [inline]

Return the name given to this node during configuration.

Definition at line 98 of file Node.h.

Referenced by gadget::AbstractNetworkManager::addNode(), gadget::Connector::attemptConnect(), controlLoop(), cluster::ClusterNode::controlLoop(), cluster::StartBarrierPlugin::postPostFrame(), and start().

00099    {
00100       return mName;
00101    }

void gadget::Node::setName ( const std::string &  name  )  [inline]

Change the name of this node to the given value.

Definition at line 106 of file Node.h.

00107    {
00108       mName = name;
00109    }

std::string gadget::Node::getHostname (  )  [inline]

Return the hostname of the node.

Definition at line 114 of file Node.h.

Referenced by gadget::Connector::attemptConnect(), cluster::StartBarrierPlugin::handlePacket(), and gadget::RemoteInputManager::recoverFromLostNode().

00115    {
00116       return mHostname;
00117    }

vpr::Uint16 gadget::Node::getPort (  )  [inline]

Return the port number that we are connected to.

Definition at line 122 of file Node.h.

Referenced by gadget::Connector::attemptConnect(), controlLoop(), and cluster::ClusterNode::controlLoop().

00123    {
00124       return mPort;
00125    }

vpr::SocketStream* gadget::Node::getSockStream (  )  [inline]

Get a pointer to the SocketStream used to communicate with this node.

Definition at line 130 of file Node.h.

Referenced by gadget::Connector::attemptConnect().

00131    {
00132       return mSockStream;
00133    }

void gadget::Node::setSockStream ( vpr::SocketStream *  stream  )  [inline]

Set the SocketStream used to communicate with this node.

Definition at line 138 of file Node.h.

Referenced by gadget::Connector::attemptConnect().

00139    {
00140       mSockStream = stream;
00141    }

bool gadget::Node::isConnected (  )  [inline]

Return if we are connected to this node.

Definition at line 146 of file Node.h.

Referenced by cluster::StartBarrierPlugin::postPostFrame(), and update().

00147    {
00148       return (CONNECTED == getStatus());
00149    }

void gadget::Node::setStatus ( int  connect  ) 

Set the current connection status to this machine.

Definition at line 167 of file Node.cpp.

References CONNECTED, DISCONNECTED, mStatus, and mStatusLock.

Referenced by gadget::Connector::attemptConnect(), controlLoop(), cluster::ClusterNode::controlLoop(), cluster::StartBarrierPlugin::postPostFrame(), and shutdown().

00168 {
00169    vpr::Guard<vpr::Mutex> guard(mStatusLock);
00170    
00171    if (mStatus == CONNECTED && connect == DISCONNECTED)
00172    {
00173       //TODO: ADD This back in SOON
00174       //ClusterManager::instance()->recoverFromLostNode(this);
00175    }
00176    
00177    mStatus = connect;
00178 }

int gadget::Node::getStatus (  )  [inline]

Get the current connection state for this node.

Definition at line 159 of file Node.h.

Referenced by gadget::Connector::attemptConnect(), debugDump(), and cluster::StartBarrierPlugin::postPostFrame().

00160    {
00161       return mStatus;
00162    }

bool gadget::Node::isUpdated (  )  [inline]

Return if this node is updated or not.

Reimplemented in cluster::ClusterNode.

Definition at line 167 of file Node.h.

00168    {
00169       return mUpdated;
00170    }

void gadget::Node::setUpdated ( bool  update  )  [inline]

Set the update status for this node.

Reimplemented in cluster::ClusterNode.

Definition at line 175 of file Node.h.

Referenced by gadget::AbstractNetworkManager::handlePacket().

00176    {
00177       mUpdated = update;
00178    }

void gadget::Node::start (  ) 

Starts the control loop.

Definition at line 269 of file Node.cpp.

References controlLoop(), gadgetDBG_RIM(), getName(), mControlThread, mRunning, and mThreadActive.

00270 {
00271    // --- Setup Multi-Process stuff --- //
00272    // Create a new thread to handle the control
00273 
00274    if (NULL != mControlThread && mControlThread->valid())
00275    {
00276       vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00277             << "Node " << getName() << " already running..."
00278             << std::endl << vprDEBUG_FLUSH;
00279       return;
00280    }
00281 
00282    mRunning = true;
00283 
00284    vpr::ThreadMemberFunctor<Node>* memberFunctor =
00285       new vpr::ThreadMemberFunctor<Node>(this, &Node::controlLoop, NULL);
00286 
00287    mControlThread = new vpr::Thread(memberFunctor);
00288 
00289    if (mControlThread->valid())
00290    {
00291       mThreadActive = true;
00292    }
00293    vprDEBUG(gadgetDBG_RIM, vprDBG_CONFIG_LVL)
00294       << "Node " << getName() << " started. thread: "
00295       << mControlThread << std::endl << vprDEBUG_FLUSH;
00296 }

void gadget::Node::controlLoop ( void *  nullParam  ) 

Control loop for updating this thread.

Reimplemented in cluster::ClusterNode.

Definition at line 207 of file Node.cpp.

References debugDump(), DISCONNECTED, gadgetDBG_RIM(), cluster::ClusterException::getMessage(), getName(), getPort(), mNodeDoneSema, mRunning, mUpdated, mUpdateTriggerSema, setStatus(), and update().

Referenced by start().

00208 {
00209    // - Block on an update call
00210    // - Update Local Data
00211    // - Send
00212    // - Signal Sync
00213 
00214    boost::ignore_unused_variable_warning(nullParam);
00215    
00216    while( mRunning )
00217    {
00218       // Wait for trigger
00219       if( mRunning )
00220       {
00221          mUpdateTriggerSema.acquire();
00222       }
00223          
00224       mUpdated = false;
00225       while ( mRunning && !mUpdated )
00226       {
00227          try
00228          {
00229             update();
00230          }
00231          catch(cluster::ClusterException cluster_exception)
00232          {
00233             vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
00234                << cluster_exception.getMessage() << clrRESET
00235                << std::endl << vprDEBUG_FLUSH;
00236             
00237             vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) <<
00238                "Node::update() We have lost our connection to: " << getName() << ":" << getPort()
00239                << std::endl << vprDEBUG_FLUSH;
00240 
00241             debugDump(vprDBG_CONFIG_LVL);
00242             
00243             // Set the Node as disconnected since we have lost the connection
00244             setStatus(DISCONNECTED);
00245             
00246             // Shut down manually instead of calling shutdown since
00247             // we are in the control thread.
00248             mRunning = false;
00249             //if (NULL != mSockStream)
00250             //{
00251                //if(mSockStream->isOpen())
00252                //{
00253                //   mSockStream->close();
00254                //}
00255                //delete mSockStream;
00256                //mSockStream = NULL;
00257             //}
00258          }
00259       }
00260       
00261       // Signal done with Update
00262       mNodeDoneSema.release();
00263    }
00264    vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "Node: " << getName() << " is stopping."
00265                                              << std::endl << vprDEBUG_FLUSH;
00266 }

void gadget::Node::signalUpdate (  ) 

Signal a semaphore to let the update thread fall into the code to update the UserData structures.

Reimplemented in cluster::ClusterNode.

Definition at line 298 of file Node.cpp.

References gadgetDBG_RIM(), mThreadActive, and mUpdateTriggerSema.

00299 {
00300    while(!mThreadActive)
00301    {
00302       vprDEBUG(gadgetDBG_RIM, vprDBG_HVERB_LVL) << "Waiting for thread to start ClusterNode::start().\n" << vprDEBUG_FLUSH;
00303       vpr::Thread::yield();
00304    }
00305    //vprDEBUG(gadgetDBG_RIM,/*vprDBG_HVERB_LVL*/1) << getName() << "Signaling ClusterNode\n" << vprDEBUG_FLUSH;
00306    mUpdateTriggerSema.release();
00307 }

void gadget::Node::sync (  ) 

Blocks until the end of the frame.

Postcondition:
The frame has been drawn.

Reimplemented in cluster::ClusterNode.

Definition at line 313 of file Node.cpp.

References mNodeDoneSema, and mThreadActive.

00314 {
00315    vprASSERT(mThreadActive == true);
00316    mNodeDoneSema.acquire();
00317 }

void gadget::Node::shutdown (  ) 

Kill the update thread.

Definition at line 84 of file Node.cpp.

References DISCONNECTED, mRunning, mSockStream, mUpdateTriggerSema, and setStatus().

Referenced by ~Node().

00085 {
00086    setStatus(DISCONNECTED);
00087    // This may break the accept code since we might not want to delete the Socket.
00088    // We may be able to just use a smart pointer to point to the SocketStream.
00089    mRunning = false;
00090   
00091    // Make sure that the conrtol loop exits naturally.
00092    mUpdateTriggerSema.release();
00093    
00094    //mNodeDoneSema.acquire();
00095    
00096    if (NULL != mSockStream)
00097    {
00098       /*
00099       if(mSockStream->isOpen())
00100       {
00101          mSockStream->close();
00102       }
00103       delete mSockStream;
00104       */
00105    }
00106 }

vpr::Uint64* gadget::Node::getDelta (  )  [inline]

Get the time delta between the remote and local clock.

Definition at line 211 of file Node.h.

Referenced by gadget::RemoteInputManager::handlePacket().

00212    {
00213       return &mDelta;
00214    }

vpr::ReturnStatus gadget::Node::send ( cluster::Packet out_packet  ) 

Send the given packet to this node.

Definition at line 319 of file Node.cpp.

References gadgetDBG_RIM(), cluster::Packet::getData(), cluster::DataPacket::getDeviceData(), cluster::Packet::getHeader(), cluster::Header::getPacketLength(), cluster::Packet::getPacketType(), mSockStream, mSockWriteLock, cluster::Header::RIM_DATA_PACKET, cluster::Header::RIM_PACKET_HEAD_SIZE, and cluster::Header::send().

Referenced by gadget::RemoteInputManager::handlePacket(), cluster::ApplicationDataManager::handlePacket(), and cluster::StartBarrierPlugin::postPostFrame().

00320 {
00321    vprASSERT(NULL != out_packet && "Can not send a NULL packet.");
00322 
00323    vpr::Guard<vpr::Mutex> guard(mSockWriteLock);
00324 
00325    // -Send header data
00326    // -Send packet data
00327 
00328    if (mSockStream == NULL)
00329    {
00330       vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00331          << clrOutBOLD(clrRED, "ERROR:")
00332          << " SocketSteam is NULL" << std::endl << vprDEBUG_FLUSH;
00333       throw cluster::ClusterException("Node::send() - SocketStream is NULL!");
00334    }
00335 
00336    cluster::Header* mHeader = out_packet->getHeader();
00337 
00338    if (mHeader == NULL)
00339    {
00340       vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00341          << clrOutBOLD(clrRED, "ERROR:")
00342          << " Packet Header is NULL" << std::endl << vprDEBUG_FLUSH;
00343       throw cluster::ClusterException("Node::send() - Packet Header is NULL!");
00344    }
00345 
00346    if (!mHeader->send(mSockStream).success())
00347    {
00348       throw cluster::ClusterException("Packet::recv() - Sending Header Data failed!");
00349    }
00350 
00351    vpr::Uint32 bytes_written;      
00352 
00353    if(mHeader->getPacketLength() == cluster::Header::RIM_PACKET_HEAD_SIZE)
00354    {
00355       return(vpr::ReturnStatus::Succeed);
00356    }
00357    
00358    // If we have a data packet we need to also send the raw data
00359    if (out_packet->getPacketType() != cluster::Header::RIM_DATA_PACKET)
00360    {
00361       std::vector<vpr::Uint8>* packet_data = out_packet->getData();
00362 
00363       vpr::ReturnStatus status = mSockStream->send(*packet_data, 
00364             mHeader->getPacketLength() - cluster::Header::RIM_PACKET_HEAD_SIZE,
00365             bytes_written);
00366       if (!status.success())
00367       {
00368          throw cluster::ClusterException("Packet::recv() - Sending data packet failed!!");
00369       }
00370       
00371       return(status);
00372    }
00373    else
00374    {
00375       std::vector<vpr::Uint8>* packet_data = out_packet->getData();
00376 
00377       // Since we are sending a DataPacket we are not actually sending all data here. We are only sending 2 GUIDs here
00378       int size = 32;
00379 
00380       vpr::ReturnStatus status = mSockStream->send(*packet_data, size ,bytes_written);
00381       if (!status.success())
00382       {
00383          throw cluster::ClusterException("Packet::recv() - Sending packet failed!!");
00384       }
00385 
00386 
00387       cluster::DataPacket* temp_data_packet = dynamic_cast<cluster::DataPacket*>(out_packet);
00388       vprASSERT(NULL != temp_data_packet && "Dynamic cast failed!");
00389 
00390       // Testing GUIDs
00391       /*vpr::BufferObjectReader* testing = new vpr::BufferObjectReader(packet_data);
00392       vpr::GUID test;
00393       test.readObject(testing);
00394       vpr::GUID test2;
00395       test2.readObject(testing);
00396 
00397       std::cout << "1: " << test.toString() << " 2: " << test2.toString() << std::endl;
00398 
00399       delete testing;
00400       
00401       // Testing ID
00402       testing = new vpr::BufferObjectReader(temp_data_packet->getDeviceData());
00403       std::cout << "ID: " << (int)testing->readUint16() << std::endl;
00404 
00405       delete testing;
00406       */
00407       
00408       status = mSockStream->send(*(temp_data_packet->getDeviceData()),temp_data_packet->getDeviceData()->size(),bytes_written);
00409       if (!status.success())
00410       {
00411          throw cluster::ClusterException("Packet::recv() - Sending Packet Data failed!!");
00412       }
00413       
00414       return(status);
00415    }     
00416 }

cluster::Packet * gadget::Node::recvPacket (  ) 

Receive a packet from the network.

Definition at line 418 of file Node.cpp.

References gadgetDBG_RIM(), cluster::Header::getPacketLength(), cluster::Header::getPacketType(), mSockReadLock, mSockStream, cluster::Packet::parse(), cluster::Header::readData(), cluster::Header::RIM_PACKET_HEAD_SIZE, and cluster::Packet::setHeader().

Referenced by gadget::Connector::attemptConnect(), and update().

00419 {
00420    // - Read in header
00421    // - Get Constructor for correct PacketType
00422    // - Call constructor
00423    // - Read in Packet data
00424    // - Parse data into new packet
00425    // - Return finished packet
00426    
00427    vpr::Guard<vpr::Mutex> guard(mSockReadLock);
00428    
00429    cluster::Header* packet_head = new cluster::Header();
00430    
00431    try
00432    {
00433       packet_head->readData(mSockStream);
00434    }
00435    catch (cluster::ClusterException& ex)
00436    {
00437       vprDEBUG( gadgetDBG_RIM, vprDBG_HVERB_LVL )
00438          << clrOutNORM(clrRED, "ERROR: ")
00439          << "Node::recvPacket() Could not read the header from the socket." << std::endl
00440          << ex.what() << std::endl << vprDEBUG_FLUSH;
00441       throw ex;
00442    }
00443    
00444    vprDEBUG( gadgetDBG_RIM, vprDBG_HVERB_LVL )
00445       << "Node::recvPacket() PacketFactory is trying to make a packet type: " 
00446       << packet_head->getPacketType()
00447       << std::endl << vprDEBUG_FLUSH;
00448 
00449    // Get Packet from factory
00450    cluster::Packet* new_packet =
00451       cluster::PacketFactory::instance()->createObject( packet_head->getPacketType() );
00452 
00453    // Verify that the packet has been made
00454    if ( NULL == new_packet )
00455    {
00456       throw cluster::ClusterException( "Node::recvPacket() - Packet was not found in Factory." );
00457    }
00458    
00459    // - Recv the packet data
00460    //   - Copy over pointer to header
00461    //   - Continue reading packet from socket
00462    
00463    // Set the header for the new packet.
00464    new_packet->setHeader( packet_head );
00465    // Allocate memory for incoming packet.
00466    std::vector<vpr::Uint8> incoming_data;
00467 
00468    // Make sure that we are connected.
00469    if ( NULL == mSockStream )
00470    {
00471       vprDEBUG( gadgetDBG_RIM, vprDBG_CRITICAL_LVL )
00472          << clrOutBOLD( clrRED, "ERROR:" )
00473          << " mSockSteam is NULL" <<  std::endl << vprDEBUG_FLUSH;
00474       throw cluster::ClusterException( "Node::recvPacket::recv() - mSocketStream is NULL!" );
00475    }
00476    //else if (!mSockStream->isConnected())
00477    //{
00478    //   vprDEBUG( gadgetDBG_RIM, vprDBG_CRITICAL_LVL )
00479    //      << clrOutBOLD( clrRED, "ERROR:" )
00480    //      << " mSockSteam is not connected."
00481    //      << std::endl << vprDEBUG_FLUSH;
00482    //   throw cluster::ClusterException( "Node::recvPacket::recv() - ClusterNode is not connected!" );
00483    //}
00484    else
00485    {
00486       vpr::Uint32 bytes_read;   
00487    
00488       // Get packet data.
00489       vpr::ReturnStatus status =
00490          mSockStream->recvn( incoming_data,
00491                              packet_head->getPacketLength() -
00492                              cluster::Header::RIM_PACKET_HEAD_SIZE,
00493                              bytes_read );
00494 
00495       if ( !status.success() )
00496       {
00497          vprDEBUG( gadgetDBG_RIM, vprDBG_CONFIG_LVL )
00498             << clrOutBOLD( clrRED, "ERROR:" )
00499             << " Reading packet data failed. Expecting: "
00500             << packet_head->getPacketLength() - cluster::Header::RIM_PACKET_HEAD_SIZE
00501             << " But got: " << bytes_read << " ReturnStatus Code: "
00502             << (int)status.code() << std::endl << vprDEBUG_FLUSH;
00503 
00504          throw cluster::ClusterException( "Node::recvPacket() - Reading packet data failed!" );
00505       }
00506    }
00507    
00508    vpr::BufferObjectReader* reader = new vpr::BufferObjectReader( &incoming_data );
00509    
00510    // Parse Packet with new data
00511    new_packet->parse( reader );
00512    
00513    //NOTE: incoming_data goes out of scope here which means that we are left with only the data that we parsed.
00514    //TODO: We could save memory by not parsing the raw DataPacket but just passing the location of the memory that we want to use.
00515 
00516    //parse_data_length = DataPacket::ParsedDataLength
00517    //recvn(incoming_parse_data, ...)
00518    //reader = new reader(incoming_parse_data);
00519    //new_packet->parse(reader);
00520    //recvn(incoming_raw_data, ...)
00521    //new_packet->setRawData(incoming_raw_data);
00522 
00523    return new_packet;
00524 }

void gadget::Node::update (  )  [protected]

Update this cluster node.

Definition at line 180 of file Node.cpp.

References gadget::AbstractNetworkManager::handlePacket(), isConnected(), mNetworkManager, cluster::Packet::printData(), and recvPacket().

Referenced by controlLoop(), and cluster::ClusterNode::controlLoop().

00181 {
00182    // - If connected() && !updated()
00183    //   - try recvPacket()
00184    //     - Catch ClusterException
00185    //       - set not connected
00186    //       - add node to connection pending list
00187    //       - set reconfig needed on reconnect
00188    //     - If no Exception
00189    //       - Print Packet Data
00190    //       - Take the action of the packet
00191 
00192    vprASSERT(isConnected() && "Node is not connected, we can not update!\nWe must not be calling update from the correct location.");
00193    cluster::Packet* temp_packet = NULL;
00194    
00195    temp_packet = recvPacket();
00196    
00197    // Print Packet Information
00198    temp_packet->printData(vprDBG_CONFIG_LVL);
00199 
00200    // Handle the packet correctly
00201    mNetworkManager->handlePacket(temp_packet,this);
00202 
00203    // Clean up after ourselves
00204    delete temp_packet;
00205 }


Member Data Documentation

std::string gadget::Node::mName [protected]

Node name.

Definition at line 233 of file Node.h.

Referenced by debugDump(), and Node().

std::string gadget::Node::mHostname [protected]

Host that it is connected to.

Definition at line 234 of file Node.h.

Referenced by debugDump(), and Node().

vpr::Uint16 gadget::Node::mPort [protected]

Port that it is connected to.

Definition at line 235 of file Node.h.

Referenced by debugDump(), and Node().

bool gadget::Node::mRunning [protected]

Thread is running the control loop.

Definition at line 236 of file Node.h.

Referenced by controlLoop(), cluster::ClusterNode::controlLoop(), shutdown(), and start().

vpr::SocketStream* gadget::Node::mSockStream [protected]

Socket used for communication to this node.

Definition at line 238 of file Node.h.

Referenced by debugDump(), Node(), printStats(), recvPacket(), send(), and shutdown().

vpr::Mutex gadget::Node::mSockWriteLock [protected]

Lock writing to the SocketStream.

Definition at line 239 of file Node.h.

Referenced by send().

vpr::Mutex gadget::Node::mSockReadLock [protected]

Lock reading from the SocketStream.

Definition at line 240 of file Node.h.

Referenced by recvPacket().

vpr::Mutex gadget::Node::mStatusLock [protected]

Lock the isConnected value.

Definition at line 242 of file Node.h.

Referenced by setStatus().

int gadget::Node::mStatus [protected]

States if this node is connected.

Definition at line 243 of file Node.h.

Referenced by setStatus().

vpr::Mutex gadget::Node::mUpdatedLock [protected]

Lock the isUpdated value.

Definition at line 245 of file Node.h.

bool gadget::Node::mUpdated [protected]

States if this node is updated.

Definition at line 246 of file Node.h.

Referenced by controlLoop(), and Node().

vpr::Semaphore gadget::Node::mUpdateTriggerSema [protected]

Semaphore trigger for UserData update.

Definition at line 248 of file Node.h.

Referenced by controlLoop(), shutdown(), and signalUpdate().

vpr::Semaphore gadget::Node::mNodeDoneSema [protected]

Semaphore trigger for completion.

Definition at line 249 of file Node.h.

Referenced by controlLoop(), and sync().

vpr::Thread* gadget::Node::mControlThread [protected]

Update thread for this node.

Definition at line 251 of file Node.h.

Referenced by start().

bool gadget::Node::mThreadActive [protected]

Has the update thread started?

Definition at line 252 of file Node.h.

Referenced by Node(), signalUpdate(), cluster::ClusterNode::signalUpdate(), start(), sync(), and cluster::ClusterNode::sync().

vpr::Uint64 gadget::Node::mDelta [protected]

Time delta between remote and local clocks.

Definition at line 254 of file Node.h.

AbstractNetworkManager* gadget::Node::mNetworkManager [protected]

Network that should handle incoming packets.

Definition at line 255 of file Node.h.

Referenced by update().


The documentation for this class was generated from the following files:
Generated on Thu Jan 4 10:44:46 2007 for Gadgeteer by  doxygen 1.5.1