Flink-Kafka 内置 Schemas

2019/08/16 Flink

Apache Flink 内部提供了内置的常用消息格式的 Schemas


前言

Flink 消费 Kafka 时,要对消息进行格式化

Schemas

Flink 有提供内置的 Schemas

先看 Flink 中初始化 Kafka 数据源代码,其中传入服务器名和 Topic 名就可以了

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "flink_test", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);

上方 SimpleStringSchema 即为 Schemas。如果需要使用其他的,替换掉该参数,并把相应数据类型进行修改

如果想自己实现 Schema ,可以参看Apache-Flink深度解析-DataStream-Connectors之Kafka Simple ETL 部分, 与 SimpleStringSchema 是一样的效果,只是自己实现的 Schema

如果类找不到,请添加依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.8.0</version>
</dependency>
SimpleStringSchema

SimpleStringSchema 把 message 反序列化为字符串。如果 message 有键, 则忽略键

JSONDeserializationSchema

JSONDeserializationSchema 使用 jackson 将 message 反序列化为 json 格式的消息并返回 com.fasterxml.jackson.databind.node.ObjectNode 对象流。你可以使用 .get(“property”) 方法访问字段。再一次, 键被忽略。

JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema 与前一个非常类似,但处理带有json编码的键和值的消息

返回的 ObjectNode 包含如下字段:

key: 键中存在的所有字段 value: 所有的 message 字段 metadata(可选): 暴露消息的 offset, partition 和 topic(将 true 传递给构造函数以获取元数据)

例如:

kafka-console-producer --broker-list localhost:9092 --topic json-topic \
    --property parse.key=true \
    --property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}

会被解码为

{
    "key":{"keyField1":1,"keyField2":2},
    "value":{"valueField1":1,"valueField2":{"foo":"bar"}},
    "metadata":{
        "offset":43,
        "topic":"json-topic",
        "partition":0
    }
}

如果消息本身就没有 key、metadata。JSONKeyValueDeserializationSchema 解析出来也是没有key、value

AvroDeserializationSchema

使用静态提供的模式读取使用 Avro 格式序列化的数据

可以从 Avro 生成的类(AvroDeserializationSchema.forSpecific(…))推断出模式,或者它可以与 GenericRecords 一起使用手动提供的模式(使用AvroDeserializationSchema.forGeneric(…))

TypeInformationSerializationSchema

TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基于Flink的TypeInformation创建模式。 如果数据由Flink写入和读取,这将非常有用

总结

AvroDeserializationSchema 与 TypeInformationSerializationSchema

还没接触过,这里就暂时记录下,如果下次有用到再回来记录


参考链接

Search

    Table of Contents