
March 09, 2005

Event Driven Architecture

At the recent TSS Symposium (a really great event btw - you should go if you ever get the chance), I had the pleasure of attending Gregor Hohpe's talk on event-driven architectures. He presented a very pragmatic approach to building event-driven systems: only take on as much complexity as you need.

There's an obvious connection between AOP and event-driven systems (join points are events that occur during the runtime execution of a program...). So I'm sitting at the back in Gregor's session, and I get to thinking, "how easy could I make this using AOP?" And then I'm scribbling on the notes page of the handouts. Fortunately it was the last session of the day, so I went back to do some hacking and get the idea out of my system. What follows is the result of that hour of playing around - which, if you were at TSS and attended my "Adopting AspectJ" talk, is also what I showed briefly during that session.

My first decision was to build an @DSL to support the event driven style. My @DSL has three annotations:

@Retention(RetentionPolicy.RUNTIME)
public @interface RaisesEvent {
    String value();
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface OnEvent {
    String value();
}

public @interface EventSubscriber {
    // marker
}

Both fields and methods can be annotated with @RaisesEvent("eventName"). An event is published when an annotated field is updated, and after returning from an annotated method. Types that wish to receive event notifications are annotated with the @EventSubscriber annotation. Methods within such types can be annotated with @OnEvent("eventName") and will be invoked whenever a matching event is published. (I chose a simple name-based matching scheme; other, more complex schemes are clearly possible.)

Here's a simple class that creates events:

class Producer {

    public Producer() {}

    @RaisesEvent("price-update")
    private int price;

    @RaisesEvent("calculation-complete")
    public int calculate(int x, int y) {
        return x*y;
    }

    public void setPrice(int price) {
        this.price = price;
    }
}

And here's a class that subscribes to these events:

@EventSubscriber
static class Recipient {

    // exposed public fields for testing
    public int calculationResult = 0;
    public int price = 0;

    @OnEvent("calculation-complete")
    void onCalculationCompletion(MethodBasedEvent event) {
        System.out.println("calculation-complete, result = " + event.getResult());
        calculationResult = (Integer) event.getResult();
    }

    @OnEvent("price-update")
    void onPriceUpdate(FieldBasedEvent event) {
        System.out.println("price-update, new price = " + event.getValue());
        price = (Integer) event.getValue();
    }
}

The semantics I gave this system are that every instance of an @EventSubscriber class receives each event that it has an @OnEvent annotation for. The Producer and Recipient classes were defined as inner classes inside my test case, which passes as follows:

public class EventDispatcherTest extends TestCase {

    static class Recipient { ... }

    static class Producer { ... }

    public void testMethodEvent() {
        Recipient r1 = new Recipient();
        Recipient r2 = new Recipient();
        Producer p = new Producer();
        int z = p.calculate(6,7);
        assertEquals("should have set result on r1", 6*7, r1.calculationResult);
        assertEquals("should have set result on r2", 6*7, r2.calculationResult);
    }

    public void testFieldSetEvent() {
        Recipient r1 = new Recipient();
        Recipient r2 = new Recipient();
        Producer p = new Producer();
        p.setPrice(10);
        assertEquals("should have set price on r1", 10, r1.price);
        assertEquals("should have set price on r2", 10, r2.price);
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        // reset "static" state outside of test class...
        SubscriberTracker.aspectOf(Recipient.class).removeAll();
    }
}

So how does the implementation look?

The implementation consists of two aspects, one to keep track of subscriber instances, and one to handle the publish/subscribe logic. There is also a mini hierarchy of Event interfaces and implementations (Event is the supertype of MethodBasedEvent and FieldBasedEvent). These Event interfaces hold information such as the new field value for field events, and the arguments and return value for method events.
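The post doesn't list the Event types themselves, so here is a minimal sketch of what such a hierarchy might look like. Only getResult(), getValue() and the two constructor shapes appear in the listings in this post; getSource() and getArguments() are names I've assumed purely for illustration.

```java
/** Common supertype of all events. getSource() is an assumed accessor name. */
interface Event {
    Object getSource();
}

/** Published after a method annotated with @RaisesEvent returns. */
interface MethodBasedEvent extends Event {
    Object[] getArguments(); // assumed accessor name
    Object getResult();      // used by the Recipient class in this post
}

/** Published after a field annotated with @RaisesEvent is set. */
interface FieldBasedEvent extends Event {
    Object getValue();       // used by the Recipient class in this post
}

final class MethodBasedEventImpl implements MethodBasedEvent {
    private final Object source;
    private final Object[] arguments;
    private final Object result;

    MethodBasedEventImpl(Object source, Object[] arguments, Object result) {
        this.source = source;
        // Defensive copy so later changes to the caller's array don't leak in.
        this.arguments = (arguments == null) ? new Object[0] : arguments.clone();
        this.result = result;
    }

    public Object getSource() { return source; }
    public Object[] getArguments() { return arguments.clone(); }
    public Object getResult() { return result; }
}

final class FieldBasedEventImpl implements FieldBasedEvent {
    private final Object source;
    private final Object value;

    FieldBasedEventImpl(Object source, Object value) {
        this.source = source;
        this.value = value;
    }

    public Object getSource() { return source; }
    public Object getValue() { return value; }
}

final class EventHierarchyDemo {
    public static void main(String[] args) {
        MethodBasedEvent done = new MethodBasedEventImpl("producer", new Object[] {6, 7}, 42);
        FieldBasedEvent update = new FieldBasedEventImpl("producer", 10);
        System.out.println(done.getResult() + " " + update.getValue());
    }
}
```

Keeping the implementations immutable means the same event object can safely be handed to every subscriber.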

Let's look at the SubscriberTracker aspect first.

import org.aspectprogrammer.eda.annotation.EventSubscriber;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.Set;

/**
 * we need to keep track of all the instances of an event subscriber
 * so that we can dispatch events to them
 */
public aspect SubscriberTracker pertypewithin(@EventSubscriber *) {

    // use WeakHashMap for auto-garbage collection of keys
    private Map subscribers = new WeakHashMap();

    /**
     * after returning from the initialization of any new object
     * (of the type that I am pertypewithin...), add it to the map.
     */
    after(Object o) returning() : initialization(new(..)) && this(o) {
        subscribers.put(o,true);
    }

    public Set getSubscribers() {
        return subscribers.keySet();
    }

    // for testing
    public void removeAll() {
        subscribers.clear();
    }
}

This aspect simply keeps track of all live instances of each type that has an @EventSubscriber annotation (see the pertypewithin clause). We need this so that we can dispatch events to them.
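If the weak-keyed bookkeeping is unfamiliar, here is roughly the same idea in plain Java with no aspect involved. This is only a sketch: WeakSubscriberRegistry and its register method are hypothetical names, and the synchronization and snapshot copy are my additions rather than anything the aspect above does.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;

/** Hypothetical plain-Java counterpart of the per-type tracking state. */
final class WeakSubscriberRegistry {

    // Keys are held weakly: once nothing else references a subscriber,
    // the garbage collector is free to drop its entry.
    private final Map<Object, Boolean> subscribers =
            Collections.synchronizedMap(new WeakHashMap<Object, Boolean>());

    void register(Object subscriber) {
        subscribers.put(subscriber, Boolean.TRUE);
    }

    /** Returns a snapshot, so callers can iterate while new subscribers register. */
    Set<Object> getSubscribers() {
        synchronized (subscribers) {
            return new HashSet<Object>(subscribers.keySet());
        }
    }

    void removeAll() {
        subscribers.clear();
    }

    public static void main(String[] args) {
        WeakSubscriberRegistry registry = new WeakSubscriberRegistry();
        Object first = new Object();
        Object second = new Object();
        registry.register(first);
        registry.register(second);
        System.out.println(registry.getSubscribers().size());
        // Touch both objects afterwards so they stay strongly reachable above.
        System.out.println(first != second);
    }
}
```

One thing to be aware of: WeakHashMap compares keys with equals(), so two subscriber instances that are equal to each other would share a single entry. The Recipient class here doesn't override equals(), so every instance is tracked separately.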

The EventDispatcher aspect is a little more complex, but not much more. Let's look at the aspect in parts. Here's the portion to do with event generation:

public aspect EventDispatcher {

    /**
     * the execution of any method with the @RaisesEvent annotation.
     * The annotation itself is exposed by the pointcut.
     */
    pointcut eventGeneratingExecutionJP(RaisesEvent event) :
        execution(@RaisesEvent * *(..)) && @annotation(event);

    /**
     * Build a MethodBasedEvent from thisJoinPoint, and publish it
     */
    after(RaisesEvent event) returning(Object ret) :
        eventGeneratingExecutionJP(event) {
        MethodBasedEventImpl methodEvent =
            new MethodBasedEventImpl(thisJoinPoint.getThis(),
                                     thisJoinPoint.getArgs(),
                                     ret);
        publish(event.value(),methodEvent);
    }

    /**
     * the set (update) of any field with the @RaisesEvent annotation
     * The annotation and the new value are exposed by the pointcut
     */
    pointcut eventGeneratingSetJP(RaisesEvent event, Object fieldValue) :
        set(@RaisesEvent * *) && @annotation(event) && args(fieldValue);

    /**
     * Build a field event from thisJoinPoint and publish it
     */
    after(RaisesEvent event, Object fieldValue) returning :
        eventGeneratingSetJP(event,fieldValue) {
        FieldBasedEventImpl fieldSetEvent =
            new FieldBasedEventImpl(thisJoinPoint.getTarget(),
                                    fieldValue);
        publish(event.value(),fieldSetEvent);
    }

    ...

This should all be fairly straightforward. Notice how easy it is to expose and use annotation values in advice.

Here's the portion of the aspect concerned with subscription. Whenever we load (static initialization) a type with the @EventSubscriber annotation we find all its methods with the @OnEvent annotation and keep them in a Map for dispatching events when they are published.

public aspect EventDispatcher {

    ...

    private Map<String,List<Method>> subscriptions =
        new HashMap<String,List<Method>>();

    after() returning : staticinitialization(@EventSubscriber *) {
        subscribe(thisJoinPoint.getSignature().getDeclaringType());
    }

    ...

    private void subscribe(Class clazz) {
        Method[] methods = clazz.getDeclaredMethods();
        for(Method method : methods) {
            OnEvent onEvent = method.getAnnotation(OnEvent.class);
            if (onEvent != null) {
                Class[] parameterTypes = method.getParameterTypes();
                if ((parameterTypes.length != 1) ||
                    (!Event.class.isAssignableFrom(parameterTypes[0]))) {
                    throw new RuntimeException(
                        "Signature of @OnEvent method must be V(Event): found " +
                        method);
                }
                List<Method> subscribers = subscriptions.get(onEvent.value());
                if (subscribers == null) subscribers = new ArrayList<Method>();
                subscribers.add(method);
                subscriptions.put(onEvent.value(),subscribers);
            }
        }
    }

Notice that methods annotated with @OnEvent are expected to have a certain signature (a single parameter that is of type Event or a subtype of Event). We'll come back to that thought later.

Now that you've seen the subscribe helper method, it's as good a time as any to look at what publish does:

private void publish(String eventName, Event event) {
    List<Method> methods = subscriptions.get(eventName);
    if (methods == null) return;
    for( Method method : methods ) {
        Set subscribers = SubscriberTracker.aspectOf(
            method.getDeclaringClass()).getSubscribers();
        for (Object subscriber : subscribers) {
            try {
                method.invoke(subscriber,new Object[]{event});
            } catch (Exception ex) {
                throw new RuntimeException(
                    "Unable to publish event to subscriber " +
                    method + " : " + ex);
            }
        }
    }
}

Note that whilst I've implemented a simple synchronous event dispatch it would be easy to change the implementation of publish to perform parallel asynchronous notifications using util.concurrent (or indeed to use any other mechanism to connect publishers and subscribers). The beauty of the scheme is that this change is completely encapsulated in the aspect.
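To give a feel for the asynchronous variant, here is a rough plain-Java sketch that hands each notification to an ExecutorService from java.util.concurrent. AsyncPublisher, PriceListener and AsyncPublishDemo are hypothetical names, and the listener takes a plain Object rather than an Event so that the sketch stands on its own; inside the aspect, the loop in publish would do the submitting instead.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the body of publish(): one task per subscriber. */
final class AsyncPublisher {
    private final ExecutorService executor;

    AsyncPublisher(ExecutorService executor) {
        this.executor = executor;
    }

    List<Future<?>> publish(Method handler, Iterable<?> subscribers, Object event) {
        List<Future<?>> pending = new ArrayList<>();
        for (Object subscriber : subscribers) {
            // The lambda returns a value, so it is submitted as a Callable and
            // any exception thrown by invoke() is captured in the Future.
            pending.add(executor.submit(() -> {
                handler.invoke(subscriber, event);
                return null;
            }));
        }
        return pending;
    }
}

/** Hypothetical subscriber; takes Object so the sketch needs no Event types. */
final class PriceListener {
    volatile int price;

    public void onPriceUpdate(Object event) {
        price = (Integer) event;
    }
}

final class AsyncPublishDemo {
    /** Publishes the value 10 to two listeners and returns the prices they recorded. */
    static int[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            PriceListener first = new PriceListener();
            PriceListener second = new PriceListener();
            Method handler = PriceListener.class.getMethod("onPriceUpdate", Object.class);
            List<Future<?>> pending =
                    new AsyncPublisher(pool).publish(handler, Arrays.asList(first, second), 10);
            for (Future<?> f : pending) {
                // Wait for delivery; a handler failure surfaces as ExecutionException.
                f.get(5, TimeUnit.SECONDS);
            }
            return new int[] { first.price, second.price };
        } catch (Exception ex) {
            throw new IllegalStateException("dispatch failed", ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [10, 10]
    }
}
```

Returning the futures leaves it up to the caller whether to wait for delivery or fire and forget. With asynchronous dispatch a test like the one above would have to wait on them before asserting.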

Here's the final piece of the puzzle. We're building a mini @DSL that has basic rules associated with its usage: @OnEvent methods should only occur in types that have the @EventSubscriber annotation, and @OnEvent methods should only take a single parameter of type Event+, as described earlier. We can do better than a runtime check for this. We can actually extend the compiler to understand the rules of our @DSL:

declare warning : execution(@OnEvent * *(..)) && !within(@EventSubscriber *)
    : "@OnEvent methods must be declared in a type " +
      "with the @EventSubscriber annotation";

declare warning : execution(@OnEvent * *(!Event+))
    : "@OnEvent methods must take a single argument of type " +
      "Event (or a subtype of Event)";

Now if anyone using our aspect inadvertently breaks one of these rules, the compiler will tell them straight away.

All in all, a pretty powerful little system for such a small amount of code...

Posted by adrian at March 9, 2005 07:46 PM [permalink]


I nominate these for inclusion in AspectJ library!

Posted by: Nicholas Lesiecki at March 10, 2005 03:33 AM
