#include <ClusterNode.h>
Public Types | |
| enum | Status { DISCONNECTED = 0, CONNECTED = 1, INPROGRESS = 2, NEWCONNECTION = 3 } |
Public Methods | |
| ClusterNode () | |
| Create a new ClusterNode for a Node that is connecting to us that we did not previously know about. More... | |
| ClusterNode (const std::string &name, const std::string &host_name, const vpr::Uint16 &port, vpr::SocketStream *socket_stream) | |
| Create a ClusterNode with the given attributes. More... | |
| ~ClusterNode () | |
| Shutdown the update thread and close the SocketStream. More... | |
| void | debugDump (int debug_level) |
| Display all relevant information about ClusterNode. More... | |
| void | printStats (int debug_level=1) |
| Display bandwidth statistics for this node's SocketStream. More... | |
| std::string | getName () |
| Return the name of the node. More... | |
| std::string | getHostname () |
| Return the hostname of the node. More... | |
| vpr::Uint16 | getPort () |
| Return the port number that we are connected to. More... | |
| void | setName (std::string &name) |
| Set the name of the ClusterNode. More... | |
| void | setHostname (std::string &host_name) |
| Set the hostname of the ClusterNode. More... | |
| void | setPort (vpr::Uint16 &port) |
| Set the port of the ClusterNode. More... | |
| vpr::SocketStream * | getSockStream () |
| Get a pointer to the SocketStream used to communicate with this node. More... | |
| void | setSockStream (vpr::SocketStream *socket) |
| Set the SocketStream used to communicate with this node. More... | |
| bool | isConnected () |
| Return whether we are connected to this node. More... | |
| void | setConnected (int connect) |
| Set the current connection state for this node. More... | |
| int | getConnected () |
| Get the current connection state for this node. More... | |
| bool | isUpdated () |
| Return whether this node has been updated. More... | |
| void | setUpdated (bool update) |
| Set the update status for this node. More... | |
| bool | isRunning () |
| Return whether this node is running. More... | |
| void | setRunning (bool started) |
| Set the running status for this node. More... | |
| vpr::ReturnStatus | attemptConnect () |
| Attempt to connect to this node. More... | |
| void | update () |
| Update this cluster node. More... | |
| void | lockSockRead () |
| Lock a mutex for Reading from the SocketStream. More... | |
| void | unlockSockRead () |
| Unlock a mutex for Reading from the SocketStream. More... | |
| void | lockSockWrite () |
| Lock a mutex for writing to the SocketStream. More... | |
| void | unlockSockWrite () |
| Unlock a mutex for writing to the SocketStream. More... | |
| void | start () |
| Starts the control loop. More... | |
| void | controlLoop (void *nullParam) |
| Control loop run by the update thread: waits for an update trigger, then updates this node. More... | |
| void | signalUpdate () |
| Signal a semaphore to let the update thread fall into the code to update the UserData structures. More... | |
| void | sync () |
| Blocks until the end of the frame. More... | |
| void | shutdown () |
| Kill the update thread. More... | |
| vpr::Uint64 * | getDelta () |
| vpr::ReturnStatus | send (Packet *out_packet) |
| Packet * | recvPacket () |
|
|
Definition at line 50 of file ClusterNode.h.
00051 {
00052 DISCONNECTED = 0,
00053 CONNECTED = 1,
00054 INPROGRESS = 2,
00055 NEWCONNECTION = 3
00056 };
|
|
|
Create a new ClusterNode for a Node that is connecting to us that we did not previously know about.
Definition at line 63 of file ClusterNode.cpp. References DISCONNECTED.
00063 : mUpdateTriggerSema(0), mClusterNodeDoneSema(0), mControlThread(NULL)
00064 {
00065 mThreadActive = false;
00066 mPendingConnectionRequest = false;
00067 mUpdated = false;
00068 mRunning = false;
00069
00070 mConnected = DISCONNECTED;
00071
00072 mName = std::string("None Given");
00073 mHostname = std::string("None Given");
00074 mPort = 0;
00075 mSockStream = NULL;
00076 std::cout << "ARONARON REMOVE Created a ClusterNode: No name given." << std::endl;
00077 }
|
|
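| ClusterNode::ClusterNode (const std::string &name, const std::string &host_name, const vpr::Uint16 &port, vpr::SocketStream *socket_stream) |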
|
Create a ClusterNode with the given attributes.
Definition at line 79 of file ClusterNode.cpp. References DISCONNECTED.
00081 : mUpdateTriggerSema(0), mClusterNodeDoneSema(0), mControlThread(NULL)
00082 {
00083 mThreadActive = false;
00084 mUpdated = false;
00085 mPendingConnectionRequest = false;
00086 mRunning = false;
00087
00088 mConnected = DISCONNECTED;
00089
00090 mName = name;
00091 mHostname = host_name;
00092 mPort = port;
00093 mSockStream = socket_stream;
00094 std::cout << "ARONARON REMOVE Created a ClusterNode: " << name << " - " << host_name << std::endl;
00095 }
|
|
|
Shutdown the update thread and close the SocketStream.
Definition at line 97 of file ClusterNode.cpp. References shutdown.
00098 {
00099 shutdown();
00100 // This may break the acept code since we migt not want to delete the Socket. We may be able to just
00101 // use a smart pointer to point to the SocketStream.
00102 /*
00103 if (mSockStream != NULL)
00104 {
00105 mSockStream->close();
00106 delete mSockStream;
00107 }
00108 */
00109 }
|
|
|
Display all relevant information about ClusterNode.
Definition at line 228 of file ClusterNode.cpp. References CONNECTED, gadgetDBG_RIM, getConnected, and NEWCONNECTION. Referenced by controlLoop.
00229 {
00230
00231 vpr::DebugOutputGuard dbg_output(gadgetDBG_RIM,debug_level,
00232 std::string("-------------- ClusterNode --------------\n"),
00233 std::string("-----------------------------------------\n"));
00234
00235 vprDEBUG(gadgetDBG_RIM,debug_level) << "Node Name: " << mName << std::endl << vprDEBUG_FLUSH;
00236 vprDEBUG(gadgetDBG_RIM,debug_level) << "Hostname: " << mHostname << std::endl << vprDEBUG_FLUSH;
00237 vprDEBUG(gadgetDBG_RIM,debug_level) << "Port: " << mPort << std::endl << vprDEBUG_FLUSH;
00238 vprDEBUG(gadgetDBG_RIM,debug_level) << "SockStream " << (mSockStream == NULL ? "is NULL" : "is NOT NULL") << std::endl << vprDEBUG_FLUSH;
00239 if (getConnected() == CONNECTED)
00240 {
00241 vprDEBUG(gadgetDBG_RIM,debug_level) << clrOutBOLD(clrGREEN,"CONNECTED") << std::endl << vprDEBUG_FLUSH;
00242 }
00243 else if (getConnected() == NEWCONNECTION)
00244 {
00245 vprDEBUG(gadgetDBG_RIM,debug_level) << clrOutBOLD(clrRED,"PENDING CONNECTION REQUEST") << std::endl << vprDEBUG_FLUSH;
00246 }
00247 else
00248 {
00249 vprDEBUG(gadgetDBG_RIM,debug_level) << clrOutBOLD(clrRED,"DISCONNECTED") << std::endl << vprDEBUG_FLUSH;
00250 }
00251 }
|
|
|
Display bandwidth statistics for this node's SocketStream.
Definition at line 252 of file ClusterNode.cpp. References gadgetDBG_RIM.
00253 {
00254 vpr::BaseIOStatsStrategy* stats = mSockStream->getIOStatStrategy();
00255 vpr::BandwidthIOStatsStrategy* bw_interface = dynamic_cast<vpr::BandwidthIOStatsStrategy*>(stats );
00256
00257 if(bw_interface != NULL)
00258 {
00259 // Dump out write stats
00260 vprDEBUG(gadgetDBG_RIM,debug_level) << "Socket Write bandwidth stats ---\n";
00261 vprDEBUG(gadgetDBG_RIM,debug_level) << "stats type: " << typeid(stats).name() << vprDEBUG_FLUSH;
00262 vprDEBUG(gadgetDBG_RIM,debug_level) << " sent bytes: " << bw_interface->writeStats().getTotal() << vprDEBUG_FLUSH;
00263 vprDEBUG(gadgetDBG_RIM,debug_level) << " av send: " << bw_interface->writeStats().getMean()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00264 vprDEBUG(gadgetDBG_RIM,debug_level) << " STA send: " << bw_interface->writeStats().getSTA()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00265 vprDEBUG(gadgetDBG_RIM,debug_level) << " Inst send: " << bw_interface->writeStats().getInstAverage()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00266 vprDEBUG(gadgetDBG_RIM,debug_level) << " Max STA send: " << bw_interface->writeStats().getMaxSTA()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00267
00268 vprDEBUG(gadgetDBG_RIM,debug_level) << " read bytes: " << bw_interface->readStats().getTotal() << vprDEBUG_FLUSH;
00269 vprDEBUG(gadgetDBG_RIM,debug_level) << " av read: " << bw_interface->readStats().getMean()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00270 vprDEBUG(gadgetDBG_RIM,debug_level) << " STA read: " << bw_interface->readStats().getSTA()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00271 vprDEBUG(gadgetDBG_RIM,debug_level) << " Inst read: " << bw_interface->readStats().getInstAverage()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00272 vprDEBUG(gadgetDBG_RIM,debug_level) << " Max STA read: " << bw_interface->readStats().getMaxSTA()/1024.0f << " k/s" << vprDEBUG_FLUSH;
00273
00274 }
00275 else
00276 {
00277 vprDEBUG(gadgetDBG_RIM,debug_level) << "SocketBWTest: Don't have BW Stats on stats. type is: " << typeid(stats).name() << std::endl << vprDEBUG_FLUSH;
00278 }
00279
00280 }
|
|
|
Return the name of the node.
Definition at line 93 of file ClusterNode.h. Referenced by controlLoop, and start.
00093 { return mName; }
|
|
|
Return the hostname of the node.
Definition at line 98 of file ClusterNode.h.
00098 { return mHostname; }
|
|
|
Return the port number that we are connected to.
Definition at line 103 of file ClusterNode.h. Referenced by controlLoop.
00103 { return mPort;}
|
|
|
Set the name of the ClusterNode.
Definition at line 108 of file ClusterNode.h.
00108 { mName = name; }
|
|
|
Set the hostname of the ClusterNode.
Definition at line 113 of file ClusterNode.h.
00113 { mHostname = host_name; }
|
|
|
Set the port of the ClusterNode.
Definition at line 118 of file ClusterNode.h.
00118 { mPort = port;}
|
|
|
Get a pointer to the SocketStream used to communicate with this node.
Definition at line 123 of file ClusterNode.h. Referenced by attemptConnect.
00123 { return mSockStream; }
|
|
|
Set the SocketStream used to communicate with this node.
Definition at line 128 of file ClusterNode.h.
00128 { mSockStream = socket; }
|
|
|
Return whether we are connected to this node.
Definition at line 133 of file ClusterNode.h. Referenced by attemptConnect, and update.
00133 { return (mConnected == CONNECTED); }
|
|
|
Set the current connection state for this node.
Definition at line 282 of file ClusterNode.cpp. References CONNECTED, and DISCONNECTED. Referenced by controlLoop, and update.
00283 {
00284 // - If we are disconnecting
00285 // - Delete all VirtualDevices for this node
00286 // - Delete all DeviceServers for this node
00287 // - Refresh All Proxies
00288 vpr::Guard<vpr::Mutex> guard(mConnectedLock);
00289 if (mConnected == CONNECTED && connect == DISCONNECTED)
00290 {
00291 /*
00292 if (mSockStream != NULL && mSockStream->isOpen())
00293 {
00294 mSockStream->close();
00295 //delete mSockStream;
00296 //mSockStream = NULL;
00297 }
00298 */
00299 //TODO: ADD This back in SOON
00300 //ClusterManager::instance()->recoverFromLostNode(this);
00301 }
00302 if (connect == CONNECTED)
00303 {
00304 ClusterNetwork::instance()->removePendingNode(mHostname);
00305 }
00306 mConnected = connect;
00307 }
|
|
|
Get the current connection state for this node.
Definition at line 143 of file ClusterNode.h. Referenced by attemptConnect, and debugDump.
00143 { return mConnected; }
|
|
|
Return whether this node has been updated.
Definition at line 148 of file ClusterNode.h. Referenced by controlLoop.
00148 { return mUpdated; }
|
|
|
Set the update status for this node.
Definition at line 153 of file ClusterNode.h. Referenced by controlLoop, and update.
00154 { mUpdated = update; }
|
|
|
Return whether this node is running.
Definition at line 159 of file ClusterNode.h.
00159 { return mRunning; }
|
|
|
Set the running status for this node.
Definition at line 164 of file ClusterNode.h.
00165 { mRunning = started; }
|
|
|
Attempt to connect to this node.
Definition at line 121 of file ClusterNode.cpp. References gadgetDBG_RIM, getConnected, getSockStream, isConnected, NEWCONNECTION, recvPacket, and send.
00122 {
00123 // - Try to connect to given node
00124 // - If success
00125 // - Create a ConnectionRequest Packet
00126 // - Send request(a responce will come later in the normal controlLoop)
00127
00129
00130 /*if (mSockStream != NULL)
00131 {
00132 mSockStream->close();
00133 delete mSockStream;
00134 mSockStream == NULL;
00135 }*/
00136
00137 // If ClusterNode is already connected
00138 // XXX: FIX THIS, it should not return SUCCESS or WOULDBLOCK
00139 if (isConnected())
00140 {
00141 return(vpr::ReturnStatus::WouldBlock);
00142 }
00143 // If we currently have a pending connection request
00144 if (getConnected() == NEWCONNECTION)
00145 {
00146 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL)
00147 << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00148 << " Pending Connection Request!" << std::endl << vprDEBUG_FLUSH;
00149
00150 return(vpr::ReturnStatus::InProgress);
00151 }
00152
00153 vpr::SocketStream* sock_stream;
00154 vpr::InetAddr inet_addr;
00155
00156 //vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL)
00157 // << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00158 // << " HostName: " << mHostname << ":" << mPort << std::endl << vprDEBUG_FLUSH;
00159
00160 // Set the address that we want to connect to
00161 if ( !inet_addr.setAddress(mHostname, mPort).success() )
00162 {
00163 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00164 << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00165 << clrOutBOLD(clrRED," ERROR: Failed to set address\n") << vprDEBUG_FLUSH;
00166 return vpr::ReturnStatus::Fail;
00167 }
00168 // Create a new socket stream to this address
00169 sock_stream = new vpr::SocketStream(vpr::InetAddr::AnyAddr, inet_addr);
00170
00171
00172 // If we can successfully open the socket and connect to the server
00173 if ( sock_stream->open().success() && sock_stream->connect().success() )
00174 {
00175 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00176 << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00177 << " Successfully connected to: " << mHostname <<":"<< mPort << "\n"<< vprDEBUG_FLUSH;
00178 sock_stream->setNoDelay(true);
00179 mSockStream = sock_stream;
00180 }
00181 else
00182 {
00183 delete sock_stream;
00184 // vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00185 // << clrOutBOLD(clrBLUE,"[ClusterNode::attemptConnect]")
00186 // << clrSetNORM(clrRED) << "ERROR: Could not connect to device server: "
00187 // << mHostname <<" : "<< mPort << "\n" << clrRESET << vprDEBUG_FLUSH;
00188 return vpr::ReturnStatus::Fail;
00189 }
00190
00192
00193 // We might want to send the listen port on this machine to the remote machine just in
00194 // case they for some reason want to re-connect back to us. This should be taken care
00195 // of in configuration but in all reality the remote machine does not have to have the
00196 // machine specific element for this machine.
00197
00198 // Get the localhost name.
00199 vpr::InetAddr local;
00200 vpr::InetAddr::getLocalHost(local);
00201
00202 ConnectionRequest request(local.getHostname(),0/*Might be needed, look above*/);
00203
00204 send(&request);
00205
00206 Packet* temp_packet = recvPacket();
00207
00208 //vprASSERT(temp_packet->getPacketType() == cluster::Header::RIM_CONNECTION_ACK && "We must be receiving a ConnectionAck here");
00209
00210 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00211 << "We have a responce" << std::endl << vprDEBUG_FLUSH;
00212
00213 temp_packet->printData(vprDBG_CONFIG_LVL);
00214 ClusterManager::instance()->handlePacket(temp_packet,this);
00215
00216 delete temp_packet;
00217 if (getConnected() == NEWCONNECTION)
00218 {
00219 ClusterDelta cluster_delta;
00220 vpr::Interval temp;
00221 temp = cluster_delta.getClusterDelta(getSockStream());
00222 mDelta = temp.getBaseVal();
00223 }
00224
00225 return(vpr::ReturnStatus::Succeed);
00226 }
|
|
|
Update this cluster node.
Definition at line 318 of file ClusterNode.cpp. References DISCONNECTED, isConnected, recvPacket, setConnected, and setUpdated. Referenced by controlLoop.
00319 {
00320 // - If connected() && !updated()
00321 // - try recvPacket()
00322 // - Catch ClusterException
00323 // - set not connected
00324 // - add node to connection pending list
00325 // - set reconfig needed on reconnect
00326 // - If no Exception
00327 // - Print Packet Data
00328 // - Take the action of the packet
00329
00330 //debugDump(vprDBG_CONFIG_LVL);
00331 if (mSockStream == NULL)
00332 {
00333 // Do Nothing
00334 setConnected(DISCONNECTED);
00335 setUpdated(false);
00336 //debugDump(vprDBG_CONFIG_LVL);
00337 return;
00338 }
00339 vprASSERT(isConnected() && "ClusterNode is not connected, we can not update!\nWe must not be calling update from the correct location.");
00340 Packet* temp_packet = NULL;
00341
00342 temp_packet = recvPacket();
00343
00344 // Print Packet Information
00345 temp_packet->printData(vprDBG_CONFIG_LVL);
00346
00347 // Handle the packet correctly
00348 ClusterManager::instance()->handlePacket(temp_packet,this);
00349
00350 // Clean up after ourselves
00351 delete temp_packet;
00352 }
|
|
|
Lock a mutex for Reading from the SocketStream.
Definition at line 180 of file ClusterNode.h.
00181 { mSockReadLock.acquire(); }
|
|
|
Unlock a mutex for Reading from the SocketStream.
Definition at line 186 of file ClusterNode.h.
00187 { mSockReadLock.release(); }
|
|
|
Lock a mutex for writing to the SocketStream.
Definition at line 192 of file ClusterNode.h.
00193 { mSockWriteLock.acquire(); }
|
|
|
Unlock a mutex for writing to the SocketStream.
Definition at line 198 of file ClusterNode.h.
00199 { mSockWriteLock.release(); }
|
|
|
Starts the control loop.
Definition at line 410 of file ClusterNode.cpp. References gadgetDBG_RIM, and getName.
00411 {
00412 // --- Setup Multi-Process stuff --- //
00413 // Create a new thread to handle the control
00414
00415 if (NULL != mControlThread && mControlThread->valid())
00416 {
00417 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00418 << "ClusterNode " << getName() << " already running..."
00419 << std::endl << vprDEBUG_FLUSH;
00420 return;
00421 }
00422
00423 vpr::ThreadMemberFunctor<ClusterNode>* memberFunctor =
00424 new vpr::ThreadMemberFunctor<ClusterNode>(this, &ClusterNode::controlLoop, NULL);
00425
00426 mControlThread = new vpr::Thread(memberFunctor);
00427
00428 if (mControlThread->valid())
00429 {
00430 mThreadActive = true;
00431 }
00432 vprDEBUG(gadgetDBG_RIM, vprDBG_CONFIG_LVL)
00433 << "ClusterNode " << getName() << " started. thread: "
00434 << mControlThread << std::endl << vprDEBUG_FLUSH;
00435 }
|
|
|
Control loop run by the update thread: waits for an update trigger, then updates this node.
Definition at line 354 of file ClusterNode.cpp. References debugDump, DISCONNECTED, gadgetDBG_RIM, getName, getPort, isUpdated, setConnected, setUpdated, and update.
00355 {
00356 // -Block on an update call
00357 // -Update Local Data
00358 // -Send
00359 // -Signal Sync
00360
00361 boost::ignore_unused_variable_warning(nullParam);
00362
00363 bool running = true;
00364
00365 while(running)
00366 {
00367 // Wait for trigger
00368 mUpdateTriggerSema.acquire();
00369 {
00370 //vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << getName() << "Dropping into ClusterNode::controlLoop() "
00371 // << std::endl << vprDEBUG_FLUSH;
00372
00373 setUpdated(false);
00374 while (!isUpdated())
00375 {
00376 try
00377 {
00378 update();
00379 }
00380 catch(cluster::ClusterException cluster_exception)
00381 {
00382 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
00383 << cluster_exception.getMessage() << clrRESET
00384 << std::endl << vprDEBUG_FLUSH;
00385 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) <<
00386 "ClusterNode::update() We have lost our connection to: " << getName() << ":" << getPort()
00387 << std::endl << vprDEBUG_FLUSH;
00388
00389 debugDump(vprDBG_CONFIG_LVL);
00390 // Set the ClusterNode as disconnected since we have lost the connection
00391 setConnected(DISCONNECTED);
00392 // Break out of the two loops we are in to stop the tread
00393 setUpdated(true);
00394 running = false;
00395
00396 // TODO: Deleting mSockStream causes a seg fault
00397 mSockStream = NULL;
00398 }
00399 }
00400 }
00401
00402 // Signal done with Update
00403 mClusterNodeDoneSema.release();
00404 }
00405 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "ClusterNode: " << getName() << " is stopping."
00406 << std::endl << vprDEBUG_FLUSH;
00407 }
|
|
|
Signal a semaphore to let the update thread fall into the code to update the UserData structures.
Definition at line 437 of file ClusterNode.cpp. References gadgetDBG_RIM.
00438 {
00439 while(!mThreadActive)
00440 {
00441 vprDEBUG(gadgetDBG_RIM, vprDBG_HVERB_LVL) << "Waiting in for thread to start ClusterNode::go().\n" << vprDEBUG_FLUSH;
00442 vpr::Thread::yield();
00443 }
00444 //vprDEBUG(gadgetDBG_RIM,/*vprDBG_HVERB_LVL*/1) << getName() << "Signaling ClusterNode\n" << vprDEBUG_FLUSH;
00445 mUpdateTriggerSema.release();
00446 }
|
|
|
Blocks until the end of the frame.
Definition at line 452 of file ClusterNode.cpp.
00453 {
00454 vprASSERT(mThreadActive == true);
00455 mClusterNodeDoneSema.acquire();
00456 }
|
|
|
Kill the update thread.
Definition at line 111 of file ClusterNode.cpp. Referenced by ~ClusterNode.
00112 { // Kill the accepting thread
00113 if ( mControlThread )
00114 {
00115 mThreadActive = false;
00116 mControlThread->kill();
00117 mControlThread = NULL;
00118 }
00119 }
|
|
|
Definition at line 228 of file ClusterNode.h.
00229 {
00230 return &mDelta;
00231 }
|
|
|
Definition at line 458 of file ClusterNode.cpp. References gadgetDBG_RIM. Referenced by attemptConnect.
00459 {
00460 vprASSERT(NULL != out_packet && "Can not send a NULL packet.");
00461
00462 vpr::Guard<vpr::Mutex> guard(mSockWriteLock);
00463
00464 // -Send header data
00465 // -Send packet data
00466
00467 if (mSockStream == NULL)
00468 {
00469 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
00470 << "ERROR: SocketSteam is NULL\n" << clrRESET << vprDEBUG_FLUSH;
00471 throw cluster::ClusterException("ClusterNode::send() - SocketStream is NULL!");
00472 }
00473
00474 Header* mHeader = out_packet->getHeader();
00475
00476 if (mHeader == NULL)
00477 {
00478 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
00479 << "ERROR: Packet Header is NULL\n" << clrRESET << vprDEBUG_FLUSH;
00480 throw cluster::ClusterException("ClusterNode::send() - Packet Header is NULL!");
00481 }
00482
00483 if (!mHeader->send(mSockStream).success())
00484 {
00485 mSockStream->close();
00486 delete mSockStream;
00487 mSockStream = NULL;
00488 throw cluster::ClusterException("Packet::recv() - Sending Header Data failed!");
00489 }
00490
00491 vpr::Uint32 bytes_written;
00492
00493 if(mHeader->getPacketLength() == Header::RIM_PACKET_HEAD_SIZE)
00494 {
00495 return(vpr::ReturnStatus::Succeed);
00496 }
00497
00498
00499 // If we have a data packet we need to also send the raw data
00500 //
00501 if (out_packet->getPacketType() != Header::RIM_DATA_PACKET)
00502 {
00503 std::vector<vpr::Uint8>* packet_data = out_packet->getData();
00504
00505 vpr::ReturnStatus status = mSockStream->send(*packet_data,mHeader->getPacketLength()-Header::RIM_PACKET_HEAD_SIZE,bytes_written);
00506 if (!status.success())
00507 {
00508 mSockStream->close();
00509 delete mSockStream;
00510 mSockStream = NULL;
00511 throw cluster::ClusterException("Packet::recv() - Sending Packet Data failed!!");
00512 }
00513
00514 return(status);
00515 }
00516 else
00517 {
00518 std::vector<vpr::Uint8>* packet_data = out_packet->getData();
00519
00520 // Since we are sending a DataPacket we are not actually sending all data here. We are only sending 2 GUIDs here
00521 int size = 32;
00522
00523 vpr::ReturnStatus status = mSockStream->send(*packet_data, size ,bytes_written);
00524 if (!status.success())
00525 {
00526 mSockStream->close();
00527 delete mSockStream;
00528 mSockStream = NULL;
00529 throw cluster::ClusterException("Packet::recv() - Sending Packet Data failed!!");
00530 }
00531
00532
00533 DataPacket* temp_data_packet = dynamic_cast<DataPacket*>(out_packet);
00534 vprASSERT(NULL != temp_data_packet && "Dynamic cast failed!");
00535
00536 // Testing GUIDs
00537 /*vpr::BufferObjectReader* testing = new vpr::BufferObjectReader(packet_data);
00538 vpr::GUID test;
00539 test.readObject(testing);
00540 vpr::GUID test2;
00541 test2.readObject(testing);
00542
00543 std::cout << "1: " << test.toString() << " 2: " << test2.toString() << std::endl;
00544
00545 delete testing;
00546
00547 // Testing ID
00548 testing = new vpr::BufferObjectReader(temp_data_packet->getDeviceData());
00549 std::cout << "ID: " << (int)testing->readUint16() << std::endl;
00550
00551 delete testing;
00552 */
00553
00554 status = mSockStream->send(*(temp_data_packet->getDeviceData()),temp_data_packet->getDeviceData()->size(),bytes_written);
00555 if (!status.success())
00556 {
00557 mSockStream->close();
00558 delete mSockStream;
00559 mSockStream = NULL;
00560 throw cluster::ClusterException("Packet::recv() - Sending Packet Data failed!!");
00561 }
00562
00563 return(status);
00564 }
00565 }
|
|
|
Definition at line 567 of file ClusterNode.cpp. References gadgetDBG_RIM. Referenced by attemptConnect, and update.
00568 {
00569 // -Read in header
00570 // -Get Constructor for correct PacketType
00571 // -Call constructor
00572 // -Read in Packet data
00573 // -Parse data into new packet
00574 // -Return finished packet
00575 Header* packet_head = new Header(mSockStream);
00576
00577 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "ClusterNode::recvPacket() PacketFactory is trying to make a packet type: "
00578 << packet_head->getPacketType() << std::endl << vprDEBUG_FLUSH;
00579
00580 // Get Packet from factory
00581 Packet* new_packet = PacketFactory::instance()->createObject(packet_head->getPacketType());
00582
00583 // Verify that the packet has been made
00584 if(NULL == new_packet)
00585 {
00586 throw cluster::ClusterException("ClusterNode::recvPacket() - Packet was not found in Factory.");
00587 }
00588
00589 // Recv the packet data
00590 // -Copy over pointer to header
00591 // -Continue reading packet from socket
00592
00593 new_packet->setHeader(packet_head);
00594
00595 std::vector<vpr::Uint8> incoming_data;
00596
00597 if (NULL == mSockStream)
00598 {
00599 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
00600 << "ERROR: SocketSteam is NULL\n" << clrRESET << vprDEBUG_FLUSH;
00601 throw cluster::ClusterException("ClusterNode::recvPacket::recv() - mSocketStream is NULL!");
00602 }
00603 //TODO: else if (!mSockStream->isConnected())
00604 //{
00605 // vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
00606 // << "ERROR: SocketSteam is not connected\n" << clrRESET << vprDEBUG_FLUSH;
00607 // throw cluster::ClusterException("ClusterNode::recvPacket::recv() - mSocketStream is not connected!");
00608 //}
00609 else
00610 {
00611 vpr::Uint32 bytes_read;
00612
00613 //vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "Blocking while reading "
00614 // << packet_head->getPacketLength()-Header::RIM_PACKET_HEAD_SIZE << " bytes.\n" << vprDEBUG_FLUSH;
00615 vpr::ReturnStatus status = mSockStream->recvn(incoming_data, packet_head->getPacketLength()-Header::RIM_PACKET_HEAD_SIZE, bytes_read);
00616 //vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "We got "
00617 // << bytes_read << " bytes.\n" << vprDEBUG_FLUSH;
00618
00619 if (!status.success())
00620 {
00621 mSockStream->close();
00622 delete mSockStream;
00623 mSockStream = NULL;
00624 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED) << "Reading packet data failed. Expecting: "
00625 << packet_head->getPacketLength()-Header::RIM_PACKET_HEAD_SIZE << " But got: " << bytes_read
00626 << " ReturnStatus Code: " << (int)status.code() << std::endl << clrRESET << vprDEBUG_FLUSH;
00627 throw cluster::ClusterException("ClusterNode::recvPacket() - Reading packet data failed!");
00628 }
00629 }
00630
00631 vpr::BufferObjectReader* reader = new vpr::BufferObjectReader(&incoming_data);
00632
00633 // Parse Packet with new data
00634 new_packet->parse(reader);
00635
00636 //NOTE: incoming_data goes out of scope here which means that we are left with only the data that we parsed.
00637 //TODO: We could save memory by not parsing the raw DataPacket but just passing the location of the memory that we want to use.
00638
00639 //parse_data_length = DataPacket::ParsedDataLength
00640 //recvn(incoming_parse_data, ...)
00641 //reader = new reader(incoming_parse_data);
00642 //new_packet->parse(reader);
00643 //recvn(incoming_raw_data, ...)
00644 //new_packet->setRawData(incoming_raw_data);
00645
00646 return new_packet;
00647 }
|
Generated by Doxygen 1.2.14, written by Dimitri van Heesch,
© 1997-2002