package com.alibaba.cloud.stream.binder.rocketmq.integration;

import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.AcknowledgmentCallback;
import org.springframework.integration.support.AcknowledgmentCallbackFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.class */
public class RocketMQMessageSource extends AbstractMessageSource<Object> implements DisposableBean, Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(RocketMQMessageSource.class);
    private final RocketMQCallbackFactory ackCallbackFactory;
    private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties;
    private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
    private final String topic;
    private final String group;
    private final Object consumerMonitor;
    private DefaultMQPullConsumer consumer;
    private boolean running;
    private MessageSelector messageSelector;
    private RocketMQMessageQueueChooser messageQueueChooser;

    /* renamed from: com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource$2, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource$2.class */
    /*
     * Decompiler rendering of the synthetic "switch map" holder that backs the
     * enum switch statements in this file. Each int[] is indexed by an enum
     * constant's ordinal() and stores the case label used by the switch sites:
     *   AcknowledgmentCallback.Status: ACCEPT -> 1, REJECT -> 2, REQUEUE -> 3
     *   MessageModel:                  BROADCASTING -> 1, CLUSTERING -> 2
     * A slot whose assignment is skipped keeps the array default of 0, which
     * matches none of those case labels.
     */
    static /* synthetic */ class AnonymousClass2 {
        // Case labels for switches over MessageModel; allocated in the static initializer below.
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$common$protocol$heartbeat$MessageModel;
        // Case labels for switches over AcknowledgmentCallback.Status; one slot per constant.
        static final /* synthetic */ int[] $SwitchMap$org$springframework$integration$support$AcknowledgmentCallback$Status = new int[AcknowledgmentCallback.Status.values().length];

        static {
            // Each assignment is guarded separately: a NoSuchFieldError raised while
            // resolving one constant is ignored and that constant's slot stays 0.
            try {
                $SwitchMap$org$springframework$integration$support$AcknowledgmentCallback$Status[AcknowledgmentCallback.Status.ACCEPT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$springframework$integration$support$AcknowledgmentCallback$Status[AcknowledgmentCallback.Status.REJECT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$springframework$integration$support$AcknowledgmentCallback$Status[AcknowledgmentCallback.Status.REQUEUE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            // Second map: one slot per MessageModel constant.
            $SwitchMap$org$apache$rocketmq$common$protocol$heartbeat$MessageModel = new int[MessageModel.values().length];
            try {
                $SwitchMap$org$apache$rocketmq$common$protocol$heartbeat$MessageModel[MessageModel.BROADCASTING.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$common$protocol$heartbeat$MessageModel[MessageModel.CLUSTERING.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource$RocketMQAckCallback.class */
    public static class RocketMQAckCallback implements AcknowledgmentCallback {
        private final RocketMQAckInfo ackInfo;
        private boolean acknowledged;
        private boolean autoAckEnabled = true;

        public RocketMQAckCallback(RocketMQAckInfo rocketMQAckInfo) {
            this.ackInfo = rocketMQAckInfo;
        }

        protected void setAcknowledged(boolean z) {
            this.acknowledged = z;
        }

        public boolean isAcknowledged() {
            return this.acknowledged;
        }

        public void noAutoAck() {
            this.autoAckEnabled = false;
        }

        public boolean isAutoAck() {
            return this.autoAckEnabled;
        }

        /* JADX WARN: Finally extract failed */
        public void acknowledge(AcknowledgmentCallback.Status status) {
            Assert.notNull(status, "'status' cannot be null");
            if (this.acknowledged) {
                throw new IllegalStateException("Already acknowledged");
            }
            RocketMQMessageSource.log.debug("acknowledge(" + status.name() + ") for " + this);
            synchronized (this.ackInfo.getConsumerMonitor()) {
                try {
                    try {
                        switch (AnonymousClass2.$SwitchMap$org$springframework$integration$support$AcknowledgmentCallback$Status[status.ordinal()]) {
                            case 1:
                            case 2:
                                this.ackInfo.getConsumer().updateConsumeOffset(this.ackInfo.getMessageQueue(), this.ackInfo.getPullResult().getNextBeginOffset());
                                RocketMQMessageSource.log.debug("messageQueue='{}' offset update to `{}`", this.ackInfo.getMessageQueue(), String.valueOf(this.ackInfo.getPullResult().getNextBeginOffset()));
                                break;
                            case 3:
                                int requeue = this.ackInfo.getMessageQueueChooser().requeue();
                                this.ackInfo.getConsumer().updateConsumeOffset(this.ackInfo.getMessageQueue(), this.ackInfo.getOldOffset());
                                RocketMQMessageSource.log.debug("messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'", new Object[]{this.ackInfo.getMessageQueue(), Integer.valueOf(requeue), Long.valueOf(this.ackInfo.getOldOffset())});
                                break;
                        }
                        this.acknowledged = true;
                    } catch (MQClientException e) {
                        RocketMQMessageSource.log.error("acknowledge error: " + e.getErrorMessage(), e);
                        this.acknowledged = true;
                    }
                } catch (Throwable th) {
                    this.acknowledged = true;
                    throw th;
                }
            }
        }

        public String toString() {
            return "RocketMQAckCallback{ackInfo=" + this.ackInfo + ", acknowledged=" + this.acknowledged + ", autoAckEnabled=" + this.autoAckEnabled + '}';
        }
    }

    /* loaded from: input_file:com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource$RocketMQAckInfo.class */
    /**
     * Per-message acknowledgment context: the queue a message was pulled from, the
     * pull result, the consumer that performed the pull and the offset that was
     * current before the pull. As an inner class it also exposes the enclosing
     * source's queue chooser and consumer monitor.
     */
    public class RocketMQAckInfo {
        private final MessageQueue messageQueue;
        private final PullResult pullResult;
        private final DefaultMQPullConsumer consumer;
        private final long oldOffset;

        /**
         * @param messageQueue          queue the message was pulled from
         * @param pullResult            result of the pull that produced the message
         * @param defaultMQPullConsumer consumer used for the pull
         * @param j                     consume offset of the queue before the pull
         */
        public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult, DefaultMQPullConsumer defaultMQPullConsumer, long j) {
            this.messageQueue = messageQueue;
            this.pullResult = pullResult;
            this.consumer = defaultMQPullConsumer;
            this.oldOffset = j;
        }

        /** @return the queue the message was pulled from */
        public MessageQueue getMessageQueue() {
            return messageQueue;
        }

        /** @return the pull result that produced the message */
        public PullResult getPullResult() {
            return pullResult;
        }

        /** @return the consumer used for the pull */
        public DefaultMQPullConsumer getConsumer() {
            return consumer;
        }

        /** @return the queue chooser of the enclosing message source */
        public RocketMQMessageQueueChooser getMessageQueueChooser() {
            return RocketMQMessageSource.this.messageQueueChooser;
        }

        /** @return the consume offset recorded before the pull */
        public long getOldOffset() {
            return oldOffset;
        }

        /** @return the monitor object of the enclosing message source */
        public Object getConsumerMonitor() {
            return RocketMQMessageSource.this.consumerMonitor;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("RocketMQAckInfo{messageQueue=");
            text.append(messageQueue);
            text.append(", pullResult=").append(pullResult);
            text.append(", consumer=").append(consumer);
            text.append(", oldOffset=").append(oldOffset);
            text.append('}');
            return text.toString();
        }
    }

    /* loaded from: input_file:com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource$RocketMQCallbackFactory.class */
    public static class RocketMQCallbackFactory implements AcknowledgmentCallbackFactory<RocketMQAckInfo> {
        public AcknowledgmentCallback createCallback(RocketMQAckInfo rocketMQAckInfo) {
            return new RocketMQAckCallback(rocketMQAckInfo);
        }
    }

    /**
     * Creates a message source that uses a default {@link RocketMQCallbackFactory}.
     *
     * @param rocketMQBinderConfigurationProperties binder-level configuration
     * @param extendedConsumerProperties            consumer binding properties
     * @param str                                   topic to pull from
     * @param str2                                  consumer group
     */
    public RocketMQMessageSource(
            RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
            ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties,
            String str,
            String str2) {
        this(
                new RocketMQCallbackFactory(),
                rocketMQBinderConfigurationProperties,
                extendedConsumerProperties,
                str,
                str2);
    }

    /**
     * Creates a message source with an explicit acknowledgment callback factory.
     *
     * @param rocketMQCallbackFactory               factory for per-message acknowledgment callbacks
     * @param rocketMQBinderConfigurationProperties binder-level configuration
     * @param extendedConsumerProperties            consumer binding properties
     * @param str                                   topic to pull from
     * @param str2                                  consumer group
     */
    public RocketMQMessageSource(
            RocketMQCallbackFactory rocketMQCallbackFactory,
            RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
            ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties,
            String str,
            String str2) {
        // Collaborators and identifiers supplied by the caller.
        this.ackCallbackFactory = rocketMQCallbackFactory;
        this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties;
        this.rocketMQConsumerProperties = extendedConsumerProperties;
        this.topic = str;
        this.group = str2;
        // Internal state owned by this source.
        this.consumerMonitor = new Object();
        this.messageQueueChooser = new RocketMQMessageQueueChooser();
    }

    /**
     * Creates, configures and starts the underlying pull consumer.
     *
     * <p>The consumer is created for this source's group in clustering mode, with the
     * name server and pull timeout taken from the configuration. A tag expression,
     * when configured, takes precedence over an SQL expression as the message
     * selector. A queue listener is registered so that queue assignment changes
     * reset the queue chooser.
     *
     * <p>The source is marked running only after the consumer has started. A startup
     * failure is logged and leaves the source stopped, so {@code start()} may be
     * called again.
     *
     * @throws IllegalStateException if the source is already running
     */
    @Override
    public synchronized void start() {
        if (isRunning()) {
            throw new IllegalStateException("pull consumer already running. " + toString());
        }
        try {
            RocketMQConsumerProperties extension = this.rocketMQConsumerProperties.getExtension();
            this.consumer = new DefaultMQPullConsumer(this.group);
            this.consumer.setNamesrvAddr(this.rocketMQBinderConfigurationProperties.getNameServer());
            this.consumer.setConsumerPullTimeoutMillis(extension.getPullTimeout());
            this.consumer.setMessageModel(MessageModel.CLUSTERING);

            // Tags win when both tags and sql are configured; sql is used only without tags.
            String tags = extension.getTags();
            String sql = extension.getSql();
            if (!StringUtils.isEmpty(tags)) {
                this.messageSelector = MessageSelector.byTag(tags);
            } else if (!StringUtils.isEmpty(sql)) {
                this.messageSelector = MessageSelector.bySql(sql);
            }

            this.consumer.registerMessageQueueListener(this.topic, new MessageQueueListener() {
                @Override
                public void messageQueueChanged(String str, Set<MessageQueue> set, Set<MessageQueue> set2) {
                    RocketMQMessageSource.log.info("messageQueueChanged, topic='{}', mqAll=`{}`, mqDivided=`{}`", str, set, set2);
                    switch (RocketMQMessageSource.this.consumer.getMessageModel()) {
                        case BROADCASTING:
                            // Broadcasting uses every queue of the topic.
                            RocketMQMessageSource.this.resetMessageQueues(set);
                            break;
                        case CLUSTERING:
                            // Clustering uses only the queues assigned to this instance.
                            RocketMQMessageSource.this.resetMessageQueues(set2);
                            break;
                        default:
                            break;
                    }
                }
            });
            this.consumer.start();
            // Only report running once the consumer has actually started.
            setRunning(true);
        } catch (MQClientException e) {
            log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e);
        }
    }

    /**
     * Stops the source: clears the running flag and shuts the pull consumer down.
     * Does nothing when the source is not running.
     */
    @Override
    public synchronized void stop() {
        if (!isRunning()) {
            return;
        }
        setRunning(false);
        this.consumer.shutdown();
    }

    /** @return the current value of the running flag */
    @Override
    public synchronized boolean isRunning() {
        return running;
    }

    /**
     * Polls at most one message, making one attempt per known message queue.
     *
     * <p>Each attempt takes the next queue from the chooser, fetches its current
     * consume offset and pulls a single message, applying the configured message
     * selector when there is one. The first message found is converted to a Spring
     * message carrying an {@code acknowledgmentCallback} header.
     *
     * @return the message built from the first successful pull, or {@code null} when
     *         there are no queues, no queue yielded a message, or a pull failed
     */
    @Override
    protected synchronized Object doReceive() {
        if (this.messageQueueChooser.getMessageQueues() == null) {
            return null;
        }
        if (this.messageQueueChooser.getMessageQueues().size() == 0) {
            return null;
        }
        int attempt = 0;
        while (attempt < this.messageQueueChooser.getMessageQueues().size()) {
            try {
                final MessageQueue selected;
                // Selecting and advancing must be atomic with respect to requeue/reset.
                synchronized (this.consumerMonitor) {
                    selected = this.messageQueueChooser.choose();
                    this.messageQueueChooser.increment();
                }
                final boolean fromStore = this.rocketMQConsumerProperties.getExtension().isFromStore();
                final long offset = this.consumer.fetchConsumeOffset(selected, fromStore);
                log.debug("topic='{}', group='{}', messageQueue='{}', offset now='{}'", this.topic, this.group, selected, offset);

                final PullResult result;
                if (this.messageSelector == null) {
                    // The cast picks the String-expression overload of pull().
                    result = this.consumer.pull(selected, (String) null, offset, 1);
                } else {
                    result = this.consumer.pull(selected, this.messageSelector, offset, 1);
                }

                if (result.getPullStatus() == PullStatus.FOUND) {
                    final MessageExt found = result.getMsgFoundList().get(0);
                    return MessageBuilder.fromMessage(RocketMQUtil.convertToSpringMessage(found))
                            .setHeader("acknowledgmentCallback",
                                    this.ackCallbackFactory.createCallback(new RocketMQAckInfo(selected, result, this.consumer, offset)))
                            .build();
                }
                log.debug("messageQueue='{}' PullResult='{}' with topic `{}`", this.messageQueueChooser.getMessageQueues(), result.getPullStatus(), this.topic);
            } catch (Exception e) {
                // A failure on any queue ends this poll; the remaining queues are not tried.
                log.error("Consumer pull error: " + e.getMessage(), e);
                return null;
            }
            attempt++;
        }
        return null;
    }

    /** @return the fixed component type identifier {@code "rocketmq:message-source"} */
    @Override
    public String getComponentType() {
        final String componentType = "rocketmq:message-source";
        return componentType;
    }

    /**
     * Sets the running flag.
     *
     * @param z the new value of the flag
     */
    public synchronized void setRunning(boolean z) {
        running = z;
    }

    /**
     * Replaces the set of queues the chooser selects from.
     *
     * @param set the queues to hand to the queue chooser
     */
    public synchronized void resetMessageQueues(Set<MessageQueue> set) {
        log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", this.topic, set);
        final Object monitor = this.consumerMonitor;
        // Same monitor as queue selection in doReceive.
        synchronized (monitor) {
            this.messageQueueChooser.reset(set);
        }
    }

    /**
     * Releases the pull consumer when the bean is destroyed.
     *
     * <p>Delegates to {@link #stop()}, which is a no-op when the source is not
     * running, so calling this after an explicit stop is harmless.
     *
     * @throws Exception declared by {@link DisposableBean#destroy()}; not thrown by
     *                   this implementation
     */
    @Override
    public void destroy() throws Exception {
        stop();
    }
}
