/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.spring.starter; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_NAMESERVER; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE; import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_TOPIC; import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Resource; import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer; import org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainer; import org.apache.rocketmq.spring.starter.core.RocketMQListener; import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; import org.springframework.core.env.StandardEnvironment; import org.springframework.util.Assert; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; @Configuration @EnableConfigurationProperties(RocketMQProperties.class) @ConditionalOnProperty(name = "spring.rocketmq.aliyun",havingValue="true") @Order @Slf4j public class AliyunRocketMQAutoConfiguration { @Bean @ConditionalOnClass(Producer.class) @ConditionalOnMissingBean(Producer.class) @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.group"}) public Producer mqProducer(RocketMQProperties rocketMQProperties) { RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); String groupName = producerConfig.getGroup(); Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null"); String accessKey = rocketMQProperties.getAccessKey(); Assert.hasText(accessKey, "[spring.rocketmq.producer.accessKey] must not be null"); String secretKey = rocketMQProperties.getSecretKey(); Assert.hasText(secretKey, "[spring.rocketmq.producer.secretKey] must not be null"); Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.ProducerId, "PID_"+groupName); producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); producerProperties.setProperty(PropertyKeyConst.ONSAddr, rocketMQProperties.getNameServer()); Producer producer = ONSFactory.createProducer(producerProperties); return producer; } @Bean @ConditionalOnClass(ObjectMapper.class) @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper") public ObjectMapper rocketMQMessageObjectMapper() { return new ObjectMapper(); } @Bean(destroyMethod = "destroy") @ConditionalOnBean(Producer.class) @ConditionalOnMissingBean(name = "rocketMQTemplate") public RocketMQTemplate rocketMQTemplate(Producer mqProducer, @Autowired(required = false) @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); rocketMQTemplate.setAliyunProducer(mqProducer); if (Objects.nonNull(objectMapper)) { rocketMQTemplate.setObjectMapper(objectMapper); } return rocketMQTemplate; } @Configuration @EnableConfigurationProperties(RocketMQProperties.class) @ConditionalOnProperty(prefix = "spring.rocketmq", value = "nameServer") @Order public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean { private ConfigurableApplicationContext applicationContext; private AtomicLong counter = new AtomicLong(0); @Resource private StandardEnvironment environment; @Resource private RocketMQProperties rocketMQProperties; private ObjectMapper objectMapper; @Autowired private RocketMQTemplate rocketMQTemplate; public ListenerContainerConfiguration() { } @Autowired(required = false) public ListenerContainerConfiguration( @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { this.objectMapper = objectMapper; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; } @Override public void afterPropertiesSet() { Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(beans)) { beans.forEach(this::registerContainer); } } private void registerContainer(String beanName, Object bean) { Class clazz = AopUtils.getTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } RocketMQListener rocketMQListener = (RocketMQListener) bean; RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class); beanBuilder.addPropertyValue(PROP_NAMESERVER, rocketMQProperties.getNameServer()); beanBuilder.addPropertyValue(PROP_TOPIC, environment.resolvePlaceholders(annotation.topic())); beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, environment.resolvePlaceholders(annotation.consumerGroup())); beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode()); beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax()); beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel()); beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress())); beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType()); beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener); beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate); if (Objects.nonNull(objectMapper)) { beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper); } beanBuilder.setDestroyMethodName(METHOD_DESTROY); //增加阿里云key beanBuilder.addPropertyValue("accessKey", rocketMQProperties.getAccessKey()); beanBuilder.addPropertyValue("secretKey", rocketMQProperties.getSecretKey()); String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory(); beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition()); AliyunRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, AliyunRocketMQListenerContainer.class); if (!container.isStarted()) { try { container.start(); } catch (Exception e) { log.error("started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } } }