运行时在spring启动rabbitmq创建队列/交换/绑定/监听器

By simon at 2018-02-07 • 0人收藏 • 715人看过

我正在使用rabbitmq的spring-boot。我创造了一些 队列/交换/绑定/听众是固定的。 听者是创造者编辑如下:

@RabbitListener
public void foo(String msg) {...}
现在我想为每个用户在运行时创建队列,只要他登录 与交换/绑定/ listener,并在用户注销时销毁这些内容。怎么能 我在春季启动。

2 个回复 | 最后更新于 2018-02-07
2018-02-07   #1

@SpringBootApplication
public class Application {
要添加的队列名称
 final static String queueName = "spring-boot";
创建队列的bean并给它命名
@Bean
Queue queue() {
    return new Queue(queueName, false);
}
创建topicExchange如果你使用的话题不是队列
@Bean
TopicExchange exchange() {
    return new TopicExchange("spring-boot-exchange");
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(queueName);
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    return container;
}


public static void main(String[] args) throws InterruptedException {
    SpringApplication.run(Application.class, args);
}

2018-02-07   #2

你不能用@RabbitListener轻松做到这一点(你可以,但你必须创建一个 每个新的子应用程序上下文)。 您可以使用一个RabbitAdmin来动态创建队列和绑定 为每个新队列创建一个消息监听器容器。 编辑 这是@RabbitListener和子上下文的一种方法。使用时 春季启动,ListenerConfig类一定不能是我n同一个包裹(或小孩) 包)作为启动应用程序本身。

@SpringBootApplication
public class So48617898Application {

    public static void main(String[] args) {
        SpringApplication.run(So48617898Application.class, args).close();
    }

    private final Map<String, ConfigurableApplicationContext> children = new HashMap<>();

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, ApplicationContext context) {
        return args -> {
            Scanner scanner = new Scanner(System.in);
            String line = null;
            while (true) {
                System.out.println("Enter a new queue");
                line = scanner.next();
                if ("quit".equals(line)) {
                    break;
                }
                children.put(line, addNewListener(line, context));
                template.convertAndSend(line, "test to " + line);
            }
            scanner.close();
            for (ConfigurableApplicationContext ctx : this.children.values()) {
                ctx.stop();
            }
        };
    }

    private ConfigurableApplicationContext addNewListener(String queue, ApplicationContext context) {
        AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
        child.setParent(context);
        ConfigurableEnvironment environment = child.getEnvironment();
        Properties properties = new Properties();
        properties.setProperty("queue.name", queue);
        PropertiesPropertySource pps = new PropertiesPropertySource("props", properties);
        environment.getPropertySources().addLast(pps);
        child.register(ListenerConfig.class);
        child.refresh();
        return child;
    }

}
@Configuration
@EnableRabbit
public class ListenerConfig {

    @RabbitListener(queues = "${queue.name}")
    public void listen(String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println("Received " + in + " from queue " + queue);
    }

    @Bean
    public Queue queue(@Value("${queue.name}") String name) {
        return new Queue(name);
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

}

登录后方可回帖

Loading...