diff --git a/.github/workflows/check-sdk-tests.yml b/.github/workflows/check-sdk-tests.yml index 474c84a..b9f2858 100644 --- a/.github/workflows/check-sdk-tests.yml +++ b/.github/workflows/check-sdk-tests.yml @@ -57,13 +57,13 @@ jobs: runs-on: ubuntu-latest if: always() outputs: - run-sdk-tests: ${{ steps.get-labels.outputs.run-sdk-tests }} + run-sdk-tests: ${{ steps.check-manual.outputs.run-sdk-tests || steps.get-labels-pr.outputs.run-sdk-tests }} steps: - name: Check out repository uses: actions/checkout@v4 - name: Skip label check for manual runs - id: get-labels + id: check-manual if: ${{ github.event_name == 'workflow_dispatch' }} run: | echo "Manual workflow dispatch detected, skipping PR label check." @@ -74,11 +74,13 @@ jobs: if: ${{ github.event_name == 'pull_request' }} run: | sleep 5 - LABELS=$(gh api repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/labels --jq '.[].name') + LABELS=$(gh pr view ${{ github.event.pull_request.number }} --json labels --jq '.labels[].name') echo "Current labels: $LABELS" if echo "$LABELS" | grep -q "run-bittensor-sdk-tests"; then + echo "run-sdk-tests=true" >> $GITHUB_ENV echo "run-sdk-tests=true" >> $GITHUB_OUTPUT else + echo "run-sdk-tests=false" >> $GITHUB_ENV echo "run-sdk-tests=false" >> $GITHUB_OUTPUT fi env: @@ -171,48 +173,51 @@ jobs: - name: Check-out repository uses: actions/checkout@v4 - - name: Install dependencies + - name: Install system dependencies run: | sudo apt-get update && sudo apt-get install -y clang curl libssl-dev llvm libudev-dev protobuf-compiler - - name: Create Python virtual environment - working-directory: ${{ github.workspace }} - run: python3 -m venv ${{ github.workspace }}/venv + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.13' + + - name: Install uv + uses: astral-sh/setup-uv@v4 - name: Clone Bittensor SDK repo - working-directory: ${{ github.workspace }} run: git clone https://github.com/opentensor/bittensor.git - - name: Setup Bittensor SDK from cloned repo + - name: Checkout Bittensor branch working-directory: ${{ github.workspace }}/bittensor run: | - source ${{ github.workspace }}/venv/bin/activate if ! git fetch origin $BITTENSOR_BRANCH; then echo "❌ Error: Branch '$BITTENSOR_BRANCH' does not exist in opentensor/bittensor." exit 1 fi git checkout FETCH_HEAD echo "✅ Using Bittensor branch: $BITTENSOR_BRANCH" - python3 -m pip install --upgrade pip uv - uv pip install '.[dev]' - - name: Clone Bittensor async-substrate-interface repo + - name: Install Bittensor SDK dependencies + working-directory: ${{ github.workspace }}/bittensor + run: uv pip install --system '.[dev]' + + - name: Clone async-substrate-interface repo run: git clone https://github.com/opentensor/async-substrate-interface.git - - name: Checkout PR branch in async-substrate-interface repo + - name: Checkout PR branch in async-substrate-interface working-directory: ${{ github.workspace }}/async-substrate-interface run: | git fetch origin ${{ github.event.pull_request.head.ref }} git checkout ${{ github.event.pull_request.head.ref }} echo "Current branch: $(git rev-parse --abbrev-ref HEAD)" - - name: Install async-substrate-interface package + - name: Install async-substrate-interface with dev dependencies working-directory: ${{ github.workspace }}/async-substrate-interface run: | - source ${{ github.workspace }}/venv/bin/activate - python3 -m pip uninstall async-substrate-interface -y - uv pip install . + uv pip uninstall --system async-substrate-interface || true + uv pip install --system '.[dev]' - name: Download Cached Docker Image uses: actions/download-artifact@v4 @@ -222,10 +227,8 @@ jobs: - name: Load Docker Image run: docker load -i subtensor-localnet.tar - - name: Run tests - run: | - source ${{ github.workspace }}/venv/bin/activate - python3 -m pytest ${{ matrix.test-file }} -s + - name: Run e2e tests + run: pytest ${{ matrix.test-file }} -s run-integration-and-unit-test: @@ -237,52 +240,54 @@ jobs: - name: Check-out repository uses: actions/checkout@v4 - - name: Install dependencies + - name: Install system dependencies run: | sudo apt-get update && sudo apt-get install -y clang curl libssl-dev llvm libudev-dev protobuf-compiler - - name: Create Python virtual environment - working-directory: ${{ github.workspace }} - run: python3 -m venv venv + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.13' + + - name: Install uv + uses: astral-sh/setup-uv@v4 - name: Clone Bittensor SDK repo - working-directory: ${{ github.workspace }} run: git clone https://github.com/opentensor/bittensor.git - - name: Setup Bittensor SDK from cloned repo + - name: Checkout Bittensor branch working-directory: ${{ github.workspace }}/bittensor run: | - source ${{ github.workspace }}/venv/bin/activate if ! git fetch origin $BITTENSOR_BRANCH; then echo "❌ Error: Branch '$BITTENSOR_BRANCH' does not exist in opentensor/bittensor." exit 1 fi git checkout FETCH_HEAD echo "✅ Using Bittensor branch: $BITTENSOR_BRANCH" - python3 -m pip install --upgrade pip uv - uv pip install '.[dev]' - - name: Checkout PR branch in async-substrate-interface repo - uses: actions/checkout@v4 - with: - repository: ${{ github.event.pull_request.head.repo.full_name }} - ref: ${{ github.event.pull_request.head.ref }} - path: async-substrate-interface + - name: Install Bittensor SDK dependencies + working-directory: ${{ github.workspace }}/bittensor + run: uv pip install --system '.[dev]' - - name: Install /async-substrate-interface package + - name: Clone async-substrate-interface repo + run: git clone https://github.com/opentensor/async-substrate-interface.git + + - name: Checkout PR branch in async-substrate-interface working-directory: ${{ github.workspace }}/async-substrate-interface run: | - source ${{ github.workspace }}/venv/bin/activate - pip uninstall async-substrate-interface -y - uv pip install . + git fetch origin ${{ github.event.pull_request.head.ref }} + git checkout ${{ github.event.pull_request.head.ref }} + echo "Current branch: $(git rev-parse --abbrev-ref HEAD)" - - name: Run SDK integration tests + - name: Install async-substrate-interface with dev dependencies + working-directory: ${{ github.workspace }}/async-substrate-interface run: | - source ${{ github.workspace }}/venv/bin/activate - pytest ${{ github.workspace }}/bittensor/tests/integration_tests + uv pip uninstall --system async-substrate-interface || true + uv pip install --system '.[dev]' - - name: Run bittensor-sdk unit tests - run: | - source ${{ github.workspace }}/venv/bin/activate - pytest ${{ github.workspace }}/bittensor/tests/unit_tests \ No newline at end of file + - name: Run SDK integration tests + run: pytest ${{ github.workspace }}/bittensor/tests/integration_tests + + - name: Run Bittensor SDK unit tests + run: pytest ${{ github.workspace }}/bittensor/tests/unit_tests \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 07012ee..f8ba582 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,9 @@ # Changelog +## 1.5.11 /2025-11-14 +* Race Condition Bug fixes by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/234 + +**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.5.10...v1.5.11 + ## 1.5.10 /2025-11-12 * bug fixes 1.5.10 by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/231 * no double-sleep in async-substrate-interface websocket querying diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 81ed1df..076eb10 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -586,6 +586,7 @@ def __init__( self._max_retries = max_retries self._last_activity = asyncio.Event() self._last_activity.set() + self._waiting_for_response = 0 @property def state(self): @@ -599,6 +600,24 @@ async def __aenter__(self): await self.connect() return self + async def mark_waiting_for_response(self): + """ + Mark that a response is expected. This will cause the websocket to not automatically close. + + Note: you must mark as response received once you have received the response. + """ + async with self._lock: + self._waiting_for_response += 1 + + async def mark_response_received(self): + """ + Mark that the expected response has been received. Automatic shutdown of websocket will proceed normally. + + Note: only do this if you have previously marked as waiting for response + """ + async with self._lock: + self._waiting_for_response -= 1 + @staticmethod async def loop_time() -> float: return asyncio.get_running_loop().time() @@ -738,7 +757,10 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: task_res = task.result() # If ConnectionClosedOK, graceful shutdown - don't reconnect - if isinstance(task_res, websockets.exceptions.ConnectionClosedOK): + if ( + isinstance(task_res, websockets.exceptions.ConnectionClosedOK) + and self._waiting_for_response <= 0 + ): logger.debug("Graceful shutdown detected, not reconnecting") return None # Clean exit @@ -793,7 +815,12 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: async def __aexit__(self, exc_type, exc_val, exc_tb): if self.shutdown_timer is not None: - if self.state != State.CONNECTING: + if ( + self.state != State.CONNECTING + and self._sending.qsize() == 0 + and not self._received_subscriptions + and self._waiting_for_response <= 0 + ): if self._exit_task is not None: self._exit_task.cancel() try: @@ -812,6 +839,7 @@ async def _exit_with_timer(self): try: if self.shutdown_timer is not None: await asyncio.sleep(self.shutdown_timer) + logger.debug("Exiting with timer") await self.shutdown() except asyncio.CancelledError: pass @@ -2495,6 +2523,7 @@ async def _make_rpc_request( logger.debug( f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {output_payload}" ) + await ws.mark_waiting_for_response() while True: for item_id in request_manager.unresponded(): @@ -2552,6 +2581,7 @@ async def _make_rpc_request( ) if request_manager.is_complete: + await ws.mark_response_received() break else: await asyncio.sleep(0.01) @@ -3948,6 +3978,7 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]: } if "finalized" in message_result and wait_for_finalization: + logger.debug("Extrinsic finalized. Unsubscribing.") async with self.ws as ws: await ws.unsubscribe(subscription_id) return { @@ -3956,14 +3987,17 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]: "finalized": True, }, True elif ( - "inblock" in message_result + any(x in message_result for x in ["inblock", "inBlock"]) and wait_for_inclusion and not wait_for_finalization ): + logger.debug("Extrinsic included. Unsubscribing.") async with self.ws as ws: await ws.unsubscribe(subscription_id) return { - "block_hash": message_result["inblock"], + "block_hash": message_result.get( + "inblock", message_result.get("inBlock") + ), "extrinsic_hash": "0x{}".format(extrinsic.extrinsic_hash.hex()), "finalized": False, }, True diff --git a/pyproject.toml b/pyproject.toml index 3a06346..bc7a3cb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.5.10" +version = "1.5.11" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" } diff --git a/tests/helpers/proxy_server.py b/tests/helpers/proxy_server.py index c561289..e3a4615 100644 --- a/tests/helpers/proxy_server.py +++ b/tests/helpers/proxy_server.py @@ -1,3 +1,4 @@ +import contextlib import logging import time @@ -26,7 +27,8 @@ def connect(self): def close(self): if self.upstream_connection: self.upstream_connection.close() - self.server.shutdown() + with contextlib.suppress(AttributeError): + self.server.shutdown() def proxy_request(self, websocket: ServerConnection): for message in websocket: diff --git a/tests/integration_tests/test_async_substrate_interface.py b/tests/integration_tests/test_async_substrate_interface.py index 1acbed6..7fac0a9 100644 --- a/tests/integration_tests/test_async_substrate_interface.py +++ b/tests/integration_tests/test_async_substrate_interface.py @@ -16,6 +16,7 @@ @pytest.mark.asyncio async def test_legacy_decoding(): + print("Testing test_legacy_decoding") # roughly 4000 blocks before metadata v15 was added pre_metadata_v15_block = 3_010_611 @@ -39,10 +40,12 @@ async def test_legacy_decoding(): block_hash=block_hash, ) assert timestamp.value == 1716358476004 + print("test_legacy_decoding succeeded") @pytest.mark.asyncio async def test_ss58_conversion(): + print("Testing test_ss58_conversion") async with AsyncSubstrateInterface( LATENT_LITE_ENTRYPOINT, ss58_format=42, decode_ss58=False ) as substrate: @@ -79,10 +82,12 @@ async def test_ss58_conversion(): if len(value.value) > 0: for decoded_key in value.value: assert isinstance(decoded_key, str) + print("test_ss58_conversion succeeded") @pytest.mark.asyncio async def test_fully_exhaust_query_map(): + print("Testing test_fully_exhaust_query_map") async with AsyncSubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate: block_hash = await substrate.get_chain_finalised_head() non_fully_exhauster_start = time.time() @@ -121,10 +126,12 @@ async def test_fully_exhaust_query_map(): fully_exhausted_records_count += 1 assert fully_exhausted_records_count == initial_records_count_fully_exhaust assert initial_records_count_fully_exhaust == exhausted_records_count + print("test_fully_exhaust_query_map succeeded") @pytest.mark.asyncio async def test_get_events_proper_decoding(): + print("Testing test_get_events_proper_decoding") # known block/hash pair that has the events we seek to decode block = 5846788 block_hash = "0x0a1c45063a59b934bfee827caa25385e60d5ec1fd8566a58b5cc4affc4eec412" @@ -137,10 +144,12 @@ async def test_get_events_proper_decoding(): 30, "0xa6b4e5c8241d60ece0c25056b19f7d21ae845269fc771ad46bf3e011865129a5", ) + print("test_get_events_proper_decoding succeeded") @pytest.mark.asyncio async def test_query_multiple(): + print("Testing test_query_multiple") block = 6153277 cks = [ "5FH9AQM4kqbkdC9jyV5FrdEWVYt41nkhFstop7Vhyfb9ZsXt", @@ -155,10 +164,12 @@ async def test_query_multiple(): storage_function="OwnedHotkeys", block_hash=block_hash, ) + print("test_query_multiple succeeded") @pytest.mark.asyncio async def test_reconnection(): + print("Testing test_reconnection") async with AsyncSubstrateInterface( ARCHIVE_ENTRYPOINT, ss58_format=42, retry_timeout=8.0 ) as substrate: @@ -166,10 +177,12 @@ async def test_reconnection(): bh = await substrate.get_chain_finalised_head() assert isinstance(bh, str) assert isinstance(await substrate.get_block_number(bh), int) + print("test_reconnection succeeded") @pytest.mark.asyncio async def test_query_map_with_odd_number_of_params(): + print("Testing test_query_map_with_odd_number_of_params") async with AsyncSubstrateInterface(ARCHIVE_ENTRYPOINT, ss58_format=42) as substrate: qm = await substrate.query_map( "SubtensorModule", @@ -179,10 +192,12 @@ async def test_query_map_with_odd_number_of_params(): first_record = qm.records[0] assert len(first_record) == 2 assert len(first_record[0]) == 4 + print("test_query_map_with_odd_number_of_params succeeded") @pytest.mark.asyncio async def test_improved_reconnection(): + print("Testing test_improved_reconnection") ws_logger_path = "/tmp/websockets-proxy-test" ws_logger = logging.getLogger("websockets.proxy") if os.path.exists(ws_logger_path): @@ -236,10 +251,12 @@ async def test_improved_reconnection(): shutdown_thread.start() shutdown_thread.join(timeout=5) server_thread.join(timeout=5) + print("test_improved_reconnection succeeded") @pytest.mark.asyncio async def test_get_payment_info(): + print("Testing test_get_payment_info") alice_coldkey = bittensor_wallet.Keypair.create_from_uri("//Alice") bob_coldkey = bittensor_wallet.Keypair.create_from_uri("//Bob") async with AsyncSubstrateInterface( @@ -275,3 +292,4 @@ async def test_get_payment_info(): partial_fee_all_options = payment_info_all_options["partial_fee"] assert partial_fee_all_options > partial_fee_no_era assert partial_fee_all_options > partial_fee_era + print("test_get_payment_info succeeded") diff --git a/tests/integration_tests/test_disk_cache.py b/tests/integration_tests/test_disk_cache.py index 3e379ab..b6cbf45 100644 --- a/tests/integration_tests/test_disk_cache.py +++ b/tests/integration_tests/test_disk_cache.py @@ -9,6 +9,7 @@ @pytest.mark.asyncio async def test_disk_cache(): + print("Testing test_disk_cache") entrypoint = "wss://entrypoint-finney.opentensor.ai:443" async with DiskCachedAsyncSubstrateInterface(entrypoint) as disk_cached_substrate: current_block = await disk_cached_substrate.get_block_number(None) @@ -72,3 +73,4 @@ async def test_disk_cache(): assert parent_block_hash == parent_block_hash_sync assert block_runtime_info == block_runtime_info_sync assert block_runtime_version_for == block_runtime_version_for_sync + print("test_disk_cache succeeded") diff --git a/tests/integration_tests/test_substrate_interface.py b/tests/integration_tests/test_substrate_interface.py index 42885ac..f6cf0eb 100644 --- a/tests/integration_tests/test_substrate_interface.py +++ b/tests/integration_tests/test_substrate_interface.py @@ -7,6 +7,7 @@ def test_legacy_decoding(): + print("Testing test_legacy_decoding") # roughly 4000 blocks before metadata v15 was added pre_metadata_v15_block = 3_010_611 @@ -30,9 +31,11 @@ def test_legacy_decoding(): block_hash=block_hash, ) assert timestamp.value == 1716358476004 + print("test_legacy_decoding succeeded") def test_ss58_conversion(): + print("Testing test_ss58_conversion") with SubstrateInterface( LATENT_LITE_ENTRYPOINT, ss58_format=42, decode_ss58=False ) as substrate: @@ -69,9 +72,11 @@ def test_ss58_conversion(): if len(value.value) > 0: for decoded_key in value.value: assert isinstance(decoded_key, str) + print("test_ss58_conversion succeeded") def test_get_events_proper_decoding(): + print("Testing test_get_events_proper_decoding") # known block/hash pair that has the events we seek to decode block = 5846788 block_hash = "0x0a1c45063a59b934bfee827caa25385e60d5ec1fd8566a58b5cc4affc4eec412" @@ -84,9 +89,11 @@ def test_get_events_proper_decoding(): 30, "0xa6b4e5c8241d60ece0c25056b19f7d21ae845269fc771ad46bf3e011865129a5", ) + print("test_get_events_proper_decoding succeeded") def test_query_multiple(): + print("Testing test_query_multiple") block = 6153277 cks = [ "5FH9AQM4kqbkdC9jyV5FrdEWVYt41nkhFstop7Vhyfb9ZsXt", @@ -101,9 +108,11 @@ def test_query_multiple(): storage_function="OwnedHotkeys", block_hash=block_hash, ) + print("test_query_multiple succeeded") def test_query_map_with_odd_number_of_params(): + print("Testing test_query_map_with_odd_number_of_params") with SubstrateInterface(LATENT_LITE_ENTRYPOINT, ss58_format=42) as substrate: qm = substrate.query_map( "SubtensorModule", @@ -113,9 +122,11 @@ def test_query_map_with_odd_number_of_params(): first_record = qm.records[0] assert len(first_record) == 2 assert len(first_record[0]) == 4 + print("test_query_map_with_odd_number_of_params succeeded") def test_get_payment_info(): + print("Testing test_get_payment_info") alice_coldkey = bittensor_wallet.Keypair.create_from_uri("//Alice") bob_coldkey = bittensor_wallet.Keypair.create_from_uri("//Bob") with SubstrateInterface( @@ -151,3 +162,4 @@ def test_get_payment_info(): partial_fee_all_options = payment_info_all_options["partial_fee"] assert partial_fee_all_options > partial_fee_no_era assert partial_fee_all_options > partial_fee_era + print("test_get_payment_info succeeded") diff --git a/tests/unit_tests/asyncio_/test_substrate_interface.py b/tests/unit_tests/asyncio_/test_substrate_interface.py index d4d692f..1253e6c 100644 --- a/tests/unit_tests/asyncio_/test_substrate_interface.py +++ b/tests/unit_tests/asyncio_/test_substrate_interface.py @@ -13,19 +13,20 @@ @pytest.mark.asyncio async def test_invalid_url_raises_exception(): """Test that invalid URI raises an InvalidURI exception.""" + print("Testing test_invalid_url_raises_exception") async_substrate = AsyncSubstrateInterface("non_existent_entry_point") with pytest.raises(InvalidURI): await async_substrate.initialize() with pytest.raises(InvalidURI): - async with AsyncSubstrateInterface( - "non_existent_entry_point" - ) as async_substrate: + async with AsyncSubstrateInterface("non_existent_entry_point") as _: pass + print("test_invalid_url_raises_exception succeeded") @pytest.mark.asyncio async def test_runtime_call(monkeypatch): + print("Testing test_runtime_call") substrate = AsyncSubstrateInterface("ws://localhost", _mock=True) fake_runtime = MagicMock() @@ -96,10 +97,12 @@ async def test_runtime_call(monkeypatch): substrate.rpc_request.assert_any_call( "state_call", ["SubstrateApi_SubstrateMethod", "", None], runtime=ANY ) + print("test_runtime_call succeeded") @pytest.mark.asyncio async def test_websocket_shutdown_timer(): + print("Testing test_websocket_shutdown_timer") # using default ws shutdown timer of 5.0 seconds async with AsyncSubstrateInterface("wss://lite.sub.latent.to:443") as substrate: await substrate.get_chain_head() @@ -115,10 +118,12 @@ async def test_websocket_shutdown_timer(): await substrate.get_chain_head() await asyncio.sleep(6) # same sleep time as before assert substrate.ws.state is State.OPEN # connection should still be open + print("test_websocket_shutdown_timer succeeded") @pytest.mark.asyncio async def test_runtime_switching(): + print("Testing test_runtime_switching") block = 6067945 # block where a runtime switch happens async with AsyncSubstrateInterface( ARCHIVE_ENTRYPOINT, ss58_format=42, chain_name="Bittensor" @@ -133,3 +138,4 @@ async def test_runtime_switching(): ) assert one is not None assert two is not None + print("test_runtime_switching succeeded") diff --git a/tests/unit_tests/sync/test_substrate_interface.py b/tests/unit_tests/sync/test_substrate_interface.py index 284b8cb..68f51b4 100644 --- a/tests/unit_tests/sync/test_substrate_interface.py +++ b/tests/unit_tests/sync/test_substrate_interface.py @@ -7,6 +7,7 @@ def test_runtime_call(monkeypatch): + print("Testing test_runtime_call") substrate = SubstrateInterface("ws://localhost", _mock=True) fake_runtime = MagicMock() fake_metadata_v15 = MagicMock() @@ -75,9 +76,11 @@ def test_runtime_call(monkeypatch): "state_call", ["SubstrateApi_SubstrateMethod", "", None] ) substrate.close() + print("test_runtime_call succeeded") def test_runtime_switching(): + print("Testing test_runtime_switching") block = 6067945 # block where a runtime switch happens with SubstrateInterface( ARCHIVE_ENTRYPOINT, ss58_format=42, chain_name="Bittensor" @@ -86,3 +89,4 @@ def test_runtime_switching(): assert substrate.get_extrinsics(block_number=block - 20) is not None assert substrate.get_extrinsics(block_number=block) is not None assert substrate.get_extrinsics(block_number=block - 21) is not None + print("test_runtime_switching succeeded")