ndmspc  v1.2.0-0.1.rc3
NWsClient.cxx
1 #include <regex>
2 #include <algorithm>
3 #include <cstring>
4 #include <TSystem.h>
5 #include "NWsClient.h"
6 #include "NLogger.h"
7 
8 // The global LWS callback function.
9 static int lws_callback_client_impl(struct lws * wsi, enum lws_callback_reasons reason, void * /*user*/, void * in,
10  size_t len)
11 {
12  NLogTrace("LWS Callback Reason: %d", reason);
13  Ndmspc::NWsClient * client = static_cast<Ndmspc::NWsClient *>(lws_wsi_user(wsi));
14  // If not found in wsi user data, try to get from context
15  if (!client) {
16  client = (Ndmspc::NWsClient *)lws_context_user(lws_get_context(wsi));
17  }
18 
19  // if (!client && reason != LWS_CALLBACK_PROTOCOL_INIT) {
20  // return 0;
21  // }
22  // if (!client) {
23  // NLogError("LWS Callback Error: NWsClient instance pointer is NULL. This is critical.");
24  // return -1;
25  // }
26 
27  // NLogDebug("LWS Callback Reason: %d", reason);
28  switch (reason) {
29  case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
30  NLogError("Connection attempt failed, setting connected to false.");
31  client->fConnected = false;
32  client->fConnectionAttemptComplete = true;
33  client->fWsi = nullptr;
34  {
35  std::lock_guard<std::mutex> lock(client->fOutgoingMutex);
36  std::queue<std::string> empty_queue;
37  std::swap(client->fOutgoingMessageQueue, empty_queue);
38  }
39  client->fConnectCv.notify_all();
40  break;
41 
42  case LWS_CALLBACK_CLIENT_ESTABLISHED:
43  NLogTrace("LWS_CALLBACK_CLIENT_ESTABLISHED");
44 
45  client->fConnected = true;
46  client->fWsi = wsi;
47  client->fConnectionAttemptComplete = true;
48  client->fConnectCv.notify_all();
49  lws_callback_on_writable(wsi);
50  break;
51 
52  case LWS_CALLBACK_CLIENT_RECEIVE: {
53  // Accumulate message fragments
54  static std::string messageBuffer;
55  // static std::mutex messageBufferMutex;
56 
57  {
58  // std::lock_guard<std::mutex> lock(messageBufferMutex);
59  messageBuffer.append(reinterpret_cast<char *>(in), len);
60 
61  // Check if this is the final fragment
62  bool isFinalFragment = lws_is_final_fragment(wsi);
63 
64  if (isFinalFragment) {
65  NLogTrace("Received: %s ", messageBuffer.c_str());
66 
67  if (client->fOnMessageCallback) {
68  try {
69  client->fOnMessageCallback(messageBuffer);
70  }
71  catch (const std::exception & e) {
72  NLogError("LWS Callback Error: User OnMessageCallback threw an exception: %s", e.what());
73  }
74  }
75  messageBuffer.clear();
76  }
77  }
78  } break;
79 
80  case LWS_CALLBACK_CLIENT_WRITEABLE: {
81  NLogTrace("LWS_CALLBACK_CLIENT_WRITEABLE triggered for %s", client->fHost.c_str());
82  std::lock_guard<std::mutex> lock(client->fOutgoingMutex);
83  if (!client->fOutgoingMessageQueue.empty()) {
84  const std::string & message = client->fOutgoingMessageQueue.front();
85 
86  client->fSendBuffer.resize(LWS_PRE + message.size());
87  memcpy(client->fSendBuffer.data() + LWS_PRE, message.data(), message.size());
88 
89  int n = lws_write(wsi, client->fSendBuffer.data() + LWS_PRE, message.size(), LWS_WRITE_TEXT);
90 
91  if (n < (int)message.size()) {
92  NLogError("LWS Callback Error: lws_write failed to send the full message, sent %d bytes, expected %zu bytes.",
93  n, message.size());
94  }
95  client->fOutgoingMessageQueue.pop();
96 
97  if (!client->fOutgoingMessageQueue.empty()) {
98  lws_callback_on_writable(wsi);
99  }
100  }
101  else {
102  client->fSendCv.notify_all();
103  }
104 
105  } break;
106 
107  case LWS_CALLBACK_CLOSED:
108  NLogTrace("LWS_CALLBACK_CLOSED: Connection closed by server or client.");
109  client->fConnected = false;
110  client->fWsi = nullptr;
111  {
112  std::lock_guard<std::mutex> lock(client->fOutgoingMutex);
113  std::queue<std::string> empty_queue;
114  std::swap(client->fOutgoingMessageQueue, empty_queue);
115  }
116  client->fConnectionAttemptComplete = true;
117  client->fConnectCv.notify_all();
118  break;
119 
120  default: break;
121  }
122  return 0;
123 }
124 
125 namespace Ndmspc {
126 
127 // lws_protocols NWsClient::fProtocols[] = {
128 // {NWsClient::fgProtocolName, lws_callback_client_impl, sizeof(Ndmspc::NWsClient *), 0}, {NULL, NULL, 0, 0}};
129 lws_protocols Ndmspc::NWsClient::fProtocols[] = {
130  {Ndmspc::NWsClient::fgProtocolName, lws_callback_client_impl, sizeof(Ndmspc::NWsClient *), 4096, 0, nullptr, 0},
131  LWS_PROTOCOL_LIST_TERM /* terminator */
132 };
133 NWsClient::NWsClient(int maxRetries, int retryDelayMs)
134  : fLwsContext(nullptr), fWsi(nullptr), fMaxRetries(maxRetries), fRetryDelayMs(retryDelayMs), fConnected(false),
135  fShutdownRequested(false), fConnectionAttemptComplete(false)
136 {
137  // Silent websocket logging
138  lws_set_log_level(LLL_ERR, NULL);
139  // lws_set_log_level(LLL_DEBUG | LLL_PARSER | LLL_HEADER, NULL);
140 }
141 
143 {
144  Disconnect();
145  NLogDebug("NWsClient destructor finished.");
146 }
147 
148 bool NWsClient::Connect(const std::string & uriString)
149 {
150  int attempt = 0;
151  while (attempt < fMaxRetries) {
152  NLogInfo("NWsClient: Attempting to connect to %s (attempt %d)", uriString.c_str(), attempt + 1);
153  if (fConnected.load()) {
154  NLogError("NWsClient: Already connected.");
155  return true;
156  }
157  if (fLwsContext) {
158  NLogError("NWsClient: Context already exists, disconnect first.");
159  return false;
160  }
161 
162  fShutdownRequested = false;
164  fWsi = nullptr;
165 
166  WS_URI parsedUri;
167  try {
168  parsedUri = ParseUri(uriString);
169  }
170  catch (const std::runtime_error & e) {
171  NLogError("NWsClient: URI parsing error: %s", e.what());
172  return false;
173  }
174 
175  fHost = parsedUri.fHost;
176  fPort = parsedUri.fPort;
177  fPath = parsedUri.fPath;
178 
179  lws_context_creation_info info;
180  memset(&info, 0, sizeof(info));
181 
182  info.port = CONTEXT_PORT_NO_LISTEN;
183  info.protocols = fProtocols;
184  info.gid = -1;
185  info.uid = -1;
186 
187  if (parsedUri.fScheme == "wss") {
188  info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
189  }
190  info.user = this;
191 
192  fLwsContext = lws_create_context(&info);
193  if (!fLwsContext) {
194  NLogError("Failed to create libwebsockets context.");
195  return false;
196  }
197 
198  struct lws_client_connect_info cci;
199  memset(&cci, 0, sizeof(cci));
200  cci.context = fLwsContext;
201  cci.address = fHost.c_str();
202  cci.port = fPort;
203  cci.path = fPath.c_str();
204  cci.host = fHost.c_str();
205  cci.origin = fHost.c_str();
206 
207  cci.protocol = fgProtocolName;
208  cci.userdata = this;
209 
210  if (parsedUri.fScheme == "wss") {
211  cci.ssl_connection = LCCSCF_USE_SSL;
212  }
213  else {
214  cci.ssl_connection = 0;
215  }
216 
217  NLogTrace("NWsClient: Initiating connection to %s (attempt %d)", uriString.c_str(), attempt + 1);
218 
219  fWsi = lws_client_connect_via_info(&cci);
220  if (!fWsi) {
221  NLogError("NWsClient: Failed to connect to WebSocket server at %s (attempt %d)", uriString.c_str(), attempt + 1);
222  lws_context_destroy(fLwsContext);
223  fLwsContext = nullptr;
224  attempt++;
225  if (attempt < fMaxRetries) {
226  std::this_thread::sleep_for(std::chrono::milliseconds(fRetryDelayMs));
227  continue;
228  }
229  return false;
230  }
231  cci.pwsi = &fWsi;
232 
233  fLwsServiceThread = std::thread(&NWsClient::LwsServiceLoop, this);
234 
235  {
236  std::unique_lock<std::mutex> lock(fConnectMutex);
237  fConnectCv.wait(lock, [this]() { return fConnectionAttemptComplete.load() || fShutdownRequested.load(); });
238  }
239  if (!fConnected.load()) {
240  NLogError("NWsClient: Connection to %s failed or shutdown requested (attempt %d).", uriString.c_str(),
241  attempt + 1);
242  Disconnect();
243  attempt++;
244  if (attempt < fMaxRetries) {
245  std::this_thread::sleep_for(std::chrono::milliseconds(fRetryDelayMs));
246  continue;
247  }
248  return false;
249  }
250 
251  return true;
252  }
253  return false;
254 }
255 
257 {
258  NLogTrace("NWsClient: Disconnect requested.");
259  if (fLwsContext) {
260  // Wait until all outgoing messages are sent
261  while (true) {
262  {
263  std::lock_guard<std::mutex> lock(fOutgoingMutex);
264  if (fOutgoingMessageQueue.empty()) break;
265  }
266  std::this_thread::sleep_for(std::chrono::milliseconds(10));
267  }
268 
269  fShutdownRequested = true;
271  fConnectCv.notify_all();
272 
273  NLogDebug("NWsClient: Cancelling LWS service.");
274 
275  lws_cancel_service(fLwsContext);
276 
277  if (fLwsServiceThread.joinable()) {
278  fLwsServiceThread.join();
279  }
280  NLogDebug("NWsClient: LWS service thread joined.");
281 
282  if (fWsi) {
283  fWsi = nullptr;
284  }
285 
286  lws_context_destroy(fLwsContext);
287  fLwsContext = nullptr;
288  fConnected = false;
289 
290  std::queue<std::string> empty_queue;
291  std::lock_guard<std::mutex> lock(fOutgoingMutex);
292  std::swap(fOutgoingMessageQueue, empty_queue);
293 
294  NLogTrace("NWsClient: LWS context destroyed.");
295  }
296  fLwsContext = nullptr;
297  fWsi = nullptr;
298  fConnected = false;
299 }
300 bool NWsClient::Send(const std::string & message)
301 {
302  if (!fConnected.load() || !fWsi) {
303  NLogError("NWsClient: Cannot send, not connected to WebSocket server.");
304  return false;
305  }
306 
307  size_t outgoingQueueSize;
308  {
309  std::lock_guard<std::mutex> lock(fOutgoingMutex);
310  fOutgoingMessageQueue.push(message);
311  outgoingQueueSize = fOutgoingMessageQueue.size();
312  }
313 
314  lws_callback_on_writable(fWsi);
315  NLogTrace("NWsClient: Called lws_callback_on_writable for message. Queue size: %zu", outgoingQueueSize);
316 
317  // // Wait until the message is sent (queue size decreases)
318  // {
319  // std::unique_lock<std::mutex> lock(fOutgoingMutex);
320  // fSendCv.wait(lock, [this, outgoingQueueSize]() {
321  // NLogDebug("NWsClient: Waiting for message to be sent. Current queue size: %zu", fOutgoingMessageQueue.size());
322  // return fOutgoingMessageQueue.size() < outgoingQueueSize || !fConnected.load();
323  // });
324  // }
325 
326  NLogTrace("NWsClient: Message sent successfully.");
327  return fConnected.load();
328 }
329 
331 {
332  NLogTrace("NWsClient: LWS service loop started.");
333  int n = 0;
334  while (!fShutdownRequested.load()) {
335  n = lws_service(fLwsContext, -1);
336  // NLogDebug("NWsClient: lws_service returned %d", n);
337  if (n < 0) {
338  NLogError("NWsClient: lws_service returned error or was cancelled, stopping loop.");
339  fShutdownRequested = true;
340  fConnected = false;
342  fConnectCv.notify_all();
343  break;
344  }
345  // sleep for a short duration to avoid busy-waiting
346  std::this_thread::sleep_for(std::chrono::milliseconds(1));
347  }
348  NLogTrace("NWsClient: LWS service loop stopped.");
349 }
350 
351 WS_URI NWsClient::ParseUri(const std::string & uriString)
352 {
353  WS_URI parsed;
354  parsed.fPort = 0;
355 
356  std::regex uriRegex(R"((ws|wss)://([a-zA-Z0-9\-\.]+)(:([0-9]+))?(/.*)?)");
357  std::smatch matches;
358 
359  if (!std::regex_match(uriString, matches, uriRegex)) {
360  throw std::runtime_error("Invalid WebSocket URI format: " + uriString);
361  }
362 
363  parsed.fScheme = matches[1].str();
364  std::transform(parsed.fScheme.begin(), parsed.fScheme.end(), parsed.fScheme.begin(), ::tolower);
365 
366  parsed.fHost = matches[2].str();
367 
368  if (matches[4].matched) {
369  try {
370  parsed.fPort = std::stoi(matches[4].str());
371  }
372  catch (const std::exception & e) {
373  throw std::runtime_error("Invalid port number in URI: " + matches[4].str());
374  }
375  }
376  else {
377  if (parsed.fScheme == "ws") {
378  parsed.fPort = 80;
379  }
380  else if (parsed.fScheme == "wss") {
381  parsed.fPort = 443;
382  }
383  else {
384  throw std::runtime_error("Unknown scheme for default port: " + parsed.fScheme);
385  }
386  }
387 
388  parsed.fPath = matches[5].str();
389  if (parsed.fPath.empty()) {
390  parsed.fPath = "/";
391  }
392 
393  return parsed;
394 }
395 
396 } // namespace Ndmspc
WebSocket client for asynchronous communication using libwebsockets.
Definition: NWsClient.h:49
void LwsServiceLoop()
Service loop for libwebsockets running in a separate thread.
Definition: NWsClient.cxx:330
NWsClient(int maxRetries=5, int retryDelayMs=1000)
Constructor.
Definition: NWsClient.cxx:133
struct lws * fWsi
WebSocket instance.
Definition: NWsClient.h:108
int fRetryDelayMs
Delay between retries (ms)
Definition: NWsClient.h:111
bool Connect(const std::string &uriString)
Connect to a WebSocket server.
Definition: NWsClient.cxx:148
static constexpr const char * fgProtocolName
Protocol name for websocket communication.
Definition: NWsClient.h:51
int fMaxRetries
Maximum connection retries.
Definition: NWsClient.h:110
int fPort
Port number.
Definition: NWsClient.h:114
void Disconnect()
Disconnect from the WebSocket server.
Definition: NWsClient.cxx:256
static WS_URI ParseUri(const std::string &uriString)
Parse a WebSocket URI string into its components.
Definition: NWsClient.cxx:351
std::atomic< bool > fShutdownRequested
Shutdown flag.
Definition: NWsClient.h:118
static lws_protocols fProtocols[]
Protocols supported by libwebsockets.
Definition: NWsClient.h:128
std::string fPath
Path.
Definition: NWsClient.h:115
std::atomic< bool > fConnectionAttemptComplete
Connection attempt completion flag.
Definition: NWsClient.h:119
std::condition_variable fSendCv
Condition variable for sending messages.
Definition: NWsClient.h:124
bool Send(const std::string &message)
Send a message to the server.
Definition: NWsClient.cxx:300
struct lws_context * fLwsContext
libwebsockets context
Definition: NWsClient.h:107
std::vector< unsigned char > fSendBuffer
Buffer for sending messages.
Definition: NWsClient.h:123
std::atomic< bool > fConnected
Connection status.
Definition: NWsClient.h:117
std::queue< std::string > fOutgoingMessageQueue
Queue of outgoing messages.
Definition: NWsClient.h:121
std::mutex fConnectMutex
Mutex for connection state.
Definition: NWsClient.h:130
std::string fHost
Hostname.
Definition: NWsClient.h:113
std::mutex fOutgoingMutex
Mutex for outgoing queue.
Definition: NWsClient.h:122
std::thread fLwsServiceThread
Thread running the service loop.
Definition: NWsClient.h:109
std::condition_variable fConnectCv
Condition variable for connection.
Definition: NWsClient.h:131
OnMessageCallback fOnMessageCallback
Callback for received messages.
Definition: NWsClient.h:126
~NWsClient()
Destructor. Cleans up resources and disconnects.
Definition: NWsClient.cxx:142
Structure representing a parsed WebSocket URI.
Definition: NWsClient.h:36
std::string fScheme
URI scheme (e.g., "ws", "wss")
Definition: NWsClient.h:37
std::string fPath
Path component.
Definition: NWsClient.h:40
int fPort
Port number.
Definition: NWsClient.h:39
std::string fHost
Hostname or IP address.
Definition: NWsClient.h:38