Bulk Indexing with the Elasticsearch Java REST Client
With the introduction of Elasticsearch v5.0.0, Elastic began to recommend moving to their new REST client for server interactions rather than the transport client that was shipped previously. The intent was to make an HTTP client with little to no coupling to the internal Elasticsearch codebase, leading to something that was generally easier to maintain. Although this is much better as a consumer of the API (you have to update your calls much less frequently), it does remove some of the convenience functionality that was available in the older clients. In this post specifically, I'm going to address the lack of an equivalent to the BulkProcessor in the newer client. Without an easy-to-use Bulk API interaction, integration is difficult because bulk requests are not formed in the same way as the other JSON-based calls (Elastic refers to the Content-Type as ndjson, or newline-delimited JSON). Not only is formatting now more complicated, but you also need to take care of the logic of flushing on intervals, buffering, and so on.
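To make the formatting difference concrete: a _bulk body is not a single JSON document but a sequence of lines, where each action contributes a metadata line and (for operations such as index) a source line, and the whole body must end with a newline. The following is a minimal, dependency-free sketch of that framing, not the operator's own serializer. NdjsonBulkBody and its Action record are hypothetical names, the record needs Java 16+, and it assumes index and type names need no JSON escaping and that each source document is already serialized on a single line.

```java
import java.util.List;

// Hypothetical illustration of the _bulk ndjson framing; not part of any library.
public class NdjsonBulkBody {

    // One bulk action: operation name ("index", "create", ...), target index/type,
    // and an already-serialized, single-line JSON source document.
    public record Action(String operation, String index, String type, String sourceJson) {}

    // Emits a metadata line followed by a source line for each action, each ending in '\n'.
    // Assumption: only operations that carry a source are handled (a "delete" has no source line),
    // and index/type names contain no characters that would need JSON escaping.
    public static String toNdjson(List<Action> actions) {
        StringBuilder body = new StringBuilder();
        for (Action a : actions) {
            body.append("{\"").append(a.operation()).append("\":{")
                .append("\"_index\":\"").append(a.index()).append("\",")
                .append("\"_type\":\"").append(a.type()).append("\"}}\n");
            // The source must stay on one line, or the newline framing breaks.
            body.append(a.sourceJson()).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        String body = toNdjson(List.of(
            new Action("index", "my_test_index", "my_test_type", "{\"test\":true}")));
        System.out.print(body);
    }
}
```

Running main prints a two-line body for the same index, type, and source used in the operator example later in this post; a body like this is sent to _bulk with the application/x-ndjson content type.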
To address these pains, I have created a new tool called the BulkOperator, which acts as a similar implementation of the BulkProcessor class but is designed to work with the new REST client. It operates in much the same way as the predecessor you'll be used to, but also includes the utility classes needed to easily add requests to your action sets. Operators act as a very small wrapper around the Bulk API whilst saving you from having to think too hard about implementation details for your use cases. Operators can flush based on a schedule, the current buffer state, manually, or any combination of the above. This should allow you to migrate from your existing implementation without friction. Operators also let you control the allowed request concurrency, and provide lifecycle hooks for handling success and failure cases.
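The buffering and flushing behaviour described above can be sketched in plain Java. This is a hypothetical MiniBulkBuffer, not the BulkOperator source: a Consumer stands in for the actual _bulk HTTP call, a Semaphore caps the number of in-flight flushes (the concurrency setting), and a ScheduledExecutorService provides the interval flush, started from the constructor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration of size/interval/manual flushing with bounded concurrency.
public class MiniBulkBuffer<T> {
    private final int maxActions;
    private final Consumer<List<T>> sender;   // stands in for the HTTP call to _bulk
    private final Semaphore permits;          // caps the number of in-flight flushes
    private final ExecutorService workers = Executors.newCachedThreadPool();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private List<T> buffer = new ArrayList<>();

    public MiniBulkBuffer(int maxActions, long intervalMillis, int concurrency, Consumer<List<T>> sender) {
        this.maxActions = maxActions;
        this.sender = sender;
        this.permits = new Semaphore(concurrency);
        // Interval flushing starts immediately; there is no separate start() call.
        timer.scheduleAtFixedRate(this::flush, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // Buffer one action; flush as soon as maxActions have accumulated.
    public synchronized void add(T action) {
        buffer.add(action);
        if (buffer.size() >= maxActions) {
            flush();
        }
    }

    // Manual (or scheduled) flush: swap the buffer out and send it on a worker thread.
    // Blocks while `concurrency` flushes are already in flight.
    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        permits.acquireUninterruptibly();
        workers.execute(() -> {
            try {
                sender.accept(batch);
            } finally {
                permits.release();
            }
        });
    }

    // Stop the timer, send whatever is left, and wait for in-flight flushes to finish.
    // Not safe to call add() afterwards: the worker pool no longer accepts tasks.
    public void close() {
        timer.shutdown();
        flush();
        workers.shutdown();
        try {
            workers.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

One consequence of this particular design: because flush() holds the buffer's lock while waiting for a free permit, callers of add() block whenever every flush slot is busy, which gives simple backpressure instead of letting the buffer grow without bound.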
Operator API
The API is intentionally small in order to reduce integration points with your own code. All creation of operator components is done via builder methods, to make (this specific) code cleaner to read and to reduce the number of options being thrown into a constructor. For those interested, the builders are generated using Immutables. Construction of both an operator and operator actions is pretty straightforward and should be familiar to those of you who have used the API in the past (regardless of how you've consumed it). The following snippet demonstrates how easy it is to generate and execute a request via the Bulk API using an operator:
// construct your Elasticsearch client
RestClient restClient = createNewRestClient();
// create an operator to handle _bulk requests
BulkOperator operator = BulkOperator
.builder(restClient)
.concurrency(3) // controls the request concurrency
.interval(60_000) // controls the flush interval
.maxActions(10_000) // controls the maximum number of actions to buffer before flushing
.lifecycle(new CustomLifecycle()) // binds a lifecycle listener
.build();
// create an action to carry out
BulkAction action = BulkAction
.builder()
.operation("index")
.index("my_test_index")
.type("my_test_type")
.source("{\"test\":true}")
.build();
// attach it to the operator
operator.add(action);
It should be noted that constructing an operator instance automatically starts any required flushing schedules, rather than requiring them to be started explicitly. I believe this feels more natural and will be friendlier for most developers. An operator can and should be closed once it is no longer needed, via the close() method. Once closed, any further operations will be rejected.
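The start-on-construction and reject-after-close behaviour can be illustrated with a small hypothetical class; ClosableOperatorSketch is an illustrative name, not part of the library. Implementing AutoCloseable is an assumption made here so the object works with try-with-resources; whether the real BulkOperator does so isn't stated in this post.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: schedule starts in the constructor, adds are rejected after close().
public class ClosableOperatorSketch implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger accepted = new AtomicInteger();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public ClosableOperatorSketch(long intervalMillis) {
        // No separate start() call: the flush schedule begins as soon as the object exists.
        scheduler.scheduleAtFixedRate(this::flush, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // Simplification: the closed check is not atomic with the add; a real implementation
    // would guard its buffer with the same lock that close() takes.
    public void add(String action) {
        if (closed.get()) {
            throw new IllegalStateException("operator is closed");
        }
        accepted.incrementAndGet();
    }

    public int accepted() {
        return accepted.get();
    }

    private void flush() {
        // A real operator would send its buffered actions here.
    }

    @Override
    public void close() {
        // compareAndSet makes close() idempotent: only the first call flushes and shuts down.
        if (closed.compareAndSet(false, true)) {
            flush();
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        // try-with-resources calls close() automatically when the block exits.
        try (ClosableOperatorSketch op = new ClosableOperatorSketch(60_000)) {
            op.add("{\"test\":true}");
            System.out.println("accepted: " + op.accepted());
        }
    }
}
```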
Lifecycle Hooks
In the scenario above, we're also attaching a lifecycle listener which will fire at certain stages of a request to the API. This is very similar to the feature provided in the older Elasticsearch client, and allows you to do things like log metrics on requests, handle failures, etc. As a bonus in our case, the operator instance executing the request is also passed into the listeners, allowing you to write lifecycle hooks for things like retry and backoff. The BulkLifecycle interface currently consists of a handful of methods which act as hooks. To avoid compatibility clashes in future, you can extend NoopLifecycle, which simply provides an empty implementation for all interface methods. In fact, using a NoopLifecycle is the default behaviour, rather than passing around null lifecycle instances. The following snippet shows an example lifecycle, which can then be passed to an operator at creation time.
public class CustomLifecycle implements BulkLifecycle {
/**
* This will fire immediately before a request is executed to the API.
*
* @param executionId
* The executionId parameter is simply an atomic counter, local
* to your operator, to make it easier to track requests throughout
* the lifecycle.
* @param operator
* The operator instance executing the request, in case you want
* to log out the reference or inspect the current state.
* @param operation
* The operation to be sent to the _bulk API, which has already
* been compiled into the correct formatting to allow for inspection.
*/
@Override
public void beforeBulk(long executionId, BulkOperator operator, BulkOperation operation) {
System.out.println("Executing bulk request [" + executionId + "]");
}
/**
* This will fire immediately after a successful request to the API.
*
* @param executionId
* The same executionId as the one provided in the other stages.
* @param operator
* The operator instance, just for reference (likely not used).
* @param operation
* Once again the operation just in case you need it for logging.
* @param response
* The raw response object coming back from Elasticsearch.
*/
@Override
public void afterBulk(long executionId, BulkOperator operator, BulkOperation operation, Response response) {
System.out.println("Bulk request [" + executionId + "] succeeded.");
}
/**
* This will fire immediately after a failed request to the API.
*
* @param executionId
* The same executionId as the one provided in the other stages.
* @param operator
* The operator instance, allowing you to requeue any failed actions.
* @param operation
* Once again the operation just in case you need it for logging.
* @param failure
* The exception thrown by the client during the failure.
*/
@Override
public void afterBulk(long executionId, BulkOperator operator, BulkOperation operation, Throwable failure) {
System.out.println("Bulk request [" + executionId + "] failed!");
}
}
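Because the failure hook receives the operator, a retry-with-backoff policy can live entirely inside a lifecycle. The sketch below covers only the backoff and requeue bookkeeping, as a hypothetical BackoffRequeuer that is not part of the library. The requeue target is passed per call (for example something like operator::add inside the failure hook) because the operator arrives as a hook parameter; how you recover individual actions from a failed BulkOperation, and how you count attempts per action, are left to the caller since that part of the API isn't covered here. The attempt cap matters: requeueing without a limit can keep a permanently failing action cycling forever.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical retry helper for a failure hook; not part of the BulkOperator library.
public class BackoffRequeuer<T> {
    private final long baseDelayMillis;
    private final long maxDelayMillis;
    private final int maxAttempts;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public BackoffRequeuer(long baseDelayMillis, long maxDelayMillis, int maxAttempts) {
        this.baseDelayMillis = baseDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
        this.maxAttempts = maxAttempts;
    }

    // Exponential backoff: base * 2^attempt (attempt >= 0), capped at maxDelayMillis.
    // The shift is clamped at 30 so large attempt numbers don't overflow for reasonable bases.
    public long delayFor(int attempt) {
        long delay = baseDelayMillis << Math.min(attempt, 30);
        return Math.min(delay, maxDelayMillis);
    }

    // Call from the failure hook with the number of attempts already made (tracked by the caller).
    // Returns false, and schedules nothing, once the retry budget is used up.
    public boolean onFailure(T action, int attempt, Consumer<T> requeue) {
        if (attempt >= maxAttempts) {
            return false;
        }
        scheduler.schedule(() -> requeue.accept(action), delayFor(attempt), TimeUnit.MILLISECONDS);
        return true;
    }

    // Already-scheduled requeues still run after shutdown (the executor's default policy);
    // this waits for them to finish.
    public boolean shutdownAndWait(long timeoutMillis) {
        scheduler.shutdown();
        try {
            return scheduler.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a 100 ms base and a 1,000 ms cap, successive attempts wait 100, 200, 400, and 800 ms, then stay at 1,000 ms until the attempt limit stops further requeues.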
Production Use
The code that originally made up this library has been in production for around 7 months at the time of writing, having first been introduced during our migration to Elasticsearch v5.0.0. The system it runs inside typically handles traffic which resolves into index requests at around 2,000-3,000 per second, and we have never seen indexing issues related to the operators (although we did have a fun bug with our requeue logic in a bad lifecycle implementation). Now that the functionality has been separated out into the open-sourced version, the internal implementation has been replaced, so the version in the repository is what now runs in production. As a bonus, this means that I have a particular interest in maintaining the library as needed.
Although it's fairly easy to hand-roll something similar to this library, I felt it worth making available as it's a) non-obvious, and b) required by several projects I work on, so why not share? It's available on Maven Central (with details in the repository), so you should be able to grab it whether you use Maven, Gradle, or some other build tool. That's really all there is to it; feel free to try it out and let me know your suggestions for improvements, or any issues that you run into!