/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.mns.sample;

import com.aliyun.mns.client.AsyncCallback;
import com.aliyun.mns.client.AsyncResult;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.BatchDeleteException;
import com.aliyun.mns.common.BatchSendException;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Base64TopicMessage;
import com.aliyun.mns.model.ErrorMessageResult;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.PagingListResult;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.SubscriptionMeta;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.sample.HttpEndpoint;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Sample {
    private static String QUEUE_NAME1 = "java-test-abc";
    private static String QUEUE_NAME2 = "java-test-efg";
    private static String TOPIC_NAME1 = "java-test-topic1";
    private static String TOPIC_NAME2 = "java-test-topic2";
    private static String TOPIC_NAME3 = "java-test-topic10";
    private static String SUB_NAME = "java-test-sub10";
    private MNSClient client = null;

    public Sample() {
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccessKeyId(), ServiceSettings.getMNSAccessKeySecret(), ServiceSettings.getMNSAccountEndpoint());
        this.client = account.getMNSClient();
    }

    public void queueOperators() {
        QueueMeta queueMeta;
        CloudQueue queue;
        QueueMeta meta1;
        try {
            meta1 = new QueueMeta();
            meta1.setQueueName(QUEUE_NAME1);
            meta1.setPollingWaitSeconds(15);
            meta1.setMaxMessageSize(2048L);
            CloudQueue queue1 = this.client.createQueue(meta1);
            System.out.println("Queue1 URL: " + queue1.getQueueURL());
            QueueMeta meta2 = new QueueMeta();
            meta2.setQueueName(QUEUE_NAME2);
            meta2.setPollingWaitSeconds(15);
            meta2.setMaxMessageSize(2048L);
            CloudQueue queue2 = this.client.getQueueRef(QUEUE_NAME2);
            String queueURL = queue2.create(meta2);
            System.out.println("Queeu2 URL: " + queueURL);
            queue1 = this.client.createQueue(meta1);
            String queueURL2 = queue2.create(meta2);
            System.out.println("Queue1 URL: " + queue1.getQueueURL());
            System.out.println("Queeu2 URL: " + queueURL2);
        }
        catch (ClientException ex) {
            ex.printStackTrace();
        }
        catch (ServiceException ex) {
            ex.printStackTrace();
        }
        try {
            meta1 = new QueueMeta();
            meta1.setQueueName(QUEUE_NAME1);
            meta1.setPollingWaitSeconds(15);
            meta1.setMaxMessageSize(2048L);
            meta1.setDelaySeconds(30L);
            queue = this.client.getQueueRef(QUEUE_NAME1);
            queue.create(meta1);
        }
        catch (ServiceException ex) {
            System.out.println("CreateQueue: " + QUEUE_NAME1 + ", but " + ex.getErrorCode());
        }
        try {
            QueueMeta meta2 = new QueueMeta();
            meta2.setQueueName(QUEUE_NAME2);
            meta2.setPollingWaitSeconds(15);
            meta2.setMaxMessageSize(2048L);
            meta2.setDelaySeconds(30L);
            this.client.createQueue(meta2);
        }
        catch (ServiceException ex) {
            System.out.println("CreateQueue: " + QUEUE_NAME2 + ", but " + ex.getErrorCode());
        }
        String marker = null;
        do {
            PagingListResult<Object> list = new PagingListResult();
            try {
                list = this.client.listQueueURL("java-test-", marker, 1);
            }
            catch (ClientException ex) {
                ex.printStackTrace();
            }
            catch (ServiceException ex) {
                ex.printStackTrace();
            }
            List queues = list.getResult();
            marker = list.getMarker();
            System.out.println("Result:");
            for (String queue2 : queues) {
                System.out.println(queue2);
            }
        } while (marker != null && marker != "");
        try {
            queue = this.client.getQueueRef(QUEUE_NAME1);
            queueMeta = queue.getAttributes();
            System.out.println(queueMeta.getDelaySeconds());
            System.out.println(queueMeta.getActiveMessages());
            System.out.println(queueMeta.getDelaySeconds());
        }
        catch (ClientException ex) {
            ex.printStackTrace();
        }
        catch (ServiceException ex) {
            ex.printStackTrace();
        }
        try {
            QueueMeta newMeta = new QueueMeta();
            newMeta.setQueueName(QUEUE_NAME1);
            newMeta.setDelaySeconds(30L);
            CloudQueue queue3 = this.client.getQueueRef(QUEUE_NAME1);
            queue3.setAttributes(newMeta);
            QueueMeta queueMeta2 = queue3.getAttributes();
            System.out.println(queueMeta2.getDelaySeconds());
        }
        catch (ClientException ex) {
            ex.printStackTrace();
        }
        catch (ServiceException ex) {
            ex.printStackTrace();
        }
        try {
            queue = this.client.getQueueRef(QUEUE_NAME1);
            queue.delete();
            queue.delete();
            try {
                queueMeta = queue.getAttributes();
                System.out.println(queueMeta.getQueueName());
            }
            catch (ServiceException ex) {
                System.out.println(ex.getErrorCode());
            }
            CloudQueue queue2 = this.client.getQueueRef(QUEUE_NAME2);
            queue2.delete();
        }
        catch (ClientException ex) {
            ex.printStackTrace();
        }
        catch (ServiceException ex) {
            ex.printStackTrace();
        }
    }

    public void messageOperators() {
        try {
            CloudQueue queue = this.client.getQueueRef(QUEUE_NAME1);
            queue.create();
            Message message = new Message();
            message.setMessageBody("message_body");
            Message putMsg = queue.putMessage(message);
            System.out.println("PutMessage has MsgId: " + putMsg.getMessageId());
            Message peekMsg = queue.peekMessage();
            System.out.println("PeekMessage has MsgId: " + peekMsg.getMessageId());
            System.out.println("PeekMessage Body: " + peekMsg.getMessageBodyAsString());
            Message popMsg = queue.popMessage();
            System.out.println("PopMessage Body: " + popMsg.getMessageBodyAsString());
            String receiptHandle = popMsg.getReceiptHandle();
            int visibilityTimeout = 100;
            String rh = queue.changeMessageVisibilityTimeout(receiptHandle, visibilityTimeout);
            System.out.println("ReceiptHandle:" + rh);
            Message popMsg2 = queue.popMessage();
            if (popMsg2 == null) {
                System.out.println("No Message popped!");
            }
            queue.deleteMessage(rh);
            queue.delete();
        }
        catch (ClientException ex) {
            ex.printStackTrace();
        }
        catch (ServiceException ex) {
            ex.printStackTrace();
        }
    }

    public void rawMessageOperators() {
        try {
            CloudQueue queue = this.client.getQueueRef(QUEUE_NAME1);
            queue.create();
            Message message = new Message();
            message.setMessageBody("message_body", Message.MessageBodyType.RAW_STRING);
            Message putMsg = queue.putMessage(message);
            System.out.println("PutMessage has MsgId: " + putMsg.getMessageId());
            Message peekMsg = queue.peekMessage();
            System.out.println("PeekMessage has MsgId: " + peekMsg.getMessageId());
            System.out.println("PeekMessage Body: " + peekMsg.getMessageBodyAsRawString());
            Message popMsg = queue.popMessage();
            System.out.println("PopMessage Body: " + popMsg.getMessageBodyAsRawString());
            String receiptHandle = popMsg.getReceiptHandle();
            int visibilityTimeout = 1000;
            String rh = queue.changeMessageVisibilityTimeout(receiptHandle, visibilityTimeout);
            System.out.println("ReceiptHandle:" + rh);
            Message popMsg2 = queue.popMessage();
            if (popMsg2 == null) {
                System.out.println("No Message popped!");
            }
            queue.deleteMessage(rh);
            for (int round = 0; round < 3; ++round) {
                ArrayList<Message> messages = new ArrayList<Message>();
                for (int id = 0; id < 10; ++id) {
                    Message m = new Message();
                    m.setMessageBody("batch_" + round + "_" + id, Message.MessageBodyType.RAW_STRING);
                    messages.add(m);
                }
                queue.batchPutMessage(messages);
            }
            ArrayList<Message> popedMessages = new ArrayList<Message>();
            List<Message> batchPopMessage = null;
            do {
                if ((batchPopMessage = queue.batchPopMessage(2)) == null) continue;
                popedMessages.addAll(batchPopMessage);
            } while (batchPopMessage != null);
            for (Message m : popedMessages) {
                System.out.println("MessageId: " + m.getMessageId());
                System.out.println("RawString:" + m.getMessageBodyAsRawString());
            }
            queue.delete();
        }
        catch (ClientException ex) {
            ex.printStackTrace();
        }
        catch (ServiceException ex) {
            ex.printStackTrace();
        }
    }

    public void messageBatchOperators() {
        try {
            CloudQueue queue = this.client.getQueueRef(QUEUE_NAME1);
            queue.create();
            int batchMsgSize = 5;
            ArrayList<Message> msgs = new ArrayList<Message>();
            ArrayList<Message> asyncMsgs = new ArrayList<Message>();
            for (int i = 0; i < batchMsgSize; ++i) {
                Message message = new Message();
                message.setMessageBody("message_body_" + i);
                msgs.add(message);
                Message asyncMsg = new Message();
                asyncMsg.setMessageBody("async_message_body_" + i);
                asyncMsgs.add(asyncMsg);
            }
            List<Message> putMsgs = queue.batchPutMessage(msgs);
            for (Message putMsg : putMsgs) {
                System.out.println("PutMessage has MsgId: " + putMsg.getMessageId());
            }
            AsyncCallback<List<Message>> putCallback = new AsyncCallback<List<Message>>(){

                @Override
                public void onSuccess(List<Message> result) {
                    for (Message putMsg : result) {
                        System.out.println("PutMessage has MsgId:" + putMsg.getMessageId());
                    }
                }

                @Override
                public void onFail(Exception ex) {
                    if (ex instanceof BatchSendException) {
                        List<Message> messages = ((BatchSendException)ex).getMessages();
                        for (Message msg : messages) {
                            if (msg.isErrorMessage()) {
                                ErrorMessageResult errorMessageDetail = msg.getErrorMessageDetail();
                                System.out.println("PutMessage Fail. ErrorCode: " + errorMessageDetail.getErrorCode() + " ErrorMessage: " + errorMessageDetail.getErrorMessage());
                                continue;
                            }
                            System.out.println(msg);
                        }
                    } else {
                        System.out.println("AsyncBatchPutMessage Exception: ");
                        ex.printStackTrace();
                    }
                }
            };
            AsyncResult<List<Message>> asyncBatchPutMessage = queue.asyncBatchPutMessage(asyncMsgs, putCallback);
            asyncBatchPutMessage.getResult();
            List<Message> batchPeekMessage = queue.batchPeekMessage(batchMsgSize);
            for (Message peekMsg : batchPeekMessage) {
                System.out.println("PeekMessage has MsgId:" + peekMsg.getMessageId());
            }
            AsyncCallback<List<Message>> peekCallback = new AsyncCallback<List<Message>>(){

                @Override
                public void onSuccess(List<Message> result) {
                    for (Message msg : result) {
                        System.out.println("AsyncBatchPeekMessage has MsgId: " + msg.getMessageId() + "\tMsgbody: " + msg.getMessageBodyAsString());
                    }
                }

                @Override
                public void onFail(Exception ex) {
                    System.out.println("AsyncBatchPeekMessage Exception: ");
                    ex.printStackTrace();
                }
            };
            AsyncResult<List<Message>> asyncBatchPeekMessage = queue.asyncBatchPeekMessage(batchMsgSize, peekCallback);
            asyncBatchPeekMessage.getResult();
            ArrayList<String> receiptsToDelete = new ArrayList<String>();
            List<Message> batchPopMessage = queue.batchPopMessage(batchMsgSize);
            for (Message popMsg : batchPopMessage) {
                System.out.println("PopMessage has MsgId: " + popMsg.getMessageId());
                receiptsToDelete.add(popMsg.getReceiptHandle());
            }
            class AsyncBatchPopCallback
            implements AsyncCallback<List<Message>> {
                public List<String> receipts = new ArrayList<String>();

                AsyncBatchPopCallback() {
                }

                @Override
                public void onSuccess(List<Message> result) {
                    for (Message msg : result) {
                        System.out.println("AsyncBatchPopMessage has MsgId: " + msg.getMessageId());
                        this.receipts.add(msg.getReceiptHandle());
                    }
                }

                @Override
                public void onFail(Exception ex) {
                    System.out.println("AsyncBatchPopMessage Exception: ");
                    ex.printStackTrace();
                }
            }
            AsyncBatchPopCallback popCallback = new AsyncBatchPopCallback();
            AsyncResult<List<Message>> asyncBatchPopMessage = queue.asyncBatchPopMessage(batchMsgSize, popCallback);
            asyncBatchPopMessage.getResult();
            queue.batchDeleteMessage(receiptsToDelete);
            AsyncCallback<Void> deleteCallback = new AsyncCallback<Void>(){

                @Override
                public void onSuccess(Void result) {
                    System.out.println("Async BatchDelete messages success!");
                }

                @Override
                public void onFail(Exception ex) {
                    if (ex instanceof BatchDeleteException) {
                        Map<String, ErrorMessageResult> errorMessages = ((BatchDeleteException)ex).getErrorMessages();
                        for (String receiptHandle : errorMessages.keySet()) {
                            ErrorMessageResult error = errorMessages.get(receiptHandle);
                            System.out.println("ReceiptHandle to delete : " + receiptHandle + ", errorcode: " + error.getErrorCode() + ", errormessage: " + error.getErrorMessage());
                        }
                    } else {
                        System.out.println("AsyncBatchDeleteMessage Exception: ");
                        ex.printStackTrace();
                    }
                }
            };
            ArrayList<String> receiptsWithDeleted = new ArrayList<String>();
            receiptsWithDeleted.addAll(popCallback.receipts);
            receiptsWithDeleted.addAll(receiptsToDelete);
            AsyncResult<Void> asyncBatchDeleteMessage = queue.asyncBatchDeleteMessage(receiptsWithDeleted, deleteCallback);
            asyncBatchDeleteMessage.getResult();
            queue.delete();
        }
        catch (ClientException ex) {
            ex.printStackTrace();
        }
        catch (ServiceException ex) {
            ex.printStackTrace();
        }
    }

    public void multiThreadHandleMsgs() {
        CloudQueue queue = this.client.getQueueRef(QUEUE_NAME1);
        queue.create();
        int numOfMessages = 100;
        int produceTaskNum = 2;
        int consumeTaskNum = 5;
        assert (numOfMessages % produceTaskNum == 0);
        assert (numOfMessages % consumeTaskNum == 0);
        ExecutorService es = Executors.newFixedThreadPool(5);
        int msgNum2SendPerTask = numOfMessages / produceTaskNum;
        while (produceTaskNum-- > 0) {
            ProduceTask produceTask = new ProduceTask(queue, msgNum2SendPerTask);
            es.submit(produceTask);
        }
        int msgNum2ReceivePerTask = numOfMessages / consumeTaskNum;
        while (consumeTaskNum-- > 0) {
            ConsumeTask consumTask = new ConsumeTask(queue, msgNum2ReceivePerTask);
            es.submit(consumTask);
        }
        es.shutdown();
        try {
            es.awaitTermination(1L, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        queue.delete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void topicOper() {
        CloudTopic topic = null;
        CloudTopic topic2 = null;
        try {
            TopicMeta meta1 = new TopicMeta();
            meta1.setTopicName(TOPIC_NAME1);
            topic = this.client.createTopic(meta1);
            System.out.println("topic url: " + topic.getTopicURL());
            topic2 = this.client.getTopicRef(TOPIC_NAME2);
            String topicUrl = topic2.create();
            System.out.println("topic url: " + topicUrl);
            TopicMeta meta3 = topic.getAttribute();
            System.out.println("topic url: " + meta3.getTopicURL());
            System.out.println("topic name:" + meta3.getTopicName());
            System.out.println("topic message retention period: " + meta3.getMessageRetentionPeriod());
            meta3.setMaxMessageSize(10240L);
            topic.setAttribute(meta3);
        }
        catch (Exception e) {
            e.printStackTrace();
            System.out.println("create topic error, " + e.getMessage());
        }
        finally {
            if (topic != null) {
                topic.delete();
            }
            if (topic2 != null) {
                topic2.delete();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribeOper() {
        CloudTopic topic = null;
        try {
            TopicMeta meta = new TopicMeta();
            meta.setTopicName(TOPIC_NAME3);
            topic = this.client.createTopic(meta);
            SubscriptionMeta subMeta = new SubscriptionMeta();
            subMeta.setSubscriptionName(SUB_NAME);
            subMeta.setEndpoint(HttpEndpoint.GenEndpointLocal());
            subMeta.setNotifyContentFormat(SubscriptionMeta.NotifyContentFormat.XML);
            String subUrl = topic.subscribe(subMeta);
            System.out.println("subscription url: " + subUrl);
            PagingListResult<SubscriptionMeta> resultM = topic.listSubscriptions(null, "", 10);
            System.out.println(resultM);
            PagingListResult<String> result = topic.listSubscriptionUrls(null, "", 10);
            System.out.println(result);
            SubscriptionMeta sm = topic.getSubscriptionAttr(SUB_NAME);
            System.out.println("subscription notify strategy: " + (Object)((Object)sm.getNotifyStrategy()));
            System.out.println("topic name: " + sm.getTopicName());
            System.out.println("subscription notify content format: " + (Object)((Object)sm.getNotifyContentFormat()));
            sm.setNotifyStrategy(SubscriptionMeta.NotifyStrategy.EXPONENTIAL_DECAY_RETRY);
            topic.setSubscriptionAttr(sm);
            SubscriptionMeta sm2 = topic.getSubscriptionAttr(SUB_NAME);
            System.out.println("subscription notify strategy: " + (Object)((Object)sm2.getNotifyStrategy()));
            System.out.println("topic name: " + sm2.getTopicName());
            System.out.println("subscription notify content format: " + (Object)((Object)sm2.getNotifyContentFormat()));
            topic.unsubscribe(SUB_NAME);
        }
        catch (Exception e) {
            e.printStackTrace();
            System.out.println("subscribe/unsubribe error");
        }
        finally {
            if (topic != null) {
                topic.delete();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void publishMsg() {
        int http_port = 8080;
        HttpEndpoint ep = new HttpEndpoint(http_port);
        CloudTopic topic = null;
        try {
            ep.start();
            System.out.println("start endpoint");
            Thread.sleep(5000L);
            TopicMeta meta = new TopicMeta();
            meta.setTopicName(TOPIC_NAME3);
            topic = this.client.createTopic(meta);
            SubscriptionMeta subMeta = new SubscriptionMeta();
            subMeta.setSubscriptionName(SUB_NAME);
            subMeta.setEndpoint(HttpEndpoint.GenEndpointLocal(http_port));
            topic.subscribe(subMeta);
            TopicMessage msg = new Base64TopicMessage();
            msg.setMessageBody("hello world!" + System.currentTimeMillis());
            msg = topic.publishMessage(msg);
            System.out.println(msg.getMessageId());
            System.out.println(msg.getMessageBodyMD5());
            msg = new Base64TopicMessage();
            msg.setMessageBody(("hello bytes" + System.currentTimeMillis()).getBytes());
            msg = topic.publishMessage(msg);
            System.out.println(msg.getMessageId());
            System.out.println(msg.getMessageBodyMD5());
            Thread.sleep(10000L);
            topic.unsubscribe(SUB_NAME);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (topic != null) {
                topic.delete();
            }
            ep.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void publishMsgUsingSimplifiedFormat() {
        int http_port = 8089;
        HttpEndpoint ep = new HttpEndpoint(http_port);
        CloudTopic topic = null;
        try {
            ep.start();
            System.out.println("start endpoint");
            Thread.sleep(5000L);
            TopicMeta meta = new TopicMeta();
            meta.setTopicName(TOPIC_NAME3);
            topic = this.client.createTopic(meta);
            SubscriptionMeta subMeta = new SubscriptionMeta();
            subMeta.setSubscriptionName(SUB_NAME);
            String endpoint = HttpEndpoint.GenEndpointLocal(http_port) + "/simplified";
            subMeta.setEndpoint(endpoint);
            subMeta.setNotifyContentFormat(SubscriptionMeta.NotifyContentFormat.SIMPLIFIED);
            topic.subscribe(subMeta);
            TopicMessage msg = new Base64TopicMessage();
            msg.setMessageBody("hello world, Simplified message!" + System.currentTimeMillis());
            msg = topic.publishMessage(msg);
            System.out.println(msg.getMessageId());
            System.out.println(msg.getMessageBodyMD5());
            msg = new Base64TopicMessage();
            msg.setMessageBody(("hello bytes" + System.currentTimeMillis()).getBytes());
            msg = topic.publishMessage(msg);
            System.out.println(msg.getMessageId());
            System.out.println(msg.getMessageBodyMD5());
            Thread.sleep(10000L);
            topic.unsubscribe(SUB_NAME);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (topic != null) {
                topic.delete();
            }
            ep.stop();
        }
    }

    public void runQueue() {
        this.queueOperators();
        this.messageOperators();
        this.rawMessageOperators();
        this.messageBatchOperators();
        this.multiThreadHandleMsgs();
    }

    public void runTopic() {
        this.topicOper();
        this.subscribeOper();
        this.publishMsg();
        this.publishMsgUsingSimplifiedFormat();
    }

    public void clear() {
        if (this.client.isOpen()) {
            this.client.close();
        }
    }

    protected class ReceiveDeleteAsyncCallback<T>
    implements AsyncCallback<T> {
        private ConsumeTask mTask;
        private MessageStage mStage;
        private Message mMessage;

        public ReceiveDeleteAsyncCallback(ConsumeTask task) {
            this.mTask = task;
            this.mStage = MessageStage.ReceiveStage;
        }

        @Override
        public void onSuccess(T msg) {
            if (this.mStage == MessageStage.ReceiveStage) {
                this.mMessage = (Message)msg;
                System.out.println("Receive Message " + this.mMessage.getMessageId());
                this.doDelete();
            } else if (this.mStage == MessageStage.DeleteStage) {
                System.out.println("Delete Message " + this.mMessage.getMessageId());
                this.mTask.updateCompleteCount();
            }
        }

        @Override
        public void onFail(Exception ex) {
            System.out.println("Operate Message Fail.");
            if (ex instanceof ServiceException && this.mTask.mQueue.isMessageNotExist((ServiceException)ex)) {
                System.out.println("Stage:" + (Object)((Object)this.mStage));
                if (this.mStage == MessageStage.ReceiveStage) {
                    System.out.println("Continue to receive message.");
                    this.doReceive();
                } else {
                    if (this.mStage == MessageStage.DeleteStage) {
                        System.out.println("Message does not exist when deleting.");
                    }
                    this.mTask.updateCompleteCount();
                }
            } else {
                ex.printStackTrace();
                this.mTask.updateCompleteCount();
            }
        }

        public void doReceive() {
            this.mStage = MessageStage.ReceiveStage;
            this.mTask.mQueue.asyncPopMessage(this);
        }

        public void doDelete() {
            this.mStage = MessageStage.DeleteStage;
            this.mTask.mQueue.asyncDeleteMessage(this.mMessage.getReceiptHandle(), this);
        }
    }

    protected class ConsumeTask
    extends TaskBase
    implements Runnable {
        public CloudQueue mQueue;

        public ConsumeTask(CloudQueue queue, int receiveNum) {
            this.mQueue = queue;
            this.mNum = receiveNum;
        }

        @Override
        public void run() {
            int receiveMsgNum = 0;
            while (receiveMsgNum++ < this.mNum) {
                ReceiveDeleteAsyncCallback<Message> cb = new ReceiveDeleteAsyncCallback<Message>(this);
                AsyncResult<Message> asyncPopMsgResult = this.mQueue.asyncPopMessage(cb);
                if (asyncPopMsgResult != null) continue;
                System.out.println("AsyncPopMessage Fail!");
            }
            this.waitComplete();
        }
    }

    protected class SendAsyncCallback
    implements AsyncCallback<Message> {
        private ProduceTask mTask;

        public SendAsyncCallback(ProduceTask task) {
            this.mTask = task;
        }

        @Override
        public void onSuccess(Message result) {
            System.out.println("Send Message " + result.getMessageId());
            this.mTask.updateCompleteCount();
        }

        @Override
        public void onFail(Exception ex) {
            System.out.println("Send Message Fail.");
            ex.printStackTrace();
            this.mTask.updateCompleteCount();
        }
    }

    public class ProduceTask
    extends TaskBase
    implements Runnable {
        public CloudQueue mQueue;

        public ProduceTask(CloudQueue queue, int sendNum) {
            this.mQueue = queue;
            this.mNum = sendNum;
        }

        @Override
        public void run() {
            int hasSendNum = 0;
            while (hasSendNum++ < this.mNum) {
                Message message = new Message();
                message.setMessageBody("message_body_" + hasSendNum);
                SendAsyncCallback cb = new SendAsyncCallback(this);
                AsyncResult<Message> asyncPutResult = this.mQueue.asyncPutMessage(message, cb);
                if (asyncPutResult != null) continue;
                System.out.println("AsyncSendMessage Fail");
            }
            this.waitComplete();
        }
    }

    public class TaskBase {
        private final AtomicInteger mCount = new AtomicInteger(0);
        private final ReentrantLock mLock = new ReentrantLock();
        private final Condition mCondition = this.mLock.newCondition();
        private final AtomicBoolean mFinished = new AtomicBoolean(false);
        public int mNum;

        public void waitComplete() {
            this.mLock.lock();
            if (!this.mFinished.get()) {
                try {
                    this.mCondition.await();
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            this.mLock.unlock();
        }

        public int updateCompleteCount() {
            int num = this.mCount.incrementAndGet();
            if (num >= this.mNum) {
                this.mLock.lock();
                this.mFinished.set(true);
                this.mCondition.signal();
                this.mLock.unlock();
            }
            return num;
        }
    }

    static enum MessageStage {
        ReceiveStage,
        DeleteStage,
        FinishStage;

    }
}

