package com.ejianc.zatopbpm.mq.consumer;

import com.alibaba.fastjson.JSONObject;
import com.ejianc.framework.mq.RabbitMqConfiguration;
import com.rabbitmq.client.Channel;
import javax.annotation.Resource;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/ejianc/zatopbpm/mq/consumer/ZatopBaseConsumer.class */
public abstract class ZatopBaseConsumer implements InitializingBean, ChannelAwareMessageListener, DisposableBean {
    protected static Logger logger = LoggerFactory.getLogger(ZatopBaseConsumer.class);

    @Resource(name = "rabbitAdmin")
    private RabbitAdmin rabbitAdmin;

    @Resource(name = "rabbitConfig")
    private RabbitConfig rabbitConfig;

    @Autowired
    private RabbitMqConfiguration mqConfig;
    private SimpleMessageListenerContainer listenerContainer;
    public static final String DEFAULT_CHARSET = "UTF-8";

    public void afterPropertiesSet() {
        logger.info("------------------------------------消息队列监听启动------------------------------------");
        String[] queueNames = getQueueNames();
        if (ArrayUtils.isEmpty(queueNames)) {
            logger.warn("消息队列为空");
            return;
        }
        logger.info("消息队列参数：----host：{}，port：{}，username：{}，virtualHost：{}", new Object[]{this.rabbitConfig.getConnectionFactory().getHost(), Integer.valueOf(this.rabbitConfig.getConnectionFactory().getPort()), this.rabbitConfig.getConnectionFactory().getUsername(), this.rabbitConfig.getConnectionFactory().getVirtualHost()});
        this.listenerContainer = new SimpleMessageListenerContainer();
        this.listenerContainer.setConnectionFactory(this.rabbitConfig.getConnectionFactory());
        Queue[] queueArr = new Queue[queueNames.length];
        int length = queueNames.length;
        for (int i = 0; i < length; i++) {
            Queue queue = new Queue(queueNames[i], true, false, false);
            this.rabbitAdmin.declareQueue(queue);
            queueArr[i] = queue;
        }
        this.listenerContainer.addQueues(queueArr);
        this.listenerContainer.setPrefetchCount(1);
        this.listenerContainer.setMaxConcurrentConsumers(this.mqConfig.getMaxConcurrentConsumers());
        this.listenerContainer.setConcurrentConsumers(this.mqConfig.getConcurrentConsumers());
        this.listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        this.listenerContainer.setMessageListener(this);
        this.listenerContainer.start();
    }

    public void destroy() {
        if (this.listenerContainer != null) {
            this.listenerContainer.stop();
        }
    }

    public void onMessage(Message message, Channel channel) throws Exception {
        String str = new String(message.getBody(), DEFAULT_CHARSET);
        MessageProperties messageProperties = message.getMessageProperties();
        try {
            logger.info("----------msg:{}", str);
            JSONObject parseObject = JSONObject.parseObject(str);
            if (parseObject == null) {
                logger.error("消息体为空，舍弃！");
            } else {
                doConsumeMsg(parseObject);
                logger.debug("消息消费完成");
            }
            channel.basicAck(messageProperties.getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicAck(messageProperties.getDeliveryTag(), true);
            logger.error("消息消费失败:", e);
        }
    }

    protected abstract void doConsumeMsg(JSONObject jSONObject);

    protected abstract String[] getQueueNames();
}
