
Speed up document indexing in Elasticsearch via bulk indexing

·355 words·2 mins·
Python Elasticsearch

In Elasticsearch, the index API lets you index a single document into an index.

However, if you have many documents and index them with this API one at a time, indexing is quite slow.
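
For reference, the one-at-a-time approach looks like this (a minimal sketch; the connection URL and index name are placeholders, not from this post):

from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")  # placeholder connection

# One HTTP round trip per document, which is what makes this slow at scale.
for i in range(1000):
    client.index(index="my_index", document={"title": f"this is document {i}"})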

For indexing a large number of documents, it is better to use the bulk helpers from the elasticsearch.helpers submodule. It offers three different bulk methods (a minimal helpers.bulk() example follows the list):

  • helpers.bulk()
  • helpers.streaming_bulk()
  • helpers.parallel_bulk()
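
helpers.bulk() is the simplest of the three: it sends the actions in chunks and returns a summary instead of a generator. A minimal sketch (the connection URL, index name, and documents are placeholders):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

client = Elasticsearch("http://localhost:9200")  # placeholder connection

docs = [{"title": f"this is document {i}", "date": "2024-07-25"} for i in range(1000)]

# Each action names the target index, an explicit _id, and the document body in _source.
actions = (
    {"_index": "my_index", "_id": i, "_source": doc} for i, doc in enumerate(docs)
)

# bulk() returns the number of successfully indexed documents and a list of errors.
success_count, errors = bulk(client, actions, chunk_size=500)
print(f"indexed {success_count} documents, {len(errors)} errors")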

If you have a lot of documents, the official documentation recommends using streaming_bulk():

When errors are being collected original document data is included in the error dictionary which can lead to an extra high memory usage. If you need to process a lot of data and want to ignore/collect errors please consider using the streaming_bulk() helper which will just return the errors and not store them in memory.
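
For example, passing raise_on_error=False makes streaming_bulk() yield failed items one by one instead of raising, so you can log or count failures as they arrive. A sketch with a hypothetical helper, assuming a connected client and an action generator like gen_actions() in the snippet below:

from elasticsearch.helpers import streaming_bulk

def index_with_error_count(client, actions):
    failed = 0
    # raise_on_error=False: failed items are yielded with ok == False instead of raising.
    for ok, item in streaming_bulk(client, actions, chunk_size=500, raise_on_error=False):
        if not ok:
            failed += 1
            print("failed to index:", item)
    return failed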

The bulk helpers are usually far faster than sequential indexing. Here is a short snippet that benchmarks the different approaches:

import time
from contextlib import contextmanager

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk, parallel_bulk


@contextmanager
def report_time(procedure_name):
    start = time.time()
    yield
    end = time.time()
    print(f"time spent for {procedure_name}: {end - start}")


def sequential_index(client, docs, index_name):
    for doc in docs:
        client.index(index=index_name, document=doc)


def bulk_index(client, docs, index_name, bulk_type):
    def gen_actions(docs):
        # Wrap each document in a bulk action targeting the index, with an explicit _id.
        for i, doc in enumerate(docs):
            action = {"_id": i, "_index": index_name, "_source": doc}
            yield action

    if bulk_type == "streaming":
        for success, status in streaming_bulk(
            client, actions=gen_actions(docs), chunk_size=500
        ):
            if not success:
                print(status)
    elif bulk_type == "parallel":
        for success, status in parallel_bulk(
            client, actions=gen_actions(docs), chunk_size=500
        ):
            if not success:
                print(status)


def recreate_index(client, index_name):
    if client.indices.exists(index=index_name):
        client.indices.delete(index=index_name)
    client.indices.create(index=index_name)


if __name__ == "__main__":
    n = 1000
    index_name = "my_index"
    # set up the client
    es_client = Elasticsearch(...)

    docs = []
    for i in range(n):
        doc = {"title": f"this is document {i}", "date": "2024-07-25"}
        docs.append(doc)

    recreate_index(es_client, index_name)
    with report_time("sequential indexing"):
        sequential_index(es_client, docs, index_name)

    recreate_index(es_client, index_name)
    with report_time("streaming bulk"):
        bulk_index(es_client, docs, index_name, "streaming")

    recreate_index(es_client, index_name)
    with report_time("parallel bulk"):
        bulk_index(es_client, docs, index_name, "parallel")

These are the results on my machine:

time spent for sequential indexing: 123.88939809799194
time spent for streaming bulk: 0.6672859191894531
time spent for parallel bulk: 0.45793724060058594
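
parallel_bulk() comes out fastest here because it sends chunks from a thread pool instead of one request at a time. If you want to experiment with the level of parallelism, both the chunk size and the thread count can be passed in. A sketch reusing es_client, index_name, and docs from the benchmark above (thread_count=8 and chunk_size=1000 are example values, not tuned recommendations):

from elasticsearch.helpers import parallel_bulk

# thread_count defaults to 4 and chunk_size to 500; raise them to trade memory for throughput.
actions = ({"_index": index_name, "_id": i, "_source": doc} for i, doc in enumerate(docs))
for ok, item in parallel_bulk(es_client, actions, thread_count=8, chunk_size=1000):
    if not ok:
        print(item)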
