fix some cleanup issues when unable to connect to kafka

This commit is contained in:
Andy Janata 2017-03-04 14:13:51 -08:00
parent 5e6791e071
commit 73c41cc59f
1 changed files with 10 additions and 3 deletions

View File

@ -141,11 +141,12 @@ public class KafkaMetrics implements Metrics {
}
if (makeProducerLock.tryLock()) {
Producer<String, String> newProducer = null;
try {
LOG.info("Attempting to create producer.");
final Producer<String, String> newProducer = new KafkaProducer<>(producerProps);
newProducer = new KafkaProducer<>(producerProps);
final List<PartitionInfo> info = newProducer.partitionsFor(topic);
LOG.info(String.format("Topic %s has %d partitions", topic, info.size()));
LOG.info(String.format("Topic %s has %d partitions.", topic, info.size()));
final Producer<String, String> oldProducer = producer;
producer = newProducer;
if (null != oldProducer) {
@ -155,6 +156,10 @@ public class KafkaMetrics implements Metrics {
LOG.info("Producer created.");
} catch (final Exception e) {
LOG.error("Unable to retrieve partition info for topic " + topic, e);
if (null != newProducer) {
LOG.info("Closing failed new producer.");
newProducer.close();
}
} finally {
makeProducerLock.unlock();
}
@ -172,6 +177,8 @@ public class KafkaMetrics implements Metrics {
tryEnsureProducer();
if (null != producer) {
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, json);
// Certain situations where the broker is unavailable may cause this to actually block briefly
// depending on which thread is doing what. This can cause connection issues for pyx clients.
producer.send(record, callback);
} else {
LOG.warn("Dropping event " + json);
@ -187,7 +194,7 @@ public class KafkaMetrics implements Metrics {
producer = null;
if (null != oldProducer) {
LOG.info("Closing producer after exception.");
oldProducer.close();
oldProducer.close(0, TimeUnit.MILLISECONDS);
}
}
}