瀏覽代碼

update:优化gateio订单列表同步逻辑v1

lvzhiqiang 3 周之前
父節點
當前提交
e8d049061e

+ 4 - 6
src/main/java/top/lvzhiqiang/job/CoinGateJob.java

@@ -1,15 +1,13 @@
 package top.lvzhiqiang.job;
 
-import javax.annotation.Resource;
-
-import org.springframework.stereotype.Component;
-
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
-
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
 import top.lvzhiqiang.service.CoinGateService;
 
+import javax.annotation.Resource;
+
 /**
  * coin-gate任务
  *
@@ -33,7 +31,7 @@ public class CoinGateJob {
     @XxlJob("syncSpotOrdersJobHandler")
     public ReturnT<String> syncSpotOrdersJobHandler() {
         try {
-            coinGateService.syncSpotOrders();
+            coinGateService.syncSpotOrders(null, null, null);
             return ReturnT.SUCCESS;
         } catch (Exception e) {
             String errorMsg = e.getMessage() == null ? e.getClass().getName() : e.getMessage();

+ 5 - 1
src/main/java/top/lvzhiqiang/service/CoinGateService.java

@@ -10,6 +10,10 @@ public interface CoinGateService {
 
     /**
      * 同步现货订单列表
+     *
+     * @param currencyPair 交易对
+     * @param from         开始时间 (秒级时间戳,可为 null)
+     * @param to           结束时间 (秒级时间戳,可为 null)
      */
-    void syncSpotOrders();
+    void syncSpotOrders(String currencyPair, Long from, Long to);
 }

+ 102 - 42
src/main/java/top/lvzhiqiang/service/impl/CoinGateServiceImpl.java

@@ -1,44 +1,31 @@
 package top.lvzhiqiang.service.impl;
 
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.xxl.job.core.context.XxlJobHelper;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.commons.codec.binary.Hex;
-import org.jsoup.Connection;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Service;
 import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpMethod;
-import org.springframework.util.StopWatch;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
-
 import top.lvzhiqiang.config.InitRunner;
-import top.lvzhiqiang.entity.CoinBitgetMixFundrate;
 import top.lvzhiqiang.entity.CoinGateSpotOrders;
-import top.lvzhiqiang.mapper.CoinBitgetMapper;
 import top.lvzhiqiang.mapper.CoinGateMapper;
-import top.lvzhiqiang.service.CoinBitgetService;
 import top.lvzhiqiang.service.CoinGateService;
-import top.lvzhiqiang.util.JsoupUtil;
+import top.lvzhiqiang.util.StringUtils;
 
 import javax.annotation.Resource;
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
-
 import java.math.BigDecimal;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Coin Gate ServiceImpl
@@ -64,22 +51,33 @@ public class CoinGateServiceImpl implements CoinGateService {
     /**
      * 同步现货订单列表
      *
-     * @return com.xxl.job.core.biz.model.ReturnT<java.lang.String>
-     * @author lvzhiqiang
-     * 2026/3/13 15:33
+     * @param currencyPair 交易对
+     * @param from         开始时间 (秒级时间戳,可为 null)
+     * @param to           结束时间 (秒级时间戳,可为 null)
      */
     @Override
-    public void syncSpotOrders() {
+    public void syncSpotOrders(String currencyPair, Long from, Long to) {
         String apiKey = InitRunner.dicCodeMap.get("gateio_api_key").getCodeValue();
         String apiSecret = InitRunner.dicCodeMap.get("gateio_api_secret").getCodeValue();
         String host = InitRunner.dicCodeMap.get("gateio_host").getCodeValue();
         String prefix = InitRunner.dicCodeMap.get("gateio_prefix").getCodeValue();
 
         String path = "/spot/orders";
-        // 关键:API 要求必须带上状态参数查历史记录
-        String query = "currency_pair=" + "&status=finished";
-        String url = host + prefix + path + "?" + query;
+        // 构建 Query 参数
+        StringBuilder queryBuilder = new StringBuilder("currency_pair=")
+                .append(StringUtils.isEmpty(currencyPair) ? "" : currencyPair)
+                .append("&status=finished")
+                // 建议加上 limit=1000,尽可能单次多拿点数据,减少请求次数
+                .append("&limit=1000");
+        if (from != null) {
+            queryBuilder.append("&from=").append(from);
+        }
+        if (to != null) {
+            queryBuilder.append("&to=").append(to);
+        }
+        String query = queryBuilder.toString();
 
+        String url = host + prefix + path + "?" + query;
         try {
             // 1. 生成鉴权 Headers
             HttpHeaders headers = generateAuthHeaders("GET", prefix + path, query, "", apiKey, apiSecret);
@@ -93,32 +91,54 @@ public class CoinGateServiceImpl implements CoinGateService {
             List<CoinGateSpotOrders> orderList = new ArrayList<>();
 
             for (JsonNode node : rootNode) {
-                CoinGateSpotOrders order = new CoinGateSpotOrders();
-                order.setId(node.get("id").asText());
-                order.setCurrencyPair(node.get("currency_pair").asText());
-                order.setSide(node.get("side").asText());
-                order.setType(node.get("type").asText());
-                order.setAmount(new BigDecimal(node.get("amount").asText()));
-                order.setPrice(new BigDecimal(node.get("price").asText()));
-                order.setFilledAmount(new BigDecimal(node.get("filled_amount").asText()));
-                order.setAvgDealPrice(new BigDecimal(node.get("avg_deal_price").asText()));
-                order.setFilledTotal(new BigDecimal(node.get("filled_total").asText()));
-                order.setFee(new BigDecimal(node.get("fee").asText()));
-                order.setFeeCurrency(node.get("fee_currency").asText());
-                order.setStatus(node.get("status").asText());
-                order.setFinishAs(node.get("finish_as").asText());
-                order.setCreateTimeMs(node.get("create_time_ms").asLong());
-                order.setUpdateTimeMs(node.get("update_time_ms").asLong());
-                orderList.add(order);
+                try {
+                    CoinGateSpotOrders order = new CoinGateSpotOrders();
+
+                    // 使用安全的方法获取字符串
+                    order.setId(getSafeString(node, "id"));
+                    order.setCurrencyPair(getSafeString(node, "currency_pair"));
+                    order.setSide(getSafeString(node, "side"));
+                    order.setType(getSafeString(node, "type"));
+
+                    // 使用安全的方法获取 BigDecimal (防止缺失或为空字符串)
+                    order.setAmount(getBigDecimalSafely(node, "amount"));
+                    order.setPrice(getBigDecimalSafely(node, "price"));
+                    order.setFilledAmount(getBigDecimalSafely(node, "filled_amount"));
+                    order.setAvgDealPrice(getBigDecimalSafely(node, "avg_deal_price"));
+                    order.setFilledTotal(getBigDecimalSafely(node, "filled_total"));
+                    order.setFee(getBigDecimalSafely(node, "fee"));
+
+                    order.setFeeCurrency(getSafeString(node, "fee_currency"));
+                    order.setStatus(getSafeString(node, "status"));
+                    order.setFinishAs(getSafeString(node, "finish_as"));
+
+                    // 时间戳通常一定会返回,但为了安全也可以加上判断
+                    order.setCreateTimeMs(node.hasNonNull("create_time_ms") ? node.get("create_time_ms").asLong() : 0L);
+                    order.setUpdateTimeMs(node.hasNonNull("update_time_ms") ? node.get("update_time_ms").asLong() : 0L);
+
+                    orderList.add(order);
+                } catch (Exception e) {
+                    // 核心:这里集中拦截所有内层抛出的异常(包括 IllegalArgumentException 和可能的 NullPointerException)
+                    // 并且只在这里打印一次完整的源数据
+
+                    // 【开发排查用】输出到本地服务器和日志中心,完美支持占位符 + 异常堆栈自动追加
+                    log.error("解析单条 Gate.io 订单数据失败,订单JSON: {} ", node.toString(), e);
+                    // 【运维监控用】输出到 XXL-JOB 网页控制台
+                    XxlJobHelper.log("解析单条 Gate.io 订单数据失败,订单JSON: {} ", node.toString());
+                    XxlJobHelper.log(e);
+                }
             }
 
             // 4. 批量保存到 MySQL (由于设置了 ID,JPA 会自动执行 Upsert 操作)
             if (!orderList.isEmpty()) {
                 coinGateMapper.batchUpsert(orderList);
-                log.info("成功同步 {} 条 {} 订单记录到数据库。", orderList.size(), "Gate.io");
+                XxlJobHelper.log("成功同步 {} 条 {} 订单记录到数据库。", orderList.size(), "Gate.io");
             }
         } catch (Exception e) {
-            log.error("同步 Gate.io 订单失败: " + e.getMessage(), e);
+            log.error("同步 Gate.io 订单失败", e);
+            XxlJobHelper.log("同步 Gate.io 订单失败");
+            XxlJobHelper.log(e);
+
             // 生产环境建议使用 log.error() 并对接报警系统
             throw new RuntimeException("订单同步异常", e);
         }
@@ -156,4 +176,44 @@ public class CoinGateServiceImpl implements CoinGateService {
 
         return headers;
     }
+
+    /**
+     * 安全提取 String 的辅助方法
+     *
+     * @param node
+     * @param fieldName
+     * @return
+     */
+    private String getSafeString(JsonNode node, String fieldName) {
+        if (node.hasNonNull(fieldName)) {
+            return node.get(fieldName).asText();
+        }
+
+        // 或者返回 null,取决于你的数据库字段是否允许 null
+        return "";
+    }
+
+    /**
+     * 安全提取 BigDecimal 的辅助方法 (架构师强烈推荐)
+     *
+     * @param node
+     * @param fieldName
+     * @return
+     */
+    private BigDecimal getBigDecimalSafely(JsonNode node, String fieldName) {
+        // 1. 正常的业务缺失(比如撤单没有成交价,或者接口就是没返回),直接给默认值 0
+        if (!node.hasNonNull(fieldName) || node.get(fieldName).asText().trim().isEmpty()) {
+            return null;
+        }
+
+        String textValue = node.get(fieldName).asText().trim();
+
+        // 2. 尝试转换
+        try {
+            return new BigDecimal(textValue);
+        } catch (NumberFormatException e) {
+            // 核心:只抛出具体的错误细节,把完整的 node 留给外层去打
+            throw new IllegalArgumentException(String.format("字段 [%s] 的值 [%s] 格式错误,无法转为数字", fieldName, textValue), e);
+        }
+    }
 }

+ 29 - 7
src/test/java/top/lvzhiqiang/TestCoinGate.java

@@ -1,19 +1,19 @@
 package top.lvzhiqiang;
 
-import javax.annotation.Resource;
-
+import lombok.extern.slf4j.Slf4j;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-import lombok.extern.slf4j.Slf4j;
 import top.lvzhiqiang.service.CoinGateService;
 
+import javax.annotation.Resource;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Gate.io单元测试类
  *
- * @author lvzhiqiang 
+ * @author lvzhiqiang
  * 2025/12/8 19:31
  */
 @Slf4j
@@ -28,7 +28,29 @@ public class TestCoinGate {
     private CoinGateService coinGateService;
 
     @Test
-    public void testSyncSpotOrders() throws Exception {
-        coinGateService.syncSpotOrders();
+    public void testInitSpotOrders() {
+        System.out.println("开始执行近1年历史数据初始化: ");
+
+        long now = System.currentTimeMillis() / 1000;
+        long oneYearAgo = now - (30 * 24 * 60 * 60); // 1年前的秒级时间戳
+        long step = 7 * 24 * 60 * 60; // 步长:7天的秒数
+
+        // 从1年前开始,每次往后推进7天,直到当前时间
+        for (long start = oneYearAgo; start < now; start += step) {
+            long end = Math.min(start + step, now);
+
+            // 调用底层方法拉取这个 7 天窗口的数据
+            coinGateService.syncSpotOrders(null, start, end);
+
+            // 【极其重要】防封机制:交易所 API 有频率限制,循环调用必须加短暂休眠
+            try {
+                // 休眠 200~500 毫秒,防止瞬间并发把 API 频控打满
+                TimeUnit.MILLISECONDS.sleep(300);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        System.out.println("历史数据初始化完成: " + null);
     }
 }