DeviceTask.java 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package com.gyjiot.common.config;
  2. import com.gyjiot.common.constant.GyjConstant;
  3. import lombok.Data;
  4. import org.springframework.boot.context.properties.ConfigurationProperties;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.scheduling.annotation.EnableAsync;
  8. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  9. import java.util.concurrent.Executor;
  10. import java.util.concurrent.ThreadPoolExecutor;
  11. /**
  12. * 设备报文处理线程池
  13. * @author shenghua.ji
  14. */
  15. @Configuration
  16. @EnableAsync
  17. @ConfigurationProperties(prefix = "spring.task.execution.pool")
  18. @Data
  19. public class DeviceTask {
  20. private int coreSize;
  21. private int maxSize;
  22. private int queueCapacity;
  23. private int keepAlive;
  24. /*设备状态池*/
  25. @Bean(GyjConstant.TASK.DEVICE_STATUS_TASK)
  26. public Executor deviceStatusTaskExecutor() {
  27. return builder(GyjConstant.TASK.DEVICE_STATUS_TASK);
  28. }
  29. /*平台自动获取线程池(例如定时获取设备信息)*/
  30. @Bean(GyjConstant.TASK.DEVICE_FETCH_PROP_TASK)
  31. public Executor deviceFetchTaskExecutor() {
  32. return builder(GyjConstant.TASK.DEVICE_FETCH_PROP_TASK);
  33. }
  34. /*设备回调信息(下发指令(服务)设备应答信息)*/
  35. @Bean(GyjConstant.TASK.DEVICE_REPLY_MESSAGE_TASK)
  36. public Executor deviceReplyTaskExecutor() {
  37. return builder(GyjConstant.TASK.DEVICE_REPLY_MESSAGE_TASK);
  38. }
  39. /*设备主动上报(设备数据有变化主动上报)*/
  40. @Bean(GyjConstant.TASK.DEVICE_UP_MESSAGE_TASK)
  41. public Executor deviceUpMessageTaskExecutor() {
  42. return builder(GyjConstant.TASK.DEVICE_UP_MESSAGE_TASK);
  43. }
  44. /*指令下发(服务下发)*/
  45. @Bean(GyjConstant.TASK.FUNCTION_INVOKE_TASK)
  46. public Executor functionInvokeTaskExecutor() {
  47. return builder(GyjConstant.TASK.FUNCTION_INVOKE_TASK);
  48. }
  49. /*内部消费线程*/
  50. @Bean(GyjConstant.TASK.MESSAGE_CONSUME_TASK)
  51. public Executor messageConsumeTaskExecutor() {
  52. return builder(GyjConstant.TASK.MESSAGE_CONSUME_TASK);
  53. }
  54. @Bean(GyjConstant.TASK.MESSAGE_CONSUME_TASK_PUB)
  55. public Executor messageConsumePubTaskExecutor(){
  56. return builder(GyjConstant.TASK.MESSAGE_CONSUME_TASK_PUB);
  57. }
  58. @Bean(GyjConstant.TASK.MESSAGE_CONSUME_TASK_FETCH)
  59. public Executor messageConsumeFetchTaskExecutor(){
  60. return builder(GyjConstant.TASK.MESSAGE_CONSUME_TASK_FETCH);
  61. }
  62. @Bean(GyjConstant.TASK.DELAY_UPGRADE_TASK)
  63. public Executor delayedTaskExecutor(){
  64. return builder(GyjConstant.TASK.DELAY_UPGRADE_TASK);
  65. }
  66. /*设备其他消息处理*/
  67. @Bean(GyjConstant.TASK.DEVICE_OTHER_TASK)
  68. public Executor deviceOtherTaskExecutor(){
  69. return builder(GyjConstant.TASK.DEVICE_OTHER_TASK);
  70. }
  71. /*组装线程池*/
  72. private ThreadPoolTaskExecutor builder(String threadNamePrefix){
  73. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  74. executor.setCorePoolSize(coreSize);
  75. executor.setMaxPoolSize(maxSize);
  76. executor.setKeepAliveSeconds(keepAlive);
  77. executor.setQueueCapacity(queueCapacity);
  78. // 线程池对拒绝任务的处理策略
  79. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
  80. //线程池名的前缀
  81. executor.setThreadNamePrefix(threadNamePrefix);
  82. executor.initialize();
  83. return executor;
  84. }
  85. }