python操作Elasticsearch 批量增删改查

环境

  • Python 3.7.3 或更高版本
  • python(Elasticsearch) 7.12.1 或更高版本

Elasticsearch连接

1
2
3
4
5
6
7
8
from elasticsearch import Elasticsearch

# 创建Elasticsearch客户端实例
es = Elasticsearch(
['host:port'], # Elasticsearch集群地址
http_auth=('username', 'password'), # 认证信息(如果需要)
timeout=300 # 超时设置
)

通过elasticsearch.helpers的bulk方法进行批量操作

bulk方法的参数解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
client: Elasticsearch客户端实例

actions: 要执行的操作列表,每个操作是一个字典

chunk_size: 每个批次中的文档数量(默认: 500

max_chunk_bytes: 每个请求的最大字节大小(默认: 100MB)

raise_on_error: 是否在发生错误时抛出BulkIndexError异常(默认: True

raise_on_exception: 是否在发生异常时抛出异常(默认: False

expand_action_callback: 对每个操作执行回调以扩展其数据

max_retries: 文档重试的最大次数(默认: 0,表示不重试)

max_backoff: 重试前的最大等待时间(默认: 无)

yield_ok: 是否跳过成功文档的输出(默认: False
  1. 批量插入数据

1
2
3
4
5
6
7
{
"_op_type":"create",
"_index": index,
"_type": "_doc",
"_id": id,
"_source": source
}
  • 代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es_data=Elasticsearch(
['host:post'],
http_auth=('name', 'password'),
timeout=300
)
create_es_data = []

# 批量插入
create_es_data.append({
#"_op_type":"create",#当操作为插入时可以省略
"_index": "index_test",# 索引名称
"_type": "_doc",
"_id": "id_test",# 文档ID(可选)
"_source": {"source_key":"source_value","source_key2":"source_value2"}#要写入的数据
})
if len(create_es_data) >= 2:
helpers.bulk(es_data, create_es_data)
create_es_data = []
  1. 批量更新数据

1
2
3
4
5
6
7
{
"_op_type":"update",
"_index": index,
"_type": "_doc",
"_id": id,
"_source": source
}
  • 代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es_data=Elasticsearch(
['host:post'],
http_auth=('name', 'password'),
timeout=300
)
update_es_data = []

# 批量插入
update_es_data.append({
"_op_type":"update", # 操作类型:更新
"_index": "index_test", # 索引名称
"_type": "_doc", # 文档类型(在Elasticsearch 7.x中通常为"_doc")
"_id": "id_test", # 要更新的文档ID
"_source": {"source_key":"source_value_update"}#将只会更新source_key为source_value_update,其余的值不变。当es中index的类型为nested时,要更新数据需要更新整个字段
})
if len(update_es_data) >= 2:
helpers.bulk(es_data, update_es_data)
update_es_data = []

批量删除数据

1
2
3
4
5
6
{
"_op_type":"delete",
"_index": index,
"_type": "_doc",
"_id": id
}
  • 代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es_data=Elasticsearch(
['host:post'],
http_auth=('name', 'password'),
timeout=300
)
delete_es_data = []

# 批量插入
delete_es_data.append({
"_op_type":"delete", # 操作类型:删除
"_index": "index_test",# 索引名称
"_type": "_doc",# 文档类型(在Elasticsearch 7.x中通常为"_doc")
"_id": "id_test"#要删除的id
})
if len(delete_es_data) >= 2:
helpers.bulk(es_data, delete_es_data)
delete_es_data = []

通过elasticsearch.helpers的scan方法进行批量查询

scan方法的参数解析

1
2
3
4
5
6
7
client: Elasticsearch客户端实例
query: 查询条件(类似于搜索API的查询体)
scroll: 滚动查询的超时时间(默认: "1m"
raise_on_error: 是否在发生错误时抛出异常(默认: True
size: 每次滚动返回的结果数量(默认: 1000
request_timeout: 每次请求的超时时间
clear_scroll: 是否在查询结束后清除滚动上下文(默认: True

批量查询数据(基础查询只能查询出10000条数据,所以使用scan进行查询)

1
2
3
4
5
6
7
8
9
10
11
es_data_list = helpers.scan(
client=es_data, index="index_test",
params={'_source': "source_key"},
request_timeout=100, query={
"query": {
"range": {"createTime": {"gt": 1111111}}
}
})
for es_data in es_data_list:
es_id = es_data["_id"]
es_source = es_data["_source"]

知识点补充

1. 文档ID生成策略

在批量插入数据时,如果不指定_id,Elasticsearch会自动为每个文档生成一个唯一的ID。但在某些情况下,我们可能希望自定义ID以便更好地控制或引用这些文档。通过指定_id字段,我们可以实现这一点。

2. 版本控制

Elasticsearch支持乐观并发控制,这意味着每个文档都有一个与之关联的版本号。当更新文档时,可以指定版本号以确保文档在更新期间没有被其他客户端修改过。如果版本号不匹配,更新操作将失败。

3. Bulk操作的原子性

虽然bulk操作可以一次性发送多个操作到Elasticsearch,但这些操作并不是原子性的。这意味着如果其中一个操作失败,其他操作仍然可能会成功。为了确保操作的原子性,你可能需要使用事务或乐观并发控制。

4. Scan API的Scroll参数

scroll参数在scan方法中非常重要,它决定了在每次滚动查询之间保持活动状态的时间长度。如果设置得过短,可能会导致滚动查询在完成之前超时;如果设置得过长,可能会浪费系统资源。

5. 错误处理

当使用bulkscan方法时,可能会遇到各种错误,例如网络中断、Elasticsearch集群不可用等。为了确保程序的健壮性,应该适当地处理这些错误,例如重试操作、记录错误信息或通知用户。

6. 性能优化

  • 批量大小:调整bulk操作中的文档数量可以影响性能。太少的文档可能导致网络开销过大,而太多的文档可能会导致Elasticsearch服务器过载。
  • 索引设计:合理的索引设计,包括选择合适的字段类型、使用倒排索引、避免过多的嵌套结构等,都可以提高查询性能。
  • 并发控制:在高并发场景下,合理地控制并发请求的数量可以避免Elasticsearch服务器的过载。

7. 使用_source过滤

在查询时,可以通过_source过滤来指定只返回所需的字段,从而减少网络传输的数据量,提高性能。这在处理大型文档或只需要文档中的部分信息时非常有用。

8. 安全性

当连接Elasticsearch时,应该使用安全的连接方式(如HTTPS),并确保认证信息的安全存储和传输。此外,还应该定期检查Elasticsearch的安全设置和日志,以确保系统的安全性


python操作Elasticsearch 批量增删改查
https://flyfishs.top/2024/04/28/python操作Elasticsearch/
作者
飞鱼
发布于
2024年4月28日
许可协议