libpqxx
The C++ client library for PostgreSQL
Loading...
Searching...
No Matches
stream_query.hxx
1/* Definition of the pqxx::internal::stream_query class.
2 *
3 * Enables optimized batch reads from a database table.
4 *
5 * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/stream_query instead.
6 *
7 * Copyright (c) 2000-2025, Jeroen T. Vermeulen.
8 *
9 * See COPYING for copyright license. If you did not receive a file called
10 * COPYING with this source code, please notify the distributor of this
11 * mistake, or contact the author.
12 */
13#ifndef PQXX_H_STREAM_QUERY
14#define PQXX_H_STREAM_QUERY
15
16#if !defined(PQXX_HEADER_PRE)
17# error "Include libpqxx headers as <pqxx/header>, not <pqxx/header.hxx>."
18#endif
19
20#include <cassert>
21#include <functional>
22#include <variant>
23
24#include "pqxx/connection.hxx"
25#include "pqxx/except.hxx"
26#include "pqxx/internal/concat.hxx"
27#include "pqxx/internal/encoding_group.hxx"
28#include "pqxx/internal/encodings.hxx"
29#include "pqxx/internal/gates/connection-stream_from.hxx"
30#include "pqxx/internal/stream_iterator.hxx"
31#include "pqxx/separated_list.hxx"
32#include "pqxx/transaction_base.hxx"
33#include "pqxx/transaction_focus.hxx"
34#include "pqxx/util.hxx"
35
36
37namespace pqxx
38{
39class transaction_base;
40} // namespace pqxx
41
42
43namespace pqxx::internal
44{
48
49
50// C++20: Can we use generators, and maybe get speedup from HALO?
52
79template<typename... TYPE> class stream_query : transaction_focus
80{
81public:
82 using line_handle = std::unique_ptr<char, void (*)(void const *)>;
83
85 inline stream_query(transaction_base &tx, std::string_view query);
87 inline stream_query(
88 transaction_base &tx, std::string_view query, params const &);
89
90 stream_query(stream_query &&) = delete;
91 stream_query &operator=(stream_query &&) = delete;
92
93 ~stream_query() noexcept
94 {
95 try
96 {
97 close();
98 }
99 catch (std::exception const &e)
100 {
101 reg_pending_error(e.what());
102 }
103 }
104
106 bool done() const & noexcept { return m_char_finder == nullptr; }
107
109 inline auto begin() &;
111
115 auto end() const & { return stream_query_end_iterator{}; }
116
118 std::tuple<TYPE...> parse_line(zview line) &
119 {
120 assert(not done());
121
122 auto const line_size{std::size(line)};
123
124 // This function uses m_row as a buffer, across calls. The only reason for
125 // it to carry over across calls is to avoid reallocation.
126
127 // Make room for unescaping the line. It's a pessimistic size.
128 // Unusually, we're storing terminating zeroes *inside* the string.
129 // This is the only place where we modify m_row. MAKE SURE THE BUFFER DOES
130 // NOT GET RESIZED while we're working, because we're working with views
131 // into its buffer.
132 m_row.resize(line_size + 1);
133
134 std::size_t offset{0u};
135 char *write{m_row.data()};
136
137 // DO NOT shrink m_row to fit. We're carrying views pointing into the
138 // buffer. (Also, how useful would shrinking really be?)
139
140 // Folding expression: scan and unescape each field, and convert it to its
141 // requested type.
142 std::tuple<TYPE...> data{parse_field<TYPE>(line, offset, write)...};
143
144 assert(offset == line_size + 1u);
145 return data;
146 }
147
149 std::pair<line_handle, std::size_t> read_line() &;
150
151private:
153
156 static inline char_finder_func *get_finder(transaction_base const &tx);
157
159
173 std::tuple<std::size_t, char *, zview>
174 read_field(zview line, std::size_t offset, char *write)
175 {
176#if !defined(NDEBUG)
177 auto const line_size{std::size(line)};
178#endif
179
181
182 char const *lp{std::data(line)};
183
184 // The COPY line now ends in a tab. (We replace the trailing newline with
185 // that to simplify the loop here.)
186 assert(lp[line_size] == '\t');
187 assert(lp[line_size + 1] == '\0');
188
189 if ((lp[offset] == '\\') and (lp[offset + 1] == 'N'))
190 {
191 // Null field. Consume the "\N" and the field separator.
192 offset += 3;
193 assert(offset <= (line_size + 1));
194 assert(lp[offset - 1] == '\t');
195 // Return a null value. There's nothing to write into m_row.
196 return {offset, write, {}};
197 }
198
199 // Beginning of the field text in the row buffer.
200 char const *const field_begin{write};
201
202 // We're relying on several assumptions just for making the main loop
203 // condition work:
204 // * The COPY line ends in a newline.
205 // * Multibyte characters never start with an ASCII-range byte.
206 // * We can index a view beyond its bounds (but within its address space).
207 //
208 // Effectively, the newline acts as a final field separator.
209 while (lp[offset] != '\t')
210 {
211 assert(lp[offset] != '\0');
212
213 // Beginning of the next character of interest (or the end of the line).
214 auto const stop_char{m_char_finder(line, offset)};
215 PQXX_ASSUME(stop_char > offset);
216 assert(stop_char < (line_size + 1));
217
218 // Copy the text we have so far. It's got no special characters in it.
219 std::memcpy(write, &lp[offset], stop_char - offset);
220 write += (stop_char - offset);
222
223 // We're still within the line.
224 char const special{lp[offset]};
225 if (special == '\\')
226 {
227 // Escape sequence.
228 // Consume the backslash.
229 ++offset;
231
232 // The database will only escape ASCII characters, so we assume that
233 // we're dealing with a single-byte character.
234 char const escaped{lp[offset]};
235 assert((escaped >> 7) == 0);
236 ++offset;
238 }
239 else
240 {
241 // Field separator. Fall out of the loop.
242 assert(special == '\t');
243 }
244 }
245
246 // Hit the end of the field.
247 assert(lp[offset] == '\t');
248 *write = '\0';
249 ++write;
250 ++offset;
251 return {offset, write, {field_begin, write - field_begin - 1}};
252 }
253
255
268 template<typename TARGET>
269 TARGET parse_field(zview line, std::size_t &offset, char *&write)
270 {
273
274 assert(offset <= std::size(line));
275
276 auto [new_offset, new_write, text]{read_field(line, offset, write)};
277 PQXX_ASSUME(new_offset > offset);
278 PQXX_ASSUME(new_write >= write);
281 if constexpr (nullity::always_null)
282 {
283 if (std::data(text) != nullptr)
284 throw conversion_error{concat(
285 "Streaming a non-null value into a ", type_name<field_type>,
286 ", which must always be null.")};
287 }
288 else if (std::data(text) == nullptr)
289 {
290 if constexpr (nullity::has_null)
291 return nullity::null();
292 else
294 }
295 else
296 {
297 // Don't ever try to convert a non-null value to nullptr_t!
298 return from_string<field_type>(text);
299 }
300 }
301
303 void close() noexcept
304 {
305 if (not done())
306 {
307 m_char_finder = nullptr;
308 unregister_me();
309 }
310 }
311
313
317 char_finder_func *m_char_finder;
318
320
324 std::string m_row;
325};
326} // namespace pqxx::internal
327#endif
Stream query results from the database. Used by transaction_base::stream.
Definition stream_query.hxx:80
bool done() const &noexcept
Has this stream reached the end of its data?
Definition stream_query.hxx:106
stream_query(transaction_base &tx, std::string_view query)
Execute query on tx, stream results.
Definition stream_query_impl.hxx:12
std::tuple< TYPE... > parse_line(zview line) &
Parse and convert the latest line of data we received.
Definition stream_query.hxx:118
auto begin() &
Begin iterator. Only for use by "range for".
Definition stream_query_impl.hxx:166
std::pair< line_handle, std::size_t > read_line() &
Read a COPY line from the server.
Definition stream_query_impl.hxx:174
auto end() const &
End iterator. Only for use by "range for".
Definition stream_query.hxx:115
Base class for things that monopolise a transaction's attention.
Definition transaction_focus.hxx:29
Marker-type wrapper: zero-terminated std::string_view.
Definition zview.hxx:38
Internal items for libpqxx' own use. Do not use these yourself.
Definition encodings.cxx:33
std::string concat(TYPE... item)
Efficiently combine a bunch of items into one big string.
Definition concat.hxx:31
void throw_null_conversion(std::string const &type)
Throw exception for attempt to convert SQL NULL to given type.
Definition strconv.cxx:264
std::size_t(std::string_view haystack, std::size_t start) char_finder_func
Function type: "find the first occurrence of any of a specific set of ASCII characters".
Definition encoding_group.hxx:70
constexpr char unescape_char(char escaped) noexcept
Return original byte for escaped character.
Definition util.hxx:633
The end() iterator for a stream_query.
Definition stream_query.hxx:47
The home of all libpqxx classes, functions, templates, etc.
Definition array.cxx:27
constexpr char array_separator
Element separator between SQL array elements of this type.
Definition strconv.hxx:559