1-kafka权限调研
调研kafka权限相关,并验证,生产级别
[toc]
一、机器准备:
ip | 机器 | 用户 | jdk版本 | zk版本 | kafka版本 | 安装路径 |
---|---|---|---|---|---|---|
10.57.26.110 | kafka-1 | admin | jdk1.8.0_191 | zookeeper-3.4.6 | kafka_2.11-1.1.1 | opt |
10.57.26.111 | kafka-1 | admin | jdk1.8.0_191 | zookeeper-3.4.6 | kafka_2.11-1.1.1 | opt |
10.57.26.112 | kafka-1 | admin | jdk1.8.0_191 | zookeeper-3.4.6 | kafka_2.11-1.1.1 | opt |
二、安装zookeeper
- 解压zookeeper tar zxvf resources/zookeeper-3.4.6.tar.gz -C opt/
- 修改配置文件zoo.cfg,myid cd opt/zookeeper-3.4.6/conf cp zoo_sample.cfg zoo.cfg vim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
#修改数据目录
dataDir=/home/admin/data/zookeeper
clientPort=2181
#添加主机配置
server.1=kafka-1:2888:3888
server.2=kafka-2:2888:3888
server.3=kafka-3:2888:3888
mkdir ~/data/zookeeper
到之前配置的zookeeper数据文件所在的目录下生成一个文件叫myid,其中写上一个数字表明当前机器是哪一个编号的机器。
echo 1 > ~/data/zookeeper/myid
3. 拷贝到另外2台服务器
cd ~/opt
scp -r zookeeper-3.4.6 admin@kafka-2:pwd
scp -r zookeeper-3.4.6 admin@kafka-3:pwd
4. 修改myid
echo 2 > ~/data/zookeeper/myid
echo 3 > ~/data/zookeeper/myid
5. 配置环境变量:vim /etc/profile
echo '
export JAVA_HOME=/home/admin/opt/jdk1.8.0_191
export KAFKA_HOME=/home/admin/opt/kafka_2.11-1.1.1
export ZOOKEEPER_HOME=/home/admin/opt/zookeeper-3.4.6
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin
' >> ~/.bashrc
source ~/.bashrc
java -version
- 启动zookeeper 启动zookeeper的各种命令操作如下,可以使用绝对路径操作这些命令,也可使用相对路径操作这些命令,相对路径需要进到zookeeper服务的bin目录进行操作。
#启动ZK服务:
zkServer.sh start
#停止ZK服务:
zkServer.sh stop
#重启ZK服务:
zkServer.sh restart
#查看ZK服务状态:
zkServer.sh status
Zookeeper集群需要每台挨个启动。
- 下载kafka1.1.1
- 解压
tar zxvf resources/kafka_2.11-1.1.1.tgz -C opt/
- 进入config目录,修改配置文件
cd opt/kafka_2.11-1.1.1/config/
vim server.properties
# (其他机器为 1/2)
broker.id =0
# zookeeper地址
zookeeper.connect= 10.57.26.110:2181,10.57.26.111:2181,10.57.26.112:2181
log.dirs =/home/admin/data/kafka-logs
# 本机地址
listeners=PLAINTEXT://10.57.26.110:9092
- 发送到其他机器并修改对应配置broker .id,listeners
- 添加环境变量
- 挨个启动kafka
nohup kafka-server-start.sh $KAFKA_HOME/config/server.properties > ~/kafka.out 2>&1 &
- 通过zookeeper命令行方式查看kafka集群节点数
zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids [0, 1, 2]
- 创建topic
kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test-topic
- 列出所有topic
kafka-topics.sh \
--list \
--zookeeper 10.57.26.110:2181
- 查看列表及具体信息
kafka-topics.sh \
--zookeeper localhost \
--describe
>Topic:test-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,1,0
- 查看topic集群情况:
kafka-topics.sh \
--describe \
--zookeeper localhost \
--topic test-topic
- 生产消息
kafka-console-producer.sh \
--broker-list 10.57.26.110:9092 \
--topic test-topic
- 消费消息
kafka-console-consumer.sh \
--zookeeper 10.57.26.110:2181 \
--from-beginning \
--topic test-topic
- 删除topic server.properties中配置了delete.topic.enable=true 通过kafka删除 效果显示的只是mark for deletion 并没有删除
kafka-topics.sh \
--delete \
--zookeeper 10.57.26.110:2181 \
--topic test-topic
- 彻底删除topic
zkCli.sh -server 10.57.26.110
rmr /brokers/topic/test-topic
rmr /admin/delete_topics/test-topic
rmr /config/topics/test-topic
- 重启kafka
kill -9 `ps -ef | grep kafka | grep -v grep| awk '{print $2}'`
nohup kafka-server-start.sh $KAFKA_HOME/config/server.properties > ~/kafka.out 2>&1 &
三、安装iptables
- 安装 sudo yum -y install iptables-services
- 启动 sudo service iptables start
- 查看状态 service iptables status
- 设置iptables的开机自启动 sudo systemctl enable iptables
- 添加白名单规则及开放指定端口 sudo vim /etc/sysconfig/iptables
*filter
:INPUT ACCEPT [0:0]
:FORWARD ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
增加白名单
-N whitelist
-A whitelist -s 10.57.26.110 -j ACCEPT
-A whitelist -s 10.57.26.111 -j ACCEPT
-A whitelist -s 10.57.26.112 -j ACCEPT
-A INPUT -m state --state RELATED,ESTABLISHED -j ACCEPT
所有端口对白名单开放
-A INPUT -m state --state NEW -m tcp -p tcp --dport 1:65535 -j whitelist
-A INPUT -p icmp -j ACCEPT
-A INPUT -i lo -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT
开放kafka端口
-A INPUT -p tcp -m state --state NEW -m tcp --dport 9092 -j ACCEPT
开放kafka jmx端口
-A INPUT -p tcp -m state --state NEW -m tcp --dport 9999 -j ACCEPT
-A INPUT -j REJECT --reject-with icmp-host-prohibited
-A FORWARD -j REJECT --reject-with icmp-host-prohibited
COMMIT
- 重启 sudo service iptables restart
- 查看配置规则 sudo iptables -L -n
四、测试flink消费
- 创建topic
kafka-topics.sh \
--create \
--zookeeper 10.57.26.110:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test
- 消费
kafka-console-consumer.sh \
--zookeeper 10.57.26.110:2181 \
--from-beginning \
--topic test
- 启动生产者发送消息
- 启动flink程序消费
五、开启SASL_SCRAM认证
1. 创建SCRAM证书
Kafka的SCRAM实现使用Zookeeper作为证书存储。通过使用kafka-configs.sh来创建证书。 对于启用的每个SCRAM机制,必须通过使用机制名称添加配置来创建证书。 必须在kafka broker启动之前创建broker之间通信的证书。客户端证书可以动态创建和更新,并且将使用更新后的证书来验证新的连接。
- 为用户td-kafka创建SCRAM凭证(密码为tongdun123),该用户作为kafka各broker之间通信的用户:
kafka-configs.sh \
--zookeeper localhost:2181 \
--alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=tongdun123],SCRAM-SHA-512=[password=tongdun123]' \
--entity-type users \
--entity-name td-kafka
如果未指定迭代数,则使用默认迭代数为4096。 创建一个随机salt,由salt,迭代,StoredKey和ServerKey组成的SCRAM标识,都存储在Zookeeper中。
- 可以使用–describe列出现有的证书:
kafka-configs.sh \
--zookeeper localhost:2181 \
--describe \
--entity-type users \
--entity-name td-kafka
- 可以使用–delete为一个或多个SCRAM机制删除证书:
#添加一个admin用户
kafka-configs.sh \
--zookeeper localhost:2181 \
--alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=tongdun123],SCRAM-SHA-512=[password=tongdun123]' \
--entity-type users \
--entity-name admin
#删除admin用户
kafka-configs.sh \
--zookeeper localhost:2181 \
--alter \
--delete-config 'SCRAM-SHA-512,SCRAM-SHA-256' \
--entity-type users \
--entity-name admin
- SCRAM加密信息在zk里面,所以zk需要加固,才能保证密钥不被泄露
2. 配置Broker
- 配置Kafka Broker 在每个Kafka broker的config目录下添加一个类似于下面的JAAS文件,我们姑且将其称为kafka_server_jaas.conf: cd $KAFKA_HOME/config vim kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="td-kafka"
password="tongdun123";
};
- 复制到其他broker节点
scp kafka_server_jaas.conf kafka-2:`pwd`
scp kafka_server_jaas.conf kafka-3:`pwd`
其中,broker使用KafkaServer中的用户名和密码来和其他broker进行连接。 在这个例子中,td-kafka是broker之间通信的用户。 3. 修改kafka启动脚本,讲JAAS配置文件的位置作为JVM参数:
cd $KAFKA_HOME/bin
vim kafka-server-start.sh
#添加一下内容
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf"
fi
-
复制到其他broker节点 scp kafka-server-start.sh kafka-2:
pwd
scp kafka-server-start.sh kafka-3:pwd
-
修改server.properties中配置SASL端口和SASL机制。 增加一下三行:
cd $KAFKA_HOME/config
vim server.properties
#修改
listeners=SASL_PLAINTEXT://10.57.26.110:9092
#添加下面内容
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
#默认禁止一切操作,必须显式授权
#allow.everyone.if.no.acl.found=false
#管理员用户允许做一切操作
super.users=User:td-kafka;
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
3. 配置kafka客户端
- 在config目录添加kafka_client_jaas.conf
cd $KAFKA_HOME/config
vim kafka_client_jaas.conf
#添加内容
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="td-kafka"
password="tongdun123";
};
- 复制到其他broker节点
scp kafka_client_jaas.conf kafka-2:`pwd`
scp kafka_client_jaas.conf kafka-3:`pwd`
- 修改producer.properties,consumer.properties 中配置以下参数,并同步到所有节点
echo '
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
' >> producer.properties
echo '
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
' >> consumer.properties
scp producer.properties consumer.properties kafka-2:`pwd`
scp producer.properties consumer.properties kafka-3:`pwd`
- 修改
kafka-console-producer.sh
和kafka-console-consumer.sh
文件
cd $KAFKA_HOME/bin
vim kafka-console-producer.sh
#添加一下内容
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_client_jaas.conf"
fi
vim kafka-console-consumer.sh
#添加一下内容
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_client_jaas.conf"
fi
- 复制到其他broker节点
scp kafka-console-producer.sh kafka-console-consumer.sh kafka-2:`pwd`
scp kafka-console-producer.sh kafka-console-consumer.sh kafka-3:`pwd`
- 重启kafka集群
- 创建topic
kafka-topics.sh \
--zookeeper 10.57.26.110:2181 \
--create \
--topic test \
--partitions 1 \
--replication-factor 1;
- 命令行启动消费者
kafka-console-consumer.sh \
--bootstrap-server 10.57.26.110:9092 \
--consumer.config $KAFKA_HOME/config/consumer.properties \
--topic test
- 命令行启动生产者
kafka-console-producer.sh \
--broker-list 10.57.26.110:9092 \
--producer.config $KAFKA_HOME/config/producer.properties \
--topic test
- 正常消费
4.权限管理
4.1 用户管理
- 用户创建
kafka-configs.sh \
--zookeeper 10.57.26.110:2181 \
--alter \
--add-config 'SCRAM-SHA-256=[password=test123]' \
--entity-type users \
--entity-name test
- v查看用户 ls /config/users
- 查看用户信息
kafka-configs.sh \
--zookeeper 10.57.26.110:2181 \
--describe \
--entity-type users \
# 可以不指定用户
--entity-name test
- 修改用户密码
kafka-configs.sh \
--zookeeper 10.57.26.110:2181 \
--alter \
--add-config 'SCRAM-SHA-256=[password=test1234]' \
--entity-type users \
--entity-name test
- 删除用户
kafka-configs.sh \
--zookeeper 10.57.26.110:2181 \
--alter \
--delete-config 'SCRAM-SHA-256' \
--entity-type users \
--entity-name test
4.2 Topic授权管理
- 授予test用户对test topic 写权限, 只允许 10.57.241.* 网段
kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=10.57.26.110:2181 \
--add \
--allow-principal User:test \
--operation Write \
--topic test \
--allow-host 10.57.241.161
- 删除topic权限
kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=localhost:2181 \
--remove \
--allow-principal User:test \
--operation Write \
--topic test \
--allow-host 10.57.241.*
- 查看topic权限
kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=localhost:2181 \
--list \
--topic test
六、Scala/Java 原生权限验证
- 消费及生产程序改造 kafka的properties需要添加
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name)
props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dev\" password=\"dev123\";")
props.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256")
- 创建topic
kafka-topics.sh \
--zookeeper 10.57.26.110:2181 \
--create \
--topic dev-topic \
--partitions 3 \
--replication-factor 2
- 启动消费程序跟生产程序都报错
failed authentication due to: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-256
- 查询权限
kafka-acls.sh \
--authorizer-properties zookeeper.connect=10.57.26.110:2181 \
--list \
--topic dev-topic
Current ACLs for resource
Topic:dev-topic
:
- 创建dev用户
kafka-configs.sh \
--zookeeper localhost:2181 \
--alter \
--add-config 'SCRAM-SHA-256=[password=dev123]' \
--entity-type users \
--entity-name dev
- 添加写权限
kafka-acls.sh \
--authorizer-properties zookeeper.connect=10.57.26.110:2181 \
--add \
--allow-principal User:dev \
--producer \
--topic dev-topic
–producer实际上在Topic域上创建了(Write/Describe/Create)3个子权限,用户也可以单独创建者三个子权限。
- 再次启动生产程序不报错了
- 添加读权限
kafka-acls.sh \
--authorizer-properties zookeeper.connect=10.57.26.110:2181 \
--add \
--allow-principal User:dev \
--consumer \
--topic dev-topic \
--group '*'
和producer相比,consumer还有一个额外的参数–group,如果没有限制,则置成’*‘即可;这个–consumer的选择实际上在Topic域上创建了(Read/Describe)2个子权限,然后在Group域创建了(Read)1个子权限:
- dev用户可以正常消费
- 查看dev-topic权限
kafka-acls.sh \
--authorizer-properties zookeeper.connect=10.57.26.110:2181 \
--list \
--topic dev-topic
>Current ACLs for resource `Topic:dev-topic`:
User:dev has Allow permission for operations: Describe from hosts: *
User:dev has Allow permission for operations: Write from hosts: *
User:dev has Allow permission for operations: Read from hosts: *
七、Flink、Spark Streaming、Spring boot权限验证
八、Kafka-manager部署
- 下载最新kafka-manager-1.3.3.23版本
- 解压安装 unzip kafka-manager-1.3.3.23.zip -d ~/opt/
- 修改配置文件
> vim ~/opt/kafka-manager-1.3.3.23/conf
#修改zk ip
kafka-manager.zkhosts="stream-server:2181"
- 启动 nohup kafka-manager &
- 页面添加kafka集群 填写zk地址,选择kafka版本,开启Poll consumer information
- 添加加密集群 填写zk地址,选择kafka版本,开启Poll consumer information,填写Security Protocol,SASL Mechanism,SASL JAAS Config
八、Kafka-manager故障解决
- 收集JMX端口打开
java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled!
解决办法:
在
kafka-server-start.sh
添加export JMX_PORT=9999
- no route to host
k.m.j.KafkaJMX$ - Failed to connect to service:jmx:rmi:///jndi/rmi://10.57.26.111:9999/jmxrmi
java.io.IOException: Failed to retrieve RMIServer stub: javax.naming.CommunicationException [Root exception is java.rmi.ConnectIOException: Exception creating connection to: 10.57.26.111; nested exception is:
java.net.NoRouteToHostException: No route to host (Host unreachable)]
解决办法:添加-Djava.rmi.server.hostname=kafka当前主机ip KAFKA_JMX_OPTS=”-Djava.rmi.server.hostname=10.57.26.110 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false " 修改防火墙规则 3. Caused by: java.lang.IllegalArgumentException: JAAS config entry not termina 解决办法: kafka配置jaas密码的末尾少一个分号
九、Kafka MirrorMaker使用
- 修改consumer.properties
# 源kafka集群地址
bootstrap.servers=10.57.26.136:9092
# consumer group id 自定义
group.id=dp-MirrorMaker
- 复制出一个供mirror使用的producer.properties
>cp producer.properties mirror-producer.properties
>vim mirror-producer.properties
#修改需要同步的目标集群地址
bootstrap.servers=10.57.26.110:9092,10.57.26.111:9092,10.57.26.112:9092
# 添加SASL配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=** password=”**“;
- 启动脚本
kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $KAFKA_HOME/config/consumer.properties --producer.config $KAFKA_HOME/config/mirror-producer.properties --num.streams 1 —-num.producers 1 --whitelist=mirror1
十、Kafka 源码阅读及修改
- 下载kafka1.1.1源码
- 下载gradle-4.10.3并安装
- idea导入项目
- 下载gradle依赖及编译 ./gradlew jar
- 为了提高gradle下载依赖速度
>vim ~/.gradle/init.gradle
#添加一下内容到文件
allprojects{
repositories {
def REPOSITORY_URL = 'http://maven.aliyun.com/nexus/content/groups/public/'
all { ArtifactRepository repo ->
if(repo instanceof MavenArtifactRepository){
def url = repo.url.toString()
if (url.startsWith('https://repo1.maven.org/maven2') || url.startsWith('https://jcenter.bintray.com/')) {
project.logger.lifecycle "Repository ${repo.url} replaced by $REPOSITORY_URL."
remove repo
}
}
}
maven {
url REPOSITORY_URL
}
}
}
- 修改后的包替换kafka_2.11-1.1.1.jar
- 启动脚本
kafka-run-class.sh kafka.tools.MirrorMaker \
--consumer.config $KAFKA_HOME/config/consumer.properties \
--producer.config $KAFKA_HOME/config/mirror-producer.properties \
--num.streams 1 \
--message.handler kafka.tools.CustomMirrorMakerMessageHandler \
--whitelist="mirror1|mirror2" \
--message.handler.args "mirror1->mirror1,mirror2|mirror2->mirror2"
错误
1. kafka源码日志未打印
出现:SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”. 具体做法: 1.下载相应的jar包,libs/slf4j-log4j12-1.7.25.jar libs/log4j-1.2.17.jar 2.导入jar包 具体的IDEA操作如下: (1)File -> Project Structure -> Modules (2)找到 core,main,打开 dependencies,点击 +,添加 libs目录
2. 缺少log4j.properties
log4j:WARN No appenders could be found for logger (kafka.utils.Log4jControllerRegistration$). 解决办法: 在core-main添加resources目录,新建log4j.properties,并添加以下内容
log4j.rootCategory=info, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d] %p %m (%c)%n
十一、kafka压测脚本
- topic:test-perf
- 分区数:1
- 副本数:1
- 测试机器:4C16G,kafka单节点
1. kafka生产数据写入压力测试
2. kafka消费数据压力测试
3. 写入压测命令
4. 消费压测命令
十二、kafka数据同步
- 创建topic cd sasl_kafka_tools/ ./kafka_topic_create.sh teset-copy 2 1
- 配置mirror-maker环境变量
- 启动mirror-maker ./pstart.sh mk
- 通过脚本发送数据
kafka-producer-perf-test.sh \
--topic teset-copy \
--num-records 1000000 \
--record-size 2000 \
--throughput 5000 \
--producer-props bootstrap.servers=10.57.26.136:9092
- 源topic数据量2G左右。同步10个副本到二级kafka,数据量10倍,占二级kafka集群磁盘空间20G
十三、机器评估
生产环境kafka,36h数据量top3
- dsp-wave-radar-original 总内存约为 361 G
- etl-merge-vehicle-data 总内存约为 31 G
- etl-siteregion-speed 总内存约为 24 G
二级kafka3台机器,磁盘空间单台1T,可支持扩展