Flink(二)

发布于:2022-11-28 ⋅ 阅读:(224) ⋅ 点赞:(0)

Flink快速上手

       对 Flink 有了基本的了解后,接下来就要理论联系实际,真正上手写代码了。Flink 底层是以 Java 编写的,并为开发人员同时提供了完整的 Java 和 Scala API。在本书中,代码示例将全部用 Java 实现;而在具体项目应用中,可以根据需要选择合适语言的API 进行开发。在这一章,我们将会以大家最熟悉的 IntelliJ IDEA 作为开发工具,用实际项目中最常见的Maven 作为包管理工具,在开发环境中编写一个简单的 Flink 项目,实现零基础快速上手。

环境准备

工欲善其事,必先利其器。在进行代码的编写之前,先将我们使用的开发环境和工具介绍一下:

1.系统环境为 Windows 10。

2.提前安装 Java 8。

3.集成开发环境(IDE)使用 IntelliJ IDEA,具体的安装流程参见 IntelliJ 官网

4. 安装 IntelliJ IDEA 之后,还需要安装一些插件——Maven 和Git。Maven 用来管理项目依赖;通过 Git 可以轻松获取我们的示例代码,并进行本地代码的版本控制。

 创建工程

添加项目依赖

 <dependencies>
    <!-- 引入 Flink 相关依赖-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>

配置日志管理

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

批处理

1.工程根目录下新建一个 input 文件夹,并在下面创建文本文件 words.txt

在 words.txt 中输入一些文字,例如:

hello world 
hello flink
hello java

2.代码实现

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.lang.reflect.Type;

public class BatchWorkCount {
    public static void main(String[] args) throws Exception {
        //执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //按行读取数据
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");

        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] worsd = line.split(" ");

            for (String word : worsd
            ) {
                out.collect(Tuple2.of(word, 1L));
            }


        })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        //按照word分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup =  wordAndOneTuple.groupBy(0);

        //分组内进行聚合计算
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        sum.print();

    }
}

这边注意的是所有导的包都得是org.apache.flink.api.*下面的

 输出结果:

(flink,1)
(world,1)
(hello,3)
(java,1)

流处理

我们已经知道,用DataSet API 可以很容易地实现批处理;与之对应,流处理当然可以用DataStream API 来实现。对于 Flink 而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

DataStream API 作为“数据流”的处理接口,又怎样处理批数据呢?

回忆一下上一章中我们讲到的 Flink 世界观。在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流。所以批处理,其实就可以看作有界流的处理。

对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的

——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

读取文件

1.准备文件(上同)

2.代码实现

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BounderStreamWorkCount {

    public static void main(String[] args) throws Exception {

        //创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //读取文件
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");

        SingleOutputStreamOperator<Tuple2<String, Long>> workAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        //分组
        KeyedStream<Tuple2<String, Long>, String> WorkAndOneKeyedStream = workAndOneTuple.keyBy(data -> data.f0);
        //求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = WorkAndOneKeyedStream.sum(1);

        //打印
        sum.print();

        //启动执行
        env.execute();
    }

}

输出实现:

5> (hello,1)
5> (hello,2)
3> (java,1)
9> (world,1)
5> (hello,3)
13> (flink,1)

读取文件流

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWorkCount {
    public static void main(String[] args) throws Exception {
        //创建流式处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //从参数中提取主机名和端口号
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
//        String host = parameterTool.get("host");
//        Integer port = parameterTool.getInt("port");

        //读取文本流
        DataStreamSource<String> lineDataStream = env.socketTextStream("hadoop102", 7777);

        SingleOutputStreamOperator<Tuple2<String, Long>> workAndOneTuple = lineDataStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        //分组
        KeyedStream<Tuple2<String, Long>, String> WorkAndOneKeyedStream = workAndOneTuple.keyBy(data -> data.f0);
        //求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = WorkAndOneKeyedStream.sum(1);

        //打印
        sum.print();

        //启动执行
        env.execute();


    }
}

Flink部署

1.上传解压

在 hadoop102 节点服务器上创建安装目录/opt/module,将 flink 安装包放在该目录下,并执行解压命令,解压至当前目录。

tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/ 

2.启动

bin/start-cluster.sh Starting cluster.
Starting standalonesession daemon on host hadoop102. 
Starting taskexecutor daemon on host hadoop102.

3.关闭

bin/stop-cluster.sh

4.修改配置文件

1)进入 conf 目录下,修改 flink-conf.yaml 文件,修改 jobmanager.rpc.address 参数hadoop102

cd conf/

vim flink-conf.yaml # JobManager 节点地址.
jobmanager.rpc.address: hadoop102

2)修改works配置

vim workers 
hadoop103
hadoop104

3)分发文件

xsync flink/
bin/start-cluster.sh Starting cluster.
启动集群
Starting standalonesession daemon on host hadoop102. 
Starting taskexecutor daemon on host hadoop103.
Starting taskexecutor daemon on host hadoop104.
本文含有隐藏内容,请 开通VIP 后查看