diff --git a/include/echo/Network/NetworkManager.h b/include/echo/Network/NetworkManager.h --- a/include/echo/Network/NetworkManager.h +++ b/include/echo/Network/NetworkManager.h @@ -1,170 +1,166 @@ #ifndef _NETWORKMANAGER_H_ #define _NETWORKMANAGER_H_ #include #include #include #include #include #include #include #include #include #include #include #include #include /*Concepts DataPacket Connection ConnectionTypeInfo* Send Recv ConnectionOwner/Listener NotifyReceiveData ConnectionTypeInfo ConnectionTypeManager NetworkManager (Manager) Passive Connection Direct Connection When listening A Passive connection is already "established" by definition. So the network manager will provide a means to establish a secondary connection from a request on the passive listening connection. Passive Listen - Opens a passive connection Receive Data Is Connection request? Yes Connection request contains information about establishing another connection (in the format of connectionDetails) No Discard Data */ namespace Echo { class NetworkSystem; class ConnectionDetails; class NetworkManager : public TaskGroup, public DataPacketFactory { private: Mutex mConnectionListsMutex; struct InsertOrderIndex{}; struct ConnectionIndex{}; /** * A ConnectionSet is a multi-index for look up of connection and iteration by insert order. */ typedef boost::multi_index_container< shared_ptr, boost::multi_index::indexed_by< // Sort order added boost::multi_index::sequenced< boost::multi_index::tag >, // Sort by connection boost::multi_index::ordered_unique< boost::multi_index::tag, boost::multi_index::identity< shared_ptr > > > > ConnectionSet; ConnectionSet mConnectionsPendingRecvNotify; ConnectionSet mConnectionsDropped; ConnectionSet mConnectionsEstablished; std::list< std::pair< shared_ptr, IncomingConnectionListener* > > mConnectionsIncoming; std::list< shared_ptr > mNetworkEventListeners; std::map< std::string, shared_ptr > mSystems; shared_ptr mDefaultSystem; - void ProcessDroppedConnections(); - void ProcessEstablishedConnections(); - void ProcessIncomingConnections(); - void ProcessReceiveNotifications(); Size mTotalBytesSent; Size mTotalBytesReceived; Size mBytesSentPerSecond; Size mBytesReceviedPerSecond; Size mCurrentBytesSentPerSecond; Size mCurrentBytesReceviedPerSecond; Size mNewConnectionBufferSize; Timer::CPUTimer mSpeedTimer; shared_ptr mDataPacketFactory; public: /** * @param dataPacketFactory if null, the NetworkManager will create an internal default. */ NetworkManager(shared_ptr dataPacketFactory = nullptr); ~NetworkManager(); //Setup a listening connection. //listener must be a valid listener bool Listen( IncomingConnectionListener* listener, const std::string& connectionDetails ); //Connection details contain the format // //(system)type:address:addressAdditionalDetails // //depending on the system, the address and address details will vary. Address details may or //may not be optional. // //Examples are. // // (Socket)passive:emblementertainment.com.au:1234 // (Socket)direct:emblementertainment.com.au:5647 // //In both cases the Socket system is used. // passive indicates a UDP connection // direct indicates a TCP connection shared_ptr Connect( const std::string& connectionDetails, Connection::ConnectCallback connectCallback = Connection::ConnectCallback(), Connection::DisconnectCallback disconnectCallback = Connection::DisconnectCallback()); //Install a Network System. //This will cause Initialise to be called on the Network system. bool InstallSystem( shared_ptr networkSystem, bool defaultSystem=false ); bool UninstallSystem( const std::string& name ); shared_ptr GetInstalledSystem( const std::string& name ); void UninstallAllSystems(); void AddNetworkEventListener(shared_ptr listener); void RemoveNetworkEventListener(shared_ptr listener); void ClearAllEventListeners(); //These three functions can operate on any thread. When NetworkManager is updated //any processing of notifications to ConnectionOwner is done. This means //that what ever thread NetworkManager is running on is the thread ConnectionOwner //will receive on. void ConnectionEstablished(shared_ptr connection); void ConnectionIncoming(shared_ptr connection, IncomingConnectionListener* listener); void ConnectionDropped(shared_ptr connection); void ConnectionPacketReceived(shared_ptr connection); u32 GetAddrFromString(const std::string& address); shared_ptr NewDataPacket() override; void ReportSentData(Size numberOfBytesSent); void ReportReceivedData(Size numberOfBytesReceived); //Start is called like any other EchoTask Start and will call Start //for each installed system.h This is a chance for each system to start //any threads they might be using. // //Note: NetworkManager::Start() will never fail even if a system fails to start bool OnStart(); void OnStop(); void Update(Seconds lastFrametime); void SetNewConnectionBufferSize(Size sizeInBytes); Size GetNewConnectionBufferSize() const; Size GetBytesSentPerSecond() const {return mBytesSentPerSecond;} Size GetBytesReceviedPerSecond() const {return mBytesReceviedPerSecond;} Size GetTotalBytesSent() const {return mTotalBytesSent;} Size GetTotalBytesRecevied() const {return mTotalBytesReceived;} }; } #endif diff --git a/src/Network/NetworkManager.cpp b/src/Network/NetworkManager.cpp --- a/src/Network/NetworkManager.cpp +++ b/src/Network/NetworkManager.cpp @@ -1,448 +1,431 @@ #include #include #include #include #include #include #include #include #include #include #include #include namespace Echo { NetworkManager::NetworkManager(shared_ptr dataPacketFactory) : mTotalBytesSent(0), mTotalBytesReceived(0), mBytesSentPerSecond(0), mBytesReceviedPerSecond(0), mCurrentBytesSentPerSecond(0), mCurrentBytesReceviedPerSecond(0), mNewConnectionBufferSize(1024*1024*5) // 5MB Buffer, this can be reconfigured { if(!dataPacketFactory) { // Default is a simple pool with 512 8KiB packets - taking up 4MiB. This is fairly // arbitrary and can be overridden by providing a custom factory. mDataPacketFactory.reset(new SimpleDataPacketPool(512,8192)); }else { mDataPacketFactory = dataPacketFactory; } } NetworkManager::~NetworkManager() { UninstallAllSystems(); } bool NetworkManager::Listen( IncomingConnectionListener* listener, const std::string& connectionDetails ) { ConnectionDetails detailsExploded(connectionDetails); shared_ptr system; if(detailsExploded.HasSystem()) { system=GetInstalledSystem(detailsExploded.GetSystem()); }else { system=mDefaultSystem; } if(!system) return false; return system->Listen(listener,detailsExploded); } shared_ptr NetworkManager::Connect( const std::string& connectionDetails, Connection::ConnectCallback connectCallback, Connection::DisconnectCallback disconnectCallback) { ConnectionDetails detailsExploded(connectionDetails); shared_ptr system; if(detailsExploded.HasSystem()) { system=GetInstalledSystem(detailsExploded.GetSystem()); }else { system=mDefaultSystem; } if(!system) return shared_ptr(); return system->Connect(detailsExploded,connectCallback,disconnectCallback); } bool NetworkManager::InstallSystem( shared_ptr networkSystem, bool defaultSystem/*=false */ ) { if(!networkSystem) { ECHO_LOG_ERROR("NetworkManager::InstallSystem(): Null System."); return false; } std::map< std::string, shared_ptr >::iterator it=mSystems.find(networkSystem->GetName()); if(it!=mSystems.end()) { ECHO_LOG_ERROR("NetworkManager::InstallSystem(): System with name '" << networkSystem->GetName() << "' already exists."); return false; } if(!networkSystem->Initialise()) { ECHO_LOG_ERROR("NetworkManager::InstallSystem(): Network system '" << networkSystem->GetName() << "' failed to initialise."); return false; } mSystems[networkSystem->GetName()]=networkSystem; if(defaultSystem || !mDefaultSystem) mDefaultSystem=networkSystem; return true; } bool NetworkManager::UninstallSystem( const std::string& name ) { std::map< std::string, shared_ptr >::iterator it=mSystems.find(name); if(it==mSystems.end()) { ECHO_LOG_ERROR("NetworkManager::UninstallSystem(): System with name '" << name << "' not found."); return false; } it->second->DisconnectAll(); - //Update disconnect notification. - ProcessDroppedConnections(); + //Update for disconnect notifications. + Update(Seconds(0)); it->second->CleanUp(); mSystems.erase(it); return true; } shared_ptr NetworkManager::GetInstalledSystem( const std::string& name ) { std::map< std::string, shared_ptr >::iterator it=mSystems.find(name); if(it==mSystems.end()) { ECHO_LOG_ERROR("NetworkManager::GetInstalledSystem(): System with name '" << name << "' not found."); return shared_ptr(); } return it->second; } void NetworkManager::UninstallAllSystems() { while(!mSystems.empty()) { UninstallSystem(mSystems.begin()->first); } } void NetworkManager::ConnectionEstablished( shared_ptr connection ) { { ScopedLock lock(mConnectionListsMutex); ConnectionSet::index::type& connectionIndex= mConnectionsEstablished.get(); if(connectionIndex.find(connection)!=connectionIndex.end()) { return; } mConnectionsEstablished.push_back(connection); } if(!mNetworkEventListeners.empty()) { std::list< shared_ptr >::iterator it = mNetworkEventListeners.begin(); std::list< shared_ptr >::iterator itEnd = mNetworkEventListeners.end(); while(it!=itEnd) { (*it)->OnNetworkEvent(NetworkEventListener::NetworkEventTypes::ESTABLISHED); ++it; } } } void NetworkManager::ConnectionIncoming(shared_ptr connection, IncomingConnectionListener* listener) { mConnectionListsMutex.Lock(); mConnectionsIncoming.push_back(std::make_pair(connection,listener)); mConnectionListsMutex.Unlock(); if(!mNetworkEventListeners.empty()) { std::list< shared_ptr >::iterator it = mNetworkEventListeners.begin(); std::list< shared_ptr >::iterator itEnd = mNetworkEventListeners.end(); while(it!=itEnd) { (*it)->OnNetworkEvent(NetworkEventListener::NetworkEventTypes::INCOMING_CONNECTION); ++it; } } } void NetworkManager::ConnectionDropped( shared_ptr connection ) { { ScopedLock lock(mConnectionListsMutex); ConnectionSet::index::type& connectionIndex= mConnectionsDropped.get(); if(connectionIndex.find(connection)!=connectionIndex.end()) { return; } mConnectionsDropped.push_back(connection); } if(!mNetworkEventListeners.empty()) { std::list< shared_ptr >::iterator it = mNetworkEventListeners.begin(); std::list< shared_ptr >::iterator itEnd = mNetworkEventListeners.end(); while(it!=itEnd) { (*it)->OnNetworkEvent(NetworkEventListener::NetworkEventTypes::DISCONNECTED); ++it; } } } void NetworkManager::ConnectionPacketReceived( shared_ptr connection ) { { ScopedLock lock(mConnectionListsMutex); ConnectionSet::index::type& connectionIndex= mConnectionsPendingRecvNotify.get(); if(connectionIndex.find(connection)!=connectionIndex.end()) { return; } mConnectionsPendingRecvNotify.push_back(connection); } if(!mNetworkEventListeners.empty()) { std::list< shared_ptr >::iterator it = mNetworkEventListeners.begin(); std::list< shared_ptr >::iterator itEnd = mNetworkEventListeners.end(); while(it!=itEnd) { (*it)->OnNetworkEvent(NetworkEventListener::NetworkEventTypes::PACKET_RECEIVED); ++it; } } } bool NetworkManager::OnStart() { std::map< std::string, shared_ptr >::iterator it=mSystems.begin(); std::map< std::string, shared_ptr >::iterator itEnd=mSystems.end(); while(it!=itEnd) { if(!it->second->Start()) { ECHO_LOG_ERROR("NetworkManager::Start(): System '" << it->second->GetName() << "' failed to start."); } ++it; } mSpeedTimer.Start(); return true; } void NetworkManager::OnStop() { std::map< std::string, shared_ptr >::iterator it=mSystems.begin(); std::map< std::string, shared_ptr >::iterator itEnd=mSystems.end(); while(it!=itEnd) { it->second->DisconnectAll(); - //Update disconnect notification. - ProcessDroppedConnections(); + //Update for disconnect notifications. + Update(Seconds(0)); it->second->CleanUp(); ++it; } } void NetworkManager::Update( Seconds lastFrametime ) { TaskGroup::Update(lastFrametime); - ProcessEstablishedConnections(); - ProcessIncomingConnections(); - ProcessReceiveNotifications(); - ProcessDroppedConnections(); + // The following used to be in separate functions but that resulted in locking the connections mutex + + // Lets get all the containers to process. We'll move them and unlock the mutex so the + // network threads can continue to notify while we're processing. + mConnectionListsMutex.Lock(); + ConnectionSet connectionsEstablished = std::move(mConnectionsEstablished); + std::list< std::pair< shared_ptr, IncomingConnectionListener* > > incomingConnections = std::move(mConnectionsIncoming); + ConnectionSet connectionsPendingNotify = std::move(mConnectionsPendingRecvNotify); + ConnectionSet connectionsDropped = std::move(mConnectionsDropped); + mConnectionListsMutex.Unlock(); + + // Processed established connections - this prepares their state for incoming listener connection listeners to have + // them in the correct state. + if(!connectionsEstablished.empty()) + { + ConnectionSet::index::type& insertOrderIndex= connectionsEstablished.get(); + for(auto& connection : insertOrderIndex) + { + connection->_established(); + } + } + + // Process incoming connections + if(!incomingConnections.empty()) + { + for(auto& connectionListenerPair : incomingConnections) + { + connectionListenerPair.second->IncomingConnection(connectionListenerPair.first); + } + } + + // Process any packets + if(!connectionsPendingNotify.empty()) + { + ConnectionSet::index::type& insertOrderIndex= connectionsPendingNotify.get(); + for(auto& connection : insertOrderIndex) + { + connection->NotifyAnyReceivedPackets(); + } + } + + // Process dropped connections + if(!connectionsDropped.empty()) + { + ConnectionSet::index::type& insertOrderIndex= connectionsDropped.get(); + for(auto& connection : insertOrderIndex) + { + connection->_dropped(); + } + } if(mSpeedTimer.GetElapsed()>Seconds(1)) { mBytesSentPerSecond = mCurrentBytesSentPerSecond; mBytesReceviedPerSecond = mCurrentBytesReceviedPerSecond; mCurrentBytesSentPerSecond = 0; mCurrentBytesReceviedPerSecond = 0; mSpeedTimer.Start(); } } - void NetworkManager::ProcessDroppedConnections() - { - mConnectionListsMutex.Lock(); - ConnectionSet connectionsDropped = std::move(mConnectionsDropped); - mConnectionListsMutex.Unlock(); - if(!connectionsDropped.empty()) - { - ConnectionSet::index::type& insertOrderIndex= connectionsDropped.get(); - for(auto& connection : insertOrderIndex) - { - connection->_dropped(); - } - } - } - - void NetworkManager::ProcessEstablishedConnections() - { - mConnectionListsMutex.Lock(); - ConnectionSet connectionsEstablished = std::move(mConnectionsEstablished); - mConnectionListsMutex.Unlock(); - if(!connectionsEstablished.empty()) - { - ConnectionSet::index::type& insertOrderIndex= connectionsEstablished.get(); - for(auto& connection : insertOrderIndex) - { - connection->_established(); - } - } - } - - void NetworkManager::ProcessIncomingConnections() - { - mConnectionListsMutex.Lock(); - if(!mConnectionsIncoming.empty()) - { - std::list< std::pair< shared_ptr, IncomingConnectionListener* > >::iterator it=mConnectionsIncoming.begin(); - std::list< std::pair< shared_ptr, IncomingConnectionListener* > >::iterator itEnd=mConnectionsIncoming.end(); - while(it!=itEnd) - { - it->second->IncomingConnection(it->first); - ++it; - } - mConnectionsIncoming.clear(); - } - mConnectionListsMutex.Unlock(); - } - - void NetworkManager::ProcessReceiveNotifications() - { - mConnectionListsMutex.Lock(); - ConnectionSet connectionsPendingNotify = std::move(mConnectionsPendingRecvNotify); - mConnectionListsMutex.Unlock(); - if(!connectionsPendingNotify.empty()) - { - ConnectionSet::index::type& insertOrderIndex= connectionsPendingNotify.get(); - for(auto& connection : insertOrderIndex) - { - connection->NotifyAnyReceivedPackets(); - } - } - } - - u32 NetworkManager::GetAddrFromString(const std::string& address) { //#ifdef ECHO_PLATFORM_WINDOWS struct hostent* remoteHost;//=gethostbyname(); struct in_addr addr; // If the user input is an alpha name for the host, use gethostbyname() // If not, get host by addr (assume IPv4) if (isalpha(address.c_str()[0])) { // host address is a name //ECHO_LOG_INFO("Calling gethostbyname with " << address); remoteHost = gethostbyname(address.c_str()); addr.s_addr=inet_addr(remoteHost->h_addr_list[0]); }else { //ECHO_LOG_INFO("Calling gethostbyaddr with " << address); addr.s_addr = inet_addr(address.c_str()); if(addr.s_addr == INADDR_NONE) { ECHO_LOG_WARNING("The IPv4 address entered must be a legal address"); return 0; }else remoteHost = gethostbyaddr((char *) &addr, 4, AF_INET); } if (remoteHost == NULL) { //int dwError = WSAGetLastError(); //if (dwError != 0) //{ // if (dwError == WSAHOST_NOT_FOUND) // { // ECHO_LOG_WARNING("Host not found\n"); // } else // if (dwError == WSANO_DATA) // { // ECHO_LOG_WARNING("No data record found"); // } else // { // ECHO_LOG_WARNING("Function failed with error: " << dwError); // } //} }else { //ECHO_LOG_INFO("\tOfficial name: " << remoteHost->h_name); //ECHO_LOG_INFO("\tAlternate names: " << remoteHost->h_aliases); //ECHO_LOG_INFO("\tAddress type: "; //switch (remoteHost->h_addrtype) //{ //case AF_INET: // ECHO_LOG_INFO("\tAF_INET"); // break; //case AF_INET6: // ECHO_LOG_INFO("\tAF_INET6"); // break; //case AF_NETBIOS: // ECHO_LOG_INFO("\tAF_NETBIOS"); // break; //default: // ECHO_LOG_INFO("\t" << remoteHost->h_addrtype); // break; //} } return addr.s_addr; //#endif // return 0; } void NetworkManager::AddNetworkEventListener( shared_ptr listener ) { mNetworkEventListeners.push_back(listener); } void NetworkManager::RemoveNetworkEventListener( shared_ptr listener ) { mNetworkEventListeners.remove(listener); } void NetworkManager::ClearAllEventListeners() { mNetworkEventListeners.clear(); } shared_ptr NetworkManager::NewDataPacket() { return mDataPacketFactory->NewDataPacket(); } void NetworkManager::SetNewConnectionBufferSize(Size sizeInBytes) { mNewConnectionBufferSize = sizeInBytes; } Size NetworkManager::GetNewConnectionBufferSize() const { return mNewConnectionBufferSize; } void NetworkManager::ReportSentData(Size numberOfBytesSent) { mTotalBytesSent+=numberOfBytesSent; mCurrentBytesSentPerSecond+=numberOfBytesSent; } void NetworkManager::ReportReceivedData(Size numberOfBytesReceived) { mTotalBytesReceived+=numberOfBytesReceived; mCurrentBytesReceviedPerSecond+=numberOfBytesReceived; } }