该文章演示了如何使用ElasticsearchCRUD在Elasticsearch中进行现场重建索引。 reindex使用扫描并滚动获取数据,然后使用批量插入更新到新的索引。 reindex支持别名映射,使得可以进行实时索引。
设置文档搜索引擎和索引
AdventureWorks2012用于填充搜索引擎的数据。
从Entity Person添加person索引。
此代码创建一个新的索引persons_v1。 然后使用别名将其映射到persons。
1
2
3
4
5
6
7
8 1public void CreatePersonAliasForPersonV1Mapping(string alias)
2{
3 using (var context = new ElasticsearchContext("http://localhost:9200/", _elasticsearchMappingResolver))
4 {
5 context.AliasCreateForIndex(alias, _elasticsearchMappingResolver.GetElasticSearchMapping(typeof(Person)).GetIndexForType(typeof(Person)));
6 }
7}
8
现在设置索引和别名。 Elasticsearch 已经准备好了一个实时重建索引。
这可以在这里查看: http://localhost:9200/_aliases 或者http://localhost:9200/_cat/aliases
1
2
3
4
5
6
7
8 1{
2 "persons_v1": {
3 "aliases": {
4 "persons": {}
5 }
6 }
7}
8
步骤1:从索引persons_v1创建新索引persons_v2
从旧索引创建新的索引。 这可以直接执行。 当reindex正在执行时,可以添加新的文档或更新文档。 被删除的文件将不会被注意到。 因此,您应该使用一个布尔删除的字段。
reindex需要旧索引的索引和类型,还需要新的索引和新的索引类型。 如果您使用父/子文档索引,则必须对子文档重复此步骤。 ElasticsearchCRUD版本1.0.15之前,不支持父/子文档重建。
1
2
3
4
5
6
7
8
9 1// - 这个时间戳通常是DateTime.UtcNow
2// - 因此,可以找到在reindex期间更新的所有索引
3DateTime beginDateTime = DateTime.UtcNow;
4
5var reindex = new ElasticsearchCrudReindex<Person, PersonV2>(
6 new IndexTypeDescription("persons_v1", "person"),
7 new IndexTypeDescription("persons_v2", "person"),
8 "http://localhost:9200");
9
reindex可以根据需要进行配置。 该方法使用扫描和滚动。 您可以更改默认设置,并在每个请求和响应中允许更多的文档。 您不应该让它太大,因为您不希望每个请求和响应发送500MB。
ScanAndScrollConfiguration定义每个滚动条保持打开的时间以及所有接下来的滚动条,并定义时间单位。 下面定义了5s。 如果有足够的时间,将从每个分片中获取1000个文档。
例如,如果您的索引中有5个分片,并且配置了足够的时间,并且有足够的文档,则下面的配置将在每个请求中获取5000个文档。
控制台日志用于显示进度
1
2
3 1reindex.ScanAndScrollConfiguration = new ScanAndScrollConfiguration(new TimeUnitSecond(5), 1000);
2reindex.TraceProvider = new ConsoleTraceProvider(TraceEventType.Information);
3
reindex方法本身需要2个函数以及Json _search查询的Json内容。 使用小于范围的查询来在定义的DateTime之前选择所有文档。 如果需要不同的查询逻辑,可以根据需要进行定义。 如果您不担心更新或其他任何内容,您可以使用所有匹配项。
reindex也需要转换方法。 这就是进行reindex的原因。 在此示例中,删除的bool字段将添加到文档。 第二个函数用于定义文档_id
1
2
3
4
5 1reindex.Reindex(
2 PersonReindexConfiguration.BuildSearchModifiedDateTimeLessThan(beginDateTime),
3 PersonReindexConfiguration.GetKeyMethod,
4 PersonReindexConfiguration.CreatePersonV2FromPerson);
5
Json内容生成方法BuildSearchModifiedDateTimeLessThan构建Json查询。 这是一个非常原始的实现,如果需要,您可以更加有说服力。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1public static string BuildSearchModifiedDateTimeLessThan(DateTime dateTimeUtc)
2{
3 return BuildSearchRange("lt", "modifieddate", dateTimeUtc);
4}
5
6//{
7// "query" : {
8// "range": { "modifieddate": { "lt": "2003-12-29T00:00:00" } }
9// }
10//}
11private static string BuildSearchRange(string lessThanOrGreaterThan, string updatePropertyName, DateTime dateTimeUtc)
12{
13 string isoDateTime = dateTimeUtc.ToString("s");
14 var buildJson = new StringBuilder();
15 buildJson.AppendLine("{");
16 buildJson.AppendLine("\"query\": {");
17 buildJson.AppendLine("\"range\": { \"" + updatePropertyName + "\": { \"" + lessThanOrGreaterThan + "\": \"" + isoDateTime + "\" } }");
18 buildJson.AppendLine("}");
19 buildJson.AppendLine("}");
20
21 return buildJson.ToString();
22}
23
将旧文档类型转换为新文档类型的转换方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 1public static PersonV2 CreatePersonV2FromPerson(Person item)
2{
3 return new PersonV2
4 {
5 BusinessEntityID = item.BusinessEntityID,
6 PersonType = item.PersonType,
7 NameStyle = item.NameStyle,
8 Title = item.Title,
9 FirstName = item.FirstName,
10 MiddleName = item.MiddleName,
11 LastName = item.LastName,
12 Suffix = item.Suffix,
13 EmailPromotion = item.EmailPromotion,
14 AdditionalContactInfo = item.AdditionalContactInfo,
15 Demographics = item.Demographics,
16 rowguid = item.rowguid,
17 ModifiedDate = item.ModifiedDate,
18 Deleted = false
19 };
20}
21
22Returns the _id property for the document _id
23public static object GetKeyMethod(Person person)
24{
25 return person.BusinessEntityID;
26}
27
步骤2:更换别名persons 到 索引person_v2
现在别名从旧的索引切换到新的索引
1
2 1reindex.SwitchAliasfromOldToNewIndex("persons");
2
别名指向persons_v2索引http://localhost:9200/_aliases
1
2
3
4
5
6
7
8
9
10
11 1{
2 "persons_v1": {
3 "aliases": {}
4 },
5 "persons_v2": {
6 "aliases": {
7 "persons": {}
8 }
9 }
10}
11
步骤3:现在获得所有在REINDEXING和REINDEX上更新的文档
现在新的索引已经开始运行,所有在重新构建索引发生时更新的文档现在都被重新索引了。 它使用大于范围的查询搜索并返回大于开始DateTime的所有文档。
如果相同的文档在新索引中被更新,它将被覆盖。 再次,您可以决定这是否重要,并创建适当的查询搜索。
1
2
3
4
5
6
7
8
9
10 1 // ---------------------------------------------------------
2//步骤3:现在可以获得所有在重新输入时更新的文档
3// ---------------------------------------------------------
4//注意:如果文档在此期间再次被更新,那么这个方法将被忽略。
5//如果需要,您必须检查新索引中项目的更新时间戳!
6reindex.Reindex(
7 PersonReindexConfiguration.BuildSearchModifiedDateTimeGreaterThan(beginDateTime),
8 PersonReindexConfiguration.GetKeyMethod,
9 PersonReindexConfiguration.CreatePersonV2FromPerson);
10
运行应用程序时,可以查看进度:
结论
Live reindex是Elasticsearch中的一个很棒的功能,在搜索引擎或NoSQL中,ElasticsearchCRUD 1.0.15中将支持父/子文档索引reindex。