framework-mq-up使用
1.pom引入依赖
<dependency>
<groupId>com.newyetai</groupId>
<artifactId>framework-mq-up</artifactId>
</dependency>
配置文件见framework-mq-up的application(yml/properties)文件
2.生产消息
// 消息实体
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class People implements Serializable{
private String name;
private Integer age;
private String say;
}
// 生产消息
public void send(String msg) {
// 第一个参数路由键routingKey,须唯一,用于标识消息用途,消费者需要与之对应才能消费消息;
// 第二个参数,消息内容,消息内容类型最好与消费者共用一个dto,方便处理;
// 普通队列
MqSendUtils.send("peopleMessage", People.builder().name("sr").age(8).say("i=" + i).build());
// 延时队列
MqSendUtils.sendDelayed("peopleMessageDelayed", People.builder().name("sr").age(8).say("i=" + i).build(), 30L, TimeUnit.SECONDS);
}
3.消费消息
// 普通队列实现 ITopicMqHandler
@Service
@Slf4j
public class TryHandler implements ITopicMqHandler<People>{
@Override
public String getRoutingKey() {
// 与生产消息的routingKey匹配一致才能消费到消息
return "peopleMessage";
}
@Override
public void onMessage(TopicMqMessage<People> message) {
// 消费消息
log.info("message consumer:{}", message);
}
}
// 延迟队列实现 IDelayedMqHandler
@Slf4j
@Service
public class TryDelayedHandler implements IDelayedMqHandler<People>{
@Override
public String getRoutingKey() {
// 与生产消息的routingKey匹配一致才能消费到消息
return "peopleMessageDelayed";
}
@Override
public void onMessage(DelayedMqMessage<People> message) {
// 消费消息
log.info("message consumer:{}", message);
}
}
4.指定消费者ip
# 启动自己的消费者项目,修改消息生产者以下配置为自己ip;生产者生产的消息只能被指定ip的消费者消费
#指定消息消费者ip
rabbitmq.custom.consumerAppId=10.20.30.165
5.同系统多部署-广播模式
#广播id,同系统内唯一
rabbitmq.custom.broadcastId=1
6.规范
routingkey默认命名规则:生产者项目名_业务key;
(ps:如果有)
例如:用户中心生产一个用户修改的消息,routingkey则定为alita-user-center_user_modify