NaiveAsync: 对 Kafka 生产者和消费者进行封装,提供实时报警和数据监控功能。

使用要求

Maven 配置

    <dependency>
        <groupId>com.heimuheimu</groupId>
        <artifactId>naiveasync</artifactId>
        <version>1.1</version>
    </dependency>

Log4J 配置

log4j.logger.com.heimuheimu.naiveasync=ERROR, NAIVEASYNC
log4j.additivity.com.heimuheimu.naiveasync=false
log4j.appender.NAIVEASYNC=org.apache.log4j.DailyRollingFileAppender
log4j.appender.NAIVEASYNC.file=${log.output.directory}/naiveasync/naiveasync.log
log4j.appender.NAIVEASYNC.encoding=UTF-8
log4j.appender.NAIVEASYNC.DatePattern=_yyyy-MM-dd
log4j.appender.NAIVEASYNC.layout=org.apache.log4j.PatternLayout
log4j.appender.NAIVEASYNC.layout.ConversionPattern=%d{ISO8601} %-5p [%F:%L] : %m%n

Kafka 消费者

Spring 配置

    <!-- Kafka 消费者配置信息,更多配置项可查看 KafkaConsumerConfig 类 -->
    <bean id="kafkaConsumerConfig" class="com.heimuheimu.naiveasync.kafka.consumer.KafkaConsumerConfig">
        <property name="bootstrapServers" value="127.0.0.1:9092,127.0.0.1:9093" /> <!-- Kafka 服务地址-->
        <property name="groupId" value="consumer-group-id" /> <!-- Kafka 消费组 ID-->
    </bean>
    
    <!-- Kafka 消费者监听器,用于消费错误实时报警 -->
    <bean id="kafkaConsumerListener" class="com.heimuheimu.naiveasync.kafka.consumer.NoticeableKafkaConsumerListener">
        <constructor-arg index="0" value="your-project-name" /> <!-- 当前项目名称 -->
        <constructor-arg index="1" ref="notifierList" /> <!-- 报警器列表,关于报警器的信息可查看 naivemonitor 项目 -->
    </bean>
    
    <!-- Kafka 消费者管理器 -->
    <bean id="kafkaAsyncMessageConsumerManager" class="com.heimuheimu.naiveasync.kafka.consumer.KafkaConsumerManager"
    		  init-method="init" destroy-method="close">
        <constructor-arg index="0"> <!-- Kafka 消费者列表,消费者为 com.heimuheimu.naiveasync.consumer.AsyncMessageConsumer<T> 的实现类 -->
            <util:list>
                <bean class="com.heimuheimu.naiveasync.demo.consumer.DemoMessageConsumer" />
            </util:list>
        </constructor-arg>
        <constructor-arg index="1" ref="kafkaConsumerConfig" />
        <constructor-arg index="2" ref="kafkaConsumerListener" />
    </bean>

Falcon 监控数据上报 Spring 配置

    <!-- 监控数据采集器列表 -->
    <util:list id="falconDataCollectorList">
        <!-- 消费者监控数据采集器 -->
        <bean class="com.heimuheimu.naiveasync.monitor.consumer.falcon.AsyncMessageConsumerDataCollector" />
        
        <!-- 如果对具体的消息类型进行额外上报,可进行以下配置
        <bean class="com.heimuheimu.naiveasync.monitor.consumer.falcon.AsyncMessageConsumerDataCollector">
            <constructor-arg index="0">
                <map>
                    <entry key="com.heimuheimu.naiveasync.demo.DemoMessage" value="demo"/>
                    <entry key="com.heimuheimu.naiveasync.demo.TestMessage" value="test"/>
                </map>
            </constructor-arg>
        </bean>
        -->
    </util:list>
    
    <!-- Falcon 监控数据上报器 -->
    <bean id="falconReporter" class="com.heimuheimu.naivemonitor.falcon.FalconReporter" init-method="init" destroy-method="close">
        <constructor-arg index="0" value="http://127.0.0.1:1988/v1/push" /> <!-- Falcon 监控数据推送地址-->
        <constructor-arg index="1" ref="falconDataCollectorList" />
    </bean>

Falcon 上报数据项说明(上报周期:30秒)

如果配置了具体消息类型的上报,将会有以下数据项:

消费者 Log4j 配置

log4j.logger.NAIVE_ASYNC_CONSUMER_INFO_LOG=INFO, NAIVE_ASYNC_CONSUMER_INFO_LOG
log4j.additivity.NAIVE_ASYNC_CONSUMER_INFO_LOG=false
log4j.appender.NAIVE_ASYNC_CONSUMER_INFO_LOG=org.apache.log4j.DailyRollingFileAppender
log4j.appender.NAIVE_ASYNC_CONSUMER_INFO_LOG.file=${log.output.directory}/naiveasync/consumer_info.log
log4j.appender.NAIVE_ASYNC_CONSUMER_INFO_LOG.encoding=UTF-8
log4j.appender.NAIVE_ASYNC_CONSUMER_INFO_LOG.DatePattern=_yyyy-MM-dd
log4j.appender.NAIVE_ASYNC_CONSUMER_INFO_LOG.layout=org.apache.log4j.PatternLayout
log4j.appender.NAIVE_ASYNC_CONSUMER_INFO_LOG.layout.ConversionPattern=%d{ISO8601} %-5p : %m%n

消费者示例代码

单条消息消费者

    public class DemoMessageConsumer extends AbstractMessageConsumer<DemoMessage> {
        
        @Override
        public Class<DemoMessage> getMessageClass() {
            return DemoMessage.class;
        }
        
        @Override
        public void consume(DemoMessage demoMessage) {
            // 进行单条消息的消费,如果抛出异常,在等待 X 秒后,消息将会再次进行推送。
        }   
    }

批量消息消费者

    public class DemoMessageBatchConsumer extends AbstractBatchMessageConsumer<DemoMessage> {
        
        @Override
        public Class<DemoMessage> getMessageClass() {
            return DemoMessage.class;
        }    
    
        @Override
        public void consume(List<DemoMessage> messageList) {
            // 进行批量消息的消费,如果抛出异常,在等待 X 秒后,消息列表将会再次进行推送。
        }   
    }

KafkaConsumerManager 会为每个消费者 实例分配一个线程,如果需要分配多个线程,可继承 AbstractKafkaMessageConsumerAbstractKafkaBatchMessageConsumer 来指定线程数量。

Kafka 生产者

Spring 配置

    <!-- Kafka 消息生产者配置信息,更多配置项可查看 KafkaProducerConfig 类 -->
    <bean id="kafkaProducerConfig" class="com.heimuheimu.naiveasync.kafka.producer.KafkaProducerConfig">
        <property name="bootstrapServers" value="127.0.0.1:9092,127.0.0.1:9093" /> <!-- Kafka 服务地址-->
    </bean>
    
    <!-- Kafka 消息生产者监听器,用于生产错误实时报警 -->
    <bean id="kafkaProducerListener" class="com.heimuheimu.naiveasync.kafka.producer.NoticeableKafkaProducerListener">
        <constructor-arg index="0" value="your-project-name" /> <!-- 当前项目名称 -->
        <constructor-arg index="1" ref="notifierList" /> <!-- 报警器列表,关于报警器的信息可查看 naivemonitor 项目 -->
    </bean>
    
    <!-- Kafka 消息生产者 -->
    <bean id="kafkaAsyncMessageProducer" class="com.heimuheimu.naiveasync.kafka.producer.KafkaProducer" destroy-method="close">
        <constructor-arg index="0" ref="kafkaProducerConfig" />
        <constructor-arg index="1" ref="kafkaProducerListener" />
    </bean>

Falcon 监控数据上报 Spring 配置

    <!-- 监控数据采集器列表 -->
    <util:list id="falconDataCollectorList">
        <!-- 生产者监控数据采集器 -->
        <bean class="com.heimuheimu.naiveasync.monitor.producer.falcon.AsyncMessageProducerDataCollector" />
        
        <!-- 如果对具体的消息类型进行额外上报,可进行以下配置
        <bean class="com.heimuheimu.naiveasync.monitor.producer.falcon.AsyncMessageProducerDataCollector">
            <constructor-arg index="0">
                <map>
                    <entry key="com.heimuheimu.naiveasync.demo.DemoMessage" value="demo"/>
                    <entry key="com.heimuheimu.naiveasync.demo.TestMessage" value="test"/>
                </map>
            </constructor-arg>
        </bean>
        -->
    </util:list>
    
    <!-- Falcon 监控数据上报器 -->
    <bean id="falconReporter" class="com.heimuheimu.naivemonitor.falcon.FalconReporter" init-method="init" destroy-method="close">
        <constructor-arg index="0" value="http://127.0.0.1:1988/v1/push" /> <!-- Falcon 监控数据推送地址-->
        <constructor-arg index="1" ref="falconDataCollectorList" />
    </bean>

Falcon 上报数据项说明(上报周期:30秒)

生产者示例代码

@Service
public class UserService {
    
    @Autowired
    private AsyncMessageProducer asyncMessageProducer;
    
    public void add(User user) { //注意:发送的消息必须是可序列化的(实现 Serializable 接口)
        // balabalabala... 执行添加用户的业务逻辑
        
        asyncMessageProducer.send(user); //发送 User 消息至 Kafka 中,该方法不会抛出任何异常
    }
}

版本发布记录

V1.1

BUG 修复:

新增特性:


V1.0

特性:

更多信息