李成笔记网

专注域名、站长SEO知识分享与实战技巧

物联网数据对接并发太大,直接拒绝服务

背景:

物联网设备数据推送对接,通过http 接口数据加密签名后推送。平台要求5秒内必须反馈成功或者失败,所以应用接收到数据后,丢给异步线程去处理,直接返回接收成功的状态。奈何设备多数据量大,异步线程池爆满了,队列也满了,直接报错了。

错误信息:

org.springframework.web.util.NestedServletException: Request processing failed;

nested exception is org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@5fac713

a[Running, pool size = 50, active threads = 50, queued tasks = 1000, completed tasks = 254456]]

did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$Lambda$1102/0x00000008007c4840@76f26a19


解决方案:

简单实用原则,在异步线程处理前新增一个缓冲队列,定义一个定时任务,批量处理,做持久化处理。做入库时,按照设备id 分组,采用分批次入库提高效率。

@Service
@Slf4j
public class DataProcessor {
	private final PropertyBufferQueue bufferQueue;

	private final PropertyService propertyService;

	public DataProcessor(PropertyBufferQueue bufferQueue, PropertyService propertyService) {
		this.bufferQueue = bufferQueue;
		this.propertyService = propertyService;
	}

	@Scheduled(cron="${datapush.schedule-processing}")
	@Async("customTaskExecutorQueue")
	public void processData() {
        int c = 0;
		List<JSONObject> dataToProcess = bufferQueue.dequeue(300);
        // 扫描一次最大取三次数据
		while (dataToProcess.size() > 0 && c < 3) {
			log.info("执行一次队列扫描,数据量 {}",dataToProcess.size());
			// 在这里进行数据处理和入库操作...
			propertyService.saveBatch2db(dataToProcess);
			dataToProcess = bufferQueue.dequeue(300);
			c ++;
		}
	}
}
入库时候注意按设备分组后,批次入库提高入库效率。 数据库可以采用时序数据库例如国产TDengine
基于Postgres 的Timescaledb插件,都比较方便,内置的从空间、时间两个维度进行分表操作,
对程序员运维人员透明很友好。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言