Flink deployment
Download: wget https://archive.apache.org/dist/flink/flink-1.5.6/flink-1.5.6-bin-hadoop27-scala_2.11.tgz
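1. Standalone mode
- Extract the archive: tar zxvf flink-1.5.6-bin-hadoop27-scala_2.11.tgz -C /opt/
- Set the environment variables (FLINK_HOME, and add $FLINK_HOME/bin to PATH)
- Edit the configuration files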
vim $FLINK_HOME/conf/flink-conf.yaml
# Change the following:
jobmanager.rpc.address: hadoop-01
taskmanager.numberOfTaskSlots: 4
--------
vim $FLINK_HOME/conf/masters
# Change the following:
hadoop-01:8081
--------
vim $FLINK_HOME/conf/slaves
hadoop-01
hadoop-02
hadoop-03
--------
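The same edits to flink-conf.yaml can be scripted instead of made in vim. This is only a minimal sketch: `set_conf` is a hypothetical helper, not part of Flink, and it assumes each key sits on its own uncommented `key: value` line and that the value contains no `|` or `&` characters.

```shell
# set_conf FILE KEY VALUE
# Rewrite the uncommented "KEY: ..." line in FILE, or append the line if it
# is missing. The body runs in a subshell, so its variables stay private.
set_conf() (
  file=$1
  key=$2
  value=$3
  # Escape the dots in the key so sed and grep match them literally.
  pattern=$(printf '%s' "$key" | sed 's/\./\\./g')
  if grep -q "^${pattern}:" "$file"; then
    # -i.bak edits in place and leaves a backup copy next to the file.
    sed -i.bak "s|^${pattern}:.*|${key}: ${value}|" "$file"
  else
    printf '%s: %s\n' "$key" "$value" >> "$file"
  fi
)

# Usage (same values as above):
#   set_conf "$FLINK_HOME/conf/flink-conf.yaml" jobmanager.rpc.address hadoop-01
#   set_conf "$FLINK_HOME/conf/flink-conf.yaml" taskmanager.numberOfTaskSlots 4
```

The masters and slaves files hold only a few lines each, so writing them by hand as shown above is just as easy.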
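- Copy flink-1.5.6 to the other machines (run from the directory that contains flink-1.5.6, so it lands in the same path on each host)
scp -r flink-1.5.6 hadoop-02:$(pwd)
scp -r flink-1.5.6 hadoop-03:$(pwd)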
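With more than a couple of workers, the copy step can be driven from conf/slaves rather than typed per host. A rough sketch follows; `list_remote_workers` and `distribute_flink` are hypothetical helper names, and it assumes that `hostname` prints the same name that appears in the slaves file (hadoop-01 here) and that passwordless SSH is already set up.

```shell
# list_remote_workers SLAVES_FILE LOCAL_HOST
# Print every host in SLAVES_FILE except LOCAL_HOST, skipping blank lines
# and comment lines.
list_remote_workers() {
  grep -v -e '^[[:space:]]*$' -e '^[[:space:]]*#' "$1" | grep -v -x -F "$2"
}

# distribute_flink SRC_DIR SLAVES_FILE
# Copy SRC_DIR into the same parent directory on every remote worker.
# Set COPY_CMD="echo scp" for a dry run that only prints the commands.
distribute_flink() (
  src=$1
  slaves=$2
  parent=$(cd "$(dirname "$src")" && pwd)
  for host in $(list_remote_workers "$slaves" "$(hostname)"); do
    ${COPY_CMD:-scp} -r "$src" "$host:$parent"
  done
)

# Usage:
#   COPY_CMD="echo scp" distribute_flink /opt/flink-1.5.6 /opt/flink-1.5.6/conf/slaves
#   distribute_flink /opt/flink-1.5.6 /opt/flink-1.5.6/conf/slaves
```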
- Run the start command:
[root@hadoop-01 ~]# start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop-01.
Starting taskexecutor daemon on host hadoop-01.
Starting taskexecutor daemon on host hadoop-02.
Starting taskexecutor daemon on host hadoop-03.
- Open the web UI (hadoop-01:8081, as set in the masters file) to check the cluster
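A quick sanity check for the web UI: the total number of task slots it reports should equal the number of workers in conf/slaves times taskmanager.numberOfTaskSlots, which is 3 x 4 = 12 with the configuration above. The sketch below computes that expected value; `expected_slots` is a hypothetical helper, and it assumes every worker uses the same flink-conf.yaml.

```shell
# expected_slots CONF_DIR
# Number of workers in CONF_DIR/slaves multiplied by the value of
# taskmanager.numberOfTaskSlots in CONF_DIR/flink-conf.yaml.
expected_slots() (
  conf_dir=$1
  # Count the lines that are neither blank nor comments.
  workers=$(grep -c -v -e '^[[:space:]]*$' -e '^[[:space:]]*#' "$conf_dir/slaves")
  # Take the last uncommented setting; fall back to Flink's default of 1.
  slots=$(sed -n 's/^taskmanager\.numberOfTaskSlots:[[:space:]]*\([0-9][0-9]*\).*/\1/p' \
    "$conf_dir/flink-conf.yaml" | tail -n 1)
  echo $((workers * ${slots:-1}))
)

# Usage:
#   expected_slots "$FLINK_HOME/conf"
```

If the web UI shows fewer slots than this, one of the taskexecutor daemons probably did not start or could not reach the JobManager.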
- Write a streaming WordCount program
package io.github.wzq.offline

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**
 * Created by azhe on 2019-11-21 14:37
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Read --host and --port from the program arguments
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Each line received on the socket becomes one stream element
    val socketStream: DataStream[String] = env.socketTextStream(host, port)

    val wordCount: DataStream[(String, Int)] = socketStream
      .flatMap(_.split(" "))  // split each line into words
      .filter(_.nonEmpty)     // drop empty tokens
      .map((_, 1))            // pair each word with a count of 1
      .keyBy(0)               // group by the word (tuple field 0)
      .sum(1)                 // running sum of the counts (tuple field 1)

    wordCount.print().setParallelism(2)
    env.execute("Stream word count")
  }
}
- Start a TCP listener on hadoop-02:
[root@hadoop-02 ~]# nc -lk 8888
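- Upload the packaged jar in the web UI, fill in the submit parameters (entry class io.github.wzq.offline.StreamWordCount, program arguments such as --host hadoop-02 --port 8888), and click Submit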
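The job can also be submitted from the command line instead of the web UI. The sketch below wraps `flink run`; `submit_wordcount` and the `FLINK_BIN` override are hypothetical names used only for illustration, and the jar path is whatever your build produces. The `-d` flag submits in detached mode so the shell is not blocked by the streaming job.

```shell
# submit_wordcount JAR HOST PORT
# Submit StreamWordCount in detached mode. FLINK_BIN can be overridden,
# for example FLINK_BIN=echo to print the arguments without submitting.
submit_wordcount() {
  "${FLINK_BIN:-flink}" run -d \
    -c io.github.wzq.offline.StreamWordCount \
    "$1" --host "$2" --port "$3"
}

# Usage (the jar name is a placeholder):
#   submit_wordcount /path/to/wordcount.jar hadoop-02 8888
```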
- The job enters the RUNNING state
- Send two lines of data, then check the TaskManager's stdout:
[root@hadoop-02 ~]# nc -lk 8888
hello flink
hello scala
- View the result in the web UI
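To know what to look for in the stdout, it helps to remember that keyBy followed by sum emits an updated running count for every incoming word, not one final total. The awk sketch below imitates that behaviour locally; it is not Flink, `running_word_count` is a hypothetical name, and it splits on any whitespace rather than on single spaces only.

```shell
# running_word_count
# Read lines on stdin and print "(word,count)" after every word, where
# count is the number of times that word has been seen so far.
running_word_count() {
  awk '{
    for (i = 1; i <= NF; i++) {
      count[$i]++
      printf "(%s,%d)\n", $i, count[$i]
    }
  }'
}

# Usage:
#   printf 'hello flink\nhello scala\n' | running_word_count
# prints:
#   (hello,1)
#   (flink,1)
#   (hello,2)
#   (scala,1)
```

In the real job the sink runs with parallelism 2, so each line in the TaskManager stdout should carry a subtask prefix such as `1> ` and the order across subtasks may differ.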
- To stop the job, click the Cancel button in the web UI, or use the command line:
[root@hadoop-01 conf]# flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
21.11.2019 04:17:57 : 7b6607a8a5cdecaa7e88bcfa3ec680cc : Stream word count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[root@hadoop-01 conf]# flink cancel 7b6607a8a5cdecaa7e88bcfa3ec680cc
Cancelling job 7b6607a8a5cdecaa7e88bcfa3ec680cc.
Cancelled job 7b6607a8a5cdecaa7e88bcfa3ec680cc.
[root@hadoop-01 conf]# flink list
Waiting for response...
No running jobs.
No scheduled jobs.
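Copying the job ID by hand gets tedious, so the ID can be looked up by job name from the `flink list` output. This sketch assumes the output format shown above (`date time : job-id : name (STATE)`); `job_ids_by_name` is a hypothetical helper.

```shell
# job_ids_by_name NAME
# Read `flink list` output on stdin and print the ID of every job whose
# name is exactly NAME.
job_ids_by_name() {
  awk -F ' : ' -v name="$1" '
    NF >= 3 {
      job = $3
      sub(/ \([A-Z]+\)$/, "", job)   # drop the trailing state, e.g. " (RUNNING)"
      if (job == name) print $2
    }'
}

# Usage:
#   flink list | job_ids_by_name "Stream word count" | xargs -n 1 flink cancel
```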
yarn-session.sh -n 3 -tm 1024 -s 2