-
Notifications
You must be signed in to change notification settings - Fork 3.9k
xds: Changes to XdsClient Watcher APIs #12446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
shivaspeaks
wants to merge
24
commits into
grpc:master
Choose a base branch
from
shivaspeaks:master
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
21352b7
MAINTAINERS.md: Add Shiva
shivaspeaks 458735d
Merge branch 'grpc:master' into master
shivaspeaks 4f064fa
Update XdsTrustManagerFactoryTest.java
shivaspeaks 0f96110
refactor and add documentation
shivaspeaks d7ef02a
Merge branch 'master' of https://github.com/grpc/grpc-java
shivaspeaks 405217b
Merge branch 'grpc:master' into master
shivaspeaks 587969d
Merge branch 'grpc:master' into master
shivaspeaks b1d0d6d
xds: client watcher API changes
shivaspeaks 3fcff8d
xds: client watcher API changes
shivaspeaks bf905c2
xds: client watcher API changes
shivaspeaks 4f5f3b3
xds: client watcher API changes
shivaspeaks 494a75a
xds: client watcher API changes
shivaspeaks a5117ad
xds: client watcher API changes
shivaspeaks 352ec9e
xds: client watcher API changes
shivaspeaks 148f105
xds: client watcher API changes
shivaspeaks 0c78954
xds: client watcher API changes
shivaspeaks b5571d1
xds: client watcher API changes
shivaspeaks c3a43b2
watcher api changes
shivaspeaks 910b20b
watcher api changes
shivaspeaks 319c113
Merge pull request #3 from shivaspeaks/client-watcher-changes-2
shivaspeaks 2baf738
watcher api changes
shivaspeaks a51ac28
Merge branch 'master' of https://github.com/shivaspeaks/grpc-java
shivaspeaks 3779aea
Merge branch 'client-watcher-changes-2' of https://github.com/shivasp…
shivaspeaks 4ba6de4
address comments
shivaspeaks File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ | |
| import io.grpc.ServerServiceDefinition; | ||
| import io.grpc.Status; | ||
| import io.grpc.StatusException; | ||
| import io.grpc.StatusOr; | ||
| import io.grpc.SynchronizationContext; | ||
| import io.grpc.SynchronizationContext.ScheduledHandle; | ||
| import io.grpc.internal.GrpcUtil; | ||
|
|
@@ -382,18 +383,30 @@ private DiscoveryState(String resourceName) { | |
| } | ||
|
|
||
| @Override | ||
| public void onChanged(final LdsUpdate update) { | ||
| public void onResourceChanged(final StatusOr<LdsUpdate> update) { | ||
| if (stopped) { | ||
| return; | ||
| } | ||
| logger.log(Level.FINEST, "Received Lds update {0}", update); | ||
| if (update.listener() == null) { | ||
| onResourceDoesNotExist("Non-API"); | ||
|
|
||
| if (!update.hasValue()) { | ||
| Status status = update.getStatus(); | ||
| StatusException statusException = Status.UNAVAILABLE.withDescription( | ||
| String.format("Listener %s unavailable: %s", resourceName, status.getDescription())) | ||
| .withCause(status.asException()) | ||
| .asException(); | ||
| handleConfigNotFoundOrMismatch(statusException); | ||
| return; | ||
| } | ||
|
|
||
| String ldsAddress = update.listener().address(); | ||
| if (ldsAddress == null || update.listener().protocol() != Protocol.TCP | ||
| final LdsUpdate ldsUpdate = update.getValue(); | ||
| logger.log(Level.FINEST, "Received Lds update {0}", ldsUpdate); | ||
| if (ldsUpdate.listener() == null) { | ||
| handleConfigNotFoundOrMismatch( | ||
| Status.NOT_FOUND.withDescription("Listener is null in LdsUpdate").asException()); | ||
| return; | ||
| } | ||
| String ldsAddress = ldsUpdate.listener().address(); | ||
| if (ldsAddress == null || ldsUpdate.listener().protocol() != Protocol.TCP | ||
| || !ipAddressesMatch(ldsAddress)) { | ||
| handleConfigNotFoundOrMismatch( | ||
| Status.UNKNOWN.withDescription( | ||
|
|
@@ -402,16 +415,14 @@ public void onChanged(final LdsUpdate update) { | |
| listenerAddress, ldsAddress)).asException()); | ||
| return; | ||
| } | ||
|
|
||
| if (!pendingRds.isEmpty()) { | ||
| // filter chain state has not yet been applied to filterChainSelectorManager and there | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Restore this comment. |
||
| // are two sets of sslContextProviderSuppliers, so we release the old ones. | ||
| releaseSuppliersInFlight(); | ||
| pendingRds.clear(); | ||
| } | ||
|
|
||
| filterChains = update.listener().filterChains(); | ||
| defaultFilterChain = update.listener().defaultFilterChain(); | ||
| // Filters are loaded even if the server isn't serving yet. | ||
| filterChains = ldsUpdate.listener().filterChains(); | ||
| defaultFilterChain = ldsUpdate.listener().defaultFilterChain(); | ||
| updateActiveFilters(); | ||
|
|
||
| List<FilterChain> allFilterChains = filterChains; | ||
|
|
@@ -450,43 +461,33 @@ public void onChanged(final LdsUpdate update) { | |
| } | ||
| } | ||
|
|
||
| private boolean ipAddressesMatch(String ldsAddress) { | ||
| HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress); | ||
| HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress); | ||
| if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort() | ||
| || ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) { | ||
| return false; | ||
| } | ||
| InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost()); | ||
| InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost()); | ||
| return listenerIp.equals(ldsIp); | ||
| } | ||
|
|
||
| @Override | ||
| public void onResourceDoesNotExist(final String resourceName) { | ||
| if (stopped) { | ||
| return; | ||
| } | ||
| StatusException statusException = Status.UNAVAILABLE.withDescription( | ||
| String.format("Listener %s unavailable, xDS node ID: %s", resourceName, | ||
| xdsClient.getBootstrapInfo().node().getId())).asException(); | ||
| handleConfigNotFoundOrMismatch(statusException); | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(final Status error) { | ||
| public void onAmbientError(final Status error) { | ||
| if (stopped) { | ||
| return; | ||
| } | ||
| String description = error.getDescription() == null ? "" : error.getDescription() + " "; | ||
| Status errorWithNodeId = error.withDescription( | ||
| description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId()); | ||
| logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId); | ||
|
|
||
| if (!isServing) { | ||
| listener.onNotServing(errorWithNodeId.asException()); | ||
| } | ||
| } | ||
|
|
||
| private boolean ipAddressesMatch(String ldsAddress) { | ||
| HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress); | ||
| HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress); | ||
| if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort() | ||
| || ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) { | ||
| return false; | ||
| } | ||
| InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost()); | ||
| InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost()); | ||
| return listenerIp.equals(ldsIp); | ||
| } | ||
|
|
||
| private void shutdown() { | ||
| stopped = true; | ||
| cleanUpRouteDiscoveryStates(); | ||
|
|
@@ -775,54 +776,42 @@ private RouteDiscoveryState(String resourceName) { | |
| } | ||
|
|
||
| @Override | ||
| public void onChanged(final RdsUpdate update) { | ||
| syncContext.execute(new Runnable() { | ||
| @Override | ||
| public void run() { | ||
| if (!routeDiscoveryStates.containsKey(resourceName)) { | ||
| return; | ||
| } | ||
| if (savedVirtualHosts == null && !isPending) { | ||
| logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName); | ||
| } | ||
| savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts); | ||
| updateRdsRoutingConfig(); | ||
| maybeUpdateSelector(); | ||
| public void onResourceChanged(final StatusOr<RdsUpdate> update) { | ||
| syncContext.execute(() -> { | ||
| if (!routeDiscoveryStates.containsKey(resourceName)) { | ||
| return; // Watcher has been cancelled. | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void onResourceDoesNotExist(final String resourceName) { | ||
| syncContext.execute(new Runnable() { | ||
| @Override | ||
| public void run() { | ||
| if (!routeDiscoveryStates.containsKey(resourceName)) { | ||
| return; | ||
| if (update.hasValue()) { | ||
| if (savedVirtualHosts == null && !isPending) { | ||
| logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName); | ||
| } | ||
| logger.log(Level.WARNING, "Rds {0} unavailable", resourceName); | ||
| savedVirtualHosts = ImmutableList.copyOf(update.getValue().virtualHosts); | ||
| } else { | ||
| logger.log(Level.WARNING, "Rds {0} unavailable: {1}", | ||
| new Object[]{resourceName, update.getStatus()}); | ||
| savedVirtualHosts = null; | ||
| updateRdsRoutingConfig(); | ||
| maybeUpdateSelector(); | ||
| } | ||
| // In both cases, a change has occurred that requires a config update. | ||
| updateRdsRoutingConfig(); | ||
| maybeUpdateSelector(); | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(final Status error) { | ||
| syncContext.execute(new Runnable() { | ||
| @Override | ||
| public void run() { | ||
| if (!routeDiscoveryStates.containsKey(resourceName)) { | ||
| return; | ||
| } | ||
| String description = error.getDescription() == null ? "" : error.getDescription() + " "; | ||
| Status errorWithNodeId = error.withDescription( | ||
| description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId()); | ||
| logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.", | ||
| new Object[]{resourceName, errorWithNodeId}); | ||
| maybeUpdateSelector(); | ||
| public void onAmbientError(final Status error) { | ||
| syncContext.execute(() -> { | ||
| if (!routeDiscoveryStates.containsKey(resourceName)) { | ||
| return; // Watcher has been cancelled. | ||
| } | ||
| String description = error.getDescription() == null ? "" : error.getDescription() + " "; | ||
| Status errorWithNodeId = error.withDescription( | ||
| description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId()); | ||
| logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.", | ||
| new Object[]{resourceName, errorWithNodeId}); | ||
|
|
||
| // Per gRFC A88, ambient errors should not trigger a configuration change. | ||
| // Therefore, we do NOT call maybeUpdateSelector() here. | ||
| }); | ||
| } | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will there be a separate PR for propagating this to the data plane rpc error?