package com.heimuheimu.naiveasync.kafka.producer;

import com.heimuheimu.naiveasync.kafka.util.KafkaUtil;
import com.heimuheimu.naiveasync.monitor.producer.AsyncMessageProducerMonitor;
import com.heimuheimu.naiveasync.monitor.producer.AsyncMessageProducerMonitorFactory;
import com.heimuheimu.naiveasync.producer.AsyncMessageProducer;
import com.heimuheimu.naiveasync.transcoder.MessageTranscoder;
import com.heimuheimu.naiveasync.transcoder.SimpleMessageTranscoder;
import java.io.Closeable;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/heimuheimu/naiveasync/kafka/producer/KafkaProducer.class */
public class KafkaProducer implements AsyncMessageProducer, Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
    private final MessageTranscoder transcoder;
    private final KafkaProducerConfig producerConfig;
    private final Producer<byte[], byte[]> producer;
    private final KafkaProducerListener kafkaProducerListener;
    private final AsyncMessageProducerMonitor monitor;

    /* loaded from: input_file:com/heimuheimu/naiveasync/kafka/producer/KafkaProducer$MessageSendCallback.class */
    private class MessageSendCallback implements Callback {
        private final String topicName;

        public MessageSendCallback(String str) {
            this.topicName = str;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc == null) {
                KafkaProducer.this.monitor.onSuccessSent(this.topicName);
                return;
            }
            KafkaProducer.LOGGER.error("Send async message failed: `" + exc.getMessage() + "`. Topic: `" + this.topicName + "`. KafkaProducer: `" + this + "`.", exc);
            KafkaProducer.this.monitor.onErrorSent(this.topicName);
            if (KafkaProducer.this.kafkaProducerListener != null) {
                try {
                    KafkaProducer.this.kafkaProducerListener.onErrorSent(this.topicName, KafkaProducer.this.producerConfig.getBootstrapServers());
                } catch (Exception e) {
                    KafkaProducer.LOGGER.error("Call KafkaProducerListener#onErrorSent() failed. Topic: `" + this.topicName + "`. KafkaProducer: `" + this + "`.", e);
                }
            }
        }
    }

    public KafkaProducer(KafkaProducerConfig kafkaProducerConfig, KafkaProducerListener kafkaProducerListener) throws NullPointerException {
        if (kafkaProducerConfig == null) {
            LOGGER.error("Create KafkaProducer failed: `kafkaProducerConfig could not be null`.");
            throw new NullPointerException("Create KafkaProducer failed: `kafkaProducerConfig could not be null`.");
        }
        this.transcoder = new SimpleMessageTranscoder();
        this.producerConfig = kafkaProducerConfig;
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer(this.producerConfig.toConfigMap());
        this.kafkaProducerListener = kafkaProducerListener;
        this.monitor = AsyncMessageProducerMonitorFactory.get();
        LOGGER.info("KafkaProducer has been initialized. KafkaProducer: `{}`.", this);
    }

    @Override // com.heimuheimu.naiveasync.producer.AsyncMessageProducer
    public <T> void send(T t) {
        if (t != null) {
            String topicName = KafkaUtil.getTopicName(t.getClass());
            try {
                this.producer.send(new ProducerRecord(topicName, this.transcoder.encode(t)), new MessageSendCallback(topicName));
            } catch (Exception e) {
                LOGGER.error("Send async message failed: `" + e.getMessage() + "`. Message: `" + t + "`. KafkaProducer: `" + this + "`.", e);
                this.monitor.onErrorSent(topicName);
                if (this.kafkaProducerListener != null) {
                    try {
                        this.kafkaProducerListener.onErrorSent(topicName, this.producerConfig.getBootstrapServers());
                    } catch (Exception e2) {
                        LOGGER.error("Call KafkaProducerListener#onErrorSent() failed. Topic: `" + topicName + "`. KafkaProducer: `" + this + "`.", e2);
                    }
                }
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.producer.flush();
            this.producer.close();
        } catch (Exception e) {
            LOGGER.error("Close KafkaProducer failed: `" + e.getMessage() + "`. KafkaProducer: `" + this + "`.", e);
        }
    }

    public String toString() {
        return "KafkaProducer{producerConfig=" + this.producerConfig + ", kafkaProducerListener=" + this.kafkaProducerListener + '}';
    }
}
