基于Kafka+Flink+Redis的电商大屏实时计算案例

释放双眼,带上耳机,听听看~!

前言

一年一度的双11又要到了,阿里的双11销量大屏可以说是一道特殊的风景线。
实时大屏(real-time dashboard)正在被越来越多的企业采用,用来及时呈现关键的数据指标。
并且在实际操作中,肯定也不会仅仅计算一两个维度。
由于Flink的“真·流式计算”这一特点,它比Spark Streaming要更适合大屏应用。
本文从笔者的实际工作经验抽象出简单的模型,并简要叙述计算流程(当然大部分都是源码)。

数据格式与接入

简化的子订单消息体如下。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1{
2    "userId": 234567,
3    "orderId": 2902306918400,
4    "subOrderId": 2902306918401,
5    "siteId": 10219,
6    "siteName": "site_blabla",
7    "cityId": 101,
8    "cityName": "北京市",
9    "warehouseId": 636,
10    "merchandiseId": 187699,
11    "price": 299,
12    "quantity": 2,
13    "orderStatus": 1,
14    "isNewOrder": 0,
15    "timestamp": 1572963672217
16}
17

由于订单可能会包含多种商品,故会被拆分成子订单来表示,每条JSON消息表示一个子订单。
现在要按照自然日来统计以下指标,并以1秒的刷新频率呈现在大屏上:

  • 每个站点(站点ID即siteId)的总订单数、子订单数、销量与GMV;

  • 当前销量排名前N的商品(商品ID即merchandiseId)与它们的销量。

由于大屏的最大诉求是实时性,等待迟到数据显然不太现实,因此我们采用处理时间作为时间特征,并以1分钟的频率做checkpointing。


1
2
3
4
5
1StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
3env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
4env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);
5

然后订阅Kafka的订单消息作为数据源。


1
2
3
4
5
6
7
8
9
10
11
1    Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
2    DataStream<String> sourceStream = env
3      .addSource(new FlinkKafkaConsumer011<>(
4        ORDER_EXT_TOPIC_NAME,                        // topic
5        new SimpleStringSchema(),                    // deserializer
6        consumerProps                                // consumer properties
7      ))
8      .setParallelism(PARTITION_COUNT)
9      .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
10      .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);
11

给带状态的算子设定算子ID(通过调用uid()方法)是个好习惯,能够保证Flink应用从保存点重启时能够正确恢复状态现场。为了尽量稳妥,Flink官方也建议为每个算子都显式地设定ID
,参考:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html\#should-i-assign-ids-to-all-operators-in-my-job

接下来将JSON数据转化为POJO,JSON框架采用FastJSON。


1
2
3
4
1    DataStream<SubOrderDetail> orderStream = sourceStream
2      .map(message -> JSON.parseObject(message, SubOrderDetail.class))
3      .name("map_sub_order_detail").uid("map_sub_order_detail");
4

JSON已经是预先处理好的标准化格式,所以POJO类SubOrderDetail的写法可以通过Lombok极大地简化。
如果JSON的字段有不规范的,那么就需要手写Getter和Setter,并用@JSONField注解来指明。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1@Getter
2@Setter
3@NoArgsConstructor
4@AllArgsConstructor
5@ToString
6public class SubOrderDetail implements Serializable {
7  private static final long serialVersionUID = 1L;
8  
9  private long userId;
10  private long orderId;
11  private long subOrderId;
12  private long siteId;
13  private String siteName;
14  private long cityId;
15  private String cityName;
16  private long warehouseId;
17  private long merchandiseId;
18  private long price;
19  private long quantity;
20  private int orderStatus;
21  private int isNewOrder;
22  private long timestamp;
23}
24

统计站点指标

将子订单流按站点ID分组,开1天的滚动窗口,并同时设定ContinuousProcessingTimeTrigger触发器,以1秒周期触发计算。
注意处理时间的时区问题,这是老生常谈了。


1
2
3
4
5
1    WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
2      .keyBy("siteId")
3      .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
4      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));
5

接下来写个聚合函数。


1
2
3
4
1    DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
2      .aggregate(new OrderAndGmvAggregateFunc())
3      .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");
4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1  public static final class OrderAndGmvAggregateFunc
2    implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {
3    private static final long serialVersionUID = 1L;
4
5    @Override
6    public OrderAccumulator createAccumulator() {
7      return new OrderAccumulator();
8    }
9
10    @Override
11    public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {
12      if (acc.getSiteId() == 0) {
13        acc.setSiteId(record.getSiteId());
14        acc.setSiteName(record.getSiteName());
15      }
16      acc.addOrderId(record.getOrderId());
17      acc.addSubOrderSum(1);
18      acc.addQuantitySum(record.getQuantity());
19      acc.addGmv(record.getPrice() * record.getQuantity());
20      return acc;
21    }
22
23    @Override
24    public OrderAccumulator getResult(OrderAccumulator acc) {
25      return acc;
26    }
27
28    @Override
29    public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {
30      if (acc1.getSiteId() == 0) {
31        acc1.setSiteId(acc2.getSiteId());
32        acc1.setSiteName(acc2.getSiteName());
33      }
34      acc1.addOrderIds(acc2.getOrderIds());
35      acc1.addSubOrderSum(acc2.getSubOrderSum());
36      acc1.addQuantitySum(acc2.getQuantitySum());
37      acc1.addGmv(acc2.getGmv());
38      return acc1;
39    }
40  }
41

累加器类OrderAccumulator的实现很简单,看源码就大概知道它的结构了,因此不再多废话。
唯一需要注意的是订单ID可能重复,所以需要用名为orderIds的HashSet来保存它。
HashSet应付我们目前的数据规模还是没太大问题的,如果是海量数据,就考虑换用HyperLogLog吧。

接下来就该输出到Redis供呈现端查询了。
这里有个问题:
一秒内有数据变化的站点并不多,而ContinuousProcessingTimeTrigger每次触发都会输出窗口里全部的聚合数据,这样做了很多无用功,并且还会增大Redis的压力。
所以,我们在聚合结果后再接一个ProcessFunction,代码如下。


1
2
3
4
5
1    DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream
2      .keyBy(0)
3      .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))
4      .name("process_site_gmv_changed").uid("process_site_gmv_changed");
5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1  public static final class OutputOrderGmvProcessFunc
2    extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {
3    private static final long serialVersionUID = 1L;
4
5    private MapState<Long, OrderAccumulator> state;
6
7    @Override
8    public void open(Configuration parameters) throws Exception {
9      super.open(parameters);
10      state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
11        "state_site_order_gmv",
12        Long.class,
13        OrderAccumulator.class)
14      );
15    }
16
17    @Override
18    public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
19      long key = value.getSiteId();
20      OrderAccumulator cachedValue = state.get(key);
21
22      if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {
23        JSONObject result = new JSONObject();
24        result.put("site_id", value.getSiteId());
25        result.put("site_name", value.getSiteName());
26        result.put("quantity", value.getQuantitySum());
27        result.put("orderCount", value.getOrderIds().size());
28        result.put("subOrderCount", value.getSubOrderSum());
29        result.put("gmv", value.getGmv());
30        out.collect(new Tuple2<>(key, result.toJSONString());
31        state.put(key, value);
32      }
33    }
34
35    @Override
36    public void close() throws Exception {
37      state.clear();
38      super.close();
39    }
40  }
41

说来也简单,就是用一个MapState状态缓存当前所有站点的聚合数据。
由于数据源是以子订单为单位的,因此如果站点ID在MapState中没有缓存,或者缓存的子订单数与当前子订单数不一致,表示结果有更新,这样的数据才允许输出。

最后就可以安心地接上Redis Sink了,结果会被存进一个Hash结构里。


1
2
3
4
5
6
7
1    // 看官请自己构造合适的FlinkJedisPoolConfig
2    FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
3    siteResultStream
4      .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
5      .name("sink_redis_site_gmv").uid("sink_redis_site_gmv")
6      .setParallelism(1);
7

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1  public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {
2    private static final long serialVersionUID = 1L;
3    private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";
4
5    @Override
6    public RedisCommandDescription getCommandDescription() {
7      return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);
8    }
9
10    @Override
11    public String getKeyFromData(Tuple2<Long, String> data) {
12      return String.valueOf(data.f0);
13    }
14
15    @Override
16    public String getValueFromData(Tuple2<Long, String> data) {
17      return data.f1;
18    }
19
20    @Override
21    public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {
22      return Optional.of(
23        HASH_NAME_PREFIX +
24        new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +
25        "SITES"
26      );
27    }
28  }
29

商品Top N

我们可以直接复用前面产生的orderStream,玩法与上面的GMV统计大同小异。
这里用1秒滚动窗口就可以了。


1
2
3
4
5
6
7
8
9
1    WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
2      .keyBy("merchandiseId")
3      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
4
5    DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
6      .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
7      .name("aggregate_merch_sales").uid("aggregate_merch_sales")
8      .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));
9

聚合函数与窗口函数的实现更加简单了,最终返回的是商品ID与商品销量的二元组。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1  public static final class MerchandiseSalesAggregateFunc
2    implements AggregateFunction<SubOrderDetail, Long, Long> {
3    private static final long serialVersionUID = 1L;
4
5    @Override
6    public Long createAccumulator() {
7      return 0L;
8    }
9
10    @Override
11    public Long add(SubOrderDetail value, Long acc) {
12      return acc + value.getQuantity();
13    }
14
15    @Override
16    public Long getResult(Long acc) {
17      return acc;
18    }
19
20    @Override
21    public Long merge(Long acc1, Long acc2) {
22      return acc1 + acc2;
23    }
24  }
25
26
27  public static final class MerchandiseSalesWindowFunc
28    implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {
29    private static final long serialVersionUID = 1L;
30
31    @Override
32    public void apply(
33      Tuple key,
34      TimeWindow window,
35      Iterable<Long> accs,
36      Collector<Tuple2<Long, Long>> out) throws Exception {
37      long merchId = ((Tuple1<Long>) key).f0;
38      long acc = accs.iterator().next();
39      out.collect(new Tuple2<>(merchId, acc));
40    }
41  }
42

既然数据最终都要落到Redis,那么我们完全没必要在Flink端做Top N的统计,直接利用Redis的有序集合(zset)就行了,商品ID作为field,销量作为分数值,简单方便。
不过flink-redis-connector项目中默认没有提供ZINCRBY命令的实现(必须再吐槽一次),我们可以自己加,步骤参照之前写过的那篇加SETEX的命令的文章,不再赘述。
RedisMapper的写法如下。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1  public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {
2    private static final long serialVersionUID = 1L;
3    private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";
4
5    @Override
6    public RedisCommandDescription getCommandDescription() {
7      return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
8    }
9
10    @Override
11    public String getKeyFromData(Tuple2<Long, Long> data) {
12      return String.valueOf(data.f0);
13    }
14
15    @Override
16    public String getValueFromData(Tuple2<Long, Long> data) {
17      return String.valueOf(data.f1);
18    }
19
20    @Override
21    public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {
22      return Optional.of(
23        ZSET_NAME_PREFIX +
24        new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +
25        "MERCHANDISE"
26      );
27    }
28  }
29

后端取数时,用ZREVRANGE命令即可取出指定排名的数据了。
只要数据规模不是大到难以接受,并且有现成的Redis,这个方案完全可以作为各类Top N需求的通用实现。

The End

大屏的实际呈现需要保密,截图自然是没有的。
以下是提交执行时Flink Web UI给出的执行计划(实际有更多的统计任务,不止3个Sink)。
通过复用源数据,可以在同一个Flink job内实现更多统计需求。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

《NoSQL权威指南》——2.2 技术原理

2021-12-11 11:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索