浅谈flink state状态管理机制

state 简述

Apache Flink® — Stateful Computations over Data Streams,flink是一个默认就有状态的分分析引擎,针对流失计算引擎中的数据往往是转瞬即逝,但在flink真实业务场景确不能这样,什么都不能留下,肯定是需要有数据留下的,针对这些数据留下来存储下来,在flink中叫做state,中文可以翻译成状态。

state 类型

Flink中有两种基本类型的State, 分别为 Keyed State(键控状态) 和 Operator State(算子状态)。

keyed State(键控状态)

Keyed State:顾名思义就是基于 KeyedStream 上的状态,这个状态是跟特定的Key绑定的。KeyedStrean 流上的每一个Key,都对应一个 State。Flink针对Keyed State提供了以下六种类可以保存 State 的数据结构类型。

keyed state托管状态有六种类型

ValueState

保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 Key,因此算子接收到的每个Key都可能对应一个值)。这个值可以通过 update(T)进行更新,通过 T value() 进行检索。

ListState

保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获取整个列表。还可以通过 update(List) 覆盖当前的列表。

MapState<UK,UV>

维护了一个添加映射列表。你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 分别检索映射、键和值的可迭代视图。

ReducingState

保存一个单值,表示添加到状态的所有聚合。接口与 ListState 类似,使用 add(T)增加元素,会使用提供的 ReduceFunction 进行聚合

AggregatingState<IN.OUT>

保留一个单值,表示添加到状态的所以值的聚合。和ReducingState 相反的额是聚合类型可能与添加到状态的元素的类型不同。接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

FoldingState<T,ACC>

保留一个单值,表示添加到状态的所有值的聚合。与 ReducingState 相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState类型,但使用 add(T) 添加的元素会用指定的FoldFunction 折叠成聚合值。

Keyed State案例演示

ValueState 使用valueState实现平均值求取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package xin.studytime.scala

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object ValueStateOperate {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._

env.fromCollection(List(
(1L, 3d),
(1L, 5d),
(1L, 7d),
(1L, 4d),
(1L, 2d)
))
.keyBy(_._1)
.flatMap(new CountAverageWithValue())
.print()
env.execute()
}
}

class CountAverageWithValue extends RichFlatMapFunction[(Long, Double), (Long, Double)] {
//定义ValueState类型的变量

private var sum: ValueState[(Long,Double)] = _
override def open(parameters: Configuration): Unit = {
//初始化获取历史状态的值
val average = new ValueStateDescriptor[(Long, Double)]("average", classOf[(Long, Double)])
sum = getRuntimeContext.getState(average)
}

override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {
// access the state value
val tmpCurrentSum = sum.value
// If it hasn't been used before, it will be null
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0d)
}
// update the count
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

// update the state
sum.update(newSum)

// if the count reaches 2, emit the average and clear the state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
//将状态清除
//sum.clear()
}
}
}

ListState 求取数据平均值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package xin.studytime.scala

import java.lang
import java.util.Collections

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

object ListStateOperate {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
env.fromCollection(List(
(1L, 3d),
(1L, 5d),
(1L, 7d),
(2L, 4d),
(2L, 2d),
(2L, 6d)
)).keyBy(_._1)
.flatMap(new CountAverageWithList)
.print()
env.execute()
}
}

class CountAverageWithList extends RichFlatMapFunction[(Long, Double), (Long, Double)] {
private var elementsByKey: ListState[(Long, Double)] = _

override def open(parameters: Configuration): Unit = {
val listState = new ListStateDescriptor[(Long, Double)]("listState", classOf[(Long, Double)])
elementsByKey = getRuntimeContext.getListState(listState)
}

override def flatMap(in: (Long, Double), collector: Collector[(Long, Double)]): Unit = {
val currentState: lang.Iterable[(Long, Double)] = elementsByKey.get()

if (currentState == null) {
elementsByKey.addAll(Collections.emptyList())
}
elementsByKey.add(in)
val allElements: Iterator[(Long, Double)] = elementsByKey.get().iterator().asScala
val allElementList: List[(Long, Double)] = allElements.toList
if (allElementList.size > 3) {
var count = 0L
var sum = 0d
for (eachElement <- allElementList) {
count += 1
sum += eachElement._2
}
collector.collect(in._1, sum / count)
}
}
}

MapState 求取数据平均值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package xin.studytime.scala

import java.util.UUID

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}

object MapStateOperate {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._

environment.fromCollection(List(
(1L, 3d),
(1L, 5d),
(1L, 7d),
(2L, 4d),
(2L, 2d),
(2L, 6d)
)).keyBy(_._1).flatMap(new CountAverageMapState).print()

environment.execute()
}
}

class CountAverageMapState extends RichFlatMapFunction[(Long, Double), (Long, Double)] {

private var mapState: MapState[String, Double] = _

override def open(parameters: Configuration): Unit = {
val mapStateOperate = new MapStateDescriptor[String, Double]("mapStateOperate", classOf[String], classOf[Double])
mapState = getRuntimeContext.getMapState(mapStateOperate)
}

override def flatMap(in: (Long, Double), out: Collector[(Long, Double)]): Unit = {

//将相同的key对应的数据放到一个map集合当中去,就是这种对应 key -> Map((key1, value1),(key2, value2))
//每次都构建一个map集合
mapState.put(UUID.randomUUID().toString, in._2)
import scala.collection.JavaConverters._

//获取map集合当中所有的value,我们每次将数据的value给放到map的value里面去
val listState: List[Double] = mapState.values().iterator().asScala.toList
if (listState.size >= 3) {
var count = 0L
var sum = 0d
for (eachState <- listState) {
count += 1
sum += eachState
}
out.collect(in._1, sum / count)
}
}
}

ReducingState、AggregatingState等考虑实际场景使用不多,就不在此列举。

Operator State(算子状态)

Operator State 与 Key 无关,而是与 Operator 绑定,整个 Operator 只对应一个 State。比如:Flink 中的 Kafka Connector 就使用了 Operator State,它会在每个 Connector 实例中,保存该实例消费 Topic 的所有(partition,offset)映射,operator state 只有一种托管状态:ValueState

Operator State案例演示

Flink的状态管理之State Backend

默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。state 的存储和checkpoint的位置取决于State Backend的配置。Flink一共提供了3种StateBackend,MemoryStateBackend基于内存存储、FsStateBackend基于文件系统存储、RocksDBStateBackend基于数据库存储。可以通过 StreamExecutionEnvironment.setStateBackend(...)来设置state存储的位置。

MemoryStateBackend

将数据持久化状态存储到内存当中,state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中。基于内存的state backend在生产环境下不建议使用。

浅谈flink state状态管理机制

代码配置

environment.setStateBackend(new MemoryStateBackend()) 设置MemoryStateBackend。

使用场景

  • 本地调试场景
  • flink任务状态数据量较小的场景

FsStateBackend

state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中。可以使用hdfs等分布式文件系统.

浅谈flink state状态管理机制

代码配置

environment.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink/checkDir"))

使用场景

  • 状态数据特别的多,还有长时间的window算子等,它很安全,因为基于hdfs,所以数据有备份很安全
  • 大状态、长窗口、大key/value状态的的任务
  • 全高可用配置

RocksDBStateBackend

RocksDB介绍:
RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器,为了更好的性能,这套引擎是用C++编写的。 Key和value是任意大小的字节流。RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到fileSystem中。fail over的时候从fileSystem中恢复到本地RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用.

![](https://static.studytime.xin/article/2020/09/15997362546353.jpg)

代码配置

1
2
3
4
5
6
7
8
9
# 导入jar包然后配置代码
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.2</version>
</dependency>

# 代码配置
environment.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink/checkDir",true))

使用场景

  • 大状态、长窗口、大key/value状态的的任务
  • 全高可用配置

如何选择以及使用state-backend

由于RocksDBStateBackend将工作状态存储在taskManger的本地文件系统,状态数量仅仅受限于本地磁盘容量限制,对比于FsStateBackend保存工作状态在内存中,RocksDBStateBackend能避免flink任务持续运行可能导致的状态数量暴增而内存不足的情况,因此适合在生产环境使用。

修改state-backend的两种方式

单任务调整
1
2
3
4
env.setStateBackend(
new FsStateBackend("hdfs://node01:8020/flink/checkDir"))
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】

全局调整

1
2
3
4
vim flink-conf.yaml

state.backend: filesystem
state.checkpoints.dir: hdfs://node01:8020/flink/checkDir

注意:state.backend的值可以是下面几种

  • jobmanager 表示使用 MemoryStateBackend
  • filesystem 表示使用 FsStateBackend
  • rocksdb 表示使用 RocksDBStateBackend
「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论