SpazioCodice

indicizzazione su larga scala

Indicizzazione Su Larga Scala

In questo caso di studio descriviamo il processo di indicizzazione che è stato implementato per il progetto Share-VDE, un ambiente di discovery collaborativo che raccoglie i dati provenienti da numerose biblioteche americane e del Nord Europa.

Uno degli aspetti critici del progetto, che è poi l’oggetto del caso di studio descritto in questo articolo, è rappresentato dall’enorme quantità di dati che il sistema deve gestire. La gestione consiste

  • nell’inserire, modificare e cancellare i dati presenti nel dataset 
  • nella possibilità di effettuare delle ricerche su tali dati     

 

La gestione del dato coinvolge differenti prodotti, ognuno dei quali con le sue peculiarità. Nello specifico

  • il sistema di storage primario è rappresentato dall’RDBMS, PostgreSQL
  • i servizi di ricerca sono forniti da Apache Solr  

 

Di conseguenza, è necessario muovere una quantità consistente di dati tra uno storage e l’altro nelle seguenti casistiche 

  • scenario iniziale / disaster recovery: l’intero dataset, ossia l’intero contenuto della base dati deve essere indicizzato
  • scenario incrementale: le istituzioni forniscono periodicamente dei dataset incrementali che includono le modifiche apportate ai dati in un certo periodo

Il Progetto

Share VDE

Share-VDE è una iniziativa guidata dalle biblioteche per formare e gestire un catalogo bibliografico comunitario in un unico ambiente centralizzato basato sui linked data.

Il progetto, nato con un focus su dati bibliografici e di autorità, ha allargato con il tempo i suoi confini per includere altri domini come arti, musica. L’insieme dei vari domini e partecipanti viene chiamato Share Family.

La piattaforma Share Family è stata ideata, promossa e sviluppata dalla Casalini Libri e da @Cult, entrambi altamente specializzati nello sviluppo e gestione software in ambito bibliografico e biblioteconomico. 

La piattaforma ospita i dati delle seguenti istituzioni:

 

Il progetto include inoltre un numero di istituzioni che beneficiano e contribuiscono alla base di conoscenza del progetto con alcune limitazioni 

 

ed infine istituzioni che partecipano alle fasi di ricerca e sviluppo che ruotano attorno al progetto:

L'idea, in breve

Il seguente schema illustra il processo di indicizzazione dei dati:

I dati sono memorizzati sul database relazionale; l’estrazione, in file CSV, avviene tramite la direttiva COPY, disponibile nativamente in PostgreSQL. Successivamente, i file vengono processati tramite un processo MapReduce su un cluster Hadoop che manipola il loro contenuto al fine di creare e indicizzare i documenti Solr

Uno sguardo più da vicino

Il paragrafo precedente riassume in due parole come funziona il processo di indicizzazione e quali sono le componenti base che ne fanno parte. 

In questa sezione, daremo uno sguardo più da vicino al ruolo di ognuno di quei componenti.

PostgreSQL

Come abbiamo detto PostgreSQL, il database relazionale, è la fonte primaria della base di conoscenza del progetto: sebbene l’intero schema sia alquanto complesso, le tabelle che ospitano le entità con i loro attributi sono decisamente poche. 

Tali tabelle seguono una logica row-oriented simile a quella che si trova nei Content Management System: trattando il progetto di una serie di strumenti collaborativi, la definizione delle entità nonché l’insieme delle proprietà disponibili per ogni tipologia di entità non è noto a priori: tali proprietà devono poter essere infatti aggiunte o rimosse con una certa flessibilità.

Per questa ragione, le entità (una persona, nell’esempio di sotto) non vengono rappresentate utilizzando delle tabelle dedicate come questa:

TABELLA PERSON

Un approccio del genere, soffre dei seguenti svantaggi (in questo contesto):

  • il numero esatto dei tipi non è noto in anticipo. Oltre che per le proprietà, è richiesto un certo grado di flessibilità anche nell’aggiungere (e rimuovere) i tipi. Se utilizziamo una tabella per rappresentare un tipo, come nell’esempio di sopra, creare un nuovo tipo (o rimuoverlo) implica una modifica strutturale allo schema del database.
  • Come detto, la flessibilità è richiesta anche per le proprietà. Di nuovo, utilizzando l’approccio di sopra (e semplificando), ogni proprietà corrisponde ad una colonna, e pertanto aggiungere o rimuovere proprietà implica modifiche strutturali (ALTER TABLE) allo schema.
  • Persino all’interno della stessa tipologia di entità, le proprietà possono essere definite in modo sparso. L’approccio per tabelle provocherebbe delle righe formalmente tutte identiche ma con molti valori null.

L’approccio seguito per rappresentare i dati è invece row-oriented, come nell’esempio seguente: 

TABELLA DICTIONARY

Una tabella che elenca le proprietà che ogni entità nel dominio può avere. Ogni proprietà è associata con un tipo di dati, una cardinalità, e altri metadati che qualificano la proprietà stessa. 

Dividiamo le proprietà in tre categorie: 

  • attribute: valori letterali come testo, numeri, date, booleani
  • relazioni: connessioni tra entità Share-VDE esistenti
  • collegamenti: connessioni tra entità Share-VDE e riferimenti esterni

TABELLA: ENTITY

Per ogni entità nel sistema, la tabella contiene una riga. Le entità hanno un set di metadati associati, come l’identificativo univoco, il tipo (e.g. Agent, Work) e altri metadati amministrativi. 

TABELLA: PROPERTY

La tabella contiene le proprietà, ossia i loro valori associati alle entità di riferimento. 

Da notare che quanto descritto sopra è una semplificazione, al fine di illustrare un’idea ad alto livello dell’approccio. 

Una volta deciso come organizzare i dati all’interno del database, il punto da dirimere è stato come leggere ed indicizzare in maniera efficiente questo enorme ammontare di dati. Trattandosi di una quantità considerevole di dati, il lato negativo dell’approccio scelto (property-based) consiste nell’esplosione di dati, in termini di cardinalità. 

Abbiamo utilizzato, in maniera prototipale, tre differenti approcci:

Il verdetto è stato più o meno lo stesso in tutti i casi: un client JDBC che legge i dati direttamente dal database non è una soluzione scalabile perché il fetch dei dati prende veramente tanto tempo

Il comando PostgreSQL "COPY"

Il comando PostgreSQL COPY muove i dati tra le tabelle e i file. E’ un tool nativamente disponibile su PostgreSQL e offre buone prestazioni, sopratutto se le SELECT utilizzate negli export sono molto semplici (e.g., niente ORDER BY, GROUP BY, o DISTINCT). Ma questa condizione è ottima nel nostro caso perché non abbiamo bisogno di un ordine particolare delle proprietà esportate. 

Ovviamente noi dobbiamo raggruppare tali proprietà per entità, ma come vedremo, questo “ordinamento” viene fornito da Hadoop, nello specifico dallo Shuffling che avviene tra la fase di Map e Reduce.

Ecco un esempio di comando COPY:  

				
					COPY
(SELECT ... FROM PROPERTY ...)
TO STDOUT (DELIMITER u&'\001E');

COPY
(SELECT ... FROM RELATIONSHIPS ...)
TO STDOUT (DELIMITER u&'\001E');

COPY
(SELECT ... FROM LINKS ...)
TO STDOUT (DELIMITER u&'\001E');
				
			

COPY scrive il suo output sullo standard out ma permette anche di indicare un file come destinazione. Da precisare che questo file verrà prodotto localmente alla macchina dove gira PostgreSQL; questa cosa può essere un problema, specialmente se il database gira in cloud, dove non abbiamo la possibilità di accedere al filesystem attraverso SFTP o SSH.

Decisamente meglio invocare il comando utilizzando lo standard out, redirezionare il tutto su un file locale e spedirlo poi da qualche parte (su AWS S3, nel nostro caso). 

Piuttosto che lanciare un unico comando consistente in molteplici SELECT concatenate con una clausola UNION, abbiamo notato che è preferibile lanciare in parallelo più comandi COPY, ognuno con una specifica select. Il risultato saranno molteplici files (uno per comando) che verranno spediti tutti su un bucket S3.    

Amazon Elastic MapReduce (EMR)

I file CSV (gli export prodotti come descritto nella sezione precedente) che si trovano su S3 costituiscono l’input ideale per un job Hadoop MapReduce:

  • un numero ridotto di file, di grandi dimensioni
  • strutturati (sono CSV)
  • ogni riga rappresenta una proprietà di una data entità
  • il primo campo di ogni riga rappresenta l’identificativo univoco assegnato ad ogni entità
  • le righe non sono ordinate in alcun modo particolare
  • la definizione di una entità (i.e., l’insieme delle sue proprietà) può trovarsi all’interno di diversi file
 

Ecco un esempio dei file di input a disposizione:

				
					> more attributes.dump

1,Person,257,Claire
2,Person,327,Carnby
1,Person,327,Redfield
1,Person,128,1927
2,Person,257,Edward
2,Person,128,1992
2,Person,201,Detective
...

> more relationships.dump

1,828,377,484,2292
1,223,332,233,1290
2,828,112,992,9294
2,192,117,433,1027
1,111,212,243,998

> more links.dump

1,abstract,837,https://example.org/02383.pdf
1,record,992,https://somewhere.org/records#88823
1,toc,123,https://oa.net/docs/8283.pdf
2,cover,888,https://pics.org/xyz/8348343.png
...
				
			

Utilizziamo un job MapReduce, e lasciamo che nella fase di shuffle, le righe vengano ordinate per entità. L’implementazione del job è minimalistica e veramente semplice: 

  • un mapper che effettua una prima manipolazione dei CSV
  • un reducer che crea e raggruppa in batch, i documenti da indicizzare  

Invece che provare la cosa su un cluster Hadoop installato in locale, Amazon Elastic MapReduce (EMR) fornisce una valida alternativa, in quanto è possibile con pochi click avere a disposizione un insieme anche vasto di macchine sul quale far eseguire il nostro job distribuito. 

Il mapper analizza ogni riga e separa la chiave (l’identificativo dell’entità) dai valori e metadati (rimanente parte della riga). 

Per esempio, il seguente contenuto:

				
					1,Person,257,Claire
2,Person,327,Carnby
1,Person,327,Redfield
1,Person,128,1927
2,Person,257,Edward
2,Person,128,1992
2,Person,201,Detective
...
1,828,377,484,2292
1,223,332,233,1290
2,828,112,992,9294
2,192,117,433,1027
1,111,212,243,998
...
1,jlis,837,https://example.org/02383.pdf
1,record,992,https://somewhere.org/records#88823
1,toc,123,https://oa.net/docs/8283.pdf
2,cover,888,https://pics.org/xyz/8348343.png
...
				
			

diventa

				
					1 => Person,257,Claire
2 => Person,327,Carnby
1 => Person,327,Redfield
1 => Person,128,1927
2 => Person,257,Edward
2 => Person,128,1992
2 => Person,201,Detective
...
1 => 828,377,484,2292
1 => 223,332,233,1290
2 => 828,112,992,9294
2 => 192,117,433,1027
1 => 111,212,243,998
...
1 => jlis,837,https://example.org/02383.pdf
1 => record,992,https://somewhere.org/records#88823
1 => toc,123,https://oa.net/docs/8283.pdf
2 => cover,888,https://pics.org/xyz/8348343.png
...
				
			

Nella fase successiva, chiamata shuffle, le chiavi vengono ordinate e il contenuto risultante viene passato ai reducer. L’input in ingresso dei reducer non consiste più in una coppia chiave-valore, bensì in una coppia chiave-lista di valori (appartenenti tutti alla stessa chiave). 

Seguendo l’esempio di sopra, ecco che l’entità con identificativo 1 viene passato al reducer dopo la fase di shuffle: 

				
					1 => ["Person,257,Claire",
      "Person,327,Redfield",
      "Person,128,1927",
      "828,377,484,2292",
      "223,332,233,1290",
      "111,212,243,998",
      "jlis,837,https://example.org/02383.pdf",
      "record,992,https://somewhere.org/records#88823",
      "toc,123,https://oa.net/docs/8283.pdf"]
				
			

questa invece è l’entità con identificativo 2

				
					2 => ["Person,327,Carnby",
      "Person,257,Edward",
      "Person,128,1992",
      "Person,201,Detective",
      "828,112,992,9294",
      "192,117,433,1027",
      "cover,888,https://pics.org/xyz/8348343.png"
				
			

I dati di sopra sono tutto ciò di cui abbiamo bisogno per creare un documento Solr e indicizzarlo. Ecco un diagramma che illustra a grandi linee quanto descritto.

Apache Solr

The last puzzle piece is Apache Solr, the popular, open-source enterprise search platform built on Apache Lucene.

From an indexing perspective, the main challenge of the Solr cluster is still related to data, the initial size, and the expected growth rate. Solr requires creating the distributed index with a predefined number of slices (shards), so in a given moment, with a given amount of data and a given growth/increment rate, we should always make sure 

  • the cluster is well-balanced: each shard should manage a reasonable amount of data; “reasonable” here refers to the resources owned by the hosting node 
  • hardware resources (e.g., RAM, CPU, Disk) are appropriately allocated according to indexing and query throughput requirements 

In Share-VDE, data is expected to grow on a large scale: we started with a few million entities (15-20), and we know the production system will hold more or less two billion documents.

Estimating in advance a cluster that should hold such an amount of data is quite complex; the under/over estimation is a high risk with could lead to relevant consequences in terms of money. 

We have chosen an incremental approach:

  • plan the cumulative updates in terms of incoming data from institutions
  • start with X as the initial data size, Y as the expected growth rate until the cluster reaches Z
  • estimate and create a Solr cluster for holding the X-Z scenario
  • when the cluster reaches Z, it becomes the new X (new initial size), the loop starts at a higher level, and we reindex everything in the new bigger cluster

We found two significant benefits of using the approach above

  • Each cluster estimation is very tied to the data it holds 
  • Estimation iterations are very short     
  • We can understand, accurately estimate, and adapt the upper limit of each cluster instance, as soon as the data size increases  

Stay Out of Troubles

In this section we summarise a set of lesson learned from the project where we implemented all what described in this post. 

Java 8

Java 8 is the supported Java Virtual Machine (JVM) for cluster instances created using Amazon EMR release version 5.0.0 or later.

Although Java 8 is a bit old right now, that is not an issue in 99% of cases. 

The only thing is that you should be aware of that in advance, to avoid (like what happened to us) a consistent refactoring because your code uses some cool stuff introduced later. 

Batching

The atomic indexing unit in Apache Solr is called Document (actually a SolrInputDocument instance). Regardless of the way documents are received, once they arrive in Solr, they are indexed one by one.  

Clients can improve the overall indexing throughput by batching documents instead of sending them separately as soon as they are created.

There are several indexing client classes available in Solrj, the official Solr Java client, but apart from the ConcurrentUpdateSolrClient, which transparently accumulates documents and creates batches, the other SolrClient subclasses require the caller to take care of that batching work; CloudSolrClient, the client we use for targeting a SolrCloud is not an exception to that rule; in a massive update scenario, is always better to create a batch of documents like in the following example  

				
					var batch = new ArrayList();
...
batch.add(doc1);
batch.add(doc2);
...
batch.add(docn);

solrClient.add(batch);
				
			

One thing to remember: CloudSolrClient divides your batch into small sub-groups, each one targeting the corresponding leader shard. This is important for defining the batch size, which should consider the cluster size.  

Isolate indexing failures

Documents are always indexed one by one, regardless if they are sent in batches or not.

When batches of documents are sent to Apache Solr for indexing, by default the process stops at the first failure, therefore skipping the remaining documents. This behavior is counterintuitive because the operation result is unpredictable from a client’s perspective. 

In an ideal world, there should be zero indexing failures, but in reality, errors happen for many reasons, that are not always under our direct control.

Let’s see an example. We have the following document list:

[ d1, d2, d3, d4, d5]

The actual sub-set of documents that are indexed changes depending on where the first failure occurs: if it happens at d1, no document will be indexed; if it happens on d2, only d1 will be indexed, and so on.

Being documents isolated and disjoint units, what we needed for our use case was a more “lenient” behavior where:

  • all failed documents are logged (not just the first one)
  • the batch is processed entirely, without stopping at first failure

Apache Solr lets you customize the indexing pipeline using components called UpdateRequestProcessors. Fortunately, the processor that injects the behavior we need is part of the Solr distribution; its name is TolerantUpdateRequestProcessor; we just need to configure it:

What happens in case of one or multiple failures? The response will be always OK but it will contain a section that reports the failures. For each failed document there’s an entry in the response which includes:

  • the unique key value
  • the operation type (e.g. ADD, DELETE)
  • the error message 

That interaction allows collecting every failure without having the whole batch fail. Every failure is then persisted in a database table and, as a fallback in the case of SQL exceptions, in a log file.  

Avoid sending Unnecessary Data

I understand, at first sight, that the advice could sound a bit trivial.

As described above, the way we captured the data model in the database is generic. That is a typical choice when dealing with several domains or where entities are not precisely known in advance in terms of types, attributes, and relationships.

You will likely find a similar approach in Content Management systems (e.g., Alfresco, Nuxeo, Drupal, Magento).

The drawback of the approach is that you do not have static types or static definitions of types: things change depending on

  • the dictionary: which properties have been defined in a given moment?
  • The data: which properties concretely have a value in the dataset?

After the first ingestion in our staging environment, we noticed a lot of fields that were useless from a search perspective, fields that, in other words, don’t encapsulate any relevance signal.

We, therefore, excluded those fields from being indexed, using a mix of approaches:

Database

As part of each property attribute definition, in our “dictionary” table, we added an “indexable” column. Consequently, the COPY commands used for exporting data take into account that column, therefore filtering out things we don’t want to index.

Indexing Client

Fields we index in Solr have a dynamically built name, starting from the property name in the database dictionary. Without entering implementation details,

 a property (example) called “birthPlace” in the dictionary becomes in Solr a field whose name is something like this: dil_eng_birthPlace.

In addition to that, the cardinality between a dictionary property and Solr fields is usually 1-to-many. This is because what is logically defined as a single property in the dictionary (e.g., birthPlace) generates several fields in Solr, each of them associated with a purpose (e.g., search, faceting, sorting). 

In the indexing client, the mechanism that creates and populates the Solr fields starting from a property is generic, so we often were in a situation where the field itself was marked as “indexable” but we weren’t interested in features like faceting or sorting. In those cases, we don’t want to add useless fields to the documents sent to Solr.  

The indexing client has a configurable list of fields not added to built documents.

Apache Solr 

The two remaining places where we defined the exclusions are both defined in Apache SolrThe first one is again an UpdateRequestProcessor, and from a functional perspective, it follows the same approach we already do in the indexing client:

The difference is that fields are removed once they arrive in Solr. For that reason, most probably we will remove this component in favour of adding things on the client side (therefore avoiding sending useless data to Solr)

Last but not least, in the Solr schema, we have a dynamic field that acts as a last-chance fallback:  

With or without Solr?

Creating a SolrCloud infrastructure isn’t a trivial thing, so before experimenting with a real cluster, we added a couple of parameters in our MapReduce job for emulating the blocking Solr call:

  • dry.run: a flag that disables the Solr interaction. The MapReduce job builds and collects documents, then instead of sending batches, it does simply nothing
  • dry.run.io.wait: the number of msecs used for emulating the Solr call. The calling thread stops for that amount of time. 
Hadoop Specific configurations

Surprisingly, we didn’t change so much the Hadoop configuration; that probably means default values perform quite well, at least in the benchmarks we’ve ran so far.

Settings we changed are the following: 

  •  dfs.blocksize: as soon as data grows (3 GB, 30 GB, 300 GB), we found some minor benefit in increasing the block size from 128 to 512 MB 
  • mapreduce.reduce.merge.inmem.threshold (-1),  mapreduce.task.io.sort.mb (300): these changes reduced the total spilled records, initially reported as too high (3 times the map output records)
As said, even using the default values without any change, the results were a bit improved, not so much different from what you will see in the next section.

Benchmarks

Scenario #1: small documents

Here’s the information about the environment we set up for the first scenario. 

  • Documents: two test blocks, the first 20 million, the second 120 million
  • Average document size in the index: 4 KB in the first block, 7-9 KB in the second block
  • Average number of fields per document: 24 in the first block, 45 in the second block
  • Amazon EMR Cluster
    • Master: 1 m4.large (2 vCore, 8 GiB memory), EBS: 64 GiB
    • Core: 10 m4.large (same as above)
    • Number of reducers: 10
  • Apache Solr:
    • 5 EC2 nodes in three incremental iterations  
      • r5a.large (2 vCPUs, RAM 16 GB)
      • m5a.xlarge (4 vCPUs, RAM 16 GB)
      • c5a.2xlarge (8 vCPUs, RAM 16 GB) 
    • shards: 5 
    • replication factor: 2
Before targeting a real Solr cluster, we ran some experiments using the dry.run and dry.run.iowait parameters described above. The following picture illustrates the results: v

We ran the actual indexing targeting the SolrCloud described above using several node configurations: 2, 4, and 8 CPU (per node). 

The first configuration, 2 CPUs, was not enough: apart from the duration, which was very high, the cluster behavior was unstable and unreliable; it couldn’t manage the input load coming from the ten reducers.

Increasing the number of CPUs per node from 2 to 4 improved the process: few errors and a trend that followed the chart above. The problem was the overall throughput: the duration of the indexing pipeline was between 46 and 49 minutes, indicating that the Solr communication was taking more or less 70-75 msecs per call.  

We increased the number of CPUs again from 4 to 8, and finally, we got an interesting result: 20 million documents in 8 minutes without any error. Very important, we also got the same rate using the second test dataset (120 million) 

Using the chart above, we can estimate an average of 11 msecs per Solr call, which we consider a good number.  

Scenario #2: Real documents

As described in the first part of this article, the dump files we export from PostgreSQL are 19. Consequently, the actual resulting (average) document definition is a bit bigger than the previous scenario.

The second benchmark uses all those files. It is important to understand that documents 

  • are the same in terms of cardinality
  • have a more significant number of fields/values 

the target search infrastructure is quite similar to the previous benchmark:

  • 5 EC2 nodes in three incremental iterations  
    • m5a.xlarge (4 vCPUs, RAM 16 GB)
    • c5a.2xlarge (8 vCPUs, RAM 16 GB) 
    • c5a.4xlarge (16 vCPUs, RAM 32 GB)
  • shards: 5 
  • replication factor: 2

We had to discard the r5a.large nodes because they were not enough, even for a minimal scenario; instead, we increased from c5a.2xlarge to c5a.4xlarge in the last step, which led us to very good results.

Here is the diagram which compares the different throughput between small (previous benchmark) and real documents (this benchmark).    

The red line is translated up a bit, mainly because of the size of the input data: mappers take 2 minutes to process 3 GB of data and 10 minutes for 30 GB. After doubling the block size (from 128 MB to 256 MB), 30 GB took more or less 8 minutes. This is very good: a signal that our mappers scale very well.   

Reducers are a different topic: because of the blocking Solr I/O call, they represent a bottleneck: we had to increase the CPUs per node from 8 to 16 (32 GB of RAM), which is definitely a lot in terms of resources and costs.

Conclusions

The Share-VDE project is still in progress, and the indexing iterations, too. For that reason, we will keep updating the 

  • “Benchmarks” paragraph as soon as we collect new data
  • “Stay Out of Trouble” section for each tip or trick that can bring even a minor benefit  

In summarizing, the output of everything above is that the process/approach works, scales, and it can give good throughputs. However, as expected, 

There ain’t no such thing as a free lunch

and in this specific case, it means the critical factor is a tradeoff between 

  • throughput
  • resources (i.e., costs)

As said, we will keep updating this article with new data; in the meantime, any feedback/question is warmly welcome.

Share this post

Rispondi

Scopri di più da SpazioCodice

Abbonati ora per continuare a leggere e avere accesso all'archivio completo.

Continua a leggere