package com.elyxor.vertx.analytics;

import com.elyxor.util.time.SystemTimeProvider;
import com.elyxor.vertx.analytics.LongValueContainer;
import com.elyxor.vertx.analytics.WindowedAverage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.EvictingQueue;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import java.io.IOException;
import java.util.OptionalDouble;
import java.util.function.Function;
import java.util.function.Predicate;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/elyxor/vertx/analytics/WindowedAverageVerticle.class */
public abstract class WindowedAverageVerticle<E extends LongValueContainer> extends AbstractVerticle {
    EvictingQueue<WindowedAverageVerticle<E>.QueueDataWrapper<E>> _queue;
    private long _publishIntervalMills;
    private long _windowMillis;
    private String _listenTopic;
    private String _publishTopic;
    private String _metricType;
    private Function<E, Long> _timestampFcn;
    private static final Logger LOGGER = LoggerFactory.getLogger(WindowedAverageVerticle.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/elyxor/vertx/analytics/WindowedAverageVerticle$QueueDataWrapper.class */
    public class QueueDataWrapper<D> {
        private long _createEpoch;
        private D _data;

        QueueDataWrapper(D d, Function<D, Long> function) {
            this._createEpoch = function.apply(d).longValue();
            this._data = d;
        }

        public long getCreateEpoch() {
            return this._createEpoch;
        }

        public D getData() {
            return this._data;
        }
    }

    protected WindowedAverageVerticle(String str, String str2, String str3, long j, long j2, Function<E, Long> function) {
        this._metricType = str3;
        this._listenTopic = str;
        this._publishTopic = str2;
        this._publishIntervalMills = j2;
        this._windowMillis = j;
        this._timestampFcn = function;
    }

    public String getListenTopic() {
        return this._listenTopic;
    }

    public String getPublishTopic() {
        return this._publishTopic;
    }

    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        this._queue = EvictingQueue.create(100);
    }

    public void start(Future<Void> future) throws Exception {
        getLogger().info("Metric Type: " + this._metricType);
        getLogger().info("Listen topic: " + this._listenTopic);
        getLogger().info("Publish topic: " + this._publishTopic);
        this.vertx.eventBus().consumer(this._listenTopic, this::handle);
        getLogger().info("Publish Interval: " + this._publishIntervalMills + "ms");
        getLogger().info("Window period: " + this._windowMillis + "ms");
        this.vertx.setPeriodic(this._publishIntervalMills, l -> {
            this.vertx.executeBlocking(future2 -> {
                try {
                    this.vertx.eventBus().publish(this._publishTopic, new ObjectMapper().writeValueAsString(calcAverage()));
                    future2.complete();
                } catch (IOException e) {
                    future2.fail(e);
                }
            }, asyncResult -> {
            });
        });
        future.complete();
    }

    private WindowedAverage calcAverage() {
        long currentTimeMillis = new SystemTimeProvider().currentTimeMillis();
        OptionalDouble average = this._queue.stream().filter(getQueueDataWrapperPredicate(currentTimeMillis)).mapToLong(queueDataWrapper -> {
            return ((LongValueContainer) queueDataWrapper.getData()).getValue();
        }).average();
        return new WindowedAverage.Builder().withAverage(average.isPresent() ? Double.valueOf(average.getAsDouble()) : null).withWindowMillis(this._windowMillis).withType(this._metricType).withCreateEpoch(currentTimeMillis).build();
    }

    @NotNull
    private Predicate<WindowedAverageVerticle<E>.QueueDataWrapper<E>> getQueueDataWrapperPredicate(long j) {
        return queueDataWrapper -> {
            return j - queueDataWrapper.getCreateEpoch() < this._windowMillis;
        };
    }

    protected Logger getLogger() {
        return LOGGER;
    }

    protected abstract E toInstance(String str) throws IOException;

    private void handle(Message<Object> message) {
        String obj = message.body().toString();
        getLogger().debug("Handling event: {}", obj);
        try {
            E instance = toInstance(obj);
            getLogger().trace("handling new data: {}", instance);
            this._queue.add(new QueueDataWrapper(instance, this._timestampFcn));
            getLogger().trace("Queue depth: " + this._queue.size());
        } catch (IOException e) {
            getLogger().trace("Caught IOException parsing message. ", e);
        }
    }
}
