Skip to content

Commit 71de440

Browse files
authored
Discovery: Filter out non 'eth' nodes and regularly update 'eth' field. (#3593)
* Discovery: Filter out non 'eth' nodes and regularly update 'eth' field. * Rename UpdaterHook to something related to ForkId * Use proper encoding for P2P ForkId * Avoid using 'hash' for variable name
1 parent f1b094a commit 71de440

File tree

11 files changed

+182
-35
lines changed

11 files changed

+182
-35
lines changed

execution_chain/common/common.nim

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,9 @@ func forkId*(com: CommonRef, head: BlockNumber, time: EthTime): ForkID {.gcsafe.
362362
## EIP 2364/2124
363363
com.forkIdCalculator.newID(head, time.uint64)
364364

365+
func compatibleForkId*(com: CommonRef, id: ForkID): bool =
366+
com.forkIdCalculator.compatible(id)
367+
365368
func isEIP155*(com: CommonRef, number: BlockNumber): bool =
366369
com.config.eip155Block.isSome and number >= com.config.eip155Block.value
367370

execution_chain/common/hardforks.nim

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -343,24 +343,43 @@ type
343343
byBlock: seq[uint64]
344344
byTime: seq[uint64]
345345
genesisCRC: uint32
346+
cache: seq[ForkID]
346347

347348
func newID*(calc: ForkIdCalculator, head, time: uint64): ForkID =
348-
var hash = calc.genesisCRC
349+
var crc = calc.genesisCRC
349350
for fork in calc.byBlock:
350351
if fork <= head:
351-
# Fork already passed, checksum the previous hash and the fork number
352-
hash = crc32(hash, fork.toBytesBE)
352+
# Fork already passed, checksum the previous crc and the fork number
353+
crc = crc32(crc, fork.toBytesBE)
353354
continue
354-
return (hash, fork)
355+
return (crc, fork)
355356

356357
for fork in calc.byTime:
357358
if fork <= time:
358-
# Fork already passed, checksum the previous hash and fork timestamp
359-
hash = crc32(hash, fork.toBytesBE)
359+
# Fork already passed, checksum the previous crc and fork timestamp
360+
crc = crc32(crc, fork.toBytesBE)
360361
continue
361-
return (hash, fork)
362+
return (crc, fork)
362363

363-
(hash, 0'u64)
364+
(crc, 0'u64)
365+
366+
func compatible*(calc: var ForkIdCalculator, forkId: ForkID): bool =
367+
if calc.cache.len == 0:
368+
calc.cache = newSeqOfCap[ForkID](calc.byBlock.len + calc.byTime.len)
369+
var crc = calc.genesisCRC
370+
for fork in calc.byBlock:
371+
crc = crc32(crc, fork.toBytesBE)
372+
calc.cache.add( (crc, fork) )
373+
374+
for fork in calc.byTime:
375+
crc = crc32(crc, fork.toBytesBE)
376+
calc.cache.add( (crc, fork) )
377+
378+
for id in calc.cache:
379+
if id == forkId:
380+
return true
381+
382+
false
364383

365384
func initForkIdCalculator*(map: ForkTransitionTable,
366385
genesisCRC: uint32,
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# nimbus-execution-client
2+
# Copyright (c) 2025 Status Research & Development GmbH
3+
# Licensed under either of
4+
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
5+
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
6+
# at your option.
7+
# This file may not be copied, modified, or distributed except according to
8+
# those terms
9+
10+
import
11+
stew/endians2
12+
13+
from eth/common/base import ForkID
14+
15+
type
16+
ChainForkId* = object
17+
forkHash*: array[4, byte] # The RLP encoding must be exactly 4 bytes.
18+
forkNext*: uint64 # The RLP encoding must be variable-length
19+
20+
func to*(id: ChainForkId, _: type ForkID): ForkID =
21+
(uint32.fromBytesBE(id.forkHash), id.forkNext)
22+
23+
func to*(id: ForkID, _: type ChainForkId): ChainForkId =
24+
ChainForkId(
25+
forkHash: id.crc.toBytesBE,
26+
forkNext: id.nextFork
27+
)
28+

execution_chain/networking/discoveryv5.nim

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ import
1919
eth/p2p/discoveryv5/protocol {.all.}
2020

2121
export
22-
Protocol, Node, Address, enr, newProtocol, open, close, seedTable, start, queryRandom, closeWait
23-
22+
Protocol, Node, Address, enr, newProtocol, open, close, seedTable, start, queryRandom, closeWait,
23+
updateRecord
24+
2425
proc receiveV5*(d: Protocol, a: Address, packet: openArray[byte]): Result[void, cstring] =
2526
privateAccess(Protocol)
2627
privateAccess(PendingRequest)

execution_chain/networking/eth1_discovery.nim

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ import
1515
chronicles,
1616
results,
1717
metrics,
18+
eth/common/base_rlp,
1819
./discoveryv5,
1920
./discoveryv4,
20-
./eth1_enr
21+
./eth1_enr,
22+
./chain_forkid
2123

2224
export
2325
discoveryv4.NodeId,
@@ -37,9 +39,12 @@ type
3739
AddressV4 = discoveryv4.Address
3840
AddressV5 = discoveryv5.Address
3941

42+
CompatibleForkIdProc* = proc(id: ForkID): bool {.noSideEffect, raises: [].}
43+
4044
Eth1Discovery* = ref object
4145
discv4: DiscV4
4246
discv5: DiscV5
47+
compatibleForkId: CompatibleForkIdProc
4348

4449
#------------------------------------------------------------------------------
4550
# Private functions
@@ -87,6 +92,23 @@ proc processClient(
8792
proto.discv5.receiveV5(addrv5, buf).isOkOr:
8893
debug "Discovery receive error", discv4=discv4.error, discv5=error
8994

95+
func eligibleNode(proto: Eth1Discovery, rec: Record): bool =
96+
# Filter out non `eth` node
97+
let
98+
bytes = rec.tryGet("eth", seq[byte]).valueOr:
99+
return false
100+
101+
if proto.compatibleForkId.isNil:
102+
# Allow all `eth` node to pass if there is no filter
103+
return true
104+
105+
let
106+
chainForkIds = try: rlp.decode(bytes, array[1, ChainForkId])
107+
except RlpError: return false
108+
chainForkId = chainForkIds[0]
109+
110+
proto.compatibleForkId(chainForkId.to(ForkID))
111+
90112
#------------------------------------------------------------------------------
91113
# Public functions
92114
#------------------------------------------------------------------------------
@@ -98,7 +120,8 @@ proc new*(
98120
bootstrapNodes: openArray[ENode],
99121
bindPort: Port,
100122
bindIp = IPv6_any(),
101-
rng = newRng()
123+
rng = newRng(),
124+
compatibleForkId = CompatibleForkIdProc(nil)
102125
): Eth1Discovery =
103126
let bootnodes = bootstrapNodes.to(enr.Record)
104127
Eth1Discovery(
@@ -120,7 +143,8 @@ proc new*(
120143
bindIp = bindIp,
121144
enrAutoUpdate = true,
122145
rng = rng
123-
)
146+
),
147+
compatibleForkId: compatibleForkId,
124148
)
125149

126150
proc open*(
@@ -173,6 +197,8 @@ proc lookupRandomNode*(proto: Eth1Discovery, queue: AsyncQueue[NodeV4]) {.async:
173197
if proto.discv5.isNil.not:
174198
let nodes = await proto.discv5.queryRandom()
175199
for node in nodes:
200+
if not proto.eligibleNode(node.record):
201+
continue
176202
let v4 = node.to(NodeV4).valueOr:
177203
continue
178204
await queue.addLast(v4)
@@ -190,6 +216,16 @@ proc getRandomBootnode*(proto: Eth1Discovery): Opt[NodeV4] =
190216
return Opt.none(NodeV4)
191217
return Opt.some(newNode(enode))
192218

219+
func updateForkID*(proto: Eth1Discovery, forkId: ForkID) =
220+
# https://github.com/ethereum/devp2p/blob/bc76b9809a30e6dc5c8dcda996273f0f9bcf7108/enr-entries/eth.md
221+
if proto.discv5.isNil.not:
222+
let
223+
list = [forkId.to(ChainForkId)]
224+
bytes = rlp.encode(list)
225+
kv = ("eth", bytes)
226+
proto.discv5.updateRecord([kv]).isOkOr:
227+
return
228+
193229
proc closeWait*(proto: Eth1Discovery) {.async: (raises: []).} =
194230
privateAccess(DiscV4)
195231
if proto.discv4.isNil.not and proto.discv5.isNil:

execution_chain/networking/p2p.nim

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,16 @@ import
1919
./eth1_discovery
2020

2121
export
22-
p2p_types, rlpx, enode
22+
p2p_types, rlpx, enode, ForkIdProc, CompatibleForkIdProc
2323

2424
logScope:
2525
topics = "p2p"
2626

27+
type
28+
ForkIdProcs* = object
29+
forkId*: ForkIdProc
30+
compatibleForkId*: CompatibleForkIdProc
31+
2732
proc newEthereumNode*(
2833
keys: KeyPair,
2934
address: Address,
@@ -34,14 +39,15 @@ proc newEthereumNode*(
3439
bindUdpPort: Port,
3540
bindTcpPort: Port,
3641
bindIp = IPv6_any(),
37-
rng = newRng()): EthereumNode =
42+
rng = newRng(),
43+
forkIdProcs = ForkIdProcs()): EthereumNode =
3844

3945
if rng == nil: # newRng could fail
4046
raiseAssert "Cannot initialize RNG"
4147

4248
let
4349
discovery = Eth1Discovery.new(
44-
keys.seckey, address, bootstrapNodes, bindUdpPort, bindIp, rng)
50+
keys.seckey, address, bootstrapNodes, bindUdpPort, bindIp, rng, forkIdProcs.compatibleForkId)
4551
node = EthereumNode(
4652
keys: keys,
4753
networkId: networkId,
@@ -53,7 +59,7 @@ proc newEthereumNode*(
5359
rng: rng,
5460
)
5561
node.peerPool = newPeerPool[EthereumNode](
56-
node, discovery, minPeers = minPeers)
62+
node, discovery, minPeers = minPeers, forkId = forkIdProcs.forkId)
5763
node
5864

5965
proc processIncoming(server: StreamServer,

execution_chain/networking/peer_pool.nim

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import
1818
./p2p_metrics,
1919
./[eth1_discovery, p2p_peers]
2020

21-
export sets, tables
21+
from eth/common/base import ForkID
22+
23+
export sets, tables, CompatibleForkIdProc
2224

2325
logScope:
2426
topics = "p2p peer_pool"
@@ -30,6 +32,8 @@ type
3032

3133
WorkerFuture = Future[void].Raising([CancelledError])
3234

35+
ForkIdProc* = proc(): ForkID {.noSideEffect, raises: [].}
36+
3337
# Usually Network generic param is instantiated with EthereumNode
3438
PeerPoolRef*[Network] = ref object
3539
network: Network
@@ -40,6 +44,10 @@ type
4044
running: bool
4145
discovery: Eth1Discovery
4246
workers: seq[WorkerFuture]
47+
forkId: ForkIdProc
48+
lastForkId: ForkID
49+
connectTimer: Future[void].Raising([CancelledError])
50+
updateTimer: Future[void].Raising([CancelledError])
4351
connectingNodes*: HashSet[Node]
4452
connectedNodes*: Table[Node, PeerRef[Network]]
4553
observers*: Table[int, PeerObserverRef[Network]]
@@ -52,6 +60,7 @@ type
5260
const
5361
lookupInterval = 5
5462
connectLoopSleep = chronos.milliseconds(2000)
63+
updateLoopSleep = chronos.seconds(15)
5564
maxConcurrentConnectionRequests = 40
5665
sleepBeforeTryingARandomBootnode = chronos.milliseconds(3000)
5766

@@ -188,23 +197,55 @@ proc maybeConnectToMorePeers(p: PeerPoolRef) {.async: (raises: [CancelledError])
188197
return
189198
await p.connectToNode(n)
190199

200+
func updateForkID(p: PeerPoolRef) =
201+
if p.forkId.isNil:
202+
return
203+
204+
let forkId = p.forkId()
205+
if p.lastForkId == forkId:
206+
return
207+
208+
p.discovery.updateForkID(forkId)
209+
p.lastForkId = forkId
210+
191211
proc run(p: PeerPoolRef) {.async: (raises: [CancelledError]).} =
192212
trace "Running PeerPool..."
193213

214+
# initial cycle
215+
p.updateForkID()
194216
await p.discovery.start()
217+
await p.maybeConnectToMorePeers()
218+
195219
p.running = true
196220
while p.running:
197221
debug "Amount of peers", amount = p.connectedNodes.len()
198-
await p.maybeConnectToMorePeers()
199-
await sleepAsync(connectLoopSleep)
222+
223+
# Create or replenish timer
224+
if p.connectTimer.isNil or p.connectTimer.finished:
225+
p.connectTimer = sleepAsync(connectLoopSleep)
226+
227+
if p.updateTimer.isNil or p.updateTimer.finished:
228+
p.updateTimer = sleepAsync(updateLoopSleep)
229+
230+
let
231+
res = await one(p.connectTimer, p.updateTimer)
232+
233+
if res == p.connectTimer:
234+
await p.maybeConnectToMorePeers()
235+
236+
if res == p.updateTimer:
237+
p.updateForkID()
200238

201239
#------------------------------------------------------------------------------
202240
# Private functions
203241
#------------------------------------------------------------------------------
204242

205243
func newPeerPool*[Network](
206244
network: Network,
207-
discovery: Eth1Discovery, minPeers = 10): PeerPoolRef[Network] =
245+
discovery: Eth1Discovery,
246+
minPeers = 10,
247+
forkId = ForkIdProc(nil),
248+
): PeerPoolRef[Network] =
208249
new result
209250
result.network = network
210251
result.minPeers = minPeers
@@ -213,6 +254,7 @@ func newPeerPool*[Network](
213254
result.connectedNodes = initTable[Node, PeerRef[Network]]()
214255
result.connectingNodes = initHashSet[Node]()
215256
result.observers = initTable[int, PeerObserverRef[Network]]()
257+
result.forkId = forkId
216258

217259
proc addObserver*(p: PeerPoolRef, observerId: int, observer: PeerObserverRef) =
218260
doAssert(observerId notin p.observers)

execution_chain/nimbus_execution_client.nim

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,30 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
104104
if extPorts.isSome:
105105
(address.tcpPort, address.udpPort) = extPorts.get()
106106

107-
let bootstrapNodes = conf.getBootNodes()
107+
let
108+
bootstrapNodes = conf.getBootNodes()
109+
fc = nimbus.fc
110+
111+
func forkIdProc(): ForkID {.raises: [].} =
112+
let header = fc.latestHeader()
113+
com.forkId(header.number, header.timestamp)
114+
115+
func compatibleForkIdProc(id: ForkID): bool {.raises: [].} =
116+
com.compatibleForkId(id)
117+
118+
let forkIdProcs = ForkIdProcs(
119+
forkId: forkIdProc,
120+
compatibleForkId: compatibleForkIdProc,
121+
)
108122

109123
nimbus.ethNode = newEthereumNode(
110124
keypair, address, conf.networkId, conf.agentString,
111125
minPeers = conf.maxPeers,
112126
bootstrapNodes = bootstrapNodes,
113127
bindUdpPort = conf.udpPort, bindTcpPort = conf.tcpPort,
114128
bindIp = conf.listenAddress,
115-
rng = nimbus.ctx.rng)
129+
rng = nimbus.ctx.rng,
130+
forkIdProcs = forkIdProcs)
116131

117132
# Add protocol capabilities
118133
nimbus.wire = nimbus.ethNode.addEthHandlerCapability(nimbus.txPool)

0 commit comments

Comments
 (0)