From edc4ee1ac076ecae5ed1df60b5e2b0633e547608 Mon Sep 17 00:00:00 2001 From: Mathieu Baudier Date: Fri, 16 Feb 2018 15:06:19 +0100 Subject: [PATCH] Reintroduce simple transaction manager --- demo/argeo_node_rap.properties | 1 + .../argeo/cms/internal/kernel/CmsState.java | 23 ++- .../osgi/useradmin/LdifUserAdminTest.java | 34 +++- .../transaction/simple/SimpleTransaction.java | 164 +++++++++++++++++ .../simple/SimpleTransactionException.java | 14 ++ .../simple/SimpleTransactionManager.java | 173 ++++++++++++++++++ .../org/argeo/transaction/simple/UuidXid.java | 132 +++++++++++++ .../src/org/argeo/node/NodeConstants.java | 6 +- 8 files changed, 532 insertions(+), 15 deletions(-) create mode 100644 org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransaction.java create mode 100644 org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransactionException.java create mode 100644 org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransactionManager.java create mode 100644 org.argeo.enterprise/src/org/argeo/transaction/simple/UuidXid.java diff --git a/demo/argeo_node_rap.properties b/demo/argeo_node_rap.properties index bb6fe3ee0..3a7c093bd 100644 --- a/demo/argeo_node_rap.properties +++ b/demo/argeo_node_rap.properties @@ -21,6 +21,7 @@ java.security.manager= java.security.policy=file:../../all.policy argeo.node.repo.type=h2 +#argeo.node.transaction.manager=bitronix #argeo.node.useradmin.uris=ldap://cn=Directory%20Manager:argeoargeo@localhost:10389/dc=example,dc=com diff --git a/org.argeo.cms/src/org/argeo/cms/internal/kernel/CmsState.java b/org.argeo.cms/src/org/argeo/cms/internal/kernel/CmsState.java index 0b5bd0d48..b72734f19 100644 --- a/org.argeo.cms/src/org/argeo/cms/internal/kernel/CmsState.java +++ b/org.argeo.cms/src/org/argeo/cms/internal/kernel/CmsState.java @@ -5,7 +5,6 @@ import static bitronix.tm.TransactionManagerServices.getTransactionSynchronizati import static java.util.Locale.ENGLISH; import java.io.File; -import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.file.spi.FileSystemProvider; @@ -21,9 +20,11 @@ import javax.transaction.UserTransaction; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.argeo.cms.CmsException; import org.argeo.cms.i18n.LocaleUtils; import org.argeo.node.NodeConstants; import org.argeo.node.NodeState; +import org.argeo.transaction.simple.SimpleTransactionManager; import org.argeo.util.LangUtils; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; @@ -37,7 +38,7 @@ import bitronix.tm.BitronixTransactionSynchronizationRegistry; import bitronix.tm.TransactionManagerServices; public class CmsState implements NodeState { - private final Log log = LogFactory.getLog(CmsState.class); + private final static Log log = LogFactory.getLog(CmsState.class); private final BundleContext bc = FrameworkUtil.getBundle(CmsState.class).getBundleContext(); // REFERENCES @@ -87,7 +88,14 @@ public class CmsState implements NodeState { private void initServices() { // JTA - initTransactionManager(); + String tmType = KernelUtils.getFrameworkProp(NodeConstants.TRANSACTION_MANAGER, NodeConstants.TRANSACTION_MANAGER_SIMPLE); + if (NodeConstants.TRANSACTION_MANAGER_SIMPLE.equals(tmType)) { + initSimpleTransactionManager(); + } else if (NodeConstants.TRANSACTION_MANAGER_BITRONIX.equals(tmType)) { + initBitronixTransactionManager(); + } else { + throw new CmsException("Usupported transaction manager type " + tmType); + } // JCR RepositoryServiceFactory repositoryServiceFactory = new RepositoryServiceFactory(); @@ -110,7 +118,14 @@ public class CmsState implements NodeState { LangUtils.dico(Constants.SERVICE_PID, NodeConstants.NODE_FS_PROVIDER_PID)); } - private void initTransactionManager() { + private void initSimpleTransactionManager() { + SimpleTransactionManager transactionManager = new SimpleTransactionManager(); + bc.registerService(TransactionManager.class, transactionManager, null); + bc.registerService(UserTransaction.class, transactionManager, null); + // TODO TransactionSynchronizationRegistry + } + + private void initBitronixTransactionManager() { // TODO manage it in a managed service, as startup could be long ServiceReference existingTm = bc.getServiceReference(TransactionManager.class); if (existingTm != null) { diff --git a/org.argeo.enterprise/ext/test/org/argeo/osgi/useradmin/LdifUserAdminTest.java b/org.argeo.enterprise/ext/test/org/argeo/osgi/useradmin/LdifUserAdminTest.java index d930a4981..956bb2e23 100644 --- a/org.argeo.enterprise/ext/test/org/argeo/osgi/useradmin/LdifUserAdminTest.java +++ b/org.argeo.enterprise/ext/test/org/argeo/osgi/useradmin/LdifUserAdminTest.java @@ -21,6 +21,7 @@ import java.util.UUID; import javax.transaction.TransactionManager; import org.argeo.naming.LdapAttrs; +import org.argeo.transaction.simple.SimpleTransactionManager; import org.osgi.service.useradmin.Authorization; import org.osgi.service.useradmin.Group; import org.osgi.service.useradmin.Role; @@ -32,7 +33,11 @@ import bitronix.tm.resource.ehcache.EhCacheXAResourceProducer; import junit.framework.TestCase; public class LdifUserAdminTest extends TestCase implements BasicTestConstants { - private BitronixTransactionManager tm; + final static int TM_SIMPLE = 0; + final static int TM_BITRONIX = 1; + + private int tmType = TM_SIMPLE; + private TransactionManager tm; private URI uri; private AbstractUserDirectory userAdmin; private Path tempDir; @@ -152,11 +157,16 @@ public class LdifUserAdminTest extends TestCase implements BasicTestConstants { uri = ldifPath.toUri(); } - bitronix.tm.Configuration tmConf = TransactionManagerServices.getConfiguration(); - tmConf.setServerId(UUID.randomUUID().toString()); - tmConf.setLogPart1Filename(new File(tempDir.toFile(), "btm1.tlog").getAbsolutePath()); - tmConf.setLogPart2Filename(new File(tempDir.toFile(), "btm2.tlog").getAbsolutePath()); - tm = TransactionManagerServices.getTransactionManager(); + // Init transaction manager + if (TM_SIMPLE == tmType) { + tm = new SimpleTransactionManager(); + } else if (TM_BITRONIX == tmType) { + bitronix.tm.Configuration tmConf = TransactionManagerServices.getConfiguration(); + tmConf.setServerId(UUID.randomUUID().toString()); + tmConf.setLogPart1Filename(new File(tempDir.toFile(), "btm1.tlog").getAbsolutePath()); + tmConf.setLogPart2Filename(new File(tempDir.toFile(), "btm2.tlog").getAbsolutePath()); + tm = TransactionManagerServices.getTransactionManager(); + } userAdmin = initUserAdmin(uri, tm); } @@ -174,13 +184,15 @@ public class LdifUserAdminTest extends TestCase implements BasicTestConstants { userAdmin = new LdifUserAdmin(props); userAdmin.init(); // JTA - EhCacheXAResourceProducer.registerXAResource(UserDirectory.class.getName(), userAdmin.getXaResource()); + if (TM_BITRONIX == tmType) + EhCacheXAResourceProducer.registerXAResource(UserDirectory.class.getName(), userAdmin.getXaResource()); userAdmin.setTransactionManager(tm); return userAdmin; } private void persistAndRestart() { - EhCacheXAResourceProducer.unregisterXAResource(UserDirectory.class.getName(), userAdmin.getXaResource()); + if (TM_BITRONIX == tmType) + EhCacheXAResourceProducer.unregisterXAResource(UserDirectory.class.getName(), userAdmin.getXaResource()); if (userAdmin instanceof LdifUserAdmin) ((LdifUserAdmin) userAdmin).save(); userAdmin.destroy(); @@ -189,8 +201,10 @@ public class LdifUserAdminTest extends TestCase implements BasicTestConstants { @Override protected void tearDown() throws Exception { - EhCacheXAResourceProducer.unregisterXAResource(UserDirectory.class.getName(), userAdmin.getXaResource()); - tm.shutdown(); + if (TM_BITRONIX == tmType) { + EhCacheXAResourceProducer.unregisterXAResource(UserDirectory.class.getName(), userAdmin.getXaResource()); + ((BitronixTransactionManager) tm).shutdown(); + } if (userAdmin != null) userAdmin.destroy(); if (tempDir != null) diff --git a/org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransaction.java b/org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransaction.java new file mode 100644 index 000000000..fbb138668 --- /dev/null +++ b/org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransaction.java @@ -0,0 +1,164 @@ +package org.argeo.transaction.simple; + +import java.util.ArrayList; +import java.util.List; + +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +class SimpleTransaction implements Transaction, Status { + private final static Log log = LogFactory.getLog(SimpleTransaction.class); + + private final Xid xid; + private int status = Status.STATUS_ACTIVE; + private final List xaResources = new ArrayList(); + + private final SimpleTransactionManager transactionManager; + + public SimpleTransaction(SimpleTransactionManager transactionManager) { + this.xid = new UuidXid(); + this.transactionManager = transactionManager; + } + + @Override + public synchronized void commit() throws RollbackException, + HeuristicMixedException, HeuristicRollbackException, + SecurityException, IllegalStateException, SystemException { + status = STATUS_PREPARING; + for (XAResource xaRes : xaResources) { + if (status == STATUS_MARKED_ROLLBACK) + break; + try { + xaRes.prepare(xid); + } catch (XAException e) { + status = STATUS_MARKED_ROLLBACK; + log.error("Cannot prepare " + xaRes + " for " + xid, e); + } + } + if (status == STATUS_MARKED_ROLLBACK) { + rollback(); + throw new RollbackException(); + } + status = STATUS_PREPARED; + + status = STATUS_COMMITTING; + for (XAResource xaRes : xaResources) { + if (status == STATUS_MARKED_ROLLBACK) + break; + try { + xaRes.commit(xid, false); + } catch (XAException e) { + status = STATUS_MARKED_ROLLBACK; + log.error("Cannot prepare " + xaRes + " for " + xid, e); + } + } + if (status == STATUS_MARKED_ROLLBACK) { + rollback(); + throw new RollbackException(); + } + + // complete + status = STATUS_COMMITTED; + if (log.isTraceEnabled()) + log.trace("COMMITTED " + xid); + clearResources(XAResource.TMSUCCESS); + transactionManager.unregister(xid); + } + + @Override + public synchronized void rollback() throws IllegalStateException, + SystemException { + status = STATUS_ROLLING_BACK; + for (XAResource xaRes : xaResources) { + try { + xaRes.rollback(xid); + } catch (XAException e) { + log.error("Cannot rollback " + xaRes + " for " + xid, e); + } + } + + // complete + status = STATUS_ROLLEDBACK; + if (log.isTraceEnabled()) + log.trace("ROLLEDBACK " + xid); + clearResources(XAResource.TMFAIL); + transactionManager.unregister(xid); + } + + @Override + public synchronized boolean enlistResource(XAResource xaRes) + throws RollbackException, IllegalStateException, SystemException { + if (xaResources.add(xaRes)) { + try { + xaRes.start(getXid(), XAResource.TMNOFLAGS); + return true; + } catch (XAException e) { + log.error("Cannot enlist " + xaRes, e); + return false; + } + } else + return false; + } + + @Override + public synchronized boolean delistResource(XAResource xaRes, int flag) + throws IllegalStateException, SystemException { + if (xaResources.remove(xaRes)) { + try { + xaRes.end(getXid(), flag); + } catch (XAException e) { + log.error("Cannot delist " + xaRes, e); + return false; + } + return true; + } else + return false; + } + + protected void clearResources(int flag) { + for (XAResource xaRes : xaResources) + try { + xaRes.end(getXid(), flag); + } catch (XAException e) { + log.error("Cannot end " + xaRes, e); + } + xaResources.clear(); + } + + @Override + public synchronized int getStatus() throws SystemException { + return status; + } + + @Override + public void registerSynchronization(Synchronization sync) + throws RollbackException, IllegalStateException, SystemException { + throw new UnsupportedOperationException(); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + status = STATUS_MARKED_ROLLBACK; + } + + @Override + public int hashCode() { + return xid.hashCode(); + } + + Xid getXid() { + return xid; + } + +} diff --git a/org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransactionException.java b/org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransactionException.java new file mode 100644 index 000000000..d00def966 --- /dev/null +++ b/org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransactionException.java @@ -0,0 +1,14 @@ +package org.argeo.transaction.simple; + +public class SimpleTransactionException extends RuntimeException { + private static final long serialVersionUID = -7465792070797750212L; + + public SimpleTransactionException(String message) { + super(message); + } + + public SimpleTransactionException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransactionManager.java b/org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransactionManager.java new file mode 100644 index 000000000..fef728146 --- /dev/null +++ b/org.argeo.enterprise/src/org/argeo/transaction/simple/SimpleTransactionManager.java @@ -0,0 +1,173 @@ +package org.argeo.transaction.simple; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.InvalidTransactionException; +import javax.transaction.NotSupportedException; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.TransactionSynchronizationRegistry; +import javax.transaction.UserTransaction; +import javax.transaction.xa.Xid; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class SimpleTransactionManager implements TransactionManager, UserTransaction { + private final static Log log = LogFactory.getLog(SimpleTransactionManager.class); + + private ThreadLocal current = new ThreadLocal(); + + private Map knownTransactions = Collections + .synchronizedMap(new HashMap()); + private SyncRegistry syncRegistry = new SyncRegistry(); + + @Override + public void begin() throws NotSupportedException, SystemException { + if (getCurrent() != null) + throw new NotSupportedException("Nested transactions are not supported"); + SimpleTransaction transaction = new SimpleTransaction(this); + knownTransactions.put(transaction.getXid(), transaction); + current.set(transaction); + if (log.isTraceEnabled()) + log.trace("STARTED " + transaction.getXid()); + } + + @Override + public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, + SecurityException, IllegalStateException, SystemException { + if (getCurrent() == null) + throw new IllegalStateException("No transaction registered with the current thread."); + getCurrent().commit(); + } + + @Override + public int getStatus() throws SystemException { + if (getCurrent() == null) + return Status.STATUS_NO_TRANSACTION; + return getTransaction().getStatus(); + } + + @Override + public Transaction getTransaction() throws SystemException { + return getCurrent(); + } + + protected SimpleTransaction getCurrent() throws SystemException { + SimpleTransaction transaction = current.get(); + if (transaction == null) + return null; + int status = transaction.getStatus(); + if (Status.STATUS_COMMITTED == status || Status.STATUS_ROLLEDBACK == status) { + current.remove(); + return null; + } + return transaction; + } + + void unregister(Xid xid) { + knownTransactions.remove(xid); + } + + @Override + public void resume(Transaction tobj) throws InvalidTransactionException, IllegalStateException, SystemException { + if (getCurrent() != null) + throw new IllegalStateException("Transaction " + current.get() + " already registered"); + current.set((SimpleTransaction) tobj); + } + + @Override + public void rollback() throws IllegalStateException, SecurityException, SystemException { + if (getCurrent() == null) + throw new IllegalStateException("No transaction registered with the current thread."); + getCurrent().rollback(); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + if (getCurrent() == null) + throw new IllegalStateException("No transaction registered with the current thread."); + getCurrent().setRollbackOnly(); + } + + @Override + public void setTransactionTimeout(int seconds) throws SystemException { + throw new UnsupportedOperationException(); + } + + @Override + public Transaction suspend() throws SystemException { + Transaction transaction = getCurrent(); + current.remove(); + return transaction; + } + + public TransactionSynchronizationRegistry getTsr() { + return syncRegistry; + } + + private class SyncRegistry implements TransactionSynchronizationRegistry { + @Override + public Object getTransactionKey() { + try { + SimpleTransaction transaction = getCurrent(); + if (transaction == null) + return null; + return getCurrent().getXid(); + } catch (SystemException e) { + throw new SimpleTransactionException("Cannot get transaction key", e); + } + } + + @Override + public void putResource(Object key, Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public Object getResource(Object key) { + throw new UnsupportedOperationException(); + } + + @Override + public void registerInterposedSynchronization(Synchronization sync) { + throw new UnsupportedOperationException(); + } + + @Override + public int getTransactionStatus() { + try { + return getStatus(); + } catch (SystemException e) { + throw new SimpleTransactionException("Cannot get status", e); + } + } + + @Override + public boolean getRollbackOnly() { + try { + return getStatus() == Status.STATUS_MARKED_ROLLBACK; + } catch (SystemException e) { + throw new SimpleTransactionException("Cannot get status", e); + } + } + + @Override + public void setRollbackOnly() { + try { + getCurrent().setRollbackOnly(); + } catch (Exception e) { + throw new SimpleTransactionException("Cannot set rollback only", e); + } + } + + } +} diff --git a/org.argeo.enterprise/src/org/argeo/transaction/simple/UuidXid.java b/org.argeo.enterprise/src/org/argeo/transaction/simple/UuidXid.java new file mode 100644 index 000000000..1009c8200 --- /dev/null +++ b/org.argeo.enterprise/src/org/argeo/transaction/simple/UuidXid.java @@ -0,0 +1,132 @@ +package org.argeo.transaction.simple; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.UUID; + +import javax.transaction.xa.Xid; + +/** + * Implementation of {@link Xid} based on {@link UUID}, using max significant + * bits as global transaction id, and least significant bits as branch + * qualifier. + */ +public class UuidXid implements Xid, Serializable { + private static final long serialVersionUID = -5380531989917886819L; + public final static int FORMAT = (int) serialVersionUID; + + private final static int BYTES_PER_LONG = Long.SIZE / Byte.SIZE; + + private final int format; + private final byte[] globalTransactionId; + private final byte[] branchQualifier; + private final String uuid; + private final int hashCode; + + public UuidXid() { + this(UUID.randomUUID()); + } + + public UuidXid(UUID uuid) { + this.format = FORMAT; + this.globalTransactionId = uuidToBytes(uuid.getMostSignificantBits()); + this.branchQualifier = uuidToBytes(uuid.getLeastSignificantBits()); + this.uuid = uuid.toString(); + this.hashCode = uuid.hashCode(); + } + + public UuidXid(Xid xid) { + this(xid.getFormatId(), xid.getGlobalTransactionId(), xid + .getBranchQualifier()); + } + + private UuidXid(int format, byte[] globalTransactionId, + byte[] branchQualifier) { + this.format = format; + this.globalTransactionId = globalTransactionId; + this.branchQualifier = branchQualifier; + this.uuid = bytesToUUID(globalTransactionId, branchQualifier) + .toString(); + this.hashCode = uuid.hashCode(); + } + + @Override + public int getFormatId() { + return format; + } + + @Override + public byte[] getGlobalTransactionId() { + return Arrays.copyOf(globalTransactionId, globalTransactionId.length); + } + + @Override + public byte[] getBranchQualifier() { + return Arrays.copyOf(branchQualifier, branchQualifier.length); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj instanceof UuidXid) { + UuidXid that = (UuidXid) obj; + return Arrays.equals(globalTransactionId, that.globalTransactionId) + && Arrays.equals(branchQualifier, that.branchQualifier); + } + if (obj instanceof Xid) { + Xid that = (Xid) obj; + return Arrays.equals(globalTransactionId, + that.getGlobalTransactionId()) + && Arrays + .equals(branchQualifier, that.getBranchQualifier()); + } + return uuid.equals(obj.toString()); + } + + @Override + protected Object clone() throws CloneNotSupportedException { + return new UuidXid(format, globalTransactionId, branchQualifier); + } + + @Override + public String toString() { + return uuid; + } + + public UUID asUuid() { + return bytesToUUID(globalTransactionId, branchQualifier); + } + + public static byte[] uuidToBytes(long bits) { + ByteBuffer buffer = ByteBuffer.allocate(BYTES_PER_LONG); + buffer.putLong(0, bits); + return buffer.array(); + } + + public static UUID bytesToUUID(byte[] most, byte[] least) { + if (most.length < BYTES_PER_LONG) + most = Arrays.copyOf(most, BYTES_PER_LONG); + if (least.length < BYTES_PER_LONG) + least = Arrays.copyOf(least, BYTES_PER_LONG); + ByteBuffer buffer = ByteBuffer.allocate(2 * BYTES_PER_LONG); + buffer.put(most, 0, BYTES_PER_LONG); + buffer.put(least, 0, BYTES_PER_LONG); + buffer.flip(); + return new UUID(buffer.getLong(), buffer.getLong()); + } + + // public static void main(String[] args) { + // UUID uuid = UUID.randomUUID(); + // System.out.println(uuid); + // uuid = bytesToUUID(uuidToBytes(uuid.getMostSignificantBits()), + // uuidToBytes(uuid.getLeastSignificantBits())); + // System.out.println(uuid); + // } +} diff --git a/org.argeo.node.api/src/org/argeo/node/NodeConstants.java b/org.argeo.node.api/src/org/argeo/node/NodeConstants.java index 75b7826c9..36be492e9 100644 --- a/org.argeo.node.api/src/org/argeo/node/NodeConstants.java +++ b/org.argeo.node.api/src/org/argeo/node/NodeConstants.java @@ -78,7 +78,7 @@ public interface NodeConstants { String NODE_SERVICE = NODE; /* - * FIRST INIT FRAMEWORK PROPERTIES + * INIT FRAMEWORK PROPERTIES */ String NODE_INIT = "argeo.node.init"; String I18N_DEFAULT_LOCALE = "argeo.i18n.defaultLocale"; @@ -87,6 +87,10 @@ public interface NodeConstants { String ROLES_URI = "argeo.node.roles.uri"; /** URI to an LDIF file or LDAP server used as initialization or backend */ String USERADMIN_URIS = "argeo.node.useradmin.uris"; + // Transaction manager + String TRANSACTION_MANAGER = "argeo.node.transaction.manager"; + String TRANSACTION_MANAGER_SIMPLE = "simple"; + String TRANSACTION_MANAGER_BITRONIX = "bitronix"; // Node /** Properties configuring the node repository */ String NODE_REPO_PROP_PREFIX = "argeo.node.repo."; -- 2.30.2