From c0bbdb4a1e104101c1a1213d9b323bf8beec4f42 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 7 Nov 2025 11:16:35 +0100 Subject: [PATCH 1/4] Fix runtime not being updated during run and only in the end --- .../_adaptive_playwright_crawler.py | 1 - src/crawlee/statistics/_statistics.py | 44 +++++++++---------- .../crawlers/_basic/test_basic_crawler.py | 26 +++++++++++ 3 files changed, 46 insertions(+), 25 deletions(-) diff --git a/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py b/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py index a3139d3da1..c51180e1fc 100644 --- a/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py +++ b/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py @@ -71,7 +71,6 @@ def __init__(self) -> None: async def __aenter__(self) -> Self: self._active = True await self._state.initialize() - self._after_initialize() return self async def __aexit__( diff --git a/src/crawlee/statistics/_statistics.py b/src/crawlee/statistics/_statistics.py index 68b4ff6551..21168cbf11 100644 --- a/src/crawlee/statistics/_statistics.py +++ b/src/crawlee/statistics/_statistics.py @@ -1,6 +1,7 @@ # Inspiration: https://github.com/apify/crawlee/blob/v3.9.2/packages/core/src/crawlers/statistics.ts from __future__ import annotations +import asyncio import math import time from datetime import datetime, timedelta, timezone @@ -84,8 +85,6 @@ def __init__( self._id = Statistics.__next_id Statistics.__next_id += 1 - self._instance_start: datetime | None = None - self.error_tracker = ErrorTracker( save_error_snapshots=save_error_snapshots, snapshot_kvs_name=persist_state_kvs_name, @@ -111,6 +110,9 @@ def __init__( # Flag to indicate the context state. self._active = False + # Pre-existing runtime offset when importing existing statistics. + self._runtime_offset = timedelta(seconds=0) + def replace_state_model(self, state_model: type[TNewStatisticsState]) -> Statistics[TNewStatisticsState]: """Create near copy of the `Statistics` with replaced `state_model`.""" new_statistics: Statistics[TNewStatisticsState] = Statistics( @@ -133,7 +135,7 @@ def with_default_state( persist_state_kvs_factory: Callable[[], Coroutine[None, None, KeyValueStore]] | None = None, log_message: str = 'Statistics', periodic_message_logger: Logger | None = None, - log_interval: timedelta = timedelta(minutes=1), + log_interval: timedelta = timedelta(seconds=5), statistics_log_format: Literal['table', 'inline'] = 'table', save_error_snapshots: bool = False, ) -> Statistics[StatisticsState]: @@ -166,13 +168,17 @@ async def __aenter__(self) -> Self: raise RuntimeError(f'The {self.__class__.__name__} is already active.') self._active = True - self._instance_start = datetime.now(timezone.utc) await self._state.initialize() - self._after_initialize() + self._runtime_offset = self.state.crawler_runtime + + # Start periodic logging and let it print first message before setting the start time. self._periodic_logger.start() + await asyncio.sleep(0.01) + self.state.crawler_last_started_at = datetime.now(timezone.utc) + self.state.crawler_started_at = self.state.crawler_started_at or self.state.crawler_last_started_at return self async def __aexit__( @@ -192,7 +198,9 @@ async def __aexit__( if not self.state.crawler_last_started_at: raise RuntimeError('Statistics.state.crawler_last_started_at not set.') self.state.crawler_finished_at = datetime.now(timezone.utc) - self.state.crawler_runtime += self.state.crawler_finished_at - self.state.crawler_last_started_at + self.state.crawler_runtime = ( + self._runtime_offset + self.state.crawler_finished_at - self.state.crawler_last_started_at + ) await self._state.teardown() @@ -257,9 +265,12 @@ def record_request_processing_failure(self, request_id_or_key: str) -> None: def calculate(self) -> FinalStatistics: """Calculate the current statistics.""" - if self._instance_start is None: - raise RuntimeError('The Statistics object is not initialized') - + current_run_duration = ( + (datetime.now(timezone.utc) - self.state.crawler_last_started_at) + if self.state.crawler_last_started_at + else timedelta() + ) + self.state.crawler_runtime = current_run_duration + self._runtime_offset total_minutes = self.state.crawler_runtime.total_seconds() / 60 state = self._state.current_value serialized_state = state.model_dump(by_alias=False) @@ -291,21 +302,6 @@ def _log(self) -> None: else: self._periodic_message_logger.info(self._log_message, extra=stats.to_dict()) - def _after_initialize(self) -> None: - state = self._state.current_value - - if state.crawler_started_at is None: - state.crawler_started_at = datetime.now(timezone.utc) - - if state.stats_persisted_at is not None and state.crawler_last_started_at: - self._instance_start = datetime.now(timezone.utc) - ( - state.stats_persisted_at - state.crawler_last_started_at - ) - elif state.crawler_last_started_at: - self._instance_start = state.crawler_last_started_at - - state.crawler_last_started_at = self._instance_start - def _save_retry_count_for_request(self, record: RequestProcessingRecord) -> None: retry_count = record.retry_count state = self._state.current_value diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index b2b75e50f7..f8f0d0fe8e 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -1701,3 +1701,29 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None: assert first_run_state.crawler_finished_at < second_run_state.crawler_finished_at assert first_run_state.crawler_runtime < second_run_state.crawler_runtime + + +async def test_crawler_intermediate_statistics(tmp_path: Path) -> None: + """Test that crawler statistics are correctly updating total runtime on every calculate call.""" + crawler = BasicCrawler() + check_time = timedelta(seconds=0.1) + + async def wait_for_statistics_initialization() -> None: + while not crawler.statistics.active: # noqa: ASYNC110 # It is ok for tests. + await asyncio.sleep(0.1) + + @crawler.router.default_handler + async def handler(_: BasicCrawlingContext) -> None: + await asyncio.sleep(check_time.total_seconds() * 5) + + # Start crawler and wait until statistics are initialized. + crawler_task = asyncio.create_task(crawler.run(['https://a.placeholder.com'])) + await wait_for_statistics_initialization() + + # Wait some time and check that runtime is updated. + await asyncio.sleep(check_time.total_seconds()) + crawler.statistics.calculate() + assert crawler.statistics.state.crawler_runtime >= check_time + + # Wait for crawler to finish + await crawler_task From ee62ae6fec9ff62f9cf8749fb5bbcadd3e250c0b Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 7 Nov 2025 14:19:49 +0100 Subject: [PATCH 2/4] Ensure logging consistency with exported state --- src/crawlee/statistics/_statistics.py | 23 +++++++++++-------- .../crawlers/_basic/test_basic_crawler.py | 2 +- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/crawlee/statistics/_statistics.py b/src/crawlee/statistics/_statistics.py index 21168cbf11..b2b82ad74c 100644 --- a/src/crawlee/statistics/_statistics.py +++ b/src/crawlee/statistics/_statistics.py @@ -167,15 +167,14 @@ async def __aenter__(self) -> Self: if self._active: raise RuntimeError(f'The {self.__class__.__name__} is already active.') - self._active = True - await self._state.initialize() self._runtime_offset = self.state.crawler_runtime - # Start periodic logging and let it print first message before setting the start time. + # Start periodic logging and let it print initial state before activation. self._periodic_logger.start() await asyncio.sleep(0.01) + self._active = True self.state.crawler_last_started_at = datetime.now(timezone.utc) self.state.crawler_started_at = self.state.crawler_started_at or self.state.crawler_last_started_at @@ -197,16 +196,16 @@ async def __aexit__( if not self.state.crawler_last_started_at: raise RuntimeError('Statistics.state.crawler_last_started_at not set.') + + # Stop logging and deactivate the statistics to prevent further changes to crawler_runtime + await self._periodic_logger.stop() self.state.crawler_finished_at = datetime.now(timezone.utc) self.state.crawler_runtime = ( self._runtime_offset + self.state.crawler_finished_at - self.state.crawler_last_started_at ) - await self._state.teardown() - - await self._periodic_logger.stop() - self._active = False + await self._state.teardown() @property def state(self) -> TStatisticsState: @@ -263,14 +262,20 @@ def record_request_processing_failure(self, request_id_or_key: str) -> None: del self._requests_in_progress[request_id_or_key] - def calculate(self) -> FinalStatistics: - """Calculate the current statistics.""" + def _update_crawler_runtime(self) -> None: current_run_duration = ( (datetime.now(timezone.utc) - self.state.crawler_last_started_at) if self.state.crawler_last_started_at else timedelta() ) self.state.crawler_runtime = current_run_duration + self._runtime_offset + + def calculate(self) -> FinalStatistics: + """Calculate the current statistics.""" + if self._active: + # Only update state when active. If not, just report the last known runtime. + self._update_crawler_runtime() + total_minutes = self.state.crawler_runtime.total_seconds() / 60 state = self._state.current_value serialized_state = state.model_dump(by_alias=False) diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index f8f0d0fe8e..8173ec324b 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -1703,7 +1703,7 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None: assert first_run_state.crawler_runtime < second_run_state.crawler_runtime -async def test_crawler_intermediate_statistics(tmp_path: Path) -> None: +async def test_crawler_intermediate_statistics() -> None: """Test that crawler statistics are correctly updating total runtime on every calculate call.""" crawler = BasicCrawler() check_time = timedelta(seconds=0.1) From 7c180bbc79832e549a70a8046b76755c3037b929 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20Proch=C3=A1zka?= Date: Fri, 7 Nov 2025 14:31:18 +0100 Subject: [PATCH 3/4] Revert testing change --- src/crawlee/statistics/_statistics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crawlee/statistics/_statistics.py b/src/crawlee/statistics/_statistics.py index b2b82ad74c..520b0785bd 100644 --- a/src/crawlee/statistics/_statistics.py +++ b/src/crawlee/statistics/_statistics.py @@ -135,7 +135,7 @@ def with_default_state( persist_state_kvs_factory: Callable[[], Coroutine[None, None, KeyValueStore]] | None = None, log_message: str = 'Statistics', periodic_message_logger: Logger | None = None, - log_interval: timedelta = timedelta(seconds=5), + log_interval: timedelta = timedelta(minutes=1), statistics_log_format: Literal['table', 'inline'] = 'table', save_error_snapshots: bool = False, ) -> Statistics[StatisticsState]: From c65fb3b51e927465bc4ddcaf47d3d4dba9f4d440 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 11 Nov 2025 09:45:22 +0100 Subject: [PATCH 4/4] Update comment --- src/crawlee/statistics/_statistics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crawlee/statistics/_statistics.py b/src/crawlee/statistics/_statistics.py index 520b0785bd..667b96eebe 100644 --- a/src/crawlee/statistics/_statistics.py +++ b/src/crawlee/statistics/_statistics.py @@ -110,7 +110,7 @@ def __init__( # Flag to indicate the context state. self._active = False - # Pre-existing runtime offset when importing existing statistics. + # Pre-existing runtime offset, that can be non-zero when restoring serialized state from KVS. self._runtime_offset = timedelta(seconds=0) def replace_state_model(self, state_model: type[TNewStatisticsState]) -> Statistics[TNewStatisticsState]: