X-Git-Url: https://git.argeo.org/?a=blobdiff_plain;f=org.argeo.api.uuid%2Fsrc%2Forg%2Fargeo%2Fapi%2Fuuid%2FConcurrentTimeUuidState.java;h=fd158fb8315db647b8d0b07e7790d1ec57aa2b3f;hb=98f2c711418690e62e35c931e35d1dc92fd07926;hp=9a414e8fa7e3585a21e6c2bde6ac3e33a91fbea3;hpb=580e8ff2af47001eb3e0947488395b48773be469;p=lgpl%2Fargeo-commons.git diff --git a/org.argeo.api.uuid/src/org/argeo/api/uuid/ConcurrentTimeUuidState.java b/org.argeo.api.uuid/src/org/argeo/api/uuid/ConcurrentTimeUuidState.java index 9a414e8fa..fd158fb83 100644 --- a/org.argeo.api.uuid/src/org/argeo/api/uuid/ConcurrentTimeUuidState.java +++ b/org.argeo.api.uuid/src/org/argeo/api/uuid/ConcurrentTimeUuidState.java @@ -4,25 +4,35 @@ import java.security.SecureRandom; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Collections; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicLong; + +import org.argeo.api.uuid.UuidFactory.TimeUuidState; /** * A simple base implementation of {@link TimeUuidState}, which maintains - * different clock sequences for each thread. + * different clock sequences for each thread, based on a specified range. This + * range is defined as clock_seq_hi (cf. RFC4122) and only clock_seq_low is + * dynamically allocated. It means that there can be at most 256 parallel clock + * sequences. If that limit is reached, the clock sequence which has not be used + * for the most time is reallocated to the new thread. It is assumed that the + * context where time uUIDs will be generated will often be using thread pools + * (e.g. {@link ForkJoinPool#commonPool()}, http server, database access, etc.) + * and that such reallocation won't have to happen too often. */ -public class ConcurrentTimeUuidState implements TimeUuidState { -// public final static ThreadLocal isTimeUuidThread = new ThreadLocal<>() { -// -// @Override -// protected Boolean initialValue() { -// return false; -// } -// }; +public class ConcurrentTimeUuidState implements UuidFactory.TimeUuidState { +// private final static Logger logger = System.getLogger(ConcurrentTimeUuidState.class.getName()); /** The maximum possible value of the clocksequence. */ - private final static int MAX_CLOCKSEQUENCE = 16384; + private final static int MAX_CLOCKSEQUENCE = 0x3F00; - private final ThreadLocal holder; + private final ClockSequenceProvider clockSequenceProvider; + private final ThreadLocal currentHolder; private final Instant startInstant; /** A start timestamp to which {@link System#nanoTime()}/100 can be added. */ @@ -31,30 +41,31 @@ public class ConcurrentTimeUuidState implements TimeUuidState { private final Clock clock; private final boolean useClockForMeasurement; - private final SecureRandom secureRandom; + private long nodeIdBase; public ConcurrentTimeUuidState(SecureRandom secureRandom, Clock clock) { useClockForMeasurement = clock != null; this.clock = clock != null ? clock : Clock.systemUTC(); Objects.requireNonNull(secureRandom); - this.secureRandom = secureRandom; // compute the start reference startInstant = Instant.now(this.clock); long nowVm = nowVm(); - Duration duration = Duration.between(TimeUuidState.GREGORIAN_START, startInstant); - startTimeStamp = durationToUuidTimestamp(duration) - nowVm; + Duration duration = Duration.between(TimeUuid.TIMESTAMP_ZERO, startInstant); + startTimeStamp = TimeUuid.durationToTimestamp(duration) - nowVm; + + clockSequenceProvider = new ClockSequenceProvider(secureRandom); // initalise a state per thread - holder = new ThreadLocal<>() { + currentHolder = new ThreadLocal<>() { @Override - protected Holder initialValue() { - Holder value = new Holder(); - value.lastTimestamp = startTimeStamp; - value.clockSequence = newClockSequence(); -// isTimeUuidThread.set(true); + protected ConcurrentTimeUuidState.Holder initialValue() { + ConcurrentTimeUuidState.Holder value = new ConcurrentTimeUuidState.Holder(); + value.threadId = Thread.currentThread().getId(); + value.lastTimestamp = 0; + clockSequenceProvider.newClockSequence(value); return value; } }; @@ -65,22 +76,18 @@ public class ConcurrentTimeUuidState implements TimeUuidState { */ public long useTimestamp() { + Holder holder = currentHolder.get(); + if (holder.clockSequence < 0) { + clockSequenceProvider.newClockSequence(holder); + } - long previousTimestamp = holder.get().lastTimestamp; + long previousTimestamp = holder.lastTimestamp; long now = computeNow(); // rare case where we are sooner // (e.g. if system time has changed in between and we use the clock) if (previousTimestamp > now) { - long newClockSequence = newClockSequence(); - for (int i = 0; i < 64; i++) { - if (newClockSequence != holder.get().clockSequence) - break; - newClockSequence = newClockSequence(); - } - if (newClockSequence != holder.get().clockSequence) - throw new IllegalStateException("Cannot change clock sequence"); - holder.get().clockSequence = newClockSequence; + clockSequenceProvider.newClockSequence(holder); } // very unlikely case where it took less than 100ns between both @@ -93,14 +100,14 @@ public class ConcurrentTimeUuidState implements TimeUuidState { now = computeNow(); assert previousTimestamp != now; } - holder.get().lastTimestamp = now; + holder.lastTimestamp = now; return now; } - protected long computeNow() { + private long computeNow() { if (useClockForMeasurement) { - Duration duration = Duration.between(TimeUuidState.GREGORIAN_START, Instant.now(clock)); - return durationToUuidTimestamp(duration); + Duration duration = Duration.between(TimeUuid.TIMESTAMP_ZERO, Instant.now(clock)); + return TimeUuid.durationToTimestamp(duration); } else { return startTimeStamp + nowVm(); } @@ -110,31 +117,197 @@ public class ConcurrentTimeUuidState implements TimeUuidState { return System.nanoTime() / 100; } - protected long durationToUuidTimestamp(Duration duration) { - return (duration.getSeconds() * 10000000 + duration.getNano() / 100); + @Override + public long getClockSequence() { + return currentHolder.get().clockSequence; } - /* - * STATE OPERATIONS - */ + @Override + public long getLastTimestamp() { + return currentHolder.get().lastTimestamp; + } + + protected void reset(long nodeIdBase, long range) { + synchronized (clockSequenceProvider) { + this.nodeIdBase = nodeIdBase; + clockSequenceProvider.reset(range); + clockSequenceProvider.notifyAll(); + } + } - protected long newClockSequence() { - return secureRandom.nextInt(ConcurrentTimeUuidState.MAX_CLOCKSEQUENCE); + @Override + public long getLeastSignificantBits() { + return currentHolder.get().leastSig; + } + + @Override + public long getMostSignificantBits() { + long timestamp = useTimestamp(); + long mostSig = TimeUuid.toMostSignificantBits(timestamp); + return mostSig; } /* - * ACCESSORS + * INTERNAL CLASSSES */ + private class Holder { + private long lastTimestamp; + private long clockSequence; + private long threadId; -// @Override -// public byte[] getNodeId() { -// byte[] arr = new byte[6]; -// System.arraycopy(holder.get().nodeId, 0, arr, 0, 6); -// return arr; -// } + private long leastSig; + + @Override + public boolean equals(Object obj) { + boolean isItself = this == obj; + if (!isItself && clockSequence == ((Holder) obj).clockSequence) + throw new IllegalStateException("There is another holder with the same clockSequence " + clockSequence); + return isItself; + } + + private void setClockSequence(long clockSequence) { + this.clockSequence = clockSequence; + this.leastSig = nodeIdBase // already computed node base + | (((clockSequence & 0x3F00) >> 8) << 56) // clk_seq_hi_res + | ((clockSequence & 0xFF) << 48); // clk_seq_low + } + + @Override + public String toString() { + return "Holder " + clockSequence + ", threadId=" + threadId + ", lastTimestamp=" + lastTimestamp; + } + + } + + private static class ClockSequenceProvider { + /** Set to an illegal value. */ + private long range = MAX_CLOCKSEQUENCE;// this is actually clk_seq_hi +// private int rangeSize = 256; + private volatile long min; + private volatile long max; + private final AtomicLong counter = new AtomicLong(-1); + + private final SecureRandom secureRandom; + + private final Map activeHolders = Collections.synchronizedMap(new WeakHashMap<>()); + + ClockSequenceProvider(SecureRandom secureRandom) { + this.secureRandom = secureRandom; +// reset(range); + } + + synchronized void reset(long range) { + // long min = secureRandom.nextInt(ConcurrentTimeUuidState.MAX_CLOCKSEQUENCE - + // rangeSize); + // long max = min + rangeSize; + + long min, max; + if (range >= 0) { + if (range > MAX_CLOCKSEQUENCE) + throw new IllegalArgumentException("Range " + Long.toHexString(range) + " is too big"); + long previousRange = this.range; + this.range = range & 0x3F00; + if (this.range != range) { + this.range = previousRange; + throw new IllegalArgumentException( + "Range is not properly formatted: " + range + " (0x" + Long.toHexString(range) + ")"); + } + + min = this.range; + max = min | 0xFF; + } else {// full range + this.range = range; + min = 0; + max = MAX_CLOCKSEQUENCE; + } + assert min == (int) min; + assert max == (int) max; + + // TODO rather use assertions + if (min >= max) + throw new IllegalArgumentException("Minimum " + min + " is bigger than maximum " + max); + if (min < 0 || min > MAX_CLOCKSEQUENCE) + throw new IllegalArgumentException("Minimum " + min + " is not valid"); + if (max < 0 || max > MAX_CLOCKSEQUENCE) + throw new IllegalArgumentException("Maximum " + max + " is not valid"); + this.min = min; + this.max = max; + + Set active = activeHolders.keySet(); + int activeCount = active.size(); + if (activeCount > getRangeSize()) + throw new IllegalStateException( + "There are too many holders for range [" + min + "," + max + "] : " + activeCount); + + // reset the counter with a random value in range + long firstCount = min + secureRandom.nextInt(getRangeSize()); + counter.set(firstCount); + + // reset holders + for (Holder holder : active) { + // save old clocksequence? + newClockSequence(holder); + } + } + + private synchronized void newClockSequence(Holder holder) { + // Too many holders, we will remove the oldes ones + while (activeHolders.size() > getRangeSize()) { + long oldestTimeStamp = -1; + Holder holderToRemove = null; + holders: for (Holder h : activeHolders.keySet()) { + if (h == holder)// skip the caller + continue holders; + + if (oldestTimeStamp < 0) { + oldestTimeStamp = h.lastTimestamp; + holderToRemove = h; + } + if (h.lastTimestamp <= oldestTimeStamp) { + oldestTimeStamp = h.lastTimestamp; + holderToRemove = h; + } + + } + assert holderToRemove != null; +// long oldClockSequence = holderToRemove.clockSequence; + holderToRemove.clockSequence = -1; + activeHolders.remove(holderToRemove); +// if (logger.isLoggable(WARNING)) +// logger.log(WARNING, "Removed " + holderToRemove + ", oldClockSequence=" + oldClockSequence); + } + + long newClockSequence = -1; + int tryCount = 0;// an explicit exit condition + do { + tryCount++; + if (tryCount >= getRangeSize()) + throw new IllegalStateException("No more clock sequence available"); + + newClockSequence = counter.incrementAndGet(); + assert newClockSequence >= 0 : "Clock sequence cannot be negative"; + if (newClockSequence > max) { + // reset counter + newClockSequence = min; + counter.set(newClockSequence); + } + } while (activeHolders.containsValue(newClockSequence)); + // TODO use an iterator to check the values + holder.setClockSequence(newClockSequence); + activeHolders.put(holder, newClockSequence); +// if (logger.isLoggable(DEBUG)) { +// String clockDesc = range >= 0 ? Long.toHexString(newClockSequence & 0x00FF) +// : Long.toHexString(newClockSequence | 0x8000); +// String rangeDesc = Long.toHexString(min | 0x8000) + "-" + Long.toHexString(max | 0x8000); +// logger.log(DEBUG, "New clocksequence " + clockDesc + " for thread " + Thread.currentThread().getId() +// + " (in range " + rangeDesc + ")"); +// } + } + + private synchronized int getRangeSize() { + return (int) (max - min); + } - @Override - public long getClockSequence() { - return holder.get().clockSequence; } + }