9 static int lws_callback_client_impl(
struct lws * wsi,
enum lws_callback_reasons reason,
void * ,
void * in,
12 NLogTrace(
"LWS Callback Reason: %d", reason);
29 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
30 NLogError(
"Connection attempt failed, setting connected to false.");
33 client->
fWsi =
nullptr;
36 std::queue<std::string> empty_queue;
42 case LWS_CALLBACK_CLIENT_ESTABLISHED:
43 NLogTrace(
"LWS_CALLBACK_CLIENT_ESTABLISHED");
49 lws_callback_on_writable(wsi);
52 case LWS_CALLBACK_CLIENT_RECEIVE: {
54 static std::string messageBuffer;
59 messageBuffer.append(
reinterpret_cast<char *
>(in), len);
62 bool isFinalFragment = lws_is_final_fragment(wsi);
64 if (isFinalFragment) {
65 NLogTrace(
"Received: %s ", messageBuffer.c_str());
71 catch (
const std::exception & e) {
72 NLogError(
"LWS Callback Error: User OnMessageCallback threw an exception: %s", e.what());
75 messageBuffer.clear();
80 case LWS_CALLBACK_CLIENT_WRITEABLE: {
81 NLogTrace(
"LWS_CALLBACK_CLIENT_WRITEABLE triggered for %s", client->
fHost.c_str());
86 client->
fSendBuffer.resize(LWS_PRE + message.size());
87 memcpy(client->
fSendBuffer.data() + LWS_PRE, message.data(), message.size());
89 int n = lws_write(wsi, client->
fSendBuffer.data() + LWS_PRE, message.size(), LWS_WRITE_TEXT);
91 if (n < (
int)message.size()) {
92 NLogError(
"LWS Callback Error: lws_write failed to send the full message, sent %d bytes, expected %zu bytes.",
98 lws_callback_on_writable(wsi);
107 case LWS_CALLBACK_CLOSED:
108 NLogTrace(
"LWS_CALLBACK_CLOSED: Connection closed by server or client.");
110 client->
fWsi =
nullptr;
113 std::queue<std::string> empty_queue;
131 LWS_PROTOCOL_LIST_TERM
134 : fLwsContext(nullptr), fWsi(nullptr), fMaxRetries(maxRetries), fRetryDelayMs(retryDelayMs), fConnected(false),
135 fShutdownRequested(false), fConnectionAttemptComplete(false)
138 lws_set_log_level(LLL_ERR, NULL);
145 NLogDebug(
"NWsClient destructor finished.");
152 NLogInfo(
"NWsClient: Attempting to connect to %s (attempt %d)", uriString.c_str(), attempt + 1);
154 NLogError(
"NWsClient: Already connected.");
158 NLogError(
"NWsClient: Context already exists, disconnect first.");
170 catch (
const std::runtime_error & e) {
171 NLogError(
"NWsClient: URI parsing error: %s", e.what());
179 lws_context_creation_info info;
180 memset(&info, 0,
sizeof(info));
182 info.port = CONTEXT_PORT_NO_LISTEN;
187 if (parsedUri.
fScheme ==
"wss") {
188 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
194 NLogError(
"Failed to create libwebsockets context.");
198 struct lws_client_connect_info cci;
199 memset(&cci, 0,
sizeof(cci));
201 cci.address =
fHost.c_str();
203 cci.path =
fPath.c_str();
204 cci.host =
fHost.c_str();
205 cci.origin =
fHost.c_str();
210 if (parsedUri.
fScheme ==
"wss") {
211 cci.ssl_connection = LCCSCF_USE_SSL;
214 cci.ssl_connection = 0;
217 NLogTrace(
"NWsClient: Initiating connection to %s (attempt %d)", uriString.c_str(), attempt + 1);
219 fWsi = lws_client_connect_via_info(&cci);
221 NLogError(
"NWsClient: Failed to connect to WebSocket server at %s (attempt %d)", uriString.c_str(), attempt + 1);
226 std::this_thread::sleep_for(std::chrono::milliseconds(
fRetryDelayMs));
240 NLogError(
"NWsClient: Connection to %s failed or shutdown requested (attempt %d).", uriString.c_str(),
245 std::this_thread::sleep_for(std::chrono::milliseconds(
fRetryDelayMs));
258 NLogTrace(
"NWsClient: Disconnect requested.");
266 std::this_thread::sleep_for(std::chrono::milliseconds(10));
273 NLogDebug(
"NWsClient: Cancelling LWS service.");
280 NLogDebug(
"NWsClient: LWS service thread joined.");
290 std::queue<std::string> empty_queue;
294 NLogTrace(
"NWsClient: LWS context destroyed.");
303 NLogError(
"NWsClient: Cannot send, not connected to WebSocket server.");
307 size_t outgoingQueueSize;
314 lws_callback_on_writable(
fWsi);
315 NLogTrace(
"NWsClient: Called lws_callback_on_writable for message. Queue size: %zu", outgoingQueueSize);
326 NLogTrace(
"NWsClient: Message sent successfully.");
332 NLogTrace(
"NWsClient: LWS service loop started.");
338 NLogError(
"NWsClient: lws_service returned error or was cancelled, stopping loop.");
346 std::this_thread::sleep_for(std::chrono::milliseconds(1));
348 NLogTrace(
"NWsClient: LWS service loop stopped.");
356 std::regex uriRegex(R
"((ws|wss)://([a-zA-Z0-9\-\.]+)(:([0-9]+))?(/.*)?)");
359 if (!std::regex_match(uriString, matches, uriRegex)) {
360 throw std::runtime_error(
"Invalid WebSocket URI format: " + uriString);
363 parsed.
fScheme = matches[1].str();
366 parsed.
fHost = matches[2].str();
368 if (matches[4].matched) {
370 parsed.
fPort = std::stoi(matches[4].str());
372 catch (
const std::exception & e) {
373 throw std::runtime_error(
"Invalid port number in URI: " + matches[4].str());
380 else if (parsed.
fScheme ==
"wss") {
384 throw std::runtime_error(
"Unknown scheme for default port: " + parsed.
fScheme);
388 parsed.
fPath = matches[5].str();
389 if (parsed.
fPath.empty()) {
WebSocket client for asynchronous communication using libwebsockets.
void LwsServiceLoop()
Service loop for libwebsockets running in a separate thread.
NWsClient(int maxRetries=5, int retryDelayMs=1000)
Constructor.
struct lws * fWsi
WebSocket instance.
int fRetryDelayMs
Delay between retries (ms)
bool Connect(const std::string &uriString)
Connect to a WebSocket server.
static constexpr const char * fgProtocolName
Protocol name for websocket communication.
int fMaxRetries
Maximum connection retries.
void Disconnect()
Disconnect from the WebSocket server.
static WS_URI ParseUri(const std::string &uriString)
Parse a WebSocket URI string into its components.
std::atomic< bool > fShutdownRequested
Shutdown flag.
static lws_protocols fProtocols[]
Protocols supported by libwebsockets.
std::atomic< bool > fConnectionAttemptComplete
Connection attempt completion flag.
std::condition_variable fSendCv
Condition variable for sending messages.
bool Send(const std::string &message)
Send a message to the server.
struct lws_context * fLwsContext
libwebsockets context
std::vector< unsigned char > fSendBuffer
Buffer for sending messages.
std::atomic< bool > fConnected
Connection status.
std::queue< std::string > fOutgoingMessageQueue
Queue of outgoing messages.
std::mutex fConnectMutex
Mutex for connection state.
std::string fHost
Hostname.
std::mutex fOutgoingMutex
Mutex for outgoing queue.
std::thread fLwsServiceThread
Thread running the service loop.
std::condition_variable fConnectCv
Condition variable for connection.
OnMessageCallback fOnMessageCallback
Callback for received messages.
~NWsClient()
Destructor. Cleans up resources and disconnects.
Structure representing a parsed WebSocket URI.
std::string fScheme
URI scheme (e.g., "ws", "wss")
std::string fPath
Path component.
std::string fHost
Hostname or IP address.