初始代码
This commit is contained in:
22
yunzhupaas-boot-common/yunzhupaas-common-event/pom.xml
Normal file
22
yunzhupaas-boot-common/yunzhupaas-common-event/pom.xml
Normal file
@@ -0,0 +1,22 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.yunzhupaas</groupId>
|
||||
<artifactId>yunzhupaas-boot-common</artifactId>
|
||||
<version>5.2.0-RELEASE</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>yunzhupaas-common-event</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.yunzhupaas</groupId>
|
||||
<artifactId>yunzhupaas-common-database</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.yunzhupaas.config;
|
||||
|
||||
import com.baomidou.lock.LockTemplate;
|
||||
import com.yunzhupaas.consts.ProjectEventConst;
|
||||
import com.yunzhupaas.event.*;
|
||||
import com.yunzhupaas.handler.ProjectEventRedisMessageHandler;
|
||||
import com.yunzhupaas.handler.ProjectEventRedisSender;
|
||||
import com.yunzhupaas.event.ProjectEventSender;
|
||||
import com.yunzhupaas.properties.EventProperty;
|
||||
import com.yunzhupaas.util.PublishEventUtil;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
public class ProjectEventAutoConfiguration {
|
||||
|
||||
|
||||
|
||||
@Bean
|
||||
@ConfigurationProperties(prefix = EventProperty.PREFIX)
|
||||
public EventProperty getEventProperties(){
|
||||
return new EventProperty();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public ProjectEventListenerAnnotationBeanPostProcessor getProjectEventListenerAnnotationBeanPostProcessor(){
|
||||
return new ProjectEventListenerAnnotationBeanPostProcessor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProjectEventProccess getProjectEventProccess(LockTemplate lockTemplate, ConfigValueUtil configValueUtil){
|
||||
return new ProjectEventProccess(lockTemplate, configValueUtil);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "event", name = "event-publish-type", havingValue = ProjectEventConst.EVENT_PUBLISH_TYPE_REDIS, matchIfMissing = true)
|
||||
public ProjectEventRedisMessageHandler getProjectEventRedisMessageHandler(RedisMessageListenerContainer container, EventProperty eventProperty){
|
||||
return new ProjectEventRedisMessageHandler(container, eventProperty);
|
||||
}
|
||||
|
||||
/**
|
||||
* 自定义事件发布渠道为QUEUE
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "event", name = "event-publish-type", havingValue = ProjectEventConst.EVENT_PUBLISH_TYPE_REDIS, matchIfMissing = true)
|
||||
public ProjectEventRedisSender getProjectEventRedisSender(RedisTemplate redisTemplate, EventProperty eventProperty){
|
||||
return new ProjectEventRedisSender(redisTemplate, eventProperty);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public PublishEventUtil getPublishEventUtil(@Autowired(required = false) ProjectEventSender eventSender){
|
||||
return new PublishEventUtil(eventSender);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
package com.yunzhupaas.consts;
|
||||
|
||||
/**
|
||||
* 自定义事件常量
|
||||
*/
|
||||
public class ProjectEventConst {
|
||||
|
||||
public static final String DEFAULT_TOPIC_NAME = "defMsgToptic";
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 自定义通知渠道 Redis
|
||||
*/
|
||||
public static final String EVENT_PUBLISH_TYPE_REDIS = "redis";
|
||||
/**
|
||||
* 自定义通知渠道 MQ
|
||||
*/
|
||||
public static final String EVENT_PUBLISH_TYPE_QUEUE = "mq";
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Redis事件监听模式, 当前DB
|
||||
*/
|
||||
public static final String REDIS_PUBLISH_TYPE_CURRENT = "current";
|
||||
|
||||
/**
|
||||
* Redis事件监听模式, 相同Redis(全部DB) 多个环境使用相同的Redis实例都会触发事件
|
||||
*/
|
||||
public static final String REDIS_PUBLISH_TYPE_ALL = "all";
|
||||
|
||||
/**
|
||||
* Redis事件消息监听前缀
|
||||
*/
|
||||
public static final String DEFAULT_CHANNEL_PREFIX = "myevent_";
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 集群消费, 只有一个服务消费成功
|
||||
*/
|
||||
public static final int EVENT_PUBLISH_MODEL_CLUSTERING = 1;
|
||||
|
||||
/**
|
||||
* 广播消费, 所有服务都可以消费
|
||||
*/
|
||||
public static final int EVENT_PUBLISH_MODEL_BROADCASTING = 2;
|
||||
|
||||
/**
|
||||
* 本地消息, 只在当前进程触发事件
|
||||
*/
|
||||
public static final int EVENT_PUBLISH_MODEL_LOCAL = 3;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
package com.yunzhupaas.event;
|
||||
|
||||
import com.yunzhupaas.consts.ProjectEventConst;
|
||||
import com.yunzhupaas.module.ProjectEvent;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 自定义事件注册
|
||||
*/
|
||||
public class ProjectEventHolder {
|
||||
|
||||
private ProjectEventHolder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* {topic:{keymatcher:consumer}}
|
||||
*/
|
||||
private static final Map<String, Map<ProjectEventKeyMatcher, Consumer<ProjectEvent>>> events = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 添加事件处理器, 默认主题
|
||||
*/
|
||||
public static void addEventListener(ProjectEventKeyMatcher keyMatcher, Consumer<ProjectEvent> consumer) {
|
||||
addEventListener(ProjectEventConst.DEFAULT_TOPIC_NAME, keyMatcher, consumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加事件处理器
|
||||
*/
|
||||
public static void addEventListener(String topic, ProjectEventKeyMatcher keyMatcher, Consumer<ProjectEvent> consumer) {
|
||||
Map<ProjectEventKeyMatcher, Consumer<ProjectEvent>> topicEvents = events.computeIfAbsent(topic, k -> new HashMap<>());
|
||||
topicEvents.put(keyMatcher, consumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取事件匹配的处理器, 默认主题
|
||||
*/
|
||||
public static List<Consumer<ProjectEvent>> getEventListener(ProjectEvent event) {
|
||||
return getEventListener(ProjectEventConst.DEFAULT_TOPIC_NAME, event);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取事件匹配的处理器
|
||||
*/
|
||||
public static List<Consumer<ProjectEvent>> getEventListener(String topic, ProjectEvent event) {
|
||||
Map<ProjectEventKeyMatcher, Consumer<ProjectEvent>> topicEvents = events.getOrDefault(topic, Collections.emptyMap());
|
||||
List<Consumer<ProjectEvent>> consumers = new ArrayList<>();
|
||||
for (Map.Entry<ProjectEventKeyMatcher, Consumer<ProjectEvent>> entry : topicEvents.entrySet()) {
|
||||
if(entry.getKey().isMatch(event)){
|
||||
consumers.add(entry.getValue());
|
||||
}
|
||||
}
|
||||
return consumers;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
package com.yunzhupaas.event;
|
||||
|
||||
import com.yunzhupaas.module.ProjectEvent;
|
||||
import org.springframework.expression.Expression;
|
||||
import org.springframework.expression.ExpressionParser;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* 事件匹配
|
||||
*/
|
||||
public class ProjectEventKeyMatcher {
|
||||
|
||||
|
||||
protected String expression;
|
||||
|
||||
public ProjectEventKeyMatcher(String expression) {
|
||||
this.expression = expression;
|
||||
}
|
||||
|
||||
public Predicate<ProjectEvent> getMatcher() {
|
||||
return null;
|
||||
}
|
||||
|
||||
protected String getExpression() {
|
||||
return expression;
|
||||
}
|
||||
|
||||
public boolean isMatch(ProjectEvent event){
|
||||
return getMatcher().test(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* 文本直接匹配
|
||||
*/
|
||||
public static class Text extends ProjectEventKeyMatcher {
|
||||
|
||||
public Text(String expression) {
|
||||
super(expression);
|
||||
}
|
||||
|
||||
private final Predicate<ProjectEvent> matcher = target -> Objects.equals(target.getChannel(), getExpression());
|
||||
|
||||
@Override
|
||||
public Predicate<ProjectEvent> getMatcher() {
|
||||
return matcher;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 正则匹配 String.matches匹配
|
||||
*/
|
||||
public static class Pattern extends ProjectEventKeyMatcher {
|
||||
|
||||
public Pattern(String expression) {
|
||||
super(expression);
|
||||
}
|
||||
|
||||
private final Predicate<ProjectEvent> matcher = target -> target.getChannel().matches(expression);
|
||||
|
||||
@Override
|
||||
public Predicate<ProjectEvent> getMatcher() {
|
||||
return matcher;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* SpEL表达式匹配
|
||||
*/
|
||||
public static class Spel extends ProjectEventKeyMatcher {
|
||||
|
||||
private static final ExpressionParser spelExpressionParserarser = new SpelExpressionParser();
|
||||
|
||||
public Spel(String expression) {
|
||||
super(expression);
|
||||
}
|
||||
|
||||
private final Predicate<ProjectEvent> matcher = target -> {
|
||||
Expression expression1 = spelExpressionParserarser.parseExpression(expression);
|
||||
return Boolean.TRUE.equals(expression1.getValue(target, Boolean.TYPE));
|
||||
};
|
||||
|
||||
@Override
|
||||
public Predicate<ProjectEvent> getMatcher() {
|
||||
return matcher;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.yunzhupaas.event;
|
||||
|
||||
import com.yunzhupaas.consts.ProjectEventConst;
|
||||
import org.springframework.context.event.EventListener;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 自定义事件处理器注解
|
||||
*/
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@Inherited
|
||||
public @interface ProjectEventListener {
|
||||
|
||||
String toptic() default ProjectEventConst.DEFAULT_TOPIC_NAME;
|
||||
|
||||
/**
|
||||
* 直接匹配
|
||||
*/
|
||||
String[] channel() default {};
|
||||
|
||||
/**
|
||||
* 正则匹配
|
||||
*/
|
||||
String[] channelRegex() default {};
|
||||
|
||||
/**
|
||||
* SpELl表达式匹配, 结果必须boolean类型
|
||||
* @see EventListener#condition()
|
||||
*/
|
||||
String[] channelSpel() default {};
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
package com.yunzhupaas.event;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import com.yunzhupaas.config.ApplicationStartErrorCheck;
|
||||
import com.yunzhupaas.constant.GlobalConst;
|
||||
import com.yunzhupaas.module.ProjectEvent;
|
||||
import com.yunzhupaas.module.ProjectEventInstance;
|
||||
import com.yunzhupaas.util.StringUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.core.annotation.AnnotatedElementUtils;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 自定义事件处理器扫描
|
||||
*/
|
||||
@Slf4j
|
||||
public class ProjectEventListenerAnnotationBeanPostProcessor implements BeanPostProcessor{
|
||||
|
||||
private List<String> scanPackages = Arrays.asList(GlobalConst.PROJECT_SCAN_PACKAGES.split(";"));
|
||||
private List<Class> initedClass = new ArrayList<>();
|
||||
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean)
|
||||
: bean.getClass();
|
||||
boolean isMatch = scanPackages.stream().anyMatch(s -> targetClass.getName().startsWith(s));
|
||||
if(isMatch) {
|
||||
if(!initedClass.contains(targetClass)) {
|
||||
ApplicationStartErrorCheck.getApplicationInitThreadPool().execute(() -> {
|
||||
try {
|
||||
initedClass.add(targetClass);
|
||||
Method[] uniqueDeclaredMethods = ReflectionUtils
|
||||
.getUniqueDeclaredMethods(targetClass, ReflectionUtils.USER_DECLARED_METHODS);
|
||||
for (Method method : uniqueDeclaredMethods) {
|
||||
ProjectEventListener listener = AnnotatedElementUtils
|
||||
.findMergedAnnotation(method, ProjectEventListener.class);
|
||||
if (listener != null) {
|
||||
String[] channel = listener.channel();
|
||||
String[] channelPattern = listener.channelRegex();
|
||||
String[] channelSpel = listener.channelSpel();
|
||||
Set<String> channelSet = Arrays.stream(channel).filter(StringUtil::isNotEmpty).collect(Collectors.toSet());
|
||||
Set<String> channelPatternSet = Arrays.stream(channelPattern).filter(StringUtil::isNotEmpty).collect(Collectors.toSet());
|
||||
Set<String> channelSpelSet = Arrays.stream(channelSpel).filter(StringUtil::isNotEmpty).collect(Collectors.toSet());
|
||||
if (channelSet.isEmpty() && channelPatternSet.isEmpty() && channelSpelSet.isEmpty()) {
|
||||
throw new IllegalArgumentException(getLogText(beanName, method.getName(), "channel和channelPattern和channelSpel必须填写一个"));
|
||||
}
|
||||
|
||||
// 检查参数数量小于等于1
|
||||
int paramCount = method.getParameterCount();
|
||||
if (paramCount > 1) {
|
||||
throw new IllegalArgumentException(getLogText(beanName, method.getName(), "参数只能小于1个"));
|
||||
} else if (paramCount == 1) {
|
||||
if (!method.getParameterTypes()[0].equals(ProjectEventInstance.class)) {
|
||||
throw new IllegalArgumentException(getLogText(beanName, method.getName(), "参数只能是com.yunzhupaas.module.ProjectEventInstance"));
|
||||
}
|
||||
}
|
||||
|
||||
String topic = listener.toptic();
|
||||
Assert.hasLength(topic, "主题不允许为空");
|
||||
for (String s : channelSet) {
|
||||
ProjectEventHolder.addEventListener(topic, new ProjectEventKeyMatcher.Text(s), event -> {
|
||||
invokeMethod(bean, method, event);
|
||||
});
|
||||
}
|
||||
for (String s : channelPatternSet) {
|
||||
ProjectEventHolder.addEventListener(topic, new ProjectEventKeyMatcher.Pattern(s), event -> {
|
||||
invokeMethod(bean, method, event);
|
||||
});
|
||||
}
|
||||
for (String s : channelSpelSet) {
|
||||
ProjectEventHolder.addEventListener(topic, new ProjectEventKeyMatcher.Spel(s), event -> {
|
||||
invokeMethod(bean, method, event);
|
||||
});
|
||||
}
|
||||
if(log.isDebugEnabled()) {
|
||||
log.debug(getLogText(beanName, method.getName(), "注册成功"));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
ApplicationStartErrorCheck.setStartError();
|
||||
throw e;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
private void invokeMethod(Object bean, Method method, ProjectEvent topicEvent) {
|
||||
ReflectUtil.invoke(bean, method, topicEvent);
|
||||
}
|
||||
|
||||
private String getLogText(String bean, String method, String text) {
|
||||
return "@ProjectEventListener注解的对象[" + bean + "]方法[" + method + "]" + text;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
package com.yunzhupaas.event;
|
||||
|
||||
import cn.hutool.core.text.StrPool;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.lock.LockInfo;
|
||||
import com.baomidou.lock.LockTemplate;
|
||||
import com.yunzhupaas.base.UserInfo;
|
||||
import com.yunzhupaas.config.ConfigValueUtil;
|
||||
import com.yunzhupaas.consts.ProjectEventConst;
|
||||
import com.yunzhupaas.consts.RedisConst;
|
||||
import com.yunzhupaas.database.util.TenantDataSourceUtil;
|
||||
import com.yunzhupaas.module.ProjectEvent;
|
||||
import com.yunzhupaas.module.ProjectEventInstance;
|
||||
import com.yunzhupaas.util.StringUtil;
|
||||
import com.yunzhupaas.util.UserProvider;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 自定义事件执行
|
||||
*/
|
||||
@Slf4j
|
||||
public class ProjectEventProccess {
|
||||
|
||||
private final LockTemplate lockTemplate;
|
||||
private final ConfigValueUtil configValueUtil;
|
||||
|
||||
/**
|
||||
* 集群模式锁定事件
|
||||
*/
|
||||
private final long LOCK_TIME = 24 * 60 * 60000L;
|
||||
|
||||
public ProjectEventProccess(LockTemplate lockTemplate, ConfigValueUtil configValueUtil) {
|
||||
this.lockTemplate = lockTemplate;
|
||||
this.configValueUtil = configValueUtil;
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步执行
|
||||
*
|
||||
* @param event
|
||||
*/
|
||||
@EventListener(condition = "!#event.async")
|
||||
public void onApplicationEvent(ProjectEventInstance event) {
|
||||
processEvent(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步执行
|
||||
*
|
||||
* @param event
|
||||
*/
|
||||
@Async
|
||||
@EventListener(condition = "#event.async")
|
||||
public void onApplicationEventAsync(ProjectEventInstance event) {
|
||||
processEvent(event);
|
||||
}
|
||||
|
||||
private void processEvent(ProjectEvent event) {
|
||||
String topic = event.getToptic();
|
||||
List<Consumer<ProjectEvent>> eventListener = ProjectEventHolder.getEventListener(topic, event);
|
||||
LockInfo lock = null;
|
||||
// 集群消费模式, 添加Redis锁
|
||||
if (!eventListener.isEmpty() && Objects.equals(event.getMessageModel(), ProjectEventConst.EVENT_PUBLISH_MODEL_CLUSTERING)) {
|
||||
// lock:服务名:事件编号
|
||||
String key = RedisConst.REDIS_LOCK4J_PREFIX + StrPool.COLON + configValueUtil.getApplicationName() + StrPool.COLON + event.getEventId().toString();
|
||||
lock = lockTemplate.lock(key, LOCK_TIME, 0L);
|
||||
if (lock == null) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("事件在其他服务已经处理: {}", JSON.toJSONString(event));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
try {
|
||||
// 设置用户信息
|
||||
if (!eventListener.isEmpty()) {
|
||||
UserInfo userInfo = null;
|
||||
// 初始化线程用户信息
|
||||
if (event.getToken() != null) {
|
||||
userInfo = UserProvider.getUser(event.getToken());
|
||||
UserProvider.setLocalLoginUser(userInfo);
|
||||
}
|
||||
// 开启多租户 切库
|
||||
if (configValueUtil.isEnableLogicDelete()) {
|
||||
if (userInfo != null && StringUtil.isNotEmpty(userInfo.getTenantId())) {
|
||||
// 优先从用户获取多租户
|
||||
TenantDataSourceUtil.switchTenant(userInfo.getTenantId());
|
||||
} else if (event.getTenantId() != null) {
|
||||
// 从事件里获取多租户
|
||||
TenantDataSourceUtil.switchTenant(event.getTenantId());
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Consumer<ProjectEvent> consumer : eventListener) {
|
||||
consumer.accept(event);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
lockTemplate.releaseLock(lock);
|
||||
log.error("事件处理失败: {}", event, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.yunzhupaas.event;
|
||||
|
||||
import com.yunzhupaas.module.ProjectEvent;
|
||||
|
||||
/**
|
||||
* 事件发送渠道
|
||||
*/
|
||||
public interface ProjectEventSender {
|
||||
|
||||
void send(ProjectEvent projectEvent);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
package com.yunzhupaas.handler;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import com.yunzhupaas.constant.GlobalConst;
|
||||
import com.yunzhupaas.consts.ProjectEventConst;
|
||||
import com.yunzhupaas.consts.RedisConst;
|
||||
import com.yunzhupaas.module.ProjectEvent;
|
||||
import com.yunzhupaas.module.ProjectEventInstance;
|
||||
import com.yunzhupaas.properties.EventProperty;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.listener.KeyspaceEventMessageListener;
|
||||
import org.springframework.data.redis.listener.PatternTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.listener.Topic;
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 自定义事件监听, Redis渠道, 收到通知后发送Spring事件(RedisEventInstance)
|
||||
*/
|
||||
@Slf4j
|
||||
public class ProjectEventRedisMessageHandler extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
|
||||
|
||||
private static Topic TOPIC_ALL_KEYEVENTS;
|
||||
private String keyprefix;
|
||||
|
||||
private @Nullable ApplicationEventPublisher publisher;
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Creates new {@link KeyspaceEventMessageListener}.
|
||||
*
|
||||
* @param listenerContainer must not be {@literal null}.
|
||||
*/
|
||||
public ProjectEventRedisMessageHandler(RedisMessageListenerContainer listenerContainer, EventProperty eventProperties) {
|
||||
super(listenerContainer);
|
||||
if(Objects.equals(eventProperties.getRedisPublishType(), ProjectEventConst.REDIS_PUBLISH_TYPE_ALL)){
|
||||
// 只订阅当前配置的库索引
|
||||
keyprefix = ProjectEventConst.DEFAULT_CHANNEL_PREFIX;
|
||||
}else{
|
||||
// 订阅同一个Redis里的所有消息
|
||||
keyprefix = RedisConst.REDIS_EVENT_KEY + ProjectEventConst.DEFAULT_CHANNEL_PREFIX;
|
||||
}
|
||||
TOPIC_ALL_KEYEVENTS = new PatternTopic(keyprefix + "*");
|
||||
log.info("初始化自定义事件Redis监听器");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRegister(RedisMessageListenerContainer container) {
|
||||
container.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doHandleMessage(@NotNull Message message) {
|
||||
String channel = new String(message.getChannel(), GlobalConst.DEFAULT_CHARSET);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("事件监听收到Redis消息, channel: {}, body: {}", channel, new String(message.getBody(), GlobalConst.DEFAULT_CHARSET));
|
||||
}
|
||||
publishEvent(JSON.parseObject(message.getBody(), ProjectEventInstance.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish the event in case an {@link ApplicationEventPublisher} is set.
|
||||
*
|
||||
* @param event can be {@literal null}.
|
||||
*/
|
||||
protected void publishEvent(ProjectEvent event) {
|
||||
if (publisher != null) {
|
||||
event.setAsync(true);
|
||||
this.publisher.publishEvent(event);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
|
||||
publisher = applicationEventPublisher;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.yunzhupaas.handler;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.yunzhupaas.consts.ProjectEventConst;
|
||||
import com.yunzhupaas.consts.RedisConst;
|
||||
import com.yunzhupaas.module.ProjectEvent;
|
||||
import com.yunzhupaas.event.ProjectEventSender;
|
||||
import com.yunzhupaas.properties.EventProperty;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 自定义事件发布 Redis渠道
|
||||
*/
|
||||
@Slf4j
|
||||
public class ProjectEventRedisSender implements ProjectEventSender {
|
||||
|
||||
|
||||
private RedisTemplate redisTemplate;
|
||||
private EventProperty eventProperties;
|
||||
|
||||
public ProjectEventRedisSender(RedisTemplate redisTemplate, EventProperty eventProperties) {
|
||||
this.redisTemplate = redisTemplate;
|
||||
this.eventProperties = eventProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(ProjectEvent event) {
|
||||
String channel = event.getChannel();
|
||||
// channel 加上指定前缀
|
||||
if(Objects.equals(eventProperties.getRedisPublishType(), ProjectEventConst.REDIS_PUBLISH_TYPE_ALL)){
|
||||
// 订阅同一个Redis里的所有消息
|
||||
channel = ProjectEventConst.DEFAULT_CHANNEL_PREFIX + channel;
|
||||
}else{
|
||||
// 只订阅当前配置的库索引
|
||||
channel = RedisConst.REDIS_EVENT_KEY + ProjectEventConst.DEFAULT_CHANNEL_PREFIX + channel;
|
||||
}
|
||||
redisTemplate.convertAndSend(channel, event);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("发送Redis自定义事件: {}", JSON.toJSONString(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
package com.yunzhupaas.module;
|
||||
|
||||
import com.github.yitter.idgen.YitIdHelper;
|
||||
import com.yunzhupaas.consts.ProjectEventConst;
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* 项目事件基础类
|
||||
*/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public class ProjectEvent{
|
||||
|
||||
|
||||
/**
|
||||
* 数据
|
||||
*/
|
||||
private Object source;
|
||||
|
||||
/**
|
||||
* 消息ID
|
||||
*/
|
||||
private Long eventId;
|
||||
|
||||
/**
|
||||
* 主题
|
||||
*/
|
||||
private String toptic;
|
||||
|
||||
/**
|
||||
* 频道
|
||||
*/
|
||||
private String channel;
|
||||
|
||||
/**
|
||||
* 是否异步
|
||||
*/
|
||||
private boolean async;
|
||||
|
||||
/**
|
||||
* 1:集群, 2:广播, 3:本地
|
||||
* @see com.yunzhupaas.consts.ProjectEventConst#EVENT_PUBLISH_MODEL_CLUSTERING
|
||||
* @see com.yunzhupaas.consts.ProjectEventConst#EVENT_PUBLISH_MODEL_BROADCASTING
|
||||
* @see com.yunzhupaas.consts.ProjectEventConst#EVENT_PUBLISH_MODEL_LOCAL
|
||||
*/
|
||||
private int messageModel;
|
||||
|
||||
/**
|
||||
* 是否事务提交后才发送
|
||||
*/
|
||||
private boolean afterCommitTransaction;
|
||||
|
||||
/**
|
||||
* 用户TOKEN
|
||||
*/
|
||||
private String token;
|
||||
|
||||
/**
|
||||
* 租户编码
|
||||
*/
|
||||
private String tenantId;
|
||||
|
||||
public ProjectEvent(){
|
||||
}
|
||||
|
||||
public ProjectEvent(String topic, String channel, Object source) {
|
||||
this(null, topic, channel, source, null, true, false);
|
||||
}
|
||||
|
||||
public ProjectEvent(Long eventId, String topic, String channel, Object source, Integer messageModel, boolean async, boolean afterCommitTransaction) {
|
||||
this.source = source;
|
||||
this.eventId = Optional.ofNullable(eventId).orElseGet(YitIdHelper::nextId);
|
||||
this.toptic = Optional.ofNullable(topic).orElse(ProjectEventConst.DEFAULT_TOPIC_NAME);
|
||||
this.messageModel = Optional.ofNullable(messageModel).orElse(ProjectEventConst.EVENT_PUBLISH_MODEL_CLUSTERING);
|
||||
this.channel = channel;
|
||||
this.async = async;
|
||||
this.afterCommitTransaction = afterCommitTransaction;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.yunzhupaas.module;
|
||||
|
||||
|
||||
/**
|
||||
* 发送自定义事件模型
|
||||
*/
|
||||
public class ProjectEventBuilder extends ProjectEvent{
|
||||
|
||||
public ProjectEventBuilder(String channel, Object source) {
|
||||
super(null, channel, source);
|
||||
}
|
||||
|
||||
public ProjectEventBuilder(String topic, String channel, Object source) {
|
||||
super(topic, channel, source);
|
||||
}
|
||||
|
||||
public ProjectEventBuilder(Long eventId, String topic, String channel, Object source, Integer messageModel, boolean async, boolean afterCommitTransaction) {
|
||||
super(eventId, topic, channel, source, messageModel, async, afterCommitTransaction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProjectEventBuilder setSource(Object source) {
|
||||
return (ProjectEventBuilder) super.setSource(source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProjectEventBuilder setEventId(Long eventId) {
|
||||
return (ProjectEventBuilder) super.setEventId(eventId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProjectEventBuilder setAsync(boolean async) {
|
||||
return (ProjectEventBuilder) super.setAsync(async);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProjectEventBuilder setMessageModel(int messageModel) {
|
||||
return (ProjectEventBuilder) super.setMessageModel(messageModel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProjectEventBuilder setAfterCommitTransaction(boolean afterCommitTransaction) {
|
||||
return (ProjectEventBuilder) super.setAfterCommitTransaction(afterCommitTransaction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.yunzhupaas.module;
|
||||
|
||||
|
||||
/**
|
||||
* 最终事件触发模型
|
||||
*/
|
||||
public class ProjectEventInstance extends ProjectEvent{
|
||||
|
||||
public ProjectEventInstance() {
|
||||
}
|
||||
|
||||
public ProjectEventInstance(String topic, String channel, Object source) {
|
||||
super(topic, channel, source);
|
||||
}
|
||||
|
||||
public ProjectEventInstance(Long eventId, String topic, String channel, Object source, Integer messageModel, boolean async, boolean afterCommitTransaction) {
|
||||
super(eventId, topic, channel, source, messageModel, async, afterCommitTransaction);
|
||||
}
|
||||
|
||||
public static ProjectEventInstance parseEvent(ProjectEvent event){
|
||||
return new ProjectEventInstance(event.getEventId()
|
||||
, event.getToptic(), event.getChannel(), event.getSource()
|
||||
, event.getMessageModel(), event.isAsync(), event.isAfterCommitTransaction());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.yunzhupaas.module;
|
||||
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 自定义事件内部无事务流转模型
|
||||
*/
|
||||
@Accessors(chain = true)
|
||||
public class ProjectEventPublish extends ProjectEvent{
|
||||
|
||||
public ProjectEventPublish(Long eventId, String topic, String channel, Object source, Integer messageModel, boolean async, boolean afterCommitTransaction) {
|
||||
super(eventId, topic, channel, source, messageModel, async, afterCommitTransaction);
|
||||
}
|
||||
|
||||
public static ProjectEventPublish parseEvent(ProjectEvent event){
|
||||
return new ProjectEventPublish(event.getEventId()
|
||||
, event.getToptic(), event.getChannel(), event.getSource()
|
||||
, event.getMessageModel(), event.isAsync(), event.isAfterCommitTransaction());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.yunzhupaas.module;
|
||||
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 自定义事件内部存在事务流转模型
|
||||
*/
|
||||
@Accessors(chain = true)
|
||||
public class ProjectEventPublishTransaction extends ProjectEvent {
|
||||
|
||||
public ProjectEventPublishTransaction(Long eventId, String topic, String channel, Object source, Integer messageModel, boolean async, boolean afterCommitTransaction) {
|
||||
super(eventId, topic, channel, source, messageModel, async, afterCommitTransaction);
|
||||
}
|
||||
|
||||
public static ProjectEventPublishTransaction parseEvent(ProjectEvent event){
|
||||
return new ProjectEventPublishTransaction(event.getEventId()
|
||||
, event.getToptic(), event.getChannel(), event.getSource()
|
||||
, event.getMessageModel(), event.isAsync(), event.isAfterCommitTransaction());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.yunzhupaas.properties;
|
||||
|
||||
|
||||
import com.yunzhupaas.consts.ProjectEventConst;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 事件模块配置
|
||||
*/
|
||||
@Data
|
||||
public class EventProperty {
|
||||
public static final String PREFIX = "event";
|
||||
|
||||
/**
|
||||
* 默认时间发布、监听渠道: redis, mq
|
||||
*/
|
||||
public String eventPublishType = ProjectEventConst.EVENT_PUBLISH_TYPE_REDIS;
|
||||
|
||||
/**
|
||||
* Redis监听模式:current, all
|
||||
*/
|
||||
public String redisPublishType = ProjectEventConst.REDIS_PUBLISH_TYPE_CURRENT;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
package com.yunzhupaas.util;
|
||||
|
||||
import com.baomidou.dynamic.datasource.annotation.DsTxEventListener;
|
||||
import com.baomidou.dynamic.datasource.tx.TransactionContext;
|
||||
import com.yunzhupaas.base.UserInfo;
|
||||
import com.yunzhupaas.consts.ProjectEventConst;
|
||||
import com.yunzhupaas.event.ProjectEventSender;
|
||||
import com.yunzhupaas.module.*;
|
||||
import lombok.Getter;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class PublishEventUtil implements ApplicationEventPublisherAware {
|
||||
|
||||
@Getter
|
||||
private static ApplicationEventPublisher publisher;
|
||||
private static ProjectEventSender eventSender = null;
|
||||
|
||||
public PublishEventUtil(ProjectEventSender eventSender) {
|
||||
PublishEventUtil.eventSender = eventSender;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布自定义事件
|
||||
* async: 是否异步 | 默认异步
|
||||
* messageModel: 1:集群(相同的服务中只有一个服务消费), 2:广播(所有服务都消费), 3:本地消息(当前进程消费) | 默认集群模式
|
||||
* afterCommitTransaction: 如果当前存在事务, 通知在事务提交成功后 |默认否
|
||||
* Redis渠道不支持重试, 禁用同步模式
|
||||
* QUEUE渠道同步模式如果报错可以触发MQ重试
|
||||
*/
|
||||
public static void publish(ProjectEventBuilder eventBuilder) {
|
||||
ProjectEvent newEvent = null;
|
||||
if (eventBuilder.isAfterCommitTransaction()) {
|
||||
// 开启提交事务后发送, 且当前存在事务
|
||||
if (!TransactionSynchronizationManager.isActualTransactionActive() && !StringUtils.hasText(TransactionContext.getXID())) {
|
||||
newEvent = ProjectEventPublishTransaction.parseEvent(eventBuilder);
|
||||
}
|
||||
}
|
||||
if(newEvent == null){
|
||||
newEvent = ProjectEventPublish.parseEvent(eventBuilder);
|
||||
}
|
||||
PublishEventUtil.getPublisher().publishEvent(newEvent);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送本地事件
|
||||
* @see #publish(ProjectEventBuilder)
|
||||
*/
|
||||
public static void publishLocalEvent(ProjectEventBuilder eventBuilder) {
|
||||
eventBuilder.setMessageModel(ProjectEventConst.EVENT_PUBLISH_MODEL_LOCAL);
|
||||
publish(eventBuilder);
|
||||
}
|
||||
|
||||
|
||||
@EventListener
|
||||
public void onPublishEvent(ProjectEventPublish event) {
|
||||
processPublishEvent(event);
|
||||
}
|
||||
|
||||
@TransactionalEventListener
|
||||
@DsTxEventListener
|
||||
public void onPublishTransactionEvent(ProjectEventPublishTransaction event) {
|
||||
processPublishEvent(event);
|
||||
}
|
||||
|
||||
private void processPublishEvent(ProjectEvent event) {
|
||||
Assert.notNull(eventSender, "当前发布渠道未配置");
|
||||
if(event.getToken() == null){
|
||||
String token = UserProvider.getToken();
|
||||
if(token == null){
|
||||
UserInfo userInfo = UserProvider.getLocalLoginUser();
|
||||
if(userInfo != null){
|
||||
token = userInfo.getToken();
|
||||
}
|
||||
}
|
||||
event.setToken(token);
|
||||
}
|
||||
if(event.getTenantId() == null){
|
||||
event.setTenantId(TenantHolder.getDatasourceId());
|
||||
}
|
||||
if(Objects.equals(event.getMessageModel(), ProjectEventConst.EVENT_PUBLISH_MODEL_LOCAL)){
|
||||
PublishEventUtil.getPublisher().publishEvent(ProjectEventInstance.parseEvent(event));
|
||||
}else {
|
||||
eventSender.send(event);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
|
||||
PublishEventUtil.publisher = applicationEventPublisher;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user