diff --git a/examples/LargeResponse/LargeResponse.ino b/examples/LargeResponse/LargeResponse.ino index 46971c02..ea60165d 100644 --- a/examples/LargeResponse/LargeResponse.ino +++ b/examples/LargeResponse/LargeResponse.ino @@ -65,6 +65,64 @@ private: size_t _sent = 0; }; +// Code to reproduce issues: +// - https://github.com/ESP32Async/ESPAsyncWebServer/issues/242 +// - https://github.com/ESP32Async/ESPAsyncWebServer/issues/315 +// +// https://github.com/ESP32Async/ESPAsyncWebServer/pull/317#issuecomment-3421141039 +// +// I cracked it. +// So this is how it works: +// That space that _tcp is writing to identified by CONFIG_TCP_SND_BUF_DEFAULT (and is value-matching with default TCP windows size which is very confusing itself). +// The space returned by client()->write() and client->space() somehow might not be atomically/thread synced (had not dived that deep yet). So if first call to _fillBuffer is done via user-code thread and ended up with some small amount of data consumed and second one is done by _poll or _ack? returns full size again! This is where old code fails. +// If you change your class this way it will fail 100%. +class CustomResponseMRE : public AsyncAbstractResponse { +public: + explicit CustomResponseMRE() { + _code = 200; + _contentType = "text/plain"; + _sendContentLength = false; + // add some useless headers + addHeader("Clear-Site-Data", "Clears browsing data (e.g., cookies, storage, cache) associated with the requesting website."); + addHeader( + "No-Vary-Search", "Specifies a set of rules that define how a URL's query parameters will affect cache matching. 
These rules dictate whether the same " + "URL with different URL parameters should be saved as separate browser cache entries" + ); + } + + bool _sourceValid() const override { + return true; + } + + size_t _fillBuffer(uint8_t *buf, size_t buflen) override { + if (fillChar == '\0') { + fillChar = 'A'; + return RESPONSE_TRY_AGAIN; + } + if (_sent == RESPONSE_TRY_AGAIN) { + Serial.println("Simulating temporary unavailability of data..."); + _sent = 0; + return RESPONSE_TRY_AGAIN; + } + size_t remaining = totalResponseSize - _sent; + if (remaining == 0) { + return 0; + } + if (buflen > remaining) { + buflen = remaining; + } + Serial.printf("Filling '%c' @ sent: %u, buflen: %u\n", fillChar, _sent, buflen); + std::fill_n(buf, buflen, static_cast<uint8_t>(fillChar)); + _sent += buflen; + fillChar = (fillChar == 'Z') ? 'A' : fillChar + 1; + return buflen; + } + +private: + char fillChar = '\0'; + size_t _sent = 0; +}; + void setup() { Serial.begin(115200); @@ -77,14 +135,7 @@ void setup() { // // curl -v http://192.168.4.1/1 | grep -o '.' | sort | uniq -c // - // Should output 16000 and the counts for each character from A to D - // - // Console: - // - // Filling 'A' @ index: 0, maxLen: 5652, toSend: 5652 - // Filling 'B' @ index: 5652, maxLen: 4308, toSend: 4308 - // Filling 'C' @ index: 9960, maxLen: 2888, toSend: 2888 - // Filling 'D' @ index: 12848, maxLen: 3152, toSend: 3152 + // Should output 16000 and a distribution of letters which is the same in ESP32 logs and console // server.on("/1", HTTP_GET, [](AsyncWebServerRequest *request) { fillChar = 'A'; @@ -103,19 +154,22 @@ void setup() { // // curl -v http://192.168.4.1/2 | grep -o '.'
| sort | uniq -c // - // Should output 16000 - // - // Console: - // - // Filling 'A' @ sent: 0, buflen: 5675 - // Filling 'B' @ sent: 5675, buflen: 4308 - // Filling 'C' @ sent: 9983, buflen: 5760 - // Filling 'D' @ sent: 15743, buflen: 257 + // Should output 16000 and a distribution of letters which is the same in ESP32 logs and console // server.on("/2", HTTP_GET, [](AsyncWebServerRequest *request) { request->send(new CustomResponse()); }); + // Example to use a AsyncAbstractResponse + // + // curl -v http://192.168.4.1/3 | grep -o '.' | sort | uniq -c + // + // Should output 16000 and a distribution of letters which is the same in ESP32 logs and console + // + server.on("/3", HTTP_GET, [](AsyncWebServerRequest *request) { + request->send(new CustomResponseMRE()); + }); + server.begin(); } diff --git a/examples/PerfTests/PerfTests.ino b/examples/PerfTests/PerfTests.ino index af7b90e1..397c6e6b 100644 --- a/examples/PerfTests/PerfTests.ino +++ b/examples/PerfTests/PerfTests.ino @@ -142,6 +142,11 @@ void setup() { // // time curl -N -v -G -d 'd=2000' -d 'l=10000' http://192.168.4.1/slow.html --output - // + // THIS CODE WILL CRASH BECAUSE OF THE WATCHDOG. + // IF YOU REALLY NEED TO DO THIS, YOU MUST DISABLE THE TWDT + // + // CORRECT WAY IS TO USE SSE OR WEBSOCKETS TO DO THE COSTLY PROCESSING ASYNC. + // server.on("/slow.html", HTTP_GET, [](AsyncWebServerRequest *request) { requests = requests + 1; uint32_t d = request->getParam("d")->value().toInt(); diff --git a/examples/ServerSentEvents/ServerSentEvents.ino b/examples/ServerSentEvents/ServerSentEvents.ino index 2ffefe15..70ab4c7d 100644 --- a/examples/ServerSentEvents/ServerSentEvents.ino +++ b/examples/ServerSentEvents/ServerSentEvents.ino @@ -71,12 +71,12 @@ void setup() { }); events.onConnect([](AsyncEventSourceClient *client) { - Serial.printf("SSE Client connected! 
ID: %" PRIu32 "\n", client->lastId()); + Serial.println("SSE Client connected!"); client->send("hello!", NULL, millis(), 1000); }); events.onDisconnect([](AsyncEventSourceClient *client) { - Serial.printf("SSE Client disconnected! ID: %" PRIu32 "\n", client->lastId()); + Serial.println("SSE Client disconnected!"); }); server.addHandler(&events); diff --git a/examples/SlowChunkResponse/SlowChunkResponse.ino b/examples/SlowChunkResponse/SlowChunkResponse.ino index cf7e3e6e..046a208a 100644 --- a/examples/SlowChunkResponse/SlowChunkResponse.ino +++ b/examples/SlowChunkResponse/SlowChunkResponse.ino @@ -114,6 +114,11 @@ void setup() { // // time curl -N -v -G -d 'd=2000' -d 'l=10000' http://192.168.4.1/slow.html --output - // + // THIS CODE WILL CRASH BECAUSE OF THE WATCHDOG. + // IF YOU REALLY NEED TO DO THIS, YOU MUST DISABLE THE TWDT + // + // CORRECT WAY IS TO USE SSE OR WEBSOCKETS TO DO THE COSTLY PROCESSING ASYNC. + // server.on("/slow.html", HTTP_GET, [](AsyncWebServerRequest *request) { uint32_t d = request->getParam("d")->value().toInt(); uint32_t l = request->getParam("l")->value().toInt(); diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index 812c0ceb..eb1ddc85 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -143,7 +143,7 @@ size_t AsyncEventSourceMessage::send(AsyncClient *client) { // Client -AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server) : _client(request->client()), _server(server) { +AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server) : _client(request->clientRelease()), _server(server) { if (request->hasHeader(T_Last_Event_ID)) { _lastId = atoi(request->getHeader(T_Last_Event_ID)->value().c_str()); @@ -181,9 +181,9 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A ); _server->_addClient(this); - delete request; - _client->setNoDelay(true); + // delete
AsyncWebServerRequest object (and bound response) since we have the ownership on client connection now + delete request; } AsyncEventSourceClient::~AsyncEventSourceClient() { @@ -470,8 +470,7 @@ void AsyncEventSource::_adjust_inflight_window() { /* Response */ -AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server) { - _server = server; +AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server) : _server(server) { _code = 200; _contentType = T_text_event_stream; _sendContentLength = false; @@ -482,13 +481,24 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server) { void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request) { String out; _assembleHead(out, request->version()); + // unbind client's onAck callback from AsyncWebServerRequest's, we will destroy it on next callback and steal the client, + // can't do it now 'cause now we are in AsyncWebServerRequest::_onAck 's stack actually + // here we are losing time on one RTT delay, but with current design we can't get rid of Req/Resp objects other way + _request = request; + request->client()->onAck( + [](void *r, AsyncClient *c, size_t len, uint32_t time) { + if (len) { + static_cast<AsyncEventSourceResponse *>(r)->_switchClient(); + } + }, + this + ); request->client()->write(out.c_str(), _headLength); _state = RESPONSE_WAIT_ACK; } -size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time __attribute__((unused))) { - if (len) { - new AsyncEventSourceClient(request, _server); - } - return 0; -} +void AsyncEventSourceResponse::_switchClient() { + // AsyncEventSourceClient c-tor will take the ownership of AsyncTCP's client connection + new AsyncEventSourceClient(_request, _server); + // AsyncEventSourceClient c-tor would also delete _request and *this +} diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index ccfda95e..3d8a4d77 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -141,6 +141,13 @@
class AsyncEventSourceClient { void _runQueue(); public: + /** + * @brief Construct a new Async Event Source Client object + * @note constructor takes ownership of AsyncTCP's client pointer from the `request` parameter and then deletes the `request` object! + * + * @param request + * @param server + */ AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server); ~AsyncEventSourceClient(); @@ -312,11 +319,16 @@ class AsyncEventSource : public AsyncWebHandler { class AsyncEventSourceResponse : public AsyncWebServerResponse { private: AsyncEventSource *_server; + AsyncWebServerRequest *_request; + // this callback will switch AsyncTCP client to SSE + void _switchClient(); public: AsyncEventSourceResponse(AsyncEventSource *server); void _respond(AsyncWebServerRequest *request); - size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time); + size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override { + return 0; + }; bool _sourceValid() const { return true; } diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index 5fe0fc23..95574945 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -221,14 +221,10 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) { const char *AWSC_PING_PAYLOAD = "ESPAsyncWebServer-PING"; const size_t AWSC_PING_PAYLOAD_LEN = 22; -AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, AsyncWebSocket *server) : _tempObject(NULL) { - _client = request->client(); - _server = server; - _clientId = _server->_getNextId(); - _status = WS_CONNECTED; - _pstate = 0; - _lastMessageTime = millis(); - _keepAlivePeriod = 0; +AsyncWebSocketClient::AsyncWebSocketClient(AsyncClient *client, AsyncWebSocket *server) + : _client(client), _server(server), _clientId(_server->_getNextId()), _status(WS_CONNECTED), _pstate(0), _lastMessageTime(millis()), _keepAlivePeriod(0), + _tempObject(NULL) { + _client->setRxTimeout(0); _client->onError( [](void *r, AsyncClient *c,
int8_t error) { @@ -272,7 +268,6 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, Async }, this ); - delete request; memset(&_pinfo, 0, sizeof(_pinfo)); } @@ -806,7 +801,10 @@ void AsyncWebSocket::_handleEvent(AsyncWebSocketClient *client, AwsEventType typ AsyncWebSocketClient *AsyncWebSocket::_newClient(AsyncWebServerRequest *request) { _clients.emplace_back(request, this); + // we've just detached AsyncTCP client from AsyncWebServerRequest _handleEvent(&_clients.back(), WS_EVT_CONNECT, request, NULL, 0); + // after user code completed CONNECT event callback we can delete req/response objects + delete request; return &_clients.back(); } @@ -1243,8 +1241,7 @@ AsyncWebSocketMessageBuffer *AsyncWebSocket::makeBuffer(const uint8_t *data, siz * Authentication code from https://github.com/Links2004/arduinoWebSockets/blob/master/src/WebSockets.cpp#L480 */ -AsyncWebSocketResponse::AsyncWebSocketResponse(const String &key, AsyncWebSocket *server) { - _server = server; +AsyncWebSocketResponse::AsyncWebSocketResponse(const String &key, AsyncWebSocket *server) : _server(server) { _code = 101; _sendContentLength = false; @@ -1290,18 +1287,26 @@ void AsyncWebSocketResponse::_respond(AsyncWebServerRequest *request) { request->client()->close(); return; } + // unbind client's onAck callback from AsyncWebServerRequest's, we will destroy it on next callback and steal the client, + // can't do it now 'cause now we are in AsyncWebServerRequest::_onAck 's stack actually + // here we are losing time on one RTT delay, but with current design we can't get rid of Req/Resp objects other way + _request = request; + request->client()->onAck( + [](void *r, AsyncClient *c, size_t len, uint32_t time) { + if (len) { + static_cast<AsyncWebSocketResponse *>(r)->_switchClient(); + } + }, + this + ); String out; _assembleHead(out, request->version()); request->client()->write(out.c_str(), _headLength); _state = RESPONSE_WAIT_ACK; } -size_t AsyncWebSocketResponse::_ack(AsyncWebServerRequest
*request, size_t len, uint32_t time) { - (void)time; - - if (len) { - _server->_newClient(request); - } - - return 0; +void AsyncWebSocketResponse::_switchClient() { + // detach client from request + _server->_newClient(_request); + // _newClient() would also destruct _request and *this } diff --git a/src/AsyncWebSocket.h b/src/AsyncWebSocket.h index 451829cc..a3cc661f 100644 --- a/src/AsyncWebSocket.h +++ b/src/AsyncWebSocket.h @@ -211,6 +211,9 @@ class AsyncWebSocketClient { AsyncWebSocket *_server; uint32_t _clientId; AwsClientStatus _status; + uint8_t _pstate; + uint32_t _lastMessageTime; + uint32_t _keepAlivePeriod; #ifdef ESP32 mutable std::recursive_mutex _lock; #endif @@ -218,12 +221,8 @@ class AsyncWebSocketClient { std::deque _messageQueue; bool closeWhenFull = true; - uint8_t _pstate; AwsFrameInfo _pinfo; - uint32_t _lastMessageTime; - uint32_t _keepAlivePeriod; - bool _queueControl(uint8_t opcode, const uint8_t *data = NULL, size_t len = 0, bool mask = false); bool _queueMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode = WS_TEXT, bool mask = false); void _runQueue(); @@ -232,7 +231,15 @@ class AsyncWebSocketClient { public: void *_tempObject; - AsyncWebSocketClient(AsyncWebServerRequest *request, AsyncWebSocket *server); + AsyncWebSocketClient(AsyncClient *client, AsyncWebSocket *server); + + /** + * @brief Construct a new Async Web Socket Client object + * @note constructor takes ownership of AsyncTCP's client pointer from the `request` parameter; it does NOT delete `request`, the caller stays responsible for that
+ * @param request + * @param server + */ + AsyncWebSocketClient(AsyncWebServerRequest *request, AsyncWebSocket *server) : AsyncWebSocketClient(request->clientRelease(), server){}; ~AsyncWebSocketClient(); // client id increments for the given server @@ -464,11 +471,16 @@ class AsyncWebSocketResponse : public AsyncWebServerResponse { private: String _content; AsyncWebSocket *_server; + AsyncWebServerRequest *_request; + // this call back will switch AsyncTCP client to WebSocket + void _switchClient(); public: AsyncWebSocketResponse(const String &key, AsyncWebSocket *server); void _respond(AsyncWebServerRequest *request); - size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time); + size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override { + return 0; + }; bool _sourceValid() const { return true; } diff --git a/src/ESPAsyncWebServer.h b/src/ESPAsyncWebServer.h index 9e45800a..d26741c0 100644 --- a/src/ESPAsyncWebServer.h +++ b/src/ESPAsyncWebServer.h @@ -305,6 +305,19 @@ class AsyncWebServerRequest { AsyncClient *client() { return _client; } + + /** + * @brief release owned AsyncClient object + * AsyncClient pointer will be abandoned in this instance, + * the further ownership of the connection should be managed out of request's life-time scope + * could be used for long lived connection like SSE or WebSockets + * @note do not call this method unless you know what you are doing, otherwise it may lead to + * memory leaks and connections lingering + * + * @return AsyncClient* pointer to released connection object + */ + AsyncClient *clientRelease(); + uint8_t version() const { return _version; } @@ -1336,8 +1349,10 @@ class AsyncWebServerResponse { bool _sendContentLength; bool _chunked; size_t _headLength; + // amount of data sent for content part of the response (excluding all headers) size_t _sentLength; size_t _ackedLength; + // amount of response bytes (including all headers) written to sockbuff for delivery size_t 
_writtenLength; WebResponseState _state; @@ -1394,7 +1409,20 @@ class AsyncWebServerResponse { virtual bool _failed() const; virtual bool _sourceValid() const; virtual void _respond(AsyncWebServerRequest *request); - virtual size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time); + + /** + * @brief write next portion of response data to send buffs + * this method (re)fills tcp send buffers, it could be called either at will + * or from a tcp_recv/tcp_poll callbacks from AsyncTCP + * + * @param request - used to access client object + * @param len - size of acknowledged data from the remote side (TCP window update, not TCP ack!) + * @param time - time passed between last sent and received packet + * @return size_t amount of response data placed to TCP send buffs for delivery (defined by sdkconfig value CONFIG_LWIP_TCP_SND_BUF_DEFAULT) + */ + virtual size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) { + return 0; + }; }; /* diff --git a/src/WebRequest.cpp b/src/WebRequest.cpp index aa5cd30b..607c7dc2 100644 --- a/src/WebRequest.cpp +++ b/src/WebRequest.cpp @@ -33,8 +33,7 @@ AsyncWebServerRequest::AsyncWebServerRequest(AsyncWebServer *s, AsyncClient *c) [](void *r, AsyncClient *c, int8_t error) { (void)c; // async_ws_log_e("AsyncWebServerRequest::_onError"); - AsyncWebServerRequest *req = (AsyncWebServerRequest *)r; - req->_onError(error); + static_cast<AsyncWebServerRequest *>(r)->_onError(error); }, this ); @@ -42,17 +41,14 @@ AsyncWebServerRequest::AsyncWebServerRequest(AsyncWebServer *s, AsyncClient *c) [](void *r, AsyncClient *c, size_t len, uint32_t time) { (void)c; // async_ws_log_e("AsyncWebServerRequest::_onAck"); - AsyncWebServerRequest *req = (AsyncWebServerRequest *)r; - req->_onAck(len, time); + static_cast<AsyncWebServerRequest *>(r)->_onAck(len, time); }, this ); c->onDisconnect( [](void *r, AsyncClient *c) { // async_ws_log_e("AsyncWebServerRequest::_onDisconnect"); - AsyncWebServerRequest *req = (AsyncWebServerRequest *)r; - req->_onDisconnect(); - delete c; +
static_cast<AsyncWebServerRequest *>(r)->_onDisconnect(); }, this ); @@ -60,8 +56,7 @@ AsyncWebServerRequest::AsyncWebServerRequest(AsyncWebServer *s, AsyncClient *c) [](void *r, AsyncClient *c, uint32_t time) { (void)c; // async_ws_log_e("AsyncWebServerRequest::_onTimeout"); - AsyncWebServerRequest *req = (AsyncWebServerRequest *)r; - req->_onTimeout(time); + static_cast<AsyncWebServerRequest *>(r)->_onTimeout(time); }, this ); @@ -69,8 +64,7 @@ AsyncWebServerRequest::AsyncWebServerRequest(AsyncWebServer *s, AsyncClient *c) [](void *r, AsyncClient *c, void *buf, size_t len) { (void)c; // async_ws_log_e("AsyncWebServerRequest::_onData"); - AsyncWebServerRequest *req = (AsyncWebServerRequest *)r; - req->_onData(buf, len); + static_cast<AsyncWebServerRequest *>(r)->_onData(buf, len); }, this ); @@ -78,21 +72,27 @@ AsyncWebServerRequest::AsyncWebServerRequest(AsyncWebServer *s, AsyncClient *c) [](void *r, AsyncClient *c) { (void)c; // async_ws_log_e("AsyncWebServerRequest::_onPoll"); - AsyncWebServerRequest *req = (AsyncWebServerRequest *)r; - req->_onPoll(); + static_cast<AsyncWebServerRequest *>(r)->_onPoll(); }, this ); } AsyncWebServerRequest::~AsyncWebServerRequest() { - // async_ws_log_e("AsyncWebServerRequest::~AsyncWebServerRequest"); + if (_client) { + // usually it is _client's disconnect triggers object destruct, but for completeness we define behavior + // if for some reason *this will be destructed while client is still connected + _client->onDisconnect(nullptr); + delete _client; + _client = nullptr; + } - _this.reset(); + if (_response) { + delete _response; + _response = nullptr; + } - AsyncWebServerResponse *r = _response; - _response = NULL; - delete r; + _this.reset(); if (_tempObject != NULL) { free(_tempObject); @@ -217,31 +217,26 @@ void AsyncWebServerRequest::_onData(void *buf, size_t len) { void AsyncWebServerRequest::_onPoll() { // os_printf("p\n"); - if (_response != NULL && _client != NULL && _client->canSend()) { - if (!_response->_finished()) { - _response->_ack(this, 0, 0); - } else { - AsyncWebServerResponse *r = _response; - _response
= NULL; - delete r; - - _client->close(); - } + if (_response && _client && _client->canSend()) { + _response->_ack(this, 0, 0); } } void AsyncWebServerRequest::_onAck(size_t len, uint32_t time) { // os_printf("a:%u:%u\n", len, time); - if (_response != NULL) { - if (!_response->_finished()) { - _response->_ack(this, len, time); - } else if (_response->_finished()) { - AsyncWebServerResponse *r = _response; - _response = NULL; - delete r; - - _client->close(); + if (!_response) { + return; + } + + if (!_response->_finished()) { + _response->_ack(this, len, time); + // recheck if response has just completed, close connection + if (_response->_finished()) { + _client->close(); // this will trigger _onDisconnect() and object destruction } + } else { + // this will close responses that were complete via a single _send() call + _client->close(); // this will trigger _onDisconnect() and object destruction } } @@ -723,7 +718,7 @@ void AsyncWebServerRequest::_send() { send(500, T_text_plain, "Invalid data in handler"); } - // here, we either have a response give nfrom user or one of the two above + // here, we either have a response given from user or one of the two above _client->setRxTimeout(0); _response->_respond(this); _sent = true; @@ -1172,3 +1167,9 @@ bool AsyncWebServerRequest::isExpectedRequestedConnType(RequestedConnectionType return ((erct1 != RCT_NOT_USED) && (erct1 == _reqconntype)) || ((erct2 != RCT_NOT_USED) && (erct2 == _reqconntype)) || ((erct3 != RCT_NOT_USED) && (erct3 == _reqconntype)); } + +AsyncClient *AsyncWebServerRequest::clientRelease() { + AsyncClient *c = _client; + _client = nullptr; + return c; +} diff --git a/src/WebResponseImpl.h b/src/WebResponseImpl.h index 2b9318a4..9be9bc79 100644 --- a/src/WebResponseImpl.h +++ b/src/WebResponseImpl.h @@ -14,21 +14,46 @@ #include #include +#ifndef CONFIG_LWIP_TCP_MSS +// as it is defined for ESP32's Arduino LWIP +#define CONFIG_LWIP_TCP_MSS 1436 +#endif + +#define ASYNC_RESPONCE_BUFF_SIZE 
(CONFIG_LWIP_TCP_MSS * 2) // It is possible to restore these defines, but one can use _min and _max instead. Or std::min, std::max. class AsyncBasicResponse : public AsyncWebServerResponse { private: String _content; + // buffer to accumulate all response headers + String _assembled_headers; + // amount of headers buffer written to sockbuff + size_t _writtenHeadersLength{0}; public: explicit AsyncBasicResponse(int code, const char *contentType = asyncsrv::empty, const char *content = asyncsrv::empty); AsyncBasicResponse(int code, const String &contentType, const String &content = emptyString) : AsyncBasicResponse(code, contentType.c_str(), content.c_str()) {} void _respond(AsyncWebServerRequest *request) override final; - size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override final; + size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override final { + return write_send_buffs(request, len, time); + }; bool _sourceValid() const override final { return true; } + +protected: + /** + * @brief write next portion of response data to send buffs + * this method (re)fills tcp send buffers, it could be called either at will + * or from a tcp_recv/tcp_poll callbacks from AsyncTCP + * + * @param request - used to access client object + * @param len - size of acknowledged data from the remote side (TCP window update, not TCP ack!)
+ * @param time - time passed between last sent and received packet + * @return size_t amount of response data placed to TCP send buffs for delivery (defined by sdkconfig value CONFIG_LWIP_TCP_SND_BUF_DEFAULT) + */ + size_t write_send_buffs(AsyncWebServerRequest *request, size_t len, uint32_t time); }; class AsyncAbstractResponse : public AsyncWebServerResponse { @@ -39,23 +64,43 @@ class AsyncAbstractResponse : public AsyncWebServerResponse { // in-flight queue credits size_t _in_flight_credit{2}; #endif - String _head; + // buffer to accumulate all response headers + String _assembled_headers; + // amount of headers buffer writtent to sockbuff + size_t _writtenHeadersLength{0}; // Data is inserted into cache at begin(). // This is inefficient with vector, but if we use some other container, // we won't be able to access it as contiguous array of bytes when reading from it, // so by gaining performance in one place, we'll lose it in another. std::vector _cache; + // intermediate buffer to copy outbound data to, also it will keep pending data between _send calls + std::unique_ptr > _send_buffer; + // buffer data size specifiers + size_t _send_buffer_offset{0}, _send_buffer_len{0}; size_t _readDataFromCacheOrContent(uint8_t *data, const size_t len); size_t _fillBufferAndProcessTemplates(uint8_t *buf, size_t maxLen); protected: AwsTemplateProcessor _callback; + /** + * @brief write next portion of response data to send buffs + * this method (re)fills tcp send buffers, it could be called either at will + * or from a tcp_recv/tcp_poll callbacks from AsyncTCP + * + * @param request - used to access client object + * @param len - size of acknowledged data from the remote side (TCP window update, not TCP ack!) 
+ * @param time - time passed between last sent and received packet + * @return size_t amount of response data placed to TCP send buffs for delivery (defined by sdkconfig value CONFIG_LWIP_TCP_SND_BUF_DEFAULT) + */ + size_t write_send_buffs(AsyncWebServerRequest *request, size_t len, uint32_t time); public: AsyncAbstractResponse(AwsTemplateProcessor callback = nullptr); virtual ~AsyncAbstractResponse() {} void _respond(AsyncWebServerRequest *request) override final; - size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override final; + size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override final { + return write_send_buffs(request, len, time); + }; virtual bool _sourceValid() const { return false; } @@ -142,7 +187,8 @@ class AsyncChunkedResponse : public AsyncAbstractResponse { class AsyncProgmemResponse : public AsyncAbstractResponse { private: const uint8_t *_content; - size_t _readLength; + // offset index (how much we've sent already) + size_t _index; public: AsyncProgmemResponse(int code, const char *contentType, const uint8_t *content, size_t len, AwsTemplateProcessor callback = nullptr); diff --git a/src/WebResponses.cpp b/src/WebResponses.cpp index 0ee9422c..22723e41 100644 --- a/src/WebResponses.cpp +++ b/src/WebResponses.cpp @@ -5,6 +5,11 @@ #include "WebResponseImpl.h" #include "AsyncWebServerLogging.h" +#ifndef CONFIG_LWIP_TCP_WND_DEFAULT +// as it is defined for esp32's LWIP +#define CONFIG_LWIP_TCP_WND_DEFAULT 5760 +#endif + using namespace asyncsrv; /* @@ -238,13 +243,6 @@ bool AsyncWebServerResponse::_sourceValid() const { } void AsyncWebServerResponse::_respond(AsyncWebServerRequest *request) { _state = RESPONSE_END; - request->client()->close(); -} -size_t AsyncWebServerResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time) { - (void)request; - (void)len; - (void)time; - return 0; } /* @@ -265,70 +263,61 @@ AsyncBasicResponse::AsyncBasicResponse(int code, const char *contentType, 
const void AsyncBasicResponse::_respond(AsyncWebServerRequest *request) { _state = RESPONSE_HEADERS; - String out; - _assembleHead(out, request->version()); - size_t outLen = out.length(); - size_t space = request->client()->space(); - if (!_contentLength && space >= outLen) { - _writtenLength += request->client()->write(out.c_str(), outLen); - _state = RESPONSE_WAIT_ACK; - } else if (_contentLength && space >= outLen + _contentLength) { - out += _content; - outLen += _contentLength; - _writtenLength += request->client()->write(out.c_str(), outLen); - _state = RESPONSE_WAIT_ACK; - } else if (space && space < outLen) { - String partial = out.substring(0, space); - _content = out.substring(space) + _content; - _contentLength += outLen - space; - _writtenLength += request->client()->write(partial.c_str(), partial.length()); - _state = RESPONSE_CONTENT; - } else if (space > outLen && space < (outLen + _contentLength)) { - size_t shift = space - outLen; - outLen += shift; - _sentLength += shift; - out += _content.substring(0, shift); - _content = _content.substring(shift); - _writtenLength += request->client()->write(out.c_str(), outLen); - _state = RESPONSE_CONTENT; - } else { - _content = out + _content; - _contentLength += outLen; - _state = RESPONSE_CONTENT; - } + _assembleHead(_assembled_headers, request->version()); + write_send_buffs(request, 0, 0); } -size_t AsyncBasicResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time) { +size_t AsyncBasicResponse::write_send_buffs(AsyncWebServerRequest *request, size_t len, uint32_t time) { (void)time; + + // this is not functionally needed in AsyncBasicResponse itself, but kept for compatibility if some of the derived classes are rely on it somehow _ackedLength += len; - if (_state == RESPONSE_CONTENT) { - size_t available = _contentLength - _sentLength; - size_t space = request->client()->space(); - // we can fit in this packet - if (space > available) { - _writtenLength += 
request->client()->write(_content.c_str(), available); - _content = emptyString; - _state = RESPONSE_WAIT_ACK; - return available; + size_t payloadlen{0}; // amount of data to be written to tcp sockbuff during this call, used as return value of this method + + // send http headers first + if (_state == RESPONSE_HEADERS) { + // copy headers buffer to sock buffer + size_t const pcb_written = request->client()->add(_assembled_headers.c_str() + _writtenHeadersLength, _assembled_headers.length() - _writtenHeadersLength); + _writtenLength += pcb_written; + _writtenHeadersLength += pcb_written; + if (_writtenHeadersLength < _assembled_headers.length()) { + // we were not able to fit all headers in current buff, send this part here and return later for the rest + if (!request->client()->send()) { + // something is wrong, what should we do here? + request->client()->close(); + return 0; + } + return pcb_written; } - // send some data, the rest on ack - String out = _content.substring(0, space); - _content = _content.substring(space); - _sentLength += space; - _writtenLength += request->client()->write(out.c_str(), space); - return space; - } else if (_state == RESPONSE_WAIT_ACK) { - if (_ackedLength >= _writtenLength) { + // otherwise we've added all the (remainder) headers in current buff, go on with content + _state = RESPONSE_CONTENT; + payloadlen += pcb_written; + _assembled_headers = String(); // clear + } + + if (_state == RESPONSE_CONTENT) { + size_t const pcb_written = request->client()->write(_content.c_str() + _sentLength, _content.length() - _sentLength); + _writtenLength += pcb_written; // total written data (hdrs + body) + _sentLength += pcb_written; // body written data + payloadlen += pcb_written; // data writtent in current buff + if (_sentLength >= _content.length()) { + // we've just sent all the (remainder) data in current buff, complete the response _state = RESPONSE_END; } } - return 0; + + // implicit complete + if (_state == RESPONSE_WAIT_ACK) { + 
_state = RESPONSE_END; + } + + return payloadlen; } /* * Abstract Response - * */ - + * + */ AsyncAbstractResponse::AsyncAbstractResponse(AwsTemplateProcessor callback) : _callback(callback) { // In case of template processing, we're unable to determine real response size if (callback) { @@ -340,12 +329,12 @@ AsyncAbstractResponse::AsyncAbstractResponse(AwsTemplateProcessor callback) : _c void AsyncAbstractResponse::_respond(AsyncWebServerRequest *request) { addHeader(T_Connection, T_close, false); - _assembleHead(_head, request->version()); + _assembleHead(_assembled_headers, request->version()); _state = RESPONSE_HEADERS; - _ack(request, 0, 0); + write_send_buffs(request, 0, 0); } -size_t AsyncAbstractResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time) { +size_t AsyncAbstractResponse::write_send_buffs(AsyncWebServerRequest *request, size_t len, uint32_t time) { (void)time; if (!_sourceValid()) { _state = RESPONSE_FAILED; @@ -354,137 +343,159 @@ size_t AsyncAbstractResponse::_ack(AsyncWebServerRequest *request, size_t len, u } #if ASYNCWEBSERVER_USE_CHUNK_INFLIGHT + /* + for response payloads with unknown length or length larger than TCP_WND we need to control AsyncTCP's queue and in-flight fragmentation. + Either user callback could fill buffer with very small chunks or long running large response could receive a lot of poll() calls here, + both could flood asynctcp's queue with large number of events to handle and fragment socket buffer space for large responses. 
+ Let's ignore polled acks and acks in case when available window size is less than our used buffer size since we won't be able to fill and send it whole + That way we could balance on having at least half tcp win in-flight while minimizing send/ack events in asynctcp Q + This could decrease sustained bandwidth for one single connection but would drastically improve parallelism and equalize bandwidth sharing + */ // return a credit for each chunk of acked data (polls does not give any credits) if (len) { ++_in_flight_credit; + _in_flight -= std::min(len, _in_flight); } - // for chunked responses ignore acks if there are no _in_flight_credits left - if (_chunked && !_in_flight_credit) { - async_ws_log_d("(chunk) out of in-flight credits"); + if (_chunked || !_sendContentLength || (_sentLength > CONFIG_LWIP_TCP_WND_DEFAULT)) { + if (!_in_flight_credit || (ASYNC_RESPONCE_BUFF_SIZE > request->client()->space())) { + // async_ws_log_d("defer user call in_flight:%u, tcpwin:%u", _in_flight, request->client()->space()); + // take the credit back since we are ignoring this ack and rely on other inflight data acks + if (len) { + --_in_flight_credit; + } + return 0; + } } - - _in_flight -= (_in_flight > len) ? 
len : _in_flight; - // get the size of available sock space #endif + // this is not functionally needed in AsyncAbstractResponse itself, but kept for compatibility if some of the derived classes are rely on it somehow _ackedLength += len; - size_t space = request->client()->space(); - size_t headLen = _head.length(); + size_t payloadlen{0}; // amount of data to be written to tcp sockbuff during this call, used as return value of this method + + // send http headers first if (_state == RESPONSE_HEADERS) { - if (space >= headLen) { - _state = RESPONSE_CONTENT; - space -= headLen; - } else { - String out = _head.substring(0, space); - _head = _head.substring(space); - _writtenLength += request->client()->write(out.c_str(), out.length()); + // copy headers buffer to sock buffer + size_t const pcb_written = request->client()->add(_assembled_headers.c_str() + _writtenHeadersLength, _assembled_headers.length() - _writtenHeadersLength); + _writtenLength += pcb_written; + _writtenHeadersLength += pcb_written; + if (_writtenHeadersLength < _assembled_headers.length()) { +// we were not able to fit all headers in current buff, send this part here and return later for the rest #if ASYNCWEBSERVER_USE_CHUNK_INFLIGHT - _in_flight += out.length(); + _in_flight += pcb_written; --_in_flight_credit; // take a credit #endif - return out.length(); + if (!request->client()->send()) { + // something is wrong, what should we do here? + request->client()->close(); + return 0; + } + return pcb_written; } + // otherwise we've added all the (remainder) headers in current buff + _state = RESPONSE_CONTENT; + payloadlen += pcb_written; + _assembled_headers = String(); // clear } + // send content body if (_state == RESPONSE_CONTENT) { -#if ASYNCWEBSERVER_USE_CHUNK_INFLIGHT - // for response data we need to control the queue and in-flight fragmentation. Sending small chunks could give low latency, - // but flood asynctcp's queue and fragment socket buffer space for large responses. 
- // Let's ignore polled acks and acks in case when we have more in-flight data then the available socket buff space. - // That way we could balance on having half the buffer in-flight while another half is filling up, while minimizing events in asynctcp q - if (_in_flight > space) { - // async_ws_log_d("defer user call %u/%u", _in_flight, space); - // take the credit back since we are ignoring this ack and rely on other inflight data - if (len) { - --_in_flight_credit; + do { + if (_send_buffer_len && _send_buffer) { + // data is pending in buffer from a previous call or previous iteration + size_t const added_len = + request->client()->add(reinterpret_cast(_send_buffer->data() + _send_buffer_offset), _send_buffer_len - _send_buffer_offset); + if (added_len != _send_buffer_len - _send_buffer_offset) { + // we were not able to add entire buffer's content to tcp buffs, leave it for later + // (this should not happen normally unless connection's TCP window suddenly changed from remote or mem pressure) + _send_buffer_offset += added_len; + break; + } else { + _send_buffer_len = _send_buffer_offset = 0; // consider buffer empty + } + payloadlen += added_len; } - return 0; - } -#endif - size_t outLen; - if (_chunked) { - if (space <= 8) { - return 0; + auto tcp_win = request->client()->space(); + if (tcp_win == 0 || _state == RESPONSE_END) { + break; // no room left or no more data } - outLen = space; - } else if (!_sendContentLength) { - outLen = space; - } else { - outLen = ((_contentLength - _sentLength) > space) ? space : (_contentLength - _sentLength); - } - - uint8_t *buf = (uint8_t *)malloc(outLen + headLen); - if (!buf) { - async_ws_log_e("Failed to allocate"); - request->abort(); - return 0; - } - - if (headLen) { - memcpy(buf, _head.c_str(), _head.length()); - } - - size_t readLen = 0; - - if (_chunked) { - // HTTP 1.1 allows leading zeros in chunk length. Or spaces may be added. - // See RFC2616 sections 2, 3.6.1. 
- readLen = _fillBufferAndProcessTemplates(buf + headLen + 6, outLen - 8); - if (readLen == RESPONSE_TRY_AGAIN) { - free(buf); - return 0; + if ((_chunked || !_sendContentLength) && (tcp_win < CONFIG_LWIP_TCP_MSS / 2)) { + // available window size is not enough to send a new chunk sized half of tcp mss, let's wait for better chance and reduce pressure to AsyncTCP's event Q + break; } - outLen = sprintf((char *)buf + headLen, "%04x", readLen) + headLen; - buf[outLen++] = '\r'; - buf[outLen++] = '\n'; - outLen += readLen; - buf[outLen++] = '\r'; - buf[outLen++] = '\n'; - } else { - readLen = _fillBufferAndProcessTemplates(buf + headLen, outLen); - if (readLen == RESPONSE_TRY_AGAIN) { - free(buf); - return 0; + + if (!_send_buffer) { + auto p = new (std::nothrow) std::array; + if (p) { + _send_buffer.reset(p); + _send_buffer_len = _send_buffer_offset = 0; + } else { + break; // OOM + } } - outLen = readLen + headLen; - } - if (headLen) { - _head = emptyString; - } + if (_chunked) { + // HTTP 1.1 allows leading zeros in chunk length. Or spaces may be added. 
+ // See https://datatracker.ietf.org/doc/html/rfc9112#section-7.1 + size_t const readLen = + _fillBufferAndProcessTemplates(_send_buffer->data() + 6, std::min(_send_buffer->size(), tcp_win) - 8); // reserve 8 bytes for chunk size data + if (readLen != RESPONSE_TRY_AGAIN) { + // Write 4 hex digits directly without null terminator + static constexpr char hexChars[] = "0123456789abcdef"; + _send_buffer->data()[0] = hexChars[(readLen >> 12) & 0xF]; + _send_buffer->data()[1] = hexChars[(readLen >> 8) & 0xF]; + _send_buffer->data()[2] = hexChars[(readLen >> 4) & 0xF]; + _send_buffer->data()[3] = hexChars[readLen & 0xF]; + _send_buffer->data()[4] = '\r'; + _send_buffer->data()[5] = '\n'; + // data (readLen bytes) is already there + _send_buffer->at(readLen + 6) = '\r'; + _send_buffer->at(readLen + 7) = '\n'; + _send_buffer_len += readLen + 8; // set buffers's size to match added data + _sentLength += readLen; // data is not sent yet, but we won't get a chance to count this later properly for chunked data + if (!readLen) { + // last chunk? 
+ _state = RESPONSE_END; + } + } + } else { + size_t const readLen = _fillBufferAndProcessTemplates(_send_buffer->data(), std::min(_send_buffer->size(), tcp_win)); + if (readLen == 0) { + // no more data to send + _state = RESPONSE_END; + } else if (readLen != RESPONSE_TRY_AGAIN) { + _send_buffer_len += readLen; // set buffers's size to match added data + _sentLength += readLen; // data is not sent yet, but we need it to understand that it would be last block + if (_sendContentLength && (_sentLength == _contentLength)) { + // it was last piece of content + _state = RESPONSE_END; + } + } + } + } while (_send_buffer_len); // go on till we have something in buffer pending to send - if (outLen) { - _writtenLength += request->client()->write((const char *)buf, outLen); + // execute sending whatever we have in sock buffs now + request->client()->send(); + _writtenLength += payloadlen; #if ASYNCWEBSERVER_USE_CHUNK_INFLIGHT - _in_flight += outLen; - --_in_flight_credit; // take a credit + _in_flight += payloadlen; + --_in_flight_credit; // take a credit #endif + if (_send_buffer_len == 0) { + // buffer empty, we can release mem, otherwise need to keep it till next run (should not happen under normal conditions) + _send_buffer.reset(); } + return payloadlen; + } // (_state == RESPONSE_CONTENT) - if (_chunked) { - _sentLength += readLen; - } else { - _sentLength += outLen - headLen; - } - - free(buf); - - if ((_chunked && readLen == 0) || (!_sendContentLength && outLen == 0) || (!_chunked && _sentLength == _contentLength)) { - _state = RESPONSE_WAIT_ACK; - } - return outLen; - - } else if (_state == RESPONSE_WAIT_ACK) { - if (!_sendContentLength || _ackedLength >= _writtenLength) { - _state = RESPONSE_END; - if (!_chunked && !_sendContentLength) { - request->client()->close(); - } - } + // implicit check + if (_state == RESPONSE_WAIT_ACK) { + // we do not need to wait for any acks actually if we won't send any more data, + // connection would be closed gracefully with last 
piece of data (in AsyncWebServerRequest::_onAck) + _state = RESPONSE_END; } return 0; } @@ -512,8 +523,8 @@ size_t AsyncAbstractResponse::_fillBufferAndProcessTemplates(uint8_t *data, size // Now we've read 'len' bytes, either from cache or from file // Search for template placeholders uint8_t *pTemplateStart = data; - while ((pTemplateStart < &data[len]) && (pTemplateStart = (uint8_t *)memchr(pTemplateStart, TEMPLATE_PLACEHOLDER, &data[len - 1] - pTemplateStart + 1)) - ) { // data[0] ... data[len - 1] + while ((pTemplateStart < &data[len]) && (pTemplateStart = (uint8_t *)memchr(pTemplateStart, TEMPLATE_PLACEHOLDER, &data[len - 1] - pTemplateStart + 1))) { + // data[0] ... data[len - 1] uint8_t *pTemplateEnd = (pTemplateStart < &data[len - 1]) ? (uint8_t *)memchr(pTemplateStart + 1, TEMPLATE_PLACEHOLDER, &data[len - 1] - pTemplateStart) : nullptr; // temporary buffer to hold parameter name @@ -862,24 +873,17 @@ size_t AsyncChunkedResponse::_fillBuffer(uint8_t *data, size_t len) { * */ AsyncProgmemResponse::AsyncProgmemResponse(int code, const char *contentType, const uint8_t *content, size_t len, AwsTemplateProcessor callback) - : AsyncAbstractResponse(callback) { + : AsyncAbstractResponse(callback), _content(content), _index(0) { _code = code; - _content = content; _contentType = contentType; _contentLength = len; - _readLength = 0; } size_t AsyncProgmemResponse::_fillBuffer(uint8_t *data, size_t len) { - size_t left = _contentLength - _readLength; - if (left > len) { - memcpy_P(data, _content + _readLength, len); - _readLength += len; - return len; - } - memcpy_P(data, _content + _readLength, left); - _readLength += left; - return left; + size_t read_size = std::min(len, _contentLength - _index); + memcpy_P(data, _content + _index, read_size); + _index += read_size; + return read_size; } /*