Merged
@@ -71,7 +71,6 @@ def __init__(self) -> None:
async def __aenter__(self) -> Self:
self._active = True
await self._state.initialize()
self._after_initialize()
Collaborator Author

I am not sure what this was for in these dummy statistics. Could you please double-check @janbuchar?

Collaborator

It was probably so that record_* methods wouldn't randomly fail during context pipeline execution because of incorrectly initialized state. I assume it's not necessary anymore?

Collaborator Author

I haven't seen any failures in tests, and those methods do execute. But the same behavior can be seen on master when this line is deleted, so I guess it was made redundant by some other change?

Collaborator

Probably 🤞
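
For context, a minimal sketch of the failure mode discussed in this thread (not crawlee's actual code; every name below is illustrative): if a record_* method runs before the persisted state has been initialized, it operates on an unset state object.

import asyncio
from datetime import datetime, timezone


class MiniStatistics:
    def __init__(self) -> None:
        self._state: dict | None = None  # loaded later, e.g. from a key-value store

    async def initialize(self) -> None:
        # Stand-in for restoring persisted state.
        self._state = {'requests_in_progress': {}}

    def record_request_processing_start(self, request_id: str) -> None:
        if self._state is None:
            raise RuntimeError('Statistics state is not initialized yet.')
        self._state['requests_in_progress'][request_id] = datetime.now(timezone.utc)


async def main() -> None:
    stats = MiniStatistics()
    await stats.initialize()  # skipping this would make the record_* call raise
    stats.record_request_processing_start('req-1')


asyncio.run(main())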

return self

async def __aexit__(
55 changes: 28 additions & 27 deletions src/crawlee/statistics/_statistics.py
@@ -1,6 +1,7 @@
# Inspiration: https://github.com/apify/crawlee/blob/v3.9.2/packages/core/src/crawlers/statistics.ts
from __future__ import annotations

import asyncio
import math
import time
from datetime import datetime, timedelta, timezone
@@ -84,8 +85,6 @@ def __init__(
self._id = Statistics.__next_id
Statistics.__next_id += 1

self._instance_start: datetime | None = None

self.error_tracker = ErrorTracker(
save_error_snapshots=save_error_snapshots,
snapshot_kvs_name=persist_state_kvs_name,
@@ -111,6 +110,9 @@ def __init__(
# Flag to indicate the context state.
self._active = False

# Pre-existing runtime offset; can be non-zero when restoring serialized state from the KVS.
self._runtime_offset = timedelta(seconds=0)

def replace_state_model(self, state_model: type[TNewStatisticsState]) -> Statistics[TNewStatisticsState]:
"""Create near copy of the `Statistics` with replaced `state_model`."""
new_statistics: Statistics[TNewStatisticsState] = Statistics(
@@ -165,14 +167,17 @@ async def __aenter__(self) -> Self:
if self._active:
raise RuntimeError(f'The {self.__class__.__name__} is already active.')

self._active = True
self._instance_start = datetime.now(timezone.utc)

await self._state.initialize()
self._after_initialize()

self._runtime_offset = self.state.crawler_runtime

# Start periodic logging and let it print initial state before activation.
self._periodic_logger.start()
await asyncio.sleep(0.01)
self._active = True

self.state.crawler_last_started_at = datetime.now(timezone.utc)
self.state.crawler_started_at = self.state.crawler_started_at or self.state.crawler_last_started_at
return self

async def __aexit__(
@@ -191,14 +196,16 @@ async def __aexit__

if not self.state.crawler_last_started_at:
raise RuntimeError('Statistics.state.crawler_last_started_at not set.')
self.state.crawler_finished_at = datetime.now(timezone.utc)
self.state.crawler_runtime += self.state.crawler_finished_at - self.state.crawler_last_started_at

await self._state.teardown()

# Stop logging and deactivate the statistics to prevent further changes to crawler_runtime
await self._periodic_logger.stop()
self.state.crawler_finished_at = datetime.now(timezone.utc)
self.state.crawler_runtime = (
self._runtime_offset + self.state.crawler_finished_at - self.state.crawler_last_started_at
)

self._active = False
await self._state.teardown()

@property
def state(self) -> TStatisticsState:
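
The new bookkeeping can be summarized with a small stand-in (a hypothetical RuntimeTracker, not the real Statistics class): on enter, the runtime restored from persisted state becomes an offset; on exit, the total runtime is that offset plus the duration of the current run.

from datetime import datetime, timedelta, timezone


class RuntimeTracker:
    def __init__(self, restored_runtime: timedelta = timedelta(0)) -> None:
        # Offset accumulated by previous runs (zero for a fresh start).
        self._runtime_offset = restored_runtime
        self._last_started_at: datetime | None = None
        self.runtime = restored_runtime

    def start(self) -> None:
        self._last_started_at = datetime.now(timezone.utc)

    def stop(self) -> None:
        if self._last_started_at is None:
            raise RuntimeError('Tracker was never started.')
        # Total runtime = previous runs (offset) + duration of the current run.
        self.runtime = self._runtime_offset + (datetime.now(timezone.utc) - self._last_started_at)


first = RuntimeTracker()
first.start()
first.stop()
# A resumed run restores the persisted runtime as its offset, so the
# reported total keeps growing across restarts instead of resetting.
second = RuntimeTracker(restored_runtime=first.runtime)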
@@ -255,10 +262,19 @@ def record_request_processing_failure(self, request_id_or_key: str) -> None:

del self._requests_in_progress[request_id_or_key]

def _update_crawler_runtime(self) -> None:
current_run_duration = (
(datetime.now(timezone.utc) - self.state.crawler_last_started_at)
if self.state.crawler_last_started_at
else timedelta()
)
self.state.crawler_runtime = current_run_duration + self._runtime_offset

def calculate(self) -> FinalStatistics:
"""Calculate the current statistics."""
if self._instance_start is None:
raise RuntimeError('The Statistics object is not initialized')
if self._active:
# Only update state when active. If not, just report the last known runtime.
self._update_crawler_runtime()

total_minutes = self.state.crawler_runtime.total_seconds() / 60
state = self._state.current_value
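
To make the arithmetic in _update_crawler_runtime concrete, a worked example with made-up values: a persisted offset of 90 seconds plus a run that started 30 seconds ago yields a runtime of at least 120 seconds on an intermediate calculate() call.

from datetime import datetime, timedelta, timezone

runtime_offset = timedelta(seconds=90)  # hypothetical value restored from persisted state
crawler_last_started_at = datetime.now(timezone.utc) - timedelta(seconds=30)

current_run_duration = datetime.now(timezone.utc) - crawler_last_started_at
crawler_runtime = current_run_duration + runtime_offset
assert crawler_runtime >= timedelta(seconds=120)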
@@ -291,21 +307,6 @@ def _log(self) -> None:
else:
self._periodic_message_logger.info(self._log_message, extra=stats.to_dict())

def _after_initialize(self) -> None:
state = self._state.current_value

if state.crawler_started_at is None:
state.crawler_started_at = datetime.now(timezone.utc)

if state.stats_persisted_at is not None and state.crawler_last_started_at:
self._instance_start = datetime.now(timezone.utc) - (
state.stats_persisted_at - state.crawler_last_started_at
)
elif state.crawler_last_started_at:
self._instance_start = state.crawler_last_started_at

state.crawler_last_started_at = self._instance_start

def _save_retry_count_for_request(self, record: RequestProcessingRecord) -> None:
retry_count = record.retry_count
state = self._state.current_value
26 changes: 26 additions & 0 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
Original file line number Diff line number Diff line change
@@ -1701,3 +1701,29 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None:

assert first_run_state.crawler_finished_at < second_run_state.crawler_finished_at
assert first_run_state.crawler_runtime < second_run_state.crawler_runtime


async def test_crawler_intermediate_statistics() -> None:
"""Test that crawler statistics are correctly updating total runtime on every calculate call."""
crawler = BasicCrawler()
check_time = timedelta(seconds=0.1)

async def wait_for_statistics_initialization() -> None:
while not crawler.statistics.active: # noqa: ASYNC110 # It is ok for tests.
await asyncio.sleep(0.1)

@crawler.router.default_handler
async def handler(_: BasicCrawlingContext) -> None:
await asyncio.sleep(check_time.total_seconds() * 5)

# Start crawler and wait until statistics are initialized.
crawler_task = asyncio.create_task(crawler.run(['https://a.placeholder.com']))
await wait_for_statistics_initialization()

# Wait some time and check that runtime is updated.
await asyncio.sleep(check_time.total_seconds())
crawler.statistics.calculate()
assert crawler.statistics.state.crawler_runtime >= check_time

# Wait for crawler to finish
await crawler_task