sasl support for kafka

This commit is contained in:
Andy Janata 2017-03-04 15:43:00 -08:00
parent 73c41cc59f
commit 942903e60c
3 changed files with 25 additions and 3 deletions

View File

@ -46,10 +46,16 @@ kafka.ssl=false
kafka.truststore.path=
kafka.truststore.password=
# if this is false, the rest don't matter
kafka.ssl.auth=false
kafka.ssl.usekey=false
kafka.keystore.path=
kafka.keystore.password=
kafka.key.password=
# only SCRAM-SHA-512 supported; if this is false, the rest don't matter
# must be used with ssl, does not need client key
kafka.sasl=false
kafka.sasl.username=
kafka.sasl.password=
# GeoIP database for analytics. If unset, will not be used.

View File

@ -14,10 +14,14 @@ kafka.topic=${kafka.topic}
kafka.ssl=${kafka.ssl}
kafka.truststore.path=${kafka.truststore.path}
kafka.truststore.password=${kafka.truststore.password}
kafka.ssl.auth=${kafka.ssl.auth}
kafka.ssl.usekey=${kafka.ssl.usekey}
kafka.keystore.path=${kafka.keystore.path}
kafka.keystore.password=${kafka.keystore.password}
kafka.key.password=${kafka.key.password}
# only SCRAM-SHA-512 supported
kafka.sasl=${kafka.sasl}
kafka.sasl.username=${kafka.sasl.username}
kafka.sasl.password=${kafka.sasl.password}
# TODO reload this file occasionally in case it changes?
geoip.db=${geoip.db}

View File

@ -51,6 +51,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;
@ -106,11 +107,22 @@ public class KafkaMetrics implements Metrics {
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
inProps.getProperty("kafka.truststore.password"));
if (Boolean.valueOf(inProps.getProperty("kafka.ssl.auth", "false"))) {
if (Boolean.valueOf(inProps.getProperty("kafka.ssl.usekey", "false"))) {
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, inProps.get("kafka.keystore.path"));
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, inProps.get("kafka.keystore.password"));
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, inProps.get("kafka.key.password"));
}
if (Boolean.valueOf(inProps.getProperty("kafka.sasl", "false"))) {
// overwrite this
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
"org.apache.kafka.common.security.scram.ScramLoginModule "
+ "required \n username=\"%s\" \n password=\"%s\";",
inProps.getProperty("kafka.sasl.username"),
inProps.getProperty("kafka.sasl.password")));
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
}
}
return props;