package demo.globaldata.positionservice;

import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import com.trp.nyctdc.proto.Position;
import demo.globaldata.RegionPosition;
import demo.globaldata.RegionStatistics;
import demo.globaldata.positionservice.formatters.ByteArrayMessageFormatter;
import demo.globaldata.positionservice.formatters.PositionSingleLineFormatter;
import demo.globaldata.positionservice.statistics.PositionStatisticsCalculator;
import demo.globaldata.util.TimeUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import java.util.Objects;
import java.util.function.Function;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:demo/globaldata/positionservice/RegionalStatisticsEventBusPublisherVerticle.class */
public class RegionalStatisticsEventBusPublisherVerticle extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) RegionalStatisticsEventBusPublisherVerticle.class);
    private String region;
    private String listenAddress;
    private String publishAddress;
    private final Function<AbstractMessageLite, byte[]> formatter;
    private final Function<GeneratedMessageV3, String> logFormatter;
    private final PositionStatisticsCalculator positionStatisticsCalculator;

    public RegionalStatisticsEventBusPublisherVerticle() {
        this.region = "US_East";
        this.formatter = new ByteArrayMessageFormatter();
        this.logFormatter = new PositionSingleLineFormatter();
        this.positionStatisticsCalculator = new PositionStatisticsCalculator("US_East", regionStatistics -> {
            sendStats(regionStatistics);
        });
    }

    public RegionalStatisticsEventBusPublisherVerticle(@NotNull String str) {
        this.region = "US_East";
        Objects.requireNonNull(str);
        this.formatter = new ByteArrayMessageFormatter();
        this.logFormatter = new PositionSingleLineFormatter();
        this.positionStatisticsCalculator = new PositionStatisticsCalculator(str, regionStatistics -> {
            sendStats(regionStatistics);
        });
        this.region = str;
    }

    @Override // io.vertx.core.AbstractVerticle, io.vertx.core.Verticle
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        logger.info("Configuring verticle {}", this);
        logger.debug("Configuring listener/publisher for region {}", this.region);
        this.listenAddress = String.format(Constants.GRPC_POSITION_REGIONAL_STRING, this.region);
        this.publishAddress = String.format(Constants.GRPC_POSITION_STATISTICS_STRING, this.region);
    }

    @Override // io.vertx.core.AbstractVerticle, io.vertx.core.Verticle
    public void start(Future<Void> future) throws Exception {
        logger.info("Starting event bus listener for {}", this.listenAddress);
        try {
            this.vertx.eventBus().localConsumer(this.listenAddress, message -> {
                handleObservedPosition(message);
            });
            future.complete();
        } catch (Exception e) {
            future.fail(e);
        }
    }

    @Override // io.vertx.core.AbstractVerticle, io.vertx.core.Verticle
    public void stop(Future<Void> future) throws Exception {
        logger.info("Stopping statistics verticle for region {}", this.region);
        future.complete();
    }

    private void sendStats(RegionStatistics regionStatistics) {
        logger.debug("publishing regional stats for {}", this.region);
        if (logger.isTraceEnabled()) {
            logger.trace(this.logFormatter.apply(regionStatistics));
        }
        this.vertx.eventBus().publish(this.publishAddress, this.formatter.apply(regionStatistics));
    }

    private RegionStatistics generateRegionStatistics(Position position, Timestamp timestamp) {
        return null;
    }

    private void handleObservedPosition(Message<byte[]> message) {
        RegionPosition regionPosition = null;
        try {
            Timestamp currentTimestamp = TimeUtil.currentTimestamp();
            regionPosition = RegionPosition.parseFrom(message.body());
            this.positionStatisticsCalculator.processPosition(regionPosition, currentTimestamp);
        } catch (InvalidProtocolBufferException e) {
            logger.warn("failed to parse byte array into position object", (Throwable) e);
        } catch (Exception e2) {
            if (null != regionPosition) {
                logger.warn("failed to handle position: " + regionPosition.getPosition().getSymbol(), (Throwable) e2);
            } else {
                logger.warn("failed to handle position", (Throwable) e2);
            }
        }
    }

    public static void main(String[] strArr) throws InterruptedException {
        Vertx vertx = Vertx.vertx();
        logger.info("Starting RegionalStatisticsEventBusPublisherVerticle");
        vertx.deployVerticle(new RegionalStatisticsEventBusPublisherVerticle());
    }
}
