Skip to content
Draft
2 changes: 1 addition & 1 deletion chain/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies {
implementation project(':db:core')
implementation project(':pow:core')
api project(':ssz')
implementation project(':util')
implementation project(':util:core')

implementation 'com.google.guava:guava'
implementation 'io.projectreactor:reactor-core'
Expand Down
2 changes: 1 addition & 1 deletion consensus/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ dependencies {
implementation project(':core')
implementation project(':crypto')
implementation project(':ssz')
implementation project(':util')
implementation project(':util:core')

implementation 'com.google.guava:guava'

Expand Down
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dependencies {
implementation project(':types')
api project(':types')
implementation project(':crypto')
implementation project(':ssz')

Expand Down
2 changes: 1 addition & 1 deletion pow/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ dependencies {
implementation project(':core')
implementation project(':consensus')
implementation project(':ssz')
implementation project(':util')
implementation project(':util:core')
implementation project(':crypto')

implementation 'io.projectreactor:reactor-core'
Expand Down
2 changes: 1 addition & 1 deletion pow/ethereumj/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies {
implementation project(':types')
implementation project(':core')
implementation project(':consensus')
implementation project(':util')
implementation project(':util:core')
implementation ("org.ethereum:ethereumj-core") {
changing = true

Expand Down
2 changes: 1 addition & 1 deletion pow/validator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dependencies {
implementation project(':chain')
implementation project(':consensus')
implementation project(':validator')
implementation project(':util')
implementation project(':util:core')
implementation project(':db:core')

implementation 'io.projectreactor:reactor-core'
Expand Down
4 changes: 3 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ include 'test'
// Strict types definition
include 'types'
// Standalone utils without any relation to Ethereum 2.0
include 'util'
include 'util:core'
// Eth2.0 time utilities
include 'util:time'
// Validator services
include 'validator'
// Wire API mock
Expand Down
4 changes: 2 additions & 2 deletions ssz/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
apply plugin: 'java-library'

dependencies {
implementation project(':types')
implementation project(':util')
api 'net.consensys.cava:cava-ssz'
implementation project(':types')
implementation project(':util:core')
implementation 'net.consensys.cava:cava-units'

testImplementation 'junit:junit'
Expand Down
2 changes: 1 addition & 1 deletion start/benchmaker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ application {
dependencies {
implementation project(':types')
implementation project(':wire')
implementation project(':util')
implementation project(':util:core')
implementation project(':start:common')
implementation project(':start:config')
implementation project(':crypto')
Expand Down
2 changes: 1 addition & 1 deletion start/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies {
implementation project(':ssz')
implementation project(':validator')
implementation project(':wire')
implementation project(':util')
implementation project(':util:core')

implementation 'com.google.guava:guava'
implementation 'io.projectreactor:reactor-core'
Expand Down
2 changes: 1 addition & 1 deletion start/config/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ dependencies {
implementation project(':consensus')
implementation project(':crypto')
implementation project(':types')
implementation project(':util')
implementation project(':util:core')

implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'com.fasterxml.jackson.core:jackson-databind'
Expand Down
2 changes: 1 addition & 1 deletion start/node/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ application {
dependencies {
implementation project(':types')
implementation project(':wire')
implementation project(':util')
implementation project(':util:core')
implementation project(':start:common')
implementation project(':start:config')
implementation project(':crypto')
Expand Down
2 changes: 1 addition & 1 deletion start/simulator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ createScript(project, 'org.ethereum.beacon.simulator.Simulator', 'simulator')
dependencies {
implementation project(':types')
implementation project(':wire')
implementation project(':util')
implementation project(':util:core')
implementation project(':start:common')
implementation project(':start:config')
implementation project(':crypto')
Expand Down
2 changes: 1 addition & 1 deletion test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies {
testImplementation project(':db:core')
testImplementation project(':chain')
testImplementation project(':start:simulator')
testImplementation project(':util')
testImplementation project(':util:core')
testImplementation project(':pow:core')
}

Expand Down
5 changes: 0 additions & 5 deletions util/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@
dependencies {
implementation 'io.projectreactor:reactor-core'
implementation 'com.google.guava:guava'
implementation 'commons-beanutils:commons-beanutils'
}
5 changes: 5 additions & 0 deletions util/core/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
dependencies {
implementation 'io.projectreactor:reactor-core'
implementation 'com.google.guava:guava'
implementation 'commons-beanutils:commons-beanutils'
}
12 changes: 12 additions & 0 deletions util/time/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
dependencies {
implementation project(':util:core')
implementation project(':core')

implementation 'io.projectreactor:reactor-core'
implementation 'commons-net:commons-net'
implementation 'org.apache.logging.log4j:log4j-core'

testImplementation project(':consensus')
testImplementation project(':ssz')
testImplementation 'junit:junit'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.ethereum.beacon.time;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.core.types.Time;
import org.ethereum.beacon.schedulers.Scheduler;
import org.ethereum.beacon.stream.SimpleProcessor;
import org.ethereum.beacon.time.provider.NetworkTime;
import org.ethereum.beacon.time.provider.StatisticsTime;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicLong;

/**
* Strategy which prioritize {@link NetworkTime} but if the delta between {@link NetworkTime} and
* {@link StatisticsTime} goes above allowedDelta, it uses {@link StatisticsTime} as time provider.
*/
public class NetworkFirstStrategy implements TimeStrategy {
private final SimpleProcessor<Time> timeProcessor;
private final AtomicLong latestNetwork = new AtomicLong(-1);
private final AtomicLong latestStatistics = new AtomicLong(-1);
private static final Logger logger = LogManager.getLogger("time");

public NetworkFirstStrategy(
Scheduler scheduler,
NetworkTime networkTime,
StatisticsTime statisticsTime,
int allowedDelta) {
this.timeProcessor = new SimpleProcessor<Time>(scheduler, "NetworkFirstStrategy");
Flux.from(networkTime.getTimeStream())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we have a component with two inbound streams and a single processor that handles them both?

.subscribe(
t -> {
this.latestNetwork.set(t.getValue());
if (latestStatistics.get() == -1) {
return;
}
if (Math.abs(latestNetwork.get() - latestStatistics.get()) <= allowedDelta) {
timeProcessor.onNext(t);
}
});
Flux.from(statisticsTime.getTimeStream())
.subscribe(
t -> {
this.latestStatistics.set(t.getValue());
if (latestNetwork.get() == -1) {
return;
}
if (Math.abs(latestNetwork.get() - latestStatistics.get()) > allowedDelta) {
logger.trace(() -> String.format("Using time from statistics time provider: %s", t.getValue()));
timeProcessor.onNext(t);
}
});
}

@Override
public Publisher<Time> getTimeStream() {
return timeProcessor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.ethereum.beacon.time;

import org.ethereum.beacon.core.types.Time;
import org.ethereum.beacon.time.provider.StatisticsTime;
import org.reactivestreams.Publisher;

public class StatisticsStrategy implements TimeStrategy {
private final StatisticsTime statisticsTime;

public StatisticsStrategy(StatisticsTime statisticsTime) {
this.statisticsTime = statisticsTime;
}

@Override
public Publisher<Time> getTimeStream() {
return statisticsTime.getTimeStream();
}
}
12 changes: 12 additions & 0 deletions util/time/src/main/java/org/ethereum/beacon/time/TimeStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.ethereum.beacon.time;

import org.ethereum.beacon.core.types.Time;
import org.reactivestreams.Publisher;

/**
* High level time supplier, uses several {@link org.ethereum.beacon.time.provider.TimeProvider} to
* produce result time
*/
public interface TimeStrategy {
Publisher<Time> getTimeStream();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.ethereum.beacon.time.mapper;

import org.ethereum.beacon.core.types.Time;
import org.ethereum.beacon.schedulers.Scheduler;
import org.ethereum.beacon.stream.SimpleProcessor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.function.Function;

public class ObjectTimeMapper<T> implements TimeMapper {
private final Function<T, Time> timeFunc;
private final SimpleProcessor<Time> timeProcessor;

public ObjectTimeMapper(
Scheduler scheduler, Publisher<T> objectStream, Function<T, Time> timeFunc) {
this.timeProcessor = new SimpleProcessor<>(scheduler, "TimeMapper");
this.timeFunc = timeFunc;
Flux.from(objectStream).map(this::mapObjectFunc).subscribe(timeProcessor::onNext);
}

Time mapObjectFunc(T obj) {
return timeFunc.apply(obj);
}

@Override
public Publisher<Time> getTimeStream() {
return timeProcessor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.ethereum.beacon.time.mapper;

import org.ethereum.beacon.core.types.Time;
import org.reactivestreams.Publisher;

/** Maps stream of objects with some kind of time info to time stream */
public interface TimeMapper {
Publisher<Time> getTimeStream();
}
Loading