package com.heimuheimu.naiveasync.kafka.consumer;

import com.heimuheimu.naiveasync.constant.BeanStatusEnum;
import com.heimuheimu.naiveasync.consumer.AsyncMessageConsumer;
import com.heimuheimu.naiveasync.kafka.util.KafkaUtil;
import com.heimuheimu.naiveasync.monitor.consumer.AsyncMessageConsumerMonitor;
import com.heimuheimu.naiveasync.monitor.consumer.AsyncMessageConsumerMonitorFactory;
import com.heimuheimu.naiveasync.transcoder.MessageTranscoder;
import com.heimuheimu.naiveasync.transcoder.SimpleMessageTranscoder;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/heimuheimu/naiveasync/kafka/consumer/KafkaConsumerManager.class */
public class KafkaConsumerManager implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerManager.class);
    private static final Logger CONSUMER_INFO_LOGGER = LoggerFactory.getLogger("NAIVE_ASYNC_CONSUMER_INFO_LOG");
    private final Map<String, AsyncMessageConsumer<?>> consumerMap;
    private final KafkaConsumerConfig config;
    private final KafkaConsumerListener listener;
    private final MessageTranscoder transcoder;
    private final AsyncMessageConsumerMonitor monitor;
    private final List<KafkaConsumeThread> consumeThreadList = new ArrayList();
    private BeanStatusEnum state = BeanStatusEnum.UNINITIALIZED;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/heimuheimu/naiveasync/kafka/consumer/KafkaConsumerManager$KafkaConsumeThread.class */
    public class KafkaConsumeThread extends Thread {
        private final LinkedList<Map<TopicPartition, OffsetAndMetadata>> failedCommitOffsetQueue;
        private final Map<TopicPartition, Long> continuesConsumeFailedCountMap;
        private final String topic;
        private final AsyncMessageConsumer asyncMessageConsumer;
        private volatile boolean stopFlag;
        private KafkaConsumer<byte[], byte[]> consumer;
        private int sleepSeconds;
        private boolean hasError;

        private KafkaConsumeThread(AsyncMessageConsumer asyncMessageConsumer) {
            this.failedCommitOffsetQueue = new LinkedList<>();
            this.continuesConsumeFailedCountMap = new HashMap();
            this.stopFlag = false;
            this.sleepSeconds = 2;
            this.hasError = false;
            this.topic = KafkaUtil.getTopicName(asyncMessageConsumer.getMessageClass());
            this.asyncMessageConsumer = asyncMessageConsumer;
            createConsumer();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    KafkaConsumerManager.CONSUMER_INFO_LOGGER.info("Kafka consumer thread has been started. Thread: `{}`. Topic: `{}`. Consumer: `{}`. Assigned partitions: `{}`. KafkaConsumerConfig: `{}`.", new Object[]{getName(), this.topic, this.asyncMessageConsumer, this.consumer.assignment(), KafkaConsumerManager.this.config});
                    while (!this.stopFlag) {
                        if (this.consumer == null) {
                            try {
                                createConsumer();
                            } catch (Exception e) {
                                KafkaConsumerManager.LOGGER.error("Create kafka consumer failed: `" + e.getMessage() + "`. Thread: `" + getName() + "`. Topic: `" + this.topic + "`.", e);
                                KafkaConsumerManager.this.monitor.onErrorExecution();
                                onError("create consumer failed", true);
                            }
                        }
                        try {
                            r8 = this.stopFlag ? null : this.consumer.poll(Long.MAX_VALUE);
                        } catch (Exception e2) {
                            KafkaConsumerManager.LOGGER.error("Poll message failed: `" + e2.getMessage() + "`. Assigned partitions: `" + this.consumer.assignment() + "`. Thread: `" + getName() + "`. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`.", e2);
                            KafkaConsumerManager.this.monitor.onErrorExecution();
                            onError("poll failed", true);
                        } catch (InterruptException e3) {
                        }
                        if (r8 != null) {
                            for (TopicPartition topicPartition : r8.partitions()) {
                                ArrayList arrayList = new ArrayList();
                                ArrayList arrayList2 = new ArrayList();
                                for (ConsumerRecord consumerRecord : r8.records(topicPartition)) {
                                    KafkaConsumerManager.this.monitor.onPolled(this.topic, System.currentTimeMillis() - consumerRecord.timestamp());
                                    Object obj = null;
                                    try {
                                        obj = KafkaConsumerManager.this.transcoder.decode((byte[]) consumerRecord.value());
                                        arrayList.add(obj);
                                        arrayList2.add(Long.valueOf(consumerRecord.offset()));
                                    } catch (Exception e4) {
                                        KafkaConsumerManager.LOGGER.error("Decode message failed: `" + e4.getMessage() + "`. Thread: `" + getName() + "`. TopicPartition: `" + topicPartition + "`. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`. Message: `" + obj + "`.", e4);
                                        KafkaConsumerManager.this.monitor.onErrorExecution();
                                        onError("decode message failed", false);
                                    }
                                }
                                if (!arrayList.isEmpty()) {
                                    if (this.asyncMessageConsumer.isBatchMode()) {
                                        try {
                                            this.asyncMessageConsumer.consume((List) arrayList);
                                            try {
                                                commitSync(topicPartition, ((Long) arrayList2.get(arrayList2.size() - 1)).longValue());
                                                KafkaConsumerManager.this.monitor.onSuccessConsumed(this.topic, arrayList.size());
                                            } catch (Exception e5) {
                                                KafkaConsumerManager.LOGGER.error("Commit sync failed: `" + e5.getMessage() + "`. Thread: `" + getName() + "`. TopicPartition: `" + topicPartition + "`. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`.", e5);
                                                onError("commit sync failed", true);
                                            }
                                        } catch (Exception e6) {
                                            KafkaConsumerManager.LOGGER.error("Consume message failed: `" + e6.getMessage() + "`. Thread: `" + getName() + "`. TopicPartition: `" + topicPartition + "`. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`. MessageList: `" + arrayList + "`.", e6);
                                            KafkaConsumerManager.this.monitor.onErrorExecution();
                                            onError("consume message failed", false);
                                        }
                                    } else {
                                        for (int i = 0; i < arrayList.size(); i++) {
                                            Object obj2 = arrayList.get(i);
                                            long longValue = ((Long) arrayList2.get(i)).longValue();
                                            try {
                                                this.asyncMessageConsumer.consume((AsyncMessageConsumer) obj2);
                                                try {
                                                    commitSync(topicPartition, longValue);
                                                    KafkaConsumerManager.this.monitor.onSuccessConsumed(this.topic, 1);
                                                } catch (Exception e7) {
                                                    KafkaConsumerManager.LOGGER.error("Commit sync failed: `" + e7.getMessage() + "`. Thread: `" + getName() + "`. TopicPartition: `" + topicPartition + "`. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`.", e7);
                                                    onError("commit sync failed", true);
                                                }
                                            } catch (Exception e8) {
                                                KafkaConsumerManager.LOGGER.error("Consume message failed: `" + e8.getMessage() + "`. Thread: `" + getName() + "`. TopicPartition: `" + topicPartition + "`. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`. Message: `" + obj2 + "`.", e8);
                                                KafkaConsumerManager.this.monitor.onErrorExecution();
                                                onError("consume message failed", false);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        onSuccess();
                    }
                    close();
                    try {
                        this.consumer.close();
                    } catch (Exception e9) {
                        KafkaConsumerManager.LOGGER.error("KafkaConsumer closed failed. Thread: `" + getName() + "`. Topic: `" + this.topic + "`.", e9);
                    } catch (InterruptException e10) {
                    }
                    KafkaConsumerManager.CONSUMER_INFO_LOGGER.info("Kafka consumer thread has been stopped. Thread: `{}`. Topic: `{}`. KafkaConsumerConfig: `{}`.", new Object[]{getName(), this.topic, KafkaConsumerManager.this.config});
                } catch (Exception e11) {
                    KafkaConsumerManager.LOGGER.error("KafkaConsumeThread need to be closed due to unexpected error. Thread: `" + getName() + "`. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`.", e11);
                    close();
                    try {
                        this.consumer.close();
                    } catch (Exception e12) {
                        KafkaConsumerManager.LOGGER.error("KafkaConsumer closed failed. Thread: `" + getName() + "`. Topic: `" + this.topic + "`.", e12);
                    } catch (InterruptException e13) {
                    }
                    KafkaConsumerManager.CONSUMER_INFO_LOGGER.info("Kafka consumer thread has been stopped. Thread: `{}`. Topic: `{}`. KafkaConsumerConfig: `{}`.", new Object[]{getName(), this.topic, KafkaConsumerManager.this.config});
                }
            } catch (Throwable th) {
                close();
                try {
                    this.consumer.close();
                } catch (InterruptException e14) {
                } catch (Exception e15) {
                    KafkaConsumerManager.LOGGER.error("KafkaConsumer closed failed. Thread: `" + getName() + "`. Topic: `" + this.topic + "`.", e15);
                }
                KafkaConsumerManager.CONSUMER_INFO_LOGGER.info("Kafka consumer thread has been stopped. Thread: `{}`. Topic: `{}`. KafkaConsumerConfig: `{}`.", new Object[]{getName(), this.topic, KafkaConsumerManager.this.config});
                throw th;
            }
        }

        private void createConsumer() {
            this.consumer = new KafkaConsumer<>(KafkaConsumerManager.this.config.toConfigMap());
            this.consumer.subscribe(Collections.singletonList(this.topic), new ConsumerRebalanceListener() { // from class: com.heimuheimu.naiveasync.kafka.consumer.KafkaConsumerManager.KafkaConsumeThread.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    KafkaConsumerManager.CONSUMER_INFO_LOGGER.info("[Rebalance] Revoked partitions: `{}`. Thread: `{}`. Topic: `{}`. KafkaConsumerConfig: `{}`.", new Object[]{collection, KafkaConsumeThread.this.getName(), KafkaConsumeThread.this.topic, KafkaConsumerManager.this.config});
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    KafkaConsumerManager.CONSUMER_INFO_LOGGER.info("[Rebalance] Assigned partitions: `{}`. Thread: `{}`. Topic: `{}`. KafkaConsumerConfig: `{}`.", new Object[]{collection, KafkaConsumeThread.this.getName(), KafkaConsumeThread.this.topic, KafkaConsumerManager.this.config});
                }
            });
        }

        private void onError(String str, boolean z) {
            if (z && this.consumer != null) {
                try {
                    this.consumer.close();
                } catch (Exception e) {
                    KafkaConsumerManager.LOGGER.error("KafkaConsumer closed failed. Thread: `" + getName() + "`. Topic: `" + this.topic + "`.", e);
                }
                this.consumer = null;
            }
            this.hasError = true;
            try {
                Thread.sleep(this.sleepSeconds * 1000);
            } catch (InterruptedException e2) {
            }
            this.sleepSeconds *= 2;
            KafkaConsumerManager.this.listener.onError(str, KafkaConsumerManager.this.config.getGroupId(), KafkaConsumerManager.this.config.getBootstrapServers());
        }

        private void onSuccess() {
            this.sleepSeconds = 2;
            if (this.hasError) {
                KafkaConsumerManager.this.listener.onRecover(KafkaConsumerManager.this.config.getGroupId(), KafkaConsumerManager.this.config.getBootstrapServers());
                this.hasError = false;
            }
        }

        private void commitSync(TopicPartition topicPartition, long j) {
            this.consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(j + 1)));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void close() {
            if (this.stopFlag) {
                return;
            }
            try {
                this.stopFlag = true;
                interrupt();
            } catch (Exception e) {
                KafkaConsumerManager.LOGGER.error("Consumer closed failed. Thread: `" + getName() + "`. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`.", e);
            }
        }
    }

    /* loaded from: input_file:com/heimuheimu/naiveasync/kafka/consumer/KafkaConsumerManager$KafkaConsumerListenerWrapper.class */
    private class KafkaConsumerListenerWrapper implements KafkaConsumerListener {
        private final KafkaConsumerListener listener;

        public KafkaConsumerListenerWrapper(KafkaConsumerListener kafkaConsumerListener) {
            this.listener = kafkaConsumerListener;
        }

        @Override // com.heimuheimu.naiveasync.kafka.consumer.KafkaConsumerListener
        public void onError(String str, String str2, String str3) {
            if (this.listener != null) {
                try {
                    this.listener.onError(str, str2, str3);
                } catch (Exception e) {
                    KafkaConsumerManager.LOGGER.error("Call KafkaConsumerListener#onError() failed. ErrorMessage: `" + str + "`. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`.", e);
                }
            }
        }

        @Override // com.heimuheimu.naiveasync.kafka.consumer.KafkaConsumerListener
        public void onRecover(String str, String str2) {
            if (this.listener != null) {
                try {
                    this.listener.onRecover(str, str2);
                } catch (Exception e) {
                    KafkaConsumerManager.LOGGER.error("Call KafkaConsumerListener#onRecover() failed. KafkaConsumerConfig: `" + KafkaConsumerManager.this.config + "`.", e);
                }
            }
        }
    }

    public KafkaConsumerManager(List<AsyncMessageConsumer<?>> list, KafkaConsumerConfig kafkaConsumerConfig, KafkaConsumerListener kafkaConsumerListener) throws IllegalArgumentException {
        if (list == null || list.isEmpty()) {
            LOGGER.error("Create KafkaConsumerManager failed: `consumers could not be null or empty`. Consumers: `" + list + "`. KafkaConsumerConfig: `" + kafkaConsumerConfig + "`. KafkaConsumerListener: `" + kafkaConsumerListener + "`.");
            throw new IllegalArgumentException("Create KafkaConsumerManager failed: `consumers could not be null or empty`. Consumers: `" + list + "`. KafkaConsumerConfig: `" + kafkaConsumerConfig + "`. KafkaConsumerListener: `" + kafkaConsumerListener + "`.");
        }
        if (kafkaConsumerConfig == null) {
            LOGGER.error("Create KafkaConsumerManager failed: `kafkaConsumerConfig could not be null or empty`. Consumers: `" + list + "`. KafkaConsumerConfig: `null`. KafkaConsumerListener: `" + kafkaConsumerListener + "`.");
            throw new IllegalArgumentException("Create KafkaConsumerManager failed: `kafkaConsumerConfig could not be null or empty`. Consumers: `" + list + "`. KafkaConsumerConfig: `null`. KafkaConsumerListener: `" + kafkaConsumerListener + "`.");
        }
        this.consumerMap = new HashMap();
        for (AsyncMessageConsumer<?> asyncMessageConsumer : list) {
            String topicName = KafkaUtil.getTopicName(asyncMessageConsumer.getMessageClass());
            AsyncMessageConsumer<?> asyncMessageConsumer2 = this.consumerMap.get(topicName);
            if (asyncMessageConsumer2 != null && asyncMessageConsumer2 != asyncMessageConsumer) {
                LOGGER.error("Consumer `{}` is existed. It will be overridden. Previous consumer: `{}`. New Consumer: `{}`. KafkaConsumerConfig: `{}`.", new Object[]{topicName, asyncMessageConsumer2, asyncMessageConsumer, kafkaConsumerConfig});
            }
            this.consumerMap.put(topicName, asyncMessageConsumer);
        }
        this.config = kafkaConsumerConfig;
        this.listener = new KafkaConsumerListenerWrapper(kafkaConsumerListener);
        this.transcoder = new SimpleMessageTranscoder();
        this.monitor = AsyncMessageConsumerMonitorFactory.get();
    }

    public synchronized void init() throws IllegalStateException {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.state == BeanStatusEnum.UNINITIALIZED) {
            this.state = BeanStatusEnum.NORMAL;
            try {
                for (String str : this.consumerMap.keySet()) {
                    AsyncMessageConsumer<?> asyncMessageConsumer = this.consumerMap.get(str);
                    int poolSize = asyncMessageConsumer instanceof KafkaAsyncMessageConsumer ? ((KafkaAsyncMessageConsumer) asyncMessageConsumer).getPoolSize() : 1;
                    for (int i = 0; i < poolSize; i++) {
                        KafkaConsumeThread kafkaConsumeThread = new KafkaConsumeThread(asyncMessageConsumer);
                        kafkaConsumeThread.setName("naiveasync-kafka-consumer-" + i + "[" + str + "]");
                        kafkaConsumeThread.start();
                        this.consumeThreadList.add(kafkaConsumeThread);
                    }
                }
                CONSUMER_INFO_LOGGER.info("KafkaConsumerManager has been initialized. Cost: `{} ms`. KafkaConsumerConfig: `{}`. PoolSize: `{}`. Topics: `{}`. Listener: `{}`.", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis), this.config, Integer.valueOf(this.consumeThreadList.size()), this.consumerMap.keySet(), this.listener});
            } catch (Exception e) {
                LOGGER.error("KafkaConsumerManager initialize failed. KafkaConsumerConfig: `" + this.config + "`. PoolSize: `" + this.consumeThreadList.size() + "`. Topics: `" + this.consumerMap.keySet() + "`. Listener: `" + this.listener + "`.", e);
                close();
                throw new IllegalStateException("KafkaConsumerManager initialize failed. KafkaConsumerConfig: `" + this.config + "`. PoolSize: `" + this.consumeThreadList.size() + "`. Topics: `" + this.consumerMap.keySet() + "`. Listener: `" + this.listener + "`.", e);
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.state != BeanStatusEnum.CLOSED) {
            this.state = BeanStatusEnum.CLOSED;
            for (KafkaConsumeThread kafkaConsumeThread : this.consumeThreadList) {
                try {
                    kafkaConsumeThread.close();
                } catch (Exception e) {
                    LOGGER.error("KafkaConsumeThread close failed. ThreadName: `" + kafkaConsumeThread.getName() + "`. KafkaConsumerConfig: `" + this.config + "`.", e);
                }
            }
            CONSUMER_INFO_LOGGER.info("KafkaConsumerManager has been stopped. Cost: `{} ms`. KafkaConsumerConfig: `{}`. PoolSize: `{}`. Topics: `{}`. Listener: `{}`.", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis), this.config, Integer.valueOf(this.consumeThreadList.size()), this.consumerMap.keySet(), this.listener});
        }
    }

    public String toString() {
        return "KafkaConsumerManager{consumerMap=" + this.consumerMap + ", config=" + this.config + ", listener=" + this.listener + ", state=" + this.state + '}';
    }
}
