Monday, June 20, 2016

When things may get out of control: circuit breakers in practice. Hystrix.

It is amazing how tightly interconnected modern software systems are. Nearly every application, however simple, has a dependency on some external service or component, not to mention the Internet of Things (or simply IoT) movement emerging at a great pace. This is both good and not so good at the same time; let us see why ...

There are many use cases when relying on other services, provided externally or internally, makes a lot of sense (messaging, billing, taxes, payments, analytics, logistics, ...), but under the hood every such integration poses risks to our applications: they become dependent on the availability and operability of those services. Network latency, spikes of load, or just banal software defects: each of these unknowns can bring our applications to their knees, making users and partners dissatisfied, to put it mildly.

The good news is that there is a pattern we can employ to mitigate the risks: the circuit breaker. First explained in great detail in the Release It! book by Michael T. Nygard, circuit breakers became the de-facto solution for dealing with external services. The idea is pretty simple: track the state of the external service over a given time interval to collect knowledge about its availability. If a failure is detected, the circuit breaker opens, signalling that the external service had better not be invoked for some time.

There are plenty of circuit breaker implementations available but, because we are on the JVM, we are going to talk about three of them: Netflix Hystrix, Akka and Apache Zest. To keep the posts reasonably short, the discussion is split into two parts: Netflix Hystrix, followed by Akka and Apache Zest.

To show off circuit breakers in action, we are going to build a simple client around https://freegeoip.net/: a public HTTP web API for software developers to search for the geolocation of IP addresses. The client will return just brief geo-details about a particular IP address or hostname, wrapped into the GeoIpDetails class:

@JsonIgnoreProperties(ignoreUnknown = true)
public final class GeoIpDetails {
    private String ip;
    @JsonProperty("country_code") private String countryCode;
    @JsonProperty("country_name") private String countryName;
    private double latitude;
    private double longitude;
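    // getters and setters are omitted for brevity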
}
So let us get started ...

Undoubtedly, Netflix Hystrix is the most advanced and thoroughly battle-tested circuit breaker implementation at the disposal of Java developers. It is built from the ground up to support the asynchronous programming paradigm (heavily utilizing RxJava for that) and to have very low overhead. It is more than just a circuit breaker, it is a full-fledged library for tolerating latency and failures in distributed systems, but we will touch upon only the basic Netflix Hystrix concepts.

Netflix Hystrix has a surprisingly simple design and is built on top of the command pattern, with HystrixCommand at its core. Commands are identified by keys and are organized in groups. Before we implement our own command, it is worth talking about how Hystrix isolates external service integrations.

Essentially, there are two basic strategies which Hystrix supports: offload the work somewhere else (using a dedicated thread pool) or do the work in the current thread (relying on semaphores). Using dedicated thread pools, also known as the bulkhead pattern, is the right strategy in most use cases: the calling thread is unblocked, plus timeout expectations can be set as well. With semaphores, the current thread is going to be busy until the work is completed, successfully or not (timeouts are claimed to be supported as well since the 1.4.x release branch, but there are certain side effects).
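For illustration, here is how semaphore-based isolation could be requested through the command properties (a minimal sketch; the concurrency limit of 10 is just an illustrative assumption):

HystrixCommandProperties.Setter()
    // do the work on the calling thread instead of a dedicated thread pool
    .withExecutionIsolationStrategy(
        HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
    // cap the number of concurrent callers allowed into run()
    .withExecutionIsolationSemaphoreMaxConcurrentRequests(10);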

Enough theory for now, let us jump into the code by creating our own Hystrix command class to access https://freegeoip.net/ using the Apache HttpClient library:

public class GeoIpHystrixCommand extends HystrixCommand<String> {
    // Template: http://freegeoip.net/{format}/{host}
    private static final String URL = "http://freegeoip.net/";
    private final String host;
 
    public GeoIpHystrixCommand(final String host) {
        super(
            Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("GeoIp"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("GetDetails"))
                .andCommandPropertiesDefaults(
                    HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(5000)
                        .withMetricsHealthSnapshotIntervalInMilliseconds(1000)
                        .withMetricsRollingStatisticalWindowInMilliseconds(20000)
                        .withCircuitBreakerSleepWindowInMilliseconds(10000)
                    )
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("GeoIp"))
                .andThreadPoolPropertiesDefaults(
                    HystrixThreadPoolProperties.Setter()
                        .withCoreSize(4)
                        .withMaxQueueSize(10)
                )
        );
        this.host = host;
    }
 
    @Override
    protected String run() throws Exception {
        return Request
            .Get(new URIBuilder(URL).setPath("/json/" + host).build())
            .connectTimeout(1000)
            .socketTimeout(3000)
            .execute()
            .returnContent()
            .asString();
    }
}

The first thing to get from this snippet is that Hystrix commands have a myriad of different properties which are initialized in the constructor. The command group and key, set to "GeoIp" and "GetDetails" respectively, we have already mentioned. The thread pool key, set to "GeoIp", and the thread pool properties (for example, core pool size and maximum queue size) allow us to tune the thread pool configuration, thread pools being the default execution isolation strategy used by Hystrix. Please notice that multiple commands may refer to the same thread pool (shedding the load, for example), but semaphores are not shared.

Other GeoIpHystrixCommand command properties, arguably the most important ones, need some explanation:

  • executionTimeoutInMilliseconds sets the hard limit on overall command execution before timing out
  • metricsHealthSnapshotIntervalInMilliseconds indicates how often the status of the underlying circuit breaker should be recalculated
  • metricsRollingStatisticalWindowInMilliseconds defines the duration of rolling window to keep the metrics for the circuit breaker
  • circuitBreakerSleepWindowInMilliseconds sets the amount of time to reject requests for opened circuit breaker before trying again

It is worth mentioning that Hystrix has a sensible default value for every property, so you are not obliged to provide them. However, those defaults are quite aggressive (in a very good sense), so you may need to relax some of them. Hystrix has terrific documentation which covers all the properties (and their default values) in detail.
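For example, a minimal variant of the command which relies entirely on those defaults only needs a group key (a sketch; the class name is illustrative):

public class MinimalGeoIpHystrixCommand extends HystrixCommand<String> {
    private final String host;

    public MinimalGeoIpHystrixCommand(final String host) {
        // only the command group is specified, everything else uses Hystrix defaults
        super(HystrixCommandGroupKey.Factory.asKey("GeoIp"));
        this.host = host;
    }

    @Override
    protected String run() throws Exception {
        return Request
            .Get(new URIBuilder("http://freegeoip.net/").setPath("/json/" + host).build())
            .execute()
            .returnContent()
            .asString();
    }
}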

Another option which Hystrix incorporates is a fallback, invoked in case the command execution fails, times out or the circuit breaker is tripped. Although a fallback is optional, it is a very good idea to have one; in the case of https://freegeoip.net/ we may just return an empty response.

    @Override
    protected String getFallback() {
        return "{}"; /* empty response */
    }

Great, we have our command, and now what? There are multiple ways a Hystrix command can be invoked. The most straightforward one is just synchronous execution using the execute() method, for example:

public class GeoIpService {
    private final ObjectMapper mapper = new ObjectMapper();
 
    public GeoIpDetails getDetails(final String host) throws IOException {
        return mapper.readValue(new GeoIpHystrixCommand(host).execute(), 
            GeoIpDetails.class);
    }
}

In case of asynchronous execution, Hystrix has a couple of options, ranging from a plain Java Future to an RxJava Observable, for example:

public Observable<GeoIpDetails> getDetailsObservable(final String host) {
    return new GeoIpHystrixCommand(host)
        .observe()
        .map(result -> {
             try {
                 return mapper.readValue(result, GeoIpDetails.class);
              } catch(final IOException ex) {
                  throw new RuntimeException(ex);
              }
        });
}
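If RxJava is not desirable, a plain Java Future is available through the queue() method; a possible variant of the same lookup could look like this (a sketch, assuming it lives alongside the methods above):

public GeoIpDetails getDetailsAsync(final String host)
        throws InterruptedException, ExecutionException, IOException {
    // queue() schedules the command on the Hystrix thread pool and returns a Future
    final Future<String> future = new GeoIpHystrixCommand(host).queue();
    return mapper.readValue(future.get(), GeoIpDetails.class);
}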

The complete source of the project example is available on GitHub.

If your project is built on top of the very popular Spring Framework, there is terrific out-of-the-box Hystrix support using convenient (auto)configuration and annotations. Let us take a quick look at the same command implementation using the Spring Cloud Netflix project (certainly, along with Spring Boot):

@Component
public class GeoIpClient {
    @Autowired private RestTemplate restTemplate;

    @HystrixCommand(
        groupKey = "GeoIp",
        commandKey = "GetDetails",
        fallbackMethod = "getFallback",
        threadPoolKey = "GeoIp",  
        commandProperties = {
            @HystrixProperty(
                name = "execution.isolation.thread.timeoutInMilliseconds", 
                value = "5000"
            ),
            @HystrixProperty(
                name = "metrics.healthSnapshot.intervalInMilliseconds", 
                value = "1000"
            ),
            @HystrixProperty(
                name = "metrics.rollingStats.timeInMilliseconds", 
                value = "20000"
            ),
            @HystrixProperty(
                name = "circuitBreaker.sleepWindowInMilliseconds", 
                value = "10000"
            )
        },
        threadPoolProperties = {
            @HystrixProperty(name = "coreSize", value = "4"),
            @HystrixProperty(name = "maxQueueSize", value = "10")
        }
    )
    public GeoIpDetails getDetails(final String host) {
        return restTemplate.getForObject(
            UriComponentsBuilder
                .fromHttpUrl("http://freegeoip.net/{format}/{host}")
                .buildAndExpand("json", host)
                .toUri(), 
            GeoIpDetails.class);
    }
 
    public GeoIpDetails getFallback(final String host) {
        return new GeoIpDetails();
    }
}

In this case the presence of the Hystrix command is effectively hidden, so the client just deals with a plain, injectable Spring bean, annotated with @HystrixCommand and instrumented by the @EnableCircuitBreaker annotation.
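For completeness, a typical application class enabling this instrumentation could look like the following sketch (the class name is illustrative; the RestTemplate bean is declared explicitly since GeoIpClient expects one to be injectable):

@SpringBootApplication
@EnableCircuitBreaker
public class GeoIpApplication {
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(GeoIpApplication.class, args);
    }
}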

And last, but not least, there are quite a few additional contributions for Hystrix, available as part of the Hystrix Contrib project. The one we are going to talk about first is hystrix-servo-metrics-publisher, which exposes a lot of very useful metrics over JMX. It is essentially a plugin which should be explicitly registered with Hystrix; for example, here is one of the ways to do that:

HystrixPlugins
    .getInstance()
    .registerMetricsPublisher(HystrixServoMetricsPublisher.getInstance());

When our application is up and running, here is how it looks in JVisualVM (please notice that the com.netflix.servo MBean is going to appear only after the first Hystrix command execution or instrumented method invocation, so you may not see it immediately on application start):

When talking about Hystrix, it is impossible not to mention the Hystrix Dashboard: a terrific web UI to monitor Hystrix metrics in real time.

Thanks again to Spring Cloud Netflix, it is very easy to integrate it into your applications using just the @EnableHystrixDashboard annotation and another project from the Hystrix Contrib portfolio, hystrix-metrics-event-stream, which exposes Hystrix metrics as an event stream. The complete version of the Spring-based project example is available on GitHub.
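For illustration, the dashboard itself is usually just another tiny Spring Boot application; a minimal sketch could look like this (the class name is illustrative):

@SpringBootApplication
@EnableHystrixDashboard
public class DashboardApplication {
    public static void main(String[] args) {
        SpringApplication.run(DashboardApplication.class, args);
    }
}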

Hopefully at this point you would agree that, essentially, every integration with external services (which are, most of the time, just black boxes) introduces instability into our applications and may cause cascading failures and serious outages. In this regard, Netflix Hystrix could be a life saver, well worth adopting.

In the next part we are going to look at other circuit breaker implementations, namely the ones available as part of the Akka toolkit and Apache Zest.

All projects are available in the GitHub repository.

Thursday, April 28, 2016

Laziness at extreme: developing JAX-RS services with Spring Boot

I think it would be fair to state that we, as software developers, are always looking for ways to write less code which does more, automagically or not. In this regard, the Spring Boot project, a proud member of the Spring portfolio, disrupted the traditional approaches, dramatically speeding up and simplifying Spring-based application development.

There is a lot to be said about Spring Boot, the intrinsic details of how it works and its seamless integration with most, if not all, Spring projects. But its capabilities go far beyond that, supporting first-class integration with popular Java frameworks.

In this post we are going to take a look at how we can use Spring Boot in conjunction with the Apache CXF project for rapid REST(ful) web services development. As we are going to see very soon, Spring Boot takes care of quite a lot of boilerplate, letting us concentrate on the parts of the application which have real value. Hopefully, by the end of this post the benefits of adopting Spring Boot for your projects will become apparent.

With that, let us get started by developing a simple people management REST(ful) web service, wrapped up into the familiar PeopleRestService JAX-RS resource:

@Path("/people")
@Component
public class PeopleRestService {
    @GET
    @Produces({MediaType.APPLICATION_JSON})
    public Collection<Person> getPeople() {
        return Collections.singletonList(new Person("a@b.com", "John", "Smith"));
    }
}
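The Person class referenced here is just a simple value holder; its exact shape is not shown, but a possible sketch matching the JSON output shown later in the post could be:

public class Person {
    private String email;
    private String firstName;
    private String lastName;

    public Person() {
    }

    public Person(final String email, final String firstName, final String lastName) {
        this.email = email;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    // getters are required for JSON serialization
    public String getEmail() { return email; }
    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
}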

Not much to add to the resource itself: a pretty simple implementation which returns a hard-coded collection of people. There are a couple of ways we can package and deploy this JAX-RS service, but arguably the simplest one is by hosting it inside an embedded servlet container like Tomcat, Jetty or Undertow. With that comes the routine: container initialization, configuring Spring context locations, registering listeners, ... Let us see how Spring Boot can help here by dissecting the Spring context configuration below.

@Configuration
@EnableAutoConfiguration
@ComponentScan(basePackageClasses = PeopleRestService.class)
public class AppConfig {
    @Autowired private PeopleRestService peopleRestService;
 
    @Bean(destroyMethod = "shutdown")
    public SpringBus cxf() {
        return new SpringBus();
    }

    @Bean(destroyMethod = "destroy") @DependsOn("cxf")
    public Server jaxRsServer() {
        final JAXRSServerFactoryBean factory = new JAXRSServerFactoryBean();

        factory.setServiceBean(peopleRestService);
        factory.setProvider(new JacksonJsonProvider());
        factory.setBus(cxf());
        factory.setAddress("/");

        return factory.create();
    }

    @Bean
    public ServletRegistrationBean cxfServlet() {
        final ServletRegistrationBean servletRegistrationBean = 
            new ServletRegistrationBean(new CXFServlet(), "/api/*");
        servletRegistrationBean.setLoadOnStartup(1);
        return servletRegistrationBean;
    }
}

The AppConfig class looks like a typical Spring Java-based configuration, except for the unusual @EnableAutoConfiguration annotation, which, with no surprise, comes from a Spring Boot module. Under the hood, this annotation enables a complex and intelligent process of guessing, among many other things, what kind of application we are going to run and what kind of Spring beans we may need for it. With this configuration in place, we just need a runner for our application, also with a bit of Spring Boot flavor:

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(AppConfig.class, args);
    }
}

With the @SpringBootApplication meta-annotation in place and SpringApplication used to initialize our Spring context, we have a full-fledged runnable Java application, which could be run from Apache Maven using the Spring Boot plugin:

mvn spring-boot:run

Or packaged as a single runnable uber-JAR and invoked from the command line:

mvn package
java -jar target/jax-rs-2.0-cxf-spring-boot-0.0.1-SNAPSHOT.jar
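Both commands assume that the Spring Boot Maven plugin is declared in the pom.xml, along these lines (a typical declaration inside the build/plugins section; the version is usually inherited from the Spring Boot parent or dependency management):

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>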

And that is it, just a couple of annotations along with a single line of code (the main method). Once we run the application, we can make sure that our people management REST(ful) web service is deployed properly and is fully operational:

$ curl -i http://localhost:8080/api/people

HTTP/1.1 200 OK
Content-Type: application/json;charset=utf-8
Transfer-Encoding: chunked
Server: Jetty(9.3.8.v20160314)

[{"email":"a@b.com","firstName":"John","lastName":"Smith"}]

At this point you may wonder how it works. We have not dealt with a servlet container anywhere, so how come Jetty is serving our requests? The truth is, we only need to include our container of choice as a dependency, for example using Apache Maven's pom.xml file:

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-server</artifactId>
    <version>9.3.8.v20160314</version>
</dependency>
Spring Boot, along with @EnableAutoConfiguration/@SpringBootApplication, does the rest: it detects the presence of Jetty in the classpath, comes to the valid conclusion that our intention is to run a web application and complements the Spring context with the necessary pieces. Isn't it just brilliant?

It would be unfair to finish up without covering yet another important feature of the Spring Boot project: integration testing support. In this regard Spring Boot takes the same approach and provides a couple of annotations to take away all the scaffolding we would otherwise have to write ourselves. For example:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = AppConfig.class)
@WebIntegrationTest(randomPort = true)
public class PeopleRestServiceIntegrationTest {
    @Value("${local.server.port}") private int port;
 
    @Before
    public void setUp() {
        RestAssured.port = port;
    }
 
    @Test
    public void testListOfPersonsIsBeingReturnedSuccessfuly() {
        given()
            .when() 
            .contentType(ContentType.JSON)
            .get("/api/people")
            .then()
            .statusCode(200)
            .log()
            .ifValidationFails();
    }
}

Just two annotations, @SpringApplicationConfiguration (please notice that we are using the same configuration in the test as for the main application) and @WebIntegrationTest (which takes the specifics of web application testing into account and runs the embedded servlet container on a random port), and we have a full-fledged integration test against our people management JAX-RS service. The port which the servlet container is running on is available through the local.server.port environment property, so we can configure REST-assured in the test setup. Easy and simple.

In this post we have looked at just one specific use case of using Spring Boot to increase the development velocity of your JAX-RS projects. Many, many things become trivial with Spring Boot, with more and more intelligence being added with every single release, not to mention excellent integration with your IDE of choice. I hope you got really excited about Spring Boot and are eager to learn more about it. It is worth the time and effort.

The complete project is available on GitHub.

Thursday, February 25, 2016

Your JAX-RS APIs were not born equal: using dynamic features

This time we are going to talk a little bit about JAX-RS 2.0 APIs and touch on one very interesting aspect of the specification: dynamic features and how they are useful.

Traditionally, when JAX-RS 2.0 APIs are configured and deployed (using an Application class, bootstrapped from a servlet, or created through RuntimeDelegate), there is an option to register additional providers and features. Great examples of those are Bean Validation (JSR-349) or Java API for JSON Processing (JSR-353) support. Those providers and features are going to be applied to all JAX-RS 2.0 resources, and in most use cases this is the desired behavior. However, from time to time there is a need to enable a particular provider or feature only for some resources, leaving others unaffected. This is exactly the use case where dynamic features are going to help us a lot.
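To make the distinction concrete, a provider registered the traditional way, for example through an Application subclass, is global and applies to every resource; a simplified sketch (the class shape is illustrative, although a similarly named application class appears later in this post):

public class JaxRsApiApplication extends Application {
    @Override
    public Set<Object> getSingletons() {
        // registered this way, the provider affects all JAX-RS 2.0 resources
        final Set<Object> singletons = new HashSet<>();
        singletons.add(new JacksonJsonProvider());
        return singletons;
    }
}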

For this post we are going to use the latest version, 3.1.5, of the excellent Apache CXF framework, but dynamic features are part of the JAX-RS 2.0 specification and are supported by most (if not all) of the implementations.

Let us consider a very simple JAX-RS 2.0 API to manage people, with a single method to handle HTTP GET requests. Let us assume this is version 1 of the API and, although the @Range annotation is specified for the count query parameter, its support was never implemented and it is present in the code for documentation purposes only.

@Path("/v1/people")
public class PeopleRestService {
    @Produces( { MediaType.APPLICATION_JSON } )
    @GET
    public List<Person> getAll(@Range(min = 1, max = 10) @QueryParam("count") int count) {
        return Collections.nCopies(count, new Person("a@b.com", "A", "B"));
    }
}

In this case, passing an invalid value for the count query parameter is going to result in an Internal Server Error. Let us make sure this is exactly what is happening:

$ curl -i http://localhost:8080/rest/api/v1/people?count=-1

HTTP/1.1 500 Server Error
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: text/html;charset=iso-8859-1
Content-Length: 377
Connection: close
Server: Jetty(9.3.7.v20160115)

After some time we realized the issues with this API and decided to put a proper validation mechanism in place, using the Bean Validation 1.1 integration with JAX-RS 2.0. However, we made the decision to create version 2 of the API and to keep version 1 untouched, as its clients do not expect any HTTP status codes other than 200 and 500 to be returned (unfortunately, in real life it happens more often than not).

There are a couple of different approaches to implement such per-API customization, but probably the simplest one is by introducing a dedicated annotation, for example @EnableBeanValidation (its definition is shown right after the resource), and annotating the JAX-RS 2.0 resource class with it:

@Path("/v2/people")
@EnableBeanValidation
public class ValidatingPeopleRestService {
    @Produces( { MediaType.APPLICATION_JSON } )
    @GET
    public @Valid List<Person> getAll(@Range(min = 1, max = 10) @QueryParam("count") int count) {
        return Collections.nCopies(count, new Person("a@b.com", "A", "B"));
    }
}
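The @EnableBeanValidation annotation itself is nothing more than a marker which has to be retained at runtime so the dynamic feature can discover it through reflection; a minimal sketch:

@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableBeanValidation {
}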

To enable Bean Validation 1.1 for all the JAX-RS 2.0 APIs annotated with @EnableBeanValidation we are going to create a dynamic feature class, BeanValidationDynamicFeature:

@Provider
public class BeanValidationDynamicFeature implements DynamicFeature {
    private final JAXRSBeanValidationInInterceptor inInterceptor;
    private final JAXRSBeanValidationOutInterceptor outInterceptor;
    
    public BeanValidationDynamicFeature(final BeanValidationProvider provider) {
        this.inInterceptor = new JAXRSBeanValidationInInterceptor();
        this.inInterceptor.setProvider(provider);
        
        this.outInterceptor = new JAXRSBeanValidationOutInterceptor();
        this.outInterceptor.setProvider(provider);
    }
    
    @Override
    public void configure(final ResourceInfo resourceInfo, final FeatureContext context) {
        if (resourceInfo.getResourceClass().getAnnotation(EnableBeanValidation.class) != null) {
            context.register(inInterceptor);
            context.register(outInterceptor);
        }
    }
}
Its job is pretty simple: just register the JAXRSBeanValidationInInterceptor and JAXRSBeanValidationOutInterceptor interceptor instances as additional providers for the JAX-RS 2.0 APIs in question. One minor but important note though: exception mappers are not supported by dynamic features, at least in the Apache CXF implementation, and should be registered as regular providers (along with the dynamic features themselves), for example:

@Bean @DependsOn("cxf")
public Server jaxRsServer() {
    final JAXRSServerFactoryBean factory = 
        RuntimeDelegate.getInstance().createEndpoint( 
            jaxRsApiApplication(), 
            JAXRSServerFactoryBean.class 
        );
        
    factory.setServiceBean(validatingPeopleRestService());
    factory.setServiceBean(peopleRestService());
    factory.setProvider(new JacksonJsonProvider());
    factory.setProvider(new BeanValidationDynamicFeature(new BeanValidationProvider()));
    factory.setProvider(new ValidationExceptionMapper());
        
    return factory.create();
}

@Bean 
public JaxRsApiApplication jaxRsApiApplication() {
    return new JaxRsApiApplication();
}
    
@Bean 
public ValidatingPeopleRestService validatingPeopleRestService() {
    return new ValidatingPeopleRestService();
}
    
@Bean 
public PeopleRestService peopleRestService() {
    return new PeopleRestService();
}

That is basically all we have to do. Once the BeanValidationDynamicFeature is registered (in this case using JAXRSServerFactoryBean), it is going to be applied to all matching service beans. Let us make sure that for version 2 of our people management API the proper out-of-the-box validation is triggered:

$ curl -i http://localhost:8080/rest/api/v2/people?count=-1

HTTP/1.1 400 Bad Request
Content-Length: 0
Server: Jetty(9.3.7.v20160115)

This time the response is different, indicating that invalid input has been submitted by the client (a direct result of Bean Validation 1.1 in action): Bad Request.

Hopefully, dynamic features are going to be yet another useful tool in your toolbox. The example we have covered here is somewhat contrived, but it is very easy to use dynamic features for security, tracing, logging, profiling, ... Moreover, dynamic features can be applied even to particular resource methods, allowing fine-grained control over your APIs.
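For example, assuming the annotation is also applicable to methods, the configure method could inspect the resource method instead of the resource class; a possible sketch:

@Override
public void configure(final ResourceInfo resourceInfo, final FeatureContext context) {
    // apply the interceptors only when the particular resource method is annotated
    if (resourceInfo.getResourceMethod().getAnnotation(EnableBeanValidation.class) != null) {
        context.register(inInterceptor);
        context.register(outInterceptor);
    }
}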

The complete project source is available on GitHub.

Saturday, December 26, 2015

Divided We Win: an event sourcing / CQRS perspective on write and read model separation. Queries.

Quite a while ago we started to explore the command query responsibility segregation (CQRS) architecture as an alternative way to develop distributed systems. Last time we covered only commands and events, but not queries. The goal of this blog post is to fill the gap and discuss the ways to handle queries following the CQRS architecture.

We will start where we left off last time, with the sample application which was able to handle commands and persist events in the journal. In order to support the read path, or queries, we are going to introduce a data store. For the sake of keeping things simple, let it be an in-memory H2 database. The data access layer is going to be handled by the awesome Slick library.

To begin with, we have to come up with a simple data model for the User class, managed by the UserAggregate persistent actor. In this regard, the Users class is a typical mapping of a relational table:

class Users(tag: Tag) extends Table[User](tag, "USERS") {
  def id = column[String]("ID", O.PrimaryKey)
  def email = column[String]("EMAIL", O.Length(512))
  def uniqueEmail = index("USERS_EMAIL_IDX", email, true)
  def * = (id, email) <> (User.tupled, User.unapply)
}
It is important to notice at this point that we enforce a uniqueness constraint on the user's email. We will come back to this subtle detail later, during the integration with the UserAggregate persistent actor. The next thing we need is a service to manage data store access, namely persisting and querying users. As we are in the Akka universe, obviously it is going to be an actor as well. Here it is:
case class CreateSchema()
case class FindUserByEmail(email: String)
case class UpdateUser(id: String, email: String)
case class FindAllUsers()

trait Persistence {
  val users = TableQuery[Users]  
  val db = Database.forConfig("db")
}

class PersistenceService extends Actor with ActorLogging with Persistence {
  import scala.concurrent.ExecutionContext.Implicits.global
   
  def receive = {
    case CreateSchema => db.run(DBIO.seq(users.schema.create))
      
    case UpdateUser(id, email) => {
      val query = for { user <- users if user.id === id } yield user.email
      db.run(users.insertOrUpdate(User(id, email)))
    }
    
    case FindUserByEmail(email) => {
      val replyTo = sender
      db.run(users.filter( _.email === email.toLowerCase).result.headOption) 
        .onComplete { replyTo ! _ }
    }
    
    case FindAllUsers => {
      val replyTo = sender
      db.run(users.result) onComplete { replyTo ! _ }
    }
  }
}
Please notice that PersistenceService is a regular untyped Akka actor, not a persistent one. To keep things focused, we are going to support only four kinds of messages:
  • CreateSchema to initialize database schema
  • UpdateUser to update user's email address
  • FindUserByEmail to query user by its email address
  • FindAllUsers to query all users in the data store

Good, the data store services are ready but nothing really fills them with data yet. Moving on to the next step, we will refactor UserAggregate, more precisely the way it handles the UserEmailUpdate command. In its current implementation, the user's email update happens unconditionally. But remember, we imposed a uniqueness constraint on emails, so we are going to change the command logic to account for that: before actually performing the update, we will run a query against the read model (data store) to make sure no user with such an email is already registered.

val receiveCommand: Receive = {
  case UserEmailUpdate(email) => 
    try {
      val future = (persistence ? FindUserByEmail(email)).mapTo[Try[Option[User]]]
      val result = Await.result(future, timeout.duration) match {
        case Failure(ex) => Error(id, ex.getMessage)
        case Success(Some(user)) if user.id != id => Error(id, s"Email '$email' already registered")
        case _ => persist(UserEmailUpdated(id, email)) { event =>
          updateState(event)
          persistence ! UpdateUser(id, email)
        }
        Acknowledged(id)
      }
        
      sender ! result
  } catch {
    case ex: Exception if NonFatal(ex) => sender ! Error(id, ex.getMessage) 
  }
}
Pretty simple, but not really idiomatic: the Await.result does not look as if it belongs in this code. My first attempt was to use a future / map / recover / pipeTo pipeline to keep the flow completely asynchronous. However, the side effect I observed is that in this case the persist(UserEmailUpdated(id, email)) { event => ... } block is supposed to be executed in another thread most of the time (if the result is not ready), but it was not, very likely because of the thread context switch. So Await.result came to the rescue.

Now every time a user's email update happens, along with persisting the event we are going to record this fact in the data store as well. Nice, we are getting one step closer.

The last thing we have to consider is how to populate the data store from the event journal. Another experimental module from the Akka Persistence portfolio is of great help here, Akka Persistence Query. Among other features, it provides the ability to query the event journal by persistence identifiers, and this is what we are going to do in order to populate the data store from the journal. Not surprisingly, there is a UserJournal actor responsible for that.

case class InitSchema()

class UserJournal(persistence: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case InitSchema => {
      val journal = PersistenceQuery(context.system)
        .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
      val source = journal.currentPersistenceIds()
      
      implicit val materializer = ActorMaterializer()
      source
        .runForeach { persistenceId => 
          journal.currentEventsByPersistenceId(persistenceId, 0, Long.MaxValue)
            .runForeach { event => 
              event.event match {
                case UserEmailUpdated(id, email) => persistence ! UpdateUser(id, email)
              }
            }
        }
    }
  }
}
Basically, the code is simple enough but let us reiterate a bit on what it does. First, we ask the journal for all the persistence identifiers it has using the currentPersistenceIds method. Second, for every persistence identifier we query all its events from the journal. Because there is only one event type in our case, UserEmailUpdated, we just directly transform it into the data store service's UpdateUser message.

Awesome, we are mostly done! The simplest thing left is to add another endpoint to UserRoute which returns the list of existing users by querying the read model.

pathEnd {
  get {
    complete {
      (persistence ? FindAllUsers).mapTo[Try[Vector[User]]] map { 
        case Success(users) => 
          HttpResponse(status = OK, entity = users.toJson.compactPrint)
        case Failure(ex) => 
          HttpResponse(status = InternalServerError, entity = ex.getMessage)
      }
    }
  }
}
We are ready to give our revamped CQRS sample application a test drive! Once it is up and running, let us ensure that our journal and data store are empty.
$ curl -X GET http://localhost:38080/api/v1/users
[]
Makes sense, as we have not created any users yet. Let us do that by updating two users with different email addresses and querying the read model again.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@b.com
Email updated: a@b.com

$ curl -X PUT http://localhost:38080/api/v1/users/124 -d email=a@c.com
Email updated: a@c.com

$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"a@b.com"},
  {"id":"124","email":"a@c.com"}
]
As expected, this time the result is different and we can see two users being returned. The moment of truth: let us try to update the email of user 123 to the one already registered by user 124.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@c.com
Email 'a@c.com' already registered
Terrific, this is what we wanted! The read (or query) model is very helpful and works just fine. Please notice that when we restart the application, the read data store should be repopulated from the journal and return the previously created users:
$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"a@b.com"},
  {"id":"124","email":"a@c.com"}
]

With this post coming to an end, we are wrapping up the introduction to the CQRS architecture. Although one may say many things were left out of scope, I hope the examples presented along our journey were useful to illustrate the idea, and that CQRS could be an interesting option to consider for your next projects.

As always, the complete source code is available on GitHub. Many thanks to Regis Leray and Esfandiar Amirrahimi, two brilliant developers, for helping out a lot with this series of blog posts.

Monday, October 5, 2015

Creating sample HTTPS server for fun and profit

Often during development and/or testing against real-life scenarios we, developers, face the need to run a full-fledged HTTPS server, possibly doing some mocking at the same time. On the JVM platform, it used to be not trivial at all unless you knew the right tool for the job. In this post we are going to create a skeleton of a fully operational HTTPS server using the terrific Spray framework and the Scala language.

To begin with, we need to generate an X.509 certificate and a private key. Luckily, it is very easy to do using the openssl command line tool.

openssl req 
    -x509 
    -sha256 
    -newkey rsa:2048 
    -keyout certificate.key 
    -out certificate.crt 
    -days 1024 
    -nodes

As we are on the JVM platform, our essential goal is to have a Java keystore (JKS), a repository of security certificates. However, to import our newly generated certificate into a JKS, we have to export it in PKCS #12 format and then create the keystore out of it. Again, openssl to the rescue.

openssl pkcs12 
    -export 
    -in certificate.crt 
    -inkey certificate.key 
    -out server.p12 
    -name sample-https-server 
    -password pass:change-me-please
Please note that the archive server.p12 is protected by a password. Now, the final step involves the command line tool from the JDK distribution called keytool.
keytool -importkeystore 
    -srcstorepass change-me-please 
    -destkeystore sample-https-server.jks 
    -deststorepass change-me-please 
    -srckeystore server.p12 
    -srcstoretype PKCS12 
    -alias sample-https-server
The result is a password-protected sample-https-server.jks keystore which we can use in our HTTPS server application to configure the SSL context. Spray has very good documentation and plenty of examples available; one of those is a sample SslConfiguration which we can use to configure the KeyManager, TrustManager and SSLContext.
trait SslConfiguration {
  // If there is no SSLContext in scope implicitly, the default SSLContext is 
  // going to be used. But we want non-default settings so we are making 
  // custom SSLContext available here.
  implicit def sslContext: SSLContext = {
    val keyStoreResource = "/sample-https-server.jks"
    val password = "change-me-please"

    val keyStore = KeyStore.getInstance("jks")
    keyStore.load(getClass.getResourceAsStream(keyStoreResource), password.toCharArray)
    val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
    keyManagerFactory.init(keyStore, password.toCharArray)
    val trustManagerFactory = TrustManagerFactory.getInstance("SunX509")
    trustManagerFactory.init(keyStore)
    val context = SSLContext.getInstance("TLS")
    context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
    context
  }

  // If there is no ServerSSLEngineProvider in scope implicitly, 
  // the default one is going to be used. But we would like to configure
  // cipher suites and protocols  so we are making a custom ServerSSLEngineProvider
  // available here.
  implicit def sslEngineProvider: ServerSSLEngineProvider = {
    ServerSSLEngineProvider { engine =>
      engine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
      engine.setEnabledProtocols(Array( "TLSv1", "TLSv1.1", "TLSv1.2" ))
      engine
    }
  }
}
There are a few points to highlight here. First of all, the usage of our own keystore created previously (which for convenience we are loading as a classpath resource):
val keyStoreResource = "/sample-https-server.jks"
val password = "change-me-please"
Also, we are configuring TLS only (TLS v1.0, TLS v1.1 and TLS v1.2), with no SSLv3 support. In addition to that, we are enabling only one cipher: TLS_RSA_WITH_AES_128_CBC_SHA. This has been done mostly for illustration, as in most cases all supported ciphers could be enabled.
engine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
engine.setEnabledProtocols(Array( "TLSv1", "TLSv1.1", "TLSv1.2" ))

With that, we are ready to create a real HTTPS server, which thanks to Spray framework is just a couple of lines long:

class HttpsServer(val route: Route = RestService.defaultRoute) extends SslConfiguration {
  implicit val system = ActorSystem()
  implicit val timeout: Timeout = 3 seconds 

  val settings = ServerSettings(system).copy(sslEncryption = true)
  val handler = system.actorOf(Props(new RestService(route)), name = "handler")

  def start(port: Int) = Await.ready(
    IO(Http) ? Http.Bind(handler, interface = "localhost", port = port, settings = Some(settings)), 
    timeout.duration)
      
  def stop() = {
    IO(Http) ? Http.CloseAll
    system.stop(handler)
  }
}

Any HTTPS server which does nothing at all is not very useful. That is where the route property comes into play: using the Spray routing extensions, we are passing the mappings (or routes) which handle the requests straight to the HTTP service actor (RestService).

class RestService(val route: Route) extends HttpServiceActor with ActorLogging {
  def receive = runRoute {
    route
  }
}
With the default route being just that:
object RestService {
  val defaultRoute = path("") {
    get {
      complete {
        "OK!\n"
      }
    }
  }
}
Basically, that is all we need and our HTTPS server is ready to take a test drive! The simplest way to run it is by using a Scala application.
object HttpsServer extends App {
  val server = new HttpsServer
  server.start(10999)
}

Despite it being written in Scala, we can easily embed it into any Java application (using naming conventions that are a bit non-standard for Java developers), for example:

public class HttpsServerRunner {
    public static void main(String[] args) {
        final HttpsServer server = new HttpsServer(RestService$.MODULE$.defaultRoute());
        server.start(10999);
    }
}

Once up and running (the easiest way to do that is sbt run), the exposed default route of our simple HTTPS server could be accessed either from the browser or using the curl command line client (the -k command line argument turns off SSL certificate verification):

$ curl -ki https://localhost:10999

HTTP/1.1 200 OK
Server: spray-can/1.3.3
Date: Sun, 04 Oct 2015 01:25:47 GMT
Content-Type: text/plain; charset=UTF-8
Content-Length: 4

OK!
Alternatively, the certificate could be passed along with the curl command so a complete SSL certificate verification takes place, for example:
$  curl -i --cacert src/main/resources/certificate.crt  https://localhost:10999

HTTP/1.1 200 OK
Server: spray-can/1.3.3
Date: Sun, 04 Oct 2015 01:28:05 GMT
Content-Type: text/plain; charset=UTF-8
Content-Length: 4

OK!

Everything is looking great, but could we use the HTTPS server as part of an integration test suite to verify / stub / mock, for example, the interactions with third-party services? The answer is yes, absolutely, thanks to JUnit rules. Let us take a look at the simplest possible implementation of HttpsServerRule:

class HttpsServerRule(@BeanProperty val port: Int, val route: Route) 
    extends ExternalResource {
  val server = new HttpsServer(route)
  override def before() = server.start(port)
  override def after() = server.stop()
}

object HttpsServerRule {
  def apply(port: Int) = new HttpsServerRule(port, RestService.defaultRoute);
  def apply(port: Int, route: Route) = new HttpsServerRule(port, route);
}

The JUnit test case for our default implementation uses the brilliant REST-assured library, which provides a Java DSL for easy testing of REST services.

public class DefaultRestServiceTest {
    @Rule public HttpsServerRule server = 
        HttpsServerRule$.MODULE$.apply(65200);
 
    @Test
    public void testServerIsUpAndRunning() {
        given()
            .baseUri("https://localhost:" + server.getPort())
            .auth().certificate("/sample-https-server.jks", "change-me-please")
            .when()
            .get("/")
            .then()
            .body(containsString("OK!"));
    }
}

For sure, there is not much you can do with the default implementation, so providing a custom one is a must-have option. Luckily, we sorted that out early on by accepting the routes.

object CustomRestService {
  val route = 
    path("api" / "user" / IntNumber) { id =>
      get {
        complete {
          "a@b.com"
        }
      }
    }
}

And here is a test case for it:

public class CustomRestServiceTest {
    @Rule public HttpsServerRule server = 
        HttpsServerRule$.MODULE$.apply(65201, CustomRestService$.MODULE$.route());
 
    @Test
    public void testServerIsUpAndRunning() {
        given()
            .baseUri("https://localhost:" + server.getPort())
            .auth().certificate("/sample-https-server.jks", "change-me-please")
            .when()
            .get("/api/user/1")
            .then()
            .body(containsString("a@b.com"));
    }
}

As it turns out, creating a full-blown HTTPS server is not hard at all and could be really fun, once you know the right tool to do it. The Spray framework is one of those magic tools. As many of you are aware, Spray is going to be replaced by Akka HTTP, which has seen a 1.0 release recently but at the moment lacks a lot of features (including HTTPS support), keeping Spray a viable choice.

The complete project is available on GitHub.

Tuesday, August 25, 2015

Divided We Win: an event sourcing / CQRS perspective on write and read model separation. Commands and Events.

In today's post we are going to unveil some very interesting (in my opinion) architecture styles: event sourcing and command query responsibility segregation (CQRS). Essentially, in both of them events are at the heart of the system design and reflect any changes of state which happen. This is quite different from the traditional CRUD architecture, where usually only the last known state is kept, with any historical background effectively lost.

Another appealing option which CQRS brings to the table is a natural separation of the write model (state modifications initiated by commands and resulting in events) and the read model (queries for, in most cases, the last known state). It gives you a lot of freedom in picking the right data storage (or storages) to serve different application use cases and demands. But, as always, there is no free lunch: the price to pay is the increased complexity of the system.

To keep the post reasonably short, the whole discussion is split into two parts: Commands and Events (this one), and Queries (upcoming). Using a very simple model of a user entity as an example, we are going to design an application which uses the CQRS architecture. There are a few libraries available on the JVM platform, but the one we are going to use is Akka, more precisely its two recently added components, Akka Persistence and Akka HTTP. Please do not worry if Akka is somewhat new to you, the examples are going to be really basic and (hopefully) easy to follow. So let us get started!

As we mentioned earlier, the changes in our system are happening as the result of a command and may lead to zero or more events. Let us model that by defining two traits:

trait Event
trait Command

In turn, those events may trigger state changes within application entities so let us model that with a generic trait as well:

trait State[T] {
  def updateState(event: Event): State[T]
}

Quite simple so far. Now, as we are modeling a user, UserAggregate is going to constitute our model and be responsible for applying updates to a particular User, identifiable by its id. Also, every user is going to have an email property which could be changed upon receiving a UserEmailUpdate command and could cause a UserEmailUpdated event to be created as the result. The following code snippet defines all that in terms of state, command and event within the UserAggregate companion object.

object UserAggregate {
  case class User(id: String, email: String = "") extends State[User] {
    override def updateState(event: Event): State[User] = event match {
      case UserEmailUpdated(id, email) => copy(email = email)
    }
  }

  case class UserEmailUpdate(email: String) extends Command
  case class UserEmailUpdated(id: String, email: String) extends Event
}
For the readers familiar with CQRS and event sourcing, this example may look very naïve but I think it is good enough for grasping the basics.

Having our foundational blocks defined, it is time to look at the most interesting part: accepting commands, transforming them into persisted events and applying the state changes, all of which are responsibilities of UserAggregate. In the context of CQRS and event sourcing, persisting events is a crucial capability of the system. Events are the single source of truth, and the state of any single entity could be reconstructed to any point in time by replaying all the events relevant to it. This is the moment where Akka Persistence joins the stage.

To take one step back, Akka is a brilliant library (or even toolkit) for building distributed systems based on the actor model. On top of that, Akka Persistence enriches actors with persistence capabilities, letting them store their messages (or events) in a durable journal. One might say that the journal can grow very, very large and replaying all the events to reconstruct the state could take a lot of time. It is a valid point, so Akka Persistence also adds the capability to store persistent snapshots in durable storage as well. This speeds things up a lot, as only the events which happened since the last snapshot need to be replayed. By default, LevelDB is the durable storage engine used by Akka Persistence, but it is a pluggable feature with many alternatives available.

With that, let us take a look at the UserAggregate actor which is going to be responsible for managing a User entity.

class UserAggregate(id: String) extends PersistentActor with ActorLogging {
  import UserAggregate._

  override def persistenceId = id
  var state: State[User] = User(id)

  def updateState(event: Event): Unit = {
    state = state.updateState(event)
  }
  
  val receiveCommand: Receive = {
    case UserEmailUpdate(email) => {
      persist(UserEmailUpdated(id, email)) { event =>
        updateState(event)
        sender ! Acknowledged(id)
      }
    }
  }

  override def receiveRecover: Receive = {
    case event: Event => updateState(event)
    case SnapshotOffer(_, snapshot: User) => state = snapshot
  }
}

So far this is the most complicated part, so let us dissect the key pieces. First of all, UserAggregate extends PersistentActor, which adds the persistence capabilities to it. Second, every persistent actor must have a unique persistenceId: it is used as an identifier inside the event journal and snapshot storage. And lastly, in contrast to regular Akka actors, persistent actors have two entry points: receiveCommand to process commands and receiveRecover to replay events.

Getting back to our example, once the UserAggregate receives a UserEmailUpdate command, it first persists the UserEmailUpdated event in the journal using the persist(...) call and then updates the aggregate's state using the updateState(...) call, replying with an acknowledgement to the sender. To see the complete example in action, let us create a simple REST endpoint using another great project from the Akka family, Akka HTTP, which emerged from the terrific Spray framework.

For now, we are going to define a simple route to handle a PUT request at the /api/v1/users/{id} location, where {id} essentially is a placeholder for the user identifier. It will accept a single form-encoded parameter, email, to update the user's email address.

object UserRoute {
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.language.postfixOps
  
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val timeout: Timeout = 5 seconds

  val route = {
    logRequestResult("eventsourcing-example") {
      pathPrefix("api" / "v1" / "users") {
        path(LongNumber) { id =>
          (put & formFields('email.as[String])) { email =>
            complete {
              system
                .actorSelection(s"user/user-$id")
                .resolveOne
                .recover {
                  case _: ActorNotFound => 
                    system.actorOf(Props(new UserAggregate(id.toString)), s"user-$id")
                }
                .map {                 
                  _ ? UserEmailUpdate(email) map {
                    case Acknowledged(_) => 
                      HttpResponse(status = OK, entity = "Email updated: " + email)
                    case Error(_, message) => 
                      HttpResponse(status = Conflict, entity = message)
                  }
                }
            }
          }
        }
      }
    }
  }
}
The only missing piece is the runnable class to plug in the handler for this REST endpoint, so let us define one; thanks to Akka HTTP it is trivial:
object Boot extends App with DefaultJsonProtocol {
  Http().bindAndHandle(route, "localhost", 38080)
}
Nothing gives more assurance that things are really working than easy and readable test cases, built using the ScalaTest framework complemented by the Akka HTTP TestKit.
class UserRouteSpec extends FlatSpec 
      with ScalatestRouteTest 
      with Matchers 
      with BeforeAndAfterAll {
  import com.example.domain.user.UserRoute
  
  implicit def executionContext = scala.concurrent.ExecutionContext.Implicits.global

  "UserRoute" should "return success on email update" in {
    Put("http://localhost:38080/api/v1/users/123", FormData("email" ->  "a@b.com")) ~> UserRoute.route ~> check {
      response.status shouldBe StatusCodes.OK
      responseAs[String] shouldBe "Email updated: a@b.com"
    }
  }
}
Lastly, let us run the complete example and use curl from the command line to perform a real call to the REST endpoint, forcing all the parts of the application to work together.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@b.com
Email updated: a@b.com
Nice! The results match our expectations. Before we finish up with this part, there is one more thing: please notice that when the application is restarted, the state will be restored for the users whose aggregates already existed. To say it another way, using Akka Persistence specifics: if the event journal has events/snapshots associated with an actor's persistenceId (this is why it should be unique and permanent), the recovery process takes place by applying the most recent snapshot (if any) and replaying the events.

In this post we looked behind the curtain of event sourcing and CQRS, covering only the write (or command) part of the architecture. There are many things left to work on, like, for example, email uniqueness validation, retrieval of the user's latest state, ... which represent the read (or query) flow of the application and are going to be discussed in the subsequent blog post. For now, I hope that event sourcing and CQRS are a bit demystified and might become a part of your future or existing applications.

Final disclaimer: Akka HTTP is marked as an experimental component at the moment (nonetheless, the main goals stand; still, the API may change slightly), while Akka Persistence just graduated out of experimental status with the recent Akka 2.4.0-RC1 release. Please be aware of that.

The complete project is available on GitHub. Many thanks to Regis Leray and Esfandiar Amirrahimi, two brilliant developers, for helping out a lot with this series of blog posts.

Thursday, April 30, 2015

ApacheCon North America 2015: where friends meet

I had the great pleasure of attending ApacheCon NA 2015 this year, which took place in the city of Austin, TX. By all means, it was an awesome conference and there are some emerging trends and interesting ideas I would like to share in this post.

ApacheCon is somewhat different from any other conference out there. And the main difference is the diversity of projects and topics being presented. It is a strength and a weakness at the same time: companies prefer to send people to specialized conferences which are dedicated to a particular area / technology (good examples of those are Strata, JavaOne, SpringOne, ...). On the other side, such diversity encourages communication, where new ideas are often born. I personally had a whole bunch of terrific conversations and met a lot of great people whom I can now proudly refer to as my friends.

Getting closer to the matter, this time the conference had 7 parallel tracks squeezed into 3 full days. The vast majority of the talks were about big/fast data and containerization / cloud deployment. It was clear that people are very interested in those topics, as almost every big data talk gathered quite a numerous crowd. Among the "big names" I have seen guys from Cloudera, Red Hat, Google, eBay, Pivotal and LinkedIn.

There were a lot of brilliant talks but there are a couple of them which I was able to attend and would like to mention in particular:

I was very lucky to give a talk as well, Apache CXF, Tika and Lucene: The Power of Search the JAX-RS Way, where I presented a very interesting combination of Apache CXF, Apache Tika, Apache Lucene and Apache Olingo working together to quickly integrate search capabilities into new or existing web APIs.

And last but not least, I discovered that the Luke project, a very well-known tool in the Apache Lucene community, is looking for contributors to help support it and move it forward. If you are interested, please join.

A huge thank you to the Apache Software Foundation for organizing such a great conference this year and giving me the chance to attend it. Keep up the terrific work, guys!