#include <cluster/ClusterDelta.h>
Public Member Functions | |
| ClusterDelta () | |
| void | getPacket (unsigned num) |
| void | sendAndClear () |
| void | clearIntervals () |
| void | receiveExpectedTime () |
| void | createExpectedTime () |
| void | receiveHandshake () |
| void | createHandshake () |
| void | createResponce () |
| void | receiveResponce () |
| void | clientClusterDelta (vpr::SocketStream *socket_stream) |
| vpr::Interval | getClusterDelta (vpr::SocketStream *socket_stream) |
| void | setSocketStream (vpr::SocketStream *socket_stream) |
Definition at line 73 of file ClusterDelta.h.
| cluster::ClusterDelta::ClusterDelta | ( | ) |
Definition at line 49 of file ClusterDelta.cpp.
00049 : syncPacket(12) 00050 { 00051 mTol = 2; 00052 mAccept = false; 00053 mReader = new vpr::BufferObjectReader(&syncPacket); 00054 }
| void cluster::ClusterDelta::getPacket | ( | unsigned | num | ) |
Definition at line 191 of file ClusterDelta.cpp.
Referenced by receiveExpectedTime(), receiveHandshake(), and receiveResponce().
00192 { // Need to delete the old Object Readers 00193 vpr::Uint32 bytes_read; 00194 syncPacket.clear(); 00195 mReader->setCurPos(0); 00196 //vpr::ReturnStatus status = mSocketStream->readn(syncPacket,SYNC_PACKET_LENGTH,bytes_read); 00197 mSocketStream->readn(syncPacket,num ,bytes_read); 00198 }
| void cluster::ClusterDelta::sendAndClear | ( | ) |
Definition at line 181 of file ClusterDelta.cpp.
Referenced by createExpectedTime(), createHandshake(), and createResponce().
00182 { 00183 vpr::Uint32 bytes_just_sent = 0; 00184 if ( !mWriter.getData()->empty() ) 00185 { 00186 mSocketStream->send(*(mWriter.getData()),mWriter.getData()->size(),bytes_just_sent); 00187 } 00188 mWriter.mData->clear(); 00189 mWriter.mCurHeadPos = 0; 00190 }
| void cluster::ClusterDelta::clearIntervals | ( | ) |
Definition at line 199 of file ClusterDelta.cpp.
Referenced by clientClusterDelta(), and getClusterDelta().
00200 { 00201 mLocalSendTime.set(0,vpr::Interval::Base); 00202 mLocalReceiveTime.set(0,vpr::Interval::Base); 00203 mRemoteSendTime.set(0,vpr::Interval::Base); 00204 mRemoteReceiveTime.set(0,vpr::Interval::Base); 00205 mExpectedRemoteTime.set(0,vpr::Interval::Base); 00206 mLatencyTime.set(0,vpr::Interval::Base); 00207 mDelta.set(0,vpr::Interval::Base); 00208 mErrorTime.set(0,vpr::Interval::Base); 00209 }
| void cluster::ClusterDelta::receiveExpectedTime | ( | ) |
Definition at line 134 of file ClusterDelta.cpp.
References gadgetDBG_RIM(), and getPacket().
Referenced by clientClusterDelta().
00135 { 00136 00137 getPacket(8); 00138 mLocalReceiveTime.setNow(); 00139 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Got Expected Time at: " << mLocalReceiveTime.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00140 mExpectedRemoteTime.set(mReader->readUint64(), vpr::Interval::Base); 00141 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Got Expected Time: " << mExpectedRemoteTime.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00142 if (mExpectedRemoteTime > mLocalReceiveTime) 00143 { 00144 mErrorTime = mExpectedRemoteTime - mLocalReceiveTime; 00145 } 00146 else 00147 { 00148 mErrorTime = mLocalReceiveTime - mExpectedRemoteTime; 00149 } 00150 00151 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Error of: " << mErrorTime.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00152 00153 if ( mErrorTime.getBaseVal() < mTol ) 00154 { 00155 mAccept = true; 00156 } 00157 else 00158 { 00159 mAccept = false; 00160 mTol = gmtl::Math::pow(mTol,(float)1.5); 00161 } 00162 }
| void cluster::ClusterDelta::createExpectedTime | ( | ) |
Definition at line 108 of file ClusterDelta.cpp.
References gadgetDBG_RIM(), and sendAndClear().
Referenced by getClusterDelta().
00109 { 00110 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Calculate Expected Time: " << mRemoteSendTime.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00111 mLatencyTime.set( (mLocalReceiveTime.getBaseVal()-mLocalSendTime.getBaseVal())/2, vpr::Interval::Base); 00112 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Latency: " << mLatencyTime.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00113 00114 mDelta=(mRemoteSendTime-mLocalSendTime-mLatencyTime); 00115 00116 std::cout << " " << mRemoteSendTime.getBaseVal() << std::endl; 00117 std::cout << " - " << mRemoteSendTime.getBaseVal() << std::endl; 00118 std::cout << " - " << mRemoteSendTime.getBaseVal() << std::endl; 00119 std::cout << " = " << mRemoteSendTime.getBaseVal() - mRemoteSendTime.getBaseVal() - mRemoteSendTime.getBaseVal() << std::endl; 00120 00121 vpr::Int64 mNewDelta = mRemoteSendTime.getBaseVal() - mRemoteSendTime.getBaseVal() - mRemoteSendTime.getBaseVal(); 00122 00123 std::cout << "New Delta: " << mNewDelta << std::endl; 00124 00125 std::cout << "mDelta" << mDelta.getBaseVal() << std::endl; 00126 00127 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Delta: " << mDelta.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00128 mLocalSendTime.setNow(); 00129 mExpectedRemoteTime = mLocalSendTime + mDelta + mLatencyTime; 00130 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Sent Expected Time: " << mExpectedRemoteTime.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00131 mWriter.writeUint64(mExpectedRemoteTime.getBaseVal()); 00132 sendAndClear(); 00133 }
| void cluster::ClusterDelta::receiveHandshake | ( | ) |
Definition at line 99 of file ClusterDelta.cpp.
References gadgetDBG_RIM(), and getPacket().
Referenced by clientClusterDelta(), and getClusterDelta().
00100 { 00101 getPacket(8); 00102 mLocalReceiveTime.setNow(); 00103 vpr::Uint64 temp = mReader->readUint64(); 00104 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Received Time: " << temp << "\n" << vprDEBUG_FLUSH; 00105 mRemoteSendTime.set(temp, vpr::Interval::Base); 00106 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Received Time: " << mRemoteSendTime.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00107 }
| void cluster::ClusterDelta::createHandshake | ( | ) |
Definition at line 83 of file ClusterDelta.cpp.
References gadgetDBG_RIM(), and sendAndClear().
Referenced by clientClusterDelta(), and getClusterDelta().
00084 { 00085 mLocalSendTime.setNow(); 00086 // If this is not the first handshake, then actually send the last receive time. 00087 //if ( mLocalReceiveTime.getBaseVal() != 0 ) 00088 //{ 00089 // mWriter.writeUint64(mLocalReceiveTime.getBaseVal()); 00090 // vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Send Handshake: " << mLocalReceiveTime.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00091 //} 00092 //else 00093 //{ 00094 mWriter.writeUint64(mLocalSendTime.getBaseVal()); 00095 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Send Handshake: " << mLocalSendTime.getBaseVal() << "\n" << vprDEBUG_FLUSH; 00096 //} 00097 sendAndClear(); 00098 }
| void cluster::ClusterDelta::createResponce | ( | ) |
Definition at line 163 of file ClusterDelta.cpp.
References gadgetDBG_RIM(), and sendAndClear().
Referenced by clientClusterDelta().
00164 { 00165 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Createing Responce" << "\n" << vprDEBUG_FLUSH; 00166 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] mTol: " << mTol << " Accept?: " << mAccept << "\n" << vprDEBUG_FLUSH; 00167 mWriter.writeBool(mAccept); 00168 sendAndClear(); 00169 }
| void cluster::ClusterDelta::receiveResponce | ( | ) |
Definition at line 171 of file ClusterDelta.cpp.
References gadgetDBG_RIM(), and getPacket().
Referenced by getClusterDelta().
00172 { 00173 getPacket(1); 00174 mAccept = mReader->readBool(); 00175 if ( mAccept == false ) 00176 { 00177 vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << clrOutNORM(clrRED,"[SYNC]FAILED SYNC") << "\n" << vprDEBUG_FLUSH; 00178 } 00179 }
| void cluster::ClusterDelta::clientClusterDelta | ( | vpr::SocketStream * | socket_stream | ) |
Definition at line 56 of file ClusterDelta.cpp.
References clearIntervals(), createHandshake(), createResponce(), receiveExpectedTime(), and receiveHandshake().
00057 { 00058 mSocketStream = socket_stream; 00059 while ( mAccept == false ) 00060 { 00061 clearIntervals(); 00062 receiveHandshake(); 00063 createHandshake(); 00064 receiveExpectedTime(); 00065 createResponce(); 00066 } 00067 }
| vpr::Interval cluster::ClusterDelta::getClusterDelta | ( | vpr::SocketStream * | socket_stream | ) |
Definition at line 68 of file ClusterDelta.cpp.
References clearIntervals(), createExpectedTime(), createHandshake(), receiveHandshake(), and receiveResponce().
00069 { 00070 mSocketStream = socket_stream; 00071 while ( mAccept == false ) 00072 { 00073 clearIntervals(); 00074 createHandshake(); 00075 receiveHandshake(); 00076 createExpectedTime(); 00077 receiveResponce(); 00078 } 00079 return(mDelta); 00080 }
| void cluster::ClusterDelta::setSocketStream | ( | vpr::SocketStream * | socket_stream | ) | [inline] |
1.5.1