java实现定时发布系统消息

业务场景描述

管理员通过后台可以定时给系统推送消息;比如 管理员在春节放假前一周需要请假,但是公司需要在春节放假前一天通过消息推送,给所有员工拜年,这时候管理员可以提前写好拜年消息,在后台配置定时发送日期为新年前一天晚上。

代码实现

消息容器类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.lhb.service.impl;

import java.util.concurrent.ConcurrentHashMap;

/**
* @Program: lhb
* @Description: 消息容器
* @Author: LHB
* @Version: v0.0.1
* @Time: 2021-11-24 10:16
**/
public abstract class AbstractNotificationsContainer {
/** 消息ID-任务ID映射表 */
private static volatile ConcurrentHashMap<Integer,String> msgToTaskMapping = new ConcurrentHashMap<>();
/** 任务ID-消息ID映射表 */
private static volatile ConcurrentHashMap<String,Integer> taskToMsgMapping = new ConcurrentHashMap<>();
/** 任务缓存 */
private static volatile ConcurrentHashMap<String, SystemNotificationHolder> taskCache = new ConcurrentHashMap<>();

public void cache(SystemNotificationHolder release) {
taskCache.put(release.getTaskId(),release);
taskToMsgMapping.put(release.getTaskId(),release.getSystemMessageId());
msgToTaskMapping.put(release.getSystemMessageId(),release.getTaskId());
}

public void clean(SystemNotificationHolder release) {
msgToTaskMapping.remove(release.getSystemMessageId());
taskCache.remove(release.getTaskId());
taskToMsgMapping.remove(release.getTaskId());
}

public void clean(String taskId) {
Integer msgId = taskToMsgMapping.getOrDefault(taskId,null);
if (msgId != null) {
msgToTaskMapping.remove(msgId);
taskCache.remove(taskId);
taskToMsgMapping.remove(taskId);
}
}

public static boolean cancel(Integer msgId) {
String taskIdStr = msgToTaskMapping.get(msgId);
return taskCache.search(1,(taskId,systemMessageRelease)->{
if (taskId.equals(taskIdStr)) {
systemMessageRelease.taskCanceled();
return true;
}
return false;
});
}

public static int notificationCount() {
return taskCache.size();
}
}

消息发布类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.lhb.service.impl;

import cn.hutool.extra.spring.SpringUtil;
import com.lhb.domain.entity.SystemNotifications;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* @Program: lhb
* @Description: 系統消息发布
* @Author: LHB
* @Version: v0.0.1
* @Time: 2021-11-24 09:53
**/
@Slf4j
public class SystemNotificationHolder extends AbstractNotificationsContainer implements Runnable{

/** 任务取消状态 */
private volatile AtomicBoolean canceled = new AtomicBoolean(false);
/** 任务ID */
private String taskId;
/** 系统消息ID */
private Integer systemMessageId;

public SystemNotificationHolder(String taskId, Integer systemMessageId) {
this.taskId = taskId;
this.systemMessageId = systemMessageId;
cache(this);
}

public synchronized void taskCanceled() {
canceled.compareAndSet(false,true);
}

public String getTaskId() {
return this.taskId;
}

public Integer getSystemMessageId() {
return this.systemMessageId;
}

@Override
public void run() {
log.info("定时发布公告");
try {
if (!canceled.get()) {
SystemNotificationsServiceImpl systemMessageService = SpringUtil.getBean(SystemNotificationsServiceImpl.class);
SystemNotifications systemMessage = new SystemNotifications();
systemMessage.setId(systemMessageId);
systemMessage.setStatus(2);
systemMessageService.updateById(systemMessage);
log.info("消息发布成功 {}", systemMessage);
}
clean(this);
} catch (Exception e) {
e.printStackTrace();
}
}
}

系统消息管理服务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.lhb.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.UUID;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.lhb.basic.constant.ContextHelper;
import com.lhb.basic.exception.BizException;
import com.lhb.domain.entity.SystemNotifications;
import com.lhb.model.SystemNotificationModel;
import com.lhb.domain.vo.SystemMessageReleasedVO;
import com.lhb.domain.vo.SystemMessageVO;
import com.lhb.mapper.SystemNotificationsMapper;
import com.lhb.service.ISystemNotificationsService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* @Program: lhb
* @Description: 系统消息类
* @Author: LHB
* @Version: v0.0.1
* @Time: 2021-11-18 09:23
**/
@Service
@Slf4j
public class SystemNotificationsServiceImpl extends ServiceImpl<SystemNotificationsMapper, SystemNotifications> implements ISystemNotificationsService {
@Resource(name = "scheduleExecutorService")
private ScheduledThreadPoolExecutor scheduleExecutorService;

/**
* 保存系统推送消息(PC端系统设置-系统通知)
* @param model
*/
@Transactional(rollbackFor = BizException.class)
public void saveNotification(SystemNotificationModel model) {
SystemNotifications entity = getEntity(model);
int insert = baseMapper.insert(entity);
if (insert > 0) {
SystemNotificationHolder systemMessageRelease = new SystemNotificationHolder(UUID.nameUUIDFromBytes(entity.toString().getBytes()).toString(), entity.getId());
scheduleExecutorService.schedule(
systemMessageRelease,
DateUtil.betweenMs(DateUtil.date(), entity.getTime()),
TimeUnit.MILLISECONDS);
log.info("系统定时公告创建成功!");
}
}
}

线程池配置

这里我们用了线程池。线程池配置代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package com.lhb.basic.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

/**
* @Program: lhb
* @Description:
* @Author: LHB
* @Version: v0.0.1
* @Time: 2021-11-24 12:46
**/
@Configuration
@Slf4j
public class GlobalExecutorService {

/** cpu核数 */
private final int CPU_COUNT =Runtime.getRuntime().availableProcessors();
/** 核心线程数 */
private final int CORE_POOL_SIZE = CPU_COUNT;
/** 最大线程数 */
private final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
/** 非核心线程存活时间s */
private final int KEEP_ALIVE_TIME = 1;
/** 线程前缀 */
private final String SCHEDULE_THREAD_NAME_PREFIX = "lhb-msg-";
private final String THREAD_NAME_PREFIX = "lhb-thread-";
/** 缓存队列 */
private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

@Bean("scheduleExecutorService")
public ScheduledThreadPoolExecutor scheduleExecutorService() {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(CORE_POOL_SIZE, customThreadFactory(SCHEDULE_THREAD_NAME_PREFIX));
scheduledThreadPoolExecutor.setKeepAliveTime(KEEP_ALIVE_TIME,TimeUnit.SECONDS);
scheduledThreadPoolExecutor.setMaximumPoolSize(MAXIMUM_POOL_SIZE);
scheduledThreadPoolExecutor.setRejectedExecutionHandler(customRejectedHandler());
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
return scheduledThreadPoolExecutor;
}

@Bean("threadExecutor")
public ThreadPoolExecutor threadExecutor() {
return new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
workQueue,
customThreadFactory(THREAD_NAME_PREFIX),
(r, executor) -> {
log.warn("线程池已满,任务加入到缓冲队列");
workQueue.offer(r);
});
}


private ThreadFactory customThreadFactory(String threadNamePrefix) {
return r -> {
Thread t = new Thread(r);
t.setName(threadNamePrefix + t.getId());
if (!t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
t.setUncaughtExceptionHandler((t1, e) -> log.warn("创建线程异常{}",e));
return t;
};
}

private RejectedExecutionHandler customRejectedHandler() {
return (r, executor) -> {
if (!executor.isShutdown()) {
log.info("任务被拒绝,由主线程开始执行该任务");
r.run();
}
};
}
}

接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.lhb.controller.web;

import com.github.pagehelper.PageInfo;
import com.lhb.basic.annation.CurrentUser;
import com.lhb.basic.annation.IgnoreLogin;
import com.lhb.basic.api.R;
import com.lhb.domain.model.SystemNotificationModel;
import com.lhb.domain.vo.SystemMessageVO;
import com.lhb.service.impl.SystemNotificationsServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

/**
* @Program: lhb
* @Description: 系统通知控制层
* @Author: LHB
* @Version: v0.0.1
* @Time: 2021-11-25 10:25
**/
@Api(tags = "系统通知")
@IgnoreLogin
@RestController
@RequestMapping("/settings/notifications")
public class SystemNotificationsController {
@Autowired
private SystemNotificationsServiceImpl systemNotificationsService;

/**
* 保存系统公告
* @param model
* @return
*/
@ApiOperation("保存系统公告")
@PostMapping
@CurrentUser // 自定义注解,获取当前登录用户信息
public R saveNotification(@RequestBody @Validated SystemNotificationModel model) {
systemNotificationsService.saveNotification(model);
return R.success();
}
}

测试页面代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<link href="https://cdn.bootcdn.net/ajax/libs/semantic-ui/2.4.1/semantic.min.css" rel="stylesheet">
<script src="https://cdn.bootcdn.net/ajax/libs/semantic-ui/2.4.1/semantic.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.min.js"></script>
</head>
<body>
<div class="ui center segment " style="width: 50%; align-content: center; left: 25%;top: 20%">
<form class="ui small form segment">
<h4 class="ui dividing header">系统公告</h4>
<div class="field">
<label>标 题</label>
<div class="one field">
<div class="field">
<input type="text" id="title" name="title" placeholder="公告标题" value="SSSSS">
</div>
</div>
</div>
<div class="field">
<label>内 容</label>
<div class="one field">
<div class="field">
<textarea id="message" name="message" placeholder="公告内容"></textarea>
</div>
</div>
</div>
<div class="field">
<label>发布日期</label>
<div class="one field">
<div class="field">
<input id="time" type="datetime-local" name="time" value="2021-10-10 12:20:20">
</div>
</div>
</div>
<div class="ui primary button" onclick="submit()">保存</div>
</form>
</div>
<script type="text/javascript">
let data = {
title:"",
message:"",
time: new Date()
};

function submit() {
data.title = $("#title").val();
data.message = $("#message").val();
data.time = $("#time").val().replace("T"," ");
console.log(data);
$.ajax({
url:"http://localhost:8080/settings/notifications/all",
data:data,
type: "post",
data: "json",
success: function (data) {
console.log("success",data);
},
error: function (data) {
console.log("error",data);
}
});
}
</script>
</body>
</html>

测试页面

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!

请我喝杯咖啡吧~

支付宝
微信