implement kafka metrics without tls or authentication
This commit is contained in:
parent
a38dc53a77
commit
23ce041bda
|
@ -291,5 +291,8 @@ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||||
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||||
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
</pre>
|
</pre>
|
||||||
|
<h3>
|
||||||
|
<a href="http://www.apache.org/licenses/LICENSE-2.0">UADetector</a>
|
||||||
|
</h3>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|
|
@ -36,9 +36,13 @@ hibernate.cache.use_query_cache=false
|
||||||
hibernate.cache.provider_class=org.hibernate.cache.NoCacheProvider
|
hibernate.cache.provider_class=org.hibernate.cache.NoCacheProvider
|
||||||
|
|
||||||
# Metrics implementation.
|
# Metrics implementation.
|
||||||
# FIXME: this isn't used. Change the binding in CahModule.
|
|
||||||
pyx.metrics.impl=net.socialgamer.cah.metrics.NoOpMetrics
|
pyx.metrics.impl=net.socialgamer.cah.metrics.NoOpMetrics
|
||||||
|
|
||||||
|
# for kafka metrics
|
||||||
|
kafka.host=kafka-host:9092
|
||||||
|
kafka.topic=pyx-metrics
|
||||||
|
|
||||||
|
|
||||||
# GeoIP database for analytics. If unset, will not be used.
|
# GeoIP database for analytics. If unset, will not be used.
|
||||||
# Only used if the above is not NoOpMetrics.
|
# Only used if the above is not NoOpMetrics.
|
||||||
# See README.md for instructions.
|
# See README.md for instructions.
|
||||||
|
|
10
pom.xml
10
pom.xml
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
<groupId>net.socialgamer</groupId>
|
<groupId>net.socialgamer</groupId>
|
||||||
<artifactId>pyx</artifactId>
|
<artifactId>pyx</artifactId>
|
||||||
<version>0.5.0-SNAPSHOT</version>
|
<version>0.6.0-SNAPSHOT</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<name>pyx</name>
|
<name>pyx</name>
|
||||||
|
@ -268,9 +268,9 @@
|
||||||
<version>4.0</version>
|
<version>4.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>commons-collections</groupId>
|
<groupId>org.apache.commons</groupId>
|
||||||
<artifactId>commons-collections</artifactId>
|
<artifactId>commons-collections4</artifactId>
|
||||||
<version>3.1</version>
|
<version>4.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.commons</groupId>
|
<groupId>org.apache.commons</groupId>
|
||||||
|
@ -346,7 +346,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.kafka</groupId>
|
<groupId>org.apache.kafka</groupId>
|
||||||
<artifactId>kafka-clients</artifactId>
|
<artifactId>kafka-clients</artifactId>
|
||||||
<version>0.10.1.1</version>
|
<version>0.10.2.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.maxmind.geoip2</groupId>
|
<groupId>com.maxmind.geoip2</groupId>
|
||||||
|
|
|
@ -8,5 +8,9 @@ pyx.build=${buildNumber}
|
||||||
# this is NOT allowed to be changed during a reload, as metrics depend on previous events
|
# this is NOT allowed to be changed during a reload, as metrics depend on previous events
|
||||||
pyx.metrics.impl=${pyx.metrics.impl}
|
pyx.metrics.impl=${pyx.metrics.impl}
|
||||||
|
|
||||||
|
# these also are NOT allowed to be changed during a reload
|
||||||
|
kafka.host=${kafka.host}
|
||||||
|
kafka.topic=${kafka.topic}
|
||||||
|
|
||||||
# TODO reload this file occasionally in case it changes?
|
# TODO reload this file occasionally in case it changes?
|
||||||
geoip.db=${geoip.db}
|
geoip.db=${geoip.db}
|
||||||
|
|
|
@ -35,12 +35,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import javax.servlet.ServletContext;
|
||||||
|
|
||||||
import net.socialgamer.cah.data.GameManager;
|
import net.socialgamer.cah.data.GameManager;
|
||||||
import net.socialgamer.cah.data.GameManager.GameId;
|
import net.socialgamer.cah.data.GameManager.GameId;
|
||||||
import net.socialgamer.cah.data.GameManager.MaxGames;
|
import net.socialgamer.cah.data.GameManager.MaxGames;
|
||||||
import net.socialgamer.cah.data.User;
|
import net.socialgamer.cah.data.User;
|
||||||
import net.socialgamer.cah.metrics.Metrics;
|
import net.socialgamer.cah.metrics.Metrics;
|
||||||
import net.socialgamer.cah.metrics.NoOpMetrics;
|
|
||||||
import net.socialgamer.cah.metrics.UniqueIds;
|
import net.socialgamer.cah.metrics.UniqueIds;
|
||||||
|
|
||||||
import org.hibernate.Session;
|
import org.hibernate.Session;
|
||||||
|
@ -61,6 +62,12 @@ public class CahModule extends AbstractModule {
|
||||||
|
|
||||||
private final static Properties properties = new Properties();
|
private final static Properties properties = new Properties();
|
||||||
|
|
||||||
|
private final ServletContext context;
|
||||||
|
|
||||||
|
public CahModule(final ServletContext context) {
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
bind(Integer.class)
|
bind(Integer.class)
|
||||||
|
@ -76,10 +83,21 @@ public class CahModule extends AbstractModule {
|
||||||
.toInstance(Collections.synchronizedSet(new HashSet<String>()));
|
.toInstance(Collections.synchronizedSet(new HashSet<String>()));
|
||||||
|
|
||||||
bind(Properties.class).toInstance(properties);
|
bind(Properties.class).toInstance(properties);
|
||||||
bind(Metrics.class).to(NoOpMetrics.class);
|
|
||||||
|
// FIXME huge hack.
|
||||||
|
StartupUtils.reloadProperties(context, properties);
|
||||||
|
final String metricsClassName = properties.getProperty("pyx.metrics.impl");
|
||||||
|
try {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final Class<? extends Metrics> metricsClass = (Class<? extends Metrics>) Class
|
||||||
|
.forName(metricsClassName);
|
||||||
|
bind(Metrics.class).to(metricsClass);
|
||||||
|
} catch (final ClassNotFoundException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
bind(Date.class).annotatedWith(ServerStarted.class).toInstance(new Date());
|
bind(Date.class).annotatedWith(ServerStarted.class).toInstance(new Date());
|
||||||
bind(String.class).annotatedWith(UniqueId.class).toProvider(UniqueIds.class);
|
bind(String.class).annotatedWith(UniqueId.class).toProvider(UniqueIds.class);
|
||||||
|
|
||||||
install(new FactoryModuleBuilder().build(User.Factory.class));
|
install(new FactoryModuleBuilder().build(User.Factory.class));
|
||||||
|
|
||||||
final ScheduledThreadPoolExecutor threadPool =
|
final ScheduledThreadPoolExecutor threadPool =
|
||||||
|
|
|
@ -114,7 +114,7 @@ public class StartupUtils extends GuiceServletContextListener {
|
||||||
@Override
|
@Override
|
||||||
public void contextInitialized(final ServletContextEvent contextEvent) {
|
public void contextInitialized(final ServletContextEvent contextEvent) {
|
||||||
final ServletContext context = contextEvent.getServletContext();
|
final ServletContext context = contextEvent.getServletContext();
|
||||||
final Injector injector = getInjector();
|
final Injector injector = getInjector(context);
|
||||||
|
|
||||||
final ScheduledThreadPoolExecutor timer = injector
|
final ScheduledThreadPoolExecutor timer = injector
|
||||||
.getInstance(ScheduledThreadPoolExecutor.class);
|
.getInstance(ScheduledThreadPoolExecutor.class);
|
||||||
|
@ -140,10 +140,17 @@ public class StartupUtils extends GuiceServletContextListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void reloadProperties(final ServletContext context) {
|
public static void reloadProperties(final ServletContext context) {
|
||||||
LOG.info("Reloading pyx.properties");
|
|
||||||
|
|
||||||
final Injector injector = (Injector) context.getAttribute(INJECTOR);
|
final Injector injector = (Injector) context.getAttribute(INJECTOR);
|
||||||
final Properties props = injector.getInstance(Properties.class);
|
final Properties props = injector.getInstance(Properties.class);
|
||||||
|
reloadProperties(context, props);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hack method for calling inside CahModule before the injector is usable.
|
||||||
|
*/
|
||||||
|
public static void reloadProperties(final ServletContext context, final Properties props) {
|
||||||
|
LOG.info("Reloading pyx.properties");
|
||||||
|
|
||||||
final File propsFile = new File(context.getRealPath("/WEB-INF/pyx.properties"));
|
final File propsFile = new File(context.getRealPath("/WEB-INF/pyx.properties"));
|
||||||
try {
|
try {
|
||||||
synchronized (props) {
|
synchronized (props) {
|
||||||
|
@ -163,8 +170,12 @@ public class StartupUtils extends GuiceServletContextListener {
|
||||||
"/WEB-INF/log4j.properties"));
|
"/WEB-INF/log4j.properties"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Injector getInjector(final ServletContext context) {
|
||||||
|
return Guice.createInjector(new CahModule(context), new CardcastModule());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Injector getInjector() {
|
protected Injector getInjector() {
|
||||||
return Guice.createInjector(new CahModule(), new CardcastModule());
|
throw new RuntimeException("Not supported.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,7 @@ public class ConnectedUsers {
|
||||||
logger.warn(String.format("Unable to get address for user %s (hostname: %s)",
|
logger.warn(String.format("Unable to get address for user %s (hostname: %s)",
|
||||||
user.getNickname(), user.getHostname()), e);
|
user.getNickname(), user.getHostname()), e);
|
||||||
}
|
}
|
||||||
metrics.newUser(user.getPersistentId(), user.getSessionId(), geo, user.getAgentName(),
|
metrics.userConnect(user.getPersistentId(), user.getSessionId(), geo, user.getAgentName(),
|
||||||
user.getAgentType(), user.getAgentOs(), user.getAgentLanguage());
|
user.getAgentType(), user.getAgentOs(), user.getAgentLanguage());
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -1446,29 +1446,29 @@ public class Game {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The judge has selected a card. The {@code cardId} passed in may be any white cards's ID for
|
* The judge has selected a card. The {@code cardId} passed in may be any white card's ID for
|
||||||
* black cards that have multiple selection, however only the first card in the set's ID will be
|
* black cards that have multiple selection, however only the first card in the set's ID will be
|
||||||
* passed around to clients.
|
* passed around to clients.
|
||||||
*
|
*
|
||||||
* @param user
|
* @param judge
|
||||||
* Judge user.
|
* Judge user.
|
||||||
* @param cardId
|
* @param cardId
|
||||||
* Selected card ID.
|
* Selected card ID.
|
||||||
* @return Error code if there is an error, or null if success.
|
* @return Error code if there is an error, or null if success.
|
||||||
*/
|
*/
|
||||||
public ErrorCode judgeCard(final User user, final int cardId) {
|
public ErrorCode judgeCard(final User judge, final int cardId) {
|
||||||
final Player cardPlayer;
|
final Player cardPlayer;
|
||||||
synchronized (judgeLock) {
|
synchronized (judgeLock) {
|
||||||
final Player player = getPlayerForUser(user);
|
final Player judgePlayer = getPlayerForUser(judge);
|
||||||
if (getJudge() != player) {
|
if (getJudge() != judgePlayer) {
|
||||||
return ErrorCode.NOT_JUDGE;
|
return ErrorCode.NOT_JUDGE;
|
||||||
} else if (state != GameState.JUDGING) {
|
} else if (state != GameState.JUDGING) {
|
||||||
return ErrorCode.NOT_YOUR_TURN;
|
return ErrorCode.NOT_YOUR_TURN;
|
||||||
}
|
}
|
||||||
|
|
||||||
// shouldn't ever happen, but just in case...
|
// shouldn't ever happen, but just in case...
|
||||||
if (null != player) {
|
if (null != judgePlayer) {
|
||||||
player.resetSkipCount();
|
judgePlayer.resetSkipCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
cardPlayer = playedCards.getPlayerForId(cardId);
|
cardPlayer = playedCards.getPlayerForId(cardId);
|
||||||
|
@ -1512,8 +1512,11 @@ public class Game {
|
||||||
rescheduleTimer(task, ROUND_INTERMISSION);
|
rescheduleTimer(task, ROUND_INTERMISSION);
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics.roundComplete(currentUniqueId, user.getSessionId(), cardPlayer.getUser().getSessionId(),
|
final Map<String, List<WhiteCard>> cardsBySessionId = new HashMap<>();
|
||||||
playedCards.cardsByUser());
|
playedCards.cardsByUser().forEach(
|
||||||
|
(key, value) -> cardsBySessionId.put(key.getSessionId(), value));
|
||||||
|
metrics.roundComplete(currentUniqueId, uniqueIdProvider.get(), judge.getSessionId(),
|
||||||
|
cardPlayer.getUser().getSessionId(), blackCard, cardsBySessionId);
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,10 +148,7 @@ public class PlayerPlayedCardsTracker {
|
||||||
*/
|
*/
|
||||||
public synchronized Map<User, List<WhiteCard>> cardsByUser() {
|
public synchronized Map<User, List<WhiteCard>> cardsByUser() {
|
||||||
final Map<User, List<WhiteCard>> cardsByUser = new HashMap<>();
|
final Map<User, List<WhiteCard>> cardsByUser = new HashMap<>();
|
||||||
// TODO java8: streams
|
playerCardMap.forEach((key, value) -> cardsByUser.put(key.getUser(), value));
|
||||||
for (final Map.Entry<Player, List<WhiteCard>> entry : playerCardMap.entrySet()) {
|
|
||||||
cardsByUser.put(entry.getKey().getUser(), entry.getValue());
|
|
||||||
}
|
|
||||||
return Collections.unmodifiableMap(cardsByUser);
|
return Collections.unmodifiableMap(cardsByUser);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,60 +23,312 @@
|
||||||
|
|
||||||
package net.socialgamer.cah.metrics;
|
package net.socialgamer.cah.metrics;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
import net.socialgamer.cah.data.BlackCard;
|
||||||
import net.socialgamer.cah.data.CardSet;
|
import net.socialgamer.cah.data.CardSet;
|
||||||
import net.socialgamer.cah.data.User;
|
|
||||||
import net.socialgamer.cah.data.WhiteCard;
|
import net.socialgamer.cah.data.WhiteCard;
|
||||||
|
import net.socialgamer.cah.db.PyxBlackCard;
|
||||||
|
import net.socialgamer.cah.db.PyxCardSet;
|
||||||
|
import net.socialgamer.cah.db.PyxWhiteCard;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.Callback;
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.Producer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||||
|
import org.apache.kafka.common.PartitionInfo;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
import org.json.simple.JSONValue;
|
||||||
|
|
||||||
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import com.maxmind.geoip2.model.CityResponse;
|
import com.maxmind.geoip2.model.CityResponse;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Metrics implementation that sends all data to a Kafka topic.
|
* Metrics implementation that sends all data to an Apache Kafka topic.
|
||||||
*
|
*
|
||||||
* @author Andy Janata (ajanata@socialgamer.net)
|
* @author Andy Janata (ajanata@socialgamer.net)
|
||||||
*/
|
*/
|
||||||
@Singleton
|
@Singleton
|
||||||
public class KafkaMetrics implements Metrics {
|
public class KafkaMetrics implements Metrics {
|
||||||
|
|
||||||
|
private static final String metricsVersion = "0.1";
|
||||||
private static final Logger LOG = Logger.getLogger(KafkaMetrics.class);
|
private static final Logger LOG = Logger.getLogger(KafkaMetrics.class);
|
||||||
|
|
||||||
@Override
|
private final ProducerCallback callback = new ProducerCallback();
|
||||||
public void serverStart(final String startupId) {
|
private final String build;
|
||||||
LOG.trace(String.format("serverStarted(%s)", startupId));
|
private final String hosts;
|
||||||
|
private final String topic;
|
||||||
|
private volatile Producer<String, String> producer;
|
||||||
|
private final Properties producerProps;
|
||||||
|
private final Lock makeProducerLock = new ReentrantLock();
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public KafkaMetrics(final Properties properties) {
|
||||||
|
build = properties.getProperty("pyx.build");
|
||||||
|
hosts = properties.getProperty("kafka.host");
|
||||||
|
topic = properties.getProperty("kafka.topic");
|
||||||
|
LOG.info("Sending metrics to Kafka topic " + topic);
|
||||||
|
producerProps = getProducerProps();
|
||||||
|
tryEnsureProducer();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Properties getProducerProps() {
|
||||||
|
final Properties props = new Properties();
|
||||||
|
props.put("bootstrap.servers", hosts);
|
||||||
|
props.put("key.serializer", StringSerializer.class.getName());
|
||||||
|
props.put("value.serializer", StringSerializer.class.getName());
|
||||||
|
props.put("acks", "0");
|
||||||
|
props.put("compression.type", "gzip");
|
||||||
|
props.put("retries", 1);
|
||||||
|
props.put("client.id", "pyx-" + build);
|
||||||
|
props.put("max.block.ms", TimeUnit.SECONDS.toMillis(5));
|
||||||
|
// TODO TLS, authentication
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to log at TRACE level while only taking string format penalties if such logging
|
||||||
|
* is enabled. Includes the method name as well.
|
||||||
|
* @param format Format string to log
|
||||||
|
* @param params Parameters for format string
|
||||||
|
*/
|
||||||
|
private void trace(final String format, final Object... params) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
|
||||||
|
final String message = String.format(format, params);
|
||||||
|
// skip getStackTrace and this method
|
||||||
|
LOG.trace(String.format("%s: %s", stack[2].getMethodName(), message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempt to create a producer. {@link #producer} must still be checked against null after
|
||||||
|
* calling this method.
|
||||||
|
*/
|
||||||
|
private void tryEnsureProducer() {
|
||||||
|
if (null != producer) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (makeProducerLock.tryLock()) {
|
||||||
|
try {
|
||||||
|
LOG.info("Attempting to create producer.");
|
||||||
|
final Producer<String, String> newProducer = new KafkaProducer<>(producerProps);
|
||||||
|
final List<PartitionInfo> info = newProducer.partitionsFor(topic);
|
||||||
|
LOG.info(String.format("Topic %s has %d partitions", topic, info.size()));
|
||||||
|
final Producer<String, String> oldProducer = producer;
|
||||||
|
producer = newProducer;
|
||||||
|
if (null != oldProducer) {
|
||||||
|
LOG.info("Old producer closed.");
|
||||||
|
oldProducer.close();
|
||||||
|
}
|
||||||
|
LOG.info("Producer created.");
|
||||||
|
} catch (final Exception e) {
|
||||||
|
LOG.error("Unable to retrieve partition info for topic " + topic, e);
|
||||||
|
} finally {
|
||||||
|
makeProducerLock.unlock();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.warn("Another thread is creating a producer.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void send(final Map<String, Object> map) {
|
||||||
|
send(JSONValue.toJSONString(map));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void send(final String json) {
|
||||||
|
trace("%s", json);
|
||||||
|
tryEnsureProducer();
|
||||||
|
if (null != producer) {
|
||||||
|
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, json);
|
||||||
|
producer.send(record, callback);
|
||||||
|
} else {
|
||||||
|
LOG.warn("Dropping event " + json);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ProducerCallback implements Callback {
|
||||||
|
@Override
|
||||||
|
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
|
||||||
|
if (null != exception) {
|
||||||
|
LOG.error("Unable to send event to Kafka", exception);
|
||||||
|
final Producer<String, String> oldProducer = producer;
|
||||||
|
producer = null;
|
||||||
|
if (null != oldProducer) {
|
||||||
|
LOG.info("Closing producer after exception.");
|
||||||
|
oldProducer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void newUser(final String guid, final String sessionId, final CityResponse geoIp,
|
public void shutdown() {
|
||||||
final String agentName, final String agentType, final String agentOs,
|
trace("");
|
||||||
final String agentLanguage) {
|
if (null != producer) {
|
||||||
LOG.trace(String.format("newUser(%s, %s, %s, %s, %s, %s, %s)", guid, sessionId, geoIp,
|
producer.close();
|
||||||
agentName, agentType, agentOs, agentLanguage));
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> getEventMap(final String type, final Map<String, Object> data) {
|
||||||
|
final Map<String, Object> ret = new HashMap<>();
|
||||||
|
ret.put("timestamp", System.currentTimeMillis());
|
||||||
|
ret.put("build", build);
|
||||||
|
ret.put("type", type);
|
||||||
|
ret.put("data", data);
|
||||||
|
ret.put("version", metricsVersion);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serverStart(final String startupId) {
|
||||||
|
trace("%s", startupId);
|
||||||
|
final Map<String, Object> data = new HashMap<>();
|
||||||
|
data.put("startupId", startupId);
|
||||||
|
send(getEventMap("serverStart", data));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void userConnect(final String persistentId, final String sessionId,
|
||||||
|
@Nullable final CityResponse geoIp, final String agentName, final String agentType,
|
||||||
|
final String agentOs, final String agentLanguage) {
|
||||||
|
trace("%s, %s, %s, %s, %s, %s, %s", persistentId, sessionId, geoIp, agentName, agentType,
|
||||||
|
agentOs, agentLanguage);
|
||||||
|
|
||||||
|
final Map<String, Object> data = new HashMap<>();
|
||||||
|
data.put("persistentId", persistentId);
|
||||||
|
data.put("sessionId", sessionId);
|
||||||
|
|
||||||
|
final Map<String, Object> browser = new HashMap<>();
|
||||||
|
browser.put("name", agentName);
|
||||||
|
browser.put("type", agentType);
|
||||||
|
browser.put("os", agentOs);
|
||||||
|
browser.put("language", agentLanguage);
|
||||||
|
data.put("browser", browser);
|
||||||
|
|
||||||
|
final Map<String, Object> geo = new HashMap<>();
|
||||||
|
if (null != geoIp) {
|
||||||
|
// it appears these will never be null and will return null/blank data, but let's be sure
|
||||||
|
if (null != geoIp.getCity()) {
|
||||||
|
geo.put("city", geoIp.getCity().getName());
|
||||||
|
}
|
||||||
|
if (null != geoIp.getCountry()) {
|
||||||
|
geo.put("country", geoIp.getCountry().getIsoCode());
|
||||||
|
}
|
||||||
|
final List<String> subdivCodes = new ArrayList<>(2);
|
||||||
|
geoIp.getSubdivisions().forEach(subdiv -> subdivCodes.add(subdiv.getIsoCode()));
|
||||||
|
if (!subdivCodes.isEmpty()) {
|
||||||
|
geo.put("subdivisions", subdivCodes);
|
||||||
|
}
|
||||||
|
if (null != geoIp.getRepresentedCountry()) {
|
||||||
|
geo.put("representedCountry", geoIp.getRepresentedCountry().getIsoCode());
|
||||||
|
}
|
||||||
|
if (null != geoIp.getPostal()) {
|
||||||
|
geo.put("postal", geoIp.getPostal().getCode());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
data.put("geo", geo);
|
||||||
|
|
||||||
|
send(getEventMap("userConnect", data));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void userDisconnect(final String sessionId) {
|
public void userDisconnect(final String sessionId) {
|
||||||
LOG.trace(String.format("userDisconnect(%s)", sessionId));
|
trace("%s", sessionId);
|
||||||
|
|
||||||
|
final Map<String, Object> data = new HashMap<>();
|
||||||
|
data.put("sessionId", sessionId);
|
||||||
|
send(getEventMap("userDisconnect", data));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void gameStart(final String gameId, final Collection<CardSet> decks, final int blanks,
|
public void gameStart(final String gameId, final Collection<CardSet> decks, final int blanks,
|
||||||
final int maxPlayers, final int scoreGoal, final boolean hasPassword) {
|
final int maxPlayers, final int scoreGoal, final boolean hasPassword) {
|
||||||
LOG.trace(String.format("gameStart(%s, %s, %d, %d, %d, %s)", gameId, decks.toArray(), blanks,
|
trace("%s, %s, %d, %d, %d, %s", gameId, decks.toArray(), blanks, maxPlayers, scoreGoal,
|
||||||
maxPlayers, scoreGoal, hasPassword));
|
hasPassword);
|
||||||
|
|
||||||
|
final Map<String, Object> data = new HashMap<>();
|
||||||
|
data.put("gameId", gameId);
|
||||||
|
data.put("blankCardsInDeck", blanks);
|
||||||
|
data.put("maxPlayers", maxPlayers);
|
||||||
|
data.put("scoreGoal", scoreGoal);
|
||||||
|
data.put("hasPassword", hasPassword);
|
||||||
|
|
||||||
|
final List<Map<String, Object>> deckInfos = new ArrayList<>(decks.size());
|
||||||
|
for (final CardSet deck : decks) {
|
||||||
|
final Map<String, Object> deckInfo = new HashMap<>();
|
||||||
|
// if we ever have more than cardcast for custom cards, this needs updated to indicate which
|
||||||
|
// custom deck source, but will still be correct for this specific flag
|
||||||
|
deckInfo.put("isCustom", !(deck instanceof PyxCardSet));
|
||||||
|
deckInfo.put("id", deck.getId());
|
||||||
|
// TODO(?) don't include these data for non-custom decks?
|
||||||
|
deckInfo.put("name", deck.getName());
|
||||||
|
deckInfo.put("whiteCount", deck.getWhiteCards().size());
|
||||||
|
deckInfo.put("blackCount", deck.getBlackCards().size());
|
||||||
|
deckInfos.add(deckInfo);
|
||||||
|
}
|
||||||
|
data.put("decks", deckInfos);
|
||||||
|
|
||||||
|
send(getEventMap("gameStart", data));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void roundComplete(final String gameId, final String judgeSessionId,
|
public void roundComplete(final String gameId, final String roundId, final String judgeSessionId,
|
||||||
final String winnerSessionId,
|
final String winnerSessionId, final BlackCard blackCard,
|
||||||
final Map<User, List<WhiteCard>> cards) {
|
final Map<String, List<WhiteCard>> cards) {
|
||||||
LOG.trace(String.format("roundJudged(%s, %s, %s, %s)", gameId, judgeSessionId, winnerSessionId,
|
trace("%s, %s, %s, %s, %s, %s", gameId, roundId, judgeSessionId, winnerSessionId, blackCard,
|
||||||
cards));
|
cards);
|
||||||
|
|
||||||
|
final Map<String, Object> data = new HashMap<>();
|
||||||
|
data.put("gameId", gameId);
|
||||||
|
data.put("roundId", roundId);
|
||||||
|
data.put("judgeSessionId", judgeSessionId);
|
||||||
|
data.put("winnerSessionId", winnerSessionId);
|
||||||
|
|
||||||
|
// <player id, cards[<id key, value>]>
|
||||||
|
final Map<String, List<Map<String, Object>>> allCardMap = new HashMap<>();
|
||||||
|
for (final Entry<String, List<WhiteCard>> cardsByUser : cards.entrySet()) {
|
||||||
|
final List<Map<String, Object>> userCards = new ArrayList<>(cardsByUser.getValue().size());
|
||||||
|
for (final WhiteCard card : cardsByUser.getValue()) {
|
||||||
|
final Map<String, Object> cardInfo = new HashMap<>();
|
||||||
|
// same re: more custom deck sources
|
||||||
|
cardInfo.put("isCustom", !(card instanceof PyxWhiteCard));
|
||||||
|
cardInfo.put("isWriteIn", card.isWriteIn());
|
||||||
|
// negative IDs would be custom: either blank or cardcast. they are not stable.
|
||||||
|
cardInfo.put("id", card.getId());
|
||||||
|
cardInfo.put("text", card.getText());
|
||||||
|
userCards.add(cardInfo);
|
||||||
|
}
|
||||||
|
allCardMap.put(cardsByUser.getKey(), userCards);
|
||||||
|
}
|
||||||
|
data.put("cardsByUserId", allCardMap);
|
||||||
|
|
||||||
|
final Map<String, Object> blackCardData = new HashMap<>();
|
||||||
|
// same re: more custom deck sources
|
||||||
|
blackCardData.put("isCustom", !(blackCard instanceof PyxBlackCard));
|
||||||
|
// negative IDs would be custom: either blank or cardcast. they are not stable.
|
||||||
|
blackCardData.put("id", blackCard.getId());
|
||||||
|
blackCardData.put("text", blackCard.getText());
|
||||||
|
blackCardData.put("draw", blackCard.getDraw());
|
||||||
|
blackCardData.put("pick", blackCard.getPick());
|
||||||
|
data.put("blackCard", blackCardData);
|
||||||
|
|
||||||
|
send(getEventMap("roundComplete", data));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,8 +29,8 @@ import java.util.Map;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
import net.socialgamer.cah.data.BlackCard;
|
||||||
import net.socialgamer.cah.data.CardSet;
|
import net.socialgamer.cah.data.CardSet;
|
||||||
import net.socialgamer.cah.data.User;
|
|
||||||
import net.socialgamer.cah.data.WhiteCard;
|
import net.socialgamer.cah.data.WhiteCard;
|
||||||
|
|
||||||
import com.maxmind.geoip2.model.CityResponse;
|
import com.maxmind.geoip2.model.CityResponse;
|
||||||
|
@ -42,17 +42,19 @@ import com.maxmind.geoip2.model.CityResponse;
|
||||||
* @author Andy Janata (ajanata@socialgamer.net)
|
* @author Andy Janata (ajanata@socialgamer.net)
|
||||||
*/
|
*/
|
||||||
public interface Metrics {
|
public interface Metrics {
|
||||||
|
void shutdown();
|
||||||
|
|
||||||
void serverStart(String startupId);
|
void serverStart(String startupId);
|
||||||
|
|
||||||
void newUser(String persistentId, String sessionId, @Nullable CityResponse geoIp,
|
void userConnect(String persistentId, String sessionId, @Nullable CityResponse geoIp,
|
||||||
String agentName, String agentType, String agentOs, String agentLanguage);
|
String agentName, String agentType, String agentOs, String agentLanguage);
|
||||||
|
|
||||||
void userDisconnect(String sessionId);
|
void userDisconnect(String sessionId);
|
||||||
|
|
||||||
// The card data is way too complicated to dictate the format it should be in, so let
|
// The card data is way too complicated to dictate the format it should be in, so let
|
||||||
// implementations deal with the structured data.
|
// implementations deal with the structured data.
|
||||||
void roundComplete(String gameId, String judgeSessionId, String winnerSessionId,
|
void roundComplete(String gameId, String roundId, String judgeSessionId, String winnerSessionId,
|
||||||
Map<User, List<WhiteCard>> cards);
|
BlackCard blackCard, Map<String, List<WhiteCard>> cards);
|
||||||
|
|
||||||
void gameStart(String gameId, Collection<CardSet> decks, int blanks, int maxPlayers,
|
void gameStart(String gameId, Collection<CardSet> decks, int blanks, int maxPlayers,
|
||||||
int scoreGoal, boolean hasPassword);
|
int scoreGoal, boolean hasPassword);
|
||||||
|
|
|
@ -27,8 +27,8 @@ import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import net.socialgamer.cah.data.BlackCard;
|
||||||
import net.socialgamer.cah.data.CardSet;
|
import net.socialgamer.cah.data.CardSet;
|
||||||
import net.socialgamer.cah.data.User;
|
|
||||||
import net.socialgamer.cah.data.WhiteCard;
|
import net.socialgamer.cah.data.WhiteCard;
|
||||||
|
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
@ -47,16 +47,21 @@ public class NoOpMetrics implements Metrics {
|
||||||
|
|
||||||
private static final Logger LOG = Logger.getLogger(NoOpMetrics.class);
|
private static final Logger LOG = Logger.getLogger(NoOpMetrics.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
// nothing to do
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serverStart(final String startupId) {
|
public void serverStart(final String startupId) {
|
||||||
LOG.trace(String.format("serverStarted(%s)", startupId));
|
LOG.trace(String.format("serverStarted(%s)", startupId));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void newUser(final String guid, final String sessionId, final CityResponse geoIp,
|
public void userConnect(final String persistentId, final String sessionId, final CityResponse geoIp,
|
||||||
final String agentName, final String agentType, final String agentOs,
|
final String agentName, final String agentType, final String agentOs,
|
||||||
final String agentLanguage) {
|
final String agentLanguage) {
|
||||||
LOG.trace(String.format("newUser(%s, %s, %s, %s, %s, %s, %s)", guid, sessionId, geoIp,
|
LOG.trace(String.format("newUser(%s, %s, %s, %s, %s, %s, %s)", persistentId, sessionId, geoIp,
|
||||||
agentName, agentType, agentOs, agentLanguage));
|
agentName, agentType, agentOs, agentLanguage));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,10 +78,10 @@ public class NoOpMetrics implements Metrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void roundComplete(final String gameId, final String judgeSessionId,
|
public void roundComplete(final String gameId, final String roundId, final String judgeSessionId,
|
||||||
final String winnerSessionId,
|
final String winnerSessionId, final BlackCard blackCard,
|
||||||
final Map<User, List<WhiteCard>> cards) {
|
final Map<String, List<WhiteCard>> cards) {
|
||||||
LOG.trace(String.format("roundJudged(%s, %s, %s, %s)", gameId, judgeSessionId, winnerSessionId,
|
LOG.trace(String.format("roundJudged(%s, %s, %s, %s, %s, %s)", gameId, roundId, judgeSessionId,
|
||||||
cards));
|
winnerSessionId, blackCard, cards));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ import net.socialgamer.cah.data.GameManager.GameId;
|
||||||
import net.socialgamer.cah.data.GameManager.MaxGames;
|
import net.socialgamer.cah.data.GameManager.MaxGames;
|
||||||
import net.socialgamer.cah.data.QueuedMessage.MessageType;
|
import net.socialgamer.cah.data.QueuedMessage.MessageType;
|
||||||
import net.socialgamer.cah.metrics.Metrics;
|
import net.socialgamer.cah.metrics.Metrics;
|
||||||
|
import net.socialgamer.cah.metrics.NoOpMetrics;
|
||||||
|
|
||||||
import org.hibernate.Session;
|
import org.hibernate.Session;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -98,6 +99,7 @@ public class GameManagerTest {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
bind(ScheduledThreadPoolExecutor.class).toInstance(threadPool);
|
bind(ScheduledThreadPoolExecutor.class).toInstance(threadPool);
|
||||||
|
bind(Metrics.class).to(NoOpMetrics.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
|
|
Loading…
Reference in New Issue