Skip to content

Commit 9ef3f70

Browse files
SWIFT-1456 Implement selectServer for MongoClient (#737)
1 parent 091bd8b commit 9ef3f70

File tree

6 files changed

+444
-201
lines changed

6 files changed

+444
-201
lines changed

Package.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ let package = Package(
1515
.package(url: "https://github.com/mongodb/swift-bson", .upToNextMajor(from: "3.0.0"))
1616
],
1717
targets: [
18-
.target(name: "MongoSwift", dependencies: ["CLibMongoC", "NIO", "NIOConcurrencyHelpers", "SwiftBSON",]),
1918
.target(name: "MongoSwiftSync", dependencies: ["MongoSwift", "NIO"]),
2019
.target(name: "AtlasConnectivity", dependencies: ["MongoSwiftSync"]),
2120
.target(name: "TestsCommon", dependencies: ["MongoSwift", "Nimble"]),
@@ -34,3 +33,10 @@ let package = Package(
3433
)
3534
]
3635
)
36+
37+
#if compiler(>=5.3)
38+
package.dependencies += [.package(url: "https://github.com/apple/swift-atomics", .upToNextMajor(from: "1.0.0"))]
39+
package.targets += [.target(name: "MongoSwift", dependencies: ["Atomics", "CLibMongoC", "NIO", "NIOConcurrencyHelpers", "SwiftBSON"])]
40+
#else
41+
package.targets += [.target(name: "MongoSwift", dependencies: ["CLibMongoC", "NIO", "NIOConcurrencyHelpers", "SwiftBSON"])]
42+
#endif

Sources/MongoSwift/MongoClient.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ public class MongoClient {
248248
/// The pool of connections backing this client.
249249
internal let connectionPool: ConnectionPool
250250

251+
/// The connection string used to create this client.
252+
internal let connectionString: MongoConnectionString
253+
251254
/// Executor responsible for executing operations on behalf of this client and its child objects.
252255
internal let operationExecutor: OperationExecutor
253256

@@ -320,6 +323,7 @@ public class MongoClient {
320323
try connString.applyOptions(options)
321324
}
322325
try connString.validate()
326+
self.connectionString = connString
323327

324328
self.operationExecutor = OperationExecutor(
325329
eventLoopGroup: eventLoopGroup,

Sources/MongoSwift/SDAM.swift

Lines changed: 11 additions & 194 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ internal enum SDAMConstants {
55
internal static let defaultHeartbeatFrequencyMS = 10000
66
internal static let idleWritePeriodMS = 10000
77
internal static let smallestMaxStalenessSeconds = 90
8+
internal static let defaultLocalThresholdMS = 15
9+
internal static let defaultServerSelectionTimeoutMS = 30000
10+
internal static let minWireVersion = 6 // 3.6
11+
internal static let maxWireVersion = 15 // 5.2
812
}
913

1014
/// A struct representing a network address, consisting of a host and port.
@@ -243,20 +247,21 @@ public struct ServerDescription {
243247
tags: [String: String]?,
244248
lastWriteDate: Date?,
245249
maxWireVersion: Int?,
246-
lastUpdateTime: Date?
250+
lastUpdateTime: Date?,
251+
averageRoundTripTimeMS: Double?
247252
) {
248253
self.address = address
249254
self.type = type
250255
self.tags = tags ?? [:]
251256
self.lastWriteDate = lastWriteDate
252257
self.lastUpdateTime = lastUpdateTime ?? Date()
253-
self.maxWireVersion = maxWireVersion ?? 0
258+
self.maxWireVersion = maxWireVersion ?? SDAMConstants.maxWireVersion
259+
self.averageRoundTripTimeMS = averageRoundTripTimeMS
254260

255261
// these fields are not used by the server selection tests
256262
self.serverId = 0
257263
self.roundTripTime = 0
258-
self.averageRoundTripTimeMS = nil
259-
self.minWireVersion = 0
264+
self.minWireVersion = SDAMConstants.minWireVersion
260265
self.me = self.address
261266
self.setName = nil
262267
self.setVersion = nil
@@ -339,7 +344,7 @@ public struct TopologyDescription: Equatable {
339344

340345
/// Internal representation of topology type. If enums could be marked non-exhaustive in Swift, this would be
341346
/// the public representation too.
342-
fileprivate enum _TopologyType: String, Equatable {
347+
internal enum _TopologyType: String, Equatable {
343348
case single = "Single"
344349
case replicaSetNoPrimary = "ReplicaSetNoPrimary"
345350
case replicaSetWithPrimary = "ReplicaSetWithPrimary"
@@ -348,7 +353,7 @@ public struct TopologyDescription: Equatable {
348353
case loadBalanced = "LoadBalanced"
349354
}
350355

351-
fileprivate let _topologyType: _TopologyType
356+
internal let _topologyType: _TopologyType
352357

353358
private init(_ _type: _TopologyType) {
354359
self._topologyType = _type
@@ -429,191 +434,3 @@ public struct TopologyDescription: Equatable {
429434
self.servers = servers
430435
}
431436
}
432-
433-
extension TopologyDescription {
434-
internal func findSuitableServers(
435-
readPreference: ReadPreference? = nil,
436-
heartbeatFrequencyMS: Int
437-
) throws -> [ServerDescription] {
438-
try readPreference?.validateMaxStalenessSeconds(
439-
heartbeatFrequencyMS: heartbeatFrequencyMS,
440-
topologyType: self.type
441-
)
442-
switch self.type._topologyType {
443-
case .unknown:
444-
return []
445-
case .single, .loadBalanced:
446-
return self.servers
447-
case .replicaSetNoPrimary, .replicaSetWithPrimary:
448-
switch readPreference?.mode {
449-
case .secondary:
450-
return self.filterReplicaSetServers(
451-
readPreference: readPreference,
452-
heartbeatFrequencyMS: heartbeatFrequencyMS,
453-
includePrimary: false
454-
)
455-
case .nearest:
456-
return self.filterReplicaSetServers(
457-
readPreference: readPreference,
458-
heartbeatFrequencyMS: heartbeatFrequencyMS,
459-
includePrimary: true
460-
)
461-
case .secondaryPreferred:
462-
// If mode is 'secondaryPreferred', attempt the selection algorithm with mode 'secondary' and the
463-
// user's maxStalenessSeconds and tag_sets. If no server matches, select the primary.
464-
let secondaryMatches = self.filterReplicaSetServers(
465-
readPreference: readPreference,
466-
heartbeatFrequencyMS: heartbeatFrequencyMS,
467-
includePrimary: false
468-
)
469-
return secondaryMatches.isEmpty ? self.servers.filter { $0.type == .rsPrimary } : secondaryMatches
470-
case .primaryPreferred:
471-
// If mode is 'primaryPreferred' or a readPreference is not provided, select the primary if it is known,
472-
// otherwise attempt the selection algorithm with mode 'secondary' and the user's
473-
// maxStalenessSeconds and tag_sets.
474-
let primaries = self.servers.filter { $0.type == .rsPrimary }
475-
if !primaries.isEmpty {
476-
return primaries
477-
}
478-
return self.filterReplicaSetServers(
479-
readPreference: readPreference,
480-
heartbeatFrequencyMS: heartbeatFrequencyMS,
481-
includePrimary: false
482-
)
483-
default: // or .primary
484-
// the default mode is 'primary'.
485-
return self.servers.filter { $0.type == .rsPrimary }
486-
}
487-
case .sharded:
488-
return self.servers.filter { $0.type == .mongos }
489-
}
490-
}
491-
492-
/// Filters the replica set servers in this topology first by max staleness and then by tag sets.
493-
private func filterReplicaSetServers(
494-
readPreference: ReadPreference?,
495-
heartbeatFrequencyMS: Int,
496-
includePrimary: Bool
497-
) -> [ServerDescription] {
498-
// The initial set of servers from which to filter. Only include the secondaries unless includePrimary is true.
499-
var servers = self.servers.filter { ($0.type == .rsPrimary && includePrimary) || $0.type == .rsSecondary }
500-
501-
// Filter by max staleness. If maxStalenessSeconds is not configured as a positive number, all servers are
502-
// eligible.
503-
if let maxStalenessSeconds = readPreference?.maxStalenessSeconds, maxStalenessSeconds > 0 {
504-
let primary = self.servers.first { $0.type == .rsPrimary }
505-
let maxLastWriteDate = self.getMaxLastWriteDate()
506-
servers.removeAll {
507-
guard let staleness = $0.calculateStalenessSeconds(
508-
primary: primary,
509-
maxLastWriteDate: maxLastWriteDate,
510-
heartbeatFrequencyMS: heartbeatFrequencyMS
511-
) else {
512-
return false
513-
}
514-
return staleness > maxStalenessSeconds
515-
}
516-
}
517-
518-
// Filter by tag sets.
519-
guard let tagSets = readPreference?.tagSets else {
520-
return servers
521-
}
522-
for tagSet in tagSets {
523-
let matches = servers.filter { server in tagSet.allSatisfy { server.tags[$0.key] == $0.value.stringValue } }
524-
if !matches.isEmpty {
525-
return matches
526-
}
527-
}
528-
529-
// If no matches were found during tag set filtering, return an empty list.
530-
return []
531-
}
532-
533-
/// Returns a `Date` representing the latest `lastWriteDate` configured on a secondary in the topology, or `nil`
534-
/// if none is found.
535-
private func getMaxLastWriteDate() -> Date? {
536-
let secondaryLastWriteDates = self.servers.compactMap {
537-
$0.type == .rsSecondary ? $0.lastWriteDate : nil
538-
}
539-
return secondaryLastWriteDates.max()
540-
}
541-
}
542-
543-
extension ServerDescription {
544-
/// Calculates the staleness of this server. If the server is not a secondary, the staleness is 0. Otherwise,
545-
/// compare against the primary if one is present, or the maximum last write date seen in the topology if present.
546-
/// If staleness cannot be calculated due to an absence of values, `nil` is returned.
547-
fileprivate func calculateStalenessSeconds(
548-
primary: ServerDescription?,
549-
maxLastWriteDate: Date?,
550-
heartbeatFrequencyMS: Int
551-
) -> Int? {
552-
guard self.type == .rsSecondary else {
553-
return 0
554-
}
555-
guard let lastWriteDate = self.lastWriteDate else {
556-
return nil
557-
}
558-
if let primary = primary {
559-
guard let primaryLastWriteDate = primary.lastWriteDate else {
560-
return nil
561-
}
562-
let selfInterval = self.lastUpdateTime.timeIntervalSince(lastWriteDate)
563-
let primaryInterval = primary.lastUpdateTime.timeIntervalSince(primaryLastWriteDate)
564-
// timeIntervalSince returns a TimeInterval in seconds, so heartbeatFrequencyMS needs to be converted from
565-
// milliseconds to seconds.
566-
let stalenessSeconds = selfInterval - primaryInterval + Double(heartbeatFrequencyMS) / 1000.0
567-
return Int(stalenessSeconds.rounded(.up))
568-
} else {
569-
guard let maxLastWriteDate = maxLastWriteDate else {
570-
return nil
571-
}
572-
let interval = maxLastWriteDate.timeIntervalSince(lastWriteDate)
573-
let stalenessSeconds = interval + Double(heartbeatFrequencyMS) / 1000.0
574-
return Int(stalenessSeconds.rounded(.up))
575-
}
576-
}
577-
}
578-
579-
extension ReadPreference {
580-
fileprivate func validateMaxStalenessSeconds(
581-
heartbeatFrequencyMS: Int,
582-
topologyType: TopologyDescription.TopologyType
583-
) throws {
584-
if let maxStalenessSeconds = self.maxStalenessSeconds {
585-
if self.mode == .primary && maxStalenessSeconds > 0 {
586-
throw MongoError.InvalidArgumentError(
587-
message: "A positive maxStalenessSeconds cannot be specified when the read preference mode is"
588-
+ " primary"
589-
)
590-
}
591-
if topologyType == .replicaSetWithPrimary || topologyType == .replicaSetNoPrimary {
592-
if maxStalenessSeconds * 1000 < heartbeatFrequencyMS + SDAMConstants.idleWritePeriodMS {
593-
throw MongoError.InvalidArgumentError(
594-
message: "maxStalenessSeconds must be at least the sum of the heartbeatFrequencyMS configured"
595-
+ " on the client (\(heartbeatFrequencyMS)) and the idleWritePeriodMS"
596-
+ " (\(SDAMConstants.idleWritePeriodMS))"
597-
)
598-
}
599-
if maxStalenessSeconds < SDAMConstants.smallestMaxStalenessSeconds {
600-
throw MongoError.InvalidArgumentError(
601-
message: "The maxStalenessSeconds configured for a replica set must be at least"
602-
+ " \(SDAMConstants.smallestMaxStalenessSeconds)"
603-
)
604-
}
605-
}
606-
}
607-
}
608-
}
609-
610-
extension ServerDescription {
611-
internal mutating func updateAverageRoundTripTime(roundTripTime: Double) {
612-
if let oldAverageRTT = self.averageRoundTripTimeMS {
613-
let alpha = 0.2
614-
self.averageRoundTripTimeMS = alpha * roundTripTime + (1 - alpha) * oldAverageRTT
615-
} else {
616-
self.averageRoundTripTimeMS = roundTripTime
617-
}
618-
}
619-
}

0 commit comments

Comments
 (0)