package com.yyjz.icop.mq.consumer;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.yyjz.icop.mq.common.MqMessage;
import javax.annotation.Resource;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:WEB-INF/lib/icop-mq-0.0.1-SNAPSHOT.jar:com/yyjz/icop/mq/consumer/BaseConsumer.class */
/**
 * Base class for RabbitMQ consumers.
 *
 * <p>On bean initialization it declares every queue returned by {@link #getQueueNames()} and
 * starts a {@link SimpleMessageListenerContainer} (manual acknowledge mode, prefetch count 1)
 * that delivers messages to {@link #onMessage(Message, Channel)}. Each message body is decoded
 * as {@link #DEFAULT_CHARSET} text, parsed from JSON into an {@link MqMessage} and handed to
 * {@link #doConsumeMsg(MqMessage)}. The container is stopped when the bean is destroyed.
 */
public abstract class BaseConsumer implements InitializingBean, ChannelAwareMessageListener, DisposableBean {
    /** Logger shared with subclasses. */
    protected static Logger logger = LoggerFactory.getLogger(BaseConsumer.class);

    /** Name of the charset used to decode incoming message bodies. */
    public static final String DEFAULT_CHARSET = "UTF-8";

    /** Admin used to declare the consumed queues on the broker. */
    @Resource(name = "icopRabbitAdmin")
    private RabbitAdmin rabbitAdmin;

    /** Template whose connection factory is reused by the listener container. */
    @Resource(name = "icopRabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    /** Listener container created in {@link #afterPropertiesSet()}; stays {@code null} when no queues are configured. */
    private SimpleMessageListenerContainer listenerContainer;

    /**
     * Declares the configured queues and starts listening on them.
     *
     * <p>If {@link #getQueueNames()} returns {@code null} or an empty array, a warning is logged
     * and no listener container is created.
     */
    @Override
    public void afterPropertiesSet() {
        String[] queueNames = getQueueNames();
        if (ArrayUtils.isEmpty(queueNames)) {
            logger.warn("没有设置需要消费的队列");
            return;
        }
        this.listenerContainer = new SimpleMessageListenerContainer();
        this.listenerContainer.setConnectionFactory(this.rabbitTemplate.getConnectionFactory());

        Queue[] queues = new Queue[queueNames.length];
        for (int i = 0; i < queueNames.length; i++) {
            // Queue(name, durable, exclusive, autoDelete): durable, shared, never auto-deleted.
            Queue queue = new Queue(queueNames[i], true, false, false);
            this.rabbitAdmin.declareQueue(queue);
            queues[i] = queue;
        }
        this.listenerContainer.addQueues(queues);
        this.listenerContainer.setPrefetchCount(1);
        // Manual mode: acknowledgement is sent explicitly in onMessage.
        this.listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        this.listenerContainer.setMessageListener(this);
        this.listenerContainer.start();
    }

    /**
     * Stops the listener container, if one was started.
     */
    @Override
    public void destroy() {
        logger.debug("关闭监听...");
        if (this.listenerContainer != null) {
            this.listenerContainer.stop();
        }
    }

    /**
     * Acknowledges the delivery, then parses the body and dispatches it to
     * {@link #doConsumeMsg(MqMessage)}.
     *
     * <p>The acknowledgement is sent <em>before</em> the message is processed, and any exception
     * raised while parsing or consuming is logged rather than re-thrown. A message whose
     * processing fails has therefore already been acknowledged.
     * NOTE(review): this ordering looks like an intentional at-most-once choice — confirm before
     * moving the ack after processing, since that would change delivery semantics for subclasses.
     *
     * @param message the delivered message
     * @param channel the channel the message was received on, used to send the acknowledgement
     * @throws Exception if the body cannot be decoded or the acknowledgement fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String payload = new String(message.getBody(), DEFAULT_CHARSET);
        // Placeholder form avoids concatenating the payload when DEBUG is disabled.
        logger.debug("接收到消息：{}", payload);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        try {
            MqMessage mqMessage = JSON.parseObject(payload, MqMessage.class);
            if (mqMessage == null || mqMessage.getBody() == null) {
                logger.error("消息体为空，舍弃！");
                return;
            }
            doConsumeMsg(mqMessage);
            logger.debug("消息消费完成");
        } catch (Exception e) {
            // Deliberately not re-thrown: the failure is only logged.
            logger.error("消息消费失败:", e);
        }
    }

    /**
     * Handles one parsed message. Only invoked when the message and its body are non-null.
     *
     * @param mqMessage the message parsed from the delivery's JSON payload
     */
    protected abstract void doConsumeMsg(MqMessage mqMessage);

    /**
     * Returns the names of the queues this consumer listens on.
     *
     * @return the queue names; {@code null} or an empty array disables the consumer
     */
    protected abstract String[] getQueueNames();
}
