XRootD
Loading...
Searching...
No Matches
XrdTpcMultistream.cc
Go to the documentation of this file.
1
4
5#include "XrdTpcTPC.hh"
6#include "XrdTpcState.hh"
7#include "XrdTpcCurlMulti.hh"
8
10
11#include <curl/curl.h>
12
13#include <algorithm>
14#include <sstream>
15#include <stdexcept>
16
17
18using namespace TPC;
19
/**
 * Exception thrown when a libcurl multi-handle cannot be set up.
 *
 * It derives from std::runtime_error, so handlers must list it before any
 * generic std::runtime_error handler if they want to treat it separately.
 */
class CurlHandlerSetupError : public std::runtime_error {
public:
    /// @param msg Human-readable description, returned verbatim by what().
    CurlHandlerSetupError(const std::string &msg) :
        std::runtime_error(msg)
    {}

    // `noexcept` replaces the deprecated dynamic exception specification
    // `throw ()`, which is no longer valid as of C++20.
    ~CurlHandlerSetupError() noexcept override = default;
};
28
29namespace {
30class MultiCurlHandler {
31public:
32 MultiCurlHandler(std::vector<State*> &states, XrdSysError &log) :
33 m_handle(curl_multi_init()),
34 m_states(states),
35 m_log(log),
36 m_bytes_transferred(0),
37 m_error_code(0),
38 m_status_code(0)
39 {
40 if (m_handle == NULL) {
41 throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
42 }
43 m_avail_handles.reserve(states.size());
44 m_active_handles.reserve(states.size());
45 for (std::vector<State*>::const_iterator state_iter = states.begin();
46 state_iter != states.end();
47 state_iter++) {
48 m_avail_handles.push_back((*state_iter)->GetHandle());
49 }
50 }
51
52 ~MultiCurlHandler()
53 {
54 if (!m_handle) {return;}
55 for (std::vector<CURL *>::const_iterator it = m_active_handles.begin();
56 it != m_active_handles.end();
57 it++) {
58 curl_multi_remove_handle(m_handle, *it);
59 }
60 curl_multi_cleanup(m_handle);
61 }
62
63 MultiCurlHandler(const MultiCurlHandler &) = delete;
64
65 CURLM *Get() const {return m_handle;}
66
67 void FinishCurlXfer(CURL *curl) {
68 CURLMcode mres = curl_multi_remove_handle(m_handle, curl);
69 if (mres) {
70 std::stringstream ss;
71 ss << "Failed to remove transfer from set: "
72 << curl_multi_strerror(mres);
73 throw std::runtime_error(ss.str());
74 }
75 for (std::vector<State*>::iterator state_iter = m_states.begin();
76 state_iter != m_states.end();
77 state_iter++) {
78 if (curl == (*state_iter)->GetHandle()) {
79 m_bytes_transferred += (*state_iter)->BytesTransferred();
80 int error_code = (*state_iter)->GetErrorCode();
81 if (error_code && !m_error_code) {
82 m_error_code = error_code;
83 m_error_message = (*state_iter)->GetErrorMessage();
84 }
85 int status_code = (*state_iter)->GetStatusCode();
86 if (status_code >= 400 && !m_status_code) {
87 m_status_code = status_code;
88 m_error_message = (*state_iter)->GetErrorMessage();
89 }
90 (*state_iter)->ResetAfterRequest();
91 break;
92 }
93 }
94 for (std::vector<CURL *>::iterator iter = m_active_handles.begin();
95 iter != m_active_handles.end();
96 ++iter)
97 {
98 if (*iter == curl) {
99 m_active_handles.erase(iter);
100 break;
101 }
102 }
103 m_avail_handles.push_back(curl);
104 }
105
106 off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size,
107 int &running_handles) {
108 bool started_new_xfer = false;
109 do {
110 size_t xfer_size = std::min(content_length - current_offset, static_cast<off_t>(block_size));
111 if (xfer_size == 0) {return current_offset;}
112 if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) {
113 // In this case, we need to start new transfers but weren't able to.
114 if (running_handles == 0) {
115 if (!CanStartTransfer(true)) {
116 m_log.Emsg("StartTransfers", "Unable to start transfers.");
117 }
118 }
119 break;
120 } else {
121 running_handles += 1;
122 }
123 current_offset += xfer_size;
124 } while (true);
125 return current_offset;
126 }
127
128 int Flush() {
129 int last_error = 0;
130 for (std::vector<State*>::iterator state_it = m_states.begin();
131 state_it != m_states.end();
132 state_it++)
133 {
134 int error = (*state_it)->Flush();
135 if (error) {last_error = error;}
136 }
137 return last_error;
138 }
139
140 off_t BytesTransferred() const {
141 return m_bytes_transferred;
142 }
143
144 int GetStatusCode() const {
145 return m_status_code;
146 }
147
148 int GetErrorCode() const {
149 return m_error_code;
150 }
151
152 void SetErrorCode(int error_code) {
153 m_error_code = error_code;
154 }
155
156 std::string GetErrorMessage() const {
157 return m_error_message;
158 }
159
160 void SetErrorMessage(const std::string &error_msg) {
161 m_error_message = error_msg;
162 }
163
164private:
165
166 bool StartTransfer(off_t offset, size_t size) {
167 if (!CanStartTransfer(false)) {return false;}
168 for (std::vector<CURL*>::const_iterator handle_it = m_avail_handles.begin();
169 handle_it != m_avail_handles.end();
170 handle_it++) {
171 for (std::vector<State*>::iterator state_it = m_states.begin();
172 state_it != m_states.end();
173 state_it++) {
174 if ((*state_it)->GetHandle() == *handle_it) { // This state object represents an idle handle.
175 (*state_it)->SetTransferParameters(offset, size);
176 ActivateHandle(**state_it);
177 return true;
178 }
179 }
180 }
181 return false;
182 }
183
184 void ActivateHandle(State &state) {
185 CURL *curl = state.GetHandle();
186 m_active_handles.push_back(curl);
187 CURLMcode mres;
188 mres = curl_multi_add_handle(m_handle, curl);
189 if (mres) {
190 std::stringstream ss;
191 ss << "Failed to add transfer to libcurl multi-handle"
192 << curl_multi_strerror(mres);
193 throw std::runtime_error(ss.str());
194 }
195 for (auto iter = m_avail_handles.begin();
196 iter != m_avail_handles.end();
197 ++iter)
198 {
199 if (*iter == curl) {
200 m_avail_handles.erase(iter);
201 break;
202 }
203 }
204 }
205
206 bool CanStartTransfer(bool log_reason) const {
207 size_t idle_handles = m_avail_handles.size();
208 size_t transfer_in_progress = 0;
209 for (std::vector<State*>::const_iterator state_iter = m_states.begin();
210 state_iter != m_states.end();
211 state_iter++) {
212 for (std::vector<CURL*>::const_iterator handle_iter = m_active_handles.begin();
213 handle_iter != m_active_handles.end();
214 handle_iter++) {
215 if (*handle_iter == (*state_iter)->GetHandle()) {
216 transfer_in_progress += (*state_iter)->BodyTransferInProgress();
217 break;
218 }
219 }
220 }
221 if (!idle_handles) {
222 if (log_reason) {
223 m_log.Emsg("CanStartTransfer", "Unable to start transfers as no idle CURL handles are available.");
224 }
225 return false;
226 }
227 ssize_t available_buffers = m_states[0]->AvailableBuffers();
228 // To be conservative, set aside buffers for any transfers that have been activated
229 // but don't have their first responses back yet.
230 available_buffers -= (m_active_handles.size() - transfer_in_progress);
231 if (log_reason && (available_buffers == 0)) {
232 std::stringstream ss;
233 ss << "Unable to start transfers as no buffers are available. Available buffers: " <<
234 m_states[0]->AvailableBuffers() << ", Active curl handles: " << m_active_handles.size()
235 << ", Transfers in progress: " << transfer_in_progress;
236 m_log.Emsg("CanStartTransfer", ss.str().c_str());
237 if (m_states[0]->AvailableBuffers() == 0) {
238 m_states[0]->DumpBuffers();
239 }
240 }
241 return available_buffers > 0;
242 }
243
244 CURLM *m_handle;
245 std::vector<CURL *> m_avail_handles;
246 std::vector<CURL *> m_active_handles;
247 std::vector<State*> &m_states;
248 XrdSysError &m_log;
249 off_t m_bytes_transferred;
250 int m_error_code;
251 int m_status_code;
252 std::string m_error_message;
253};
254}
255
256
257int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
258 size_t streams, std::vector<State*> &handles,
259 std::vector<ManagedCurlHandle> &curl_handles, TPCLogRecord &rec)
260{
261 bool success;
262 // The content-length was set thanks to the call to GetContentLengthTPCPull() before calling this function
263 off_t content_size = state.GetContentLength();
264 off_t current_offset = 0;
265
266 size_t concurrency = streams * m_pipelining_multiplier;
267
268 handles.reserve(concurrency);
269 handles.push_back(new State());
270 handles[0]->Move(state);
271 for (size_t idx = 1; idx < concurrency; idx++) {
272 handles.push_back(handles[0]->Duplicate());
273 curl_handles.emplace_back(handles.back()->GetHandle());
274 }
275
276 // Notify the packet marking manager that the transfer will start after this point
277 rec.pmarkManager.startTransfer();
278
279 // Create the multi-handle and add in the current transfer to it.
280 MultiCurlHandler mch(handles, m_log);
281 CURLM *multi_handle = mch.Get();
282
283#ifdef USE_PIPELINING
284 curl_multi_setopt(multi_handle, CURLMOPT_PIPELINING, 1);
285 curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, streams);
286#endif
287
288 // Start response to client prior to the first call to curl_multi_perform
289 int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
290 if (retval) {
291 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
292 "Failed to send the initial response to the TPC client");
293 return retval;
294 } else {
295 logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
296 "Initial transfer response sent to the TPC client");
297 }
298
299 // Start assigning transfers
300 int running_handles = 0;
301 current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles);
302
303 // Transfer loop: use curl to actually run the transfer, but periodically
304 // interrupt things to send back performance updates to the client.
305 time_t last_marker = 0;
306 // Track the time since the transfer last made progress
307 off_t last_advance_bytes = 0;
308 time_t last_advance_time = time(NULL);
309 time_t transfer_start = last_advance_time;
310 CURLcode res = static_cast<CURLcode>(-1);
311 CURLMcode mres = CURLM_OK;
312 do {
313 time_t now = time(NULL);
314 time_t next_marker = last_marker + m_marker_period;
315 if (now >= next_marker) {
316 if (current_offset > last_advance_bytes) {
317 last_advance_bytes = current_offset;
318 last_advance_time = now;
319 }
320 if (SendPerfMarker(req, rec, handles, current_offset)) {
321 logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
322 "Failed to send a perf marker to the TPC client");
323 return -1;
324 }
325 int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
326 if (now > last_advance_time + timeout) {
327 const char *log_prefix = rec.log_prefix.c_str();
328 bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
329
330 mch.SetErrorCode(10);
331 std::stringstream ss;
332 ss << "Transfer failed because no bytes have been "
333 << (tpc_pull ? "received from the source (pull mode) in "
334 : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
335 mch.SetErrorMessage(ss.str());
336 break;
337 }
338 last_marker = now;
339 }
340
341 mres = curl_multi_perform(multi_handle, &running_handles);
342 if (mres == CURLM_CALL_MULTI_PERFORM) {
343 // curl_multi_perform should be called again immediately. On newer
344 // versions of curl, this is no longer used.
345 continue;
346 } else if (mres != CURLM_OK) {
347 break;
348 }
349
350 rec.pmarkManager.beginPMarks();
351
352
353 // Harvest any messages, looking for CURLMSG_DONE.
354 CURLMsg *msg;
355 do {
356 int msgq = 0;
357 msg = curl_multi_info_read(multi_handle, &msgq);
358 if (msg && (msg->msg == CURLMSG_DONE)) {
359 CURL *easy_handle = msg->easy_handle;
360 res = msg->data.result;
361 mch.FinishCurlXfer(easy_handle);
362 // If any requests fail, cut off the entire transfer.
363 if (res != CURLE_OK) {
364 break;
365 }
366 }
367 } while (msg);
368 if (res != static_cast<CURLcode>(-1) && res != CURLE_OK) {
369 std::stringstream ss;
370 ss << "Breaking loop due to failed curl transfer: " << curl_easy_strerror(res);
371 logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_CURL_FAILURE",
372 ss.str());
373 break;
374 }
375
376 if (running_handles < static_cast<int>(concurrency)) {
377 // Issue new transfers if there is still pending work to do.
378 // Otherwise, continue running until there are no handles left.
379 if (current_offset != content_size) {
380 current_offset = mch.StartTransfers(current_offset, content_size,
381 m_block_size, running_handles);
382 if (!running_handles) {
383 std::stringstream ss;
384 ss << "No handles are able to run. Streams=" << streams << ", concurrency="
385 << concurrency;
386
387 logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", ss.str());
388 }
389 } else if (running_handles == 0) {
390 logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE",
391 "Unable to start new transfers; breaking loop.");
392 break;
393 }
394 }
395
396 int64_t max_sleep_time = next_marker - time(NULL);
397 if (max_sleep_time <= 0) {
398 continue;
399 }
400 int fd_count;
401#ifdef HAVE_CURL_MULTI_WAIT
402 mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000,
403 &fd_count);
404#else
405 mres = curl_multi_wait_impl(multi_handle, max_sleep_time*1000,
406 &fd_count);
407#endif
408 if (mres != CURLM_OK) {
409 break;
410 }
411 } while (running_handles);
412
413 if (mres != CURLM_OK) {
414 std::stringstream ss;
415 ss << "Internal libcurl multi-handle error: "
416 << curl_multi_strerror(mres);
417 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", ss.str());
418 throw std::runtime_error(ss.str());
419 }
420
421 // Harvest any messages, looking for CURLMSG_DONE.
422 CURLMsg *msg;
423 do {
424 int msgq = 0;
425 msg = curl_multi_info_read(multi_handle, &msgq);
426 if (msg && (msg->msg == CURLMSG_DONE)) {
427 CURL *easy_handle = msg->easy_handle;
428 mch.FinishCurlXfer(easy_handle);
429 if (res == CURLE_OK || res == static_cast<CURLcode>(-1))
430 res = msg->data.result; // Transfer result will be examined below.
431 }
432 } while (msg);
433
434 if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
435 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
436 "Internal state error in libcurl");
437 throw std::runtime_error("Internal state error in libcurl");
438 }
439
440 mch.Flush();
441
442 rec.bytes_transferred = mch.BytesTransferred();
443 rec.tpc_status = mch.GetStatusCode();
444
445 // Generate the final response back to the client.
446 std::stringstream ss;
447 success = false;
448 if (mch.GetStatusCode() >= 400) {
449 std::string err = mch.GetErrorMessage();
450 std::stringstream ss2;
451 ss2 << "Remote side failed with status code " << mch.GetStatusCode();
452 if (!err.empty()) {
453 std::replace(err.begin(), err.end(), '\n', ' ');
454 ss2 << "; error message: \"" << err << "\"";
455 }
456 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
457 ss << generateClientErr(ss2, rec);
458 } else if (mch.GetErrorCode()) {
459 std::string err = mch.GetErrorMessage();
460 if (err.empty()) {err = "(no error message provided)";}
461 else {std::replace(err.begin(), err.end(), '\n', ' ');}
462 std::stringstream ss2;
463 ss2 << "Error when interacting with local filesystem: " << err;
464 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
465 ss << generateClientErr(ss2, rec);
466 } else if (res != CURLE_OK) {
467 std::stringstream ss2;
468 ss2 << "Request failed when processing";
469 std::stringstream ss3;
470 ss3 << ss2.str() << ":" << curl_easy_strerror(res);
471 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss3.str());
472 ss << generateClientErr(ss2, rec, res);
473 } else if (current_offset != content_size) {
474 std::stringstream ss2;
475 ss2 << "Internal logic error led to early abort; current offset is " <<
476 current_offset << " while full size is " << content_size;
477 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
478 ss << generateClientErr(ss2, rec);
479 } else {
480 if (!handles[0]->Finalize()) {
481 std::stringstream ss2;
482 ss2 << "Failed to finalize and close file handle.";
483 ss << generateClientErr(ss2, rec);
484 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
485 ss2.str());
486 } else {
487 ss << "success: Created";
488 success = true;
489 }
490 }
491
492 if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
493 logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
494 "Failed to send last update to remote client");
495 return retval;
496 } else if (success) {
497 logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
498 rec.status = 0;
499 }
500 return req.ChunkResp(NULL, 0);
501}
502
503
504int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
505 size_t streams, TPCLogRecord &rec)
506{
507 std::vector<ManagedCurlHandle> curl_handles;
508 std::vector<State*> handles;
509 std::stringstream err_ss;
510 try {
511 int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec);
512 for (std::vector<State*>::iterator state_iter = handles.begin();
513 state_iter != handles.end();
514 state_iter++) {
515 delete *state_iter;
516 }
517 return retval;
518 } catch (CurlHandlerSetupError &e) {
519 for (std::vector<State*>::iterator state_iter = handles.begin();
520 state_iter != handles.end();
521 state_iter++) {
522 delete *state_iter;
523 }
524
525 rec.status = 500;
526 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
527 std::stringstream ss;
528 ss << e.what();
529 err_ss << generateClientErr(ss, rec);
530 return req.SendSimpleResp(rec.status, NULL, NULL, e.what(), 0);
531 } catch (std::runtime_error &e) {
532 for (std::vector<State*>::iterator state_iter = handles.begin();
533 state_iter != handles.end();
534 state_iter++) {
535 delete *state_iter;
536 }
537
538 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
539 std::stringstream ss;
540 ss << e.what();
541 err_ss << generateClientErr(ss, rec);
542 int retval;
543 if ((retval = req.ChunkResp(err_ss.str().c_str(), 0))) {
544 return retval;
545 }
546 return req.ChunkResp(NULL, 0);
547 }
548}
#define Duplicate(x, y)
CURLMcode curl_multi_wait_impl(CURLM *multi_handle, int timeout_ms, int *numfds)
void CURL
CurlHandlerSetupError(const std::string &msg)
CURL * GetHandle() const
int GetErrorCode() const
off_t GetContentLength() const
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
@ Info
Definition XrdTpcTPC.hh:29
@ Error
Definition XrdTpcTPC.hh:31
@ Debug
Definition XrdTpcTPC.hh:28