Skip to content

Commit 3a47f86

Browse files
authored
Merge pull request #21 from arduino-libraries/10-improve-threading
10 improve threading
2 parents 8cb6822 + 572fff5 commit 3a47f86

File tree

4 files changed

+142
-124
lines changed

4 files changed

+142
-124
lines changed

src/bridge.h

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ class BridgeClass {
106106

107107
struct k_mutex read_mutex{};
108108
struct k_mutex write_mutex{};
109+
struct k_mutex bridge_mutex{};
109110

110111
k_tid_t upd_tid{};
111112
k_thread_stack_t *upd_stack_area{};
@@ -119,17 +120,27 @@ class BridgeClass {
119120
serial_ptr = &serial;
120121
}
121122

122-
operator bool() const {
123-
return started;
123+
operator bool() {
124+
return is_started();
125+
}
126+
127+
bool is_started() {
128+
k_mutex_lock(&bridge_mutex, K_FOREVER);
129+
bool out = started;
130+
k_mutex_unlock(&bridge_mutex);
131+
return out;
124132
}
125133

126134
// Initialize the bridge
127135
bool begin(unsigned long baud=DEFAULT_SERIAL_BAUD) {
128-
serial_ptr->begin(baud);
129-
transport = new SerialTransport(*serial_ptr);
130-
131136
k_mutex_init(&read_mutex);
132137
k_mutex_init(&write_mutex);
138+
k_mutex_init(&bridge_mutex);
139+
140+
if (is_started()) return true;
141+
142+
serial_ptr->begin(baud);
143+
transport = new SerialTransport(*serial_ptr);
133144

134145
client = new RPCClient(*transport);
135146
server = new RPCServer(*transport);
@@ -142,32 +153,29 @@ class BridgeClass {
142153
UPDATE_THREAD_PRIORITY, 0, K_NO_WAIT);
143154
k_thread_name_set(upd_tid, "bridge");
144155

145-
bool res;
146-
call(RESET_METHOD).result(res);
147-
if (res) {
148-
started = true;
149-
}
156+
k_mutex_lock(&bridge_mutex, K_FOREVER);
157+
bool res = false;
158+
started = call(RESET_METHOD).result(res) && res;
159+
k_mutex_unlock(&bridge_mutex);
150160
return res;
151161
}
152162

153163
template<typename F>
154164
bool provide(const MsgPack::str_t& name, F&& func) {
165+
k_mutex_lock(&bridge_mutex, K_FOREVER);
155166
bool res;
156-
if (!call(BIND_METHOD, name).result(res)) {
157-
return false;
158-
}
159-
return server->bind(name, func);
167+
bool out = call(BIND_METHOD, name).result(res) && res && server->bind(name, func);
168+
k_mutex_unlock(&bridge_mutex);
169+
return out;
160170
}
161171

162172
template<typename F>
163173
bool provide_safe(const MsgPack::str_t& name, F&& func) {
174+
k_mutex_lock(&bridge_mutex, K_FOREVER);
164175
bool res;
165-
if (!call(BIND_METHOD, name).result(res)) {
166-
return false;
167-
}
168-
169-
return server->bind(name, func, "__safe__");
170-
176+
bool out = call(BIND_METHOD, name).result(res) && res && server->bind(name, func, "__safe__");
177+
k_mutex_unlock(&bridge_mutex);
178+
return out;
171179
}
172180

173181
void update() {

src/monitor.h

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class BridgeMonitor: public Stream {
3030
BridgeClass* bridge;
3131
RingBufferN<BufferSize> temp_buffer;
3232
struct k_mutex monitor_mutex{};
33-
bool is_connected = false;
33+
bool _connected = false;
3434

3535
public:
3636
explicit BridgeMonitor(BridgeClass& bridge): bridge(&bridge) {}
@@ -40,15 +40,31 @@ class BridgeMonitor: public Stream {
4040
bool begin(unsigned long _legacy_baud=0, uint16_t _legacy_config=0) {
4141
k_mutex_init(&monitor_mutex);
4242

43+
if (is_connected()) return true;
44+
4345
bool bridge_started = (*bridge);
4446
if (!bridge_started) {
4547
bridge_started = bridge->begin();
4648
}
47-
return bridge_started && bridge->call(MON_CONNECTED_METHOD).result(is_connected);
49+
50+
if (!bridge_started) return false;
51+
52+
k_mutex_lock(&monitor_mutex, K_FOREVER);
53+
bool out = false;
54+
_connected = bridge->call(MON_CONNECTED_METHOD).result(out) && out;
55+
k_mutex_unlock(&monitor_mutex);
56+
return out;
4857
}
4958

50-
explicit operator bool() const {
51-
return is_connected;
59+
bool is_connected() {
60+
k_mutex_lock(&monitor_mutex, K_FOREVER);
61+
bool out = _connected;
62+
k_mutex_unlock(&monitor_mutex);
63+
return out;
64+
}
65+
66+
explicit operator bool() {
67+
return is_connected();
5268
}
5369

5470
int read() override {
@@ -78,12 +94,12 @@ class BridgeMonitor: public Stream {
7894

7995
int peek() override {
8096
k_mutex_lock(&monitor_mutex, K_FOREVER);
97+
int out = -1;
8198
if (temp_buffer.available()) {
82-
k_mutex_unlock(&monitor_mutex);
83-
return temp_buffer.peek();
99+
out = temp_buffer.peek();
84100
}
85101
k_mutex_unlock(&monitor_mutex);
86-
return -1;
102+
return out;
87103
}
88104

89105
size_t write(uint8_t c) override {
@@ -100,43 +116,41 @@ class BridgeMonitor: public Stream {
100116

101117
size_t written;
102118
const bool ret = bridge->call(MON_WRITE_METHOD, send_buffer).result(written);
103-
if (ret) {
104-
return written;
105-
}
106119

107-
return 0;
120+
return ret? written : 0;
108121
}
109122

110123
bool reset() {
111124
bool res;
112-
bool ok = bridge->call(MON_RESET_METHOD).result(res);
113-
if (ok && res) {
114-
is_connected = false;
115-
}
116-
return (ok && res);
125+
bool ok = bridge->call(MON_RESET_METHOD).result(res) && res;
126+
k_mutex_lock(&monitor_mutex, K_FOREVER);
127+
_connected = !ok;
128+
k_mutex_unlock(&monitor_mutex);
129+
return ok;
117130
}
118131

119132
private:
120133
void _read(size_t size) {
121134

122135
if (size == 0) return;
123136

137+
k_mutex_lock(&monitor_mutex, K_FOREVER);
138+
124139
MsgPack::arr_t<uint8_t> message;
125140
RpcResult async_rpc = bridge->call(MON_READ_METHOD, size);
126-
127-
const bool ret = async_rpc.result(message);
141+
const bool ret = _connected && async_rpc.result(message);
128142

129143
if (ret) {
130-
k_mutex_lock(&monitor_mutex, K_FOREVER);
131144
for (size_t i = 0; i < message.size(); ++i) {
132145
temp_buffer.store_char(static_cast<char>(message[i]));
133146
}
134-
k_mutex_unlock(&monitor_mutex);
135147
}
136148

137149
// if (async_rpc.error.code > NO_ERR) {
138-
// is_connected = false;
150+
// _connected = false;
139151
// }
152+
153+
k_mutex_unlock(&monitor_mutex);
140154
}
141155

142156
};

src/tcp_client.h

Lines changed: 30 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -55,73 +55,55 @@ class BridgeTCPClient : public Client {
5555

5656
int connect(const char *host, uint16_t port) override {
5757

58-
if (_connected) return 0;
59-
60-
String hostname = host;
61-
6258
k_mutex_lock(&client_mutex, K_FOREVER);
6359

64-
const bool resp = bridge->call(TCP_CONNECT_METHOD, hostname, port).result(connection_id);
65-
66-
if (!resp) {
67-
_connected = false;
68-
k_mutex_unlock(&client_mutex);
69-
return -1;
70-
}
71-
_connected = true;
60+
String hostname = host;
61+
const bool ok = _connected || bridge->call(TCP_CONNECT_METHOD, hostname, port).result(connection_id);
62+
_connected = ok;
7263

7364
k_mutex_unlock(&client_mutex);
7465

75-
return 0;
66+
return ok? 0 : -1;
7667
}
7768

7869
int connectSSL(const char *host, uint16_t port, const char *ca_cert) {
7970

80-
if (_connected) return 0;
71+
k_mutex_lock(&client_mutex, K_FOREVER);
8172

8273
String hostname = host;
8374
String ca_cert_str = ca_cert;
8475

85-
k_mutex_lock(&client_mutex, K_FOREVER);
86-
87-
const bool resp = bridge->call(TCP_CONNECT_SSL_METHOD, hostname, port, ca_cert_str).result(connection_id);
88-
89-
if (!resp) {
90-
_connected = false;
91-
k_mutex_unlock(&client_mutex);
92-
return -1;
93-
}
94-
_connected = true;
95-
76+
const bool ok = _connected || bridge->call(TCP_CONNECT_SSL_METHOD, hostname, port, ca_cert_str).result(connection_id);
77+
_connected = ok;
9678
k_mutex_unlock(&client_mutex);
97-
return 0;
79+
80+
return ok? 0 : -1;
9881
}
9982

100-
uint32_t getId() const {
101-
return connection_id;
83+
uint32_t getId() {
84+
k_mutex_lock(&client_mutex, K_FOREVER);
85+
const uint32_t out = connection_id;
86+
k_mutex_unlock(&client_mutex);
87+
return out;
10288
}
10389

10490
size_t write(uint8_t c) override {
10591
return write(&c, 1);
10692
}
10793

108-
size_t write(const uint8_t *buf, size_t size) override {
94+
size_t write(const uint8_t *buffer, size_t size) override {
10995

110-
if (!_connected) return 0;
96+
if (!connected()) return 0;
11197

11298
MsgPack::arr_t<uint8_t> payload;
11399

114100
for (size_t i = 0; i < size; ++i) {
115-
payload.push_back(buf[i]);
101+
payload.push_back(buffer[i]);
116102
}
117103

118104
size_t written;
119-
const bool ret = bridge->call(TCP_WRITE_METHOD, connection_id, payload).result(written);
120-
if (ret) {
121-
return written;
122-
}
123-
124-
return 0;
105+
const bool ok = bridge->call(TCP_WRITE_METHOD, connection_id, payload).result(written);
106+
return ok? written : 0;
125107
}
126108

127109
int available() override {
@@ -151,12 +133,12 @@ class BridgeTCPClient : public Client {
151133

152134
int peek() override {
153135
k_mutex_lock(&client_mutex, K_FOREVER);
136+
int out = -1;
154137
if (temp_buffer.available()) {
155-
k_mutex_unlock(&client_mutex);
156-
return temp_buffer.peek();
138+
out = temp_buffer.peek();
157139
}
158140
k_mutex_unlock(&client_mutex);
159-
return -1;
141+
return out;
160142
}
161143

162144
void flush() override {
@@ -170,37 +152,35 @@ class BridgeTCPClient : public Client {
170152
void stop() override {
171153
k_mutex_lock(&client_mutex, K_FOREVER);
172154
String msg;
173-
const bool resp = bridge->call(TCP_CLOSE_METHOD, connection_id).result(msg);
174-
if (resp) {
175-
_connected = false;
155+
if (_connected) {
156+
_connected = !bridge->call(TCP_CLOSE_METHOD, connection_id).result(msg);
176157
}
177158
k_mutex_unlock(&client_mutex);
178159
}
179160

180161
uint8_t connected() override {
181-
if (_connected) return 1;
182-
return 0;
162+
k_mutex_lock(&client_mutex, K_FOREVER);
163+
const uint8_t out = _connected? 1 : 0;
164+
k_mutex_unlock(&client_mutex);
165+
return out;
183166
}
184167

185168
operator bool() override {
186169
return available() || connected();
187170
}
188171

189-
//friend class BridgeTCPServer;
190-
191172
using Print::write;
192173

193174
private:
194175
void _read(size_t size) {
195176

196-
if (size == 0 || !_connected) return;
177+
if (size == 0) return;
197178

198179
k_mutex_lock(&client_mutex, K_FOREVER);
199180

200181
MsgPack::arr_t<uint8_t> message;
201182
RpcResult async_rpc = bridge->call(TCP_READ_METHOD, connection_id, size);
202-
203-
const bool ret = async_rpc.result(message);
183+
const bool ret = _connected && async_rpc.result(message);
204184

205185
if (ret) {
206186
for (size_t i = 0; i < message.size(); ++i) {

0 commit comments

Comments
 (0)