libpqxx
The C++ client library for PostgreSQL
Loading...
Searching...
No Matches
stream_query_impl.hxx
1/* Code for parts of pqxx::internal::stream_query.
2 *
3 * These definitions needs to be in a separate file in order to iron out
4 * circular dependencies between headers.
5 */
6#if !defined(PQXX_H_STREAM_QUERY_IMPL)
7# define PQXX_H_STREAM_QUERY_IMPL
8
9namespace pqxx::internal
10{
11template<typename... TYPE>
13 transaction_base &tx, std::string_view query) :
14 transaction_focus{tx, "stream_query"}, m_char_finder{get_finder(tx)}
15{
16 auto const r{tx.exec0(internal::concat("COPY (", query, ") TO STDOUT"))};
17 if (r.columns() != sizeof...(TYPE))
18 throw usage_error{concat(
19 "Parsing query stream with wrong number of columns: "
20 "code expects ",
21 sizeof...(TYPE), " but query returns ", r.columns(), ".")};
22 register_me();
23}
24
25
26template<typename... TYPE>
27inline char_finder_func *
29{
30 auto const group{enc_group(tx.conn().encoding_id())};
31 return get_s_char_finder<'\t', '\\'>(group);
32}
33
34
35// C++20: Replace with generator? Could be faster (local vars vs. members).
37
40template<typename... TYPE> class stream_query_input_iterator
41{
42 using stream_t = stream_query<TYPE...>;
43
44public:
45 using value_type = std::tuple<TYPE...>;
46 using difference_type = long;
47
49 m_home(&home),
50 m_line{typename stream_query<TYPE...>::line_handle(
52 {
53 consume_line();
54 }
57
60 {
61 assert(not done());
62 consume_line();
63 return *this;
64 }
65
67
70 {
71 ++*this;
72 return {};
73 }
74
76 value_type operator*() const
77 {
78 return m_home->parse_line(zview{m_line.get(), m_line_size});
79 }
80
82 bool operator==(stream_query_end_iterator) const noexcept { return done(); }
85 {
86 return not done();
87 }
88
90 operator=(stream_query_input_iterator &&rhs) noexcept
91 {
92 if (&rhs != this)
93 {
94 m_line = std::move(rhs.m_line);
95 m_home = rhs.m_home;
96 m_line_size = rhs.m_line_size;
97 }
98 return *this;
99 }
100
101private:
102 stream_query_input_iterator() {}
103
105 bool done() const noexcept { return m_home->done(); }
106
108
111 void consume_line() &
112 {
113 auto [line, size]{m_home->read_line()};
114 m_line = std::move(line);
115 m_line_size = size;
116 if (m_line)
117 {
118 // We know how many fields to expect. Replace the newline at the end
119 // with the field separator, so the parsing loop only needs to scan for a
120 // tab, not a tab or a newline.
121 char *const ptr{m_line.get()};
122 assert(ptr[size] == '\n');
123 ptr[size] = '\t';
124 }
125 }
126
127 stream_t *m_home;
128
130 typename stream_t::line_handle m_line;
131
133 std::size_t m_line_size;
134};
135
136
137template<typename... TYPE>
138inline bool operator==(
139 stream_query_end_iterator, stream_query_input_iterator<TYPE...> const &i)
140{
141 return i.done();
142}
143
144
145template<typename... TYPE>
146inline bool operator!=(
147 stream_query_end_iterator, stream_query_input_iterator<TYPE...> const &i)
148{
149 return not i.done();
150}
151
152
153template<typename... TYPE> inline auto stream_query<TYPE...>::begin() &
154{
155 return stream_query_input_iterator<TYPE...>{*this};
156}
157
158
159template<typename... TYPE>
160inline std::pair<typename stream_query<TYPE...>::line_handle, std::size_t>
162{
163 assert(not done());
164
165 internal::gate::connection_stream_from gate{m_trans->conn()};
166 try
167 {
168 auto line{gate.read_copy_line()};
169 // Check for completion.
170 if (not line.first)
171 PQXX_UNLIKELY close();
172 return line;
173 }
174 catch (std::exception const &)
175 {
176 close();
177 throw;
178 }
179}
180} // namespace pqxx::internal
181#endif
int encoding_id() const
Get the connection's encoding, as a PostgreSQL-defined code.
Definition connection.cxx:1112
Input iterator for stream_query.
Definition stream_query_impl.hxx:41
stream_query_input_iterator & operator++() &
Pre-increment. This is what you'd normally want to use.
Definition stream_query_impl.hxx:59
bool operator!=(stream_query_end_iterator) const noexcept
Comparison only works for comparing to end().
Definition stream_query_impl.hxx:84
bool operator==(stream_query_end_iterator) const noexcept
Are we at the end?
Definition stream_query_impl.hxx:82
value_type operator*() const
Dereference. There's no caching in here, so don't repeat calls.
Definition stream_query_impl.hxx:76
stream_query_input_iterator operator++(int)
Post-increment. Only here to satisfy input_iterator concept.
Definition stream_query_impl.hxx:69
Stream query results from the database. Used by transaction_base::stream.
Definition stream_query.hxx:80
bool done() const &noexcept
Has this stream reached the end of its data?
Definition stream_query.hxx:103
stream_query(transaction_base &tx, std::string_view query)
Execute query on tx, stream results.
Definition stream_query_impl.hxx:12
std::tuple< TYPE... > parse_line(zview line) &
Parse and convert the latest line of data we received.
Definition stream_query.hxx:115
auto begin() &
Begin iterator. Only for use by "range for.".
Definition stream_query_impl.hxx:153
std::pair< line_handle, std::size_t > read_line() &
Read a COPY line from the server.
Definition stream_query_impl.hxx:161
Base class for things that monopolise a transaction's attention.
Definition transaction_focus.hxx:29
Marker-type wrapper: zero-terminated std::string_view.
Definition zview.hxx:38
Error in usage of libpqxx library, similar to std::logic_error.
Definition except.hxx:249
result exec0(zview query, std::string_view desc)
Execute command, which should return zero rows of data.
Definition transaction_base.hxx:387
constexpr connection & conn() const noexcept
The connection in which this transaction lives.
Definition transaction_base.hxx:998
Interface definition (and common code) for "transaction" classes.
Definition transaction_base.hxx:150
void pqfreemem(void const *ptr) noexcept
Wrapper for PQfreemem(), with C++ linkage.
Definition util.cxx:205
Internal items for libpqxx' own use. Do not use these yourself.
Definition encodings.cxx:33
std::string concat(TYPE... item)
Efficiently combine a bunch of items into one big string.
Definition concat.hxx:31
pqxx::internal::encoding_group enc_group(std::string_view encoding_name)
Convert libpq encoding name to its libpqxx encoding group.
Definition encodings.cxx:35
std::size_t(std::string_view haystack, std::size_t start) char_finder_func
Function type: "find first occurrence of specific any of ASCII characters.".
Definition encoding_group.hxx:71
The end() iterator for a stream_query.
Definition stream_query.hxx:47
Definition connection-stream_from.hxx:13