环境
- Python 3.7.3 或更高版本
- python(Elasticsearch) 7.12.1 或更高版本
Elasticsearch连接
1 2 3 4 5 6 7 8
| from elasticsearch import Elasticsearch
es = Elasticsearch( ['host:port'], http_auth=('username', 'password'), timeout=300 )
|
通过elasticsearch.helpers的bulk方法进行批量操作
bulk方法的参数解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| client: Elasticsearch客户端实例
actions: 要执行的操作列表,每个操作是一个字典
chunk_size: 每个批次中的文档数量(默认: 500)
max_chunk_bytes: 每个请求的最大字节大小(默认: 100MB)
raise_on_error: 是否在发生错误时抛出BulkIndexError异常(默认: True)
raise_on_exception: 是否在发生异常时抛出异常(默认: False)
expand_action_callback: 对每个操作执行回调以扩展其数据
max_retries: 文档重试的最大次数(默认: 0,表示不重试)
max_backoff: 重试前的最大等待时间(默认: 无)
yield_ok: 是否跳过成功文档的输出(默认: False)
|
批量插入数据
1 2 3 4 5 6 7
| { "_op_type":"create", "_index": index, "_type": "_doc", "_id": id, "_source": source }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from elasticsearch import Elasticsearch from elasticsearch import helpers es_data=Elasticsearch( ['host:post'], http_auth=('name', 'password'), timeout=300 ) create_es_data = []
create_es_data.append({
"_index": "index_test", "_type": "_doc", "_id": "id_test", "_source": {"source_key":"source_value","source_key2":"source_value2"} }) if len(create_es_data) >= 2: helpers.bulk(es_data, create_es_data) create_es_data = []
|
批量更新数据
1 2 3 4 5 6 7
| { "_op_type":"update", "_index": index, "_type": "_doc", "_id": id, "_source": source }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from elasticsearch import Elasticsearch from elasticsearch import helpers es_data=Elasticsearch( ['host:post'], http_auth=('name', 'password'), timeout=300 ) update_es_data = []
update_es_data.append({ "_op_type":"update", "_index": "index_test", "_type": "_doc", "_id": "id_test", "_source": {"source_key":"source_value_update"} }) if len(update_es_data) >= 2: helpers.bulk(es_data, update_es_data) update_es_data = []
|
批量删除数据
1 2 3 4 5 6
| { "_op_type":"delete", "_index": index, "_type": "_doc", "_id": id }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from elasticsearch import Elasticsearch from elasticsearch import helpers es_data=Elasticsearch( ['host:post'], http_auth=('name', 'password'), timeout=300 ) delete_es_data = []
delete_es_data.append({ "_op_type":"delete", "_index": "index_test", "_type": "_doc", "_id": "id_test" }) if len(delete_es_data) >= 2: helpers.bulk(es_data, delete_es_data) delete_es_data = []
|
通过elasticsearch.helpers的scan方法进行批量查询
scan方法的参数解析
1 2 3 4 5 6 7
| client: Elasticsearch客户端实例 query: 查询条件(类似于搜索API的查询体) scroll: 滚动查询的超时时间(默认: "1m") raise_on_error: 是否在发生错误时抛出异常(默认: True) size: 每次滚动返回的结果数量(默认: 1000) request_timeout: 每次请求的超时时间 clear_scroll: 是否在查询结束后清除滚动上下文(默认: True)
|
批量查询数据(基础查询只能查询出10000条数据,所以使用scan进行查询)
1 2 3 4 5 6 7 8 9 10 11
| es_data_list = helpers.scan( client=es_data, index="index_test", params={'_source': "source_key"}, request_timeout=100, query={ "query": { "range": {"createTime": {"gt": 1111111}} } }) for es_data in es_data_list: es_id = es_data["_id"] es_source = es_data["_source"]
|
知识点补充
1. 文档ID生成策略
在批量插入数据时,如果不指定_id
,Elasticsearch会自动为每个文档生成一个唯一的ID。但在某些情况下,我们可能希望自定义ID以便更好地控制或引用这些文档。通过指定_id
字段,我们可以实现这一点。
2. 版本控制
Elasticsearch支持乐观并发控制,这意味着每个文档都有一个与之关联的版本号。当更新文档时,可以指定版本号以确保文档在更新期间没有被其他客户端修改过。如果版本号不匹配,更新操作将失败。
3. Bulk操作的原子性
虽然bulk
操作可以一次性发送多个操作到Elasticsearch,但这些操作并不是原子性的。这意味着如果其中一个操作失败,其他操作仍然可能会成功。为了确保操作的原子性,你可能需要使用事务或乐观并发控制。
scroll
参数在scan
方法中非常重要,它决定了在每次滚动查询之间保持活动状态的时间长度。如果设置得过短,可能会导致滚动查询在完成之前超时;如果设置得过长,可能会浪费系统资源。
5. 错误处理
当使用bulk
或scan
方法时,可能会遇到各种错误,例如网络中断、Elasticsearch集群不可用等。为了确保程序的健壮性,应该适当地处理这些错误,例如重试操作、记录错误信息或通知用户。
6. 性能优化
- 批量大小:调整
bulk
操作中的文档数量可以影响性能。太少的文档可能导致网络开销过大,而太多的文档可能会导致Elasticsearch服务器过载。
- 索引设计:合理的索引设计,包括选择合适的字段类型、使用倒排索引、避免过多的嵌套结构等,都可以提高查询性能。
- 并发控制:在高并发场景下,合理地控制并发请求的数量可以避免Elasticsearch服务器的过载。
7. 使用_source
过滤
在查询时,可以通过_source
过滤来指定只返回所需的字段,从而减少网络传输的数据量,提高性能。这在处理大型文档或只需要文档中的部分信息时非常有用。
8. 安全性
当连接Elasticsearch时,应该使用安全的连接方式(如HTTPS),并确保认证信息的安全存储和传输。此外,还应该定期检查Elasticsearch的安全设置和日志,以确保系统的安全性