Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.24)

project(native_streaming VERSION 1.0.16 LANGUAGES CXX)
project(native_streaming VERSION 1.0.17 LANGUAGES CXX)

if (NOT CMAKE_MESSAGE_CONTEXT)
set(CMAKE_MESSAGE_CONTEXT ${PROJECT_NAME})
Expand Down
7 changes: 5 additions & 2 deletions include/native_streaming/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ class Client : public std::enable_shared_from_this<Client>

/// @brief creates a connection Session using provided web-socket stream object
/// @param wsStream web-socket stream object which provides as a R/W interface for connection
/// @param endpointAddress string with the address in the format IP:port of the connection endpoint associated with the session
/// @param endpointAddress the IP address of the connection endpoint associated with the session
/// @brief endpointPortNumber the port number of the connection endpoint associated with the session
/// @return pointer to created Session object
std::shared_ptr<Session> createSession(std::shared_ptr<WebsocketStream> wsStream, const std::string& endpointAddress);
std::shared_ptr<Session> createSession(std::shared_ptr<WebsocketStream> wsStream,
const std::string& endpointAddress,
const boost::asio::ip::port_type& endpointPortNumber);

/// async operations handler
std::shared_ptr<boost::asio::io_context> ioContextPtr;
Expand Down
6 changes: 4 additions & 2 deletions include/native_streaming/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ class Server : public std::enable_shared_from_this<Server>
/// @brief creates a connection Session using provided web-socket stream object
/// @param wsStream web-socket stream object which provides as a R/W interface for connection
/// @param user context, usualy a pointer to the authenticated user object
/// @param endpointAddress string with the address in the format IP:port of the connection endpoint associated with the session
/// @param endpointAddress the IP address of the connection endpoint associated with the session
/// @brief endpointPortNumber the port number of the connection endpoint associated with the session
/// @return pointer to created Session object
std::shared_ptr<Session> createSession(std::shared_ptr<WebsocketStream> wsStream,
const std::shared_ptr<void>& userContext,
const std::string& endpointAddress);
const std::string& endpointAddress,
const boost::asio::ip::port_type& endpointPortNumber);

/// async operations handler
std::shared_ptr<boost::asio::io_context> ioContextPtr;
Expand Down
15 changes: 10 additions & 5 deletions include/native_streaming/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class Session : public std::enable_shared_from_this<Session>
std::shared_ptr<void> userContext,
boost::beast::role_type role,
LogCallback logCallback,
const std::string& endpointAddress);
const std::string& endpointAddress,
const boost::asio::ip::port_type& endpointPortNumber);
~Session();

Session(const Session&) = delete;
Expand Down Expand Up @@ -74,11 +75,12 @@ class Session : public std::enable_shared_from_this<Session>
/// @brief returns user context object, usualy a pointer to the authenticated user object
std::shared_ptr<void> getUserContext();

/// @brief returns a string with the address in the format IP:port of the connection endpoint associated with the session.
/// for a client-side session, it returns the server address (e.g., 127.0.0.1:7420).
/// for a server-side session, it returns the address of the connected client (e.g., 127.0.0.1:53124).
/// @brief returns a string with the IP address of the connection endpoint associated with the session.
std::string getEndpointAddress();

/// @brief returns a port number of the connection endpoint associated with the session.
boost::asio::ip::port_type getEndpointPortNumber();

/// @brief sets a callback to be called when the write operation has not been scheduled due to a timeout reached
/// @param writeTaskTimeoutHandler callback
void setWriteTimedOutHandler(OnSessionErrorCallback writeTaskTimeoutHandler);
Expand Down Expand Up @@ -124,8 +126,11 @@ class Session : public std::enable_shared_from_this<Session>
/// @brief interval of sending the websocket pongs
std::chrono::milliseconds heartbeatPeriod;

/// @brief string with the address in the format IP:port of the connection endpoint associated with the session
/// @brief string with the IP address in the format IP:port of the connection endpoint associated with the session
std::string endpointAddress;

/// @brief port number of the connection endpoint associated with the session
boost::asio::ip::port_type endpointPortNumber;
};

END_NAMESPACE_NATIVE_STREAMING
12 changes: 8 additions & 4 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,28 @@ void Client::onUpgradeConnection(const boost::system::error_code& ec, std::share
}

std::string endpointAddress;
boost::asio::ip::port_type endpointPortNumber;
try
{
auto remoteEp = wsStream->next_layer().socket().remote_endpoint();
endpointAddress = remoteEp.address().to_string() + ":" + std::to_string(remoteEp.port());
endpointAddress = remoteEp.address().to_string();
endpointPortNumber = remoteEp.port();
}
catch (const std::exception& e)
{
NS_LOG_E("Websocket connection aborted - cannot get connection endpoint: {}", e.what());
return;
}

onNewSessionCallback(createSession(wsStream, endpointAddress));
onNewSessionCallback(createSession(wsStream, endpointAddress, endpointPortNumber));
}

std::shared_ptr<Session> Client::createSession(std::shared_ptr<WebsocketStream> wsStream, const std::string& endpointAddress)
std::shared_ptr<Session> Client::createSession(std::shared_ptr<WebsocketStream> wsStream,
const std::string& endpointAddress,
const boost::asio::ip::port_type& endpointPortNumber)
{
websocketStream.reset();
return std::make_shared<Session>(ioContextPtr, wsStream, nullptr, boost::beast::role_type::client, logCallback, endpointAddress);
return std::make_shared<Session>(ioContextPtr, wsStream, nullptr, boost::beast::role_type::client, logCallback, endpointAddress, endpointPortNumber);
}

END_NAMESPACE_NATIVE_STREAMING
11 changes: 7 additions & 4 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ void Server::onUpgradeConnection(const boost::system::error_code& ec,
// it throws an exception when attempting to retrieve the endpoint address.
// To handle this, first verify the socket state and then safely attempt to retrieve the endpoint name.
std::string endpointAddress;
boost::asio::ip::port_type endpointPortNumber;
if (!(wsStream->is_open() && wsStream->next_layer().socket().is_open()))
{
NS_LOG_W("Websocket connection aborted: the socket is already closed");
Expand All @@ -223,7 +224,8 @@ void Server::onUpgradeConnection(const boost::system::error_code& ec,
try
{
auto remoteEp = wsStream->next_layer().socket().remote_endpoint();
endpointAddress = remoteEp.address().to_string() + ":" + std::to_string(remoteEp.port());
endpointAddress = remoteEp.address().to_string();
endpointPortNumber = remoteEp.port();
}
catch (const std::exception& e)
{
Expand All @@ -233,14 +235,15 @@ void Server::onUpgradeConnection(const boost::system::error_code& ec,
}

NS_LOG_I("Client {} - websocket connection accepted", endpointAddress);
onNewSessionCallback(createSession(wsStream, userContext, endpointAddress));
onNewSessionCallback(createSession(wsStream, userContext, endpointAddress, endpointPortNumber));
}

std::shared_ptr<Session> Server::createSession(std::shared_ptr<WebsocketStream> wsStream,
const std::shared_ptr<void>& userContext,
const std::string& endpointAddress)
const std::string& endpointAddress,
const boost::asio::ip::port_type& endpointPortNumber)
{
return std::make_shared<Session>(ioContextPtr, wsStream, userContext, boost::beast::role_type::server, logCallback, endpointAddress);
return std::make_shared<Session>(ioContextPtr, wsStream, userContext, boost::beast::role_type::server, logCallback, endpointAddress, endpointPortNumber);
}

END_NAMESPACE_NATIVE_STREAMING
9 changes: 8 additions & 1 deletion src/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Session::Session(std::shared_ptr<boost::asio::io_context> ioContextPtr,
std::shared_ptr<void> userContext,
boost::beast::role_type role,
LogCallback logCallback,
const std::string& endpointAddress)
const std::string& endpointAddress,
const boost::asio::ip::port_type& endpointPortNumber)
: role(role)
, logCallback(logCallback)
, ioContextPtr(ioContextPtr)
Expand All @@ -21,6 +22,7 @@ Session::Session(std::shared_ptr<boost::asio::io_context> ioContextPtr,
, heartbeatTimer(std::make_shared<boost::asio::steady_timer>(*ioContextPtr.get()))
, heartbeatPeriod(defaultHeartbeatPeriod)
, endpointAddress(endpointAddress)
, endpointPortNumber(endpointPortNumber)
{
setOptions();
}
Expand Down Expand Up @@ -178,6 +180,11 @@ std::string Session::getEndpointAddress()
return endpointAddress;
}

boost::asio::ip::port_type Session::getEndpointPortNumber()
{
return endpointPortNumber;
}

void Session::setWriteTimedOutHandler(OnSessionErrorCallback writeTaskTimeoutHandler)
{
writer->setWriteTimedOutHandler(
Expand Down
6 changes: 4 additions & 2 deletions test/test_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ TEST_F(ConnectionTest, Connect)
ASSERT_TRUE(serverSession.operator bool());
ASSERT_TRUE(clientSession.operator bool());

ASSERT_EQ(clientSession->getEndpointAddress(), std::string(CONNECTION_HOST) + std::string(":") + std::to_string(CONNECTION_PORT));
ASSERT_EQ(clientSession->getEndpointAddress(), std::string(CONNECTION_HOST));
ASSERT_EQ(clientSession->getEndpointPortNumber(), CONNECTION_PORT);

serverSession.reset();
clientSession.reset();
Expand Down Expand Up @@ -95,7 +96,8 @@ TEST_F(ConnectionTest, ConnectIPv6)
ASSERT_TRUE(serverSession.operator bool());
ASSERT_TRUE(clientSession.operator bool());

ASSERT_EQ(clientSession->getEndpointAddress(), std::string("::1") + std::string(":") + std::to_string(CONNECTION_PORT));
ASSERT_EQ(clientSession->getEndpointAddress(), std::string("::1"));
ASSERT_EQ(clientSession->getEndpointPortNumber(), CONNECTION_PORT);

serverSession.reset();
clientSession.reset();
Expand Down