package reactor.io.queue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.Excerpt;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptCommon;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.tools.ChronicleTools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.codec.JavaSerializationCodec;

/* loaded from: input_file:lib/reactor-core-2.0.6.RELEASE.jar:reactor/io/queue/ChronicleQueuePersistor.class */
public class ChronicleQueuePersistor<T> implements QueuePersistor<T> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ChronicleQueuePersistor.class);
    private final Object monitor;
    private final AtomicLong lastId;
    private final AtomicLong size;
    private final ExcerptTailer exTrailer;
    private final ExcerptTailer indexTrailer;
    private final ExcerptAppender exAppender;
    private final String basePath;
    private final Codec<Buffer, T, T> codec;
    private final boolean deleteOnExit;
    private final Chronicle data;

    public ChronicleQueuePersistor(@Nonnull String str) throws IOException {
        this(str, new JavaSerializationCodec(), false, false, ChronicleQueueBuilder.vanilla(str));
    }

    public ChronicleQueuePersistor(@Nonnull String str, @Nonnull Codec<Buffer, T, T> codec, boolean z, boolean z2, @Nonnull ChronicleQueueBuilder chronicleQueueBuilder) throws IOException {
        this.monitor = new Object();
        this.lastId = new AtomicLong();
        this.size = new AtomicLong(0L);
        this.basePath = str;
        this.codec = codec;
        this.deleteOnExit = z2;
        this.data = chronicleQueueBuilder.build();
        this.lastId.set(this.data.lastWrittenIndex());
        if (z) {
            this.data.clear();
        }
        ChronicleTools.warmup();
        Excerpt createExcerpt = this.data.createExcerpt();
        createExcerpt.skip(4L);
        while (createExcerpt.nextIndex()) {
            int readInt = createExcerpt.readInt();
            this.size.incrementAndGet();
            createExcerpt.skip(readInt);
        }
        this.indexTrailer = this.data.createTailer();
        this.exTrailer = this.data.createTailer();
        this.exAppender = this.data.createAppender();
    }

    @Override // reactor.io.queue.QueuePersistor
    public void close() {
        try {
            this.data.close();
            if (this.deleteOnExit) {
                this.data.clear();
            }
        } catch (IOException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    @Override // reactor.io.queue.QueuePersistor
    public long lastId() {
        return this.lastId.get();
    }

    @Override // reactor.io.queue.QueuePersistor
    public long size() {
        return this.size.get();
    }

    @Override // reactor.io.queue.QueuePersistor
    public boolean hasNext() {
        return this.indexTrailer.nextIndex();
    }

    @Override // reactor.io.queue.QueuePersistor
    public Long offer(@Nonnull T t) {
        synchronized (this.monitor) {
            Buffer apply = this.codec.apply(t);
            int remaining = apply.remaining();
            this.exAppender.startExcerpt(4 + remaining);
            this.exAppender.writeInt(remaining);
            this.exAppender.write(apply.byteBuffer());
            this.exAppender.finish();
            this.size.incrementAndGet();
            this.lastId.set(this.exAppender.lastWrittenIndex());
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Offered {} to Chronicle at index {}, size {}", t, Long.valueOf(lastId()), Long.valueOf(size()));
        }
        return Long.valueOf(lastId());
    }

    @Override // reactor.io.queue.QueuePersistor
    public Long offerAll(@Nonnull Collection<T> collection) {
        if (collection.isEmpty()) {
            return Long.valueOf(lastId());
        }
        Codec<Buffer, T, T> codec = this.codec;
        synchronized (this.monitor) {
            long j = 0;
            Iterator<T> it = collection.iterator();
            while (it.hasNext()) {
                Buffer apply = codec.apply(it.next());
                int remaining = apply.remaining();
                long j2 = j;
                j = j2 + 1;
                if (j2 == 0) {
                    this.exAppender.startExcerpt(4 + (16 * collection.size() * remaining));
                }
                this.exAppender.writeInt(remaining);
                this.exAppender.write(apply.byteBuffer());
                this.size.incrementAndGet();
            }
            this.lastId.set(this.exAppender.lastWrittenIndex());
            this.exAppender.finish();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Offered {} to Chronicle at index {}, size {}", collection, Long.valueOf(lastId()), Long.valueOf(size()));
        }
        return Long.valueOf(lastId());
    }

    @Override // reactor.io.queue.QueuePersistor
    public T get(Long l) {
        if (this.exTrailer.index(l.longValue())) {
            return read(this.exTrailer);
        }
        return null;
    }

    public Codec<Buffer, T, T> codec() {
        return this.codec;
    }

    public ExcerptTailer reader() {
        return this.indexTrailer;
    }

    @Override // reactor.io.queue.QueuePersistor
    public T remove() {
        T read;
        synchronized (this.monitor) {
            read = read(this.indexTrailer);
            this.size.decrementAndGet();
        }
        return read;
    }

    @Override // java.lang.Iterable
    public Iterator<T> iterator() {
        return new Iterator<T>() { // from class: reactor.io.queue.ChronicleQueuePersistor.1
            @Override // java.util.Iterator
            public boolean hasNext() {
                return ChronicleQueuePersistor.this.hasNext();
            }

            @Override // java.util.Iterator
            public T next() {
                return (T) ChronicleQueuePersistor.this.read(ChronicleQueuePersistor.this.indexTrailer);
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new IllegalStateException("This Iterator is read-only.");
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public T read(ExcerptCommon excerptCommon) {
        try {
            ByteBuffer allocate = ByteBuffer.allocate(excerptCommon.readInt());
            excerptCommon.read(allocate);
            allocate.flip();
            T apply = this.codec.decoder(null).apply(new Buffer(allocate));
            excerptCommon.finish();
            return apply;
        } catch (Throwable th) {
            excerptCommon.finish();
            throw th;
        }
    }
}
