Forráskód Böngészése

update:引入定时任务模块v1

zhiqiang.lv 2 hónapja
szülő
commit
90250ad552

+ 7 - 0
pom.xml

@@ -190,6 +190,13 @@
             <artifactId>mp3agic</artifactId>
             <version>0.9.1</version>
         </dependency>
+
+        <!-- 定时任务 -->
+        <dependency>
+            <groupId>com.xuxueli</groupId>
+            <artifactId>xxl-job-core</artifactId>
+            <version>2.3.1</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 18 - 0
src/main/java/top/lvzhiqiang/config/EnableXXLJob.java

@@ -0,0 +1,18 @@
+package top.lvzhiqiang.config;
+
+import org.springframework.context.annotation.Import;
+
+import java.lang.annotation.*;
+
+/**
+ * 启用XXL-JOB定时任务
+ *
+ * @author shiyong
+ * 2022/3/13 12:40
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Import(XXLJobMarkerConfiguration.class)
+public @interface EnableXXLJob {
+}

+ 1 - 1
src/main/java/top/lvzhiqiang/config/MyCoinJobs.java

@@ -58,7 +58,7 @@ public class MyCoinJobs {
         String startTime = String.valueOf(DateUtils.localDateTimeToMilliseconds(LocalDateTime.now().minusDays(1)));
         String endTime = String.valueOf(System.currentTimeMillis());
         String pageSize = "500";
-        coinService2.syncData4Binance(startTime, endTime, pageSize);
+        coinService2.syncData4Binance(startTime, endTime, pageSize, null, true);
     }
 
     /**

+ 49 - 0
src/main/java/top/lvzhiqiang/config/XXLJobAutoConfiguration.java

@@ -0,0 +1,49 @@
+package top.lvzhiqiang.config;
+
+import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * XXL-JOB定时任务自动配置
+ *
+ * @author shiyong
+ * 2022/3/13 12:40
+ */
+@Configuration
+@ConditionalOnBean(XXLJobMarkerConfiguration.Marker.class)
+public class XXLJobAutoConfiguration {
+    /**
+     * 设置XXL-JOB定时任务属性值
+     *
+     * @return com.riskraider.config.job.XXLJobProperties
+     * @author shiyong
+     * 2022/3/13 12:49
+    */
+    @Bean
+    public XXLJobProperties xxlJobProperties() {
+        return new XXLJobProperties();
+    }
+
+    /**
+     * 注册定时任务执行器
+     *
+     * @param xxlJobProperties 定时任务属性值
+     * @return com.xxl.job.core.executor.impl.XxlJobSpringExecutor
+     * @author shiyong
+     * 2022/3/13 12:57
+    */
+    @Bean
+    public XxlJobSpringExecutor xxlJobExecutor(XXLJobProperties xxlJobProperties) {
+        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
+        xxlJobSpringExecutor.setAdminAddresses(xxlJobProperties.getAdmin().getAddresses());
+        xxlJobSpringExecutor.setAppname(xxlJobProperties.getExecutor().getAppName());
+        xxlJobSpringExecutor.setIp(xxlJobProperties.getExecutor().getIp());
+        xxlJobSpringExecutor.setPort(xxlJobProperties.getExecutor().getPort());
+        xxlJobSpringExecutor.setLogPath(xxlJobProperties.getExecutor().getLogPath());
+        xxlJobSpringExecutor.setLogRetentionDays(xxlJobProperties.getExecutor().getLogRetentionDays());
+        xxlJobSpringExecutor.setAccessToken(xxlJobProperties.getAccessToken());
+        return xxlJobSpringExecutor;
+    }
+}

+ 20 - 0
src/main/java/top/lvzhiqiang/config/XXLJobMarkerConfiguration.java

@@ -0,0 +1,20 @@
+package top.lvzhiqiang.config;
+
+import org.springframework.context.annotation.Bean;
+
+/**
+ * XXL-JOB标记配置类
+ *
+ * @author shiyong
+ * 2022/3/13 12:40
+ */
+public class XXLJobMarkerConfiguration {
+    @Bean
+    public Marker xxlJobMarker() {
+        return new Marker();
+    }
+
+    public static class Marker {
+
+    }
+}

+ 110 - 0
src/main/java/top/lvzhiqiang/config/XXLJobProperties.java

@@ -0,0 +1,110 @@
+package top.lvzhiqiang.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+/**
+ * XXL-JOB定时任务属性值
+ *
+ * @author shiyong
+ * 2022/3/13 12:39
+ */
+@EnableConfigurationProperties(XXLJobProperties.class)
+@ConfigurationProperties(prefix = XXLJobProperties.XXL_JOB_PREFIX)
+public class XXLJobProperties {
+    public static final String XXL_JOB_PREFIX = "xxl.job";
+
+    private String accessToken;
+
+    private Admin admin = new Admin();
+
+    private Executor executor = new Executor();
+
+    public String getAccessToken() {
+        return accessToken;
+    }
+
+    public void setAccessToken(String accessToken) {
+        this.accessToken = accessToken;
+    }
+
+    public Admin getAdmin() {
+        return admin;
+    }
+
+    public void setAdmin(Admin admin) {
+        this.admin = admin;
+    }
+
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
+    public static class Admin {
+        private String addresses;
+
+        public String getAddresses() {
+            return addresses;
+        }
+
+        public void setAddresses(String addresses) {
+            this.addresses = addresses;
+        }
+    }
+
+    public static class Executor {
+        private String appName;
+
+        private String ip;
+
+        private int port;
+
+        private String logPath;
+
+        private int logRetentionDays;
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public void setAppName(String appName) {
+            this.appName = appName;
+        }
+
+        public String getIp() {
+            return ip;
+        }
+
+        public void setIp(String ip) {
+            this.ip = ip;
+        }
+
+        public int getPort() {
+            return port;
+        }
+
+        public void setPort(int port) {
+            this.port = port;
+        }
+
+        public String getLogPath() {
+            return logPath;
+        }
+
+        public void setLogPath(String logPath) {
+            this.logPath = logPath;
+        }
+
+        public int getLogRetentionDays() {
+            return logRetentionDays;
+        }
+
+        public void setLogRetentionDays(int logRetentionDays) {
+            this.logRetentionDays = logRetentionDays;
+        }
+    }
+}

+ 37 - 0
src/main/java/top/lvzhiqiang/job/JavdbJob.java

@@ -0,0 +1,37 @@
+package top.lvzhiqiang.job;
+
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * javdb更新任务
+ *
+ * @author lvzhiqiang
+ * 2025/10/15 17:58
+ */
+@Component
+@Slf4j
+public class JavdbJob {
+    /**
+     * test任务处理器
+     *
+     * @return com.xxl.job.core.biz.model.ReturnT<java.lang.String>
+     * @author lvzhiqiang
+     * 2025/10/15 17:58
+     */
+    @XxlJob("testJobHandler")
+    public ReturnT<String> testJobHandler() {
+        try {
+            XxlJobHelper.log("1111");
+        } catch (Exception e) {
+            log.error("2222", e);
+            XxlJobHelper.log(e);
+            return ReturnT.FAIL;
+        }
+
+        return ReturnT.SUCCESS;
+    }
+}

+ 2 - 7
src/main/java/top/lvzhiqiang/service/CoinService2.java

@@ -1,11 +1,6 @@
 package top.lvzhiqiang.service;
 
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import me.chanjar.weixin.cp.api.WxCpService;
-import me.chanjar.weixin.cp.bean.article.NewArticle;
-
-import java.util.Map;
+import java.net.Proxy;
 
 /**
  * Coin Service
@@ -16,5 +11,5 @@ import java.util.Map;
 public interface CoinService2 {
 
 
-    void syncData4Binance(String startTime, String endTime, String pageSize);
+    void syncData4Binance(String startTime, String endTime, String pageSize, Proxy proxy, Boolean exchangeInfoFlag);
 }

+ 81 - 76
src/main/java/top/lvzhiqiang/service/impl/CoinService2Impl.java

@@ -40,6 +40,9 @@ public class CoinService2Impl implements CoinService2 {
 
     private static final String secretKey = "Do5YDooH9OFimHXsF6nhi7rTZfxIZWWxaP35zsBEktGvCbogtbzBHpJs5cdVmple";
     private static final String apiKey = "1qY3LgxmNcW1dku8NiL8NWX8KZG4SBevXHtqLFgzWZbAuJ207CUuf0FSNzIAONh5";
+    String baseUrl = "https://fapi.binance.com";
+    String businessUrl = "/fapi/v1/allOrders";
+    String businessUrl2 = "/fapi/v1/userTrades";
 
     @Resource
     private CoinMapper coinMapper;
@@ -55,28 +58,27 @@ public class CoinService2Impl implements CoinService2 {
     }
 
     @Override
-    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
-    public void syncData4Binance(String startTime, String endTime, String pageSize) {
-        String baseUrl = "https://fapi.binance.com";
-        String businessUrl;
-        String businessUrl2;
+    public void syncData4Binance(String startTime, String endTime, String pageSize, Proxy proxy_, Boolean exchangeInfoFlag) {
         Connection.Response response = null;
-        Proxy proxy = Proxy.NO_PROXY;
+        Proxy proxy = proxy_ != null ? proxy_ : Proxy.NO_PROXY;
         Map<String, String> paramMap = new LinkedHashMap<>();
         Map<String, String> headerMap = new HashMap<>();
 
         // 1同步合约交易对
-        /*try {
-            businessUrl = "/fapi/v1/exchangeInfo";
-            response = JsoupUtil.requestBody(baseUrl + businessUrl, "GET", proxy, headerMap, paramMap);
-            JSONArray symbolList = JSONObject.parseObject(response.body()).getJSONArray("symbols");
-            if (symbolList.size() > 0) {
-                coinMapper.insertCoinBinanceSymbolList(JSONArray.parseArray(symbolList.toJSONString(), CoinBinanceSymbol.class));
-                log.warn("syncData4Binance->exchangeInfo,startTime={},endTime={},size={}", startTime, endTime, symbolList.size());
+        if (exchangeInfoFlag != null && exchangeInfoFlag) {
+            try {
+                businessUrl = "/fapi/v1/exchangeInfo";
+                response = JsoupUtil.requestBody(baseUrl + businessUrl, "GET", proxy, headerMap, paramMap);
+                JSONArray symbolList = JSONObject.parseObject(response.body()).getJSONArray("symbols");
+                if (symbolList.size() > 0) {
+                    coinMapper.insertCoinBinanceSymbolList(JSONArray.parseArray(symbolList.toJSONString(), CoinBinanceSymbol.class));
+                    log.warn("syncData4Binance->exchangeInfo,startTime={},endTime={},size={}", startTime, endTime, symbolList.size());
+                }
+            } catch (Exception e) {
+                log.error("syncData4Binance->exchangeInfo error,response={}", response, e);
             }
-        } catch (Exception e) {
-            log.error("syncData4Binance->exchangeInfo error,response={}", response, e);
-        }*/
+        }
+
         // 2同步订单历史
         // 2.1获取合约交易对
         Map<String, Object> queryParamMap = new HashMap<>();
@@ -91,73 +93,76 @@ public class CoinService2Impl implements CoinService2 {
         // 2.3循环调取查询所有订单(包括历史订单)接口
         headerMap.put("X-MBX-APIKEY", apiKey);
         try {
-            businessUrl = "/fapi/v1/allOrders";
-            businessUrl2 = "/fapi/v1/userTrades";
             for (CoinBinanceSymbol coinBinanceSymbol : coinBinanceSymbolList) {
-                List<CoinBinanceOrderHistory> coinBinanceOrderHistoryListAll = new ArrayList<>();
-                //权重: 5;每12秒调1次
-                Thread.sleep(12000L);
-
-                paramMap.clear();
-                String timestamp = String.valueOf(System.currentTimeMillis());
-                paramMap.put("symbol", coinBinanceSymbol.getSymbol());
-                paramMap.put("startTime", startTime);
-                paramMap.put("endTime", endTime);
-                paramMap.put("timestamp", timestamp);
-                String queryString = paramMap.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining("&"));
-                String sign = generate("", "", "", queryString, null, secretKey);
-                paramMap.put("signature", sign);
-
-                try {
-                    response = JsoupUtil.requestBody(baseUrl + businessUrl, "GET", proxy, headerMap, paramMap);
-                    JSONArray orderList = JSONArray.parseArray(response.body());
-                    if (orderList != null && orderList.size() > 0) {
-                        List<CoinBinanceOrderHistory> coinBinanceOrderHistoryList = JSONArray.parseArray(orderList.toJSONString(), CoinBinanceOrderHistory.class);
-                        for (CoinBinanceOrderHistory coinBinanceOrderHistory : coinBinanceOrderHistoryList) {
-                            if (orderIdList.contains(coinBinanceOrderHistory.getOrderId())) {
-                                continue;
-                            }
-
-                            paramMap.clear();
-                            timestamp = String.valueOf(System.currentTimeMillis());
-                            paramMap.put("symbol", coinBinanceSymbol.getSymbol());
-                            paramMap.put("orderId", coinBinanceOrderHistory.getOrderId());
-                            paramMap.put("startTime", startTime);
-                            paramMap.put("endTime", endTime);
-                            paramMap.put("timestamp", timestamp);
-                            queryString = paramMap.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining("&"));
-                            sign = generate("", "", "", queryString, null, secretKey);
-                            paramMap.put("signature", sign);
-                            response = JsoupUtil.requestBody(baseUrl + businessUrl2, "GET", proxy, headerMap, paramMap);
-                            JSONArray userTradeList = JSONArray.parseArray(response.body());
-                            BigDecimal commission = BigDecimal.ZERO;
-                            BigDecimal realizedPnl = BigDecimal.ZERO;
-                            if (userTradeList != null && userTradeList.size() > 0) {
-                                for (int i = 0; i < userTradeList.size(); i++) {
-                                    JSONObject userTradeJO = userTradeList.getJSONObject(i);
-                                    commission = commission.add(new BigDecimal(userTradeJO.getString("commission")));
-                                    realizedPnl = realizedPnl.add(new BigDecimal(userTradeJO.getString("realizedPnl")));
-                                }
-                                coinBinanceOrderHistory.setCommission(commission.setScale(4, RoundingMode.HALF_UP));
-                                coinBinanceOrderHistory.setRealizedPnl(realizedPnl.setScale(4, RoundingMode.HALF_UP));
-                            }
-
-                            coinBinanceOrderHistoryListAll.add(coinBinanceOrderHistory);
-                            System.out.println(coinBinanceOrderHistory);
-                        }
+                syncData4BinanceSub(paramMap, startTime, endTime, proxy_, headerMap, orderIdList, coinBinanceSymbol);
+            }
+        } catch (Exception e) {
+            log.error("syncData4Binance->allOrders error,response={}", response, e);
+        }
+    }
+
+    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
+    public void syncData4BinanceSub(Map<String, String> paramMap, String startTime, String endTime, Proxy proxy, Map<String, String> headerMap, List<String> orderIdList, CoinBinanceSymbol coinBinanceSymbol) throws Exception {
+        List<CoinBinanceOrderHistory> coinBinanceOrderHistoryListAll = new ArrayList<>();
+        //权重: 5;每秒调8次
+        Thread.sleep(500L);
+
+        paramMap.clear();
+        String timestamp = String.valueOf(System.currentTimeMillis());
+        paramMap.put("symbol", coinBinanceSymbol.getSymbol());
+        paramMap.put("startTime", startTime);
+        paramMap.put("endTime", endTime);
+        paramMap.put("timestamp", timestamp);
+        String queryString = paramMap.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining("&"));
+        String sign = generate("", "", "", queryString, null, secretKey);
+        paramMap.put("signature", sign);
+
+        Connection.Response response = null;
+        try {
+            response = JsoupUtil.requestBody(baseUrl + businessUrl, "GET", proxy, headerMap, paramMap);
+            JSONArray orderList = JSONArray.parseArray(response.body());
+            if (orderList != null && orderList.size() > 0) {
+                List<CoinBinanceOrderHistory> coinBinanceOrderHistoryList = JSONArray.parseArray(orderList.toJSONString(), CoinBinanceOrderHistory.class);
+                for (CoinBinanceOrderHistory coinBinanceOrderHistory : coinBinanceOrderHistoryList) {
+                    if (orderIdList.contains(coinBinanceOrderHistory.getOrderId())) {
+                        continue;
                     }
 
-                    if (coinBinanceOrderHistoryListAll.size() > 0) {
-                        coinMapper.insertOrUpdateBinanceOrderHistoryList(coinBinanceOrderHistoryListAll);
-                        log.warn("syncData4Binance->allOrders sub,startTime={},endTime={},size={}", startTime, endTime, coinBinanceOrderHistoryListAll.size());
+                    paramMap.clear();
+                    timestamp = String.valueOf(System.currentTimeMillis());
+                    paramMap.put("symbol", coinBinanceSymbol.getSymbol());
+                    paramMap.put("orderId", coinBinanceOrderHistory.getOrderId());
+                    paramMap.put("startTime", startTime);
+                    paramMap.put("endTime", endTime);
+                    paramMap.put("timestamp", timestamp);
+                    queryString = paramMap.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining("&"));
+                    sign = generate("", "", "", queryString, null, secretKey);
+                    paramMap.put("signature", sign);
+                    response = JsoupUtil.requestBody(baseUrl + businessUrl2, "GET", proxy, headerMap, paramMap);
+                    JSONArray userTradeList = JSONArray.parseArray(response.body());
+                    BigDecimal commission = BigDecimal.ZERO;
+                    BigDecimal realizedPnl = BigDecimal.ZERO;
+                    if (userTradeList != null && userTradeList.size() > 0) {
+                        for (int i = 0; i < userTradeList.size(); i++) {
+                            JSONObject userTradeJO = userTradeList.getJSONObject(i);
+                            commission = commission.add(new BigDecimal(userTradeJO.getString("commission")));
+                            realizedPnl = realizedPnl.add(new BigDecimal(userTradeJO.getString("realizedPnl")));
+                        }
+                        coinBinanceOrderHistory.setCommission(commission.setScale(4, RoundingMode.HALF_UP));
+                        coinBinanceOrderHistory.setRealizedPnl(realizedPnl.setScale(4, RoundingMode.HALF_UP));
                     }
-                } catch (Exception ee) {
-                    log.error("syncData4Binance->allOrders sub error,response={}", response, ee);
+
+                    coinBinanceOrderHistoryListAll.add(coinBinanceOrderHistory);
+                    System.out.println(coinBinanceOrderHistory);
                 }
             }
 
-        } catch (Exception e) {
-            log.error("syncData4Binance->allOrders error,response={}", response, e);
+            if (coinBinanceOrderHistoryListAll.size() > 0) {
+                coinMapper.insertOrUpdateBinanceOrderHistoryList(coinBinanceOrderHistoryListAll);
+                log.warn("syncData4Binance->allOrders sub,startTime={},endTime={},size={}", startTime, endTime, coinBinanceOrderHistoryListAll.size());
+            }
+        } catch (Exception ee) {
+            log.error("syncData4Binance->allOrders sub error,response={}", response, ee);
         }
     }
 

+ 21 - 1
src/main/resources/application.yml

@@ -47,4 +47,24 @@ work:
     enabled: true
     corp-id: ww95a4adba56acb55f
     agent-id: 1000002
-    secret: tqWepGSe91U2Cc2SDf2EGt6M2KaEy2PbvdEauGWywxs
+    secret: tqWepGSe91U2Cc2SDf2EGt6M2KaEy2PbvdEauGWywxs
+
+# xxl-job定时任务配置信息
+xxl:
+  job:
+    admin:
+      # 调度中心部署跟地址:如调度中心集群部署存在多个地址则用逗号分隔。
+      # 执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"。
+      addresses: http://127.0.0.1:8500/job
+    # 分别配置执行器的名称、ip地址、端口号
+    # 注意:如果配置多个执行器时,防止端口冲突
+    executor:
+      appname: javExecutor
+      ip:
+      port: 8501
+      # 执行器运行日志文件存储的磁盘位置,需要对该路径拥有读写权限
+      logpath: logs/job
+      # 执行器Log文件定期清理功能,指定日志保存天数,日志文件过期自动删除。
+      # 限制至少保持3天,否则功能不生效;-1表示永不删除
+      logretentiondays: 7
+    accessToken: xxxxx

+ 5 - 2
src/test/java/top/lvzhiqiang/TestCoin.java

@@ -60,11 +60,14 @@ public class TestCoin {
     @Test
     public void testSyncData4Binance() {
         LocalDateTime now = LocalDateTime.now();
-        for (int i = 0; i < 100; i++) {
+
+        Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 7897));
+
+        for (int i = 0; i < 15; i++) {
             String startTime = String.valueOf(DateUtils.localDateTimeToMilliseconds(now.minusDays(6 + (6 * i))));
             String endTime = String.valueOf(DateUtils.localDateTimeToMilliseconds(now.minusDays((6 * i))));
             String pageSize = "100";
-            coinService2.syncData4Binance(startTime, endTime, pageSize);
+            coinService2.syncData4Binance(startTime, endTime, pageSize, proxy, null);
         }
     }