Commit dbe4854c9269cf2754e9b7e0961b89882462d92f
1 parent
2238632a
C_G
Showing
5 changed files
with
20 additions
and
14 deletions
pom.xml
@@ -62,7 +62,7 @@ | @@ -62,7 +62,7 @@ | ||
62 | <dependency> | 62 | <dependency> |
63 | <groupId>com.aliyun.openservices</groupId> | 63 | <groupId>com.aliyun.openservices</groupId> |
64 | <artifactId>ons-client</artifactId> | 64 | <artifactId>ons-client</artifactId> |
65 | - <version>1.7.1.Final</version> | 65 | + <version>1.8.0.Final</version> |
66 | </dependency> | 66 | </dependency> |
67 | <dependency> | 67 | <dependency> |
68 | <groupId>org.springframework</groupId> | 68 | <groupId>org.springframework</groupId> |
@@ -107,12 +107,12 @@ | @@ -107,12 +107,12 @@ | ||
107 | <repository> | 107 | <repository> |
108 | <id>nexus_releases</id> | 108 | <id>nexus_releases</id> |
109 | <name>core Release Repository</name> | 109 | <name>core Release Repository</name> |
110 | - <url>http://maven.renniting.cn/nexus/content/repositories/releases/</url> | 110 | + <url>http://maven.renniting.cn/repository/maven-releases/</url> |
111 | </repository> | 111 | </repository> |
112 | <snapshotRepository> | 112 | <snapshotRepository> |
113 | <id>nexus_snapshots</id> | 113 | <id>nexus_snapshots</id> |
114 | <name>core Snapshots Repository</name> | 114 | <name>core Snapshots Repository</name> |
115 | - <url>http://maven.renniting.cn/nexus/content/repositories/snapshots/</url> | 115 | + <url>http://maven.renniting.cn/repository/maven-snapshots/</url> |
116 | </snapshotRepository> | 116 | </snapshotRepository> |
117 | </distributionManagement> | 117 | </distributionManagement> |
118 | <build> | 118 | <build> |
src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java
@@ -37,10 +37,12 @@ import java.util.concurrent.atomic.AtomicLong; | @@ -37,10 +37,12 @@ import java.util.concurrent.atomic.AtomicLong; | ||
37 | 37 | ||
38 | import javax.annotation.Resource; | 38 | import javax.annotation.Resource; |
39 | 39 | ||
40 | +import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON; | ||
40 | import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; | 41 | import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; |
41 | import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer; | 42 | import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer; |
42 | import org.apache.rocketmq.spring.starter.core.RocketMQListener; | 43 | import org.apache.rocketmq.spring.starter.core.RocketMQListener; |
43 | import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; | 44 | import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; |
45 | +import org.json.JSONObject; | ||
44 | import org.springframework.aop.support.AopUtils; | 46 | import org.springframework.aop.support.AopUtils; |
45 | import org.springframework.beans.BeansException; | 47 | import org.springframework.beans.BeansException; |
46 | import org.springframework.beans.factory.InitializingBean; | 48 | import org.springframework.beans.factory.InitializingBean; |
@@ -88,8 +90,7 @@ public class AliyunRocketMQAutoConfiguration { | @@ -88,8 +90,7 @@ public class AliyunRocketMQAutoConfiguration { | ||
88 | Assert.hasText(accessKey, "[spring.rocketmq.accessKey] must not be null"); | 90 | Assert.hasText(accessKey, "[spring.rocketmq.accessKey] must not be null"); |
89 | String secretKey = rocketMQProperties.getSecretKey(); | 91 | String secretKey = rocketMQProperties.getSecretKey(); |
90 | Assert.hasText(secretKey, "[spring.rocketmq.secretKey] must not be null"); | 92 | Assert.hasText(secretKey, "[spring.rocketmq.secretKey] must not be null"); |
91 | - // String onsAddr = rocketMQProperties.getOnsAddr(); | ||
92 | - String namesrvAdder = rocketMQProperties.getNamesrvAdder(); | 93 | + String onsAddr = rocketMQProperties.getOnsAddr(); |
93 | Assert.hasText(secretKey, "[spring.rocketmq.onsAddr] must not be null"); | 94 | Assert.hasText(secretKey, "[spring.rocketmq.onsAddr] must not be null"); |
94 | String environmentPrefix = rocketMQProperties.getEnvironmentPrefix(); | 95 | String environmentPrefix = rocketMQProperties.getEnvironmentPrefix(); |
95 | Assert.hasText(secretKey, "[spring.rocketmq.environmentPrefix] must not be null"); | 96 | Assert.hasText(secretKey, "[spring.rocketmq.environmentPrefix] must not be null"); |
@@ -101,9 +102,11 @@ public class AliyunRocketMQAutoConfiguration { | @@ -101,9 +102,11 @@ public class AliyunRocketMQAutoConfiguration { | ||
101 | producerProperties.setProperty(PropertyKeyConst.ProducerId, pid); | 102 | producerProperties.setProperty(PropertyKeyConst.ProducerId, pid); |
102 | producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); | 103 | producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); |
103 | producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); | 104 | producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); |
104 | - producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAdder); | 105 | + producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, onsAddr); |
106 | + log.info("注册生产者producerProperties:"+ JSON.toJSONString(producerProperties)); | ||
105 | //producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr); | 107 | //producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr); |
106 | Producer producer = ONSFactory.createProducer(producerProperties); | 108 | Producer producer = ONSFactory.createProducer(producerProperties); |
109 | + log.info("注册生产者完成:"+ JSON.toJSONString(producer)); | ||
107 | return producer; | 110 | return producer; |
108 | } | 111 | } |
109 | 112 | ||
@@ -176,6 +179,8 @@ public class AliyunRocketMQAutoConfiguration { | @@ -176,6 +179,8 @@ public class AliyunRocketMQAutoConfiguration { | ||
176 | private void registerContainer(String beanName, Object bean) { | 179 | private void registerContainer(String beanName, Object bean) { |
177 | String uuid = UUID.randomUUID().toString(); | 180 | String uuid = UUID.randomUUID().toString(); |
178 | log.info(uuid+"开始注册消费者,beanName:"+beanName); | 181 | log.info(uuid+"开始注册消费者,beanName:"+beanName); |
182 | + log.info(uuid+"开始注册消费者,rocketMQProperties:"+JSON.toJSONString(rocketMQProperties)); | ||
183 | + | ||
179 | Class<?> clazz = AopUtils.getTargetClass(bean); | 184 | Class<?> clazz = AopUtils.getTargetClass(bean); |
180 | 185 | ||
181 | if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { | 186 | if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { |
@@ -184,8 +189,8 @@ public class AliyunRocketMQAutoConfiguration { | @@ -184,8 +189,8 @@ public class AliyunRocketMQAutoConfiguration { | ||
184 | RocketMQListener rocketMQListener = (RocketMQListener) bean; | 189 | RocketMQListener rocketMQListener = (RocketMQListener) bean; |
185 | RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); | 190 | RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); |
186 | BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class); | 191 | BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class); |
187 | - beanBuilder.addPropertyValue(PROP_NAMESRV_Addr, rocketMQProperties.getNamesrvAdder()); | ||
188 | - // beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr()); | 192 | + // beanBuilder.addPropertyValue(PropertyKeyConst.NAMESRV_ADDR, rocketMQProperties.getOnsAddr()); |
193 | + beanBuilder.addPropertyValue(PROP_NAMESRV_ADDR, rocketMQProperties.getOnsAddr()); | ||
189 | String topic = rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic()); | 194 | String topic = rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic()); |
190 | log.info(uuid+"订阅的主题topic:"+topic); | 195 | log.info(uuid+"订阅的主题topic:"+topic); |
191 | beanBuilder.addPropertyValue(PROP_TOPIC, topic); | 196 | beanBuilder.addPropertyValue(PROP_TOPIC, topic); |
src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java
src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java
@@ -86,6 +86,10 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket | @@ -86,6 +86,10 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket | ||
86 | 86 | ||
87 | @Setter | 87 | @Setter |
88 | @Getter | 88 | @Getter |
89 | + private String nameServerAddr; | ||
90 | + | ||
91 | + @Setter | ||
92 | + @Getter | ||
89 | private String topic; | 93 | private String topic; |
90 | 94 | ||
91 | @Setter | 95 | @Setter |
@@ -392,14 +396,14 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket | @@ -392,14 +396,14 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket | ||
392 | 396 | ||
393 | Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); | 397 | Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); |
394 | Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); | 398 | Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); |
395 | - Assert.notNull(onsAddr, "Property 'nameServer' is required"); | 399 | + Assert.notNull(nameServerAddr, "Property 'nameServer' is required"); |
396 | Assert.notNull(topic, "Property 'topic' is required"); | 400 | Assert.notNull(topic, "Property 'topic' is required"); |
397 | 401 | ||
398 | Properties consumerProperties = new Properties(); | 402 | Properties consumerProperties = new Properties(); |
399 | consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerGroup); | 403 | consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerGroup); |
400 | consumerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); | 404 | consumerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); |
401 | consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); | 405 | consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); |
402 | - consumerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr); | 406 | + consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameServerAddr); |
403 | consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums, consumeThreadMax+""); | 407 | consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums, consumeThreadMax+""); |
404 | consumerProperties.setProperty(PropertyKeyConst.MessageModel, messageModel.getModeCN()); | 408 | consumerProperties.setProperty(PropertyKeyConst.MessageModel, messageModel.getModeCN()); |
405 | //允许用户自己设置该consumer的一些配置 | 409 | //允许用户自己设置该consumer的一些配置 |
src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java
@@ -34,10 +34,9 @@ public final class DefaultRocketMQListenerContainerConstants { | @@ -34,10 +34,9 @@ public final class DefaultRocketMQListenerContainerConstants { | ||
34 | public static final String METHOD_DESTROY = "destroy"; | 34 | public static final String METHOD_DESTROY = "destroy"; |
35 | public static final String PROP_ROCKETMQ_TEMPLATE = "rocketMQTemplate"; | 35 | public static final String PROP_ROCKETMQ_TEMPLATE = "rocketMQTemplate"; |
36 | public static final String PROP_ONS_Addr = "onsAddr"; | 36 | public static final String PROP_ONS_Addr = "onsAddr"; |
37 | - | ||
38 | - public static final String PROP_NAMESRV_Addr = "namesrvAdder"; | ||
39 | public static final String PROP_ACCESS_KEY = "accessKey"; | 37 | public static final String PROP_ACCESS_KEY = "accessKey"; |
40 | public static final String PROP_SECRET_KEY = "secretKey"; | 38 | public static final String PROP_SECRET_KEY = "secretKey"; |
39 | + public static final String PROP_NAMESRV_ADDR = "nameServerAddr"; | ||
41 | /** | 40 | /** |
42 | * 环境前缀 | 41 | * 环境前缀 |
43 | */ | 42 | */ |