背景:
物联网设备数据推送对接,通过http 接口数据加密签名后推送。平台要求5秒内必须反馈成功或者失败,所以应用接收到数据后,丢给异步线程去处理,直接返回接收成功的状态。奈何设备多数据量大,异步线程池爆满了,队列也满了,直接报错了。
错误信息:
org.springframework.web.util.NestedServletException: Request processing failed;
nested exception is org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@5fac713
a[Running, pool size = 50, active threads = 50, queued tasks = 1000, completed tasks = 254456]]
did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$Lambda$1102/0x00000008007c4840@76f26a19
解决方案:
简单实用原则,在异步线程处理前新增一个缓冲队列,定义一个定时任务,批量处理,做持久化处理。做入库时,按照设备id 分组,采用分批次入库提高效率。
@Service
@Slf4j
public class DataProcessor {
private final PropertyBufferQueue bufferQueue;
private final PropertyService propertyService;
public DataProcessor(PropertyBufferQueue bufferQueue, PropertyService propertyService) {
this.bufferQueue = bufferQueue;
this.propertyService = propertyService;
}
@Scheduled(cron="${datapush.schedule-processing}")
@Async("customTaskExecutorQueue")
public void processData() {
int c = 0;
List<JSONObject> dataToProcess = bufferQueue.dequeue(300);
// 扫描一次最大取三次数据
while (dataToProcess.size() > 0 && c < 3) {
log.info("执行一次队列扫描,数据量 {}",dataToProcess.size());
// 在这里进行数据处理和入库操作...
propertyService.saveBatch2db(dataToProcess);
dataToProcess = bufferQueue.dequeue(300);
c ++;
}
}
}
入库时候注意按设备分组后,批次入库提高入库效率。 数据库可以采用时序数据库例如国产TDengine
基于Postgres 的Timescaledb插件,都比较方便,内置的从空间、时间两个维度进行分表操作,
对程序员运维人员透明很友好。