Commit 4250eff7859ee908edb04b0b9f5d91fd80c25d81
1 parent
dbe4854c
1.master1.1.1
Showing
22 changed files
with
190 additions
and
125 deletions
.classpath
... | ... | @@ -9,12 +9,14 @@ |
9 | 9 | <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"> |
10 | 10 | <attributes> |
11 | 11 | <attribute name="maven.pomderived" value="true"/> |
12 | + <attribute name="optional" value="true"/> | |
12 | 13 | </attributes> |
13 | 14 | </classpathentry> |
14 | 15 | <classpathentry kind="src" output="target/test-classes" path="src/test/java"> |
15 | 16 | <attributes> |
16 | 17 | <attribute name="optional" value="true"/> |
17 | 18 | <attribute name="maven.pomderived" value="true"/> |
19 | + <attribute name="test" value="true"/> | |
18 | 20 | </attributes> |
19 | 21 | </classpathentry> |
20 | 22 | <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"> |
... | ... | @@ -27,5 +29,29 @@ |
27 | 29 | <attribute name="maven.pomderived" value="true"/> |
28 | 30 | </attributes> |
29 | 31 | </classpathentry> |
32 | + <classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources"> | |
33 | + <attributes> | |
34 | + <attribute name="maven.pomderived" value="true"/> | |
35 | + <attribute name="test" value="true"/> | |
36 | + <attribute name="optional" value="true"/> | |
37 | + </attributes> | |
38 | + </classpathentry> | |
39 | + <classpathentry kind="src" path="target/generated-sources/annotations"> | |
40 | + <attributes> | |
41 | + <attribute name="optional" value="true"/> | |
42 | + <attribute name="maven.pomderived" value="true"/> | |
43 | + <attribute name="ignore_optional_problems" value="true"/> | |
44 | + <attribute name="m2e-apt" value="true"/> | |
45 | + </attributes> | |
46 | + </classpathentry> | |
47 | + <classpathentry kind="src" output="target/test-classes" path="target/generated-test-sources/test-annotations"> | |
48 | + <attributes> | |
49 | + <attribute name="optional" value="true"/> | |
50 | + <attribute name="maven.pomderived" value="true"/> | |
51 | + <attribute name="ignore_optional_problems" value="true"/> | |
52 | + <attribute name="m2e-apt" value="true"/> | |
53 | + <attribute name="test" value="true"/> | |
54 | + </attributes> | |
55 | + </classpathentry> | |
30 | 56 | <classpathentry kind="output" path="target/classes"/> |
31 | 57 | </classpath> | ... | ... |
.project
... | ... | @@ -20,4 +20,15 @@ |
20 | 20 | <nature>org.eclipse.jdt.core.javanature</nature> |
21 | 21 | <nature>org.eclipse.m2e.core.maven2Nature</nature> |
22 | 22 | </natures> |
23 | + <filteredResources> | |
24 | + <filter> | |
25 | + <id>1751124793989</id> | |
26 | + <name></name> | |
27 | + <type>30</type> | |
28 | + <matcher> | |
29 | + <id>org.eclipse.core.resources.regexFilterMatcher</id> | |
30 | + <arguments>node_modules|\.git|__CREATED_BY_JAVA_LANGUAGE_SERVER__</arguments> | |
31 | + </matcher> | |
32 | + </filter> | |
33 | + </filteredResources> | |
23 | 34 | </projectDescription> | ... | ... |
pom.xml
... | ... | @@ -20,9 +20,9 @@ |
20 | 20 | xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
21 | 21 | <modelVersion>4.0.0</modelVersion> |
22 | 22 | |
23 | - <groupId>org.apache.rocketmq</groupId> | |
23 | + <groupId>com.zteits.rocketmq</groupId> | |
24 | 24 | <artifactId>spring-boot-starter-rocketmq</artifactId> |
25 | - <version>1.0.8-SNAPSHOT</version> | |
25 | + <version>1.1.1</version> | |
26 | 26 | |
27 | 27 | <name>Spring Boot Rocket Starter</name> |
28 | 28 | <description>Starter for messaging using Apache RocketMQ</description> |
... | ... | @@ -107,12 +107,12 @@ |
107 | 107 | <repository> |
108 | 108 | <id>nexus_releases</id> |
109 | 109 | <name>core Release Repository</name> |
110 | - <url>http://maven.renniting.cn/repository/maven-releases/</url> | |
110 | + <url>https://maven2.renniting.cn/repository/maven-releases/</url> | |
111 | 111 | </repository> |
112 | 112 | <snapshotRepository> |
113 | 113 | <id>nexus_snapshots</id> |
114 | 114 | <name>core Snapshots Repository</name> |
115 | - <url>http://maven.renniting.cn/repository/maven-snapshots/</url> | |
115 | + <url>https://maven2.renniting.cn/repository/maven-snapshots/</url> | |
116 | 116 | </snapshotRepository> |
117 | 117 | </distributionManagement> |
118 | 118 | <build> |
... | ... | @@ -169,4 +169,4 @@ |
169 | 169 | </build> |
170 | 170 | </profile> |
171 | 171 | </profiles> |
172 | -</project> | |
173 | 172 | \ No newline at end of file |
173 | +</project> | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java renamed to src/main/java/zteits/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java
... | ... | @@ -15,19 +15,19 @@ |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | 17 | |
18 | -package org.apache.rocketmq.spring.starter; | |
19 | - | |
20 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY; | |
21 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP; | |
22 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE; | |
23 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX; | |
24 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL; | |
25 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER; | |
26 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER; | |
27 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE; | |
28 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS; | |
29 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE; | |
30 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*; | |
18 | +package zteits.rocketmq.spring.starter; | |
19 | + | |
20 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY; | |
21 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP; | |
22 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE; | |
23 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX; | |
24 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL; | |
25 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER; | |
26 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER; | |
27 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE; | |
28 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS; | |
29 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE; | |
30 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*; | |
31 | 31 | |
32 | 32 | import java.util.Map; |
33 | 33 | import java.util.Objects; |
... | ... | @@ -37,12 +37,11 @@ import java.util.concurrent.atomic.AtomicLong; |
37 | 37 | |
38 | 38 | import javax.annotation.Resource; |
39 | 39 | |
40 | -import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON; | |
41 | -import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; | |
42 | -import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer; | |
43 | -import org.apache.rocketmq.spring.starter.core.RocketMQListener; | |
44 | -import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; | |
45 | -import org.json.JSONObject; | |
40 | +import com.aliyun.openservices.shade.com.alibaba.fastjson.JSONObject; | |
41 | +import zteits.rocketmq.spring.starter.annotation.RocketMQMessageListener; | |
42 | +import zteits.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer; | |
43 | +import zteits.rocketmq.spring.starter.core.RocketMQListener; | |
44 | +import zteits.rocketmq.spring.starter.core.RocketMQTemplate; | |
46 | 45 | import org.springframework.aop.support.AopUtils; |
47 | 46 | import org.springframework.beans.BeansException; |
48 | 47 | import org.springframework.beans.factory.InitializingBean; |
... | ... | @@ -82,6 +81,7 @@ public class AliyunRocketMQAutoConfiguration { |
82 | 81 | @ConditionalOnMissingBean(RocketMQProperties.Producer.class) |
83 | 82 | @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"}) |
84 | 83 | public Producer mqProducer(RocketMQProperties rocketMQProperties) { |
84 | + log.info("注册生产者mqProducer:"+ JSONObject.toJSON(rocketMQProperties)); | |
85 | 85 | |
86 | 86 | RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); |
87 | 87 | String groupName = producerConfig.getGroup(); |
... | ... | @@ -90,11 +90,12 @@ public class AliyunRocketMQAutoConfiguration { |
90 | 90 | Assert.hasText(accessKey, "[spring.rocketmq.accessKey] must not be null"); |
91 | 91 | String secretKey = rocketMQProperties.getSecretKey(); |
92 | 92 | Assert.hasText(secretKey, "[spring.rocketmq.secretKey] must not be null"); |
93 | - String onsAddr = rocketMQProperties.getOnsAddr(); | |
94 | - Assert.hasText(secretKey, "[spring.rocketmq.onsAddr] must not be null"); | |
93 | + // String onsAddr = rocketMQProperties.getOnsAddr(); | |
94 | + String namesrvAddr = rocketMQProperties.getNameSrvAddr(); | |
95 | + Assert.hasText(namesrvAddr, "[spring.rocketmq.nameSrvAddr] must not be null"); | |
95 | 96 | String environmentPrefix = rocketMQProperties.getEnvironmentPrefix(); |
96 | 97 | Assert.hasText(secretKey, "[spring.rocketmq.environmentPrefix] must not be null"); |
97 | - | |
98 | + | |
98 | 99 | Properties producerProperties = new Properties(); |
99 | 100 | //生成者ProducerId添加前缀:PID_+环境标识_+groupName |
100 | 101 | String pid = "PID_"+environmentPrefix+"_"+groupName; |
... | ... | @@ -102,11 +103,9 @@ public class AliyunRocketMQAutoConfiguration { |
102 | 103 | producerProperties.setProperty(PropertyKeyConst.ProducerId, pid); |
103 | 104 | producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); |
104 | 105 | producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); |
105 | - producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, onsAddr); | |
106 | - log.info("注册生产者producerProperties:"+ JSON.toJSONString(producerProperties)); | |
106 | + producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); | |
107 | 107 | //producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr); |
108 | 108 | Producer producer = ONSFactory.createProducer(producerProperties); |
109 | - log.info("注册生产者完成:"+ JSON.toJSONString(producer)); | |
110 | 109 | return producer; |
111 | 110 | } |
112 | 111 | |
... | ... | @@ -121,9 +120,9 @@ public class AliyunRocketMQAutoConfiguration { |
121 | 120 | @ConditionalOnBean(Producer.class) |
122 | 121 | @ConditionalOnMissingBean(name = "rocketMQTemplate") |
123 | 122 | public RocketMQTemplate rocketMQTemplate(Producer mqProducer,RocketMQProperties rocketMQProperties, |
124 | - @Autowired(required = false) | |
125 | - @Qualifier("rocketMQMessageObjectMapper") | |
126 | - ObjectMapper objectMapper) { | |
123 | + @Autowired(required = false) | |
124 | + @Qualifier("rocketMQMessageObjectMapper") | |
125 | + ObjectMapper objectMapper) { | |
127 | 126 | RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); |
128 | 127 | rocketMQTemplate.setAliyunProducer(mqProducer); |
129 | 128 | rocketMQTemplate.setEnvironmentPrefix(rocketMQProperties.getEnvironmentPrefix()); |
... | ... | @@ -149,16 +148,16 @@ public class AliyunRocketMQAutoConfiguration { |
149 | 148 | private RocketMQProperties rocketMQProperties; |
150 | 149 | |
151 | 150 | private ObjectMapper objectMapper; |
152 | - | |
151 | + | |
153 | 152 | @Autowired |
154 | 153 | private RocketMQTemplate rocketMQTemplate; |
155 | - | |
154 | + | |
156 | 155 | public ListenerContainerConfiguration() { |
157 | 156 | } |
158 | 157 | |
159 | 158 | @Autowired(required = false) |
160 | 159 | public ListenerContainerConfiguration( |
161 | - @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { | |
160 | + @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { | |
162 | 161 | this.objectMapper = objectMapper; |
163 | 162 | } |
164 | 163 | |
... | ... | @@ -177,10 +176,8 @@ public class AliyunRocketMQAutoConfiguration { |
177 | 176 | } |
178 | 177 | |
179 | 178 | private void registerContainer(String beanName, Object bean) { |
180 | - String uuid = UUID.randomUUID().toString(); | |
181 | - log.info(uuid+"开始注册消费者,beanName:"+beanName); | |
182 | - log.info(uuid+"开始注册消费者,rocketMQProperties:"+JSON.toJSONString(rocketMQProperties)); | |
183 | - | |
179 | + String uuid = UUID.randomUUID().toString(); | |
180 | + log.info(uuid+"开始注册消费者,beanName:"+beanName); | |
184 | 181 | Class<?> clazz = AopUtils.getTargetClass(bean); |
185 | 182 | |
186 | 183 | if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { |
... | ... | @@ -189,8 +186,8 @@ public class AliyunRocketMQAutoConfiguration { |
189 | 186 | RocketMQListener rocketMQListener = (RocketMQListener) bean; |
190 | 187 | RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); |
191 | 188 | BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class); |
192 | - // beanBuilder.addPropertyValue(PropertyKeyConst.NAMESRV_ADDR, rocketMQProperties.getOnsAddr()); | |
193 | - beanBuilder.addPropertyValue(PROP_NAMESRV_ADDR, rocketMQProperties.getOnsAddr()); | |
189 | + beanBuilder.addPropertyValue(PROP_NAMESRV_Addr, rocketMQProperties.getNameSrvAddr()); | |
190 | + // beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr()); | |
194 | 191 | String topic = rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic()); |
195 | 192 | log.info(uuid+"订阅的主题topic:"+topic); |
196 | 193 | beanBuilder.addPropertyValue(PROP_TOPIC, topic); | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java renamed to src/main/java/zteits/rocketmq/spring/starter/RocketMQProperties.java
... | ... | @@ -15,7 +15,7 @@ |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | 17 | |
18 | -package org.apache.rocketmq.spring.starter; | |
18 | +package zteits.rocketmq.spring.starter; | |
19 | 19 | |
20 | 20 | import lombok.Data; |
21 | 21 | import org.springframework.boot.context.properties.ConfigurationProperties; |
... | ... | @@ -24,16 +24,18 @@ import org.springframework.boot.context.properties.ConfigurationProperties; |
24 | 24 | @ConfigurationProperties(prefix = "spring.rocketmq") |
25 | 25 | @Data |
26 | 26 | public class RocketMQProperties { |
27 | - /** | |
28 | - * 环境前缀 | |
29 | - */ | |
30 | - private String environmentPrefix; | |
31 | - /** | |
27 | + /** | |
28 | + * 环境前缀 | |
29 | + */ | |
30 | + private String environmentPrefix; | |
31 | + /** | |
32 | 32 | * 消息队列服务接入点 |
33 | 33 | */ |
34 | - private String onsAddr; | |
34 | + private String onsAddr; | |
35 | + | |
36 | + private String nameSrvAddr; | |
35 | 37 | |
36 | - /** | |
38 | + /** | |
37 | 39 | * AccessKey, 用于标识、校验用户身份 |
38 | 40 | */ |
39 | 41 | private String accessKey; |
... | ... | @@ -41,7 +43,7 @@ public class RocketMQProperties { |
41 | 43 | * SecretKey, 用于标识、校验用户身份 |
42 | 44 | */ |
43 | 45 | private String secretKey; |
44 | - | |
46 | + | |
45 | 47 | private Producer producer; |
46 | 48 | @Data |
47 | 49 | public static class Producer { |
... | ... | @@ -82,5 +84,32 @@ public class RocketMQProperties { |
82 | 84 | * Maximum allowed message size in bytes. |
83 | 85 | */ |
84 | 86 | private int maxMessageSize = 1024 * 1024 * 4; // 4M |
87 | + | |
88 | + /** | |
89 | + * 消费失败消息主题 | |
90 | + */ | |
91 | + private String consumeFailedTopic = "ZTEITS_RNT_CLOUD"; | |
92 | + | |
93 | + /** | |
94 | + * 消费失败消息标签 | |
95 | + */ | |
96 | + private String consumeFailedTag = "ConsumeMsgFailed"; | |
97 | + | |
98 | + // 对应的getter和setter方法 | |
99 | + public String getConsumeFailedTopic() { | |
100 | + return consumeFailedTopic; | |
101 | + } | |
102 | + | |
103 | + public void setConsumeFailedTopic(String consumeFailedTopic) { | |
104 | + this.consumeFailedTopic = consumeFailedTopic; | |
105 | + } | |
106 | + | |
107 | + public String getConsumeFailedTag() { | |
108 | + return consumeFailedTag; | |
109 | + } | |
110 | + | |
111 | + public void setConsumeFailedTag(String consumeFailedTag) { | |
112 | + this.consumeFailedTag = consumeFailedTag; | |
113 | + } | |
85 | 114 | } |
86 | 115 | } | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java renamed to src/main/java/zteits/rocketmq/spring/starter/annotation/RocketMQMessageListener.java
... | ... | @@ -15,7 +15,7 @@ |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | 17 | |
18 | -package org.apache.rocketmq.spring.starter.annotation; | |
18 | +package zteits.rocketmq.spring.starter.annotation; | |
19 | 19 | |
20 | 20 | import java.lang.annotation.Documented; |
21 | 21 | import java.lang.annotation.ElementType; |
... | ... | @@ -23,8 +23,8 @@ import java.lang.annotation.Retention; |
23 | 23 | import java.lang.annotation.RetentionPolicy; |
24 | 24 | import java.lang.annotation.Target; |
25 | 25 | |
26 | -import org.apache.rocketmq.spring.starter.enums.ConsumeMode; | |
27 | -import org.apache.rocketmq.spring.starter.enums.SelectorType; | |
26 | +import zteits.rocketmq.spring.starter.enums.ConsumeMode; | |
27 | +import zteits.rocketmq.spring.starter.enums.SelectorType; | |
28 | 28 | |
29 | 29 | import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; |
30 | 30 | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java renamed to src/main/java/zteits/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java
... | ... | @@ -15,10 +15,10 @@ |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | 17 | |
18 | -package org.apache.rocketmq.spring.starter.core; | |
18 | +package zteits.rocketmq.spring.starter.core; | |
19 | 19 | |
20 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TAG; | |
21 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TOPIC; | |
20 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TAG; | |
21 | +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TOPIC; | |
22 | 22 | |
23 | 23 | import java.lang.reflect.ParameterizedType; |
24 | 24 | import java.lang.reflect.Type; |
... | ... | @@ -28,12 +28,12 @@ import java.util.List; |
28 | 28 | import java.util.Objects; |
29 | 29 | import java.util.Properties; |
30 | 30 | |
31 | -import org.apache.rocketmq.spring.starter.enums.ConsumeMode; | |
32 | -import org.apache.rocketmq.spring.starter.enums.SelectorType; | |
33 | -import org.apache.rocketmq.spring.starter.exception.ConvertMsgException; | |
34 | -import org.apache.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO; | |
35 | -import org.apache.rocketmq.spring.starter.utils.ExceptionUtil; | |
36 | -import org.apache.rocketmq.spring.starter.utils.IPUtil; | |
31 | +import zteits.rocketmq.spring.starter.enums.ConsumeMode; | |
32 | +import zteits.rocketmq.spring.starter.enums.SelectorType; | |
33 | +import zteits.rocketmq.spring.starter.exception.ConvertMsgException; | |
34 | +import zteits.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO; | |
35 | +import zteits.rocketmq.spring.starter.utils.ExceptionUtil; | |
36 | +import zteits.rocketmq.spring.starter.utils.IPUtil; | |
37 | 37 | import org.springframework.beans.factory.InitializingBean; |
38 | 38 | import org.springframework.util.Assert; |
39 | 39 | import org.springframework.util.StringUtils; |
... | ... | @@ -51,7 +51,6 @@ import com.aliyun.openservices.ons.api.order.ConsumeOrderContext; |
51 | 51 | import com.aliyun.openservices.ons.api.order.MessageOrderListener; |
52 | 52 | import com.aliyun.openservices.ons.api.order.OrderAction; |
53 | 53 | import com.aliyun.openservices.ons.api.order.OrderConsumer; |
54 | -import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.MessageSelector; | |
55 | 54 | import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException; |
56 | 55 | import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; |
57 | 56 | import com.fasterxml.jackson.databind.ObjectMapper; |
... | ... | @@ -73,7 +72,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
73 | 72 | */ |
74 | 73 | @Setter |
75 | 74 | private String secretKey; |
76 | - | |
75 | + | |
77 | 76 | @Setter |
78 | 77 | @Getter |
79 | 78 | private String consumerGroup; |
... | ... | @@ -84,9 +83,13 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
84 | 83 | @Getter |
85 | 84 | private String onsAddr; |
86 | 85 | |
86 | +// @Setter | |
87 | +// @Getter | |
88 | +// private String nameServerAddr; | |
89 | + | |
87 | 90 | @Setter |
88 | 91 | @Getter |
89 | - private String nameServerAddr; | |
92 | + private String nameSrvAddr; | |
90 | 93 | |
91 | 94 | @Setter |
92 | 95 | @Getter |
... | ... | @@ -132,7 +135,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
132 | 135 | private OrderConsumer orderConsumer; |
133 | 136 | /**批量消息*/ |
134 | 137 | private BatchConsumer batchConsumer; |
135 | - | |
138 | + | |
136 | 139 | private Class messageType; |
137 | 140 | /** |
138 | 141 | * 环境前缀 |
... | ... | @@ -142,7 +145,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
142 | 145 | |
143 | 146 | @Setter |
144 | 147 | private RocketMQTemplate rocketMQTemplate; |
145 | - | |
148 | + | |
146 | 149 | public void setupMessageListener(RocketMQListener rocketMQListener) { |
147 | 150 | this.rocketMQListener = rocketMQListener; |
148 | 151 | } |
... | ... | @@ -214,7 +217,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
214 | 217 | this.sendConsumeMsgFailed(message,e,consumeBeginTime); |
215 | 218 | return Action.ReconsumeLater; |
216 | 219 | } |
217 | - | |
220 | + | |
218 | 221 | return Action.CommitMessage; |
219 | 222 | } |
220 | 223 | /** |
... | ... | @@ -225,8 +228,8 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
225 | 228 | */ |
226 | 229 | private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) { |
227 | 230 | log.info("消费消息失败,开始发送消费失败MQ"); |
228 | - String topic = CONSUMEFAILED_TOPIC; | |
229 | - String tag = CONSUMEFAILED_TAG; | |
231 | + String topic = environmentPrefix+"_"+CONSUMEFAILED_TOPIC; | |
232 | + String tag = CONSUMEFAILED_TAG; | |
230 | 233 | try{ |
231 | 234 | Date consumeEndTime = new Date(); |
232 | 235 | ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO(); |
... | ... | @@ -250,9 +253,8 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
250 | 253 | rocketMQTemplate.sendOneWay(topic, tag, consumeFailedMsgVO); |
251 | 254 | log.info("发送消息消费失败MQ成功"); |
252 | 255 | }catch(Exception e1){ |
253 | - log.info("发送消息消费失败MQ异常",e); | |
256 | + log.error("发送消息消费失败MQ异常", e1); | |
254 | 257 | } |
255 | - | |
256 | 258 | } |
257 | 259 | } |
258 | 260 | |
... | ... | @@ -273,7 +275,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
273 | 275 | return OrderAction.Success; |
274 | 276 | } |
275 | 277 | } |
276 | - | |
278 | + | |
277 | 279 | public class DefaultMessageListenerBatchs implements BatchMessageListener{ |
278 | 280 | |
279 | 281 | @Override |
... | ... | @@ -304,7 +306,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
304 | 306 | } |
305 | 307 | return Action.CommitMessage; |
306 | 308 | } |
307 | - | |
309 | + | |
308 | 310 | /** |
309 | 311 | * 发送消息消费失败消息 |
310 | 312 | * @param message |
... | ... | @@ -314,7 +316,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
314 | 316 | private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) { |
315 | 317 | log.info("消费消息失败,开始发送消费失败MQ"); |
316 | 318 | String topic = environmentPrefix+"_"+CONSUMEFAILED_TOPIC; |
317 | - String tag = CONSUMEFAILED_TAG; | |
319 | + String tag = CONSUMEFAILED_TAG; | |
318 | 320 | try{ |
319 | 321 | Date consumeEndTime = new Date(); |
320 | 322 | ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO(); |
... | ... | @@ -338,9 +340,8 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
338 | 340 | rocketMQTemplate.sendOneWay(topic, tag, consumeFailedMsgVO); |
339 | 341 | log.info("发送消息消费失败MQ成功"); |
340 | 342 | }catch(Exception e1){ |
341 | - log.info("发送消息消费失败MQ异常",e); | |
343 | + log.error("发送消息消费失败MQ异常", e1); | |
342 | 344 | } |
343 | - | |
344 | 345 | } |
345 | 346 | } |
346 | 347 | @Override |
... | ... | @@ -396,14 +397,14 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
396 | 397 | |
397 | 398 | Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); |
398 | 399 | Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); |
399 | - Assert.notNull(nameServerAddr, "Property 'nameServer' is required"); | |
400 | + Assert.notNull(nameSrvAddr, "Property 'nameServer' is required"); | |
400 | 401 | Assert.notNull(topic, "Property 'topic' is required"); |
401 | 402 | |
402 | 403 | Properties consumerProperties = new Properties(); |
403 | 404 | consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerGroup); |
404 | 405 | consumerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); |
405 | 406 | consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); |
406 | - consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameServerAddr); | |
407 | + consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr); | |
407 | 408 | consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums, consumeThreadMax+""); |
408 | 409 | consumerProperties.setProperty(PropertyKeyConst.MessageModel, messageModel.getModeCN()); |
409 | 410 | //允许用户自己设置该consumer的一些配置 |
... | ... | @@ -434,7 +435,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket |
434 | 435 | default: |
435 | 436 | throw new IllegalArgumentException("Property 'consumeMode' was wrong."); |
436 | 437 | } |
437 | - | |
438 | + | |
438 | 439 | } |
439 | 440 | |
440 | 441 | } | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQPushConsumerLifecycleListener.java renamed to src/main/java/zteits/rocketmq/spring/starter/core/AliyunRocketMQPushConsumerLifecycleListener.java
src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java renamed to src/main/java/zteits/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java
src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java renamed to src/main/java/zteits/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java
... | ... | @@ -15,7 +15,7 @@ |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | 17 | |
18 | -package org.apache.rocketmq.spring.starter.core; | |
18 | +package zteits.rocketmq.spring.starter.core; | |
19 | 19 | |
20 | 20 | /** |
21 | 21 | * Constants Created by aqlu on 2017/11/16. |
... | ... | @@ -36,7 +36,8 @@ public final class DefaultRocketMQListenerContainerConstants { |
36 | 36 | public static final String PROP_ONS_Addr = "onsAddr"; |
37 | 37 | public static final String PROP_ACCESS_KEY = "accessKey"; |
38 | 38 | public static final String PROP_SECRET_KEY = "secretKey"; |
39 | - public static final String PROP_NAMESRV_ADDR = "nameServerAddr"; | |
39 | + public static final String PROP_NAMESRV_Addr = "nameSrvAddr"; | |
40 | + | |
40 | 41 | /** |
41 | 42 | * 环境前缀 |
42 | 43 | */ | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java renamed to src/main/java/zteits/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java
... | ... | @@ -15,7 +15,7 @@ |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | 17 | |
18 | -package org.apache.rocketmq.spring.starter.core; | |
18 | +package zteits.rocketmq.spring.starter.core; | |
19 | 19 | |
20 | 20 | public interface RocketMQConsumerLifecycleListener<T> { |
21 | 21 | void prepareStart(final T consumer); | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListener.java renamed to src/main/java/zteits/rocketmq/spring/starter/core/RocketMQListener.java
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListenerContainer.java renamed to src/main/java/zteits/rocketmq/spring/starter/core/RocketMQListenerContainer.java
src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java renamed to src/main/java/zteits/rocketmq/spring/starter/core/RocketMQTemplate.java
... | ... | @@ -15,7 +15,7 @@ |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | 17 | |
18 | -package org.apache.rocketmq.spring.starter.core; | |
18 | +package zteits.rocketmq.spring.starter.core; | |
19 | 19 | |
20 | 20 | import java.nio.charset.Charset; |
21 | 21 | import java.util.Map; |
... | ... | @@ -50,7 +50,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { |
50 | 50 | @Getter |
51 | 51 | @Setter |
52 | 52 | private String charset = "UTF-8"; |
53 | - | |
53 | + | |
54 | 54 | /** |
55 | 55 | * 环境前缀 |
56 | 56 | */ |
... | ... | @@ -63,7 +63,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { |
63 | 63 | * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意 |
64 | 64 | * @param key 业务主键 |
65 | 65 | * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述. |
66 | - * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 | |
66 | + * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 | |
67 | 67 | * @param startDeliverTime 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. |
68 | 68 | * </p> |
69 | 69 | * <ol> |
... | ... | @@ -81,7 +81,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { |
81 | 81 | |
82 | 82 | try { |
83 | 83 | long now = System.currentTimeMillis(); |
84 | - | |
84 | + | |
85 | 85 | Message rocketMsg = new Message(environmentPrefix+"_"+topic, tag, keys, convertToRocketMsg(payload)); |
86 | 86 | if(userProperties!=null && !userProperties.isEmpty()){ |
87 | 87 | for (Entry<String, String> userProp : userProperties.entrySet()) { |
... | ... | @@ -133,7 +133,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { |
133 | 133 | * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意 |
134 | 134 | * @param key 业务主键 |
135 | 135 | * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述. |
136 | - * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 | |
136 | + * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 | |
137 | 137 | * @param startDeliverTime 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. |
138 | 138 | * </p> |
139 | 139 | * <ol> |
... | ... | @@ -151,7 +151,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { |
151 | 151 | } |
152 | 152 | try { |
153 | 153 | long now = System.currentTimeMillis(); |
154 | - | |
154 | + | |
155 | 155 | Message rocketMsg = new Message(environmentPrefix+"_"+topic, tag, keys, convertToRocketMsg(payload)); |
156 | 156 | if(userProperties!=null && !userProperties.isEmpty()){ |
157 | 157 | for (Entry<String, String> userProp : userProperties.entrySet()) { |
... | ... | @@ -202,7 +202,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { |
202 | 202 | * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意 |
203 | 203 | * @param key 业务主键 |
204 | 204 | * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述. |
205 | - * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 | |
205 | + * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 | |
206 | 206 | * @param startDeliverTime 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. |
207 | 207 | * </p> |
208 | 208 | * <ol> |
... | ... | @@ -219,7 +219,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { |
219 | 219 | } |
220 | 220 | try { |
221 | 221 | long now = System.currentTimeMillis(); |
222 | - | |
222 | + | |
223 | 223 | Message rocketMsg = new Message(environmentPrefix+"_"+topic, tag, keys, convertToRocketMsg(payload)); |
224 | 224 | if(userProperties!=null && !userProperties.isEmpty()){ |
225 | 225 | for (Entry<String, String> userProp : userProperties.entrySet()) { | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java renamed to src/main/java/zteits/rocketmq/spring/starter/enums/ConsumeMode.java
... | ... | @@ -15,7 +15,7 @@ |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | 17 | |
18 | -package org.apache.rocketmq.spring.starter.enums; | |
18 | +package zteits.rocketmq.spring.starter.enums; | |
19 | 19 | |
20 | 20 | public enum ConsumeMode { |
21 | 21 | /** |
... | ... | @@ -27,7 +27,7 @@ public enum ConsumeMode { |
27 | 27 | * 顺序接收消息,一个队列,一个线程 |
28 | 28 | */ |
29 | 29 | ORDERLY, |
30 | - | |
30 | + | |
31 | 31 | /** |
32 | 32 | * 批量接收发送的消息,允许自定义范围为[1, 32], 实际消费数量可能小于该值 |
33 | 33 | */ | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java renamed to src/main/java/zteits/rocketmq/spring/starter/enums/SelectorType.java
src/main/java/org/apache/rocketmq/spring/starter/exception/ConvertMsgException.java renamed to src/main/java/zteits/rocketmq/spring/starter/exception/ConvertMsgException.java
1 | -package org.apache.rocketmq.spring.starter.exception; | |
1 | +package zteits.rocketmq.spring.starter.exception; | |
2 | 2 | |
3 | 3 | public class ConvertMsgException extends RuntimeException{ |
4 | 4 | |
... | ... | @@ -23,5 +23,5 @@ public class ConvertMsgException extends RuntimeException{ |
23 | 23 | public ConvertMsgException(Throwable cause) { |
24 | 24 | super(cause); |
25 | 25 | } |
26 | - | |
26 | + | |
27 | 27 | } | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java renamed to src/main/java/zteits/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java
1 | -package org.apache.rocketmq.spring.starter.msgvo; | |
1 | +package zteits.rocketmq.spring.starter.msgvo; | |
2 | 2 | |
3 | 3 | import java.io.Serializable; |
4 | 4 | import java.util.Date; |
... | ... | @@ -33,7 +33,7 @@ public class ConsumeFailedMsgVO implements Serializable{ |
33 | 33 | |
34 | 34 | /**重复消费次数*/ |
35 | 35 | private Integer reconsumeTimes; |
36 | - | |
36 | + | |
37 | 37 | /**消费失败错误信息*/ |
38 | 38 | private String cunsumerErrMsg; |
39 | 39 | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/utils/ExceptionUtil.java renamed to src/main/java/zteits/rocketmq/spring/starter/utils/ExceptionUtil.java
1 | -package org.apache.rocketmq.spring.starter.utils; | |
2 | - | |
3 | -import java.io.PrintWriter; | |
4 | -import java.io.StringWriter; | |
5 | - | |
6 | -public class ExceptionUtil { | |
7 | - | |
8 | - public static String getTrace(Throwable t) { | |
9 | - StringBuffer buffer = new StringBuffer(); | |
10 | - if(t==null){ | |
11 | - return ""; | |
12 | - } | |
13 | - StringWriter stringWriter = new StringWriter(); | |
14 | - PrintWriter writer = new PrintWriter(stringWriter); | |
15 | - t.printStackTrace(writer); | |
16 | - //设置堆栈信息 | |
17 | - buffer.append("堆栈信息为:" + stringWriter.getBuffer().toString()); | |
18 | - return buffer.toString(); | |
19 | - } | |
20 | - | |
21 | -} | |
1 | +package zteits.rocketmq.spring.starter.utils; | |
2 | + | |
3 | +import java.io.PrintWriter; | |
4 | +import java.io.StringWriter; | |
5 | + | |
6 | +public class ExceptionUtil { | |
7 | + | |
8 | + public static String getTrace(Throwable t) { | |
9 | + StringBuffer buffer = new StringBuffer(); | |
10 | + if(t==null){ | |
11 | + return ""; | |
12 | + } | |
13 | + StringWriter stringWriter = new StringWriter(); | |
14 | + PrintWriter writer = new PrintWriter(stringWriter); | |
15 | + t.printStackTrace(writer); | |
16 | + //设置堆栈信息 | |
17 | + buffer.append("堆栈信息为:" + stringWriter.getBuffer().toString()); | |
18 | + return buffer.toString(); | |
19 | + } | |
20 | + | |
21 | +} | ... | ... |
src/main/java/org/apache/rocketmq/spring/starter/utils/IPUtil.java renamed to src/main/java/zteits/rocketmq/spring/starter/utils/IPUtil.java
src/main/resources/META-INF/spring.factories
src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java renamed to src/test/java/zteits/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java