From 942903e60cb401a81de02bd33cf362bcdd8f6d79 Mon Sep 17 00:00:00 2001 From: Andy Janata Date: Sat, 4 Mar 2017 15:43:00 -0800 Subject: [PATCH] sasl support for kafka --- build.properties.example | 8 +++++++- src/main/filtered-resources/WEB-INF/pyx.properties | 6 +++++- .../net/socialgamer/cah/metrics/KafkaMetrics.java | 14 +++++++++++++- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/build.properties.example b/build.properties.example index e8932d0..4fc179b 100644 --- a/build.properties.example +++ b/build.properties.example @@ -46,10 +46,16 @@ kafka.ssl=false kafka.truststore.path= kafka.truststore.password= # if this is false, the rest don't matter -kafka.ssl.auth=false +kafka.ssl.usekey=false kafka.keystore.path= kafka.keystore.password= kafka.key.password= +# only SCRAM-SHA-512 supported; if this is false, the rest don't matter +# must be used with ssl, does not need client key +kafka.sasl=false +kafka.sasl.username= +kafka.sasl.password= + # GeoIP database for analytics. If unset, will not be used. diff --git a/src/main/filtered-resources/WEB-INF/pyx.properties b/src/main/filtered-resources/WEB-INF/pyx.properties index a0082a8..42123b3 100644 --- a/src/main/filtered-resources/WEB-INF/pyx.properties +++ b/src/main/filtered-resources/WEB-INF/pyx.properties @@ -14,10 +14,14 @@ kafka.topic=${kafka.topic} kafka.ssl=${kafka.ssl} kafka.truststore.path=${kafka.truststore.path} kafka.truststore.password=${kafka.truststore.password} -kafka.ssl.auth=${kafka.ssl.auth} +kafka.ssl.usekey=${kafka.ssl.usekey} kafka.keystore.path=${kafka.keystore.path} kafka.keystore.password=${kafka.keystore.password} kafka.key.password=${kafka.key.password} +# only SCRAM-SHA-512 supported +kafka.sasl=${kafka.sasl} +kafka.sasl.username=${kafka.sasl.username} +kafka.sasl.password=${kafka.sasl.password} # TODO reload this file occasionally in case it changes? geoip.db=${geoip.db} diff --git a/src/main/java/net/socialgamer/cah/metrics/KafkaMetrics.java b/src/main/java/net/socialgamer/cah/metrics/KafkaMetrics.java index a3fb17d..2f8303a 100644 --- a/src/main/java/net/socialgamer/cah/metrics/KafkaMetrics.java +++ b/src/main/java/net/socialgamer/cah/metrics/KafkaMetrics.java @@ -51,6 +51,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.log4j.Logger; @@ -106,11 +107,22 @@ public class KafkaMetrics implements Metrics { props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, inProps.getProperty("kafka.truststore.password")); - if (Boolean.valueOf(inProps.getProperty("kafka.ssl.auth", "false"))) { + if (Boolean.valueOf(inProps.getProperty("kafka.ssl.usekey", "false"))) { props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, inProps.get("kafka.keystore.path")); props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, inProps.get("kafka.keystore.password")); props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, inProps.get("kafka.key.password")); } + + if (Boolean.valueOf(inProps.getProperty("kafka.sasl", "false"))) { + // overwrite this + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); + props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format( + "org.apache.kafka.common.security.scram.ScramLoginModule " + + "required \n username=\"%s\" \n password=\"%s\";", + inProps.getProperty("kafka.sasl.username"), + inProps.getProperty("kafka.sasl.password"))); + props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + } } return props;