package com.hazelcast.topic.impl.reliable;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.Member;
import com.hazelcast.core.Message;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.logging.ILogger;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.topic.ReliableMessageListener;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

/* loaded from: input_file:WEB-INF/lib/hazelcast-3.12.6.jar:com/hazelcast/topic/impl/reliable/MessageRunner.class */
public abstract class MessageRunner<E> implements ExecutionCallback<ReadResultSet<ReliableTopicMessage>> {
    protected final Ringbuffer<ReliableTopicMessage> ringbuffer;
    protected final ILogger logger;
    protected final ReliableMessageListener<E> listener;
    protected final String topicName;
    protected long sequence;
    private final SerializationService serializationService;
    private final ConcurrentMap<String, MessageRunner<E>> runnersMap;
    private final String id;
    private final Executor executor;
    private final int batchSze;
    private volatile boolean cancelled;

    public MessageRunner(String str, ReliableMessageListener<E> reliableMessageListener, Ringbuffer<ReliableTopicMessage> ringbuffer, String str2, int i, SerializationService serializationService, Executor executor, ConcurrentMap<String, MessageRunner<E>> concurrentMap, ILogger iLogger) {
        this.id = str;
        this.listener = reliableMessageListener;
        this.ringbuffer = ringbuffer;
        this.topicName = str2;
        this.serializationService = serializationService;
        this.logger = iLogger;
        this.batchSze = i;
        this.executor = executor;
        this.runnersMap = concurrentMap;
        long retrieveInitialSequence = reliableMessageListener.retrieveInitialSequence();
        this.sequence = retrieveInitialSequence == -1 ? ringbuffer.tailSequence() + 1 : retrieveInitialSequence;
    }

    public void next() {
        if (this.cancelled) {
            return;
        }
        this.ringbuffer.readManyAsync(this.sequence, 1, this.batchSze, null).andThen(this, this.executor);
    }

    @Override // com.hazelcast.core.ExecutionCallback
    public void onResponse(ReadResultSet<ReliableTopicMessage> readResultSet) {
        for (ReliableTopicMessage reliableTopicMessage : readResultSet) {
            if (this.cancelled) {
                return;
            }
            try {
                this.listener.storeSequence(this.sequence);
                process(reliableTopicMessage);
            } catch (Throwable th) {
                if (terminate(th)) {
                    cancel();
                    return;
                }
            }
            this.sequence++;
        }
        next();
    }

    private void process(ReliableTopicMessage reliableTopicMessage) {
        updateStatistics();
        this.listener.onMessage(toMessage(reliableTopicMessage));
    }

    protected abstract void updateStatistics();

    private Message<E> toMessage(ReliableTopicMessage reliableTopicMessage) {
        Member member = getMember(reliableTopicMessage);
        return new Message<>(this.topicName, this.serializationService.toObject(reliableTopicMessage.getPayload()), reliableTopicMessage.getPublishTime(), member);
    }

    protected abstract Member getMember(ReliableTopicMessage reliableTopicMessage);

    @Override // com.hazelcast.core.ExecutionCallback
    public void onFailure(Throwable th) {
        if (this.cancelled) {
            return;
        }
        if (handleInternalException(adjustThrowable(th))) {
            next();
        } else {
            cancel();
        }
    }

    protected boolean handleInternalException(Throwable th) {
        if (th instanceof OperationTimeoutException) {
            return handleOperationTimeoutException();
        }
        if (th instanceof IllegalArgumentException) {
            return handleIllegalArgumentException((IllegalArgumentException) th);
        }
        if (th instanceof StaleSequenceException) {
            return handleStaleSequenceException((StaleSequenceException) th);
        }
        if (th instanceof HazelcastInstanceNotActiveException) {
            if (!this.logger.isFinestEnabled()) {
                return false;
            }
            this.logger.finest("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ".  Reason: HazelcastInstance is shutting down");
            return false;
        }
        if (!(th instanceof DistributedObjectDestroyedException)) {
            this.logger.warning("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". Reason: Unhandled exception, message: " + th.getMessage(), th);
            return false;
        }
        if (!this.logger.isFinestEnabled()) {
            return false;
        }
        this.logger.finest("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". Reason: Topic is destroyed");
        return false;
    }

    private boolean handleOperationTimeoutException() {
        if (!this.logger.isFinestEnabled()) {
            return true;
        }
        this.logger.finest("MessageListener " + this.listener + " on topic: " + this.topicName + " timed out. Continuing from last known sequence: " + this.sequence);
        return true;
    }

    protected abstract Throwable adjustThrowable(Throwable th);

    private boolean handleStaleSequenceException(StaleSequenceException staleSequenceException) {
        long headSequence = getHeadSequence(staleSequenceException);
        if (!this.listener.isLossTolerant()) {
            this.logger.warning("Terminating MessageListener:" + this.listener + " on topic: " + this.topicName + ". Reason: The listener was too slow or the retention period of the message has been violated. head: " + headSequence + " sequence:" + this.sequence);
            return false;
        }
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("MessageListener " + this.listener + " on topic: " + this.topicName + " ran into a stale sequence. Jumping from oldSequence: " + this.sequence + " to sequence: " + headSequence);
        }
        this.sequence = headSequence;
        return true;
    }

    protected abstract long getHeadSequence(StaleSequenceException staleSequenceException);

    private boolean handleIllegalArgumentException(IllegalArgumentException illegalArgumentException) {
        long headSequence = this.ringbuffer.headSequence();
        if (!this.listener.isLossTolerant()) {
            this.logger.warning("Terminating MessageListener:" + this.listener + " on topic: " + this.topicName + ". Reason: Underlying ring buffer data related to reliable topic is lost. ");
            return false;
        }
        if (this.logger.isFinestEnabled()) {
            this.logger.finest(String.format("MessageListener %s on topic %s requested a too large sequence: %s. . Jumping from old sequence: %s to sequence: %s", this.listener, this.topicName, illegalArgumentException.getMessage(), Long.valueOf(this.sequence), Long.valueOf(headSequence)));
        }
        this.sequence = headSequence;
        return true;
    }

    public void cancel() {
        this.cancelled = true;
        this.runnersMap.remove(this.id);
    }

    private boolean terminate(Throwable th) {
        if (this.cancelled) {
            return true;
        }
        try {
            boolean isTerminal = this.listener.isTerminal(th);
            if (isTerminal) {
                this.logger.warning("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". Reason: Unhandled exception, message: " + th.getMessage(), th);
            } else if (this.logger.isFinestEnabled()) {
                this.logger.finest("MessageListener " + this.listener + " on topic: " + this.topicName + " ran into an exception: message:" + th.getMessage(), th);
            }
            return isTerminal;
        } catch (Throwable th2) {
            this.logger.warning("Terminating messageListener:" + this.listener + " on topic: " + this.topicName + ". Reason: Unhandled exception while calling ReliableMessageListener.isTerminal() method", th2);
            return true;
        }
    }

    public boolean isCancelled() {
        return this.cancelled;
    }
}
