7d680bc0
zhaowg
增加阿里云配置
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
|
4250eff7
王彪总
1.master1.1.1
|
18
19
20
21
22
23
24
25
26
27
28
29
30
|
package zteits.rocketmq.spring.starter;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE;
import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*;
|
7d680bc0
zhaowg
增加阿里云配置
|
31
32
33
34
|
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
|
fb98bcf4
zhaowg
1.0.6
|
35
|
import java.util.UUID;
|
7d680bc0
zhaowg
增加阿里云配置
|
36
37
38
39
|
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Resource;
|
4250eff7
王彪总
1.master1.1.1
|
40
41
42
43
44
|
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSONObject;
import zteits.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import zteits.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer;
import zteits.rocketmq.spring.starter.core.RocketMQListener;
import zteits.rocketmq.spring.starter.core.RocketMQTemplate;
|
7d680bc0
zhaowg
增加阿里云配置
|
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.Assert;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
|
7d680bc0
zhaowg
增加阿里云配置
|
75
76
77
78
79
|
@Order
@Slf4j
public class AliyunRocketMQAutoConfiguration {
@Bean
|
2238632a
王彪总(备用)
C_G
|
80
81
|
@ConditionalOnClass(RocketMQProperties.Producer.class)
@ConditionalOnMissingBean(RocketMQProperties.Producer.class)
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
82
|
@ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
|
7d680bc0
zhaowg
增加阿里云配置
|
83
|
public Producer mqProducer(RocketMQProperties rocketMQProperties) {
|
4250eff7
王彪总
1.master1.1.1
|
84
|
log.info("注册生产者mqProducer:"+ JSONObject.toJSON(rocketMQProperties));
|
7d680bc0
zhaowg
增加阿里云配置
|
85
86
87
88
|
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String groupName = producerConfig.getGroup();
Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null");
|
b2ebdd73
zhaowg
兼容原始rocketmq和阿里云环境
|
89
|
String accessKey = rocketMQProperties.getAccessKey();
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
90
|
Assert.hasText(accessKey, "[spring.rocketmq.accessKey] must not be null");
|
b2ebdd73
zhaowg
兼容原始rocketmq和阿里云环境
|
91
|
String secretKey = rocketMQProperties.getSecretKey();
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
92
|
Assert.hasText(secretKey, "[spring.rocketmq.secretKey] must not be null");
|
4250eff7
王彪总
1.master1.1.1
|
93
94
95
|
// String onsAddr = rocketMQProperties.getOnsAddr();
String namesrvAddr = rocketMQProperties.getNameSrvAddr();
Assert.hasText(namesrvAddr, "[spring.rocketmq.nameSrvAddr] must not be null");
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
96
97
|
String environmentPrefix = rocketMQProperties.getEnvironmentPrefix();
Assert.hasText(secretKey, "[spring.rocketmq.environmentPrefix] must not be null");
|
4250eff7
王彪总
1.master1.1.1
|
98
|
|
7d680bc0
zhaowg
增加阿里云配置
|
99
|
Properties producerProperties = new Properties();
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
100
|
//生成者ProducerId添加前缀:PID_+环境标识_+groupName
|
fb98bcf4
zhaowg
1.0.6
|
101
102
103
|
String pid = "PID_"+environmentPrefix+"_"+groupName;
log.info("注册生产者PID:"+pid);
producerProperties.setProperty(PropertyKeyConst.ProducerId, pid);
|
7d680bc0
zhaowg
增加阿里云配置
|
104
105
|
producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
|
4250eff7
王彪总
1.master1.1.1
|
106
|
producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
|
2238632a
王彪总(备用)
C_G
|
107
|
//producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr);
|
7d680bc0
zhaowg
增加阿里云配置
|
108
|
Producer producer = ONSFactory.createProducer(producerProperties);
|
7d680bc0
zhaowg
增加阿里云配置
|
109
110
111
112
113
114
115
116
117
118
119
120
121
|
return producer;
}
@Bean
@ConditionalOnClass(ObjectMapper.class)
@ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper")
public ObjectMapper rocketMQMessageObjectMapper() {
return new ObjectMapper();
}
@Bean(destroyMethod = "destroy")
@ConditionalOnBean(Producer.class)
@ConditionalOnMissingBean(name = "rocketMQTemplate")
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
122
|
public RocketMQTemplate rocketMQTemplate(Producer mqProducer,RocketMQProperties rocketMQProperties,
|
4250eff7
王彪总
1.master1.1.1
|
123
124
125
|
@Autowired(required = false)
@Qualifier("rocketMQMessageObjectMapper")
ObjectMapper objectMapper) {
|
7d680bc0
zhaowg
增加阿里云配置
|
126
|
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
|
b2ebdd73
zhaowg
兼容原始rocketmq和阿里云环境
|
127
|
rocketMQTemplate.setAliyunProducer(mqProducer);
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
128
|
rocketMQTemplate.setEnvironmentPrefix(rocketMQProperties.getEnvironmentPrefix());
|
7d680bc0
zhaowg
增加阿里云配置
|
129
130
131
132
133
134
135
136
|
if (Objects.nonNull(objectMapper)) {
rocketMQTemplate.setObjectMapper(objectMapper);
}
return rocketMQTemplate;
}
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
|
fb98bcf4
zhaowg
1.0.6
|
137
|
@ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"})
|
7d680bc0
zhaowg
增加阿里云配置
|
138
139
140
141
142
143
144
145
146
147
148
149
150
|
@Order
public static class ListenerContainerConfiguration implements ApplicationContextAware, InitializingBean {
private ConfigurableApplicationContext applicationContext;
private AtomicLong counter = new AtomicLong(0);
@Resource
private StandardEnvironment environment;
@Resource
private RocketMQProperties rocketMQProperties;
private ObjectMapper objectMapper;
|
4250eff7
王彪总
1.master1.1.1
|
151
|
|
7d680bc0
zhaowg
增加阿里云配置
|
152
153
|
@Autowired
private RocketMQTemplate rocketMQTemplate;
|
4250eff7
王彪总
1.master1.1.1
|
154
|
|
7d680bc0
zhaowg
增加阿里云配置
|
155
156
157
158
159
|
public ListenerContainerConfiguration() {
}
@Autowired(required = false)
public ListenerContainerConfiguration(
|
4250eff7
王彪总
1.master1.1.1
|
160
|
@Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) {
|
7d680bc0
zhaowg
增加阿里云配置
|
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
this.objectMapper = objectMapper;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
@Override
public void afterPropertiesSet() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
if (Objects.nonNull(beans)) {
beans.forEach(this::registerContainer);
}
}
private void registerContainer(String beanName, Object bean) {
|
4250eff7
王彪总
1.master1.1.1
|
179
180
|
String uuid = UUID.randomUUID().toString();
log.info(uuid+"开始注册消费者,beanName:"+beanName);
|
7d680bc0
zhaowg
增加阿里云配置
|
181
182
183
184
185
|
Class<?> clazz = AopUtils.getTargetClass(bean);
if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
}
|
7d680bc0
zhaowg
增加阿里云配置
|
186
187
|
RocketMQListener rocketMQListener = (RocketMQListener) bean;
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
|
b2ebdd73
zhaowg
兼容原始rocketmq和阿里云环境
|
188
|
BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class);
|
4250eff7
王彪总
1.master1.1.1
|
189
190
|
beanBuilder.addPropertyValue(PROP_NAMESRV_Addr, rocketMQProperties.getNameSrvAddr());
// beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr());
|
fb98bcf4
zhaowg
1.0.6
|
191
192
193
|
String topic = rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic());
log.info(uuid+"订阅的主题topic:"+topic);
beanBuilder.addPropertyValue(PROP_TOPIC, topic);
|
defb86b1
王彪总(备用)
C_G
|
194
|
String cid = "GID_"+rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.consumerGroup());
|
fb98bcf4
zhaowg
1.0.6
|
195
|
log.info(uuid+"消费者CID:"+cid);
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
196
|
//消费者ConsumerId添加前缀:PID_+环境标识_+groupName
|
fb98bcf4
zhaowg
1.0.6
|
197
|
beanBuilder.addPropertyValue(PROP_CONSUMER_GROUP, cid);
|
7d680bc0
zhaowg
增加阿里云配置
|
198
199
200
201
202
203
204
|
beanBuilder.addPropertyValue(PROP_CONSUME_MODE, annotation.consumeMode());
beanBuilder.addPropertyValue(PROP_CONSUME_THREAD_MAX, annotation.consumeThreadMax());
beanBuilder.addPropertyValue(PROP_MESSAGE_MODEL, annotation.messageModel());
beanBuilder.addPropertyValue(PROP_SELECTOR_EXPRESS, environment.resolvePlaceholders(annotation.selectorExpress()));
beanBuilder.addPropertyValue(PROP_SELECTOR_TYPE, annotation.selectorType());
beanBuilder.addPropertyValue(PROP_ROCKETMQ_LISTENER, rocketMQListener);
beanBuilder.addPropertyValue(PROP_ROCKETMQ_TEMPLATE, rocketMQTemplate);
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
205
|
beanBuilder.addPropertyValue(PROP_ENVIRONMENT_PREFIX, rocketMQProperties.getEnvironmentPrefix());
|
7d680bc0
zhaowg
增加阿里云配置
|
206
207
208
209
|
if (Objects.nonNull(objectMapper)) {
beanBuilder.addPropertyValue(PROP_OBJECT_MAPPER, objectMapper);
}
beanBuilder.setDestroyMethodName(METHOD_DESTROY);
|
b2ebdd73
zhaowg
兼容原始rocketmq和阿里云环境
|
210
|
//增加阿里云key
|
a2aff61f
zhaowg
整合阿里云RocketMQ SDK
|
211
212
|
beanBuilder.addPropertyValue(PROP_ACCESS_KEY, rocketMQProperties.getAccessKey());
beanBuilder.addPropertyValue(PROP_SECRET_KEY, rocketMQProperties.getSecretKey());
|
7d680bc0
zhaowg
增加阿里云配置
|
213
|
|
b2ebdd73
zhaowg
兼容原始rocketmq和阿里云环境
|
214
|
String containerBeanName = String.format("%s_%s", AliyunRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
|
fb98bcf4
zhaowg
1.0.6
|
215
|
log.info("消费者容器beanName:"+containerBeanName);
|
7d680bc0
zhaowg
增加阿里云配置
|
216
217
218
|
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
|
b2ebdd73
zhaowg
兼容原始rocketmq和阿里云环境
|
219
|
AliyunRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, AliyunRocketMQListenerContainer.class);
|
7d680bc0
zhaowg
增加阿里云配置
|
220
221
222
223
224
225
226
227
228
229
230
231
232
233
|
if (!container.isStarted()) {
try {
container.start();
} catch (Exception e) {
log.error("started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("register rocketMQ listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
}
}
|