00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <gadget/gadgetConfig.h>
00034 #include <gadget/Util/Debug.h>
00035
00036 #include <gadget/RemoteInputManager.h>
00037
00038
00039 #include <vpr/Thread/Thread.h>
00040 #include <vpr/Thread/ThreadFunctor.h>
00041
00042
00043 #include <gadget/Type/BaseTypeFactory.h>
00044 #include <gadget/VirtualDevice.h>
00045 #include <gadget/DeviceServer.h>
00046 #include <gadget/Type/DeviceFactory.h>
00047 #include <gadget/InputManager.h>
00048
00049 #include <gadget/Node.h>
00050 #include <cluster/ClusterManager.h>
00051
00052
00053 #include <cluster/Packets/PacketFactory.h>
00054 #include <cluster/Packets/ConnectionRequest.h>
00055 #include <cluster/Packets/DeviceRequest.h>
00056 #include <cluster/Packets/DeviceAck.h>
00057 #include <cluster/Packets/ApplicationDataRequest.h>
00058 #include <cluster/Packets/EndBlock.h>
00059 #include <cluster/Packets/DataPacket.h>
00060
00061
00062 #include <jccl/RTRC/ConfigManager.h>
00063 #include <jccl/Config/ConfigElement.h>
00064
00065 #include <map>
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077 namespace gadget
00078 {
00079 RemoteInputManager::RemoteInputManager(const vpr::GUID& guid) : mHandlerGUID(guid)
00080 {
00081
00082 }
00083
00084 RemoteInputManager::~RemoteInputManager()
00085 {
00086 for ( std::map<vpr::GUID, VirtualDevice*>::iterator j = mVirtualDevices.begin(); j != mVirtualDevices.end(); j++ )
00087 {
00088 if ( (*j).second != NULL )
00089 {
00090 delete (*j).second;
00091 }
00092 }
00093 for ( std::vector<DeviceServer*>::iterator j = mDeviceServers.begin(); j != mDeviceServers.end(); j++ )
00094 {
00095 if ( (*j) != NULL )
00096 {
00097 delete (*j);
00098 }
00099 }
00100 }
00101
00102 void RemoteInputManager::recoverFromLostNode(Node* lost_node)
00103 {
00104 removeVirtualDevicesOnHost(lost_node->getHostname());
00105 removeDeviceClientsForHost(lost_node->getHostname());
00106
00107
00108
00109
00110 }
00111
00115 void RemoteInputManager::handlePacket(cluster::Packet* packet, Node* node)
00116 {
00117
00118 if ( NULL != packet && NULL != node )
00119 {
00120 switch ( packet->getPacketType() )
00121 {
00122 case cluster::Header::RIM_DEVICE_REQ:
00123 {
00124 cluster::DeviceRequest* temp_device_request = dynamic_cast<cluster::DeviceRequest*>(packet);
00125 vprASSERT(NULL != temp_device_request && "Dynamic cast failed!");
00126 std::string device_name = temp_device_request->getDeviceName();
00127
00128 if ( jccl::ConfigManager::instance()->isPendingStale() )
00129 {
00130 gadget::Input* temp_input_device = gadget::InputManager::instance()->getDevice(device_name);
00131 if ( temp_input_device != NULL )
00132 {
00133 DeviceServer* temp_device_server = getDeviceServer(device_name);
00134 if ( NULL == temp_device_server )
00135 {
00136 addDeviceServer(device_name, temp_input_device);
00137 temp_device_server = getDeviceServer(device_name);
00138 }
00139
00140 temp_device_server->addClient(node);
00141
00142
00143 std::string temp_string = temp_input_device->getInputTypeName();
00144 vpr::GUID temp_guid = temp_device_server->getId();
00145 cluster::DeviceAck* temp_ack =
00146 new cluster::DeviceAck(mHandlerGUID, temp_guid,
00147 device_name, temp_string, true);
00148 node->send(temp_ack);
00149 }
00150 else
00151 {
00152 std::string temp_string = "";
00153 vpr::GUID empty_id;
00154 cluster::DeviceAck* temp_ack =
00155 new cluster::DeviceAck(mHandlerGUID, empty_id, device_name,
00156 temp_string, false);
00157 node->send(temp_ack);
00158 }
00159 }
00160 else
00161 {
00162 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00163 << clrOutBOLD(clrRED,"Pending List is not stale(Config Manager is still configuring the local system) ")
00164 << clrOutBOLD(clrRED,"So we can not process this device request right now.") << std::endl << vprDEBUG_FLUSH;
00165
00166 std::string temp_string = "";
00167 vpr::GUID empty_id;
00168 cluster::DeviceAck* temp_ack =
00169 new cluster::DeviceAck(mHandlerGUID, empty_id, device_name,
00170 temp_string, false);
00171 node->send(temp_ack);
00172 }
00173 break;
00174 }
00175 case cluster::Header::RIM_DEVICE_ACK:
00176 {
00177
00178
00179
00180
00181
00182 cluster::DeviceAck* temp_device_ack = dynamic_cast<cluster::DeviceAck*>(packet);
00183 vprASSERT(NULL != temp_device_ack && "Dynamic cast failed!");
00184 std::string device_name = temp_device_ack->getDeviceName();
00185
00186 if ( temp_device_ack->getAck() )
00187 {
00188 removePendingDeviceRequest(device_name);
00189
00190 if ( getVirtualDevice(device_name) != NULL )
00191 {
00192 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrOutBOLD(clrRED, "ERROR:")
00193 << "Somehow we already have a virtual device named: " << device_name << std::endl << vprDEBUG_FLUSH;
00194 }
00195 else
00196 {
00197 addVirtualDevice(temp_device_ack->getId(), device_name,
00198 temp_device_ack->getDeviceBaseType(),
00199 temp_device_ack->getHostname());
00200
00201
00202 gadget::InputManager::instance()->addRemoteDevice(getVirtualDevice(device_name), device_name);
00203 }
00204 }
00205 else
00206 {
00207
00208
00209
00210 }
00211 break;
00212 }
00213 case cluster::Header::RIM_DATA_PACKET:
00214 {
00215 cluster::DataPacket* temp_data_packet = dynamic_cast<cluster::DataPacket*>(packet);
00216 vprASSERT(NULL != temp_data_packet && "Dynamic cast failed!");
00217
00218
00219
00220
00221 gadget::Input* virtual_device = getVirtualDevice(temp_data_packet->getObjectId());
00222 if ( virtual_device != NULL )
00223 {
00224 vpr::BufferObjectReader* temp_reader = new vpr::BufferObjectReader(temp_data_packet->getDeviceData());
00225
00226 temp_reader->setAttrib("rim.timestamp.delta", node->getDelta());
00227 virtual_device->readObject(temp_reader);
00228 }
00229 break;
00230 }
00231 default:
00232 {
00233 std::cout << "RIM DOES NOT HANDLE THIS PACKET TYPE" << packet->getPacketType() << std::endl;
00234 break;
00235 }
00236 }
00237 }
00238 }
00239
00240
00241 vpr::ReturnStatus RemoteInputManager::addVirtualDevice(const vpr::GUID& device_id, const std::string& name,
00242 const std::string& device_base_type, const std::string& hostname)
00243 {
00244 vpr::Guard<vpr::Mutex> guard(mVirtualDevicesLock);
00245
00246 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00247 << clrOutBOLD(clrMAGENTA, "[RemoteInputManager]")
00248 << "Creating Virtual Device: " << name << std::endl << vprDEBUG_FLUSH;
00249
00250 gadget::Input* temp_input_device = gadget::BaseTypeFactory::instance()->loadNetDevice(device_base_type);
00251 VirtualDevice* temp_virtual_device = new VirtualDevice(name, device_id, device_base_type, hostname, temp_input_device);
00252
00253 mVirtualDevices[device_id] = temp_virtual_device;
00254
00255 return(vpr::ReturnStatus::Succeed);
00256 }
00257
00258 void RemoteInputManager::addVirtualDevice(VirtualDevice* device)
00259 {
00260 vpr::Guard<vpr::Mutex> guard(mVirtualDevicesLock);
00261
00262 mVirtualDevices[device->getId()] = device;
00263 }
00264
00265 gadget::Input* RemoteInputManager::getVirtualDevice(const vpr::GUID& device_id)
00266 {
00267 vpr::Guard<vpr::Mutex> guard(mVirtualDevicesLock);
00268
00269 for ( std::map<vpr::GUID, VirtualDevice*>::iterator i = mVirtualDevices.begin();
00270 i != mVirtualDevices.end() ; i++ )
00271 {
00272 if ( (*i).first == device_id )
00273 {
00274 return((*i).second->getDevice());
00275 }
00276 }
00277 return NULL;
00278 }
00279
00280 gadget::Input* RemoteInputManager::getVirtualDevice(const std::string& device_name)
00281 {
00282 vpr::Guard<vpr::Mutex> guard(mVirtualDevicesLock);
00283
00284 for ( std::map<vpr::GUID, VirtualDevice*>::iterator i = mVirtualDevices.begin();
00285 i != mVirtualDevices.end() ; i++ )
00286 {
00287 if ( (*i).second->getName() == device_name )
00288 {
00289 return((*i).second->getDevice());
00290 }
00291 }
00292 return NULL;
00293 }
00294
00295 vpr::ReturnStatus RemoteInputManager::removeVirtualDevicesOnHost(const std::string& host_name)
00296 {
00297
00298
00299 vpr::Guard<vpr::Mutex> guard(mVirtualDevicesLock);
00300
00301 std::vector<std::string> devices_to_remove;
00302 for ( std::map<vpr::GUID, VirtualDevice*>::iterator i = mVirtualDevices.begin();
00303 i != mVirtualDevices.end() ; i++ )
00304 {
00305 if ( (*i).second->getRemoteHostname() == host_name )
00306 {
00307 devices_to_remove.push_back((*i).second->getName());
00308 }
00309 }
00310
00311 for ( std::vector<std::string>::iterator i = devices_to_remove.begin();
00312 i != devices_to_remove.end();i++ )
00313 {
00314
00315
00316
00317 createPendingConfigRemoveAndAdd(*i);
00318 }
00319 return vpr::ReturnStatus::Succeed;
00320 }
00321
00322 vpr::ReturnStatus RemoteInputManager::removeDeviceClientsForHost(const std::string& host_name)
00323 {
00324
00325 vpr::Guard<vpr::Mutex> guard(mDeviceServersLock);
00326
00327 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00328 << clrOutBOLD(clrMAGENTA,"[RemoteInputManager]")
00329 << " Removing client, " << host_name << " from all Device Servers.\n" << vprDEBUG_FLUSH;
00330
00331 for ( std::vector<DeviceServer*>::iterator i = mDeviceServers.begin();
00332 i != mDeviceServers.end() ; i++ )
00333 {
00334 (*i)->removeClient(host_name);
00335 }
00336 return vpr::ReturnStatus::Succeed;
00337 }
00338
00339 void RemoteInputManager::removeVirtualDevice(const vpr::GUID& device_id)
00340 {
00341 vpr::Guard<vpr::Mutex> guard(mVirtualDevicesLock);
00342
00343 for ( std::map<vpr::GUID, VirtualDevice*>::iterator i = mVirtualDevices.begin();
00344 i != mVirtualDevices.end() ; i++ )
00345 {
00346 if ( (*i).first == device_id )
00347 {
00348
00349 gadget::InputManager::instance()->removeDevice((*i).second->getName());
00350 delete (*i).second;
00351 mVirtualDevices.erase(i);
00352 return;
00353 }
00354 }
00355 }
00356
00357 void RemoteInputManager::removeVirtualDevice(const std::string& device_name)
00358 {
00359 vpr::Guard<vpr::Mutex> guard(mVirtualDevicesLock);
00360
00361
00362 gadget::InputManager::instance()->removeDevice(device_name);
00363
00364 for ( std::map<vpr::GUID, VirtualDevice*>::iterator i = mVirtualDevices.begin();
00365 i != mVirtualDevices.end() ; i++ )
00366 {
00367 if ( (*i).second->getName() == device_name )
00368 {
00369 (*i).second->debugDump(vprDBG_CONFIG_LVL);
00370 delete (*i).second;
00371 mVirtualDevices.erase(i);
00372 return;
00373 }
00374 }
00375 }
00376
00377 void RemoteInputManager::debugDumpVirtualDevices(int debug_level)
00378 {
00379 vpr::Guard<vpr::Mutex> guard(mVirtualDevicesLock);
00380
00381 vpr::DebugOutputGuard dbg_output(gadgetDBG_RIM,debug_level,
00382 std::string("-------------- Virtual Devices --------------\n"),
00383 std::string("---------------------------------------------\n"));
00384 for ( std::map<vpr::GUID, VirtualDevice*>::iterator j = mVirtualDevices.begin(); j != mVirtualDevices.end(); j++ )
00385 {
00386 (*j).second->debugDump(debug_level);
00387 }
00388 }
00389
00390
00391
00392
00393 vpr::ReturnStatus RemoteInputManager::addDeviceServer(const std::string& name, gadget::Input* device)
00394 {
00395 vpr::Guard<vpr::Mutex> guard(mDeviceServersLock);
00396
00397 DeviceServer* temp_device_server =
00398 new DeviceServer(name, device, mHandlerGUID);
00399 mDeviceServers.push_back(temp_device_server);
00400
00401 return(vpr::ReturnStatus::Succeed);
00402 }
00403
00404 void RemoteInputManager::addDeviceServer(DeviceServer* device)
00405 {
00406 vpr::Guard<vpr::Mutex> guard(mDeviceServersLock);
00407 mDeviceServers.push_back(device);
00408 }
00409
00410 DeviceServer* RemoteInputManager::getDeviceServer(const std::string& device_name)
00411 {
00412 vpr::Guard<vpr::Mutex> guard(mDeviceServersLock);
00413
00414 for ( std::vector<DeviceServer*>::iterator i = mDeviceServers.begin();
00415 i != mDeviceServers.end() ; i++ )
00416 {
00417 if ( (*i)->getName() == device_name )
00418 {
00419 return(*i);
00420 }
00421 }
00422 return NULL;
00423 }
00424
00425 void RemoteInputManager::removeDeviceServer(const std::string& device_name)
00426 {
00427 vpr::Guard<vpr::Mutex> guard(mDeviceServersLock);
00428
00429 for ( std::vector<DeviceServer*>::iterator i = mDeviceServers.begin();
00430 i != mDeviceServers.end() ; i++ )
00431 {
00432 if ( (*i)->getName() == device_name )
00433 {
00434 delete (*i);
00435 mDeviceServers.erase(i);
00436 return;
00437 }
00438 }
00439 }
00440
00441 void RemoteInputManager::debugDumpDeviceServers(int debug_level)
00442 {
00443 vpr::Guard<vpr::Mutex> guard(mDeviceServersLock);
00444
00445 vpr::DebugOutputGuard dbg_output(gadgetDBG_RIM,debug_level,
00446 std::string("-------------- Device Servers --------------\n"),
00447 std::string("---------------------------------------------\n"));
00448 for ( std::vector<DeviceServer*>::iterator j = mDeviceServers.begin(); j != mDeviceServers.end(); j++ )
00449 {
00450 (*j)->debugDump(debug_level);
00451 }
00452 }
00453
00454 void RemoteInputManager::sendDataAndSync()
00455 {
00456
00457
00458
00459
00460
00461 vpr::Guard<vpr::Mutex> guard(mDeviceServersLock);
00462
00463
00464 for ( unsigned int i=0; i<mDeviceServers.size(); i++ )
00465 {
00466 mDeviceServers[i]->go();
00467 }
00468
00469
00470 for ( unsigned int i=0; i<mDeviceServers.size(); i++ )
00471 {
00472 mDeviceServers[i]->sync();
00473 }
00474
00475
00476
00477
00478
00479 }
00480
00482
00484
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00574
00575
00576
00577
00578
00579
00580
00581 vpr::Uint16 RemoteInputManager::getNumberPendingDeviceRequests()
00582 {
00583 vpr::Guard<vpr::Mutex> guard(mPendingDeviceRequestsLock);
00584 return(mPendingDeviceRequests.size());
00585 }
00586
00587 vpr::ReturnStatus RemoteInputManager::createPendingConfigRemove(std::string device_name)
00588 {
00589 jccl::ConfigManager* cfg_mgr = jccl::ConfigManager::instance();
00590
00591 cfg_mgr->lockActive();
00592 std::vector<jccl::ConfigElementPtr>::iterator active_begin = cfg_mgr->getActiveBegin();
00593 std::vector<jccl::ConfigElementPtr>::iterator active_end = cfg_mgr->getActiveEnd();
00594 std::vector<jccl::ConfigElementPtr>::iterator i;
00595
00596
00597 for ( i = active_begin ; i != active_end ; i++ )
00598 {
00599 if ( (*i)->getName() == device_name )
00600 {
00601 cfg_mgr->addConfigElement(*i, jccl::ConfigManager::PendingElement::REMOVE);
00602
00603 cfg_mgr->unlockActive();
00604 cfg_mgr->removeActive(device_name);
00605 cfg_mgr->lockActive();
00606 }
00607 }
00608 cfg_mgr->unlockActive();
00609 return vpr::ReturnStatus::Succeed;
00610 }
00611
00612 vpr::ReturnStatus RemoteInputManager::createPendingConfigRemoveAndAdd(std::string device_name)
00613 {
00614 jccl::ConfigManager* cfg_mgr = jccl::ConfigManager::instance();
00615
00616 cfg_mgr->lockActive();
00617 std::vector<jccl::ConfigElementPtr>::iterator active_begin = cfg_mgr->getActiveBegin();
00618 std::vector<jccl::ConfigElementPtr>::iterator active_end = cfg_mgr->getActiveEnd();
00619 std::vector<jccl::ConfigElementPtr>::iterator i;
00620 for ( i = active_begin ; i != active_end ; i++ )
00621 {
00622 if ( (*i)->getName() == device_name )
00623 {
00624 cfg_mgr->addConfigElement(*i, jccl::ConfigManager::PendingElement::REMOVE);
00625 cfg_mgr->addConfigElement(*i, jccl::ConfigManager::PendingElement::ADD);
00626
00627 cfg_mgr->unlockActive();
00628 cfg_mgr->removeActive(device_name);
00629 cfg_mgr->lockActive();
00630 }
00631 }
00632 cfg_mgr->unlockActive();
00633 return vpr::ReturnStatus::Succeed;
00634 }
00635
00636 void RemoteInputManager::addPendingDeviceRequest(cluster::DeviceRequest* new_device_req, Node* node)
00637 {
00638 vpr::Guard<vpr::Mutex> guard(mPendingDeviceRequestsLock);
00639 mPendingDeviceRequests[new_device_req] = node;
00640 }
00641
00642 void RemoteInputManager::removePendingDeviceRequest(std::string device_name)
00643 {
00644 vpr::Guard<vpr::Mutex> guard(mPendingDeviceRequestsLock);
00645
00646 std::map<cluster::DeviceRequest*, Node*>::iterator begin = mPendingDeviceRequests.begin();
00647 std::map<cluster::DeviceRequest*, Node*>::iterator end = mPendingDeviceRequests.end();
00648 std::map<cluster::DeviceRequest*, Node*>::iterator i;
00649
00650 for ( i = begin ; i != end ; i++ )
00651 {
00652 if ( (*i).first->getDeviceName() == device_name )
00653 {
00654 mPendingDeviceRequests.erase(i);
00655 return;
00656 }
00657 }
00658 }
00659
00660 void RemoteInputManager::sendDeviceRequests()
00661 {
00662 vpr::Guard<vpr::Mutex> guard(mPendingDeviceRequestsLock);
00663
00664 std::map<cluster::DeviceRequest*, Node*>::iterator begin = mPendingDeviceRequests.begin();
00665 std::map<cluster::DeviceRequest*, Node*>::iterator end = mPendingDeviceRequests.end();
00666 std::map<cluster::DeviceRequest*, Node*>::iterator i;
00667
00668 for ( i = begin ; i != end ; i++ )
00669 {
00670 if ( (*i).second->isConnected() )
00671 {
00672 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
00673 << clrOutBOLD(clrMAGENTA, "[RemoteInputManager]")
00674 << " Sending device request for: " << (*i).first->getDeviceName()
00675 << std::endl << vprDEBUG_FLUSH;
00676
00677 (*i).second->send((*i).first);
00678 }
00679 }
00680 }
00681 }