Skip to content

Commit 0cad9b5

Browse files
authored
Add missing build time for INSERT/COPY with PostgreSQL protocol (#172)
Closes GH-171
1 parent 574d7a0 commit 0cad9b5

File tree

16 files changed

+607
-426
lines changed

16 files changed

+607
-426
lines changed

benchmark/insert-copy.cc

Lines changed: 74 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,18 @@
1717
* under the License.
1818
*/
1919

20-
#include <chrono>
21-
#include <cstdlib>
22-
#include <iostream>
23-
#include <vector>
20+
#include "insert.hh"
2421

25-
#include <libpq-fe.h>
22+
#include <arpa/inet.h>
2623

27-
class ConnectionFinisher {
28-
public:
29-
ConnectionFinisher(PGconn* connection) : connection_(connection) {}
30-
~ConnectionFinisher() { PQfinish(connection_); }
31-
32-
private:
33-
PGconn* connection_;
34-
};
35-
36-
class ResultClearner {
37-
public:
38-
ResultClearner(PGresult* result) : result_(result) {}
39-
~ResultClearner() { PQclear(result_); }
40-
41-
private:
42-
PGresult* result_;
43-
};
24+
namespace {
25+
template <typename Type>
26+
void
27+
write_binary_data(std::ostream& stream, Type value)
28+
{
29+
stream.write(reinterpret_cast<const char*>(&value), sizeof(Type));
30+
}
31+
}; // namespace
4432

4533
int
4634
main(int argc, char** argv)
@@ -78,33 +66,31 @@ main(int argc, char** argv)
7866
}
7967
}
8068

81-
std::vector<std::string> buffers;
69+
std::vector<std::vector<Value>> records;
8270
{
83-
auto result = PQexec(connection, "COPY data TO STDOUT (FORMAT binary)");
71+
auto result = PQexec(connection, "SELECT * FROM data");
8472
ResultClearner resultClearner(result);
85-
if (PQresultStatus(result) != PGRES_COPY_OUT)
73+
if (PQresultStatus(result) != PGRES_TUPLES_OK)
8674
{
87-
std::cerr << "failed to copy to: " << PQerrorMessage(connection) << std::endl;
75+
std::cerr << "failed to select: " << PQerrorMessage(connection) << std::endl;
8876
return EXIT_FAILURE;
8977
}
90-
while (true)
78+
auto nTuples = PQntuples(result);
79+
auto nFields = PQnfields(result);
80+
for (int iTuple = 0; iTuple < nTuples; iTuple++)
9181
{
92-
char* data;
93-
auto size = PQgetCopyData(connection, &data, 0);
94-
if (size == -1)
95-
{
96-
break;
97-
}
98-
if (size == -2)
82+
std::vector<Value> values;
83+
for (int iField = 0; iField < nFields; iField++)
9984
{
100-
std::cerr << "failed to read copy data: " << PQerrorMessage(connection)
101-
<< std::endl;
102-
return EXIT_FAILURE;
85+
if (!append_value(values, result, iTuple, iField))
86+
{
87+
return EXIT_FAILURE;
88+
}
10389
}
104-
buffers.emplace_back(data, size);
105-
free(data);
90+
records.push_back(std::move(values));
10691
}
10792
}
93+
10894
auto before = std::chrono::steady_clock::now();
10995
{
11096
auto result = PQexec(connection, "COPY data_insert FROM STDOUT (FORMAT binary)");
@@ -115,15 +101,58 @@ main(int argc, char** argv)
115101
<< std::endl;
116102
return EXIT_FAILURE;
117103
}
118-
for (const auto& buffer : buffers)
104+
std::ostringstream copyDataStream;
119105
{
120-
auto copyDataResult = PQputCopyData(connection, buffer.data(), buffer.size());
121-
if (copyDataResult == -1)
106+
// See the "Binary Format" section in
107+
// https://www.postgresql.org/docs/current/sql-copy.html for
108+
// details.
109+
110+
const char signature[] = "PGCOPY\n\377\r\n";
111+
// The last '\0' is also part of the signature.
112+
copyDataStream << std::string_view(signature, sizeof(signature));
113+
const uint32_t flags = 0;
114+
write_binary_data(copyDataStream, htonl(flags));
115+
const uint32_t headerExtensionAreaLength = 0;
116+
write_binary_data(copyDataStream, htonl(headerExtensionAreaLength));
117+
auto nRecords = records.size();
118+
for (size_t iRecord = 0; iRecord < nRecords; ++iRecord)
122119
{
123-
std::cerr << "failed to put copy data: " << PQerrorMessage(connection)
124-
<< std::endl;
125-
return EXIT_FAILURE;
120+
const auto& values = records[iRecord];
121+
const auto nValues = values.size();
122+
write_binary_data(copyDataStream, htons(nValues));
123+
for (size_t iValue = 0; iValue < nValues; ++iValue)
124+
{
125+
const auto& value = values[iValue];
126+
if (std::holds_alternative<std::monostate>(value))
127+
{
128+
write_binary_data(copyDataStream,
129+
htonl(static_cast<uint32_t>(-1)));
130+
}
131+
else if (std::holds_alternative<int32_t>(value))
132+
{
133+
const auto& int32Value = std::get<int32_t>(value);
134+
write_binary_data(copyDataStream, htonl(sizeof(int32_t)));
135+
write_binary_data(copyDataStream,
136+
htonl(static_cast<uint32_t>(int32Value)));
137+
}
138+
else if (std::holds_alternative<std::string>(value))
139+
{
140+
const auto& stringValue = std::get<std::string>(value);
141+
write_binary_data(copyDataStream, htonl(stringValue.size()));
142+
copyDataStream << stringValue;
143+
}
144+
}
126145
}
146+
const uint16_t fileTrailer = -1;
147+
write_binary_data(copyDataStream, htons(fileTrailer));
148+
}
149+
const auto& copyData = copyDataStream.str();
150+
auto copyDataResult = PQputCopyData(connection, copyData.data(), copyData.size());
151+
if (copyDataResult == -1)
152+
{
153+
std::cerr << "failed to put copy data: " << PQerrorMessage(connection)
154+
<< std::endl;
155+
return EXIT_FAILURE;
127156
}
128157
auto copyEndResult = PQputCopyEnd(connection, nullptr);
129158
if (copyEndResult == -1)

benchmark/insert.cc

Lines changed: 44 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
#include <chrono>
21-
#include <cstdlib>
22-
#include <iostream>
23-
24-
#include <libpq-fe.h>
25-
26-
#include <catalog/pg_type_d.h>
27-
28-
class ConnectionFinisher {
29-
public:
30-
ConnectionFinisher(PGconn* connection) : connection_(connection) {}
31-
~ConnectionFinisher() { PQfinish(connection_); }
32-
33-
private:
34-
PGconn* connection_;
35-
};
36-
37-
class ResultClearner {
38-
public:
39-
ResultClearner(PGresult* result) : result_(result) {}
40-
~ResultClearner() { PQclear(result_); }
41-
42-
private:
43-
PGresult* result_;
44-
};
20+
#include "insert.hh"
4521

4622
int
4723
main(int argc, char** argv)
@@ -79,7 +55,7 @@ main(int argc, char** argv)
7955
}
8056
}
8157

82-
std::string insert = "INSERT INTO data_insert VALUES ";
58+
std::vector<std::vector<Value>> records;
8359
{
8460
auto result = PQexec(connection, "SELECT * FROM data");
8561
ResultClearner resultClearner(result);
@@ -92,37 +68,57 @@ main(int argc, char** argv)
9268
auto nFields = PQnfields(result);
9369
for (int iTuple = 0; iTuple < nTuples; iTuple++)
9470
{
95-
if (iTuple > 0)
96-
{
97-
insert += ", ";
98-
}
71+
std::vector<Value> values;
9972
for (int iField = 0; iField < nFields; iField++)
10073
{
101-
if (PQgetisnull(result, iTuple, iField))
102-
{
103-
insert += "(null)";
104-
}
105-
else
74+
if (!append_value(values, result, iTuple, iField))
10675
{
107-
insert += "(";
108-
auto type = PQftype(result, iField);
109-
if (type == TEXTOID)
110-
{
111-
insert += "'";
112-
}
113-
insert += PQgetvalue(result, iTuple, iField);
114-
if (type == TEXTOID)
115-
{
116-
insert += "'";
117-
}
118-
insert += ")";
76+
return EXIT_FAILURE;
11977
}
12078
}
79+
records.push_back(std::move(values));
12180
}
12281
}
82+
12383
auto before = std::chrono::steady_clock::now();
12484
{
125-
auto result = PQexec(connection, insert.c_str());
85+
std::ostringstream insert;
86+
insert << "INSERT INTO data_insert VALUES ";
87+
auto nRecords = records.size();
88+
for (size_t iRecord = 0; iRecord < nRecords; ++iRecord)
89+
{
90+
const auto& values = records[iRecord];
91+
if (iRecord > 0)
92+
{
93+
insert << ", ";
94+
}
95+
insert << "(";
96+
auto nValues = values.size();
97+
for (size_t iValue = 0; iValue < nValues; ++iValue)
98+
{
99+
if (iValue > 0)
100+
{
101+
insert << ", ";
102+
}
103+
const auto& value = values[iValue];
104+
if (std::holds_alternative<std::monostate>(value))
105+
{
106+
insert << "null";
107+
}
108+
else if (std::holds_alternative<int32_t>(value))
109+
{
110+
insert << std::get<int32_t>(value);
111+
}
112+
else if (std::holds_alternative<std::string>(value))
113+
{
114+
insert << "'";
115+
insert << std::get<std::string>(value);
116+
insert << "'";
117+
}
118+
}
119+
insert << ")";
120+
}
121+
auto result = PQexec(connection, insert.str().c_str());
126122
ResultClearner resultClearner(result);
127123
if (PQresultStatus(result) != PGRES_COMMAND_OK)
128124
{

benchmark/insert.hh

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <charconv>
23+
#include <chrono>
24+
#include <cstdlib>
25+
#include <iostream>
26+
#include <sstream>
27+
#include <variant>
28+
#include <vector>
29+
30+
#include <libpq-fe.h>
31+
32+
#include <catalog/pg_type_d.h>
33+
34+
namespace {
35+
class ConnectionFinisher {
36+
public:
37+
ConnectionFinisher(PGconn* connection) : connection_(connection) {}
38+
~ConnectionFinisher() { PQfinish(connection_); }
39+
40+
private:
41+
PGconn* connection_;
42+
};
43+
44+
class ResultClearner {
45+
public:
46+
ResultClearner(PGresult* result) : result_(result) {}
47+
~ResultClearner() { PQclear(result_); }
48+
49+
private:
50+
PGresult* result_;
51+
};
52+
53+
using Value = std::variant<std::monostate, int32_t, std::string>;
54+
55+
bool
56+
append_value(std::vector<Value>& values, PGresult* result, int iTuple, int iField)
57+
{
58+
if (PQgetisnull(result, iTuple, iField))
59+
{
60+
values.push_back(std::monostate{});
61+
return true;
62+
}
63+
64+
Oid type = PQftype(result, iField);
65+
char* rawValue = PQgetvalue(result, iTuple, iField);
66+
int length = PQgetlength(result, iTuple, iField);
67+
switch (type)
68+
{
69+
case INT4OID:
70+
{
71+
int32_t value;
72+
auto result = std::from_chars(rawValue, rawValue + length, value);
73+
if (result.ec != std::errc{})
74+
{
75+
std::cerr << "failed to parse integer value: " << rawValue << std::endl;
76+
return false;
77+
}
78+
values.emplace_back(value);
79+
}
80+
break;
81+
case TEXTOID:
82+
{
83+
values.emplace_back(std::string(rawValue, length));
84+
}
85+
break;
86+
default:
87+
std::cerr << "unsupported type: " << type << std::endl;
88+
return false;
89+
}
90+
return true;
91+
}
92+
}; // namespace

0 commit comments

Comments
 (0)