Temporal Versioning in Coherence

Storing the versions of an entity that make up its history within a Coherence cache is a common pattern. Extending the pattern to support temporal queries, i.e. queries evaluated against, and returning, the versions of entities as they existed at previous points in time, requires a little more work and has generally relied on the mutation of metadata. Such mutations are generally a bad thing: they increase code complexity and the chance of data corruption. In this post I’ll propose an alternative approach that uses custom Coherence indexes in place of the mutations, thereby allowing the data to remain immutable and taming complexity.

Update: While writing this post and its associated code, I was building on top of the patterns documented by Ben Stopford in his blog post on versioning in Coherence. Unbeknownst to me at the time, Alexey Ragozin had previously covered what looks to be a very similar concept to the one I’m about to share, in his post on a time-series index for managing versioned data. Hats off to Alexey. What is it they say about great minds and idiots? Still, as I’ve pretty much finished the post I thought I may as well put it out there.

First, let’s start with a little background on versioning…

Versioning Patterns

Using Coherence to cache not only the latest version of an entity, but its history as well, is a powerful pattern. It’s the first step to building an immutable data fabric within Coherence. It allows not only the current state of an entity to be queried and retrieved, but also the state of the entity at any previous point in time.

By way of example, take a warehouse stock control system, where each item in the warehouse is assigned a unique stockId, and which has a requirement to generate a weekly stock level report and to allow people to view historic reports.

In a traditional system, without versioning, data about the stock might be stored in a database or key-value store by its stockId. As stock levels changed, the corresponding entries would be updated, effectively erasing any history of previous states. Each week the system would build up a snapshot of the current state of the stock, generate the report and archive it off somewhere. The report is likely to be relatively large as it must make a copy of the data, so that users can view historic reports as the system keeps running and stock levels change.

What is more, correcting any mistakes found in the data after a report has been generated will probably need a great deal of manual intervention, with any reports generated after the mistake was made having to be re-run. Such corrections are generally laborious and error prone. The further back the error, the more manual effort is required to correct it.

So let’s look at some ways in which we can improve the design of the system…

Simple Versioning

The simplest versioning pattern is to add a version number to the business key used to store the entity. In our example the data would be keyed against the combination of a stockId and a version number. Each time the data associated with the stock needs to be updated, e.g. as new stock comes in, or stock is sent out to fulfil a customer’s order, the new stock level is stored as a new entry with an incremented version number. In this way the system builds up a series of versions for each item under stock control as the data changes over time.
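The idea can be sketched outside of Coherence with a plain map and a composite key. This is an illustrative sketch only — the class and method names here are mine, not from the post’s codebase:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative sketch: a composite key of business key plus version number.
public class SimpleVersioning {

    static final class StockKey {
        final String stockId;
        final int version;

        StockKey(String stockId, int version) {
            this.stockId = stockId;
            this.version = version;
        }

        // Correct equals()/hashCode() are essential for a composite map key.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof StockKey)) return false;
            StockKey that = (StockKey) o;
            return version == that.version && stockId.equals(that.stockId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(stockId, version);
        }
    }

    // Every update is stored as a new entry; nothing is ever overwritten.
    static Map<StockKey, Integer> sampleStore() {
        Map<StockKey, Integer> store = new HashMap<>();
        store.put(new StockKey("widget-1", 1), 100); // initial stock
        store.put(new StockKey("widget-1", 2), 80);  // 20 shipped out
        store.put(new StockKey("widget-1", 3), 130); // 50 delivered in
        return store;
    }

    static int stockLevelAt(Map<StockKey, Integer> store, String stockId, int version) {
        return store.get(new StockKey(stockId, version));
    }

    public static void main(String[] args) {
        Map<StockKey, Integer> store = sampleStore();
        // History is preserved: every version remains retrievable.
        System.out.println(stockLevelAt(store, "widget-1", 1)); // 100
        System.out.println(stockLevelAt(store, "widget-1", 3)); // 130
    }
}
```

Because old entries are never touched, any historic state can be read back by key alone.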

simple versioned entry

The weekly stock report now needs to store nothing more than the full set of versioned keys for the stock items. As updates to the stock come in, new versions will be created, so the report does not need to take a full snapshot of the data; any previous report can be fully rebuilt from just this set of versioned keys, as the underlying data is immutable.

Unfortunately, managing the correction of reports when errors in the stock levels are corrected will still be largely a manual process.

Note: To still allow key-based access to an entity using the business key alone (a common requirement, and one you’d want to keep very quick and efficient), one of the patterns outlined in Ben’s post on the subject can, and should, be used.

Temporal Versioning

With simple versioning it’s not possible to query the system for the stock levels at a specific point in time, as the system doesn’t track when each version was the live version. Temporal versioning fixes this shortcoming by storing the timestamp at which each version arrived. As new versions arrive and supersede previous versions, the system builds up a time-line of versions, with each version having a time window in which it was considered the latest, or live, version. This makes it possible to retrospectively ask which version of an entity was live at any particular point in time.

temporal versioned entry

Our stock report now needs to keep nothing more than the timestamp at which the report was generated. From this, the system can gather the state of the system at that point in time and rebuild the report. In fact, the system can now generate the report for any arbitrary previous point in time too.

Still, even with temporal versioning the system doesn’t elegantly handle retrospective ‘corrections’ to the stock levels as it stands. To solve that side of the problem a different design change needs to be made. All we’ve effectively changed so far is the key by which data entries are retrieved. To handle data corrections elegantly we’d probably change what’s in the value of the entry: from a snapshot of the current state, i.e. the current stock level, to a delta, i.e. a positive or negative adjustment to the previous stock level. So, for example, when new stock comes in, a new entry is added with a positive stock adjustment record, and when stock goes out to customers, a negative adjustment record is recorded. Such records often reference the related delivery report or customer order that was the source of the change. What we’re describing here is effectively an event-sourcing model, which Martin Fowler covers on his own blog and, as it’s pretty much off topic for this post, I’ll leave you to read Martin’s posts on the subject if you so please. In addition to event sourcing, a system that needs to support changes to historical data may well also benefit from support for bi-temporal data, which I’ll leave as another exercise for the reader – though as a hint, the custom indexes covered below can be used to index as many temporal dimensions as you fancy having (or can get your head around!).
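To make the delta idea concrete, here is a hedged sketch in plain Java (illustrative names, nothing Coherence-specific): the stock level at any time t is simply the fold of every adjustment that arrived at or before t, so a late correction is just another event rather than a manual fix-up.

```java
import java.util.List;

// Illustrative event-sourcing sketch: stock levels as a time-series of deltas.
public class StockDeltas {

    static final class Adjustment {
        final long timestamp;
        final int delta; // positive = delivery in, negative = order out

        Adjustment(long timestamp, int delta) {
            this.timestamp = timestamp;
            this.delta = delta;
        }
    }

    // The level at time t is the sum of every adjustment up to and including t.
    static int levelAt(List<Adjustment> events, long t) {
        int level = 0;
        for (Adjustment a : events) {
            if (a.timestamp <= t) {
                level += a.delta;
            }
        }
        return level;
    }

    public static void main(String[] args) {
        List<Adjustment> events = List.of(
            new Adjustment(10, +100), // initial delivery
            new Adjustment(20, -30),  // customer order
            new Adjustment(30, +50)); // restock

        System.out.println(levelAt(events, 25)); // 70
        System.out.println(levelAt(events, 35)); // 120
    }
}
```

Appending a correction event re-shapes every later query result automatically, with no mutation of existing records.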

As you can see, storing changes to a system as a time-series of facts about how the system changed is a powerful pattern. Now let’s take a look at how you might implement this on top of Oracle Coherence.

Implementing versioning within Coherence

Simple versioning in Coherence

The implementation details of simple versioning in Coherence are nothing new. I learned about them from posts such as Ben Stopford’s Coherence Versioning, and I won’t go into any detail here. Suffice it to say that all flavours of the pattern use key affinity to ensure all versions of an entity reside on the same Coherence node, new versions are routed through a single latest entry to remove race conditions, and a trigger is used to set the version of entries. The types used as the key and the value within the grid would look something like this:

@Portable
public class VKey {
    @PortableProperty(value = 1)
    private int version;

    @PortableProperty(value = 2)
    private Object domainKey;

    public int getVersion() { return version; }

    public Object getDomainObject() { return domainKey; }

    ...
}

@Portable
public class VValue {
    @PortableProperty(value = 1)
    private int version;

    @PortableProperty(value = 2)
    private Object domainValue;

    public int getVersion() { return version; }

    public Object getDomainObject() { return domainValue; }

    ...
}

If you’re not familiar with these patterns, then I strongly suggest you read Ben’s post before carrying on.

Temporal versioning in Coherence

Extending the model to include a temporal dimension requires an arrived timestamp to be captured as part of each value’s metadata. A first attempt at this might look like:

@Portable
public class VValue {
   @PortableProperty(value = 1)
   private int version;

   @PortableProperty(value = 2)
   private Object domainValue;

   @PortableProperty(value = 3)
   private Date arrived;

   public int getVersion() { return version; }

   public Object getDomainObject() { return domainValue; }

   public Date getArrived() { return arrived; }

   ...
}

However, there is a problem with this basic approach: while capturing the timestamp at the point of entry into the system is simple enough (problems of distributed clocks aside), implementing temporal queries on top of the data is problematic. It is easy enough to add one half of the constraint, retrieving all entries whose arrived timestamp is less-than-or-equal-to the required snapshot. Unfortunately, adding the other bound is not so easy, as the time at which a version is superseded by another is not stored on the entry itself, but is implied by the arrived timestamp of the subsequent version or, in the case of the currently live version, is open-ended.

The problem of the missing upper bound for temporal queries is most commonly resolved by adding a superseded timestamp to each entry, set to match the arrived timestamp of the superseding version when that version arrives. With this approach it is now trivial to add the upper-bound constraint alongside the lower bound, so that only objects that were live at the specified snapshot time are retained within the result set, i.e.:

arrived <= snapshot && snapshot < superseded
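The window check itself is trivial once both timestamps are present. A throwaway sketch in plain Java (illustrative, not Coherence code; using Long.MAX_VALUE to model the open-ended live version is my own convention here):

```java
// Illustrative check: a version was live at 'snapshot' if it had arrived and
// had not yet been superseded. Long.MAX_VALUE models the open-ended live version.
public class WindowCheck {

    static boolean liveAt(long arrived, long superseded, long snapshot) {
        return arrived <= snapshot && snapshot < superseded;
    }

    public static void main(String[] args) {
        System.out.println(liveAt(100, 200, 150));          // true
        System.out.println(liveAt(100, 200, 200));          // false: superseded is exclusive
        System.out.println(liveAt(100, Long.MAX_VALUE, 5000)); // true: still the live version
    }
}
```

Note the asymmetry of the bounds: arrived is inclusive, superseded exclusive, so exactly one version is live at any instant.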

To improve the performance of such a common filter operation, it is usual to add indexes to both the arrived and superseded fields.

The problem with the above approach is that it copies data (the arrived timestamp of one entry into the superseded field of another), which can get out of sync, and it mutates an existing entry (the previous version) when adding a new one, which opens up many more possibilities for data corruption and increases code complexity.

The solution I’m proposing is not to store the superseded timestamp on the entity itself, but to use Coherence’s custom index support to build a time-line index designed to support temporal queries. The index stores a time-line of versions per business key, e.g. the stockId in our example above. The time-line consists of the set of versioned keys, ordered by their arrived timestamp (or indeed any other timestamp). This is relatively simple, requiring only a custom index, extractor and filter. So let’s see some code…

1. Create an IndexAwareExtractor to install your custom index

Indexes in Coherence are tied to an extractor. For normal indexes this needs to be the same extractor used by any filter that you expect to make use of the index. It is possible to add custom indexes and extractors that are compatible with the built-in Coherence filters, but that is not what we are trying to achieve here: no built-in filter is going to understand the time-line stored within the index, so the role of the extractor is just to install and uninstall the index.

@Portable
public class TemporalExtractor implements IndexAwareExtractor, Serializable {

   @PortableProperty(value = 1)
   private ValueExtractor businessKeyExtractor;

   @PortableProperty(value = 2)
   private ValueExtractor timestampExtractor;

   @PortableProperty(value = 3)
   private ValueExtractor versionExtractor;

   public TemporalExtractor(ValueExtractor businessKeyExtractor, ValueExtractor timestampExtractor, ValueExtractor versionExtractor) {
      this.businessKeyExtractor = businessKeyExtractor;
      this.timestampExtractor = timestampExtractor;
      this.versionExtractor = versionExtractor;
   }

   @Override
   public MapIndex createIndex(boolean ordered, Comparator comparator, Map mapIndex, BackingMapContext context) {
      MapIndex index = new TemporalIndex(this, context);
      mapIndex.put(this, index);
      return index;
   }

   @Override
   public MapIndex destroyIndex(Map mapIndex) {
      return (MapIndex) mapIndex.remove(this);
   }

   public ValueExtractor getKeyExtractor() { return businessKeyExtractor; }

   public ValueExtractor getTimestampExtractor() { return timestampExtractor; }

   public ValueExtractor getVersionExtractor() { return versionExtractor; }

   // equals() and hashCode() omitted for brevity; both must be implemented,
   // as the extractor is used as the key into the cache's index map.
}

The extractor takes the ValueExtractors that the index will need: one to extract the business key from the entry’s key, and two to extract the version and the timestamp from the entry’s value. These are made available to our custom TemporalIndex, which is covered below.

The only methods of note within the extractor itself are createIndex() and destroyIndex(), which are called by Coherence as our index is added and removed, respectively (or on new nodes as they join the cluster). The create method adds an instance of our custom index to the cache’s index map, keyed off the extractor, while the destroy method removes that same entry. Because the extractor is used as the key into the index map, it is important that both equals() and hashCode() are implemented correctly.

2. Create a custom index that tracks the timeline of each object

The index will hold a map from business key to the time-line of versions of that key. The time-line stores the temporal points, i.e. the arrived timestamps, at which each new version arrived, along with the corresponding entry’s key.

Temporal reverse index

So let’s first create something to hold the time-line. The time-line holds an ordered map of timestamps to keys. However, things are slightly complicated by the fact that it is possible to have two versions of an entity at the same timestamp, so the class needs to accommodate this, which it does by storing not a single entry against each timestamp, but an ordered set of entries, ordered by version.

public class TimeLine {
   private final Comparator comparator;
   private final TreeMap<Long, TreeSet<Object>> timeLine;

   public TimeLine(Comparator comparator) {
      this.comparator = comparator;
      this.timeLine = new TreeMap<Long, TreeSet<Object>>();
   }

   public void insert(Object key, long timestamp) {
      Set<Object> entries = getEntriesAtTimestamp(timestamp, true);
      entries.add(key);
   }

   public boolean remove(Object key, long timestamp) {
      Set<Object> keys = getEntriesAtTimestamp(timestamp, false);
      if (keys == null || !keys.remove(key)) {
         return false;
      }

      if (keys.isEmpty()) {
         timeLine.remove(timestamp);
      }

      return true;
   }

   public boolean isEmpty() {
      return timeLine.isEmpty();
   }

   public Object get(long snapshot) {
      Map.Entry<Long, TreeSet<Object>> floor = timeLine.floorEntry(snapshot);
      return floor == null ? null : floor.getValue().last();
   }

   private Set<Object> getEntriesAtTimestamp(long timestamp, boolean createIfNecessary) {
      TreeSet<Object> keys = timeLine.get(timestamp);
      if (keys == null && createIfNecessary) {
         keys = new TreeSet<Object>(comparator);
         timeLine.put(timestamp, keys);
      }
      return keys;
   }
}
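The core trick in get() is TreeMap.floorEntry(): the version live at a snapshot is the one with the greatest arrived timestamp less than or equal to it. A stripped-down demonstration using only the JDK (the real TimeLine orders keys at the same instant by version via a comparator; natural String ordering stands in for that here):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Stripped-down demonstration of the TimeLine lookup semantics.
public class TimeLineDemo {

    // arrived timestamp -> keys that arrived at that instant
    private final TreeMap<Long, TreeSet<String>> timeLine = new TreeMap<>();

    void insert(String key, long timestamp) {
        timeLine.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    // The key live at 'snapshot' is the last key at the greatest timestamp <= snapshot.
    String liveAt(long snapshot) {
        Map.Entry<Long, TreeSet<String>> floor = timeLine.floorEntry(snapshot);
        return floor == null ? null : floor.getValue().last();
    }

    public static void main(String[] args) {
        TimeLineDemo demo = new TimeLineDemo();
        demo.insert("stock-1/v1", 100);
        demo.insert("stock-1/v2", 200);
        demo.insert("stock-1/v3", 200); // two versions at the same instant: last() wins

        System.out.println(demo.liveAt(150)); // stock-1/v1
        System.out.println(demo.liveAt(200)); // stock-1/v3
        System.out.println(demo.liveAt(50));  // null: nothing had arrived yet
    }
}
```

floorEntry() gives the lookup in O(log n) per business key, which is what makes the snapshot query cheap.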

Next, we’ll need the custom reverse index itself. To conform to what Coherence expects, the index needs to implement the MapIndex interface. However, most of the methods on the interface will never be called, as the index will only be used from our custom filter(s), so the methods used by the standard Coherence filters can simply throw an UnsupportedOperationException (they have been excluded from the code below to improve readability).

public class TemporalIndex implements MapIndex {
   private final TemporalExtractor extractor;
   private final Serializer serialiser;
   private final Map<Object, TimeLine> timeLineIndex;
   private final Comparator versionComparator;

   public TemporalIndex(TemporalExtractor extractor, BackingMapContext ctx) {
      this.extractor = extractor;
      this.serialiser = ctx.getManagerContext().getCacheService().getSerializer();
      this.timeLineIndex = new HashMap<Object, TimeLine>();
      this.versionComparator = ...; // orders keys by their version; construction elided
   }

   @Override
   public void insert(Map.Entry entry) {
      TimeLine timeLine = getTimeLine(entry, true);
      addToTimeLine(entry, timeLine);
   }

   @Override
   public void delete(Map.Entry entry) {
      TimeLine timeLine = getTimeLine(entry, false);
      if (timeLine != null) {
         removeFromTimeLine(entry, timeLine);
      }
   }

   public TimeLine getTimeLine(Object fullKey) {
      Object businessKey = extractBusinessKeyFromKey(fullKey);
      return getTimeLine(businessKey, false);
   }

   private TimeLine getTimeLine(Map.Entry entry, boolean createIfNeeded) {
      Object businessKey = extractBusinessKeyFromEntry(entry);
      return getTimeLine(businessKey, createIfNeeded);
   }

   private TimeLine getTimeLine(Object businessKey, boolean createIfNeeded) {
      TimeLine timeLine = timeLineIndex.get(businessKey);
      if (timeLine == null && createIfNeeded) {
         timeLine = new TimeLine(versionComparator);
         timeLineIndex.put(businessKey, timeLine);
      }

      return timeLine;
   }

   private void addToTimeLine(Map.Entry entry, TimeLine timeLine) {
      Long arrived = extractFromEntry(extractor.getTimestampExtractor(), entry);
      timeLine.insert(getCoherenceKey(entry), arrived);
   }

   private void removeFromTimeLine(Map.Entry entry, TimeLine timeLine) {
      Long arrived = extractFromEntry(extractor.getTimestampExtractor(), entry);
      timeLine.remove(getCoherenceKey(entry), arrived);
      if (timeLine.isEmpty()) {
         timeLineIndex.remove(extractBusinessKeyFromEntry(entry));
      }
   }

   @SuppressWarnings("unchecked")
   private static <T> T extractFromEntry(ValueExtractor valueExtractor, Map.Entry entry) {
      return (T) InvocableMapHelper.extractFromEntry(valueExtractor, entry);
   }

   private Object extractBusinessKeyFromEntry(Map.Entry entry) {
      return InvocableMapHelper.extractFromEntry(extractor.getKeyExtractor(), entry);
   }

   private Object extractBusinessKeyFromKey(Object fullKey) {
      if (extractor.getKeyExtractor() instanceof PofExtractor) {
         PofExtractor keyExtractor = (PofExtractor) extractor.getKeyExtractor();
         PofNavigator navigator = keyExtractor.getNavigator();
         PofValue pofValue = PofValueParser.parse((Binary) fullKey, (PofContext) serialiser);
         return navigator.navigate(pofValue).getValue();
      }
      return extractor.getKeyExtractor().extract(fullKey);
   }

   private static Object getCoherenceKey(Map.Entry entry) {
      if (entry == null) {
         return null;
      }

      return entry instanceof BinaryEntry ? ((BinaryEntry) entry).getBinaryKey() : entry.getKey();
   }
}

So there’s a bit of complexity in there around handling both standard Java serialization and Coherence’s POF (which can be removed if you only use one), but the crux of the class is the implementation of the MapIndex insert() and delete() methods, which keep the index up to date as the state of the cache changes. By the way, update() throws an UnsupportedOperationException in my implementation, as the data is immutable, right!? But it’s easy enough to enhance it to support mutation should you want to.

The class also has a public getTimeLine(Object fullKey) method, which will be used later by our custom filters.

3. Create some custom filters to use the new temporal index

The custom filters that use our new temporal index must implement IndexAwareFilter. Below is a simple SnapshotFilter class, which limits the result set of an operation to the versions of the entities that were live at a particular point in time. It is easy enough to add other filters, for example to bring back a range of versions.

@Portable
public class SnapshotFilter implements IndexAwareFilter, Serializable {
   @PortableProperty(value = 1)
   private TemporalExtractor extractor;

   @PortableProperty(value = 2)
   private long snapshot;

   public SnapshotFilter(TemporalExtractor extractor, long snapshot) {
      this.extractor = extractor;
      this.snapshot = snapshot;
   }

   @Override
   public Filter applyIndex(Map indexMap, Set keys) {
      TemporalIndex index = getIndex(indexMap);
      Iterator it = keys.iterator();
      while (it.hasNext()) {
         if (!matches(it.next(), index)) {
            it.remove();
         }
      }
      return null;  // null => no further evaluation of the remaining keys is required
   }

   private TemporalIndex getIndex(Map indexMap) {
      Object index = indexMap.get(extractor);
      if (index instanceof TemporalIndex) {
         return (TemporalIndex) index;
      }

      throw new UnsupportedOperationException("No temporal index found for extractor: " + extractor);
   }

   private boolean matches(Object fullKey, TemporalIndex index) {
      TimeLine timeLine = index.getTimeLine(fullKey);
      return timeLine != null && fullKey.equals(timeLine.get(snapshot));
   }
}
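The applyIndex() contract is worth spelling out: the filter prunes the candidate key set in place, and returning null signals to Coherence that no further, non-indexed evaluation is needed. The idiom in plain Java (a generic sketch, not Coherence code):

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Predicate;

// Illustrative sketch of the applyIndex() idiom: prune the candidate key set
// in place, keeping only keys the index says match.
public class ApplyIndexIdiom {

    static <K> Set<K> retainMatches(Set<K> keys, Predicate<K> matches) {
        Iterator<K> it = keys.iterator();
        while (it.hasNext()) {
            if (!matches.test(it.next())) {
                // Remove via the iterator to avoid ConcurrentModificationException.
                it.remove();
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Set<String> keys = new HashSet<>(Set.of("v1", "v2", "v3"));
        retainMatches(keys, k -> k.equals("v2")); // stand-in for the time-line lookup
        System.out.println(keys); // [v2]
    }
}
```

Mutating the passed-in set, rather than building a new one, is what the Coherence contract expects.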

And that’s it. Oh, mind you, it might be handy if you install your index…

4. Installing and using your shiny new temporal index

It’s as easy as…

   // Create it:
   TemporalExtractor extractor = new TemporalExtractor(businessKeyExtractor, arrivedTimestampExtractor, versionExtractor);
   cache.addIndex(extractor, false, null);

   // Use it:
   Set results = cache.keySet(new SnapshotFilter(extractor, timestamp));

Congratulations, you now have a temporal index that doesn’t rely on mutating the entries within your cache – and take it from me, such mutations are a constant source of bugs.

The full source code for this is available on my GitHub account.

What is not covered above is how to change the system to use an event-sourcing pattern or bi-temporal queries – the second part of the design changes we discussed. However, event sourcing and all of its benefits are covered in great detail on Martin’s blog, and adding support for bi-temporal queries using the custom index is actually fairly trivial.

This has been a fairly lengthy post, so I’ll leave it there. Please feel free to ask any questions, (or point out any mistakes!). Happy coding all.

Over and out.

(I’d like to say thanks to Harvey Raja from Oracle for the discussions we had on this subject, which led to the design going down the routes it did – thanks Harvey!).


7 thoughts on “Temporal Versioning in Coherence”

  1. Pingback: www.BenStopford.com » Blog Archive » Latest-Versioned/Marker Patterns and MVCC

    • Absolutely. The above can be used to build indexes for both timelines, or more if you so desire. What would be an interesting pattern to implement would be to use an event-sourcing style ‘time-line of facts’, which also included corrections, and then building a bi-temporal view of the data on top. The fun starts when you need to be able to update your view when a new correction comes in.

