6 tips to speed up your Solr indexing jobs

If your Solr indexing jobs are too slow and you are willing to invest some time and effort to speed them up, I might have a couple of practical tips for you. I recently worked on reducing the execution time of a Solr indexing job that processes approximately 100 million items from 20+ hours to under 2 hours. Out of this experience, I would like to share 6 tips I used to optimize my Solr indexing job, in the hope that at least one of them will help you optimize yours.

Introduction

Before diving deep into the customizations, let me share more context about the genesis of these tips. The Solr indexing job to optimize had been performing properly for a couple of years. Enhancements were requested to index more data; they were implemented and tested successfully until the Solr indexing job was run during cutover on the pre-production environment: the indexing time increased dramatically, to over a day. The first investigations showed:

  • The volume of data to index had increased over the years in the production and pre-production systems but not in the other central systems used for testing. Consequently, during development and early testing phases, the Solr indexing job never really ran under real conditions.
  • The number of calls to the database triggered by the indexing logic was extremely high, and the recent enhancements accounted for a good part of it. Although each query was fast, around 1 ms, there were so many of them that the job could never be fast.
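To get a feel for why many fast queries still add up, here is a back-of-the-envelope sketch. Only the 100 million items and the roughly 1 ms per query come from my job; the 5 queries per item and the 8 indexing threads are made-up values, and the class and method names are purely illustrative.

```java
import java.time.Duration;

/** Rough estimate of the time an indexing job spends waiting for database queries. */
class QueryTimeEstimate {

    /**
     * @param units          number of items (or batches) that trigger queries
     * @param queriesPerUnit database round trips per unit (hypothetical value)
     * @param millisPerQuery average latency of one query in milliseconds
     * @param threads        indexing threads working in parallel (hypothetical value)
     */
    static Duration estimate(long units, long queriesPerUnit, long millisPerQuery, int threads) {
        if (threads <= 0) {
            throw new IllegalArgumentException("threads must be positive");
        }
        long totalMillis = units * queriesPerUnit * millisPerQuery;
        return Duration.ofMillis(totalMillis / threads);
    }

    public static void main(String[] args) {
        long items = 100_000_000L;
        // Assumption: 5 queries per item, spread over 8 threads
        Duration perItem = estimate(items, 5, 1, 8);
        // Assumption: the same 5 queries, but executed once per batch of 1,000 items
        Duration perBatch = estimate(items / 1_000, 5, 1, 8);
        System.out.println("Queries per item:  about " + perItem.toHours() + " hours");
        System.out.println("Queries per batch: about " + perBatch.getSeconds() + " seconds");
    }
}
```

A bulk query takes longer than 1 ms, so the per-batch figure is optimistic, but the difference in order of magnitude is the point.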

The first set of customizations focused on reducing the number of calls to the database: implementing ValueResolver instead of the FieldValueProvider interface, using dedicated DAOs to retrieve data instead of accessing it via the Model API, pre-loading data for all batch items to index, etc. These optimizations significantly reduced the indexing time, but the indexing job did not scale: the more items were indexed, the longer it took to index one item. I tracked the indexing speed of the jobs (see my blog post for more details) and found that the speed dropped over time. The reason was again too many queries being executed against the database, except that this time the queries originated from the caches, not the indexing logic. The second set of customizations focused on actively managing the caches to keep them from executing queries. Once all optimizations were in place, the Solr indexing job ran at a high and constant indexing speed from start to finish.

Tip #1: bulk load data to index

Reducing the number of queries executed against the database is key to speeding up your indexing logic. Consequently, your ValueResolver or FieldValueProvider implementation should load all data required for indexing with one query. The approach for this customization is to implement the following logic in the AbstractValueResolver.loadData() method:

  1. Load all data required by your ValueResolver implementation for the current batch. All items to be indexed by the current batch can be retrieved via the IndexerBatchContext.getItems() method.
  2. Store all data in IndexerBatchContext attributes.
  3. Supply the data for each indexed item from the IndexerBatchContext attribute.

Note that you should migrate all your FieldValueProvider implementations to ValueResolver as it is otherwise not possible to implement the logic described above.

You can extend the AbstractValueResolver class as follows to ease the implementation of bulk loading in your ValueResolver implementations.

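abstract class AbstractBulkValueResolver<T extends ItemModel, M, Q, K> extends AbstractValueResolver<T, M, Q> {

    // Name of the batch context attribute holding the data loaded for the whole batch
    private final String attribute;

    public AbstractBulkValueResolver(final String attribute) {
        super();
        this.attribute = attribute;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected M loadData(final IndexerBatchContext batchContext, final Collection<IndexedProperty> indexedProperties,
            final T model) throws FieldValueProviderException {
        // Load the data once per batch. An explicit check is used instead of Map.computeIfAbsent()
        // because loadAllData() can throw a checked exception, which a lambda could not propagate.
        Map<K, M> allData = (Map<K, M>) batchContext.getAttributes().get(attribute);
        if (allData == null) {
            allData = loadAllData(batchContext, indexedProperties);
            batchContext.getAttributes().put(attribute, allData);
        }
        final K key = getKey(model);
        return allData.get(key);
    }

    // Loads the data for all items of the batch, ideally with a single query
    protected abstract Map<K, M> loadAllData(final IndexerBatchContext batchContext,
            final Collection<IndexedProperty> indexedProperties) throws FieldValueProviderException;

    // Returns the key under which the data of the given item is stored in the map
    protected abstract K getKey(T model);
}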

You should then implement the bulk loading logic in each ValueResolver implementation as shown in the example below.

class ProductWarehouseCodeValueResolver extends AbstractBulkValueResolver<ProductModel, List<String>, List<String>, PK> {

    private FlexibleSearchService flexibleSearchService;

    public ProductWarehouseCodeValueResolver() {
        super("productWarehouseCodes");
    }

    @Override
    protected Map<PK, List<String>> loadAllData(final IndexerBatchContext batchContext,
            final Collection<IndexedProperty> indexedProperties) throws FieldValueProviderException {
        // One query returns the warehouse codes of all products in the batch
        final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
                "SELECT {p.PK}, {w.code} FROM {Product AS p INNER JOIN StockLevel AS s ON {s.productCode} = {p.code} "
                        + "INNER JOIN Warehouse AS w ON {w.pk} = {s.warehouse}} WHERE {p.pk} IN (?products)"
        );
        fQuery.addQueryParameter("products",
                batchContext.getItems().stream().map(ItemModel::getPk).collect(Collectors.toList()));
        fQuery.setDisableCaching(true);
        fQuery.setResultClassList(asList(PK.class, String.class));

        // Group the warehouse codes by product primary key
        return flexibleSearchService.<List<Object>>search(fQuery).getResult().stream().collect(
                Collectors.groupingBy(row -> (PK) row.get(0),
                        Collectors.mapping(row -> (String) row.get(1), Collectors.toList())));
    }

    @Override
    protected PK getKey(final ProductModel product) {
        return product.getPk();
    }

    [...]
}

The memory footprint of your Solr indexing jobs will increase as you cache data. You can control it through two parameters:

  • The batch size: the more items are indexed in a batch, the more data will likely be fetched and cached per batch.
  • The number of indexing threads: the more threads are configured, the more indexing batches are executed in parallel, causing more data to be cached.

You might consequently want to first reduce the number of indexing threads and/or the batch size to ensure your heap can support it, and then gradually increase these parameters while monitoring the heap usage to find your optimal settings. Note though that your batch size might have a maximum value. In my example, the primary keys of the items to index are set as parameters in the IN clause of the query. Certain databases like Oracle limit the number of parameters in the IN clause to 1,000. Other databases have no such limit but suffer from performance degradation when the number of parameters exceeds certain values.
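If you want a batch size above such a limit, one option is to split the keys into chunks, run one query per chunk and merge the results. The helper below is a minimal sketch of the splitting part only; it is not part of my job, and the class, constant and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Splits query parameters into chunks that respect a maximum IN clause size. */
class InClauseChunks {

    /** Limit mentioned above for Oracle; adjust it to your database. */
    static final int MAX_IN_PARAMETERS = 1_000;

    static <T> List<List<T>> partition(List<T> values, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < values.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, values.size());
            // Copy the sub list so that each chunk is independent of the source list
            chunks.add(new ArrayList<>(values.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>();
        for (int i = 0; i < 2_500; i++) {
            keys.add(i);
        }
        // Each chunk would be passed as the parameter of one query
        for (List<Integer> chunk : partition(keys, MAX_IN_PARAMETERS)) {
            System.out.println("Chunk with " + chunk.size() + " keys");
        }
    }
}
```

Keep in mind that every additional chunk is an additional query, so this trades a few more round trips per batch for a larger batch size.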

Tip #2: adjust the prefetch size

When you query items through Flexible Search, the system first queries their primary keys and then retrieves their data in batches as you iterate through the results. The batch size is configurable via a session attribute called prefetchSize and is set to 100 by default. For example, if you query 500 Product items, the system performs 5 queries in the background, each fetching the data of 100 Product items. Generally, fetching 500 items at once takes slightly longer than fetching 100 items but significantly less than five times as long.
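The number of data queries follows directly from the result size and the prefetch size. The sketch below only illustrates that arithmetic; the class and method names are hypothetical and it does not call any platform API.

```java
/** Illustrates how the prefetch size determines the number of data queries. */
class PrefetchMath {

    /** Number of batch queries needed to load the data of all items in a result. */
    static int dataQueries(int resultSize, int prefetchSize) {
        if (prefetchSize <= 0) {
            throw new IllegalArgumentException("prefetchSize must be positive");
        }
        // Integer ceiling division: a partially filled last batch still costs one query
        return (resultSize + prefetchSize - 1) / prefetchSize;
    }

    public static void main(String[] args) {
        System.out.println("500 items, prefetch size 100: " + dataQueries(500, 100) + " data queries");
        System.out.println("500 items, prefetch size 500: " + dataQueries(500, 500) + " data queries");
    }
}
```

The query fetching the primary keys comes on top of these data queries.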

The Solr indexing job also indexes items in batches, and the batch size can be configured in the index configuration. Ideally, the prefetchSize should be the same as the Solr index batch size during indexing to reduce the number of queries performed by the system. You can achieve that by extending the DefaultIndexerWorker as shown below.

public class MyIndexerWorker extends DefaultIndexerWorker {

    private IndexerWorkerParameters workerParameters;

    @Override
    public void initialize(final IndexerWorkerParameters workerParameters) {
        super.initialize(workerParameters);
        // Keep a reference to the parameters to know the size of the batch
        this.workerParameters = workerParameters;
    }

    @Override
    protected void initializeSession() {
        super.initializeSession();
        // Align the prefetch size with the number of items in the batch
        final int prefetchSize = this.workerParameters.getPks().size();
        getSessionService().setAttribute(FlexibleSearch.PREFETCH_SIZE, prefetchSize);
    }
}

If your ValueResolver implementations query items and you know that the query result size will be significantly greater than the Solr index batch size, then you might want to adjust the prefetch size there as well to reduce the number of executed queries even further, as in the example below.

abstract class AbstractBulkValueResolver<T extends ItemModel, M, Q, K> extends AbstractValueResolver<T, M, Q> {

    private SessionService sessionService;

    // Runs the given code with a temporary prefetch size and restores the previous value afterwards
    protected <R> R executeWithPrefetchSize(final int prefetchSize, final Supplier<R> executable) {
        // The attribute may not be set in the session, so it is read as an Integer to avoid
        // a NullPointerException on unboxing
        final Integer currentPrefetchSize = getSessionService().getAttribute(FlexibleSearch.PREFETCH_SIZE);
        final boolean adjustPrefetchSize = currentPrefetchSize == null || currentPrefetchSize != prefetchSize;
        if (adjustPrefetchSize) {
            getSessionService().setAttribute(FlexibleSearch.PREFETCH_SIZE, prefetchSize);
        }
        try {
            return executable.get();
        } finally {
            if (adjustPrefetchSize) {
                if (currentPrefetchSize == null) {
                    getSessionService().removeAttribute(FlexibleSearch.PREFETCH_SIZE);
                } else {
                    getSessionService().setAttribute(FlexibleSearch.PREFETCH_SIZE, currentPrefetchSize);
                }
            }
        }
    }

    [...]
}

class ProductStockLevelValueResolver<P extends ProductModel>
        extends AbstractBulkValueResolver<P, List<StockLevelModel>, List<StockLevelModel>, String> {

    @Override
    protected Map<String, List<StockLevelModel>> loadAllData(final IndexerBatchContext batchContext,
            final Collection<IndexedProperty> indexedProperties) throws FieldValueProviderException {
        // Assume that a product will have on average 3 stock level items associated
        final List<StockLevelModel> stockLevels = executeWithPrefetchSize(batchContext.getItems().size() * 3, () -> {
            final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
                    "SELECT {s.PK} FROM {StockLevel AS s} WHERE {s.productCode} IN (?productCodes)"
            );
            fQuery.addQueryParameter("productCodes", batchContext.getItems().stream()
                    .map(ProductModel.class::cast)
                    .map(ProductModel::getCode)
                    .collect(Collectors.toList()));
            fQuery.setDisableCaching(true);
            return flexibleSearchService.<StockLevelModel>search(fQuery).getResult();
        });
        return stockLevels.stream().collect(Collectors.groupingBy(StockLevelModel::getProductCode));
    }

    [...]
}

I would however recommend not overthinking it, as the performance gains will most likely not be worth the extra effort: you might save 1 or 2 queries per batch, gaining a few milliseconds per batch and most likely less than a minute over your entire indexing process.

Tip #3: remove indexed items from entity and query caches

This tip only applies if your entity and query caches hit their maximum size while your Solr indexing jobs are running. To understand how caches impact the indexing speed, let us dive into the platform. When you execute a Flexible Search query to fetch items, the platform caches the query as well as its result in the query cache unless caching is disabled. The result is a list of primary keys of the items found. As you navigate through the result, the system fetches the item data from the database in batches by executing the query SELECT * FROM <table> WHERE PK IN (?, ?, ...). The items with their data are cached in the entity cache, while the batch query fetching the item data is cached in the query cache, even when caching is disabled in the Flexible Search query. When a cache hits its maximum size, it evicts items to make room for new ones. If an evicted item is needed, the system fetches its data from the database again by executing SELECT * FROM <table> WHERE PK = ? and caches the item, potentially causing the eviction of another item if the cache is full.

Enough theory, let us see how it applies to indexing. When the Solr indexing job runs, parallel indexing workers fetch the items to index through Flexible Search queries and consequently fill the entity and query caches. Once the caches hit their maximum size, I could observe the following pattern. One indexing worker loads the batch items to index and, as the entity cache is full, causes evictions mainly there. But the items it evicts were loaded by another indexing worker and are not indexed yet. As that worker indexes these evicted items, it causes the system to reload them and … to evict items from yet another indexing worker. The indexing workers work against each other and the system keeps reloading single items, delaying the entire indexing process.

The problem can be solved in different ways. I decided to remove the indexed items from the entity and query caches to prevent these caches from reaching their maximum size and the pattern described above from happening. I therefore customized the default batch indexing strategy to remove the indexed items from the cache after each batch.

public class MyIndexerBatchStrategy extends DefaultIndexerBatchStrategy {

    public static final String PARAM_DISABLE_CACHING = "disableCaching";

    private static final Logger LOG = LogManager.getLogger(MyIndexerBatchStrategy.class);

    private GenerationalCacheDelegate generationalCacheDelegate;

    @Override
    public void execute() throws InterruptedException, IndexerException {
        try {
            super.execute();
        } finally {
            // Clean up the caches even if the batch failed
            if (shouldRemoveIndexedItemsFromCache()) {
                removeIndexedItemsFromCache();
            }
        }
    }

    protected boolean shouldRemoveIndexedItemsFromCache() {
        if (getIndexOperation() != IndexOperation.DELETE) {
            final IndexedType indexedType = this.getIndexedType();
            if (indexedType != null) {
                return Boolean.parseBoolean(indexedType.getAdditionalParameters().get(PARAM_DISABLE_CACHING));
            }
        }
        return false;
    }

    protected void removeIndexedItemsFromCache() {
        // Nothing to remove for an empty batch (also protects the iterator().next() call below)
        if (this.getPks().isEmpty()) {
            return;
        }

        final long startTimestamp = System.currentTimeMillis();
        final Cache cache = Registry.getCurrentTenantNoFallback().getCache();
        final Object[] jaloItemCacheKey = new Object[]{Cache.CACHEKEY_JALOITEMCACHE, null};
        final Object[] entityCacheKey = new Object[]{Cache.CACHEKEY_HJMP, Cache.CACHEKEY_ENTITY, null, null};
        long evictedUnits = 0;

        // Remove each indexed item from the entity cache and the Jalo item cache
        for (final PK pk : this.getPks()) {
            entityCacheKey[2] = pk.getTypeCodeAsString();
            entityCacheKey[3] = pk;
            final AbstractCacheUnit entityCacheUnit = cache.getUnit(new AnonymousCacheUnit(cache, entityCacheKey));
            if (entityCacheUnit != null) {
                cache.removeUnit(entityCacheUnit);
                evictedUnits++;
            }

            jaloItemCacheKey[1] = pk;
            final AbstractCacheUnit jaloItemCacheUnit = cache.getUnit(new AnonymousCacheUnit(cache, jaloItemCacheKey));
            if (jaloItemCacheUnit != null) {
                cache.removeUnit(jaloItemCacheUnit);
                evictedUnits++;
            }
        }

        // Remove the batch query that fetched the item data from the query cache
        final Object[] ejbFindByPkListCacheKey = new Object[] {
                Cache.CACHEKEY_HJMP,
                Cache.CACHEKEY_FIND,
                this.getPks().iterator().next().getTypeCodeAsString(),
                "ejbFindByPKList",
                Collections.unmodifiableList(Arrays.asList(new HashSet<>(this.getPks())))
        };
        final AbstractCacheUnit ejbFindByPkListCacheUnit =
                cache.getUnit(new AnonymousCacheUnit(cache, ejbFindByPkListCacheKey) {
                    @Override
                    public CacheKey getKey() {
                        if (this.cacheKey == null) {
                            final Object[] keyAsArray = this.getKeyAsArray();
                            this.cacheKey = getGenerationalCacheDelegate().getGenerationalCacheKey(
                                    new FinderResult.FinderResultCacheKey((String) keyAsArray[2], keyAsArray, cache.getTenantId())
                            );
                        }
                        return this.cacheKey;
                    }
                });
        if (ejbFindByPkListCacheUnit != null) {
            cache.removeUnit(ejbFindByPkListCacheUnit);
            evictedUnits++;
        }

        final long endTimestamp = System.currentTimeMillis();
        LOG.debug("Removed {} item(s) from cache within {}ms", evictedUnits, endTimestamp - startTimestamp);
    }

    protected GenerationalCacheDelegate getGenerationalCacheDelegate() {
        return generationalCacheDelegate;
    }

    @Resource
    public void setGenerationalCacheDelegate(GenerationalCacheDelegate generationalCacheDelegate) {
        this.generationalCacheDelegate = generationalCacheDelegate;
    }
}

Notice first that it evicts only the indexed items. It doesn’t evict any other items loaded by ValueResolver implementations during the indexing. Secondly, you should create an additional parameter named disableCaching and set it to true in your index configuration in order to enable the logic. As it is configurable per index, you could for example decide to clean the cache for all Solr indexing jobs except the product catalog one, because keeping products in the cache is critical for your performance.

Tip #4: don’t query large items, query their data

As I explained in the previous tip, fetching items through Flexible Search fills the entity and query caches, which, when full, can negatively impact the indexing speed. Consequently, you want to make sure none of your ValueResolver implementations performs queries that fill up the entity or query caches. The trick is to query the item data rather than the items themselves. For example, if you need to fetch the StockLevel items associated with the indexed products in order to index the available stock quantities, you should directly query the available attribute along with the product code rather than the StockLevel primary key.

Instead of implementing something like this:

protected Map<String, List<StockLevelModel>> loadAllData(final IndexerBatchContext batchContext,
        final Collection<IndexedProperty> indexedProperties) throws FieldValueProviderException {
    // Selecting the primary key makes the platform load (and cache) the whole StockLevel items
    final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
            "SELECT {s.PK} FROM {StockLevel AS s} WHERE {s.productCode} IN (?productCodes)"
    );
    fQuery.addQueryParameter("productCodes", batchContext.getItems().stream()
            .map(ProductModel.class::cast)
            .map(ProductModel::getCode)
            .collect(Collectors.toList()));
    fQuery.setDisableCaching(true);
    return flexibleSearchService.<StockLevelModel>search(fQuery).getResult()
            .stream().collect(Collectors.groupingBy(StockLevelModel::getProductCode));
}

You should rather implement something like this:

protected Map<String, List<Integer>> loadAllData(final IndexerBatchContext batchContext,
        final Collection<IndexedProperty> indexedProperties) throws FieldValueProviderException {
    // Selecting only the needed columns returns raw values and does not load any StockLevel item
    final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
            "SELECT {s.productCode}, {s.available} FROM {StockLevel AS s} WHERE {s.productCode} IN (?productCodes)"
    );
    fQuery.addQueryParameter("productCodes", batchContext.getItems().stream()
            .map(ProductModel.class::cast)
            .map(ProductModel::getCode)
            .collect(Collectors.toList()));
    fQuery.setDisableCaching(true);
    fQuery.setResultClassList(asList(String.class, Integer.class));
    return flexibleSearchService.<List<Object>>search(fQuery).getResult()
            .stream().collect(Collectors.groupingBy(row -> (String) row.get(0),
                    Collectors.mapping(row -> (Integer) row.get(1), Collectors.toList())));
}

If you query items with low cardinality in your system, for example units of measure or warehouses, and you can easily store all of them in the entity cache, then you do not need to adjust your queries and can continue to fetch whole items.

Tip #5: group providers with similar logic

On projects, I often see FieldValueProvider or ValueResolver implementations developed for a very specific purpose and indexing one specific piece of information. It definitely helps to separate concerns but does not always serve indexing performance.

The ValueResolver interface introduced an important but often unnoticed optimization compared to the old FieldValueProvider interface. It can resolve the values for multiple indexed properties. When multiple indexed properties are configured with the same provider implementing the ValueResolver interface, the system calls the provider only once per item to index, with the list of indexed properties as parameter. Notice that the SolrIndexedProperty item offers the valueProviderParameter and valueProviderParameters attributes to configure the provider behavior.

For example, if you need to index the available stock quantities as well as the warehouse names for each product, you can achieve that by implementing a single provider like the following one.

class ProductStockLevel {

    private ProductModel product;
    private WarehouseModel warehouse;
    private Integer available;

    // Getter and setter methods for the POJO
    [...]
}

class ProductStockLevelValueResolver<P extends ProductModel>
        extends AbstractBulkValueResolver<P, List<ProductStockLevel>, List<ProductStockLevel>, String> {

    [...]

    @Override
    protected Map<String, List<ProductStockLevel>> loadAllData(final IndexerBatchContext batchContext,
            final Collection<IndexedProperty> indexedProperties) throws FieldValueProviderException {
        // Ensure all warehouses are loaded in the entity cache so that modelService.get() will hit the cache
        // and not the database
        loadAllWarehouses();

        // Load the stock level information the provider should be able to index (e.g. availability, warehouse)
        final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
                "SELECT {s.productCode}, {s.available}, {s.warehouse} FROM {StockLevel AS s} WHERE {s.productCode} IN (?productCodes)"
        );
        fQuery.addQueryParameter("productCodes", batchContext.getItems().stream()
                .map(ProductModel.class::cast)
                .map(ProductModel::getCode)
                .collect(Collectors.toList()));
        fQuery.setDisableCaching(true);
        fQuery.setResultClassList(asList(String.class, Integer.class, PK.class));

        // Group the stock levels by product code (first column of the result)
        return flexibleSearchService.<List<Object>>search(fQuery).getResult().stream().collect(
                Collectors.groupingBy(row -> (String) row.get(0), Collectors.mapping(row -> {
                    final ProductStockLevel productStock = new ProductStockLevel();
                    productStock.setWarehouse(modelService.get((PK) row.get(2)));
                    productStock.setAvailable((Integer) row.get(1));
                    return productStock;
                }, Collectors.toList())));
    }

    protected void loadAllWarehouses() {
        // Fetch all warehouses to add them to the entity cache. Since the query will be cached, it will be executed
        // the first time and not the subsequent times (unless evicted from the cache).
        final FlexibleSearchQuery fQuery = new FlexibleSearchQuery("SELECT {w.PK} FROM {Warehouse AS w}");
        fQuery.setDisableCaching(false);
        fQuery.setResultClassList(asList(WarehouseModel.class));

        // Iterate through the result to force the lazy list to load all warehouses in the entity cache
        flexibleSearchService.<WarehouseModel>search(fQuery).getResult().forEach((warehouse) -> {});
    }

    @Override
    protected String getKey(final P product) {
        return product.getCode();
    }

    @Override
    protected void addFieldValues(final InputDocument inputDocument, final IndexerBatchContext batchContext,
            final IndexedProperty indexedProperty, final P product,
            final ValueResolverContext<List<ProductStockLevel>, List<ProductStockLevel>> valueResolverContext)
            throws FieldValueProviderException {
        final List<ProductStockLevel> productStockLevels = valueResolverContext.getData();
        if (isNotEmpty(productStockLevels)) {
            final Expression expression = getSpelExpressionForIndexedProperty(batchContext, indexedProperty);
            inputDocument.addField(indexedProperty, productStockLevels.stream()
                    .map((productStockLevel) -> expression.getValue(productStockLevel))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()));
        }
    }

    @SuppressWarnings("unchecked")
    protected Expression getSpelExpressionForIndexedProperty(final IndexerBatchContext batchContext,
            final IndexedProperty indexedProperty) {
        // The Spel expression will be the same for all indexed items. Instead of parsing it over and over, it is cached
        // in the batch context (ideally, it should be parsed at the very beginning of the indexing process, cached in
        // the indexing context and retrieved from there any time needed).
        final Map<IndexedProperty, Expression> parsers = (Map<IndexedProperty, Expression>)
                batchContext.getAttributes().computeIfAbsent("spelExpressions", (key) -> new HashMap<>());
        return parsers.computeIfAbsent(indexedProperty, (key) -> {
            String expression = key.getValueProviderParameter();
            if (isBlank(expression)) {
                expression = key.getName();
            }
            return new SpelExpressionParser().parseExpression(expression);
        });
    }

    [...]
}

The ImpEx script for configuring your Solr indexed properties would then look like this:

INSERT_UPDATE SolrIndexedProperty; id[unique=true]; ... ; valueProvider; valueProviderParameter;
; available ; ... ; productStockLevelValueResolver ; available ;
; warehouseName ; ... ; productStockLevelValueResolver ; warehouse.name ;

Note that I used Spring Expression Language in this example to demonstrate the flexibility of the indexing framework but you can of course use different, more efficient and less complex mechanisms.
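As one possible simpler mechanism, the value of valueProviderParameter could be mapped to a plain Java function instead of being parsed as an expression. The sketch below is not what I used in my job; StockRow is a hypothetical, simplified stand-in for the ProductStockLevel POJO, and the class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Resolves field values with plain functions keyed by the provider parameter. */
class StockFieldExtractors {

    /** Hypothetical, simplified stand-in for the ProductStockLevel POJO. */
    static final class StockRow {
        final String warehouseName;
        final Integer available;

        StockRow(String warehouseName, Integer available) {
            this.warehouseName = warehouseName;
            this.available = available;
        }
    }

    /** One extractor per supported valueProviderParameter value. */
    static final Map<String, Function<StockRow, Object>> EXTRACTORS = new HashMap<>();

    static {
        EXTRACTORS.put("available", row -> row.available);
        EXTRACTORS.put("warehouse.name", row -> row.warehouseName);
    }

    /** Returns the non-null values of the requested field for all given rows. */
    static List<Object> fieldValues(String parameter, List<StockRow> rows) {
        Function<StockRow, Object> extractor = EXTRACTORS.get(parameter);
        if (extractor == null) {
            // Fail fast on configuration typos instead of silently indexing nothing
            throw new IllegalArgumentException("Unsupported parameter: " + parameter);
        }
        return rows.stream().map(extractor).filter(Objects::nonNull).collect(Collectors.toList());
    }
}
```

The trade-off is that every new indexed property requires a code change, whereas the expression-based approach only requires a configuration change.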

Tip #6: commit at the end

If you commit your index in Solr at the end of each batch, you might commit very often and Solr might slow down your indexing process. Solr offers an auto-commit strategy, which is particularly useful when indexing large amounts of data. In a nutshell, Solr can auto-commit either after a certain number of new documents have been added to the index or after a certain time has elapsed since the last commit. The auto-commit settings, shown below, can be configured in the solrconfig.xml file.

<config>
  [...]
  <updateHandler>
    [...]
    <autoCommit>
      <maxDocs>${solr.autoCommit.maxDocs:50000}</maxDocs>
      <maxTime>${solr.autoCommit.maxTime:-1}</maxTime>
      <openSearcher>false</openSearcher>
    </autoCommit>
    [...]
  </updateHandler>
  [...]
</config>

If you configure your index to commit only at the end of the indexing process, the auto-commit takes care of the intermediate commits during indexing and limits their performance impact on your indexing process.

Finding the right value depends on your Solr server configuration and your indexing speed. You have to find the right balance between:

  • The size of your update log: the longer you delay the commit, the bigger your update log will be. You should check that you have enough disk space for it, especially if your indexes take up a lot of space.
  • The restart time of your Solr server after a crash: when Solr crashes and restarts, it reprocesses the update log. The larger the update log, the longer Solr takes to become operational. Since search is a critical element, you should ensure your Solr server can recover relatively quickly.
  • The indexing speed of your jobs: the indexing speed determines the rate at which documents are added or modified in the Solr server and at which the update log grows.

In my situation, I tried different values for the maxDocs setting and compared the speed of my indexing jobs to pick the best setting. I ended up setting the parameter to 500k, which is the number of documents added/modified by the indexing job within 30 seconds.
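The relation between indexing speed, commit interval and maxDocs is simple arithmetic. The sketch below turns a measured indexing speed into a maxDocs candidate; the class and method names are hypothetical, and the result is only a starting point to validate by measuring your own jobs.

```java
/** Derives a maxDocs candidate from a measured indexing speed. */
class AutoCommitSizing {

    /**
     * @param docsPerSecond         measured indexing speed of the job
     * @param secondsBetweenCommits desired time between two auto-commits
     */
    static long maxDocs(double docsPerSecond, int secondsBetweenCommits) {
        if (docsPerSecond < 0 || secondsBetweenCommits <= 0) {
            throw new IllegalArgumentException("speed must not be negative and interval must be positive");
        }
        return Math.round(docsPerSecond * secondsBetweenCommits);
    }

    public static void main(String[] args) {
        // My job added or modified about 500k documents within 30 seconds
        double measuredSpeed = 500_000 / 30.0;
        System.out.println("maxDocs candidate: " + maxDocs(measuredSpeed, 30));
    }
}
```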

Conclusion

SAP Commerce offers a very efficient indexing framework but, as with all frameworks and platforms, certain edge cases require exploring beyond the regular boundaries. If you are in this situation, these tips will hopefully help you on your journey. They are ideas to explore, with some concrete examples to ease your investigations. Keep in mind that it is entirely possible that some or even all of these tips do not apply to your scenario. It is also not an exhaustive list, and there are many more tips out there to optimize Solr indexing jobs. Consequently, feel free to share your own experience and tips on optimizing your indexing jobs, through comments or in your own blog post referencing this one!