package com.elyxor.vertx.analytics;

import com.elyxor.vertx.analytics.interfaces.ValueContainer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.EvictingQueue;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/elyxor/vertx/analytics/WindowedFunctionVerticle.class */
/**
 * Base verticle that buffers incoming event-bus messages in a bounded queue and, on a fixed
 * interval, publishes the results of {@link #doCalc()} as JSON strings.
 *
 * <p>Messages received on the listen topic are converted with {@link #toInstance(String)},
 * wrapped together with a timestamp and appended to {@code _queue}. A periodic timer runs
 * {@link #doCalc()} via {@code executeBlocking} and publishes every result to the publish topic.
 *
 * <p>NOTE(review): {@code _queue} is written from the event-bus consumer while
 * {@link #doCalc()} is invoked from {@code executeBlocking}. If subclasses read {@code _queue}
 * inside {@code doCalc()}, that is unsynchronized access from two threads - confirm and
 * consider guarding it.
 *
 * @param <E> type of the buffered input elements
 * @param <R> type of the calculated results that are serialized and published
 */
public abstract class WindowedFunctionVerticle<E extends ValueContainer, R> extends AbstractVerticle {
    /** Maximum number of elements kept in the queue created by {@link #init(Vertx, Context)}. */
    private static final int QUEUE_CAPACITY = 100;

    /** Shared JSON serializer; one default-configured instance is reused for every publish. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Buffer of received elements; created in {@link #init(Vertx, Context)}. */
    EvictingQueue<WindowedFunctionVerticle<E, R>.QueueDataWrapper<E>> _queue;
    private final long _publishIntervalMills;
    private final long _windowMillis;
    private final String _listenTopic;
    private final String _publishTopic;
    private final String _metricType;
    private final Function<E, Long> _timestampFcn;
    private final Logger _logger;

    /**
     * Pairs a queued element with the timestamp extracted from it at construction time.
     *
     * @param <D> type of the wrapped element
     */
    public class QueueDataWrapper<D> {
        private final long _createEpoch;
        private final D _data;

        /**
         * @param d        the element to wrap
         * @param function extracts the timestamp stored as the create epoch; must not return
         *                 {@code null}
         */
        QueueDataWrapper(D d, Function<D, Long> function) {
            this._createEpoch = function.apply(d);
            this._data = d;
        }

        /** @return the timestamp extracted from the element when it was wrapped */
        public long getCreateEpoch() {
            return this._createEpoch;
        }

        /** @return the wrapped element */
        public D getData() {
            return this._data;
        }
    }

    /**
     * Creates the verticle.
     *
     * @param listenTopic           event-bus address to consume from
     * @param publishTopic          event-bus address results are published to
     * @param metricType            metric type label, logged at start-up and exposed via
     *                              {@link #getMetricType()}
     * @param windowMillis          window length used by
     *                              {@link #getQueueDataWrapperPredicate(long)}
     * @param publishIntervalMillis delay between two periodic calculations
     * @param timestampFcn          extracts the timestamp of a received element
     * @param logger                logger to use; when {@code null} a logger named after the
     *                              concrete class is created
     */
    public WindowedFunctionVerticle(String listenTopic, String publishTopic, String metricType,
            long windowMillis, long publishIntervalMillis, Function<E, Long> timestampFcn, Logger logger) {
        this._metricType = metricType;
        this._listenTopic = listenTopic;
        this._publishTopic = publishTopic;
        this._windowMillis = windowMillis;
        this._publishIntervalMills = publishIntervalMillis;
        this._timestampFcn = timestampFcn;
        // Fall back to a logger named after the concrete verticle rather than an unrelated class.
        this._logger = null != logger ? logger : LoggerFactory.getLogger(getClass());
    }

    /** @return the event-bus address this verticle consumes from */
    public String getListenTopic() {
        return this._listenTopic;
    }

    /** @return the event-bus address results are published to */
    public String getPublishTopic() {
        return this._publishTopic;
    }

    /** @return the logger used by this verticle */
    protected Logger getLogger() {
        return this._logger;
    }

    /** @return the window length passed to the constructor */
    public long getWindowMillis() {
        return this._windowMillis;
    }

    /** @return the interval between two periodic calculations */
    public long getPublishIntervalMills() {
        return this._publishIntervalMills;
    }

    /** @return the metric type label passed to the constructor */
    public String getMetricType() {
        return this._metricType;
    }

    /**
     * Converts the string form of a received message body into an element.
     *
     * @param str string form of the message body
     * @return the parsed element
     * @throws IOException if the text cannot be converted; the message is then dropped
     */
    protected abstract E toInstance(String str) throws IOException;

    /**
     * Initializes the verticle and creates the bounded element queue.
     *
     * @param vertx   the Vert.x instance
     * @param context the verticle context
     */
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        this._queue = EvictingQueue.create(QUEUE_CAPACITY);
    }

    /**
     * Logs the configuration, registers the event-bus consumer and schedules the periodic
     * calculation, then completes {@code future}.
     *
     * @param future completed once the consumer and the timer have been registered
     * @throws Exception declared by the overridden method
     */
    public void start(Future<Void> future) throws Exception {
        this._logger.info("Metric Type: {}", this._metricType);
        this._logger.info("Listen topic: {}", this._listenTopic);
        this._logger.info("Publish topic: {}", this._publishTopic);
        this._logger.info("Publish Interval: {}ms", this._publishIntervalMills);
        this._logger.info("Window period: {}ms", this._windowMillis);
        this.vertx.eventBus().consumer(this._listenTopic, this::handle);
        this.vertx.setPeriodic(this._publishIntervalMills, timerId -> {
            this._logger.trace("Periodic Event Fired");
            this.vertx.executeBlocking(blockingFuture -> calculateAndPublish(blockingFuture), asyncResult -> {
                // Nothing to do: success and failure are both logged by calculateAndPublish.
            });
        });
        future.complete();
    }

    /**
     * Runs {@link #doCalc()} and publishes each result as JSON to the publish topic.
     *
     * <p>The future is completed exactly once: it succeeds after all results were published and
     * fails on the first serialization error or on a runtime exception from the calculation.
     */
    private void calculateAndPublish(Future<?> blockingFuture) {
        try {
            this._logger.debug("Doing Calculation");
            List<R> results = doCalc();
            this._logger.debug("Number of results: {}", results.size());
            for (R result : results) {
                String payload = MAPPER.writeValueAsString(result);
                this._logger.debug("Publishing to {}: {}", this._publishTopic, payload);
                this.vertx.eventBus().publish(this._publishTopic, payload);
            }
            this._logger.debug("Completing event");
            blockingFuture.complete();
        } catch (IOException e) {
            // Stop at the first failure; remaining results of this round are not published.
            this._logger.warn("failed to publish windowed data message", e);
            blockingFuture.fail(e);
        } catch (RuntimeException e) {
            // Without this the failure would only reach the (empty) result handler and be lost.
            this._logger.warn("windowed calculation failed", e);
            blockingFuture.fail(e);
        }
    }

    /**
     * Calculates the results to publish for the current round.
     *
     * @return the results; each one is serialized to JSON and published separately
     */
    protected abstract List<R> doCalc();

    /**
     * Builds a predicate that accepts wrappers whose create epoch lies less than the configured
     * window before {@code referenceEpoch}.
     *
     * @param referenceEpoch reference timestamp, presumably in the same unit as the window
     *                       (milliseconds) - confirm against the timestamp function in use
     * @return predicate evaluating {@code referenceEpoch - createEpoch < windowMillis}
     */
    @NotNull
    public Predicate<WindowedFunctionVerticle<E, R>.QueueDataWrapper<E>> getQueueDataWrapperPredicate(long referenceEpoch) {
        return wrapper -> referenceEpoch - wrapper.getCreateEpoch() < this._windowMillis;
    }

    /**
     * Event-bus consumer: parses the message body and appends the element to the queue.
     * Messages with a {@code null} body or an unparsable body are logged and dropped.
     */
    private void handle(Message<Object> message) {
        Object body = message.body();
        if (body == null) {
            getLogger().warn("Ignoring message with null body on {}", this._listenTopic);
            return;
        }
        String text = body.toString();
        getLogger().debug("Handling event: {}", text);
        try {
            E instance = toInstance(text);
            getLogger().trace("handling new data: {}", instance);
            this._queue.add(new QueueDataWrapper<>(instance, this._timestampFcn));
            getLogger().trace("Queue depth: {}", this._queue.size());
        } catch (IOException e) {
            // Logged at warn so that dropped messages are visible with default log levels.
            getLogger().warn("Caught IOException parsing message. ", e);
        }
    }
}
