Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
pull_request:
branches:
- master
workflow_dispatch:

env:
CARGO_TERM_COLOR: always
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/target
/Cargo.lock
*.todo

aerospike-rust-client.sublime-project
aerospike-rust-client.sublime-workspace
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ harness = false
members = ["tools/benchmark", "aerospike-core", "aerospike-rt", "aerospike-sync", "aerospike-macro"]

[dev-dependencies]
env_logger = "0.9"
log = "0.4"
env_logger = "0.10.0"
hex = "0.4"
bencher = "0.1"
criterion = { version = "0.5.1", features = ["async_tokio", "async_futures", "async"]}
serde = "1.0"
serde_json = "1.0"
rand = "0.8"
lazy_static = "1.4"
Expand Down
1 change: 1 addition & 0 deletions aerospike-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ serde = { version = "1.0", features = ["derive"], optional = true }
aerospike-rt = {path = "../aerospike-rt"}
futures = {version = "0.3.16" }
async-trait = "0.1.51"
num = "0.4.0"

[features]
serialization = ["serde"]
Expand Down
100 changes: 74 additions & 26 deletions aerospike-core/src/batch/batch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,49 +36,97 @@ impl BatchExecutor {
}


pub async fn execute_batch_read(
pub async fn execute_batch_read<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
policy: &BatchPolicy,
batch_reads: Vec<BatchRead>,
) -> Result<Vec<BatchRead>> {
let batch_nodes = self.get_batch_nodes(&batch_reads, policy.replica)?;
let mut jobs = Vec::<BatchReadCommand>::new();
for (node, node_jobs) in batch_nodes {
for node_chunk in node_jobs.chunks(MAX_BATCH_REQUEST_SIZE) {
jobs.push( BatchReadCommand::new(policy, node.clone(), node_chunk.to_vec()) );
batch_reads: Vec<BatchRead<T>>,
) -> Result<Vec<BatchRead<T>>> {
let total = batch_reads.len();
let jobs = self.get_batch_nodes(policy, batch_reads)?;
let reads = self.execute_batch_jobs::<T>(jobs, policy.concurrency).await?;

let mut as_iter = reads.into_iter();
if let Some(BatchReadCommand { mut batch_reads, mut original_indexes, .. }) = as_iter.next() {
// Reserve enough to make the first element the return value
batch_reads.reserve_exact(total - batch_reads.len());
original_indexes.reserve_exact(total - original_indexes.len());
// Shove everything into the same list
for another_job in as_iter {
batch_reads.extend(another_job.batch_reads);
original_indexes.extend(another_job.original_indexes);
}

// Put records back where it belongs... this is 0(n) because everything is swapped into its correct position
for i in 0..batch_reads.len() {
while original_indexes[i] != i {
let to = original_indexes[i];
batch_reads.swap(i, to);
original_indexes.swap(i, to);
}
}
Ok(batch_reads)
} else {
Ok(Default::default())
}
let reads = self.execute_batch_jobs(jobs, policy.concurrency).await?;
let mut all_results: Vec<_> = reads.into_iter().flat_map(|cmd|cmd.batch_reads).collect();
all_results.sort_by_key(|(_, i)|*i);
Ok(all_results.into_iter().map(|(b, _)|b).collect())
}

async fn execute_batch_jobs(
async fn execute_batch_jobs<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
jobs: Vec<BatchReadCommand>,
jobs: Vec<BatchReadCommand<T>>,
concurrency: Concurrency,
) -> Result<Vec<BatchReadCommand>> {
) -> Result<Vec<BatchReadCommand<T>>> {
let handles = jobs.into_iter().map(|job|job.execute(self.cluster.clone()));
match concurrency {
Concurrency::Sequential => futures::future::join_all(handles).await.into_iter().collect(),
Concurrency::Parallel => futures::future::join_all(handles.map(aerospike_rt::spawn)).await.into_iter().map(|value|value.map_err(|e|e.to_string())?).collect(),
}
}

fn get_batch_nodes(
fn get_batch_nodes<'l, T: serde::de::DeserializeOwned + Send>(
&self,
batch_reads: &[BatchRead],
replica: crate::policy::Replica,
) -> Result<HashMap<Arc<Node>, Vec<(BatchRead, usize)>>> {
let mut map = HashMap::new();
for (index, batch_read) in batch_reads.iter().enumerate() {
let node = self.node_for_key(&batch_read.key, replica)?;
map.entry(node)
.or_insert_with(Vec::new)
.push((batch_read.clone(), index));
policy: &BatchPolicy,
batch_reads: Vec<BatchRead<T>>,
) -> Result<Vec<BatchReadCommand<T>>> {
let mut map: HashMap<Arc<Node>, (Vec<BatchRead<T>>, Vec<usize>)> = HashMap::new();
let mut vec = Vec::new();
let choices = batch_reads.first().map(|read|self.cluster.n_nodes_for_policy(&read.key.namespace, policy.replica)).unwrap_or_default();
vec.reserve(choices);
let estimate = batch_reads.len() / (choices.max(2) - 1);

for (index, batch_read) in batch_reads.into_iter().enumerate() {
let node = self.node_for_key(&batch_read.key, policy.replica)?;
let (reads, indexes) = map.entry(node)
.or_insert_with(||{
let mut reads = Vec::new();
let mut indexes = Vec::new();
if estimate > MAX_BATCH_REQUEST_SIZE {
reads.reserve_exact(MAX_BATCH_REQUEST_SIZE);
indexes.reserve_exact(MAX_BATCH_REQUEST_SIZE);
} else {
reads.reserve(estimate);
indexes.reserve(estimate);
}
(reads, indexes)
});

// Enough reads, make a new one.
if reads.len() >= MAX_BATCH_REQUEST_SIZE {
// To avoid copying node above, we just re-do node from key when it's needed.
let node = self.node_for_key(&batch_read.key, policy.replica)?;
vec.push(BatchReadCommand::new(policy, node, std::mem::take(reads), std::mem::take(indexes)));
// If we're blowing out buffers, we'll probably do it again.
reads.reserve_exact(MAX_BATCH_REQUEST_SIZE);
indexes.reserve_exact(MAX_BATCH_REQUEST_SIZE);
}
reads.push(batch_read);
indexes.push(index);
}

vec.reserve_exact(map.len());
for (node, (reads, indexes)) in map {
vec.push(BatchReadCommand::new(policy, node, reads, indexes));
}
Ok(map)
Ok(vec)
}

fn node_for_key(&self, key: &Key, replica: crate::policy::Replica) -> Result<Arc<Node>> {
Expand Down
10 changes: 5 additions & 5 deletions aerospike-core/src/batch/batch_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ use serde::Serialize;
/// Key and bin names used in batch read commands where variable bins are needed for each key.
#[cfg_attr(feature = "serialization", derive(Serialize))]
#[derive(Debug, Clone)]
pub struct BatchRead {
pub struct BatchRead<T: serde::de::DeserializeOwned> {
/// Key.
pub key: Key,

/// Bins to retrieve for this key.
pub bins: Bins,

/// Will contain the record after the batch read operation.
pub record: Option<Record>,
pub record: Option<Record<T>>,
}

impl BatchRead {
impl<T: serde::de::DeserializeOwned> BatchRead<T> {
/// Create a new `BatchRead` instance for the given key and bin selector.
pub const fn new(key: Key, bins: Bins) -> Self {
BatchRead {
Expand All @@ -44,11 +44,11 @@ impl BatchRead {
}

#[doc(hidden)]
pub fn match_header(&self, other: &BatchRead, match_set: bool) -> bool {
pub fn match_header(&self, other: &BatchRead<T>, match_set: bool) -> bool {
let key = &self.key;
let other_key = &other.key;
(key.namespace == other_key.namespace)
&& (match_set && (key.set_name == other_key.set_name))
&& (self.bins == other.bins)
}
}
}
57 changes: 11 additions & 46 deletions aerospike-core/src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ use std::convert::From;

/// Container object for a record bin, comprising a name and a value.
#[derive(Clone)]
pub struct Bin<'a> {
pub struct Bin {
/// Bin name
pub name: &'a str,
pub name: String,

/// Bin value
pub value: Value,
}

impl<'a> Bin<'a> {
impl Bin {
/// Construct a new bin given a name and a value.
pub const fn new(name: &'a str, val: Value) -> Self {
pub const fn new(name: String, val: Value) -> Self {
Bin { name, value: val }
}
}

impl<'a> AsRef<Bin<'a>> for Bin<'a> {
impl AsRef<Bin> for Bin {
fn as_ref(&self) -> &Self {
self
}
Expand All @@ -45,10 +45,10 @@ impl<'a> AsRef<Bin<'a>> for Bin<'a> {
#[macro_export]
macro_rules! as_bin {
($bin_name:expr, None) => {{
$crate::Bin::new($bin_name, $crate::Value::Nil)
$crate::Bin::new($bin_name.into(), $crate::Value::Nil)
}};
($bin_name:expr, $val:expr) => {{
$crate::Bin::new($bin_name, $crate::Value::from($val))
$crate::Bin::new($bin_name.into(), $crate::Value::from($val))
}};
}

Expand Down Expand Up @@ -78,50 +78,15 @@ impl Bins {
}
}

impl<'a> From<&'a [&'a str]> for Bins {
fn from(bins: &'a [&'a str]) -> Self {
impl From<&[&str]> for Bins {
fn from(bins: &[&str]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 1]> for Bins {
fn from(bins: [&'a str; 1]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 2]> for Bins {
fn from(bins: [&'a str; 2]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 3]> for Bins {
fn from(bins: [&'a str; 3]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 4]> for Bins {
fn from(bins: [&'a str; 4]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 5]> for Bins {
fn from(bins: [&'a str; 5]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
}

impl<'a> From<[&'a str; 6]> for Bins {
fn from(bins: [&'a str; 6]) -> Self {
impl<const COUNT: usize> From<[&str; COUNT]> for Bins {
fn from(bins: [&str; COUNT]) -> Self {
let bins = bins.iter().copied().map(String::from).collect();
Bins::Some(bins)
}
Expand Down
Loading