package demo.globaldata.positionservice;

import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.InvalidProtocolBufferException;
import demo.globaldata.GlobalStatistics;
import demo.globaldata.RegionPosition;
import demo.globaldata.positionservice.formatters.ByteArrayMessageFormatter;
import demo.globaldata.positionservice.statistics.GlobalPositionStatisticsCalculator;
import demo.globaldata.util.ConfigUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:demo/globaldata/positionservice/GlobalStatisticsEventBusPublisherVerticle.class */
public class GlobalStatisticsEventBusPublisherVerticle extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) GlobalStatisticsEventBusPublisherVerticle.class);
    private String publishAddress = Constants.GLOBAL_POSITION_STATISTICS;
    private final GlobalPositionStatisticsCalculator statisticsCalculator = new GlobalPositionStatisticsCalculator();
    private long publishFrequencyMilliseconds = 1000;
    private final Function<AbstractMessageLite, byte[]> formatter = new ByteArrayMessageFormatter();

    @Override // io.vertx.core.AbstractVerticle, io.vertx.core.Verticle
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        this.publishFrequencyMilliseconds = ConfigUtil.getInteger(config(), getClass(), Constants.KEY_GLOBAL_STATISTICS_PUBLISH_FREQUENCY_MILLIS, logger, 1000);
        logger.info("Configured  publishFrequencyMilliseconds: {}", Long.valueOf(this.publishFrequencyMilliseconds));
    }

    @Override // io.vertx.core.AbstractVerticle, io.vertx.core.Verticle
    public void start(Future<Void> future) throws Exception {
        logger.info("Starting event bus listener for all region topics");
        try {
            Constants.REGIONS.forEach(str -> {
                this.vertx.eventBus().localConsumer(String.format(Constants.GRPC_POSITION_REGIONAL_STRING, str), message -> {
                    handleObservedPosition(message);
                });
            });
            this.vertx.setPeriodic(this.publishFrequencyMilliseconds, (v1) -> {
                publishGlobalStatsUpdate(v1);
            });
            future.complete();
        } catch (Exception e) {
            future.fail(e);
        }
    }

    @Override // io.vertx.core.AbstractVerticle, io.vertx.core.Verticle
    public void stop(Future<Void> future) throws Exception {
        logger.info("Stopping global statistics verticle");
        future.complete();
    }

    private void publishGlobalStatsUpdate(Object obj) {
        GlobalStatistics statistics = this.statisticsCalculator.getStatistics();
        if (null == statistics) {
            return;
        }
        logger.trace("Publishing stats: " + statistics);
        this.vertx.eventBus().publish(this.publishAddress, this.formatter.apply(statistics));
    }

    private void handleObservedPosition(Message<byte[]> message) {
        RegionPosition regionPosition = null;
        try {
            regionPosition = RegionPosition.parseFrom(message.body());
            this.statisticsCalculator.processPosition(regionPosition);
        } catch (InvalidProtocolBufferException e) {
            logger.warn("failed to parse byte array into RegionPosition object", (Throwable) e);
        } catch (Exception e2) {
            if (null != regionPosition) {
                logger.warn("failed to handle position: " + regionPosition.getPosition().getSymbol(), (Throwable) e2);
            } else {
                logger.warn("failed to handle position", (Throwable) e2);
            }
        }
    }

    public static void main(String[] strArr) throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        logger.info("Starting GlobalStatisticsEventBusPublisherVerticle");
        vertx.deployVerticle(new GlobalStatisticsEventBusPublisherVerticle());
    }
}
