Indexing At Scale

This case study describes the indexing pipeline we implemented for the Share-VDE project, a collaborative discovery environment that gathers data from several bibliographic institutions.

One of the critical aspects of the project, and the challenge we describe in this article, is the massive amount of data the system has to handle. Such data is:

  • editable: editors can add, remove or update entities in the dataset 
  • searchable: the whole dataset is exposed through a rich set of Search APIs

The datasource layer includes multiple storages, each with a specific purpose. Specifically, while data is primarily stored in PostgreSQL, search services are provided by Apache Solr.

Consequently, we had to move a considerable amount of data between the RDBMS and the search engine. That “massive movement” happens primarily in the following scenarios:

  • indexing from scratch: at the very beginning, for indexing the entire RDBMS content
  • massive delta indexing: institutions periodically provide small/medium datasets representing updated data
  • disaster recovery: a different purpose, but the same task as the first scenario; everything must be (re)indexed from scratch

The project

Share VDE

Share-VDE is a library-driven initiative that brings together the bibliographic catalogs and authority files of a community of libraries in a shared discovery environment based on linked data.

Share-VDE expanded its scope to embrace a wider community of institutions from the art and music domains, building the Share Family.

The Share family project is promoted by the library supplier Casalini Libri and @Cult, the software house focused on developing knowledge-sharing solutions.

The Share-VDE platform hosts the data of the following member institutions:

The institutions taking advantage of the Share-VDE Linked Data Lifecycle Support and contributing to the Share-VDE knowledge base of library data are:

The institutions that participated in the R&D phases are:

The idea: In a Nutshell

The following picture illustrates the indexing pipeline implementation.

Data is stored in PostgreSQL and extracted to CSV files through PostgreSQL COPY commands. It then flows through a Hadoop MapReduce pipeline, which processes the text files, builds the documents, and sends them to a SolrCloud search infrastructure.

A closer look

The brief overview above gives a first idea of the building blocks of the implementation.

However, you may want to have a closer look at the role played by each component in the pipeline: that’s precisely the purpose of the following sections.  


Data is primarily stored in PostgreSQL: although the schema is quite complex, the entities, properties, and other things we want to search are stored in a few tables.

Those tables have a shape that follows a flexible row-oriented approach: being a collaborative project where several institutions contribute to the knowledge base, the whole set of properties is not precisely known in advance; properties can be added or removed at runtime. 

For that reason, entities (people, in the example below), cannot be represented using a traditional table like this: 


The table-oriented / column-oriented approach suffers from the following drawbacks:

  • The exact number of types is unknown in advance, so we need a flexible structure that allows us to add/remove types. The design above would require creating/dropping tables, which is a kind of structural change we want to avoid in a production environment.
  • The “flexibility” requirement in the previous point also applies to properties: the properties of a given type can change over time. Again, a column-oriented approach like the above would require altering tables every time we work on a given property. We must be able to add/remove/update a property definition at runtime.
  • Even within the same entity type, properties are sparse: a given person could have a birthDate, an occupation, and a death date, while another could provide only a death place.

The following is instead the row-oriented approach we used for representing the same data:  


A table that lists the properties that each entity in the domain model can have. Each property is associated with a datatype, a cardinality, and some other metadata that qualifies the property. 

Properties are divided into three categories: 

  • attributes: literal values (like text, numbers, dates, boolean)
  • relationships: connections between existing entities
  • links: connections between entities and external references


For each entity in the system, there’s a row in this table. Each entity carries several pieces of metadata, such as the identifier, the type (e.g. Agent, Work), and some other administrative data.


The table captures the property instances (the actual values) associated with entities.

Please remember that what is described above is a simplification, meant to illustrate the approach behind the schema design. To give you an idea, besides the “property” table, there are two other main tables representing links and relationships.

The challenge has been how to read the “indexable” data efficiently. It’s a lot of data; one drawback of the row-oriented approach, compared with the column-oriented one, is a kind of “data explosion” in cardinality.

See the example above: the “PERSON” table contains a couple of rows and four columns, eight cells in total; the same representation in the row-oriented approach takes two tables and more or less 40 cells (actually more; remember that this is a simplification).

We ran a couple of proofs of concept:

The verdict was more or less the same: a JDBC client that fetches such data from the database is not a viable solution because it takes a lot of time. In addition, it requires implementing some asynchronous logic to deal with the different speeds of readers and indexer workers.

The PostgreSQL COPY Command

The PostgreSQL COPY command moves data between tables and files. It is a built-in tool, and it offers a very high export throughput, especially if the wrapped SELECT command is kept as simple as possible (e.g., no ORDER BY, GROUP BY, or DISTINCT clauses).

So far, so good. We don’t need any grouping or distinct result set, but what about the order? To build a document, everything (properties, relationships, links) belonging to a given entity should be together in the exported file; otherwise, it is impossible to know when the entity definition ends without scrolling the entire set, which we want to avoid.

In addition, the entity definition is spread across several tables, which requires us to find an alternative approach for grouping, in the export, things that belong to a given entity. In other words, there is no single COPY command for exporting everything: each command produces a separate result set, in a separate file, providing a certain number of properties for a given entity.

Let’s quickly illustrate the matter. Here are the export commands:




Commands write their output to standard output. The COPY command can also write directly to a file, but in that case the file is local to the database server (i.e., it is created on the database machine). That is a problem, especially if the database machine is not on-premise and does not provide SSH/SFTP access. It is better to invoke the COPY command and capture its standard output to create a file somewhere else (in Amazon S3, in our case).

You may also wonder why there are multiple COPY commands instead of a single one using UNION: this way, we can execute them in parallel.
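Running independent export commands in parallel could be sketched as follows. This is a minimal sketch, not the project's actual code: `runCopy` is a hypothetical function that stands in for whatever executes one COPY statement and streams its standard output to a file, and the class and method names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Runs several independent export commands in parallel and collects their results. */
class ParallelExport {

    /**
     * @param commands the export statements, one per output file
     * @param runCopy  hypothetical function that executes one statement and
     *                 returns the name of the file it produced
     * @param threads  maximum number of exports running at the same time
     * @return the produced file names, in the same order as the commands
     */
    static List<String> exportAll(List<String> commands,
                                  Function<String, String> runCopy,
                                  int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit every export without waiting for the previous one.
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String command : commands) {
                pending.add(CompletableFuture.supplyAsync(() -> runCopy.apply(command), pool));
            }
            // Wait for all of them; join() rethrows a failed export as an unchecked exception.
            List<String> files = new ArrayList<>();
            for (CompletableFuture<String> export : pending) {
                files.add(export.join());
            }
            return files;
        } finally {
            pool.shutdown();
        }
    }
}
```

A single statement using UNION would instead produce one result set that has to be written sequentially, so the exports could not overlap.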

As said, the output is a set of CSV files (19, at the time of writing); a given entity will potentially have a partial definition in all of them.

How can we process them efficiently to create the documents sent to Solr? Consider that each file, in a production environment, reaches a considerable size (between 150 GB and 200 GB, depending on the content).

Amazon Elastic MapReduce (EMR)

The input data of our indexing process is a perfect fit for a Hadoop MapReduce job:

  • a small number of big text files
  • text files are structured (CSV)
  • each row represents one entity property (e.g., an attribute, a relationship, a link)
  • the first field of each row is the entity identifier
  • rows are not ordered by entity identifier
  • a given entity identifier (i.e., rows belonging to a given entity) could appear in many files

Here’s an example of the inputs we have:

> more attributes.dump


> more relationships.dump


> more links.dump


We leverage the map, shuffle, and sort capabilities of a MapReduce job that implements:

  • a mapper that does a minimal manipulation
  • a reducer that creates, batches, and indexes documents 

Instead of relying on an on-premise Hadoop cluster, we used the Amazon Elastic MapReduce (EMR) managed service to directly focus on the system logic instead of dealing with infrastructure tasks. 

The mapper parses each row and separates it into a key (the entity identifier) and a value (the remaining part of the row). 

For example, the rows:



1 => Person,257,Claire
2 => Person,327,Carnby
1 => Person,327,Redfield
1 => Person,128,1927
2 => Person,257,Edward
2 => Person,128,1992
2 => Person,201,Detective
1 => 828,377,484,2292
1 => 223,332,233,1290
2 => 828,112,992,9294
2 => 192,117,433,1027
1 => 111,212,243,998
1 => jlis,837,
1 => record,992,
1 => toc,123,
2 => cover,888,
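The mapping step can be illustrated with plain Java, outside of Hadoop. This is only a sketch: it assumes each input row is a comma-separated line whose first field is the entity identifier (e.g. `1,Person,257,Claire`) and that the identifier itself contains no commas; the class name is hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

/** Splits an exported row into the key/value pair emitted by the mapper. */
class RowMapper {

    /**
     * Splits "id,rest" at the first comma: the identifier becomes the key,
     * everything after it (kept untouched) becomes the value.
     */
    static Map.Entry<String, String> map(String row) {
        int comma = row.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Row without entity identifier: " + row);
        }
        return new SimpleEntry<>(row.substring(0, comma), row.substring(comma + 1));
    }
}
```

Keeping the value as an opaque string is what makes the manipulation minimal: the mapper never needs to know whether the row is an attribute, a relationship, or a link.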

The subsequent phase is called shuffle: data is transferred from mappers to reducers. 

Before the reducers start, all intermediate key-value pairs generated by the mappers are sorted and grouped by key, which is exactly what we need for building our documents.

Following the example above, the reducer is called twice, with the following inputs:

1 => ["Person,257,Claire",


2 => ["Person,327,Carnby",

The data above is all that we need to build the documents for Solr. 
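To make the grouping concrete, the effect of the shuffle phase can be simulated in memory. This is a sketch under stated assumptions, not Hadoop code: a sorted map plays the role of the framework, and the class name is hypothetical. In a real job the framework does this across machines, and the order of the values within a key is not guaranteed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory imitation of what the shuffle phase hands over to the reducers. */
class ShuffleSimulation {

    /**
     * Groups {key, value} pairs by key. Keys come out sorted, as they do after
     * the shuffle; values keep their arrival order (an artifact of this sketch).
     */
    static Map<String, List<String>> groupByKey(List<String[]> pairs) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], key -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }
}
```

Each entry of the resulting map corresponds to one reducer invocation: the key is the entity identifier and the list holds every row fragment needed to build that entity's document.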

Here’s a picture that summarizes the indexing pipeline: 

Apache Solr

The last puzzle piece is Apache Solr, the popular, open-source enterprise search platform built on Apache Lucene.

From an indexing perspective, the main challenge of the Solr cluster is still related to data: its initial size and its expected growth rate. Solr requires creating the distributed index with a predefined number of slices (shards), so at any given moment, with a given amount of data and a given growth rate, we should always make sure that:

  • the cluster is well-balanced: each shard should manage a reasonable amount of data; “reasonable” here refers to the resources owned by the hosting node 
  • hardware resources (e.g., RAM, CPU, Disk) are appropriately allocated according to indexing and query throughput requirements 

In Share-VDE, data is expected to grow on a large scale: we started with a few million entities (15-20 million), and we know the production system will hold more or less two billion documents.

Estimating in advance a cluster that should hold such an amount of data is quite complex; under- or overestimating it is a high risk, which could lead to significant consequences in terms of costs.

We have chosen an incremental approach:

  • plan the cumulative updates in terms of incoming data from institutions
  • start with X as the initial data size, Y as the expected growth rate until the cluster reaches Z
  • estimate and create a Solr cluster for holding the X-Z scenario
  • when the cluster reaches Z, it becomes the new X (new initial size), the loop starts at a higher level, and we reindex everything in the new bigger cluster

We found the following significant benefits in the approach above:

  • Each cluster estimation is closely tied to the data it holds
  • Estimation iterations are very short     
  • We can understand, accurately estimate, and adapt the upper limit of each cluster instance, as soon as the data size increases  

Stay Out of Trouble

In this section we summarise a set of lessons learned from the project where we implemented everything described in this post.

Java 8

Java 8 is the supported Java Virtual Machine (JVM) for cluster instances created using Amazon EMR release version 5.0.0 or later.

Although Java 8 is a bit old right now, that is not an issue in most cases.

You should just be aware of it in advance, to avoid (as happened to us) a considerable refactoring because your code uses language or library features introduced in later versions.


The atomic indexing unit in Apache Solr is called Document (actually a SolrInputDocument instance). Regardless of the way documents are received, once they arrive in Solr, they are indexed one by one.  

Clients can improve the overall indexing throughput by batching documents instead of sending them separately as soon as they are created.

SolrJ, the official Solr Java client, offers several client classes. Apart from ConcurrentUpdateSolrClient, which transparently accumulates documents and creates batches, the other SolrClient subclasses leave the batching work to the caller. CloudSolrClient, the client we use for targeting SolrCloud, is no exception to that rule: in a massive update scenario, it is always better to create a batch of documents, as in the following example:

var batch = new ArrayList<SolrInputDocument>();


One thing to remember: CloudSolrClient divides your batch into small sub-groups, each one targeting the corresponding leader shard. This is important for defining the batch size, which should consider the cluster size.  
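The batching logic itself can be sketched independently of SolrJ. In the sketch below, the `sender` callback is an assumption that stands in for the actual Solr call (for example, a client method that accepts a collection of documents), and `DocumentBatcher` is a hypothetical name, not a SolrJ class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Accumulates documents and hands them over in fixed-size batches. */
class DocumentBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> sender; // stand-in for the call that sends a batch to Solr
    private List<T> batch = new ArrayList<>();

    DocumentBatcher(int batchSize, Consumer<List<T>> sender) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sender = sender;
    }

    /** Adds a document and sends the batch as soon as it is full. */
    void add(T document) {
        batch.add(document);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is pending; call it once more when the input is exhausted. */
    void flush() {
        if (batch.isEmpty()) {
            return;
        }
        sender.accept(batch);
        batch = new ArrayList<>(); // start a fresh list, the sent one is left untouched
    }
}
```

In a reducer, `add` would be called for every document built and `flush` once at the end, so that the last, partially filled batch is not lost.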

Isolate indexing failures

Documents are always indexed one by one, regardless of whether they are sent in batches or not.

When batches of documents are sent to Apache Solr for indexing, by default the process stops at the first failure, therefore skipping the remaining documents. This behavior is counterintuitive because the operation result is unpredictable from a client’s perspective. 

In an ideal world, there should be zero indexing failures, but in reality, errors happen for many reasons that are not always under our direct control.

Let’s see an example. We have the following document list:

[ d1, d2, d3, d4, d5]

The actual sub-set of documents that are indexed changes depending on where the first failure occurs: if it happens at d1, no document will be indexed; if it happens on d2, only d1 will be indexed, and so on.

Since documents are isolated, independent units, what we needed for our use case was a more “lenient” behavior where:

  • all failed documents are logged (not just the first one)
  • the batch is processed entirely, without stopping at first failure
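The difference between the two behaviors can be simulated without Solr. The sketch below is purely illustrative: the `fails` predicate is a stand-in for whatever makes a document be rejected, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Contrasts "stop at the first failure" with "process the whole batch". */
class BatchFailureSimulation {

    /** Default behavior: stop at the first failing document, skipping the rest. */
    static List<String> failFast(List<String> batch, Predicate<String> fails) {
        List<String> indexed = new ArrayList<>();
        for (String document : batch) {
            if (fails.test(document)) {
                break; // everything after this document is never processed
            }
            indexed.add(document);
        }
        return indexed;
    }

    /** Lenient behavior: index what can be indexed and record every failure. */
    static List<String> tolerant(List<String> batch, Predicate<String> fails,
                                 List<String> failed) {
        List<String> indexed = new ArrayList<>();
        for (String document : batch) {
            if (fails.test(document)) {
                failed.add(document); // keep track of it and move on
            } else {
                indexed.add(document);
            }
        }
        return indexed;
    }
}
```

With the list [d1, d2, d3, d4, d5] and a failure on d3, `failFast` indexes only d1 and d2, while `tolerant` indexes d1, d2, d4, and d5 and reports d3 as failed.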

Apache Solr lets you customize the indexing pipeline using components called UpdateRequestProcessors. Fortunately, the processor that provides the behavior we need ships with the Solr distribution: it is the TolerantUpdateProcessor, created through the TolerantUpdateProcessorFactory. We just need to configure it:

What happens in case of one or multiple failures? The response will always be OK, but it will contain a section that reports the failures. For each failed document there’s an entry in the response which includes:

  • the unique key value
  • the operation type (e.g. ADD, DELETE)
  • the error message 

That interaction allows collecting every failure without having the whole batch fail. Every failure is then persisted in a database table and, as a fallback in the case of SQL exceptions, in a log file.  

Avoid sending Unnecessary Data

At first sight, this advice may sound a bit trivial.

As described above, the way we captured the data model in the database is generic. That is a typical choice when dealing with several domains or where entities are not precisely known in advance in terms of types, attributes, and relationships.

You will likely find a similar approach in Content Management systems (e.g., Alfresco, Nuxeo, Drupal, Magento).

The drawback of the approach is that you do not have static types or static definitions of types: things change depending on

  • the dictionary: which properties have been defined in a given moment?
  • The data: which properties concretely have a value in the dataset?

After the first ingestion in our staging environment, we noticed a lot of fields that were useless from a search perspective, fields that, in other words, don’t encapsulate any relevance signal.

We, therefore, excluded those fields from being indexed, using a mix of approaches:


As part of each property attribute definition, in our “dictionary” table, we added an “indexable” column. Consequently, the COPY commands used for exporting data take into account that column, therefore filtering out things we don’t want to index.

Indexing Client

Fields we index in Solr have a dynamically built name, starting from the property name in the database dictionary. Without entering into implementation details, a property called (for example) “birthPlace” in the dictionary becomes in Solr a field whose name is something like this: dil_eng_birthPlace.

In addition to that, the cardinality between a dictionary property and Solr fields is usually 1-to-many. This is because what is logically defined as a single property in the dictionary (e.g., birthPlace) generates several fields in Solr, each of them associated with a purpose (e.g., search, faceting, sorting). 

In the indexing client, the mechanism that creates and populates the Solr fields starting from a property is generic, so we often were in a situation where the field itself was marked as “indexable” but we weren’t interested in features like faceting or sorting. In those cases, we don’t want to add useless fields to the documents sent to Solr.  

For that purpose, the indexing client has a configurable list of fields that are excluded from the documents it builds.
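A client-side exclusion list of this kind could look like the following sketch. It is not the project's code: the document is modeled as a plain map from field name to value, and the class name and the field names used in the usage note are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Drops configured field names from a document before it is sent to Solr. */
class FieldExclusion {

    /**
     * Returns a copy of the document without the excluded fields.
     * The input map is not modified; field order is preserved.
     */
    static Map<String, Object> withoutExcluded(Map<String, Object> document,
                                               Set<String> excluded) {
        Map<String, Object> filtered = new LinkedHashMap<>(document);
        filtered.keySet().removeAll(excluded);
        return filtered;
    }
}
```

For instance, if a property generated both a search field and a hypothetical facet field, listing the facet field name in `excluded` would keep it out of the outgoing document while the search field is left in place.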

Apache Solr 

The two remaining places where we defined the exclusions are both in Apache Solr. The first one is again an UpdateRequestProcessor, and from a functional perspective, it follows the same approach we already use in the indexing client:

The difference is that fields are removed once they arrive in Solr. For that reason, we will most probably remove this component in favour of handling the exclusions on the client side (therefore avoiding sending useless data to Solr).

Last but not least, in the Solr schema, we have a dynamic field that acts as a last-chance fallback:  

With or without Solr?

Creating a SolrCloud infrastructure isn’t a trivial thing, so before experimenting with a real cluster, we added a couple of parameters in our MapReduce job for emulating the blocking Solr call:

  • a flag that disables the Solr interaction. The MapReduce job builds and collects documents, then instead of sending batches, it does simply nothing
  • the number of msecs used for emulating the Solr call. The calling thread stops for that amount of time. 
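The two parameters above can be sketched as a wrapper around the component that sends batches. This is a minimal illustration under stated assumptions: `realSender` stands in for the actual Solr call, and the class and parameter names are hypothetical, not those used in the project.

```java
import java.util.List;
import java.util.function.Consumer;

/** Either forwards batches to the real sender or emulates a blocking call. */
class EmulatedSolrSender<T> implements Consumer<List<T>> {
    private final boolean solrEnabled;           // the flag: false disables the Solr interaction
    private final long emulatedCallMillis;       // how long the calling thread is paused instead
    private final Consumer<List<T>> realSender;  // stand-in for the actual Solr call
    private long documentsSeen = 0;

    EmulatedSolrSender(boolean solrEnabled, long emulatedCallMillis,
                       Consumer<List<T>> realSender) {
        this.solrEnabled = solrEnabled;
        this.emulatedCallMillis = emulatedCallMillis;
        this.realSender = realSender;
    }

    @Override
    public void accept(List<T> batch) {
        documentsSeen += batch.size();
        if (solrEnabled) {
            realSender.accept(batch);
            return;
        }
        if (emulatedCallMillis > 0) {
            try {
                Thread.sleep(emulatedCallMillis); // emulate the blocking I/O call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interruption status
            }
        }
    }

    /** Number of documents that went through this sender, emulated or not. */
    long documentsSeen() {
        return documentsSeen;
    }
}
```

Running the job with the flag disabled and different pause values shows how the pipeline duration depends on the latency of the Solr call, before any cluster exists.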

Hadoop Specific configurations

Surprisingly, we didn’t change the Hadoop configuration much; that probably means the default values perform quite well, at least in the benchmarks we’ve run so far.

Settings we changed are the following: 

  • dfs.blocksize: as data grows (3 GB, 30 GB, 300 GB), we found some minor benefit in increasing the block size from 128 MB to 512 MB
  • mapreduce.reduce.merge.inmem.threshold (-1), (300): these changes reduced the total spilled records, initially reported as too high (3 times the map output records)

As said, even with the default values, the results were not much different from what you will see in the next section; the changes above brought only a slight improvement.


Scenario #1: small documents

Here’s the information about the environment we set up for the first scenario. 

  • Documents: two test blocks, the first 20 million, the second 120 million
  • Average document size in the index: 4 KB in the first block, 7-9 KB in the second block
  • Average number of fields per document: 24 in the first block, 45 in the second block
  • Amazon EMR Cluster
    • Master: 1 m4.large (2 vCore, 8 GiB memory), EBS: 64 GiB
    • Core: 10 m4.large (same as above)
    • Number of reducers: 10
  • Apache Solr:
    • 5 EC2 nodes in three incremental iterations  
      • r5a.large (2 vCPUs, RAM 16 GB)
      • m5a.xlarge (4 vCPUs, RAM 16 GB)
      • c5a.2xlarge (8 vCPUs, RAM 16 GB) 
    • shards: 5 
    • replication factor: 2

Before targeting a real Solr cluster, we ran some experiments using the two emulation parameters described above (the flag that disables the Solr interaction and the emulated call duration). The following picture illustrates the results:

We ran the actual indexing targeting the SolrCloud described above using several node configurations: 2, 4, and 8 CPUs (per node).

The first configuration, 2 CPUs, was not enough: apart from the duration, which was very high, the cluster behavior was unstable and unreliable; it couldn’t manage the input load coming from the ten reducers.

Increasing the number of CPUs per node from 2 to 4 improved the process: few errors and a trend that followed the chart above. The problem was the overall throughput: the duration of the indexing pipeline was between 46 and 49 minutes, indicating that the Solr communication was taking more or less 70-75 msecs per call.  

We increased the number of CPUs again from 4 to 8, and finally, we got an interesting result: 20 million documents in 8 minutes without any error. Importantly, we also got the same rate using the second test dataset (120 million).

Using the chart above, we can estimate an average of 11 msecs per Solr call, which we consider a good number.  
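For a rough sense of scale, the figures reported above (20 million documents, 8 minutes, 10 reducers) can be turned into documents per second. The snippet below is just that arithmetic, with hypothetical class and method names; it says nothing about batch sizes, which are not stated here.

```java
/** Back-of-the-envelope throughput from the figures reported in the benchmark. */
class ThroughputEstimate {

    /** Whole documents indexed per second over the entire run. */
    static long docsPerSecond(long documents, long minutes) {
        return documents / (minutes * 60);
    }

    /** The same rate, evenly split among the reducers that send the batches. */
    static long docsPerSecondPerReducer(long documents, long minutes, int reducers) {
        return docsPerSecond(documents, minutes) / reducers;
    }

    public static void main(String[] args) {
        System.out.println(docsPerSecond(20_000_000L, 8));
        System.out.println(docsPerSecondPerReducer(20_000_000L, 8, 10));
    }
}
```

That is roughly 41,000 documents per second for the cluster as a whole, or a little more than 4,000 per reducer, assuming the load is evenly spread.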

Scenario #2: Real documents

As described in the first part of this article, we export 19 dump files from PostgreSQL. Consequently, the actual resulting (average) document definition is a bit bigger than in the previous scenario.

The second benchmark uses all those files. It is important to understand that documents:

  • are the same in terms of cardinality
  • have a more significant number of fields/values 

The target search infrastructure is quite similar to the previous benchmark:

  • 5 EC2 nodes in three incremental iterations  
    • m5a.xlarge (4 vCPUs, RAM 16 GB)
    • c5a.2xlarge (8 vCPUs, RAM 16 GB) 
    • c5a.4xlarge (16 vCPUs, RAM 32 GB)
  • shards: 5 
  • replication factor: 2

We had to discard the r5a.large nodes because they were not enough, even for a minimal scenario; instead, we increased from c5a.2xlarge to c5a.4xlarge in the last step, which led us to very good results.

Here is the diagram which compares the different throughput between small (previous benchmark) and real documents (this benchmark).    

The red line is shifted up a bit, mainly because of the size of the input data: mappers take 2 minutes to process 3 GB of data and 10 minutes for 30 GB. After doubling the block size (from 128 MB to 256 MB), 30 GB took more or less 8 minutes. This is a very good sign: our mappers scale very well.

Reducers are a different story: because of the blocking Solr I/O call, they are the bottleneck. We had to increase the CPUs per node from 8 to 16 (with 32 GB of RAM), which is definitely a lot in terms of resources and costs.


The Share-VDE project is still in progress, and so are the indexing iterations. For that reason, we will keep updating the:

  • “Benchmarks” paragraph as soon as we collect new data
  • “Stay Out of Trouble” section for each tip or trick that can bring even a minor benefit  

In summary, the approach works, scales, and can deliver good throughput. However, as expected,

There ain’t no such thing as a free lunch

and in this specific case, it means the critical factor is a tradeoff between 

  • throughput
  • resources (i.e., costs)

As said, we will keep updating this article with new data; in the meantime, any feedback/question is warmly welcome.
