activemq端口怎么调(activemq详解)

一、下载与安装

直接去官网(
http://activemq.apache.org/)下载最新版本即可,由于这是免安装的,只需要解压就行了。安装完之后进入bin目录,双击 activemq.bat文件(linux下在bin目录下执行 activemq start)

二、访问控制台

在浏览器输入:http://ip:8161/admin/,出现如下界面表示启动成功,默认的用户名密码都是admin

activemq端口怎么调(activemq详解)

三、修改端口号

61616为对外服务端口号

8161为控制器端口号

当端口号冲突时,可以修改这两个端口号。cd conf ,修改activemq.xml 修改里面的61616端口。修改jetty.xml,修改里面的8161端口。

queue队列模式:

和rabbitmq简单队列模式一样,若是有多个消费者消费同一个队列中的消息的话,默认也是轮询机制的消费

示例代码:

public class Productor {
    public static final String BORKER_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws JMSException {
        //创建工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);
        //创建tcp连接
        Connection connection = factory.createConnection();
        //建立连接
        connection.start();
        /**
         * 创建会话,1.是否开启事务,2.签收模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建队列(消息的目的地)
        Queue queue = session.createQueue(QUEUE_NAME);

        //创建生产者
        MessageProducer producer = session.createProducer(queue);

        //消息非持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        //消息持久化 默认是持久化的
//        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        //创建消息
        TextMessage message = session.createTextMessage("你好吗");

        //发送消息
        producer.send(message);

        producer.close();
        session.close();
        connection.close();

        System.out.println("发送成功!");

    }
}


public class Consumer {
    public static final String BORKER_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws JMSException {
        //创建工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);
        //创建tcp连接
        Connection connection = factory.createConnection();
        //建立连接
        connection.start();
        /**
         * 创建会话,1.是否开启事务,2.签收模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建/声明队列(消息的目的地)
        Queue queue = session.createQueue(QUEUE_NAME);

        //创建消费者
        MessageConsumer consumer = session.createConsumer(queue);

        /*while (true) {
            //receive会阻塞线程
            TextMessage message = (TextMessage)consumer.receive();
            System.out.println("接收到消息:" + message.getText());
        }*/

        //监听的方式消费
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage)message;
            try {
                System.out.println("1号接收到消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}

topic队列模式:

称为发布订阅模式,生产者把消息发送给订阅给某个topic主题的消费者,是分发的模式,这种模式默认需要先启动消费者,不然就算生产者发布了某个topic主题的消息,消费者也消费不了;除非消费者提前订阅,并且做了消息持久化的处理,这样后启动消费者才能消费提前推送的消息。

代码:

本文转载自:https://www.gylmap.com

public class Productor {
    public static final String BORKER_URL = "tcp://127.0.0.1:61616";
    public static final String TOPIC_NAME = "topic1";
    public static void main(String[] args) throws JMSException {
        //创建工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);

        //异步投递
        factory.setUseAsyncSend(true);

        //创建tcp连接
        Connection connection = factory.createConnection();
        /**
         * 创建会话,1.是否开启事务,2.签收模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建/声明topic(消息的目的地)
        Topic topic = session.createTopic(TOPIC_NAME);

        //创建生产者
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);

        //持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        //建立连接
        connection.start();


        //创建消息
        TextMessage message = session.createTextMessage("你好吗");

        //发送消息,异步发送回调函数
        producer.send(message, new AsyncCallback() {
            @Override
            public void onSuccess() {
                System.out.println("success");
            }

            @Override
            public void onException(JMSException e) {
                System.out.println("fail");
            }
        });

        producer.close();
        session.close();
        connection.close();

        System.out.println("发送成功!");

    }
}

public class Consumer1 {
    public static final String BORKER_URL = "tcp://127.0.0.1:61616";
    public static final String TOPIC_NAME = "topic1";
    public static void main(String[] args) throws JMSException {
        //创建工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);

        //创建tcp连接
        Connection connection = factory.createConnection();

        //制定clientId
        connection.setClientID("my");

        /**
         * 创建会话,1.是否开启事务,2.签收模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建/声明topic(消息的目的地)
        Topic topic = session.createTopic(TOPIC_NAME);

        //订阅主题
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "remark");
        //建立连接
        connection.start();
        while (true) {
            //receive会阻塞线程
            //接收订阅的消息
            TextMessage message = (TextMessage) subscriber.receive();
            System.out.println("接收到消息:" + message.getText());
        }

        /*//创建消费者
        MessageConsumer consumer = session.createConsumer(topic);

        //建立连接
        connection.start();

        *//*while (true) {
            //receive会阻塞线程
            TextMessage message = (TextMessage)consumer.receive();
            System.out.println("接收到消息:" + message.getText());
        }*//*

        //监听的方式消费
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage)message;
            try {
                System.out.println("1号接收到消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });*/
    }
}

如何保证消息的可靠性

回答这个问题主要从持久化,事务,签收这几个方面入手

消息持久化的核心代码:

//queue模式的消息持久化 默认是持久化的
 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
 /**
 * topic模式的持久化
 */
Topic topic = session.createTopic(TOPIC_NAME);
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();

事务的核心代码(偏生产者):

//参数设置成true
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//事务提交
session.commit();

签收的核心代码(偏消费者):

//参数设置成手动提交
connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//消息签收
message.acknowledge();

注意:若是既开启事务,又开启手动签收,以事务为准,只要事务被提交了也默认消息被签收了

性能提升:

1.利用nio的协议比tcp的性能高,

  • 配置方式:在conf目录下activemq.xml照着下面配置

  ...
  
  ...
  • 第二步是代码访问方式由tcp改为nio
//创建工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("nio://127.0.0.1:61616");

2.jdbc+Journaling提高只有jdbc持久化的性能,它在做持久化入数据库之前,会先将数据保存到Journaling文件中,之后才慢慢同步到数据库中,等于中间加了一层缓冲层。

  • 把数据库mysql的驱动包放到lib目录下
  • 配置方式:在conf目录下activemq.xml照着下面配置,其中有个createTablesOnStartup属性,默认值是true,表示每次启动后去数据库自动建表


 //上面是默认配置找到改成下面的配置
 

//下面的代码写在节点中
秒鲨号所有文章资讯、展示的图片素材等内容均为注册用户上传(部分报媒/平媒内容转载自网络合作媒体),仅供学习参考。用户通过本站上传、发布的任何内容的知识产权归属用户或原始著作权人所有。如有侵犯您的版权,请联系我们反馈!本站将在三个工作日内改正。
(0)

大家都在看

  • 差评怎么回复(差评一般怎么回复啊)

      对于淘宝差评,每个商家都不可避免,甚至深恶痛绝,但并不是无可奈何,小编整理了一些淘宝差评回复技巧,都是最实用的解释话术和解决方法。我们一起来看看这个中差评回复技巧吧!    …

    2022年4月16日
  • 考公务员多少分(公务员考试培训机构哪家好)

    这个问题见仁见智。 一、介绍 中公教育:创建于1999年,是一家教育服务业的综合性企业集团,集合面授教学培训、网校远程教育、图书教材及音像制品的出版发行于一体的大型知识产业实体。目…

    2022年5月28日
  • 怎么买养老保险(每年交7000社保15年领多少钱)

    今天和大家讨论的是:农民愿意不愿意缴纳5000元以上的高档次养老保险?5000元以上的高档次养老保险到底合算不合算? 其实,农民愿意不愿意缴5000元以上的高档次养老保险,一是看能…

    2022年4月20日 专栏投稿
  • 找工作什么网站最好(推荐6个给力求职网站)

    受疫情的影响,2022年毕业季堪称“史上最难毕业季”。不仅毕业生就业压力剧增,而且企业招聘人员数量断崖式下降。即便如此,据调查数据显示,2022年应届生薪资期望值高达6139元,较…

    2022年3月16日 专栏投稿
  • 租车怎么租(不要押金的租车平台)

    在互联网和分享经济盛行的当下,租车已经成为不少朋友,尤其是“本本族”自驾出行的一种有效、便捷方式,满足了很多用户群体的出行需求。不管是旅游还是日常出行,选择租车的人也越来越多,汽车…

    2022年4月12日
  • 手机抠图软件哪个好用免费(推荐这5个好用软件)

    随着欢乐的春节结束,一年一度的求职高峰期又来了,本想为过年后精神爽利的自己重新换上证件照附在简历上,但来到图文店门前,我犹豫了…… 倒不是因为我担心影印照片的时间太长或者不想花钱,…

    2022年5月10日 专栏投稿
  • 融e借有额度就能下吗(融e借有额度但审核不过)

    如果工商银行融e借有额度了,那么表示你具备基本的申贷资格,现在大部分网贷都是采用填写资料-出额度,提现-审核,这两种套路。所以有额度并不代表什么,必须要提现成功才算真正到账,那么工…

    2022年6月5日
  • 粉胶多少钱(粉胶84ml)

    雅顿产品也是非常受大家欢迎的,被大家都吹爆了的橘灿精华、 全系列时空胶囊等,但国内价格实在有点贵,所以赶上雅顿美国官网做活动,不要犹豫,直接下手!现在来分享一下小编5折撸到的雅顿粉…

    2022年5月30日
  • 国旅怎么样(中国国际旅行社总社优缺点)

    前两天,我发了一篇关于丹麦的微头条,引起很多人的讨论。 有人说,丹麦对中国人不友好,不能去。 有人说,丹麦在实行群体免疫,不能去。 这些是事实,但也不全对。旅游是一种无国界的精神活…

    2022年4月25日 专栏投稿
  • 怎么开共享(如何用主机共享上网啊)

    以VMware为例,打开主机的控制面板,选择网络和共享中心,更改适配器设置,将所有网络的属性与虚拟机相关的项目打开 3打开虚拟机设置界面,选择网络适配器,选择用于共享主机的ip地址…

    2022年4月4日
品牌推广 在线咨询
返回顶部