Commit 43a3a918af9efcb428d6aadf5f6bd1934a4145b2
1 parent
a2aff61f
阿里云提供的ons-client最新版本1.7.4有问题,引入后工程无法打包,报Failure to find
com.alibaba.security:security-spring-dependencies:pom:1.0.0-SNAPSHOT。 目前使用的1.7.1,该版本不支持SQL92过滤
Showing
4 changed files
with
17 additions
and
19 deletions
pom.xml
| @@ -22,7 +22,7 @@ | @@ -22,7 +22,7 @@ | ||
| 22 | 22 | ||
| 23 | <groupId>org.apache.rocketmq</groupId> | 23 | <groupId>org.apache.rocketmq</groupId> |
| 24 | <artifactId>spring-boot-starter-rocketmq</artifactId> | 24 | <artifactId>spring-boot-starter-rocketmq</artifactId> |
| 25 | - <version>1.0.2-SNAPSHOT</version> | 25 | + <version>1.0.4-SNAPSHOT</version> |
| 26 | 26 | ||
| 27 | <name>Spring Boot Rocket Starter</name> | 27 | <name>Spring Boot Rocket Starter</name> |
| 28 | <description>Starter for messaging using Apache RocketMQ</description> | 28 | <description>Starter for messaging using Apache RocketMQ</description> |
| @@ -60,10 +60,10 @@ | @@ -60,10 +60,10 @@ | ||
| 60 | <artifactId>spring-boot-starter</artifactId> | 60 | <artifactId>spring-boot-starter</artifactId> |
| 61 | </dependency> | 61 | </dependency> |
| 62 | <dependency> | 62 | <dependency> |
| 63 | - <groupId>com.aliyun.openservices</groupId> | ||
| 64 | - <artifactId>ons-client</artifactId> | ||
| 65 | - <version>1.7.2.Final</version> | ||
| 66 | - </dependency> | 63 | + <groupId>com.aliyun.openservices</groupId> |
| 64 | + <artifactId>ons-client</artifactId> | ||
| 65 | + <version>1.7.1.Final</version> | ||
| 66 | + </dependency> | ||
| 67 | <dependency> | 67 | <dependency> |
| 68 | <groupId>org.springframework</groupId> | 68 | <groupId>org.springframework</groupId> |
| 69 | <artifactId>spring-messaging</artifactId> | 69 | <artifactId>spring-messaging</artifactId> |
src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java
| @@ -26,7 +26,6 @@ import java.lang.annotation.Target; | @@ -26,7 +26,6 @@ import java.lang.annotation.Target; | ||
| 26 | import org.apache.rocketmq.spring.starter.enums.ConsumeMode; | 26 | import org.apache.rocketmq.spring.starter.enums.ConsumeMode; |
| 27 | import org.apache.rocketmq.spring.starter.enums.SelectorType; | 27 | import org.apache.rocketmq.spring.starter.enums.SelectorType; |
| 28 | 28 | ||
| 29 | -import com.aliyun.openservices.ons.api.ExpressionType; | ||
| 30 | import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; | 29 | import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; |
| 31 | 30 | ||
| 32 | @Target(ElementType.TYPE) | 31 | @Target(ElementType.TYPE) |
src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java
| @@ -17,6 +17,9 @@ | @@ -17,6 +17,9 @@ | ||
| 17 | 17 | ||
| 18 | package org.apache.rocketmq.spring.starter.core; | 18 | package org.apache.rocketmq.spring.starter.core; |
| 19 | 19 | ||
| 20 | +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TAG; | ||
| 21 | +import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TOPIC; | ||
| 22 | + | ||
| 20 | import java.lang.reflect.ParameterizedType; | 23 | import java.lang.reflect.ParameterizedType; |
| 21 | import java.lang.reflect.Type; | 24 | import java.lang.reflect.Type; |
| 22 | import java.nio.charset.Charset; | 25 | import java.nio.charset.Charset; |
| @@ -40,7 +43,6 @@ import com.aliyun.openservices.ons.api.ConsumeContext; | @@ -40,7 +43,6 @@ import com.aliyun.openservices.ons.api.ConsumeContext; | ||
| 40 | import com.aliyun.openservices.ons.api.Consumer; | 43 | import com.aliyun.openservices.ons.api.Consumer; |
| 41 | import com.aliyun.openservices.ons.api.Message; | 44 | import com.aliyun.openservices.ons.api.Message; |
| 42 | import com.aliyun.openservices.ons.api.MessageListener; | 45 | import com.aliyun.openservices.ons.api.MessageListener; |
| 43 | -import com.aliyun.openservices.ons.api.MessageSelector; | ||
| 44 | import com.aliyun.openservices.ons.api.ONSFactory; | 46 | import com.aliyun.openservices.ons.api.ONSFactory; |
| 45 | import com.aliyun.openservices.ons.api.PropertyKeyConst; | 47 | import com.aliyun.openservices.ons.api.PropertyKeyConst; |
| 46 | import com.aliyun.openservices.ons.api.batch.BatchConsumer; | 48 | import com.aliyun.openservices.ons.api.batch.BatchConsumer; |
| @@ -49,15 +51,11 @@ import com.aliyun.openservices.ons.api.order.ConsumeOrderContext; | @@ -49,15 +51,11 @@ import com.aliyun.openservices.ons.api.order.ConsumeOrderContext; | ||
| 49 | import com.aliyun.openservices.ons.api.order.MessageOrderListener; | 51 | import com.aliyun.openservices.ons.api.order.MessageOrderListener; |
| 50 | import com.aliyun.openservices.ons.api.order.OrderAction; | 52 | import com.aliyun.openservices.ons.api.order.OrderAction; |
| 51 | import com.aliyun.openservices.ons.api.order.OrderConsumer; | 53 | import com.aliyun.openservices.ons.api.order.OrderConsumer; |
| 52 | -import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; | ||
| 53 | -import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; | ||
| 54 | -import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; | ||
| 55 | -import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; | 54 | +import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.MessageSelector; |
| 56 | import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException; | 55 | import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException; |
| 57 | -import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt; | ||
| 58 | import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; | 56 | import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; |
| 59 | import com.fasterxml.jackson.databind.ObjectMapper; | 57 | import com.fasterxml.jackson.databind.ObjectMapper; |
| 60 | -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*; | 58 | + |
| 61 | import lombok.Getter; | 59 | import lombok.Getter; |
| 62 | import lombok.Setter; | 60 | import lombok.Setter; |
| 63 | import lombok.extern.slf4j.Slf4j; | 61 | import lombok.extern.slf4j.Slf4j; |
| @@ -399,16 +397,16 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket | @@ -399,16 +397,16 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket | ||
| 399 | orderConsumer = ONSFactory.createOrderedConsumer(consumerProperties); | 397 | orderConsumer = ONSFactory.createOrderedConsumer(consumerProperties); |
| 400 | if(selectorType == SelectorType.TAG){ | 398 | if(selectorType == SelectorType.TAG){ |
| 401 | orderConsumer.subscribe(topic, selectorExpress, new DefaultMessageListenerOrderly()); | 399 | orderConsumer.subscribe(topic, selectorExpress, new DefaultMessageListenerOrderly()); |
| 402 | - }else if(selectorType == SelectorType.SQL92){ | ||
| 403 | - orderConsumer.subscribe(topic, MessageSelector.bySql(selectorExpress), new DefaultMessageListenerOrderly()); | 400 | +// }else if(selectorType == SelectorType.SQL92){ |
| 401 | +// orderConsumer.subscribe(topic, MessageSelector.bySql(selectorExpress), new DefaultMessageListenerOrderly()); | ||
| 404 | } | 402 | } |
| 405 | break; | 403 | break; |
| 406 | case CONCURRENTLY://普通消息 | 404 | case CONCURRENTLY://普通消息 |
| 407 | consumer = ONSFactory.createConsumer(consumerProperties); | 405 | consumer = ONSFactory.createConsumer(consumerProperties); |
| 408 | if(selectorType == SelectorType.TAG){ | 406 | if(selectorType == SelectorType.TAG){ |
| 409 | consumer.subscribe(topic, selectorExpress, new DefaultMessageListenerConcurrently()); | 407 | consumer.subscribe(topic, selectorExpress, new DefaultMessageListenerConcurrently()); |
| 410 | - }else if(selectorType == SelectorType.SQL92){ | ||
| 411 | - consumer.subscribe(topic, MessageSelector.bySql(selectorExpress), new DefaultMessageListenerConcurrently()); | 408 | +// }else if(selectorType == SelectorType.SQL92){ |
| 409 | +// consumer.subscribe(topic, MessageSelector.bySql(selectorExpress), new DefaultMessageListenerConcurrently()); | ||
| 412 | } | 410 | } |
| 413 | break; | 411 | break; |
| 414 | case BATCH://批量消息 | 412 | case BATCH://批量消息 |
src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java
| @@ -17,7 +17,7 @@ | @@ -17,7 +17,7 @@ | ||
| 17 | 17 | ||
| 18 | package org.apache.rocketmq.spring.starter.enums; | 18 | package org.apache.rocketmq.spring.starter.enums; |
| 19 | 19 | ||
| 20 | -import com.aliyun.openservices.ons.api.ExpressionType; | 20 | +import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.filter.ExpressionType; |
| 21 | 21 | ||
| 22 | public enum SelectorType { | 22 | public enum SelectorType { |
| 23 | 23 | ||
| @@ -28,6 +28,7 @@ public enum SelectorType { | @@ -28,6 +28,7 @@ public enum SelectorType { | ||
| 28 | 28 | ||
| 29 | /** | 29 | /** |
| 30 | * @see ExpressionType#SQL92 | 30 | * @see ExpressionType#SQL92 |
| 31 | + * 注释by zwg 暂时不支持,原因:阿里云提供的ons-client最新版本1.7.4支持,但是该包有问题,引入后工程无法打包,目前使用的1.7.1,该版本不支持SQL92过滤 | ||
| 31 | */ | 32 | */ |
| 32 | - SQL92 | 33 | + //SQL92 |
| 33 | } | 34 | } |