Skip to content

Commit 431a4bf

Browse files
committed
#80 refactor to sync resource, wrap db exceptions
1 parent 466bc52 commit 431a4bf

File tree

5 files changed

+286
-113
lines changed

5 files changed

+286
-113
lines changed

src/main/java/com/arangodb/springframework/transaction/ArangoTransaction.java

Lines changed: 0 additions & 88 deletions
This file was deleted.
Lines changed: 62 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
package com.arangodb.springframework.transaction;
22

3+
import com.arangodb.ArangoDBException;
34
import com.arangodb.ArangoDatabase;
45
import com.arangodb.DbName;
56
import com.arangodb.model.StreamTransactionOptions;
67
import com.arangodb.springframework.core.ArangoOperations;
78
import com.arangodb.springframework.repository.query.QueryTransactionBridge;
8-
import org.springframework.transaction.InvalidIsolationLevelException;
9-
import org.springframework.transaction.TransactionDefinition;
10-
import org.springframework.transaction.TransactionException;
9+
import org.springframework.transaction.*;
1110
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
1211
import org.springframework.transaction.support.DefaultTransactionStatus;
13-
14-
import java.util.Collection;
15-
import java.util.function.Function;
12+
import org.springframework.transaction.support.TransactionSynchronizationManager;
1613

1714
/**
1815
* Transaction manager using ArangoDB stream transactions on the
@@ -29,51 +26,97 @@ public class ArangoTransactionManager extends AbstractPlatformTransactionManager
2926
public ArangoTransactionManager(ArangoOperations operations, QueryTransactionBridge bridge) {
3027
this.operations = operations;
3128
this.bridge = bridge;
29+
setNestedTransactionAllowed(false);
30+
setTransactionSynchronization(SYNCHRONIZATION_ALWAYS);
31+
setValidateExistingTransaction(true);
32+
setRollbackOnCommitFailure(true);
3233
}
3334

35+
/**
36+
* Creates a new transaction object. Any synchronized resource will be reused.
37+
*/
3438
@Override
35-
protected Object doGetTransaction() throws TransactionException {
39+
protected Object doGetTransaction() {
3640
DbName database = operations.getDatabaseName();
3741
if (logger.isDebugEnabled()) {
3842
logger.debug("Create new transaction for database " + database);
3943
}
40-
return new ArangoTransaction(operations.driver().db(database));
44+
try {
45+
ArangoTransactionResource resource = (ArangoTransactionResource) TransactionSynchronizationManager.getResource(database);
46+
return new ArangoTransactionObject(operations.driver().db(database), getDefaultTimeout(), resource);
47+
} catch (ArangoDBException error) {
48+
throw new TransactionSystemException("Cannot create transaction object", error);
49+
}
4150
}
4251

52+
/**
53+
* Configures the new transaction object. The resulting resource will be synchronized and the bridge will be initialized.
54+
*
55+
* @see QueryTransactionBridge
56+
*/
4357
@Override
44-
protected void doBegin(Object transaction, TransactionDefinition definition) throws InvalidIsolationLevelException {
58+
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionUsageException {
4559
int isolationLevel = definition.getIsolationLevel();
4660
if (isolationLevel != -1 && (isolationLevel & TransactionDefinition.ISOLATION_SERIALIZABLE) != 0) {
4761
throw new InvalidIsolationLevelException("ArangoDB does not support isolation level serializable");
4862
}
49-
ArangoTransaction tx = (ArangoTransaction) transaction;
63+
ArangoTransactionObject tx = (ArangoTransactionObject) transaction;
5064
tx.configure(definition);
51-
bridge.setCurrentTransaction(tx::getOrBegin);
65+
DbName key = operations.getDatabaseName();
66+
rebind(key, tx.createResource());
67+
bridge.setCurrentTransaction(collections -> {
68+
ArangoTransactionResource resource = tx.getOrBegin(collections);
69+
rebind(key, resource);
70+
return resource.getStreamTransactionId();
71+
});
5272
}
5373

74+
/**
75+
* Commit the current stream transaction iff any. The bridge is cleared afterwards.
76+
*/
5477
@Override
5578
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
56-
ArangoTransaction tx = (ArangoTransaction) status.getTransaction();
79+
ArangoTransactionObject tx = (ArangoTransactionObject) status.getTransaction();
5780
if (logger.isDebugEnabled()) {
5881
logger.debug("Commit stream transaction " + tx);
5982
}
60-
tx.commit();
61-
bridge.clearCurrentTransaction();
83+
try {
84+
tx.commit();
85+
bridge.clearCurrentTransaction();
86+
} catch (ArangoDBException error) {
87+
throw new TransactionSystemException("Cannot commit transaction " + tx, error);
88+
}
6289
}
6390

91+
/**
92+
* Roll back the current stream transaction iff any. The bridge is cleared afterwards.
93+
*/
6494
@Override
6595
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
66-
ArangoTransaction tx = (ArangoTransaction) status.getTransaction();
96+
ArangoTransactionObject tx = (ArangoTransactionObject) status.getTransaction();
6797
if (logger.isDebugEnabled()) {
6898
logger.debug("Rollback stream transaction " + tx);
6999
}
70-
tx.rollback();
71-
bridge.clearCurrentTransaction();
100+
try {
101+
tx.rollback();
102+
bridge.clearCurrentTransaction();
103+
} catch (ArangoDBException error) {
104+
throw new TransactionSystemException("Cannot roll back transaction " + tx, error);
105+
}
72106
}
73107

74108
@Override
75109
protected boolean isExistingTransaction(Object transaction) throws TransactionException {
76-
return transaction instanceof ArangoTransaction
77-
&& ((ArangoTransaction) transaction).exists();
110+
return transaction instanceof ArangoTransactionObject && ((ArangoTransactionObject) transaction).exists();
111+
}
112+
113+
@Override
114+
protected void doCleanupAfterCompletion(Object transaction) {
115+
TransactionSynchronizationManager.unbindResource(operations.getDatabaseName());
116+
}
117+
118+
private static void rebind(DbName key, ArangoTransactionResource resource) {
119+
TransactionSynchronizationManager.unbindResourceIfPossible(key);
120+
TransactionSynchronizationManager.bindResource(key, resource);
78121
}
79122
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package com.arangodb.springframework.transaction;
2+
3+
import com.arangodb.ArangoDatabase;
4+
import com.arangodb.DbName;
5+
import com.arangodb.entity.StreamTransactionEntity;
6+
import com.arangodb.entity.StreamTransactionStatus;
7+
import com.arangodb.model.StreamTransactionOptions;
8+
import org.apache.commons.logging.Log;
9+
import org.apache.commons.logging.LogFactory;
10+
import org.springframework.lang.Nullable;
11+
import org.springframework.transaction.IllegalTransactionStateException;
12+
import org.springframework.transaction.TransactionDefinition;
13+
import org.springframework.transaction.interceptor.TransactionAttribute;
14+
import org.springframework.transaction.support.SmartTransactionObject;
15+
16+
import java.util.Collection;
17+
import java.util.HashSet;
18+
import java.util.Set;
19+
20+
class ArangoTransactionObject implements SmartTransactionObject {
21+
22+
private static final Log logger = LogFactory.getLog(ArangoTransactionObject.class);
23+
24+
private final ArangoDatabase database;
25+
private final Set<String> writeCollections = new HashSet<>();
26+
private int timeout;
27+
private StreamTransactionEntity streamTransaction;
28+
29+
ArangoTransactionObject(ArangoDatabase database, int defaultTimeout, @Nullable ArangoTransactionResource resource) {
30+
this.database = database;
31+
this.timeout = defaultTimeout;
32+
if (resource != null) {
33+
writeCollections.addAll(resource.getCollectionNames());
34+
if (resource.getStreamTransactionId() != null) {
35+
streamTransaction = database.getStreamTransaction(resource.getStreamTransactionId());
36+
}
37+
}
38+
}
39+
40+
ArangoTransactionResource createResource() {
41+
return new ArangoTransactionResource(streamTransaction == null ? null : streamTransaction.getId(), writeCollections);
42+
}
43+
44+
boolean exists() {
45+
return streamTransaction != null;
46+
}
47+
48+
void configure(TransactionDefinition definition) {
49+
if (definition.getTimeout() != -1) {
50+
this.timeout = definition.getTimeout();
51+
}
52+
if (definition instanceof TransactionAttribute) {
53+
addCollections(((TransactionAttribute) definition).getLabels());
54+
}
55+
}
56+
57+
ArangoTransactionResource getOrBegin(Collection<String> collections) {
58+
addCollections(collections);
59+
if (streamTransaction != null) {
60+
return createResource();
61+
}
62+
StreamTransactionOptions options = new StreamTransactionOptions()
63+
.allowImplicit(true)
64+
.writeCollections(writeCollections.toArray(new String[0]))
65+
.lockTimeout(Math.max(timeout, 0));
66+
streamTransaction = database.beginStreamTransaction(options);
67+
if (logger.isDebugEnabled()) {
68+
logger.debug("Began stream transaction " + streamTransaction.getId() + " writing collections " + writeCollections);
69+
}
70+
return createResource();
71+
}
72+
73+
void commit() {
74+
if (streamTransaction != null && streamTransaction.getStatus() == StreamTransactionStatus.running) {
75+
database.commitStreamTransaction(streamTransaction.getId());
76+
}
77+
}
78+
79+
void rollback() {
80+
if (streamTransaction != null && streamTransaction.getStatus() == StreamTransactionStatus.running) {
81+
database.abortStreamTransaction(streamTransaction.getId());
82+
}
83+
}
84+
85+
@Override
86+
public boolean isRollbackOnly() {
87+
return streamTransaction != null && streamTransaction.getStatus() == StreamTransactionStatus.aborted;
88+
}
89+
90+
@Override
91+
public void flush() {
92+
// nothing to do
93+
}
94+
95+
@Override
96+
public String toString() {
97+
return streamTransaction == null ? "(not begun)" : streamTransaction.getId();
98+
}
99+
100+
private void addCollections(Collection<String> collections) {
101+
if (streamTransaction != null) {
102+
if (!writeCollections.containsAll(collections)) {
103+
Set<String> additional = new HashSet<>(collections);
104+
additional.removeAll(writeCollections);
105+
throw new IllegalTransactionStateException("Stream transaction already started on collections " + writeCollections + ", no additional collections allowed: " + additional);
106+
}
107+
}
108+
writeCollections.addAll(collections);
109+
}
110+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.arangodb.springframework.transaction;
2+
3+
import org.springframework.lang.Nullable;
4+
5+
import java.util.Collection;
6+
import java.util.HashSet;
7+
import java.util.Objects;
8+
import java.util.Set;
9+
10+
class ArangoTransactionResource {
11+
12+
private final String streamTransactionId;
13+
private final Set<String> collectionNames;
14+
15+
ArangoTransactionResource(@Nullable String streamTransactionId, Collection<String> collectionNames) {
16+
this.streamTransactionId = streamTransactionId;
17+
this.collectionNames = new HashSet<>(collectionNames);
18+
}
19+
20+
String getStreamTransactionId() {
21+
return streamTransactionId;
22+
}
23+
24+
Set<String> getCollectionNames() {
25+
return collectionNames;
26+
}
27+
28+
@Override
29+
public boolean equals(Object o) {
30+
if (this == o) return true;
31+
if (o == null || getClass() != o.getClass()) return false;
32+
ArangoTransactionResource that = (ArangoTransactionResource) o;
33+
return Objects.equals(streamTransactionId, that.streamTransactionId) && collectionNames.equals(that.collectionNames);
34+
}
35+
36+
@Override
37+
public int hashCode() {
38+
return Objects.hash(streamTransactionId);
39+
}
40+
}

0 commit comments

Comments
 (0)