package demo.globaldata.positionservice;

import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.Timestamp;
import com.trp.nyctdc.proto.Position;
import demo.globaldata.RegionPosition;
import demo.globaldata.positionservice.formatters.ByteArrayMessageFormatter;
import demo.globaldata.util.TimeUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import java.util.function.Function;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:demo/globaldata/positionservice/RegionalPositionEventBusPublisherVerticle.class */
/**
 * Verticle that relays position messages for a single region.
 *
 * <p>It registers a local event-bus consumer on a region-specific "raw" address, parses each
 * incoming {@code byte[]} body as a {@link Position}, wraps it in a {@link RegionPosition}
 * carrying the region name and a receipt timestamp, and publishes the formatted bytes on a
 * second region-specific address. Both addresses are derived in {@link #init(Vertx, Context)}
 * from the format strings in {@code Constants}.
 */
public class RegionalPositionEventBusPublisherVerticle extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger(RegionalPositionEventBusPublisherVerticle.class);

    /** Region used when the no-argument constructor is invoked. */
    private static final String DEFAULT_REGION = "US_East";

    private final String region;
    private final Function<AbstractMessageLite, byte[]> formatter;

    // Both addresses are computed in init(), so they cannot be final.
    private String listenAddress;
    private String publishAddress;

    /** Creates a verticle for the default region ({@code "US_East"}). */
    public RegionalPositionEventBusPublisherVerticle() {
        this(DEFAULT_REGION);
    }

    /**
     * Creates a verticle for the given region.
     *
     * @param str region name; it is substituted into the listen and publish address formats
     */
    public RegionalPositionEventBusPublisherVerticle(String str) {
        this.region = str;
        this.formatter = new ByteArrayMessageFormatter();
    }

    /**
     * @return the region this verticle was constructed with
     */
    public String getRegion() {
        return this.region;
    }

    /**
     * Delegates to the superclass and then derives the listen and publish addresses for this
     * verticle's region.
     *
     * @param vertx the Vert.x instance passed through to the superclass
     * @param context the context passed through to the superclass
     */
    @Override
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        logger.info("Configuring verticle {}", this);
        logger.debug("Configuring listener/publisher for region {}", this.region);
        this.listenAddress = String.format(Constants.GRPC_RAW_POSITION_REGIONAL_STRING, this.region);
        this.publishAddress = String.format(Constants.GRPC_POSITION_REGIONAL_STRING, this.region);
    }

    /**
     * Registers the local consumer on the listen address and completes {@code future}; if the
     * registration throws, the future is failed with that exception instead.
     *
     * @param future completed once the consumer has been registered, failed otherwise
     */
    @Override
    public void start(Future<Void> future) throws Exception {
        logger.info("Starting event bus listener for {}", this.listenAddress);
        try {
            // The explicit parameter type fixes the consumer's body type to byte[]; an
            // implicitly typed lambda leaves the compiler nothing to infer it from.
            this.vertx.eventBus().localConsumer(
                    this.listenAddress,
                    (Message<byte[]> message) -> handleMessage(message, TimeUtil.currentTimestamp()));
            future.complete();
        } catch (Exception e) {
            future.fail(e);
        }
    }

    /** Builds the {@link RegionPosition} from this verticle's region, the position and the timestamp. */
    private RegionPosition generateRegionPosition(Position position, Timestamp timestamp) {
        return RegionPosition.newBuilder()
                .setRegion(this.region)
                .setPosition(position)
                .setReceiptTimestamp(timestamp)
                .build();
    }

    /**
     * Parses the message body as a {@link Position} and forwards it to
     * {@link #handleObservedPosition(Position, Timestamp)}.
     *
     * <p>No exception escapes this method. Parse failures and publish failures are logged
     * separately so that a formatter or event-bus problem is not reported as a malformed payload.
     *
     * @param message event-bus message whose body holds the serialized position
     * @param timestamp receipt timestamp to attach to the regional position
     */
    protected void handleMessage(Message<byte[]> message, Timestamp timestamp) {
        final Position position;
        try {
            position = Position.parseFrom(message.body());
        } catch (Exception e) {
            logger.warn("failed to parse byte array into position object", e);
            return;
        }

        try {
            handleObservedPosition(position, timestamp);
        } catch (Exception e) {
            // SLF4J treats a trailing Throwable argument as the exception to log.
            logger.warn("failed to publish regional position for {}", this.region, e);
        }
    }

    /**
     * Wraps the position in a {@link RegionPosition}, formats it to bytes and publishes it on
     * the publish address.
     *
     * @param position the parsed position
     * @param timestamp receipt timestamp to attach
     */
    public void handleObservedPosition(@NotNull Position position, @NotNull Timestamp timestamp) {
        logger.debug("publishing regional position for {}:{}", this.region, position.getSymbol());
        RegionPosition regionPosition = generateRegionPosition(position, timestamp);
        byte[] payload = this.formatter.apply(regionPosition);
        this.vertx.eventBus().publish(this.publishAddress, payload);
    }

    /**
     * Logs the shutdown and completes {@code future}; this method performs no explicit cleanup.
     *
     * @param future completed immediately
     */
    @Override
    public void stop(Future<Void> future) throws Exception {
        logger.info("Stopping position verticle for region {}", this.region);
        future.complete();
    }
}
