Kafka authentication and authorization (tested against Kafka 2.3.0)
Authentication: verifying connections between clients (producers and consumers) and the brokers, between brokers and ZooKeeper, between brokers themselves, and between tools and brokers.
Authorization: topic-level access control, authorizing clients' read and write operations (produce, consume, and consumer-group permissions on data).
Some background on authentication first. There are two ways to operate Kafka: through ZooKeeper, or directly against the Kafka brokers. That implies two kinds of authentication, one against ZooKeeper and one against the brokers, which together cover three links: client → ZooKeeper, client → broker, and broker → ZooKeeper.
If clients only ever connect to the brokers, you can skip ZooKeeper authentication and authenticate the broker connections alone.
SASL: Simple Authentication and Security Layer.
SSL encryption: uses SSL to encrypt the data transferred between brokers and clients, between brokers, or between brokers and tools (encryption, authentication, authorization).
Starting with 0.9.0.0, the Kafka community has added features that, used individually or together, increase the security of a Kafka cluster. The following security measures are currently supported:
1. Authenticating connections from clients (producers and consumers), other brokers, and tools, using either SSL or SASL. Kafka supports the following SASL mechanisms:
· SASL/GSSAPI (Kerberos) - since 0.9.0.0; requires a separately deployed authentication service (a Kerberos KDC)
· SASL/PLAIN - since 0.10.0.0; users cannot be added dynamically
· SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - since 0.10.2.0; users can be added dynamically
· SASL/OAUTHBEARER - since 2.0; requires implementing the token creation and validation callbacks yourself, plus an OAuth service
2. Authenticating connections from brokers to ZooKeeper;
3. Encrypting data in transit between brokers and clients, between brokers, or between brokers and tools, using SSL (note that performance drops when SSL is enabled);
4. Authorizing client read and write operations;
5. Authorization is pluggable and supports integration with external authorization services.
Note that security is optional: non-secured clusters are supported, as are mixed clusters with authenticated, unauthenticated, encrypted, and unencrypted clients.
4.1 SASL/PLAIN authentication
Kafka server-side configuration
Step 1
Add a JAAS file for Kafka (kafka-server-jaas.conf; it can go under $kafka_home/config or wherever you like, since the path is configured in a later step):
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-pwd"
    user_admin="admin-pwd"
    user_producer="producer-secret"
    user_consumer="consumer-secret"
    user_alice="alice-secret";
};
The user_XXX entries define users: the username/password pairs that clients (admin tools, producers, consumers) use when connecting to the broker. With PLAIN, users cannot be added dynamically.
The KafkaServer section configures authentication both between brokers and between clients and brokers.
KafkaServer.username and KafkaServer.password are used for mutual authentication between brokers.
Entries of the form KafkaServer.user_alice="alice-secret" are used for client-to-broker authentication.
Step 2
Configure $kafka_home/config/server.properties:
listeners=SASL_PLAINTEXT://host.name:9092
advertised.listeners=SASL_PLAINTEXT://host.name:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# when false, access is denied if no matching ACL is found
allow.everyone.if.no.acl.found=false
# make admin a superuser
super.users=User:admin
Step 3
Modify the start script $kafka_home/bin/kafka-server-start.sh. Change
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
to
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/etc/kafka/config/kafka-server-jaas.conf kafka.Kafka "$@"
Then start Kafka.
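If you prefer not to edit the start script, the same JVM option can also typically be supplied through the KAFKA_OPTS environment variable, which kafka-run-class.sh passes through to the JVM, e.g. export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/config/kafka-server-jaas.conf".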
Step 4
Authorization (the wildcard topic is quoted so the shell does not expand it).
Grant user alice Write permission on all topics (produce):
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:12181 --add --allow-principal User:alice --operation Write --topic '*'
Grant user alice Read permission on all topics (consume):
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:12181 --add --allow-principal User:alice --operation Read --topic '*'
Grant user alice Describe permission on all topics:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:12181 --add --allow-principal User:alice --operation Describe --topic '*'
Grant user alice permission on the consumer group:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:12181 --add --allow-principal User:alice --operation Read --group test-group
List the configured ACLs:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:12181 --list
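The same grants can also be made programmatically through Kafka's AdminClient API. A minimal sketch, assuming a placeholder class name and broker address, and that the connection authenticates as the superuser admin:
package cn.xdf.xadd.xaddstarterkafkasample.sample;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import java.util.Collections;
import java.util.Properties;
public class AclAdminSample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "host.name:9092"); // your broker address
        // the admin connection itself must authenticate, here as the superuser
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-pwd\";");
        try (AdminClient admin = AdminClient.create(props)) {
            // allow User:alice to Write (produce to) test-topic1 from any host
            AclBinding produceAcl = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "test-topic1", PatternType.LITERAL),
                    new AccessControlEntry("User:alice", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(produceAcl)).all().get();
        }
    }
}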
Kafka client usage
Producer
package cn.xdf.xadd.xaddstarterkafkasample.sample;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * @Classname KafkaProducer
 * @Date 2020/11/11 11:25 AM
 * @author tianshuailong
 */
public class KafkaProducer {
    private final static String TOPIC = "test-topic1";
    private final static String BROKERS = "host.name:port"; // your Kafka broker address
    private static org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
    static {
        Properties c = initConfig();
        producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(c);
    }
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // SASL/PLAIN authentication settings
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";");
        return props;
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // send to partition 0 with a fixed key and a random value
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 0, "tian", "xdf-9901-value-" + (int) (10 * (Math.random())));
        Future<RecordMetadata> send = producer.send(record, (recordMetadata, e) -> {
            if (Objects.nonNull(e)) {
                System.out.println("send error: " + e.getMessage());
            } else {
                System.out.println(String.format("offset:%s,partition:%s", recordMetadata.offset(), recordMetadata.partition()));
            }
        });
        send.get();
        producer.close(); // flush and release client resources
    }
}
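A quick way to see the ACLs from step 4 at work: switch the username/password in the sasl.jaas.config line to a user that has no Write ACL on the topic, and the send fails; the exception delivered to the callback is then typically an org.apache.kafka.common.errors.TopicAuthorizationException.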
Consumer
package cn.xdf.xadd.xaddstarterkafkasample.sample;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.config.SaslConfigs;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * @Classname KafkaConsumer
 * @Date 2020/11/11 3:53 PM
 * @author tianshuailong
 */
public class KafkaConsumer {
    private final static String TOPIC = "test-topic1";
    private final static String BROKERS = "host.name:port"; // your Kafka broker address
    private static org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer;
    static {
        Properties c = initConfig();
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(c);
    }
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "xdf-9901");
        // SASL/PLAIN authentication settings
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";");
        // optional settings
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // auto-commit offsets every second
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        return props;
    }
    public static void main(String[] args) {
        consumer.subscribe(Collections.singleton(TOPIC));
        AtomicBoolean running = new AtomicBoolean(Boolean.TRUE);
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic:" + record.topic() + ", partition:" + record.partition() + ", offset:" + record.offset());
                    System.out.println("key:" + record.key() + ", value:" + record.value());
                }
            }
        } catch (Exception e) {
            System.out.println("consumer error : " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
}
Command-line clients
If you try to send messages with the console scripts at this point, you will get the following error:
[root@XXGL-T-TJSYZ-arch-mid-redis-test-038 bin]# ./kafka-console-producer.sh --broker-list 172.24.29.213:9092 --topic test-topic1 --producer.config ../config/producer.properties
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:441)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:422)
... 3 more
Because the broker now requires authentication, some configuration is needed before the console clients can send:
1. Create a JAAS file (kafka-client-jaas.conf):
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-pwd";
};
2. Add the client settings to config/producer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
3. Point the kafka-console-producer.sh script at the JAAS file created in step 1. Change
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
to
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/neworiental/tsl/kafka_2.12-2.3.0/config/kafka-client-jaas.conf"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
4. Run the script again after the change; it works. The consumer script is configured the same way.
[root@XXGL-T-TJSYZ-arch-mid-redis-test-038 bin]# ./kafka-console-producer.sh --broker-list 172.24.29.213:9092 --topic test-topic1 --producer.config ../config/producer.properties
>> xdf-9901
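As an aside, clients since Kafka 0.10.2 (KIP-85) also accept the JAAS settings inline through the sasl.jaas.config property, so the same PlainLoginModule line used in the Java examples above can be put straight into producer.properties, making step 3's script change unnecessary.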
4.2 SASL/SCRAM authentication
SASL/PLAIN has one drawback: users can only be configured in the KafkaServer section of the JAAS file, so once Kafka has started, no new users can be added dynamically.
Step 1
SCRAM stores its credentials in ZooKeeper, and they are created with the kafka-configs.sh script. For the SCRAM mechanism we create credentials by adding users, and the inter-broker credentials must be created before Kafka starts.
Step 2
Create the SCRAM credentials with the kafka-configs.sh script under $kafka_home/bin.
Note: if no iteration count is specified, the default of 4096 is used. A random salt is generated, and the SCRAM identity, consisting of the salt, iterations, StoredKey, and ServerKey, is stored in ZooKeeper. See RFC 5802 (https://tools.ietf.org/html/rfc5802) for details.
SCRAM-SHA-256 and SCRAM-SHA-512 are the password-hashing algorithms; configuring either one is sufficient.
1) Create the inter-broker user. It must be created before SASL is enabled, otherwise startup fails:
./kafka-configs.sh --zookeeper <zookeeper-address> --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
2) Create regular users.
Create a producer user:
./kafka-configs.sh --zookeeper <zookeeper-address> --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-secret],SCRAM-SHA-512=[password=prod-secret]' --entity-type users --entity-name producer
Create a consumer user:
./kafka-configs.sh --zookeeper <zookeeper-address> --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=consumer-secret],SCRAM-SHA-512=[password=consumer-secret]' --entity-type users --entity-name consumer
Inspect a user's credentials:
./kafka-configs.sh --zookeeper <zookeeper-address> --describe --entity-type users --entity-name consumer
Delete credentials:
./kafka-configs.sh --zookeeper <zookeeper-address> --alter --delete-config 'SCRAM-SHA-512,SCRAM-SHA-256' --entity-type users --entity-name producer
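As an aside, Kafka 2.7 and later (newer than the 2.3.0 used for this test) can also manage SCRAM credentials through the AdminClient instead of kafka-configs.sh. A rough sketch under that assumption, with a placeholder class name and broker address, against an unsecured listener:
package cn.xdf.xadd.xaddstarterkafkasample.sample;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
import java.util.Collections;
import java.util.Properties;
public class ScramUserAdminSample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "host.name:9092"); // your broker address
        // on a secured listener the admin connection would also need its own SASL settings
        try (AdminClient admin = AdminClient.create(props)) {
            // equivalent to the kafka-configs.sh --add-config call above:
            // SCRAM-SHA-256 credentials with 8192 iterations for user "producer"
            admin.alterUserScramCredentials(Collections.singletonList(
                    new UserScramCredentialUpsertion("producer",
                            new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192),
                            "prod-secret")))
                    .all().get();
        }
    }
}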
Step 3
Configure the Kafka brokers.
1) Add a JAAS file:
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};
2) Pass the JAAS file location to each Kafka broker as a JVM parameter (the same start-script change as in 4.1):
-Djava.security.auth.login.config=/etc/kafka/kafka-server-jaas.conf
3) Configure the SASL port and SASL mechanism in server.properties. For example:
# authentication settings
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
# ACL settings
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
4) Start Kafka.
Step 4
Verification. If you try to produce or consume with the accounts above right now, they will be denied, because no permissions have been granted yet.
Permissions are granted with the kafka-acls.sh script, exactly as described in the authorization step of section 4.1.
Client configuration:
Producer
package cn.xdf.xadd.xaddstarterkafkasample.sample;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * @Classname KafkaProducer
 * @Date 2020/11/12 11:25 AM
 * @author tianshuailong
 */
public class KafkaProducer1 {
    private final static String TOPIC = "tsl-topic1";
    private final static String BROKERS = "host.name:port"; // your Kafka broker address
    private static org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
    static {
        Properties c = initConfig();
        producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(c);
    }
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // SASL/SCRAM authentication settings
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        // or SCRAM-SHA-512
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        // props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"producer\" password=\"prod-secret\";");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";");
        return props;
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 0, "tian", "xdf-9901-value-" + (int) (10 * (Math.random())));
        Future<RecordMetadata> send = producer.send(record, (recordMetadata, e) -> {
            if (Objects.nonNull(e)) {
                System.out.println("send error: " + e.getMessage());
            } else {
                System.out.println(String.format("offset:%s,partition:%s", recordMetadata.offset(), recordMetadata.partition()));
            }
        });
        send.get();
        producer.close(); // flush and release client resources
    }
}
Consumer
package cn.xdf.xadd.xaddstarterkafkasample.sample;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.config.SaslConfigs;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * @Classname KafkaConsumer1
 * @author tianshuailong
 */
public class KafkaConsumer1 {
    private final static String TOPIC = "tsl-topic1";
    private final static String BROKERS = "host.name:port"; // your Kafka broker address
    private static org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer;
    static {
        Properties c = initConfig();
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(c);
    }
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "xdf-9901");
        // SASL/SCRAM authentication settings
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";");
        // optional settings
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // auto-commit offsets every second
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        return props;
    }
    public static void main(String[] args) {
        consumer.subscribe(Collections.singleton(TOPIC));
        AtomicBoolean running = new AtomicBoolean(Boolean.TRUE);
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic:" + record.topic() + ", partition:" + record.partition() + ", offset:" + record.offset());
                    System.out.println("key:" + record.key() + ", value:" + record.value());
                }
            }
        } catch (Exception e) {
            System.out.println("consumer error : " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
}
That covers the SASL/PLAIN and SASL/SCRAM security configurations. For other mechanism types, see the Kafka source under the org.apache.kafka.common.security.* packages:
https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/security