Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members   Examples  

cluster::ClusterNode Class Reference

#include <ClusterNode.h>

List of all members.

Public Types

enum  Status { DISCONNECTED = 0, CONNECTED = 1, INPROGRESS = 2, NEWCONNECTION = 3 }

Public Methods

 ClusterNode ()
 Create a new ClusterNode for a Node that is connecting to us that we did not previously know about. More...

 ClusterNode (const std::string &name, const std::string &host_name, const vpr::Uint16 &port, vpr::SocketStream *socket_stream)
 Create a ClusterNode with the given attributes. More...

 ~ClusterNode ()
 Shutdown the update thread and close the SocketStream. More...

void debugDump (int debug_level)
 Display all relevant information about ClusterNode. More...

void printStats (int debug_level=1)
 Display Bandwidth statistics for the given SocketStream. More...

std::string getName ()
 Return the name of the node. More...

std::string getHostname ()
 Return the hostname of the node. More...

vpr::Uint16 getPort ()
 Return the port number that we are connected to. More...

void setName (std::string &name)
 Set the name of the ClusterNode. More...

void setHostname (std::string &host_name)
 Set the hostname of the ClusterNode. More...

void setPort (vpr::Uint16 &port)
 Set the port of the ClusterNode. More...

vpr::SocketStream * getSockStream ()
 Get a pointer to the SocketStream used to communicate with this node. More...

void setSockStream (vpr::SocketStream *socket)
 Set the SocketStream used to communicate with this node. More...

bool isConnected ()
 Return if we are connected to this node. More...

void setConnected (int connect)
 Set the current connection state to this machine. More...

int getConnected ()
 Get the current connection state for this node. More...

bool isUpdated ()
 Return if this node is updated or not. More...

void setUpdated (bool update)
 Set the update status for this node. More...

bool isRunning ()
 Return if this node is running or not. More...

void setRunning (bool started)
 Set the running status for this node. More...

vpr::ReturnStatus attemptConnect ()
 Attempt to connect to this node. More...

void update ()
 Update this cluster node. More...

void lockSockRead ()
 Lock a mutex for Reading from the SocketStream. More...

void unlockSockRead ()
 Unlock a mutex for Reading from the SocketStream. More...

void lockSockWrite ()
 Lock a mutex for Writing from the SocketStream. More...

void unlockSockWrite ()
 Unlock a mutex for Writing from the SocketStream. More...

void start ()
 Starts the control loop. More...

void controlLoop (void *nullParam)
 Control loop for updating this thread. More...

void signalUpdate ()
 Signal a semaphore to let the update thread fall into the code to update the UserData structures. More...

void sync ()
 Blocks until the end of the frame. More...

void shutdown ()
 Kill the update thread. More...

vpr::Uint64 * getDelta ()
vpr::ReturnStatus send (Packet *out_packet)
PacketrecvPacket ()


Member Enumeration Documentation

enum cluster::ClusterNode::Status
 

Enumeration values:
DISCONNECTED 
CONNECTED 
INPROGRESS 
NEWCONNECTION 

Definition at line 50 of file ClusterNode.h.

00051       {
00052          DISCONNECTED   = 0,
00053          CONNECTED      = 1,
00054          INPROGRESS     = 2,
00055          NEWCONNECTION  = 3
00056       };


Constructor & Destructor Documentation

cluster::ClusterNode::ClusterNode  
 

Create a new ClusterNode for a Node that is connecting to us that we did not previously know about.

Definition at line 63 of file ClusterNode.cpp.

References DISCONNECTED.

00063                             : mUpdateTriggerSema(0), mClusterNodeDoneSema(0), mControlThread(NULL)
00064    {
00065       mThreadActive = false;
00066       mPendingConnectionRequest = false;
00067       mUpdated = false;
00068       mRunning = false;
00069 
00070       mConnected = DISCONNECTED;
00071 
00072       mName = std::string("None Given");
00073       mHostname = std::string("None Given");
00074       mPort = 0;
00075       mSockStream = NULL;
00076       std::cout << "ARONARON REMOVE Created a ClusterNode: No name given." << std::endl;
00077    }

cluster::ClusterNode::ClusterNode const std::string &    name,
const std::string &    host_name,
const vpr::Uint16 &    port,
vpr::SocketStream *    socket_stream
 

Create a ClusterNode with the given attributes.

Parameters:
name  Name of the Cluster Node from the config file
host_name  Hostname of the remote machine
port  The scoket port that we should connect to
socket_stream  SocketStream used to communicate with remote machine

Definition at line 79 of file ClusterNode.cpp.

References DISCONNECTED.

00081       : mUpdateTriggerSema(0), mClusterNodeDoneSema(0), mControlThread(NULL)
00082    {
00083       mThreadActive = false;
00084       mUpdated = false;
00085       mPendingConnectionRequest = false;
00086       mRunning = false;
00087 
00088       mConnected = DISCONNECTED;
00089 
00090       mName = name;
00091       mHostname = host_name;
00092       mPort = port;
00093       mSockStream = socket_stream;
00094       std::cout << "ARONARON REMOVE Created a ClusterNode: " << name << " - " << host_name << std::endl;
00095    }

cluster::ClusterNode::~ClusterNode  
 

Shutdown the update thread and close the SocketStream.

Definition at line 97 of file ClusterNode.cpp.

References shutdown.

00098    {
00099       shutdown();
00100       // This may break the acept code since we migt not want to delete the Socket. We may be able to just 
00101       // use a smart pointer to point to the SocketStream.
00102       /*
00103       if (mSockStream != NULL)
00104       {
00105          mSockStream->close();
00106          delete mSockStream;
00107       }
00108       */
00109    }


Member Function Documentation

void cluster::ClusterNode::debugDump int    debug_level
 

Display all relevant information about ClusterNode.

Definition at line 228 of file ClusterNode.cpp.

References CONNECTED, gadgetDBG_RIM, getConnected, and NEWCONNECTION.

Referenced by controlLoop.

00229    {
00230 
00231       vpr::DebugOutputGuard dbg_output(gadgetDBG_RIM,debug_level,
00232                                  std::string("-------------- ClusterNode --------------\n"),
00233                                  std::string("-----------------------------------------\n"));
00234 
00235       vprDEBUG(gadgetDBG_RIM,debug_level) << "Node Name: " << mName << std::endl << vprDEBUG_FLUSH;
00236       vprDEBUG(gadgetDBG_RIM,debug_level) << "Hostname:  " << mHostname << std::endl << vprDEBUG_FLUSH;
00237       vprDEBUG(gadgetDBG_RIM,debug_level) << "Port:      " << mPort << std::endl << vprDEBUG_FLUSH;
00238       vprDEBUG(gadgetDBG_RIM,debug_level) << "SockStream " << (mSockStream == NULL ? "is NULL" : "is NOT NULL") << std::endl << vprDEBUG_FLUSH;
00239       if (getConnected() == CONNECTED)
00240       {
00241          vprDEBUG(gadgetDBG_RIM,debug_level) << clrOutBOLD(clrGREEN,"CONNECTED") << std::endl << vprDEBUG_FLUSH;
00242       }
00243       else if (getConnected() == NEWCONNECTION)
00244       {
00245          vprDEBUG(gadgetDBG_RIM,debug_level) << clrOutBOLD(clrRED,"PENDING CONNECTION REQUEST") << std::endl << vprDEBUG_FLUSH;
00246       }
00247       else
00248       {
00249          vprDEBUG(gadgetDBG_RIM,debug_level) << clrOutBOLD(clrRED,"DISCONNECTED") << std::endl << vprDEBUG_FLUSH;
00250       }
00251    }

void cluster::ClusterNode::printStats int    debug_level = 1
 

Display Bandwidth statistics for the given SocketStream.

Definition at line 252 of file ClusterNode.cpp.

References gadgetDBG_RIM.

00253    {
00254       vpr::BaseIOStatsStrategy* stats = mSockStream->getIOStatStrategy();
00255       vpr::BandwidthIOStatsStrategy* bw_interface = dynamic_cast<vpr::BandwidthIOStatsStrategy*>(stats );
00256 
00257       if(bw_interface != NULL)
00258       {
00259          // Dump out write stats
00260          vprDEBUG(gadgetDBG_RIM,debug_level) << "Socket Write bandwidth stats ---\n";
00261          vprDEBUG(gadgetDBG_RIM,debug_level) << "stats type: " << typeid(stats).name() << vprDEBUG_FLUSH;
00262          vprDEBUG(gadgetDBG_RIM,debug_level) << "      sent bytes: " << bw_interface->writeStats().getTotal() << vprDEBUG_FLUSH;
00263          vprDEBUG(gadgetDBG_RIM,debug_level) << "         av send: " << bw_interface->writeStats().getMean()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00264          vprDEBUG(gadgetDBG_RIM,debug_level) << "        STA send: " << bw_interface->writeStats().getSTA()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00265          vprDEBUG(gadgetDBG_RIM,debug_level) << "       Inst send: " << bw_interface->writeStats().getInstAverage()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00266          vprDEBUG(gadgetDBG_RIM,debug_level) << "    Max STA send: " << bw_interface->writeStats().getMaxSTA()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00267 
00268          vprDEBUG(gadgetDBG_RIM,debug_level) << "      read bytes: " << bw_interface->readStats().getTotal() << vprDEBUG_FLUSH;
00269          vprDEBUG(gadgetDBG_RIM,debug_level) << "         av read: " << bw_interface->readStats().getMean()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00270          vprDEBUG(gadgetDBG_RIM,debug_level) << "        STA read: " << bw_interface->readStats().getSTA()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00271          vprDEBUG(gadgetDBG_RIM,debug_level) << "       Inst read: " << bw_interface->readStats().getInstAverage()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00272          vprDEBUG(gadgetDBG_RIM,debug_level) << "    Max STA read: " << bw_interface->readStats().getMaxSTA()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00273 
00274       }
00275       else
00276       {
00277          vprDEBUG(gadgetDBG_RIM,debug_level) << "SocketBWTest: Don't have BW Stats on stats. type is: " << typeid(stats).name() << std::endl << vprDEBUG_FLUSH;
00278       }
00279 
00280    }

std::string cluster::ClusterNode::getName   [inline]
 

Return the name of the node.

Definition at line 93 of file ClusterNode.h.

Referenced by controlLoop, and start.

00093 { return mName; }

std::string cluster::ClusterNode::getHostname   [inline]
 

Return the hostname of the node.

Definition at line 98 of file ClusterNode.h.

00098 { return mHostname; }

vpr::Uint16 cluster::ClusterNode::getPort   [inline]
 

Return the port number that we are connected to.

Definition at line 103 of file ClusterNode.h.

Referenced by controlLoop.

00103 { return mPort;}

void cluster::ClusterNode::setName std::string &    name [inline]
 

Set the name of the ClusterNode.

Definition at line 108 of file ClusterNode.h.

00108 { mName = name; }

void cluster::ClusterNode::setHostname std::string &    host_name [inline]
 

Set the hostname of the ClusterNode.

Definition at line 113 of file ClusterNode.h.

00113 { mHostname = host_name; }

void cluster::ClusterNode::setPort vpr::Uint16 &    port [inline]
 

Set the port of the ClusterNode.

Definition at line 118 of file ClusterNode.h.

00118 { mPort = port;}

vpr::SocketStream* cluster::ClusterNode::getSockStream   [inline]
 

Get a pointer to the SocketStream used to communicate with this node.

Definition at line 123 of file ClusterNode.h.

Referenced by attemptConnect.

00123 { return mSockStream; }

void cluster::ClusterNode::setSockStream vpr::SocketStream *    socket [inline]
 

Set the SocketStream used to communicate with this node.

Definition at line 128 of file ClusterNode.h.

00128 { mSockStream = socket; }

bool cluster::ClusterNode::isConnected   [inline]
 

Return if we are connected to this node.

Definition at line 133 of file ClusterNode.h.

Referenced by attemptConnect, and update.

00133 { return (mConnected == CONNECTED); }

void cluster::ClusterNode::setConnected int    connect
 

Set the current connection state to this machine.

Definition at line 282 of file ClusterNode.cpp.

References CONNECTED, and DISCONNECTED.

Referenced by controlLoop, and update.

00283    {
00284       // - If we are disconnecting
00285       //   - Delete all VirtualDevices for this node
00286       //   - Delete all DeviceServers for this node
00287       //   - Refresh All Proxies
00288       vpr::Guard<vpr::Mutex> guard(mConnectedLock);
00289       if (mConnected == CONNECTED && connect == DISCONNECTED)
00290       {
00291          /*
00292          if (mSockStream != NULL && mSockStream->isOpen())
00293          {
00294             mSockStream->close();
00295             //delete mSockStream;
00296             //mSockStream = NULL;
00297          }
00298          */
00299          //TODO: ADD This back in SOON
00300          //ClusterManager::instance()->recoverFromLostNode(this);
00301       }
00302       if (connect == CONNECTED)
00303       {
00304          ClusterNetwork::instance()->removePendingNode(mHostname);
00305       }
00306       mConnected = connect;
00307    }

int cluster::ClusterNode::getConnected   [inline]
 

Get the current connection state for this node.

Definition at line 143 of file ClusterNode.h.

Referenced by attemptConnect, and debugDump.

00143 { return mConnected; }

bool cluster::ClusterNode::isUpdated   [inline]
 

Return if this node is updated or not.

Definition at line 148 of file ClusterNode.h.

Referenced by controlLoop.

00148 { return mUpdated; }

void cluster::ClusterNode::setUpdated bool    update [inline]
 

Set the update status for this node.

Definition at line 153 of file ClusterNode.h.

Referenced by controlLoop, and update.

00154       { mUpdated = update; }

bool cluster::ClusterNode::isRunning   [inline]
 

Return if this node is running or not.

Definition at line 159 of file ClusterNode.h.

00159 { return mRunning; }

void cluster::ClusterNode::setRunning bool    started [inline]
 

Set the running status for this node.

Definition at line 164 of file ClusterNode.h.

00165       { mRunning = started; }

vpr::ReturnStatus cluster::ClusterNode::attemptConnect  
 

Attempt to connect to this node.

Definition at line 121 of file ClusterNode.cpp.

References gadgetDBG_RIM, getConnected, getSockStream, isConnected, NEWCONNECTION, recvPacket, and send.

00122    {
00123       // - Try to connect to given node
00124       // - If success
00125       //   - Create a ConnectionRequest Packet
00126       //   - Send request(a responce will come later in the normal controlLoop)
00127 
00129 
00130       /*if (mSockStream != NULL)
00131       {
00132          mSockStream->close();
00133          delete mSockStream;
00134          mSockStream == NULL;
00135       }*/
00136 
00137          // If ClusterNode is already connected
00138       // XXX: FIX THIS, it should not return SUCCESS or WOULDBLOCK
00139       if (isConnected())
00140       {
00141          return(vpr::ReturnStatus::WouldBlock);
00142       }
00143          // If we currently have a pending connection request
00144       if (getConnected() == NEWCONNECTION)
00145       {
00146          vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL)
00147             << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00148             << " Pending Connection Request!" << std::endl << vprDEBUG_FLUSH;
00149 
00150          return(vpr::ReturnStatus::InProgress);
00151       }
00152 
00153       vpr::SocketStream* sock_stream;
00154       vpr::InetAddr inet_addr;
00155 
00156       //vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL)
00157       //   << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00158       //   << " HostName: " << mHostname << ":" << mPort << std::endl << vprDEBUG_FLUSH;
00159 
00160          // Set the address that we want to connect to
00161       if ( !inet_addr.setAddress(mHostname, mPort).success() )
00162       {
00163          vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00164             << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00165             << clrOutBOLD(clrRED," ERROR: Failed to set address\n") << vprDEBUG_FLUSH;
00166          return vpr::ReturnStatus::Fail;
00167       }
00168          // Create a new socket stream to this address
00169       sock_stream = new vpr::SocketStream(vpr::InetAddr::AnyAddr, inet_addr);
00170 
00171 
00172          // If we can successfully open the socket and connect to the server
00173       if ( sock_stream->open().success() && sock_stream->connect().success() )
00174       {
00175          vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00176             << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00177             << " Successfully connected to: " << mHostname <<":"<< mPort << "\n"<< vprDEBUG_FLUSH;
00178          sock_stream->setNoDelay(true);
00179          mSockStream = sock_stream;
00180       }
00181       else
00182       {
00183          delete sock_stream;
00184 //         vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00185 //            << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00186 //            << clrSetNORM(clrRED) << "ERROR: Could not connect to device server: "
00187 //            << mHostname <<" : "<< mPort << "\n" << clrRESET << vprDEBUG_FLUSH;
00188          return vpr::ReturnStatus::Fail;
00189       }
00190 
00192 
00193       // We might want to send the listen port on this machine to the remote machine just in 
00194       // case they for some reason want to re-connect back to us. This should be taken care 
00195       // of in configuration but in all reality the remote machine does not have to have the 
00196       // machine specific element for this machine.
00197       
00198       // Get the localhost name.
00199       vpr::InetAddr local;
00200       vpr::InetAddr::getLocalHost(local);
00201          
00202       ConnectionRequest request(local.getHostname(),0/*Might be needed, look above*/);
00203 
00204       send(&request);
00205       
00206       Packet* temp_packet = recvPacket();
00207 
00208       //vprASSERT(temp_packet->getPacketType() == cluster::Header::RIM_CONNECTION_ACK && "We must be receiving a ConnectionAck here");
00209       
00210       vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00211          << "We have a responce" << std::endl << vprDEBUG_FLUSH;
00212 
00213       temp_packet->printData(vprDBG_CONFIG_LVL);
00214       ClusterManager::instance()->handlePacket(temp_packet,this);
00215 
00216       delete temp_packet;
00217       if (getConnected() == NEWCONNECTION)
00218       {
00219          ClusterDelta cluster_delta;
00220          vpr::Interval temp;
00221          temp = cluster_delta.getClusterDelta(getSockStream());
00222          mDelta = temp.getBaseVal();
00223       }
00224 
00225       return(vpr::ReturnStatus::Succeed);
00226    }

void cluster::ClusterNode::update  
 

Update this cluster node.

Definition at line 318 of file ClusterNode.cpp.

References DISCONNECTED, isConnected, recvPacket, setConnected, and setUpdated.

Referenced by controlLoop.

00319    {
00320       // - If connected() && !updated()
00321       //   - try recvPacket()
00322       //     - Catch ClusterException
00323       //       - set not connected
00324       //       - add node to connection pending list
00325       //       - set reconfig needed on reconnect
00326       //     - If no Exception
00327       //       - Print Packet Data
00328       //       - Take the action of the packet
00329 
00330       //debugDump(vprDBG_CONFIG_LVL);
00331       if (mSockStream == NULL)
00332       {
00333          // Do Nothing
00334          setConnected(DISCONNECTED);
00335          setUpdated(false);
00336          //debugDump(vprDBG_CONFIG_LVL);
00337          return;
00338       }
00339       vprASSERT(isConnected() && "ClusterNode is not connected, we can not update!\nWe must not be calling update from the correct location.");
00340       Packet* temp_packet = NULL;
00341       
00342       temp_packet = recvPacket();
00343       
00344       // Print Packet Information
00345       temp_packet->printData(vprDBG_CONFIG_LVL);
00346 
00347       // Handle the packet correctly
00348       ClusterManager::instance()->handlePacket(temp_packet,this);
00349 
00350       // Clean up after ourselves
00351       delete temp_packet;
00352    }

void cluster::ClusterNode::lockSockRead   [inline]
 

Lock a mutex for Reading from the SocketStream.

Definition at line 180 of file ClusterNode.h.

00181       { mSockReadLock.acquire(); }

void cluster::ClusterNode::unlockSockRead   [inline]
 

Unlock a mutex for Reading from the SocketStream.

Definition at line 186 of file ClusterNode.h.

00187       { mSockReadLock.release(); }

void cluster::ClusterNode::lockSockWrite   [inline]
 

Lock a mutex for Writing from the SocketStream.

Definition at line 192 of file ClusterNode.h.

00193       { mSockWriteLock.acquire(); }

void cluster::ClusterNode::unlockSockWrite   [inline]
 

Unlock a mutex for Writing from the SocketStream.

Definition at line 198 of file ClusterNode.h.

00199       { mSockWriteLock.release(); }

void cluster::ClusterNode::start  
 

Starts the control loop.

Definition at line 410 of file ClusterNode.cpp.

References gadgetDBG_RIM, and getName.

00411    {
00412       // --- Setup Multi-Process stuff --- //
00413       // Create a new thread to handle the control
00414 
00415       if (NULL != mControlThread && mControlThread->valid())
00416       {
00417          vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00418                << "ClusterNode " << getName() << " already running..."
00419                << std::endl << vprDEBUG_FLUSH;
00420          return;
00421       }
00422 
00423       vpr::ThreadMemberFunctor<ClusterNode>* memberFunctor =
00424          new vpr::ThreadMemberFunctor<ClusterNode>(this, &ClusterNode::controlLoop, NULL);
00425 
00426       mControlThread = new vpr::Thread(memberFunctor);
00427 
00428       if (mControlThread->valid())
00429       {
00430          mThreadActive = true;
00431       }
00432       vprDEBUG(gadgetDBG_RIM, vprDBG_CONFIG_LVL)
00433          << "ClusterNode " << getName() << " started. thread: "
00434          << mControlThread << std::endl << vprDEBUG_FLUSH;
00435    }

void cluster::ClusterNode::controlLoop void *    nullParam
 

Control loop for updating this thread.

Definition at line 354 of file ClusterNode.cpp.

References debugDump, DISCONNECTED, gadgetDBG_RIM, getName, getPort, isUpdated, setConnected, setUpdated, and update.

00355    {
00356       // -Block on an update call
00357       // -Update Local Data
00358       // -Send
00359       // -Signal Sync
00360 
00361       boost::ignore_unused_variable_warning(nullParam);
00362       
00363       bool running = true;
00364 
00365       while(running)
00366       {
00367          // Wait for trigger
00368          mUpdateTriggerSema.acquire();
00369          {
00370             //vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << getName() << "Dropping into ClusterNode::controlLoop() "
00371             //                                          << std::endl << vprDEBUG_FLUSH;
00372 
00373             setUpdated(false);
00374             while (!isUpdated())
00375             {
00376                try
00377                {
00378                   update();
00379                }
00380                catch(cluster::ClusterException cluster_exception)
00381                {
00382                   vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
00383                      << cluster_exception.getMessage() << clrRESET
00384                      << std::endl << vprDEBUG_FLUSH;
00385                   vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) <<
00386                      "ClusterNode::update() We have lost our connection to: " << getName() << ":" << getPort()
00387                      << std::endl << vprDEBUG_FLUSH;
00388 
00389                   debugDump(vprDBG_CONFIG_LVL);
00390                   // Set the ClusterNode as disconnected since we have lost the connection
00391                   setConnected(DISCONNECTED);
00392                   // Break out of the two loops we are in to stop the tread
00393                   setUpdated(true);             
00394                   running = false;
00395                   
00396                   // TODO: Deleting mSockStream causes a seg fault
00397                   mSockStream = NULL;
00398                }
00399             }
00400          }
00401          
00402          // Signal done with Update
00403          mClusterNodeDoneSema.release();
00404       }
00405       vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "ClusterNode: " << getName() << " is stopping."
00406                                                 << std::endl << vprDEBUG_FLUSH;
00407    }

void cluster::ClusterNode::signalUpdate  
 

Signal a semaphore to let the update thread fall into the code to update the UserData structures.

Definition at line 437 of file ClusterNode.cpp.

References gadgetDBG_RIM.

00438    {
00439       while(!mThreadActive)
00440       {
00441          vprDEBUG(gadgetDBG_RIM, vprDBG_HVERB_LVL) << "Waiting in for thread to start ClusterNode::go().\n" << vprDEBUG_FLUSH;
00442          vpr::Thread::yield();
00443       }
00444       //vprDEBUG(gadgetDBG_RIM,/*vprDBG_HVERB_LVL*/1) << getName() << "Signaling ClusterNode\n" << vprDEBUG_FLUSH;
00445       mUpdateTriggerSema.release();
00446    }

void cluster::ClusterNode::sync  
 

Blocks until the end of the frame.

Postcondition:
The frame has been drawn.

Definition at line 452 of file ClusterNode.cpp.

00453    {
00454       vprASSERT(mThreadActive == true);
00455       mClusterNodeDoneSema.acquire();
00456    }

void cluster::ClusterNode::shutdown  
 

Kill the update thread.

Definition at line 111 of file ClusterNode.cpp.

Referenced by ~ClusterNode.

00112    {     // Kill the accepting thread
00113       if ( mControlThread )
00114       {
00115          mThreadActive = false;
00116          mControlThread->kill();
00117          mControlThread = NULL;
00118       }
00119    }

vpr::Uint64* cluster::ClusterNode::getDelta   [inline]
 

Definition at line 228 of file ClusterNode.h.

00229       {
00230          return &mDelta;
00231       }

vpr::ReturnStatus cluster::ClusterNode::send Packet   out_packet
 

Definition at line 458 of file ClusterNode.cpp.

References gadgetDBG_RIM.

Referenced by attemptConnect.

00459    {
00460       vprASSERT(NULL != out_packet && "Can not send a NULL packet.");
00461 
00462       vpr::Guard<vpr::Mutex> guard(mSockWriteLock);
00463 
00464       // -Send header data
00465       // -Send packet data
00466 
00467       if (mSockStream == NULL)
00468       {
00469          vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
00470             << "ERROR: SocketSteam is NULL\n" << clrRESET << vprDEBUG_FLUSH;
00471          throw cluster::ClusterException("ClusterNode::send() - SocketStream is NULL!");
00472       }
00473 
00474       Header* mHeader = out_packet->getHeader();
00475 
00476       if (mHeader == NULL)
00477       {
00478          vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
00479             << "ERROR: Packet Header is NULL\n" << clrRESET << vprDEBUG_FLUSH;
00480          throw cluster::ClusterException("ClusterNode::send() - Packet Header is NULL!");
00481       }
00482 
00483       if (!mHeader->send(mSockStream).success())
00484       {
00485          mSockStream->close();
00486          delete mSockStream;
00487          mSockStream = NULL;
00488          throw cluster::ClusterException("Packet::recv() - Sending Header Data failed!");
00489       }
00490 
00491       vpr::Uint32 bytes_written;      
00492 
00493       if(mHeader->getPacketLength() == Header::RIM_PACKET_HEAD_SIZE)
00494       {
00495          return(vpr::ReturnStatus::Succeed);
00496       }
00497       
00498       
00499       // If we have a data packet we need to also send the raw data
00500       //
00501       if (out_packet->getPacketType() != Header::RIM_DATA_PACKET)
00502       {
00503          std::vector<vpr::Uint8>* packet_data = out_packet->getData();
00504 
00505          vpr::ReturnStatus status = mSockStream->send(*packet_data,mHeader->getPacketLength()-Header::RIM_PACKET_HEAD_SIZE,bytes_written);
00506          if (!status.success())
00507          {
00508             mSockStream->close();
00509             delete mSockStream;
00510             mSockStream = NULL;
00511             throw cluster::ClusterException("Packet::recv() - Sending Packet Data failed!!");
00512          }
00513          
00514          return(status);
00515       }
00516       else
00517       {
00518          std::vector<vpr::Uint8>* packet_data = out_packet->getData();
00519 
00520          // Since we are sending a DataPacket we are not actually sending all data here. We are only sending 2 GUIDs here
00521          int size = 32;
00522 
00523          vpr::ReturnStatus status = mSockStream->send(*packet_data, size ,bytes_written);
00524          if (!status.success())
00525          {
00526             mSockStream->close();
00527             delete mSockStream;
00528             mSockStream = NULL;
00529             throw cluster::ClusterException("Packet::recv() - Sending Packet Data failed!!");
00530          }
00531 
00532 
00533          DataPacket* temp_data_packet = dynamic_cast<DataPacket*>(out_packet);
00534          vprASSERT(NULL != temp_data_packet && "Dynamic cast failed!");
00535 
00536          // Testing GUIDs
00537          /*vpr::BufferObjectReader* testing = new vpr::BufferObjectReader(packet_data);
00538          vpr::GUID test;
00539          test.readObject(testing);
00540          vpr::GUID test2;
00541          test2.readObject(testing);
00542 
00543          std::cout << "1: " << test.toString() << " 2: " << test2.toString() << std::endl;
00544 
00545          delete testing;
00546          
00547          // Testing ID
00548          testing = new vpr::BufferObjectReader(temp_data_packet->getDeviceData());
00549          std::cout << "ID: " << (int)testing->readUint16() << std::endl;
00550 
00551          delete testing;
00552          */
00553          
00554          status = mSockStream->send(*(temp_data_packet->getDeviceData()),temp_data_packet->getDeviceData()->size(),bytes_written);
00555          if (!status.success())
00556          {
00557             mSockStream->close();
00558             delete mSockStream;
00559             mSockStream = NULL;
00560             throw cluster::ClusterException("Packet::recv() - Sending Packet Data failed!!");
00561          }
00562          
00563          return(status);
00564       }     
00565    }

Packet * cluster::ClusterNode::recvPacket  
 

Definition at line 567 of file ClusterNode.cpp.

References gadgetDBG_RIM.

Referenced by attemptConnect, and update.

00568    {
00569       // -Read in header
00570       // -Get Constructor for correct PacketType
00571       // -Call constructor
00572       // -Read in Packet data
00573       // -Parse data into new packet
00574       // -Return finished packet
00575       Header* packet_head = new Header(mSockStream);
00576       
00577       vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "ClusterNode::recvPacket() PacketFactory is trying to make a packet type: " 
00578          << packet_head->getPacketType() << std::endl << vprDEBUG_FLUSH;
00579 
00580       // Get Packet from factory
00581       Packet* new_packet = PacketFactory::instance()->createObject(packet_head->getPacketType());
00582 
00583       // Verify that the packet has been made
00584       if(NULL == new_packet)
00585       {
00586          throw cluster::ClusterException("ClusterNode::recvPacket() - Packet was not found in Factory.");
00587       }
00588       
00589       // Recv the packet data
00590       // -Copy over pointer to header
00591       // -Continue reading packet from socket
00592       
00593       new_packet->setHeader(packet_head);
00594       
00595       std::vector<vpr::Uint8> incoming_data;
00596 
00597       if (NULL == mSockStream)
00598       {
00599          vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED) 
00600             << "ERROR: SocketSteam is NULL\n" << clrRESET << vprDEBUG_FLUSH;
00601          throw cluster::ClusterException("ClusterNode::recvPacket::recv() - mSocketStream is NULL!");
00602       }
00603       //TODO: else if (!mSockStream->isConnected())
00604       //{
00605       //   vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED) 
00606       //      << "ERROR: SocketSteam is not connected\n" << clrRESET << vprDEBUG_FLUSH;
00607       //   throw cluster::ClusterException("ClusterNode::recvPacket::recv() - mSocketStream is not connected!");
00608       //}
00609       else
00610       {
00611          vpr::Uint32 bytes_read;   
00612 
00613          //vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "Blocking while reading " 
00614          //   << packet_head->getPacketLength()-Header::RIM_PACKET_HEAD_SIZE << " bytes.\n" << vprDEBUG_FLUSH;
00615          vpr::ReturnStatus status = mSockStream->recvn(incoming_data, packet_head->getPacketLength()-Header::RIM_PACKET_HEAD_SIZE, bytes_read);   
00616          //vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "We got " 
00617          //   << bytes_read << " bytes.\n" << vprDEBUG_FLUSH;
00618 
00619          if (!status.success())
00620          {
00621             mSockStream->close();
00622             delete mSockStream;
00623             mSockStream = NULL;
00624             vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED) << "Reading packet data failed. Expecting: "
00625                   << packet_head->getPacketLength()-Header::RIM_PACKET_HEAD_SIZE << " But got: " << bytes_read
00626                   << " ReturnStatus Code: " << (int)status.code() << std::endl << clrRESET << vprDEBUG_FLUSH;
00627             throw cluster::ClusterException("ClusterNode::recvPacket() - Reading packet data failed!");
00628          }
00629       }
00630       
00631       vpr::BufferObjectReader* reader = new vpr::BufferObjectReader(&incoming_data);
00632       
00633       // Parse Packet with new data
00634       new_packet->parse(reader);
00635       
00636       //NOTE: incoming_data goes out of scope here which means that we are left with only the data that we parsed.
00637       //TODO: We could save memory by not parsing the raw DataPacket but just passing the location of the memory that we want to use.
00638 
00639       //parse_data_length = DataPacket::ParsedDataLength
00640       //recvn(incoming_parse_data, ...)
00641       //reader = new reader(incoming_parse_data);
00642       //new_packet->parse(reader);
00643       //recvn(incoming_raw_data, ...)
00644       //new_packet->setRawData(incoming_raw_data);
00645 
00646       return new_packet;
00647    }


The documentation for this class was generated from the following files:
Generated on Sun May 2 14:26:44 2004 for Gadgeteer by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002