Extracting Index content from Coherence

When storing semi-structured data within a Coherence cache it’s common to add appropriate indexes on key fields to speed up queries and value extraction. Coherence indexes work in a similar way to those of a more traditional database, in that they allow queries to target interesting entries (or rows, as they would be in the database world) far more efficiently than a full cache (or table) scan.
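For reference, adding such an index is a single call against the cache; the cache name and the POF field id in this sketch are purely illustrative:

    NamedCache cache = CacheFactory.getCache("my-semi-structured-data");
    ValueExtractor indexExtractor = new PofExtractor(String.class, 1);   // the field to index
    cache.addIndex(indexExtractor, /* ordered */ false, /* comparator */ null);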

It’s not uncommon to want to be able to see just what values are within the index, either as a debugging tool or to help build responsive screens and pick-lists in some interactive UI. Below you’ll find a handy aggregator capable of pain-free index content extraction.

The full code is at the end of the post, (stripped down to keep things uncluttered), or if you’re after the full unadulterated version, then take a look at my GitHub account. For those wanting more of a walk-through of the code, read on…

Walking through the code

So the first thing we’re going to need is some logic that runs on the storage-enabled nodes within the Coherence cluster. If it is to gain access to the cache indexes, then it will need to run on the service the cache belongs to. The obvious candidates are Coherence aggregators and entry processors. The latter locks the cache entries and supports mutation of the data, which we don’t want or need, so we’re going to build this using an aggregator.

Introducing the IndexContentExtractor – as recommended by dentists

The aggregator takes as constructor arguments the ValueExtractor that was used to create the index and, optionally, a Filter that can be applied server side to filter the index content. Such filtering can be useful if, for example, you’re building a user-filterable pick-list. The two main methods that Coherence is going to call on the class are the aggregate() and aggregateResults() methods.

The former, aggregate(), is normally called by Coherence to aggregate a set of entries. In our case we’re not interested in the entries themselves – we just need a single entry to get at the index content. Once we have the index we extract the set of unique values it holds, optionally filter them, and then return them as the result of the call. Coherence will normally call aggregate() once on each storage-enabled node, though this isn’t guaranteed and may change in future versions. See the notes on performance below for how this can affect things.

The latter, aggregateResults(), is called by Coherence to reduce the results from all the aggregate() calls into a single result. This runs either on the cluster member instigating the aggregate call, or on the proxy node if the call was instigated by an Extend client. In our case the aggregateResults() call just needs to build a ‘master set’ from all the individual result sets returned by each aggregate() invocation.

@Portable
public class IndexContentExtractor implements InvocableMap.ParallelAwareAggregator, Serializable {

    @PortableProperty(value = 1)
    private ValueExtractor indexExtractor;

    @PortableProperty(value = 2)
    private Filter filter;

    public IndexContentExtractor(ValueExtractor indexExtractor) {
        this(indexExtractor, null);
    }

    public IndexContentExtractor(ValueExtractor indexExtractor, Filter filter) {
        this.indexExtractor = indexExtractor;
        this.filter = filter;
    }

    @Override
    public Object aggregate(Set set) {
        if (set.isEmpty()) {
            return Collections.emptySet();
        }
        final Map indexMap = getIndexMap(set);
        final Set values = getIndexedValues(indexMap);
        filterValues(values);
        return values;
    }

    @Override
    public Object aggregateResults(Collection results) {
        Set values = new HashSet();
        for (Collection s : (Collection<Collection>)results){
            values.addAll(s);
        }
        return values;
    }

    ...
}

Getting hold of the elusive index

When Coherence calls our aggregate(Set) method it passes in a set of cache entries. We don’t really care about them, as we’re not actually aggregating from them, except that they give us a way of getting hold of the map of indexes defined on the cache. That’s assuming they are BinaryEntry instances; otherwise we fail gracefully. This means the extractor won’t work on some cache types, such as replicated caches. However, it will work with both POF and standard Java serialisation.

    private static Map getIndexMap(Set set) {
        final Object entry = set.iterator().next();
        if (!(entry instanceof BinaryEntry)) {
            throw new UnsupportedOperationException("Only supports binary caches");
        }

        final BinaryEntry binaryEntry = (BinaryEntry) entry;
        return binaryEntry.getBackingMapContext().getIndexMap();
    }

Extracting the index content

Once we have the map of indexes, finding the index we’re interested in and grabbing its content is child’s play:

    private Set getIndexedValues(Map indexMap) {
        final MapIndex index = (MapIndex) indexMap.get(indexExtractor);
        if (index == null) {
            throw new IllegalArgumentException("No index has been added for extractor: " + indexExtractor);
        }
        final Map contents = index.getIndexContents();
        return contents == null ? Collections.emptySet() : new HashSet<Object>(contents.keySet());
    }

Filtering the values server side

Given the index values we can now apply our optional filter. Again the code is pretty straightforward:

    private void filterValues(Set values) {
        if (filter == null) {
            return;
        }
        for (final Iterator it = values.iterator(); it.hasNext(); ) {
            if (!filter.evaluate(it.next())) {
                it.remove();
            }
        }
    }

Bringing it all together

If we bring all of that together we end up with the following:

@Portable
public class IndexContentExtractor implements InvocableMap.ParallelAwareAggregator, Serializable {

    @PortableProperty(value = 1)
    private ValueExtractor indexExtractor;

    @PortableProperty(value = 2)
    private Filter filter;

    public IndexContentExtractor(ValueExtractor indexExtractor) {
        this(indexExtractor, null);
    }

    public IndexContentExtractor(ValueExtractor indexExtractor, Filter filter) {
        this.indexExtractor = indexExtractor;
        this.filter = filter;
    }

    @Override
    public Object aggregate(Set set) {
        if (set.isEmpty()) {
            return Collections.emptySet();
        }
        final Map indexMap = getIndexMap(set);
        final Set values = getIndexedValues(indexMap);
        filterValues(values);
        return values;
    }

    @Override
    public Object aggregateResults(Collection results) {
        Set values = new HashSet();
        for (Collection s : (Collection<Collection>)results){
            values.addAll(s);
        }
        return values;
    }

    private Set getIndexedValues(Map indexMap) {
        final MapIndex index = (MapIndex) indexMap.get(indexExtractor);
        if (index == null) {
            throw new IllegalArgumentException("No index has been added for extractor: " + indexExtractor);
        }
        final Map contents = index.getIndexContents();
        return contents == null ? Collections.emptySet() : new HashSet<Object>(contents.keySet());
    }

    private void filterValues(Set values) {
        if (filter == null) {
            return;
        }
        for (final Iterator it = values.iterator(); it.hasNext(); ) {
            if (!filter.evaluate(it.next())) {
                it.remove();
            }
        }
    }

    private static Map getIndexMap(Set set) {
        final Object entry = set.iterator().next();
        if (!(entry instanceof BinaryEntry)) {
            throw new UnsupportedOperationException("Only supports binary caches");
        }

        final BinaryEntry binaryEntry = (BinaryEntry) entry;
        return binaryEntry.getBackingMapContext().getIndexMap();
    }
    // getParallelAggregator() and a no-arg constructor for deserialisation are omitted
    // to keep things uncluttered.
}

Using the extractor

To use the extractor from client code you just need to call aggregate() on the NamedCache instance. Set the filter parameter to AlwaysFilter.INSTANCE to ensure that the aggregator runs at least once on each storage-enabled node. Then just create an instance of the IndexContentExtractor, passing through the extractor used to create the index and, optionally, any filters you might want to apply.
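A minimal sketch of the no-filter case looks like this (the POF field id is illustrative; indexExtractor must be the same extractor the index was registered with):

    ValueExtractor indexExtractor = new PofExtractor(String.class, 1);
    Set<Object> values = (Set<Object>) cache.aggregate(AlwaysFilter.INSTANCE, new IndexContentExtractor(indexExtractor));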

If you do use a filter then it will be passed the contents of the index, i.e. whatever the extractor used to build the index returns from its extract() call. If you want the filter to operate on the value as a whole you’ll need to pass IdentityExtractor.INSTANCE as the extractor in the filter’s constructor. Alternatively, where the value held in the index is more complex, it is possible to pass POF or reflection based value extractors to the filter to pick out sub-fields, if needed.
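As a hedged sketch of that sub-field case, assuming the indexed values were objects exposing a getCode() accessor (an illustrative assumption, not something from the post’s data model), and reusing the indexExtractor from the previous snippet:

    Filter subFieldFilter = new EqualsFilter(new ReflectionExtractor("getCode"), "GBP");
    Set<?> matches = (Set<?>) cache.aggregate(AlwaysFilter.INSTANCE, new IndexContentExtractor(indexExtractor, subFieldFilter));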

In the example below, the index in question stores String values and the code is pulling back any values stored in the index that contain a specific sub-string entered by a user:

    Filter filter = new LikeFilter(IdentityExtractor.INSTANCE, "%" + searchTerm + "%", false);
    Set<String> results = (Set<String>)cache.aggregate(AlwaysFilter.INSTANCE, new IndexContentExtractor(indexExtractor, filter));

You can find some examples in the tests on my GitHub account.

Notes on performance

Coherence can invoke aggregate() more than once per storage-enabled node if it so chooses, e.g. to process different partitions of data in parallel. The criteria it uses to determine how many times to call it seem to vary, and I’ve not had the time to investigate the exact algorithm – even if I did, it may well change in a later release. When Coherence does call the aggregator more than once per node, each call will produce the same results. While this is not ideal from a performance point of view, the reduce phase, i.e. aggregateResults(), will remove these duplicates, so functionality is not affected.

I’ve not found a way around this potential duplication, but then again I’ve never seen it be a performance problem, so I haven’t felt the need to invest a great deal of time thinking about it. It would be possible to iterate through the key set of the cache to find a key owned by each storage node, but such an approach may well be slower, due to having to pull the keys back from the storage nodes, meaning more machine hops.
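For what it’s worth, a hedged sketch of that key-ownership idea is below; note that keySet() pulls every key back to the caller first, which is exactly the extra cost being described:

    PartitionedService service = (PartitionedService) cache.getCacheService();
    Map<Member, Object> onePerMember = new HashMap<Member, Object>();
    for (Object key : cache.keySet()) {
        onePerMember.put(service.getKeyOwner(key), key);   // keep one key per owning member
    }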

That’s it. As ever, if you have any questions or comments then please holla!

Over and out.

Temporal Versioning in Coherence

Storing the versions of an entity that make up its history within a Coherence cache is a common pattern. Extending the pattern to support temporal queries, i.e. queries evaluated against, and returning, the different versions of entities, as they existed at previous points in time, requires a little more work and has generally relied on the mutation of metadata. Such data mutations are generally a bad thing, as they increase code complexity and the chance of data corruption. In this post I’ll propose an alternative approach that makes use of custom Coherence indexes in place of the mutations, thereby allowing the data to remain immutable and taming complexity.

Update: While writing this post and its associated code, I was building on top of the patterns documented by Ben Stopford in his blog post on versioning in Coherence. Unbeknownst to me at the time, Alexey Ragozin had previously covered what looks to be a very similar concept to the one I’m about to share, in his post on time-series index for managing versioned data. Hats off to Alexey. What is it they say about great minds and idiots? Still, as I’ve pretty much finished the post I thought I may as well put it out there.

First, let’s start with a little background on versioning…

Versioning Patterns

Using Coherence to cache not only the latest version of an entity, but its history as well, is a powerful pattern. It’s the first step to building an immutable data fabric within Coherence. It allows not only the current state of an entity to be queried and retrieved, but also the state of the entity at any previous point in time.

By way of an example, take a warehouse stock control system in which each item in the warehouse is assigned a unique stockId, and which has a requirement to generate a weekly stock level report and allow people to view historic reports.

In a traditional system, without versioning, data about the stock might be stored in a database or key-value store by its stockId. As stock levels changed, the corresponding entries would be updated, effectively erasing any history of previous states. Each week the system would build up a snapshot of the current state of the stock, generate the report and archive it off somewhere. The report is likely to be relatively large as it must make a copy of the data, so that users can view historic reports as the system keeps running and stock levels change.

What is more, any mistakes found in the data after a report has been generated will probably need a great deal of manual intervention to correct, with any reports generated after the mistake was made having to be re-run. Such corrections are generally laborious and error prone. The further back the error, the more manual effort required to correct it.

So let’s look at some ways in which we can improve the design of the system…

Simple Versioning

The simplest versioning pattern is usually just to add a version number to the business key used to store the entity. In our example the data would be keyed against the combination of a stockId and a version number. Each time the data associated with the stock needs to be updated, e.g. as new stock comes in, or stock is sent out to fulfil a customer’s order, the new stock level is stored as a new entry with an incremented version number. In this way the system builds up a series of versions for each item under stock control as the data changes over time.

simple versioned entry
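Purely as an illustration (these are not the post’s actual key and value classes), the logical effect is that every change lands under a fresh versioned key rather than overwriting the previous entry:

    cache.put("stock-42:v1", 100);   // stock level after a delivery
    cache.put("stock-42:v2", 88);    // stock level after an order is dispatched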

The weekly stock report now needs to store nothing more than the full set of versioned keys for the stock items. As updates to the stock come in, new versions will be created, so the report does not need to take a full snapshot of the data: because the underlying data is immutable, any previous report can be fully rebuilt from just this set of versioned keys.
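A hedged sketch of what that means in practice: the report records only the versioned keys, and because the entries they point at never change, a bulk get reproduces the same data at any later date (the keys here are illustrative):

    Set reportKeys = new HashSet(Arrays.asList("stock-42:v1", "stock-17:v3"));
    Map reportData = cache.getAll(reportKeys);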

Unfortunately, managing the correction of reports when errors in the stock levels are corrected will still be largely a manual process.

Note: To still allow key based access to an entity using the business key alone, (which is a common requirement and one you’d want to keep very quick and efficient), then one of the patterns outlined in Ben’s post on the subject can, and should, be used.

Temporal Versioning

With simple versioning it’s not possible to query the system for the stock levels at a specific point in time, as the system doesn’t track when each version was the live version. Temporal versioning fixes this shortcoming by storing the timestamp at which each version arrived. As new versions arrive and supersede previous versions, the system builds up a time-line of versions, with each version having a time window in which it was considered the latest, or live, version. This makes it possible to retrospectively ask which version of an entity was live at any particular point in time.

temporal versioned entry

Our stock report now needs to keep nothing more than the timestamp at which the report was generated. From this, the system is able to gather the state of the system at that point in time, and rebuild the report. In fact, the system can now generate the report for any arbitrary previous point in time too.
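To make the idea concrete, here is a tiny, purely illustrative sketch (arbitrary timestamps): the version live at a snapshot is simply the latest one whose arrived timestamp is less than or equal to that snapshot, which is exactly a floor lookup over an arrival-ordered map. This is also, in essence, what the custom index later in this post does per business key.

    NavigableMap<Long, Integer> arrivals = new TreeMap<Long, Integer>();
    arrivals.put(900L, 1);    // v1 arrived at t=900
    arrivals.put(1130L, 2);   // v2 arrived at t=1130
    arrivals.put(1400L, 3);   // v3 arrived at t=1400
    int liveAt1215 = arrivals.floorEntry(1215L).getValue();   // -> 2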

Still, even with temporal versioning the system doesn’t elegantly deal with retrospective ‘corrections’ to the stock levels as it stands. To solve that side of the problem a different design change needs to be made. All we’ve effectively changed so far is the key by which data entries can be retrieved. To enhance the system to elegantly handle data corrections we’d probably look to change what’s in the value of the entry: from a snapshot of the current state, i.e. the current stock level, to a delta, i.e. a positive or negative adjustment to the previous stock level. So, for example, when new stock comes in a new entry is added with a positive stock adjustment record, and when stock goes out to customers a negative adjustment record is recorded. Such records often reference the related delivery report or customer order that was the source of the change.

What we’re discussing here is effectively an event sourcing model, which Martin Fowler covers in his own blog and, as it’s pretty much off topic for this post, I’ll leave you to read Martin’s posts on the subject if you so please. In addition to event sourcing, a system that needs to support changes to historical data may well also benefit from support for bi-temporal data, which I’ll leave as another exercise for the reader – though as a hint, the custom indexes covered below can be used to index as many temporal dimensions as you fancy having, (or can get your head around!).

As you can see, storing changes to systems as a time-series of facts about how the system changes is a powerful pattern. Now let’s take a look at how you might implement this on top of Oracle Coherence.

Implementing versioning within Coherence

Simple versioning in Coherence

The implementation details of simple versioning in Coherence are nothing new. I learned about them from posts such as Ben Stopford’s Coherence Versioning, and I won’t go into any detail here. Suffice to say that all flavors of the pattern use key affinity to ensure all versions of an entity reside on the same Coherence node, new versions are routed through a single latest entry to remove race conditions, and a trigger is used to set the version of entries. The types used as the key and the value within the grid would look something like this:

@Portable
public class VKey {
    @PortableProperty(value = 1)
    private int version;

    @PortableProperty(value = 2)
    private Object domainKey;

    @Override
    public int getVersion() { return version; }

    @Override
    public Object getDomainObject() { return domainKey; }

    ...
}

@Portable
public class VValue {
    @PortableProperty(value = 1)
    private int version;

    @PortableProperty(value = 2)
    private Object domainValue;

    @Override
    public int getVersion() { return version; }

    @Override
    public Object getDomainObject() { return domainValue; }

    ...
}

If you’re not familiar with these patterns, then I strongly suggest you read Ben’s post before carrying on.

Temporal versioning in Coherence

Extending the model to include a temporal dimension requires an arrived timestamp to be captured as part of each value’s metadata. A first attempt at this might look like:

@Portable
public class VValue {
   @PortableProperty(value = 1)
   private int version;

   @PortableProperty(value = 2)
   private Object domainValue;

   @PortableProperty(value = 3)
   private Date arrived;

   @Override
   public int getVersion() { return version; }

   @Override
   public Object getDomainObject() { return domainValue; }

   @Override
   public Date getArrived() { return arrived; }

...
}

However, there is a problem with this basic approach: while capturing the timestamp at the point of entry into the system is simple enough, (problems of distributed clocks aside), implementing temporal queries on top of the data is problematic. It is easy to add a lower-bound constraint to retrieve all entries whose arrived timestamp is less-than-or-equal-to the required snapshot. Unfortunately, adding the upper bound is not so easy, as the time at which a version is superseded by another is not stored on the entry itself, but is implied from the arrived timestamp of the subsequent version or, in the case of the currently live version, is open-ended.

The problem of no upper bound for temporal queries is most commonly resolved by adding a superseded timestamp to each entry, and setting it to match the arrived timestamp of the superseding version, when that arrives. With this approach, it is now trivial to add an upper-bound constraint alongside the lower bound so that only objects that were live at the specified snapshot time are retained within the data set, i.e.:

arrived <= snapshot && snapshot < superseded

To keep such a frequently used filter fast, it is common to add indexes to both the arrived and superseded fields.
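A hedged sketch of that conventional approach is below. The getArrived() accessor comes from the value class shown earlier; getSuperseded() and the snapshot value are assumptions made purely for illustration:

    // Index both ends of each version's live window (the conventional approach, not the
    // one this post ends up recommending).
    cache.addIndex(new ReflectionExtractor("getArrived"), true, null);
    cache.addIndex(new ReflectionExtractor("getSuperseded"), true, null);

    // Only versions live at the snapshot satisfy both bounds.
    Date snapshot = new Date(1386000000000L);   // the point in time of interest (arbitrary)
    Filter liveAtSnapshot = new AndFilter(
            new LessEqualsFilter("getArrived", snapshot),    // arrived <= snapshot
            new GreaterFilter("getSuperseded", snapshot));   // snapshot < superseded
    Set liveKeys = cache.keySet(liveAtSnapshot);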

The problem with the above approach is that it copies data, i.e. the arrived timestamp of one entry into the superseded field of another, which can become out of sync, and it mutates an existing entry (the previous version) when adding a new one, which opens up many more possibilities for data corruption and increases code complexity.

The solution I’m proposing is to not store the superseded timestamp on the entity itself, but to make use of Coherence’s custom index support to build a time-line index designed to support temporal queries. The index will store a time-line of versions per business-key e.g. the stockId in our example above. The time-line will consist of the set of versioned keys, ordered by their arrived timestamp, (or indeed any other timestamp). This is relatively simple, requiring only a custom index, extractor and filter. So let’s see some code…

1. Create an IndexAwareExtractor to install your custom index

Indexes in Coherence are tied to an Extractor. For normal indexes this needs to be the same extractor that is used by any filter that you expect to make use of the index. It is possible to add custom indexes and extractors that are compatible with the in-built Coherence filters, but that is not what we are trying to achieve here, as no in-built filters are going to be able to understand the time-line stored within the index, and so the role of the extractor is just to install and un-install the index.

@Portable
public class TemporalExtractor implements IndexAwareExtractor, Serializable {

   @PortableProperty(value = 1)
   private ValueExtractor businessKeyExtractor;

   @PortableProperty(value = 2)
   private ValueExtractor timestampExtractor;

   @PortableProperty(value = 3)
   private ValueExtractor versionExtractor;

   public TemporalExtractor(ValueExtractor businessKeyExtractor, ValueExtractor timestampExtractor, ValueExtractor versionExtractor) {
      this.businessKeyExtractor = businessKeyExtractor;
      this.timestampExtractor = timestampExtractor;
      this.versionExtractor = versionExtractor;
   }

   @Override
   public MapIndex createIndex(boolean ordered, Comparator comparator, Map mapIndex, BackingMapContext context) {
      MapIndex index = new TemporalIndex(this, context);
      mapIndex.put(this, index);
      return index;
   }

   @Override
   public MapIndex destroyIndex(Map mapIndex) {
      return (MapIndex) mapIndex.remove(this);
   }
   // The ValueExtractor extract() method, the getters used by the index below, e.g.
   // getKeyExtractor() and getTimestampExtractor(), and equals()/hashCode() are omitted
   // for brevity.
}

The extractor takes the ValueExtractors that the index will need to extract the business key from the entry’s key, and both the version and timestamp from the entry’s value. These are made available to our custom TemporalIndex which is covered below.

The only methods of note within the extractor itself are createIndex() and destroyIndex(), which are called by Coherence as our index is added and removed, respectively, (or on new nodes as they join the cluster). The create method adds an instance of our custom index to the cache’s index map, keyed off the extractor, while the destroy method removes that same entry. Because the extractor is used as the key into the index map it is important that both equals() and hashCode() are implemented correctly.
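They’re not shown in the stripped-down listing above, so purely as a minimal sketch (the code on GitHub may well differ), an implementation based on the three extractor fields might look like this:

   @Override
   public boolean equals(Object o) {
      if (this == o) {
         return true;
      }
      if (!(o instanceof TemporalExtractor)) {
         return false;
      }
      TemporalExtractor that = (TemporalExtractor) o;
      return businessKeyExtractor.equals(that.businessKeyExtractor)
            && timestampExtractor.equals(that.timestampExtractor)
            && versionExtractor.equals(that.versionExtractor);
   }

   @Override
   public int hashCode() {
      return businessKeyExtractor.hashCode();
   }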

2. Create a custom index that tracks the timeline of each object

The index will hold a map of business key to the time line of versions of that key. The time line stores the temporal points, i.e. the arrived timestamps, at which each new version arrived, together with the corresponding entry’s key.

Temporal reverse index

So let’s first create something to hold the time line. The time line holds an ordered map of timestamps to keys. However, things are slightly complicated by the fact that it is possible to have two versions of an entity at the same timestamp, so the class needs to be able to accommodate this, which it does by storing not a single entry against each timestamp, but an ordered set of entries, ordered by version.

public class TimeLine {
   private final Comparator comparator;
   private final TreeMap<Long, TreeSet<Object>> timeLine;

   public TimeLine(Comparator comparator) {
      this.comparator = comparator;
      this.timeLine = new TreeMap<Long, TreeSet<Object>>();
   }

   public void insert(Object key, long timestamp) {
      Set<Object> entries = getEntriesAtTimestamp(timestamp, true);
      entries.add(key);
   }

   public boolean remove(Object key, long timestamp) {
      Set<Object> keys = getEntriesAtTimestamp(timestamp, false);
      if (keys == null || !keys.remove(key)) {
         return false;
      }

      if (keys.isEmpty()) {
         timeLine.remove(timestamp);
      }

      return true;
   }

   public boolean isEmpty() {
      return timeLine.isEmpty();
   }

   public Object get(long snapshot) {
      // The key live at the snapshot is the highest-versioned key stored against the
      // latest timestamp that is less than or equal to the snapshot.
      Map.Entry<Long, TreeSet<Object>> floor = timeLine.floorEntry(snapshot);
      return floor == null ? null : floor.getValue().last();
   }

   private Set<Object> getEntriesAtTimestamp(long timestamp, boolean createIfNecessary) {
      TreeSet<Object> keys = timeLine.get(timestamp);
      if (keys == null && createIfNecessary) {
         keys = new TreeSet<Object>(comparator);
         timeLine.put(timestamp, keys);
      }
      return keys;
   }
}

Next, we’ll need the custom reverse index itself. To conform to what Coherence expects the index needs to implement the MapIndex interface. However, most of the methods on the interface will never be called as the index will only be used from our custom filter(s), so the set of methods on the interface used by the standard Coherence filters can throw an UnsupportedOperationException, (and have been excluded from the code below to improve readability).

public class TemporalIndex implements MapIndex {
   private final TemporalExtractor extractor;
   private final Serializer serialiser;
   private final Map<Object, TimeLine> timeLineIndex;

   public TemporalIndex(TemporalExtractor extractor, BackingMapContext ctx) {
      this.extractor = extractor;
      this.serialiser = ctx.getManagerContext().getCacheService().getSerializer();
      this.timeLineIndex = new HashMap<Object, TimeLine>();
   }

   @Override
   public void insert(Map.Entry entry) {
      TimeLine timeLine = getTimeLine(entry, true);
      addToTimeLine(entry, timeLine);
   }

   @Override
   public void delete(Map.Entry entry) {
      TimeLine timeLine = getTimeLine(entry, true);
      removeFromTimeLine(entry, timeLine);
   }

   public TimeLine getTimeLine(Object fullKey) {
      Object businessKey = extractBusinessKeyFromKey(fullKey);
      return getTimeLine(businessKey, false);
   }

   private TimeLine getTimeLine(Map.Entry entry, boolean createIfNeeded) {
      Object businessKey = extractBusinessKeyFromEntry(entry);
      return getTimeLine(businessKey, createIfNeeded);
   }

   private TimeLine getTimeLine(Object businessKey, boolean createIfNeeded) {
      TimeLine timeLine = timeLineIndex.get(businessKey);
      if (timeLine == null && createIfNeeded) {
         // A comparator built around the version extractor, so that keys sharing a
         // timestamp are ordered by version, should really be supplied here; its
         // construction has been stripped out of this listing.
         timeLine = new TimeLine(null);
         timeLineIndex.put(businessKey, timeLine);
      }

      return timeLine;
   }

   private void addToTimeLine(Map.Entry entry, TimeLine timeLine) {
      // Assumes the timestamp extractor yields a Long (e.g. epoch millis).
      Long arrived = (Long) InvocableMapHelper.extractFromEntry(extractor.getTimestampExtractor(), entry);
      timeLine.insert(getCoherenceKey(entry), arrived);
   }

   private void removeFromTimeLine(Map.Entry entry, TimeLine timeLine) {
      Long arrived = (Long) InvocableMapHelper.extractFromEntry(extractor.getTimestampExtractor(), entry);
      timeLine.remove(getCoherenceKey(entry), arrived);
      if (timeLine.isEmpty()) {
         timeLineIndex.remove(extractBusinessKeyFromEntry(entry));
      }
   }

   private Object extractBusinessKeyFromEntry(Map.Entry entry) {
      return InvocableMapHelper.extractFromEntry(extractor.getKeyExtractor(), entry);
   }

   private Object extractBusinessKeyFromKey(Object fullKey) {
      if (extractor.getKeyExtractor() instanceof PofExtractor) {
         PofExtractor keyExtractor = (PofExtractor) extractor.getKeyExtractor();
         PofNavigator navigator = keyExtractor.getNavigator();
         PofValue pofValue = PofValueParser.parse((Binary) fullKey, (PofContext) serialiser);
         return navigator.navigate(pofValue).getValue();
      }
      return extractor.getKeyExtractor().extract(fullKey);
   }

   private static Object getCoherenceKey(Map.Entry entry) {
      if (entry == null) {
         return null;
      }

      return entry instanceof BinaryEntry ? ((BinaryEntry) entry).getBinaryKey() : entry.getKey();
   }
}

So there’s a bit of complexity in there around handling both standard Java serialisation and Coherence’s POF, (which can be removed if you only use one), but the crux of the class is the implementation of the MapIndex insert() and delete() calls, which keep the index up to date as the state of the cache changes. By the way, update() throws an UnsupportedOperationException in my implementation, as the data is immutable, right!? But it’s easy enough to enhance it to support mutation should you want to.

The class also has a public getTimeLine(Object fullKey) method, which will be used later by our custom filters.

3. Create some custom filters to use the new temporal index

The custom filters that will use our custom index must implement IndexAwareFilter. Below is a simple SnapshotFilter class, which limits the result set of an operation to the versions of the entities that were live at a particular point in time, though it is easy enough to add other filters that bring back a range of versions, for example.

@Portable
public class SnapshotFilter implements IndexAwareFilter, Serializable {
   @PortableProperty(value = 1)
   private TemporalExtractor extractor;

   @PortableProperty(value = 2)
   private long snapshot;

   public SnapshotFilter(TemporalExtractor extractor, long snapshot) {
      this.extractor = extractor;
      this.snapshot = snapshot;
   }

   @Override
   public Filter applyIndex(Map indexMap, Set keys) {
      TemporalIndex index = getIndex(indexMap);
      // Strip out any keys that were not the live version at the snapshot time.
      for (Iterator it = keys.iterator(); it.hasNext(); ) {
         Object key = it.next();
         if (!matches(key, index)) {
            it.remove();
         }
      }
      // Returning null tells Coherence that no further, non-index, evaluation is needed.
      return null;
   }

   private TemporalIndex getIndex(Map indexMap) {
      Object index = indexMap.get(extractor);
      if (index instanceof TemporalIndex) {
         return (TemporalIndex) index;
      }

      throw new UnsupportedOperationException("No temporal index found for extractor: " + extractor);
   }

   private boolean matches(Object fullKey, TemporalIndex index) {
      TimeLine timeLine = index.getTimeLine(fullKey);
      return timeLine != null && fullKey.equals(timeLine.get(snapshot));
   }

   // evaluate(), evaluateEntry() and calculateEffectiveness() omitted for brevity.
}

And that’s it. Oh, mind you, it might be handy if you install your index…

4. Installing and using your shiny new temporal index

Is as easy as…

   // Create it:
   TemporalExtractor extractor = new TemporalExtractor(businessKeyExtractor, createdTimestampExtractor, versionExtractor);
   cache.addIndex(extractor, false, null);

   // Use it:
   Set results = cache.keySet(new SnapshotFilter(extractor, timestamp));
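If you want the matching data rather than just the keys, the same filter can be passed to entrySet() too:

   // Or pull back the entries themselves:
   Set entries = cache.entrySet(new SnapshotFilter(extractor, timestamp));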

Congratulations, you now have a temporal index that doesn’t rely on mutating the entries within your cache and, take it from me, such mutations are a constant source of bugs.

The full source code for this is available on my GitHub account.

What is not covered above is how to change the system to use an event-sourcing pattern or bi-temporal queries, this being the second part of the design changes we discussed. However, event-sourcing and all of its benefits are covered in great detail on Martin’s blog, and adding support for bi-temporal queries using the custom index is actually fairly trivial.

This has been a fairly lengthy post, so I’ll leave it there. Please feel free to ask any questions, (or point out any mistakes!). Happy coding all.

Over and out.

(I’d like to say thanks to Harvey Raja from Oracle for the discussions we had on this subject, which led to the design going down the routes it did – thanks Harvey!).