Flink WordCount 程序

2019/07/23 Flink

Mac 上搭建 Flink 1.8.0 环境并构建运行简单 WordCount 简单程序入门


前言

18年关注到的 Flink。那是还懵懵懂懂参加了 Flink Forward in China 2018。

因岗位为 SRE 和公司暂无需要。一直没有认真入门过。

这次打算作为 19 年下半年计划正式入坑学习。

开始前建议先阅读下云邪入门教程 5分钟从零构建第一个 Flink 应用

准备工作

Flink 可以运行在 Linux、Mac 和 Windows 上,唯一的要求就是必须安装 Java 8 或以上版本。

可以通过发出以下命令来检查Java的正确安装

java -version

有 Java 8,输出将如下所示

java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

Java 安装后去官网下载 Flink,然后解压即可运行。这里以 flink-1.8.0 为例。

cd ~/Downloads/
tar xzf flink-1.8.0-bin-scala_2.12.tgz
cd flink-1.8.0

配置全局变量。FLINK_HOME 写你自己的 Flink 路径

vim ~/.bash_profile

# Flink HOME
FLINK_HOME=/Users/tu/Public/SoftWare/flink-1.8.0 
export PATH=$FLINK_HOME/bin:$PATH

对于 MacOS 用户,可以选择通过 homebrew 安装 Flink。一键安装就可以用,不需要配置全局变量

brew install apache-flink

检查安装

$ flink --version
Version: 1.8.0, Commit ID: 4caec0d  

在 Flink 目录下运行以下命令即可启动

[16:29:35] tu flink-1.8.0 $ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lihuimindeMacBook-Pro.local.
Starting taskexecutor daemon on host lihuimindeMacBook-Pro.local.

接着可以打开浏览器访问 http://localhost:8081/ 查看

通过 jps 可以看到多出来两个JVM进程,运行的主类

[16:29:45] tu bin $ jps -l
9169 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
9702 sun.tools.jps.Jps
9658 org.apache.flink.runtime.taskexecutor.TaskManagerRunner

开发

集群启动后就可以开发 Flink 程序

使用 IDEA 新建一个 maven 项目

如果没有看到 flink-quickstart-java 的话通过右上角的 Add Archetype 按钮来添加。

创建一个 SocketTextStreamWordCount Java 文件,加入以下代码

package cn.lihm.examples.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lihm
 * @date 2019-07-23 16:18
 * @description TODO
 */

public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        // the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
            hostname = params.get("hostname");
        } catch (Exception e) {
            System.err.println("USAGE: Please run 'SocketWindowWordCount --hostname <hostname> --port <port>'");
            return;
        }
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取数据
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // 计数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

官网还有 Scala 版本的。自己学习尝试即可。这里不介绍了

运行

接着进入工程目录,使用以下命令打包。

mvn clean package -Dmaven.test.skip=true

然后开启监听 9000 端口

nc -l 9000

最后进入 flink 安装目录 bin 下执行以下命令提交job任务。注意换成你自己项目的路径

flink run -c cn.lihm.examples.streaming.SocketTextStreamWordCount flink-learning-examples-1.0-SNAPSHOT.jar --port 9000 --hostname 127.0.0.1

执行完上述命令后,可以在 webUI 中看到正在运行的程序

可以在 nc 监听端口中输入 text

$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye

该任务的输出在 flink 家目录下 log 下以 .out 结尾的文件下。
通过 tail 命令看一下输出的文件,来观察统计结果。

最后测试完可以关闭集群

./bin/stop-cluster.sh

总结

整个过程下来还是有些成就感的。提高了自己动手能力。


参考链接

Search

    Table of Contents