XRootD
Loading...
Searching...
No Matches
XrdTpcState.cc
Go to the documentation of this file.
1
2#include <algorithm>
3#include <sstream>
4#include <stdexcept>
5
6#include "XrdVersion.hh"
9
10#include <curl/curl.h>
11
12#include "XrdTpcState.hh"
13#include "XrdTpcStream.hh"
14
15using namespace TPC;
16
17
19 if (m_headers) {
20 curl_slist_free_all(m_headers);
21 m_headers = NULL;
22 if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);}
23 }
24}
25
26
27void State::Move(State &other)
28{
29 m_push = other.m_push;
30 m_recv_status_line = other.m_recv_status_line;
31 m_recv_all_headers = other.m_recv_all_headers;
32 m_offset = other.m_offset;
33 m_start_offset = other.m_start_offset;
34 m_status_code = other.m_status_code;
35 m_content_length = other.m_content_length;
36 m_stream = other.m_stream;
37 m_curl = other.m_curl;
38 m_headers = other.m_headers;
39 m_headers_copy = other.m_headers_copy;
40 m_resp_protocol = other.m_resp_protocol;
41 m_is_transfer_state = other.m_is_transfer_state;
42 curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this);
43 if (m_is_transfer_state) {
44 if (m_push) {
45 curl_easy_setopt(m_curl, CURLOPT_READDATA, this);
46 } else {
47 curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this);
48 }
49 }
50 tpcForwardCreds = other.tpcForwardCreds;
51 other.m_headers_copy.clear();
52 other.m_curl = NULL;
53 other.m_headers = NULL;
54 other.m_stream = NULL;
55}
56
57
58bool State::InstallHandlers(CURL *curl) {
59 curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION);
60 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB);
61 curl_easy_setopt(curl, CURLOPT_HEADERDATA, this);
62 if(m_is_transfer_state) {
63 if (m_push) {
64 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
65 curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB);
66 curl_easy_setopt(curl, CURLOPT_READDATA, this);
67 struct stat buf;
68 if (SFS_OK == m_stream->Stat(&buf)) {
69 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size);
70 }
71 } else {
72 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB);
73 curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
74 }
75 }
76 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
77 if(tpcForwardCreds) {
78 curl_easy_setopt(curl,CURLOPT_UNRESTRICTED_AUTH,1L);
79 }
80
81 // Only use low-speed limits with libcurl v7.38 or later.
82 // Older versions have poor transfer performance, corrected in curl commit cacdc27f.
83 curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW);
84 if (curl_ver->age > 0 && curl_ver->version_num >= 0x072600) {
85 // Require a minimum speed from the transfer: 2 minute average must at least 10KB/s
86 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 2*60);
87 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 10*1024);
88 }
89 return true;
90}
91
96 struct curl_slist *list = NULL;
97 for (std::map<std::string, std::string>::const_iterator hdr_iter = req.headers.begin();
98 hdr_iter != req.headers.end();
99 hdr_iter++) {
100 if (!strcasecmp(hdr_iter->first.c_str(),"copy-header")) {
101 list = curl_slist_append(list, hdr_iter->second.c_str());
102 m_headers_copy.emplace_back(hdr_iter->second);
103 }
104 // Note: len("TransferHeader") == 14
105 if (!strncasecmp(hdr_iter->first.c_str(),"transferheader",14)) {
106 std::stringstream ss;
107 ss << hdr_iter->first.substr(14) << ": " << hdr_iter->second;
108 list = curl_slist_append(list, ss.str().c_str());
109 m_headers_copy.emplace_back(ss.str());
110 }
111 }
112 if (list != NULL) {
113 curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list);
114 m_headers = list;
115 }
116}
117
119 m_offset = 0;
120 m_status_code = -1;
121 m_content_length = -1;
122 m_recv_all_headers = false;
123 m_recv_status_line = false;
124}
125
126size_t State::HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata)
127{
128 State *obj = static_cast<State*>(userdata);
129 std::string header(buffer, size*nitems);
130 return obj->Header(header);
131}
132
133int State::Header(const std::string &header) {
134 //printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str());
135 if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect?
136 m_recv_all_headers = false;
137 m_recv_status_line = false;
138 }
139 if (!m_recv_status_line) {
140 std::stringstream ss(header);
141 std::string item;
142 if (!std::getline(ss, item, ' ')) return 0;
143 m_resp_protocol = item;
144 //printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str());
145 if (!std::getline(ss, item, ' ')) return 0;
146 try {
147 m_status_code = std::stol(item);
148 } catch (...) {
149 return 0;
150 }
151 m_recv_status_line = true;
152 } else if (header.size() == 0 || header == "\n" || header == "\r\n") {
153 m_recv_all_headers = true;
154 }
155 else if (header != "\r\n") {
156 // Parse the header
157 std::size_t found = header.find(":");
158 if (found != std::string::npos) {
159 std::string header_name = header.substr(0, found);
160 std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower);
161 std::string header_value = header.substr(found+1);
162 if (header_name == "content-length")
163 {
164 try {
165 m_content_length = std::stoll(header_value);
166 } catch (...) {
167 // Header unparseable -- not a great sign, fail request.
168 //printf("Content-length header unparseable\n");
169 return 0;
170 }
171 }
172 } else {
173 // Non-empty header that isn't the status line, but no ':' present --
174 // malformed request?
175 //printf("Malformed header: %s\n", header.c_str());
176 return 0;
177 }
178 }
179 return header.size();
180}
181
182size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) {
183 State *obj = static_cast<State*>(userdata);
184 if (obj->GetStatusCode() < 0) {
185 return 0;
186 } // malformed request - got body before headers.
187 if (obj->GetStatusCode() >= 400) {
188 obj->m_error_buf += std::string(static_cast<char*>(buffer),
189 std::min(static_cast<size_t>(1024), size*nitems));
190 // Record error messages until we hit a KB; at that point, fail out.
191 if (obj->m_error_buf.size() >= 1024)
192 return 0;
193 else
194 return size*nitems;
195 } // Status indicates failure.
196 return obj->Write(static_cast<char*>(buffer), size*nitems);
197}
198
199ssize_t State::Write(char *buffer, size_t size) {
200 ssize_t retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
201 if (retval == SFS_ERROR) {
202 m_error_buf = m_stream->GetErrorMessage();
203 m_error_code = 1;
204 return -1;
205 }
206 m_offset += retval;
207 return retval;
208}
209
211 if (m_push) {
212 return 0;
213 }
214
215 ssize_t retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
216 if (retval == SFS_ERROR) {
217 m_error_buf = m_stream->GetErrorMessage();
218 m_error_code = 2;
219 return -1;
220 }
221 m_offset += retval;
222 return retval;
223}
224
225size_t State::ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) {
226 State *obj = static_cast<State*>(userdata);
227 if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers.
228 if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure.
229 return obj->Read(static_cast<char*>(buffer), size*nitems);
230}
231
232int State::Read(char *buffer, size_t size) {
233 int retval = m_stream->Read(m_start_offset + m_offset, buffer, size);
234 if (retval == SFS_ERROR) {
235 return -1;
236 }
237 m_offset += retval;
238 //printf("Read a total of %ld bytes.\n", m_offset);
239 return retval;
240}
241
243 CURL *curl = curl_easy_duphandle(m_curl);
244 if (!curl) {
245 throw std::runtime_error("Failed to duplicate existing curl handle.");
246 }
247
248 State *state = new State(0, *m_stream, curl, m_push, tpcForwardCreds);
249
250 if (m_headers) {
251 state->m_headers_copy.reserve(m_headers_copy.size());
252 for (std::vector<std::string>::const_iterator header_iter = m_headers_copy.begin();
253 header_iter != m_headers_copy.end();
254 header_iter++) {
255 state->m_headers = curl_slist_append(state->m_headers, header_iter->c_str());
256 state->m_headers_copy.push_back(*header_iter);
257 }
258 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
259 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state->m_headers);
260 }
261
262 return state;
263}
264
265void State::SetTransferParameters(off_t offset, size_t size) {
266 m_start_offset = offset;
267 m_offset = 0;
268 m_content_length = size;
269 std::stringstream ss;
270 ss << offset << "-" << (offset+size-1);
271 curl_easy_setopt(m_curl, CURLOPT_RANGE, ss.str().c_str());
272}
273
275{
276 return m_stream->AvailableBuffers();
277}
278
280{
281 m_stream->DumpBuffers();
282}
283
285{
286 if (!m_stream->Finalize()) {
287 m_error_buf = m_stream->GetErrorMessage();
288 m_error_code = 3;
289 return false;
290 }
291 return true;
292}
293
295{
296 // CURLINFO_PRIMARY_PORT is only defined for 7.21.0 or later; on older
297 // library versions, simply omit this information.
298#if LIBCURL_VERSION_NUM >= 0x071500
299 char *curl_ip = NULL;
300 CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip);
301 if ((rc != CURLE_OK) || !curl_ip) {
302 return "";
303 }
304 long curl_port = 0;
305 rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port);
306 if ((rc != CURLE_OK) || !curl_port) {
307 return "";
308 }
309 std::stringstream ss;
310 // libcurl returns IPv6 addresses of the form:
311 // 2600:900:6:1301:5054:ff:fe0b:9cba:8000
312 // However the HTTP-TPC spec says to use the form
313 // [2600:900:6:1301:5054:ff:fe0b:9cba]:8000
314 // Hence, we add '[' and ']' whenever a ':' is seen.
315 if (NULL == strchr(curl_ip, ':'))
316 ss << "tcp:" << curl_ip << ":" << curl_port;
317 else
318 ss << "tcp:[" << curl_ip << "]:" << curl_port;
319 return ss.str();
320#else
321 return "";
322#endif
323}
#define stat(a, b)
Definition XrdPosix.hh:96
#define SFS_ERROR
#define SFS_OK
void CURL
State * Duplicate()
void Move(State &other)
int GetStatusCode() const
void DumpBuffers() const
void CopyHeaders(XrdHttpExtReq &req)
void ResetAfterRequest()
void SetTransferParameters(off_t offset, size_t size)
std::string GetConnectionDescription()
bool Finalize()
int AvailableBuffers() const
int Stat(struct stat *)
std::map< std::string, std::string > & headers