Blame view

src/main/java/zteits/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java 12.3 KB
7d680bc0   zhaowg   增加阿里云配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
4250eff7   王彪总   1.master1.1.1
18
19
20
21
22
23
24
25
26
27
28
29
30
  package zteits.rocketmq.spring.starter;
  
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE;
  import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*;
7d680bc0   zhaowg   增加阿里云配置
31
32
33
34
  
  import java.util.Map;
  import java.util.Objects;
  import java.util.Properties;
fb98bcf4   zhaowg   1.0.6
35
  import java.util.UUID;
7d680bc0   zhaowg   增加阿里云配置
36
37
38
39
  import java.util.concurrent.atomic.AtomicLong;
  
  import javax.annotation.Resource;
  
4250eff7   王彪总   1.master1.1.1
40
41
42
43
44
  import com.aliyun.openservices.shade.com.alibaba.fastjson.JSONObject;
  import zteits.rocketmq.spring.starter.annotation.RocketMQMessageListener;
  import zteits.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer;
  import zteits.rocketmq.spring.starter.core.RocketMQListener;
  import zteits.rocketmq.spring.starter.core.RocketMQTemplate;
7d680bc0   zhaowg   增加阿里云配置
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
  import org.springframework.aop.support.AopUtils;
  import org.springframework.beans.BeansException;
  import org.springframework.beans.factory.InitializingBean;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Qualifier;
  import org.springframework.beans.factory.support.BeanDefinitionBuilder;
  import org.springframework.beans.factory.support.DefaultListableBeanFactory;
  import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
  import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
  import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  import org.springframework.boot.context.properties.EnableConfigurationProperties;
  import org.springframework.context.ApplicationContext;
  import org.springframework.context.ApplicationContextAware;
  import org.springframework.context.ConfigurableApplicationContext;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.core.annotation.Order;
  import org.springframework.core.env.StandardEnvironment;
  import org.springframework.util.Assert;
  
  import com.aliyun.openservices.ons.api.ONSFactory;
  import com.aliyun.openservices.ons.api.Producer;
  import com.aliyun.openservices.ons.api.PropertyKeyConst;
  import com.fasterxml.jackson.databind.ObjectMapper;
  
  import lombok.extern.slf4j.Slf4j;
  
  @Configuration
  @EnableConfigurationProperties(RocketMQProperties.class)
7d680bc0   zhaowg   增加阿里云配置
75
76
77
78
79
  @Order
  @Slf4j
  public class AliyunRocketMQAutoConfiguration {
  
      @Bean
2238632a   王彪总(备用)   C_G
80
81
      @ConditionalOnClass(RocketMQProperties.Producer.class)
      @ConditionalOnMissingBean(RocketMQProperties.Producer.class)
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
82
      @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
7d680bc0   zhaowg   增加阿里云配置
83
      public Producer mqProducer(RocketMQProperties rocketMQProperties) {
4250eff7   王彪总   1.master1.1.1
84
          log.info("注册生产者mqProducer:"+ JSONObject.toJSON(rocketMQProperties));
7d680bc0   zhaowg   增加阿里云配置
85
86
87
88
  
          RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
          String groupName = producerConfig.getGroup();
          Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null");
b2ebdd73   zhaowg   兼容原始rocketmq和阿里云环境
89
          String accessKey = rocketMQProperties.getAccessKey();
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
90
          Assert.hasText(accessKey, "[spring.rocketmq.accessKey] must not be null");
b2ebdd73   zhaowg   兼容原始rocketmq和阿里云环境
91
          String secretKey = rocketMQProperties.getSecretKey();
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
92
          Assert.hasText(secretKey, "[spring.rocketmq.secretKey] must not be null");
4250eff7   王彪总   1.master1.1.1
93
94
95
          // String onsAddr = rocketMQProperties.getOnsAddr();
          String namesrvAddr = rocketMQProperties.getNameSrvAddr();
          Assert.hasText(namesrvAddr, "[spring.rocketmq.nameSrvAddr] must not be null");
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
96
97
          String environmentPrefix = rocketMQProperties.getEnvironmentPrefix();
          Assert.hasText(secretKey, "[spring.rocketmq.environmentPrefix] must not be null");
4250eff7   王彪总   1.master1.1.1
98
  
7d680bc0   zhaowg   增加阿里云配置
99
          Properties producerProperties = new Properties();
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
100
          //生成者ProducerId添加前缀:PID_+环境标识_+groupName
fb98bcf4   zhaowg   1.0.6
101
102
103
          String pid = "PID_"+environmentPrefix+"_"+groupName;
          log.info("注册生产者PID:"+pid);
          producerProperties.setProperty(PropertyKeyConst.ProducerId, pid);
7d680bc0   zhaowg   增加阿里云配置
104
105
          producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
          producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
4250eff7   王彪总   1.master1.1.1
106
          producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
2238632a   王彪总(备用)   C_G
107
          //producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr);
7d680bc0   zhaowg   增加阿里云配置
108
          Producer producer = ONSFactory.createProducer(producerProperties);
7d680bc0   zhaowg   增加阿里云配置
109
110
111
112
113
114
115
116
117
118
119
120
121
          return producer;
      }
  
      @Bean
      @ConditionalOnClass(ObjectMapper.class)
      @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper")
      public ObjectMapper rocketMQMessageObjectMapper() {
          return new ObjectMapper();
      }
  
      @Bean(destroyMethod = "destroy")
      @ConditionalOnBean(Producer.class)
      @ConditionalOnMissingBean(name = "rocketMQTemplate")
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
122
      public RocketMQTemplate rocketMQTemplate(Producer mqProducer,RocketMQProperties rocketMQProperties,
4250eff7   王彪总   1.master1.1.1
123
124
125
                                               @Autowired(required = false)
                                               @Qualifier("rocketMQMessageObjectMapper")
                                               ObjectMapper objectMapper) {
7d680bc0   zhaowg   增加阿里云配置
126
          RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
b2ebdd73   zhaowg   兼容原始rocketmq和阿里云环境
127
          rocketMQTemplate.setAliyunProducer(mqProducer);
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
128
          rocketMQTemplate.setEnvironmentPrefix(rocketMQProperties.getEnvironmentPrefix());
7d680bc0   zhaowg   增加阿里云配置
129
130
131
132
133
134
135
136
          if (Objects.nonNull(objectMapper)) {
              rocketMQTemplate.setObjectMapper(objectMapper);
          }
          return rocketMQTemplate;
      }
  
      @Configuration
      @EnableConfigurationProperties(RocketMQProperties.class)
fb98bcf4   zhaowg   1.0.6
137
      @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
7d680bc0   zhaowg   增加阿里云配置
138
139
140
141
142
143
144
145
146
147
148
149
150
      @Order
      public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean {
          private ConfigurableApplicationContext applicationContext;
  
          private AtomicLong counter = new AtomicLong(0);
  
          @Resource
          private StandardEnvironment environment;
  
          @Resource
          private RocketMQProperties rocketMQProperties;
  
          private ObjectMapper objectMapper;
4250eff7   王彪总   1.master1.1.1
151
  
7d680bc0   zhaowg   增加阿里云配置
152
153
          @Autowired
          private RocketMQTemplate rocketMQTemplate;
4250eff7   王彪总   1.master1.1.1
154
  
7d680bc0   zhaowg   增加阿里云配置
155
156
157
158
159
          public ListenerContainerConfiguration() {
          }
  
          @Autowired(required = false)
          public ListenerContainerConfiguration(
4250eff7   王彪总   1.master1.1.1
160
                  @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) {
7d680bc0   zhaowg   增加阿里云配置
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
              this.objectMapper = objectMapper;
          }
  
          @Override
          public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
              this.applicationContext = (ConfigurableApplicationContext) applicationContext;
          }
  
          @Override
          public void afterPropertiesSet() {
              Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
  
              if (Objects.nonNull(beans)) {
                  beans.forEach(this::registerContainer);
              }
          }
  
          private void registerContainer(String beanName, Object bean) {
4250eff7   王彪总   1.master1.1.1
179
180
              String uuid = UUID.randomUUID().toString();
              log.info(uuid+"开始注册消费者,beanName:"+beanName);
7d680bc0   zhaowg   增加阿里云配置
181
182
183
184
185
              Class<?> clazz = AopUtils.getTargetClass(bean);
  
              if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                  throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
              }
7d680bc0   zhaowg   增加阿里云配置
186
187
              RocketMQListener rocketMQListener = (RocketMQListener) bean;
              RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
b2ebdd73   zhaowg   兼容原始rocketmq和阿里云环境
188
              BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class);
4250eff7   王彪总   1.master1.1.1
189
190
              beanBuilder.addPropertyValue(PROP_NAMESRV_Addr, rocketMQProperties.getNameSrvAddr());
              // beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr());
fb98bcf4   zhaowg   1.0.6
191
192
193
              String topic = rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic());
              log.info(uuid+"订阅的主题topic:"+topic);
              beanBuilder.addPropertyValue(PROP_TOPIC, topic);
defb86b1   王彪总(备用)   C_G
194
              String cid = "GID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup());
fb98bcf4   zhaowg   1.0.6
195
              log.info(uuid+"消费者CID:"+cid);
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
196
              //消费者ConsumerId添加前缀:PID_+环境标识_+groupName
fb98bcf4   zhaowg   1.0.6
197
              beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, cid);
7d680bc0   zhaowg   增加阿里云配置
198
199
200
201
202
203
204
              beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
              beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
              beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
              beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress()));
              beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
              beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener);
              beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate);
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
205
              beanBuilder.addPropertyValue(PROP_ENVIRONMENT_PREFIX, rocketMQProperties.getEnvironmentPrefix());
7d680bc0   zhaowg   增加阿里云配置
206
207
208
209
              if (Objects.nonNull(objectMapper)) {
                  beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
              }
              beanBuilder.setDestroyMethodName(METHOD_DESTROY);
b2ebdd73   zhaowg   兼容原始rocketmq和阿里云环境
210
              //增加阿里云key
a2aff61f   zhaowg   整合阿里云RocketMQ SDK
211
212
              beanBuilder.addPropertyValue(PROP_ACCESS_KEY, rocketMQProperties.getAccessKey());
              beanBuilder.addPropertyValue(PROP_SECRET_KEY, rocketMQProperties.getSecretKey());
7d680bc0   zhaowg   增加阿里云配置
213
  
b2ebdd73   zhaowg   兼容原始rocketmq和阿里云环境
214
              String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
fb98bcf4   zhaowg   1.0.6
215
              log.info("消费者容器beanName:"+containerBeanName);
7d680bc0   zhaowg   增加阿里云配置
216
217
218
              DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
              beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
  
b2ebdd73   zhaowg   兼容原始rocketmq和阿里云环境
219
              AliyunRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, AliyunRocketMQListenerContainer.class);
7d680bc0   zhaowg   增加阿里云配置
220
221
222
223
224
225
226
227
228
229
230
231
232
233
  
              if (!container.isStarted()) {
                  try {
                      container.start();
                  } catch (Exception e) {
                      log.error("started container failed. {}", container, e);
                      throw new RuntimeException(e);
                  }
              }
  
              log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
          }
      }
  }