作者:周凯波(宝牛)
在前面几期的课程里面讲过了 Flink 开发环境的搭建和应用的部署以及运行,今天的课程主要是讲 Flink 的客户端操作。本次讲解以实际操作为主。这次课程是基于社区的 Flink 1.7.2 版本,操作系统是 Mac 系统,浏览器是 Google Chrome 浏览器。有关开发环境的准备和集群的部署,请参考「开发环境搭建和应用的配置、部署及运行」的内容。
如下图所示,Flink 提供了丰富的客户端操作来提交任务和与任务进行交互,包括 Flink 命令行,Scala Shell,SQL Client,Restful API 和 Web。Flink 首先提供的最重要的是命令行,其次是 SQL Client 用于提交 SQL 任务的运行,还有就是 Scala Shell 提交 Table API 的任务。同时,Flink 也提供了Restful 服务,用户可以通过 http 方式进行调用。此外,还有 Web 的方式可以提交任务。
在 Flink 安装目录的 bin 目录下面可以看到有 flink, start-scala-shell.sh 和 sql-client.sh 等文件,这些都是客户端操作的入口。
Flink 的命令行参数很多,输入 flink - h 能看到完整的说明:
如果想看某一个命令的参数,比如 Run 命令,输入:
本文主要讲解常见的一些操作,更详细的文档请参考: Flink 命令行官方文档。
首先启动一个 Standalone 的集群:
打开 http://127.0.0.1:8081 能看到 Web 界面。
Run
运行任务,以 Flink 自带的例子 TopSpeedWindowing 为例:
运行起来后默认是 1 个并发:
点左侧「Task Manager」,然后点「Stdout」能看到输出日志:
或者查看本地 Log 目录下的 *.out 文件:
List
查看任务列表:
Stop
停止任务。通过 -m 来指定要停止的 JobManager 的主机地址和端口。
从日志里面能看出 Stop 命令执行失败了。一个 Job 能够被 Stop 要求所有的 Source 都是可以 Stoppable 的,即实现了 StoppableFunction 接口。
Cancel
取消任务。如果在 conf/flink-conf.yaml 里面配置了 state.savepoints.dir,会保存 Savepoint,否则不会保存 Savepoint。
也可以在停止的时候显示指定 Savepoint 目录。
取消和停止(流作业)的区别如下:
- cancel() 调用,立即调用作业算子的 cancel() 方法,以尽快取消它们。如果算子在接到 cancel() 调用后没有停止,Flink 将开始定期中断算子线程的执行,直到所有算子停止为止。
- stop() 调用,是更优雅的停止正在运行流作业的方式。stop() 仅适用于 Source 实现了 StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 Source 都将接收 stop() 方法调用。直到所有 Source 正常关闭时,作业才会正常结束。这种方式,使作业正常处理完所有作业。
Savepoint
触发 Savepoint。
说明:Savepoint 和 Checkpoint 的区别(详见文档):
- Checkpoint 是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;Checkpoint 是作业 failover 的时候自动使用,不需要用户指定。
- Savepoint 是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。Savepoint 一般用于程序的版本更新(详见文档),Bug 修复,A/B Test 等场景,需要用户指定。
通过 -s 参数从指定的 Savepoint 启动:
查看 JobManager 的日志,能够看到类似这样的 Log:
Modify
修改任务并行度。
为了方便演示,我们修改 conf/flink-conf.yaml 将 Task Slot 数从默认的 1 改为 4,并配置 Savepoint 目录。(Modify 参数后面接 -s 指定 Savepoint 路径当前版本可能有 Bug,提示无法识别)
修改参数后需要重启集群生效,然后再启动任务:
从页面上能看到 Task Slot 变为了 4,这时候任务的默认并发度是 1。
通过 Modify 命令依次将并发度修改为 4 和 3,可以看到每次 Modify 命令都会触发一次 Savepoint。
查看 JobManager 的日志,可以看到:
Info
Info 命令是用来查看 Flink 任务的执行计划(StreamGraph)的。
拷贝输出的 Json 内容,粘贴到这个网站:http://flink.apache.org/visualizer/
可以和实际运行的物理执行计划对比:
单任务 Attach 模式
默认是 Attach 模式,即客户端会一直等待直到程序结束才会退出。
- 通过 -m yarn-cluster 指定 Yarn 模式
- Yarn 上显示名字为 Flink session cluster,这个 Batch 的 Wordcount 任务运行完会 FINISHED。
- 客户端能看到结果输出
如果我们以 Attach 模式运行 Streaming 的任务,客户端会一直等待不退出,可以运行以下的例子试验下:
单任务 Detached 模式
- 由于是 Detached 模式,客户端提交完任务就退出了
- Yarn 上显示为 Flink per-job cluster
启动 Session
表示启动一个 Yarn session 集群,每个 TM 的内存是 2 G,每个 TM 有 3 个 Slot。(注意:-n 参数不生效)
客户端默认是 Attach 模式,不会退出:
- 可以 ctrl + c 退出,然后再通过 https://developer.aliyun.com/article/bin/yarn-session.sh -id application_47_0726 连上来;
- 或者启动的时候用 -d 则为 detached 模式
Yarn 上显示为 Flink session cluster;
- 在本机的临时目录(有些机器是 /tmp 目录)下会生成一个文件:
提交任务
将会根据 /tmp/.yarn-properties-admin 文件内容提交到了刚启动的 Session。
运行结束后 TM 的资源会释放。
提交到指定的 Session
通过 -yid 参数来提交到指定的 Session。
注:Blink版本 的 Session 与 Flink 的 Session 的区别:
- Flink 的 session -n 参数不生效,而且不会提前启动 TM;
- Blink 的 session 可以通过 -n 指定启动多少个 TM,而且 TM 会提前起来;
官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/scala_shell.html
Local
任务运行说明:
- Batch 任务内置了 benv 变量,通过 print() 将结果输出到控制台;
- Streaming 任务内置了 senv 变量,通过 senv.execute("job name") 来提交任务,且 Datastream 的输出只有在 Local 模式下打印到控制台;
Remote
先启动一个 yarn session cluster:
启动 scala shell,连到 jm:
Yarn
按 CTRL + C 退出 Shell 后,这个 Flink cluster 还会继续运行,不会退出。
DataSet
对 DataSet 任务来说,print() 会触发任务的执行。
也可以将结果输出到文件(先删除 /tmp/out1,不然会报错同名文件已经存在),继续执行以下命令:
查看 /tmp/out1 文件就能看到输出结果。
DataSteam
对 DataStream 任务,print() 并不会触发任务的执行,需要显示调用 execute(“job name”) 才会执行任务。
TableAPI
在 Blink 开源版本里面,支持了 TableAPI 方式提交任务(可以用 btenv.sqlQuery 提交 SQL 查询),社区版本 Flink 1.8 会支持 TableAPI: https://issues.apache.org/jira/browse/FLINK-9555
SQL Client 目前还只是测试版,处于开发阶段,只能用于 SQL 的原型验证,不推荐在生产环境使用。
Select 查询
注意:如果本机的临时目录存在类似 .yarn-properties-baoniu 的文件,任务会提交到 Yarn 上。
Explain
Explain 命令可以查看 SQL 的执行计划。
SQL Client 支持两种模式来维护并展示查询结果:
- table mode: 在内存中物化查询结果,并以分页 table 形式展示。用户可以通过以下命令启用 table mode;
- changlog mode: 不会物化查询结果,而是直接对 continuous query 产生的添加和撤回(retractions)结果进行展示。
接下来通过实际的例子进行演示。
Table mode
运行结果如下图所示:
Changlog mode
运行结果如下图所示:
其中 ‘-’ 代表的就是撤回消息。
目前的 SQL Client 还不支持 DDL 语句,只能通过 yaml 文件的方式来定义 SQL 查询需要的表,UDF 和运行参数等信息。
首先,准备 env.yaml 和 input.csv 两个文件。
启动 SQL Client:
使用 insert into 写入结果表:
查询生成的结果数据文件:
也可以在 Environment 文件里面定义 UDF,在 SQL Client 里面通过 「HOW FUNCTIONS」查询和使用,这里就不再说明了。
SQL Client 功能社区还在开发中,详见 FLIP-24。
接下来我们演示如何通过 Rest API 来提交 Jar 包和执行任务。
更详细的操作请参考 Flink 的 Restful API 文档:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
Restful API 还提供了很多监控和 Metrics 相关的功能,对于任务提交的操作也支持的比较全面。
在 Flink Dashboard 页面左侧可以看到有个「Submit new Job」的地方,用户可以上传 Jar 包和显示执行计划和提交任务。Web 提交功能主要用于新手入门和演示用。
本期的课程到这里就结束了,我们主要讲解了 Flink 的 5 种任务提交的方式。熟练掌握各种任务提交方式,有利于提高我们日常的开发和运维效率。
视频回顾:https://zh.ververica.com/developers/flink-training-course2/
到此这篇yarn top命令(yarn 运行命令)的文章就介绍到这了,更多相关内容请继续浏览下面的相关推荐文章,希望大家都能在编程的领域有一番成就!版权声明:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权、违法违规、事实不符,请将相关资料发送至xkadmin@xkablog.com进行投诉反馈,一经查实,立即处理!
转载请注明出处,原文链接:https://www.xkablog.com/rfx/17051.html