uvw 2.12.1
Loading...
Searching...
No Matches
stream.h
1#ifndef UVW_STREAM_INCLUDE_H
2#define UVW_STREAM_INCLUDE_H
3
4#include <algorithm>
5#include <cstddef>
6#include <iterator>
7#include <memory>
8#include <utility>
9#include <uv.h>
10#include "handle.hpp"
11#include "loop.h"
12#include "request.hpp"
13
14namespace uvw {
15
/**
 * @brief ConnectEvent event.
 *
 * Empty tag type carrying no payload. In this header it is the event type
 * that `details::ConnectReq::connect` selects for the request's default
 * completion callback.
 */
struct ConnectEvent {};
22
/**
 * @brief EndEvent event.
 *
 * Empty tag type carrying no payload. `StreamHandle` publishes it from its
 * read callback when libuv reports `UV_EOF`, i.e. the end of the stream.
 */
struct EndEvent {};
29
/**
 * @brief ListenEvent event.
 *
 * Empty tag type carrying no payload. `StreamHandle` publishes it from its
 * listen callback whenever that callback is invoked with a zero status.
 */
struct ListenEvent {};
36
/**
 * @brief ShutdownEvent event.
 *
 * Empty tag type carrying no payload. `StreamHandle::shutdown` listens for it
 * on the shutdown request it creates and re-publishes it on the handle.
 */
struct ShutdownEvent {};
43
/**
 * @brief WriteEvent event.
 *
 * Empty tag type carrying no payload. It is the event type selected for the
 * default completion callback of `details::WriteReq`, and the `write`
 * overloads of `StreamHandle` re-publish it on the handle.
 */
struct WriteEvent {};
50
/**
 * @brief DataEvent event.
 *
 * Published by `StreamHandle` from its read callback when a positive number
 * of bytes has been read. The event owns the buffer it carries.
 */
struct DataEvent {
    /**
     * @brief Builds an event from an owned buffer and a byte count.
     *
     * Only declared here; the definition lives outside this header.
     *
     * @param buf Buffer whose ownership is handed over to the event.
     * @param len Byte count associated with the buffer.
     */
    explicit DataEvent(std::unique_ptr<char[]> buf, std::size_t len) noexcept;

    /*! @brief Owned buffer (the read callback passes the bytes just read). */
    std::unique_ptr<char[]> data;

    /*! @brief Byte count (the read callback passes the number of bytes read). */
    std::size_t length;
};
62
63namespace details {
64
65struct ConnectReq final: public Request<ConnectReq, uv_connect_t> {
66 using Request::Request;
67
68 template<typename F, typename... Args>
69 void connect(F &&f, Args &&...args) {
70 invoke(std::forward<F>(f), get(), std::forward<Args>(args)..., &defaultCallback<ConnectEvent>);
71 }
72};
73
74struct ShutdownReq final: public Request<ShutdownReq, uv_shutdown_t> {
75 using Request::Request;
76
77 void shutdown(uv_stream_t *handle);
78};
79
80template<typename Deleter>
81class WriteReq final: public Request<WriteReq<Deleter>, uv_write_t> {
82 using ConstructorAccess = typename Request<WriteReq<Deleter>, uv_write_t>::ConstructorAccess;
83
84public:
85 WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<char[], Deleter> dt, unsigned int len)
86 : Request<WriteReq<Deleter>, uv_write_t>{ca, std::move(loop)},
87 data{std::move(dt)},
88 buf{uv_buf_init(data.get(), len)} {}
89
90 void write(uv_stream_t *handle) {
91 this->invoke(&uv_write, this->get(), handle, &buf, 1, &this->template defaultCallback<WriteEvent>);
92 }
93
94 void write(uv_stream_t *handle, uv_stream_t *send) {
95 this->invoke(&uv_write2, this->get(), handle, &buf, 1, send, &this->template defaultCallback<WriteEvent>);
96 }
97
98private:
99 std::unique_ptr<char[], Deleter> data;
100 uv_buf_t buf;
101};
102
103} // namespace details
104
112template<typename T, typename U>
113class StreamHandle: public Handle<T, U> {
114 static constexpr unsigned int DEFAULT_BACKLOG = 128;
115
116 static void readCallback(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
117 T &ref = *(static_cast<T *>(handle->data));
118 // data will be destroyed no matter of what the value of nread is
119 std::unique_ptr<char[]> data{buf->base};
120
121 // nread == 0 is ignored (see http://docs.libuv.org/en/v1.x/stream.html)
122 // equivalent to EAGAIN/EWOULDBLOCK, it shouldn't be treated as an error
123 // for we don't have data to emit though, it's fine to suppress it
124
125 if(nread == UV_EOF) {
126 // end of stream
127 ref.publish(EndEvent{});
128 } else if(nread > 0) {
129 // data available
130 ref.publish(DataEvent{std::move(data), static_cast<std::size_t>(nread)});
131 } else if(nread < 0) {
132 // transmission error
133 ref.publish(ErrorEvent(nread));
134 }
135 }
136
137 static void listenCallback(uv_stream_t *handle, int status) {
138 if(T &ref = *(static_cast<T *>(handle->data)); status) {
139 ref.publish(ErrorEvent{status});
140 } else {
141 ref.publish(ListenEvent{});
142 }
143 }
144
145public:
146#ifdef _MSC_VER
147 StreamHandle(typename Handle<T, U>::ConstructorAccess ca, std::shared_ptr<Loop> ref)
148 : Handle<T, U>{ca, std::move(ref)} {}
149#else
150 using Handle<T, U>::Handle;
151#endif
152
160 void shutdown() {
161 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
162 ptr->publish(event);
163 };
164
165 auto shutdown = this->loop().template resource<details::ShutdownReq>();
166 shutdown->template once<ErrorEvent>(listener);
167 shutdown->template once<ShutdownEvent>(listener);
168 shutdown->shutdown(this->template get<uv_stream_t>());
169 }
170
181 void listen(int backlog = DEFAULT_BACKLOG) {
182 this->invoke(&uv_listen, this->template get<uv_stream_t>(), backlog, &listenCallback);
183 }
184
204 template<typename S>
205 void accept(S &ref) {
206 this->invoke(&uv_accept, this->template get<uv_stream_t>(), this->template get<uv_stream_t>(ref));
207 }
208
216 void read() {
217 this->invoke(&uv_read_start, this->template get<uv_stream_t>(), &this->allocCallback, &readCallback);
218 }
219
225 void stop() {
226 this->invoke(&uv_read_stop, this->template get<uv_stream_t>());
227 }
228
241 template<typename Deleter>
242 void write(std::unique_ptr<char[], Deleter> data, unsigned int len) {
243 auto req = this->loop().template resource<details::WriteReq<Deleter>>(std::move(data), len);
244 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
245 ptr->publish(event);
246 };
247
248 req->template once<ErrorEvent>(listener);
249 req->template once<WriteEvent>(listener);
250 req->write(this->template get<uv_stream_t>());
251 }
252
265 void write(char *data, unsigned int len) {
266 auto req = this->loop().template resource<details::WriteReq<void (*)(char *)>>(std::unique_ptr<char[], void (*)(char *)>{data, [](char *) {}}, len);
267 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
268 ptr->publish(event);
269 };
270
271 req->template once<ErrorEvent>(listener);
272 req->template once<WriteEvent>(listener);
273 req->write(this->template get<uv_stream_t>());
274 }
275
295 template<typename S, typename Deleter>
296 void write(S &send, std::unique_ptr<char[], Deleter> data, unsigned int len) {
297 auto req = this->loop().template resource<details::WriteReq<Deleter>>(std::move(data), len);
298 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
299 ptr->publish(event);
300 };
301
302 req->template once<ErrorEvent>(listener);
303 req->template once<WriteEvent>(listener);
304 req->write(this->template get<uv_stream_t>(), this->template get<uv_stream_t>(send));
305 }
306
326 template<typename S>
327 void write(S &send, char *data, unsigned int len) {
328 auto req = this->loop().template resource<details::WriteReq<void (*)(char *)>>(std::unique_ptr<char[], void (*)(char *)>{data, [](char *) {}}, len);
329 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
330 ptr->publish(event);
331 };
332
333 req->template once<ErrorEvent>(listener);
334 req->template once<WriteEvent>(listener);
335 req->write(this->template get<uv_stream_t>(), this->template get<uv_stream_t>(send));
336 }
337
349 int tryWrite(std::unique_ptr<char[]> data, unsigned int len) {
350 uv_buf_t bufs[] = {uv_buf_init(data.get(), len)};
351 auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
352
353 if(bw < 0) {
354 this->publish(ErrorEvent{bw});
355 bw = 0;
356 }
357
358 return bw;
359 }
360
372 template<typename V, typename W>
373 int tryWrite(std::unique_ptr<char[]> data, unsigned int len, StreamHandle<V, W> &send) {
374 uv_buf_t bufs[] = {uv_buf_init(data.get(), len)};
375 auto bw = uv_try_write2(this->template get<uv_stream_t>(), bufs, 1, send.raw());
376
377 if(bw < 0) {
378 this->publish(ErrorEvent{bw});
379 bw = 0;
380 }
381
382 return bw;
383 }
384
396 int tryWrite(char *data, unsigned int len) {
397 uv_buf_t bufs[] = {uv_buf_init(data, len)};
398 auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
399
400 if(bw < 0) {
401 this->publish(ErrorEvent{bw});
402 bw = 0;
403 }
404
405 return bw;
406 }
407
419 template<typename V, typename W>
420 int tryWrite(char *data, unsigned int len, StreamHandle<V, W> &send) {
421 uv_buf_t bufs[] = {uv_buf_init(data, len)};
422 auto bw = uv_try_write2(this->template get<uv_stream_t>(), bufs, 1, send.raw());
423
424 if(bw < 0) {
425 this->publish(ErrorEvent{bw});
426 bw = 0;
427 }
428
429 return bw;
430 }
431
436 bool readable() const noexcept {
437 return (uv_is_readable(this->template get<uv_stream_t>()) == 1);
438 }
439
444 bool writable() const noexcept {
445 return (uv_is_writable(this->template get<uv_stream_t>()) == 1);
446 }
447
463 bool blocking(bool enable = false) {
464 return (0 == uv_stream_set_blocking(this->template get<uv_stream_t>(), enable));
465 }
466
471 size_t writeQueueSize() const noexcept {
472 return uv_stream_get_write_queue_size(this->template get<uv_stream_t>());
473 }
474};
475
476} // namespace uvw
477
478#ifndef UVW_AS_LIB
479# include "stream.cpp"
480#endif
481
482#endif // UVW_STREAM_INCLUDE_H
Connection< E > once(Listener< E > f)
Registers a short-lived listener with the event emitter.
Definition emitter.h:240
Handle base class.
Definition handle.hpp:26
Request base class.
Definition request.hpp:18
std::shared_ptr< R > data() const
Gets user-defined data. uvw won't use this field in any case.
Definition resource.hpp:48
The StreamHandle handle.
Definition stream.h:113
void write(S &send, char *data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition stream.h:327
void write(S &send, std::unique_ptr< char[], Deleter > data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition stream.h:296
void read()
Starts reading data from an incoming stream.
Definition stream.h:216
void shutdown()
Shuts down the outgoing (write) side of a duplex stream.
Definition stream.h:160
int tryWrite(std::unique_ptr< char[]> data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition stream.h:349
bool writable() const noexcept
Checks if the stream is writable.
Definition stream.h:444
bool blocking(bool enable=false)
Enables or disables blocking mode for a stream.
Definition stream.h:463
int tryWrite(char *data, unsigned int len, StreamHandle< V, W > &send)
Queues a write request if it can be completed immediately.
Definition stream.h:420
size_t writeQueueSize() const noexcept
Gets the amount of queued bytes waiting to be sent.
Definition stream.h:471
int tryWrite(char *data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition stream.h:396
bool readable() const noexcept
Checks if the stream is readable.
Definition stream.h:436
void stop()
Stops reading data from the stream.
Definition stream.h:225
void write(char *data, unsigned int len)
Writes data to the stream.
Definition stream.h:265
void listen(int backlog=DEFAULT_BACKLOG)
Starts listening for incoming connections.
Definition stream.h:181
void accept(S &ref)
Accepts incoming connections.
Definition stream.h:205
int tryWrite(std::unique_ptr< char[]> data, unsigned int len, StreamHandle< V, W > &send)
Queues a write request if it can be completed immediately.
Definition stream.h:373
void write(std::unique_ptr< char[], Deleter > data, unsigned int len)
Writes data to the stream.
Definition stream.h:242
const U * raw() const noexcept
Gets the underlying raw data structure.
uvw default namespace.
Definition async.h:8
ConnectEvent event.
Definition stream.h:21
DataEvent event.
Definition stream.h:56
std::unique_ptr< char[]> data
Definition stream.h:59
std::size_t length
Definition stream.h:60
EndEvent event.
Definition stream.h:28
The ErrorEvent event.
Definition emitter.h:23
ListenEvent event.
Definition stream.h:35
ShutdownEvent event.
Definition stream.h:42
WriteEvent event.
Definition stream.h:49