ndmspc v1.2.0-0.1.rc7
Loading...
Searching...
No Matches
NWsClient.cxx
1#include <regex>
2#include <algorithm>
3#include <cstring>
4#include <TSystem.h>
5#include "NWsClient.h"
6#include "NLogger.h"
7
8// The global LWS callback function.
9static int lws_callback_client_impl(struct lws * wsi, enum lws_callback_reasons reason, void * /*user*/, void * in,
10 size_t len)
11{
12 NLogTrace("LWS Callback Reason: %d", reason);
13 Ndmspc::NWsClient * client = static_cast<Ndmspc::NWsClient *>(lws_wsi_user(wsi));
14 // If not found in wsi user data, try to get from context
15 if (!client) {
16 client = (Ndmspc::NWsClient *)lws_context_user(lws_get_context(wsi));
17 }
18
19 // if (!client && reason != LWS_CALLBACK_PROTOCOL_INIT) {
20 // return 0;
21 // }
22 // if (!client) {
23 // NLogError("LWS Callback Error: NWsClient instance pointer is NULL. This is critical.");
24 // return -1;
25 // }
26
27 // NLogDebug("LWS Callback Reason: %d", reason);
28 switch (reason) {
29 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
30 NLogError("Connection attempt failed, setting connected to false.");
31 client->fConnected = false;
32 client->fConnectionAttemptComplete = true;
33 client->fWsi = nullptr;
34 {
35 std::lock_guard<std::mutex> lock(client->fOutgoingMutex);
36 std::queue<std::string> empty_queue;
37 std::swap(client->fOutgoingMessageQueue, empty_queue);
38 }
39 client->fConnectCv.notify_all();
40 break;
41
42 case LWS_CALLBACK_CLIENT_ESTABLISHED:
43 NLogTrace("LWS_CALLBACK_CLIENT_ESTABLISHED");
44
45 client->fConnected = true;
46 client->fWsi = wsi;
47 client->fConnectionAttemptComplete = true;
48 client->fConnectCv.notify_all();
49 lws_callback_on_writable(wsi);
50 break;
51
52 case LWS_CALLBACK_CLIENT_RECEIVE: {
53 // Accumulate message fragments
54 static std::string messageBuffer;
55 // static std::mutex messageBufferMutex;
56
57 {
58 // std::lock_guard<std::mutex> lock(messageBufferMutex);
59 messageBuffer.append(reinterpret_cast<char *>(in), len);
60
61 // Check if this is the final fragment
62 bool isFinalFragment = lws_is_final_fragment(wsi);
63
64 if (isFinalFragment) {
65 NLogTrace("Received: %s ", messageBuffer.c_str());
66
67 if (client->fOnMessageCallback) {
68 try {
69 client->fOnMessageCallback(messageBuffer);
70 }
71 catch (const std::exception & e) {
72 NLogError("LWS Callback Error: User OnMessageCallback threw an exception: %s", e.what());
73 }
74 }
75 messageBuffer.clear();
76 }
77 }
78 } break;
79
80 case LWS_CALLBACK_CLIENT_WRITEABLE: {
81 NLogTrace("LWS_CALLBACK_CLIENT_WRITEABLE triggered for %s", client->fHost.c_str());
82 std::lock_guard<std::mutex> lock(client->fOutgoingMutex);
83 if (!client->fOutgoingMessageQueue.empty()) {
84 const std::string & message = client->fOutgoingMessageQueue.front();
85
86 client->fSendBuffer.resize(LWS_PRE + message.size());
87 memcpy(client->fSendBuffer.data() + LWS_PRE, message.data(), message.size());
88
89 int n = lws_write(wsi, client->fSendBuffer.data() + LWS_PRE, message.size(), LWS_WRITE_TEXT);
90
91 if (n < (int)message.size()) {
92 NLogError("LWS Callback Error: lws_write failed to send the full message, sent %d bytes, expected %zu bytes.",
93 n, message.size());
94 }
95 client->fOutgoingMessageQueue.pop();
96
97 if (!client->fOutgoingMessageQueue.empty()) {
98 lws_callback_on_writable(wsi);
99 }
100 }
101 else {
102 client->fSendCv.notify_all();
103 }
104
105 } break;
106
107 case LWS_CALLBACK_CLOSED:
108 NLogTrace("LWS_CALLBACK_CLOSED: Connection closed by server or client.");
109 client->fConnected = false;
110 client->fWsi = nullptr;
111 {
112 std::lock_guard<std::mutex> lock(client->fOutgoingMutex);
113 std::queue<std::string> empty_queue;
114 std::swap(client->fOutgoingMessageQueue, empty_queue);
115 }
116 client->fConnectionAttemptComplete = true;
117 client->fConnectCv.notify_all();
118 break;
119
120 default: break;
121 }
122 return 0;
123}
124
125namespace Ndmspc {
126
127// lws_protocols NWsClient::fProtocols[] = {
128// {NWsClient::fgProtocolName, lws_callback_client_impl, sizeof(Ndmspc::NWsClient *), 0}, {NULL, NULL, 0, 0}};
129lws_protocols Ndmspc::NWsClient::fProtocols[] = {
130 {Ndmspc::NWsClient::fgProtocolName, lws_callback_client_impl, sizeof(Ndmspc::NWsClient *), 4096, 0, nullptr, 0},
131 LWS_PROTOCOL_LIST_TERM /* terminator */
132};
133NWsClient::NWsClient(int maxRetries, int retryDelayMs)
134 : fLwsContext(nullptr), fWsi(nullptr), fMaxRetries(maxRetries), fRetryDelayMs(retryDelayMs), fConnected(false),
136{
137 // Silent websocket logging
138 lws_set_log_level(LLL_ERR, NULL);
139 // lws_set_log_level(LLL_DEBUG | LLL_PARSER | LLL_HEADER, NULL);
140}
141
143{
144 Disconnect();
145 NLogDebug("NWsClient destructor finished.");
146}
147
148bool NWsClient::Connect(const std::string & uriString)
149{
150 int attempt = 0;
151 while (attempt < fMaxRetries) {
152 NLogInfo("NWsClient: Attempting to connect to %s (attempt %d)", uriString.c_str(), attempt + 1);
153 if (fConnected.load()) {
154 NLogError("NWsClient: Already connected.");
155 return true;
156 }
157 if (fLwsContext) {
158 NLogError("NWsClient: Context already exists, disconnect first.");
159 return false;
160 }
161
162 fShutdownRequested = false;
164 fWsi = nullptr;
165
166 WS_URI parsedUri;
167 try {
168 parsedUri = ParseUri(uriString);
169 }
170 catch (const std::runtime_error & e) {
171 NLogError("NWsClient: URI parsing error: %s", e.what());
172 return false;
173 }
174
175 fHost = parsedUri.fHost;
176 fPort = parsedUri.fPort;
177 fPath = parsedUri.fPath;
178
179 lws_context_creation_info info;
180 memset(&info, 0, sizeof(info));
181
182 info.port = CONTEXT_PORT_NO_LISTEN;
183 info.protocols = fProtocols;
184 info.gid = -1;
185 info.uid = -1;
186
187 if (parsedUri.fScheme == "wss") {
188 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
189 }
190 info.user = this;
191
192 fLwsContext = lws_create_context(&info);
193 if (!fLwsContext) {
194 NLogError("Failed to create libwebsockets context.");
195 return false;
196 }
197
198 struct lws_client_connect_info cci;
199 memset(&cci, 0, sizeof(cci));
200 cci.context = fLwsContext;
201 cci.address = fHost.c_str();
202 cci.port = fPort;
203 cci.path = fPath.c_str();
204 cci.host = fHost.c_str();
205 cci.origin = fHost.c_str();
206
207 cci.protocol = fgProtocolName;
208 cci.userdata = this;
209
210 if (parsedUri.fScheme == "wss") {
211 cci.ssl_connection = LCCSCF_USE_SSL;
212 }
213 else {
214 cci.ssl_connection = 0;
215 }
216
217 NLogTrace("NWsClient: Initiating connection to %s (attempt %d)", uriString.c_str(), attempt + 1);
218
219 fWsi = lws_client_connect_via_info(&cci);
220 if (!fWsi) {
221 NLogError("NWsClient: Failed to connect to WebSocket server at %s (attempt %d)", uriString.c_str(), attempt + 1);
222 lws_context_destroy(fLwsContext);
223 fLwsContext = nullptr;
224 attempt++;
225 if (attempt < fMaxRetries) {
226 std::this_thread::sleep_for(std::chrono::milliseconds(fRetryDelayMs));
227 continue;
228 }
229 return false;
230 }
231 cci.pwsi = &fWsi;
232
233 fLwsServiceThread = std::thread(&NWsClient::LwsServiceLoop, this);
234
235 {
236 std::unique_lock<std::mutex> lock(fConnectMutex);
237 fConnectCv.wait(lock, [this]() { return fConnectionAttemptComplete.load() || fShutdownRequested.load(); });
238 }
239 if (!fConnected.load()) {
240 NLogError("NWsClient: Connection to %s failed or shutdown requested (attempt %d).", uriString.c_str(),
241 attempt + 1);
242 Disconnect();
243 attempt++;
244 if (attempt < fMaxRetries) {
245 std::this_thread::sleep_for(std::chrono::milliseconds(fRetryDelayMs));
246 continue;
247 }
248 return false;
249 }
250
251 return true;
252 }
253 return false;
254}
255
257{
258 NLogTrace("NWsClient: Disconnect requested.");
259 if (fLwsContext) {
260 // Wait until all outgoing messages are sent
261 while (true) {
262 {
263 std::lock_guard<std::mutex> lock(fOutgoingMutex);
264 if (fOutgoingMessageQueue.empty()) break;
265 }
266 std::this_thread::sleep_for(std::chrono::milliseconds(10));
267 }
268
269 fShutdownRequested = true;
271 fConnectCv.notify_all();
272
273 NLogDebug("NWsClient: Cancelling LWS service.");
274
275 lws_cancel_service(fLwsContext);
276
277 if (fLwsServiceThread.joinable()) {
278 fLwsServiceThread.join();
279 }
280 NLogDebug("NWsClient: LWS service thread joined.");
281
282 if (fWsi) {
283 fWsi = nullptr;
284 }
285
286 lws_context_destroy(fLwsContext);
287 fLwsContext = nullptr;
288 fConnected = false;
289
290 std::queue<std::string> empty_queue;
291 std::lock_guard<std::mutex> lock(fOutgoingMutex);
292 std::swap(fOutgoingMessageQueue, empty_queue);
293
294 NLogTrace("NWsClient: LWS context destroyed.");
295 }
296 fLwsContext = nullptr;
297 fWsi = nullptr;
298 fConnected = false;
299}
300bool NWsClient::Send(const std::string & message)
301{
302 if (!fConnected.load() || !fWsi) {
303 NLogError("NWsClient: Cannot send, not connected to WebSocket server.");
304 return false;
305 }
306
307 size_t outgoingQueueSize;
308 {
309 std::lock_guard<std::mutex> lock(fOutgoingMutex);
310 fOutgoingMessageQueue.push(message);
311 outgoingQueueSize = fOutgoingMessageQueue.size();
312 }
313
314 lws_callback_on_writable(fWsi);
315 NLogTrace("NWsClient: Called lws_callback_on_writable for message. Queue size: %zu", outgoingQueueSize);
316
317 // // Wait until the message is sent (queue size decreases)
318 // {
319 // std::unique_lock<std::mutex> lock(fOutgoingMutex);
320 // fSendCv.wait(lock, [this, outgoingQueueSize]() {
321 // NLogDebug("NWsClient: Waiting for message to be sent. Current queue size: %zu", fOutgoingMessageQueue.size());
322 // return fOutgoingMessageQueue.size() < outgoingQueueSize || !fConnected.load();
323 // });
324 // }
325
326 NLogTrace("NWsClient: Message sent successfully.");
327 return fConnected.load();
328}
329
331{
332 NLogTrace("NWsClient: LWS service loop started.");
333 int n = 0;
334 while (!fShutdownRequested.load()) {
335 n = lws_service(fLwsContext, -1);
336 // NLogDebug("NWsClient: lws_service returned %d", n);
337 if (n < 0) {
338 NLogError("NWsClient: lws_service returned error or was cancelled, stopping loop.");
339 fShutdownRequested = true;
340 fConnected = false;
342 fConnectCv.notify_all();
343 break;
344 }
345 // sleep for a short duration to avoid busy-waiting
346 std::this_thread::sleep_for(std::chrono::milliseconds(1));
347 }
348 NLogTrace("NWsClient: LWS service loop stopped.");
349}
350
351WS_URI NWsClient::ParseUri(const std::string & uriString)
352{
353 WS_URI parsed;
354 parsed.fPort = 0;
355
356 std::regex uriRegex(R"((ws|wss)://([a-zA-Z0-9\-\.]+)(:([0-9]+))?(/.*)?)");
357 std::smatch matches;
358
359 if (!std::regex_match(uriString, matches, uriRegex)) {
360 throw std::runtime_error("Invalid WebSocket URI format: " + uriString);
361 }
362
363 parsed.fScheme = matches[1].str();
364 std::transform(parsed.fScheme.begin(), parsed.fScheme.end(), parsed.fScheme.begin(), ::tolower);
365
366 parsed.fHost = matches[2].str();
367
368 if (matches[4].matched) {
369 try {
370 parsed.fPort = std::stoi(matches[4].str());
371 }
372 catch (const std::exception & e) {
373 throw std::runtime_error("Invalid port number in URI: " + matches[4].str());
374 }
375 }
376 else {
377 if (parsed.fScheme == "ws") {
378 parsed.fPort = 80;
379 }
380 else if (parsed.fScheme == "wss") {
381 parsed.fPort = 443;
382 }
383 else {
384 throw std::runtime_error("Unknown scheme for default port: " + parsed.fScheme);
385 }
386 }
387
388 parsed.fPath = matches[5].str();
389 if (parsed.fPath.empty()) {
390 parsed.fPath = "/";
391 }
392
393 return parsed;
394}
395
396} // namespace Ndmspc
WebSocket client for asynchronous communication using libwebsockets.
Definition NWsClient.h:49
void LwsServiceLoop()
Service loop for libwebsockets running in a separate thread.
NWsClient(int maxRetries=5, int retryDelayMs=1000)
Constructor.
struct lws * fWsi
WebSocket instance.
Definition NWsClient.h:108
int fRetryDelayMs
Delay between retries (ms)
Definition NWsClient.h:111
bool Connect(const std::string &uriString)
Connect to a WebSocket server.
static constexpr const char * fgProtocolName
Protocol name for websocket communication.
Definition NWsClient.h:51
int fMaxRetries
Maximum connection retries.
Definition NWsClient.h:110
int fPort
Port number.
Definition NWsClient.h:114
void Disconnect()
Disconnect from the WebSocket server.
static WS_URI ParseUri(const std::string &uriString)
Parse a WebSocket URI string into its components.
std::atomic< bool > fShutdownRequested
Shutdown flag.
Definition NWsClient.h:118
static lws_protocols fProtocols[]
Protocols supported by libwebsockets.
Definition NWsClient.h:129
std::string fPath
Path.
Definition NWsClient.h:115
std::atomic< bool > fConnectionAttemptComplete
Connection attempt completion flag.
Definition NWsClient.h:119
std::condition_variable fSendCv
Condition variable for sending messages.
Definition NWsClient.h:124
bool Send(const std::string &message)
Send a message to the server.
struct lws_context * fLwsContext
libwebsockets context
Definition NWsClient.h:107
std::vector< unsigned char > fSendBuffer
Buffer for sending messages.
Definition NWsClient.h:123
std::atomic< bool > fConnected
Connection status.
Definition NWsClient.h:117
std::queue< std::string > fOutgoingMessageQueue
Queue of outgoing messages.
Definition NWsClient.h:121
std::mutex fConnectMutex
Mutex for connection state.
Definition NWsClient.h:130
std::string fHost
Hostname.
Definition NWsClient.h:113
std::mutex fOutgoingMutex
Mutex for outgoing queue.
Definition NWsClient.h:122
std::thread fLwsServiceThread
Thread running the service loop.
Definition NWsClient.h:109
std::condition_variable fConnectCv
Condition variable for connection.
Definition NWsClient.h:131
OnMessageCallback fOnMessageCallback
Callback for received messages.
Definition NWsClient.h:126
~NWsClient()
Destructor. Cleans up resources and disconnects.
Global callback function for libwebsockets client events.
Structure representing a parsed WebSocket URI.
Definition NWsClient.h:36
std::string fScheme
URI scheme (e.g., "ws", "wss")
Definition NWsClient.h:37
std::string fPath
Path component.
Definition NWsClient.h:40
int fPort
Port number.
Definition NWsClient.h:39
std::string fHost
Hostname or IP address.
Definition NWsClient.h:38