Elasticsearch provides an index API for indexing a single document into an index.
However, if you have a lot of documents and index them with this API one by one, the indexing speed is quite slow.
To index a large number of documents, it is better to use the bulk helpers from the elasticsearch.helpers submodule of the Python client.
It provides three bulk helpers:
- helpers.bulk()
- helpers.streaming_bulk()
- helpers.parallel_bulk()
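The benchmark later in this post only exercises streaming_bulk() and parallel_bulk(), so here is a minimal sketch of the simplest helper, helpers.bulk(). The names `gen_actions` and `index_with_bulk` are hypothetical, and the `elasticsearch` import is deferred into the function only so that the action generator can be tried without the client installed. The sketch assumes the documented behaviour that bulk() called with raise_on_error=False returns a tuple of the success count and a list of errors.

```python
def gen_actions(docs, index_name):
    """Yield one bulk action per document (names are illustrative)."""
    for i, doc in enumerate(docs):
        yield {"_index": index_name, "_id": i, "_source": doc}


def index_with_bulk(client, docs, index_name):
    """Index all docs with helpers.bulk(); returns (success_count, errors)."""
    # Deferred import: only needed when a real Elasticsearch client is used.
    from elasticsearch.helpers import bulk

    # raise_on_error=False collects failures in a list instead of raising.
    return bulk(client, gen_actions(docs, index_name), raise_on_error=False)


if __name__ == "__main__":
    # Inspect the actions without contacting a cluster.
    sample = [{"title": "first"}, {"title": "second"}]
    for action in gen_actions(sample, "my_index"):
        print(action)
```

Because bulk() gathers every error into a list before returning, it is most convenient for small or medium batches.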
If you have a lot of documents, the official documentation recommends using streaming_bulk():
When errors are being collected original document data is included in the error dictionary which can lead to an extra high memory usage. If you need to process a lot of data and want to ignore/collect errors please consider using the streaming_bulk() helper which will just return the errors and not store them in memory.
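The advice quoted above can be sketched as a loop that consumes results one at a time and keeps only counters, so failed documents never accumulate in memory. `summarize_results` is a hypothetical helper. It accepts any iterable of (ok, item) pairs, which is the shape streaming_bulk() yields, and the sample pairs below are made up for illustration rather than real Elasticsearch responses.

```python
def summarize_results(results, max_logged=5):
    """Consume (ok, item) pairs lazily; keep counts, not the failed items."""
    succeeded = failed = 0
    for ok, item in results:
        if ok:
            succeeded += 1
        else:
            failed += 1
            # Log only the first few failures to keep the output bounded.
            if failed <= max_logged:
                print(f"failed: {item}")
    return succeeded, failed


if __name__ == "__main__":
    # Made-up pairs standing in for what streaming_bulk() would yield.
    fake_results = [
        (True, {"index": {"_id": "0", "status": 201}}),
        (False, {"index": {"_id": "1", "status": 400}}),
    ]
    print(summarize_results(fake_results))
```

With a real client, the call would look roughly like summarize_results(streaming_bulk(client, actions, raise_on_error=False)). Passing raise_on_error=False matters here, because by default the helper raises a BulkIndexError as soon as a chunk contains a failed document.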
The bulk API is usually much faster than sequential indexing, because it sends many documents in a single request instead of one request per document. Here is a short code snippet to benchmark the different ways of indexing:
import time
from contextlib import contextmanager

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk, parallel_bulk


@contextmanager
def report_time(procedure_name):
    # Print the wall-clock time spent inside the `with` block.
    start = time.time()
    yield
    end = time.time()
    print(f"time spent for {procedure_name}: {end - start}")


def sequential_index(client, docs, index_name):
    # One index request per document.
    for doc in docs:
        client.index(index=index_name, document=doc)


def bulk_index(client, docs, index_name, bulk_type):
    def gen_actions(docs):
        # Lazily build one bulk action per document.
        for i, doc in enumerate(docs):
            action = {"_id": i, "_index": index_name, "_source": doc}
            yield action

    if bulk_type == "streaming":
        for success, status in streaming_bulk(
            client, actions=gen_actions(docs), chunk_size=500
        ):
            if not success:
                print(status)
    elif bulk_type == "parallel":
        # parallel_bulk is a generator too, so it must be consumed to run.
        for success, status in parallel_bulk(
            client, actions=gen_actions(docs), chunk_size=500
        ):
            if not success:
                print(status)
    else:
        raise ValueError(f"unknown bulk_type: {bulk_type}")


def recreate_index(client, index_name):
    # Start every measurement from an empty index.
    if client.indices.exists(index=index_name):
        client.indices.delete(index=index_name)
    client.indices.create(index=index_name)


if __name__ == "__main__":
    n = 1000
    index_name = "my_index"

    # set up the client
    es_client = Elasticsearch(...)

    docs = []
    for i in range(n):
        doc = {"title": f"this is document {i}", "date": "2024-07-25"}
        docs.append(doc)

    recreate_index(es_client, index_name)
    with report_time("sequential indexing"):
        sequential_index(es_client, docs, index_name)

    recreate_index(es_client, index_name)
    with report_time("streaming bulk"):
        bulk_index(es_client, docs, index_name, "streaming")

    recreate_index(es_client, index_name)
    with report_time("parallel bulk"):
        bulk_index(es_client, docs, index_name, "parallel")
These are the results on my machine (times in seconds):
time spent for sequential indexing: 123.88939809799194
time spent for streaming bulk: 0.6672859191894531
time spent for parallel bulk: 0.45793724060058594
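To put those numbers in perspective, the short calculation below derives the speedups from the timings reported above and counts the HTTP requests each approach needs. It assumes that each client.index() call is one request and that the bulk helpers send one request per chunk of chunk_size actions. The byte-size limit on chunks is ignored because the test documents are tiny, and the function names are illustrative.

```python
import math

# Timings in seconds, copied from the results above.
timings = {
    "sequential indexing": 123.88939809799194,
    "streaming bulk": 0.6672859191894531,
    "parallel bulk": 0.45793724060058594,
}


def speedup(baseline, other):
    """How many times faster `other` is than `baseline`."""
    return baseline / other


def request_count(n_docs, chunk_size=None):
    """Requests needed: one per document, or one per chunk when bulking."""
    if chunk_size is None:
        return n_docs
    return math.ceil(n_docs / chunk_size)


if __name__ == "__main__":
    base = timings["sequential indexing"]
    for name in ("streaming bulk", "parallel bulk"):
        print(f"{name}: {speedup(base, timings[name]):.0f}x faster")
    print("sequential requests:", request_count(1000))
    print("bulk requests:", request_count(1000, chunk_size=500))
```

Under these assumptions, 1000 documents with chunk_size=500 need two bulk requests instead of 1000 individual ones, which likely accounts for much of the gap in the measurements.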
References
- Elasticsearch helpers from elasticsearch-py: https://elasticsearch-py.readthedocs.io/en/v8.14.0/helpers.html