分页-跨服务合并&过滤&不排序

方案选择判断

数据量规模 推荐方案
总量 < 500 条 方案一:全量拉取后内存分页
总量 > 500 条 方案二:服务侧分页 + 轮询合并

方案一:全量拉取后内存分页(简单高效)

适用场景

  • 总数不多(比如 < 500 条)

  • 查询频率不高

  • 对实时性要求高

实现流程

并发拉取全量数据

flowchart TD
    A([请求: index, count, 过滤条件]) --> B[并发请求服务 A 全量]
    A --> C[并发请求服务 B 全量]
    B --> D{errA?}
    C --> E{errB?}
    D -->|有误| F([返回错误])
    E -->|有误| F
    D -->|无误| G[合并 allTapes = A + B]
    E -->|无误| G
    G --> H[计算 start/end 边界]
    H --> I[内存分页切片 start:end]
    I --> J([返回 pagedTapes + totalCount])
func (s *Service) QueryTapeInfoList(req QueryTapeRequest) (*QueryTapeResponse, error) {
    // 1. 并发请求 A、B 的全量磁带数据(带过滤条件)
    var wg sync.WaitGroup
    var tapesA, tapesB []TapeInfo
    var errA, errB error
    
    wg.Add(2)
    
    // 请求服务 A
    go func() {
        defer wg.Done()
        tapesA, errA = s.serviceA.GetAllTapes(TapeFilter{
            ExportStatus: req.ExportStatus,  // 下推过滤条件
        })
    }()
    
    // 请求服务 B
    go func() {
        defer wg.Done()
        tapesB, errB = s.serviceB.GetAllTapes(TapeFilter{
            ExportStatus: req.ExportStatus,
        })
    }()
    
    wg.Wait()
    
    if errA != nil {
        return nil, errA
    }
    if errB != nil {
        return nil, errB
    }
    
    // 2. 合并结果(简单 append)
    allTapes := append(tapesA, tapesB...)
    
    // 3. 内存分页
    totalCount := len(allTapes)
    start := req.Index
    end := req.Index + req.Count
    
    if start > int64(totalCount) {
        start = int64(totalCount)
    }
    if end > int64(totalCount) {
        end = int64(totalCount)
    }
    
    pagedTapes := allTapes[start:end]
    
    return &QueryTapeResponse{
        TapeInfos:  pagedTapes,
        TotalCount: int64(totalCount),
    }, nil
}

具体例子

场景: 查询"已导出"的磁带,每页 20 条

第一页请求

{
  "exportStatus": true,
  "index": 0,
  "count": 20
}

后端处理

1. 并发请求:
   - 服务A:GetAllTapes(exportStatus=true)
     返回:150 条已导出磁带
   
   - 服务B:GetAllTapes(exportStatus=true)  
     返回:80 条已导出磁带

2. 合并:150 + 80 = 230 条

3. 内存分页:取 [0:20]

4. 返回:
   {
     "tapeInfos": [
       {tapeLabel: "tape-A-001", exportStatus: true, mediaSet: "MS-01", tapeLibrary: "LIB-1"},
       {tapeLabel: "tape-A-002", exportStatus: true, mediaSet: "MS-01", tapeLibrary: "LIB-1"},
       ...
       {tapeLabel: "tape-B-001", exportStatus: true, mediaSet: "MS-02", tapeLibrary: "LIB-2"}
     ],  // 20条
     "totalCount": 230
   }

第二页请求

{
  "exportStatus": true,
  "index": 20,
  "count": 20
}

后端处理

1. 再次请求 A、B(拉全量)
2. 合并:230 条
3. 内存分页:取 [20:40]
4. 返回 20 条 + totalCount=230

方案二:服务侧分页 + 轮询合并(数据量大时)

适用场景

  • 总数很多(> 1000 条)

  • 全量拉取耗时/占内存

前提条件

服务 A、B 必须各自支持分页接口

// 服务 A 的接口
GetTapes(filter TapeFilter, index int64, count int64) ([]TapeInfo, int64)

// 服务 B 的接口
GetTapes(filter TapeFilter, index int64, count int64) ([]TapeInfo, int64)

实现:轮询采样策略

核心思路

因为不需要排序,可以用平均分配的方式:

  • 需要 20 条,向 A 请求 10 条,向 B 请求 10 条

  • 根据 A、B 的数据量占比动态调整

流程图

flowchart TD
    A([请求: index, count, 过滤条件]) --> B[并发获取 totalA / totalB]
    B --> C[totalCount = totalA + totalB]
    C --> D[按比例计算 countA, countB]
    D --> E[按比例计算 indexA, indexB]
    E --> F[并发向 A 请求 GetTapes]
    E --> G[并发向 B 请求 GetTapes]
    F --> H[合并 result = tapesA + tapesB]
    G --> H
    H --> I([返回 result + totalCount])
func (s *Service) QueryTapeInfoList(req QueryTapeRequest) (*QueryTapeResponse, error) {
    // 1. 先获取 A、B 的总量(用于计算比例)
    var wg sync.WaitGroup
    var totalA, totalB int64
    
    wg.Add(2)
    go func() {
        defer wg.Done()
        totalA = s.serviceA.GetTotalCount(req.ExportStatus)
    }()
    go func() {
        defer wg.Done()
        totalB = s.serviceB.GetTotalCount(req.ExportStatus)
    }()
    wg.Wait()
    
    totalCount := totalA + totalB
    
    // 2. 计算 A、B 各自应该提供多少条
    ratioA := float64(totalA) / float64(totalCount)
    countA := int64(float64(req.Count) * ratioA)
    countB := req.Count - countA
    
    // 3. 计算各自的起始位置
    indexA := int64(float64(req.Index) * ratioA)
    indexB := req.Index - indexA
    
    // 4. 并发请求分页数据
    var tapesA, tapesB []TapeInfo
    wg.Add(2)
    go func() {
        defer wg.Done()
        tapesA, _ = s.serviceA.GetTapes(TapeFilter{
            ExportStatus: req.ExportStatus,
        }, indexA, countA)
    }()
    go func() {
        defer wg.Done()
        tapesB, _ = s.serviceB.GetTapes(TapeFilter{
            ExportStatus: req.ExportStatus,
        }, indexB, countB)
    }()
    wg.Wait()
    
    // 5. 合并结果
    result := append(tapesA, tapesB...)
    
    return &QueryTapeResponse{
        TapeInfos:  result,
        TotalCount: totalCount,
    }, nil
}

具体例子

场景: 查询"已导出"磁带,每页 20 条

数据分布:

  • 服务 A:已导出磁带 150 条

  • 服务 B:已导出磁带 50 条

  • 总计:200 条

第一页请求

{
  "exportStatus": true,
  "index": 0,
  "count": 20
}

后端处理

1. 获取总量:
   - totalA = 150
   - totalB = 50
   - total = 200

2. 计算占比:
   - ratioA = 150/200 = 0.75
   - ratioB = 50/200 = 0.25

3. 分配请求量:
   - countA = 20 * 0.75 = 15 条
   - countB = 20 * 0.25 = 5 条

4. 计算起始位置:
   - indexA = 0 * 0.75 = 0
   - indexB = 0 * 0.25 = 0

5. 并发请求:
   - 向 A 请求:GetTapes(exportStatus=true, index=0, count=15)
     返回 15 条
   
   - 向 B 请求:GetTapes(exportStatus=true, index=0, count=5)
     返回 5 条

6. 合并返回:
   {
     "tapeInfos": [
       ...15 条来自 A,
       ...5 条来自 B
     ],  // 共 20 条
     "totalCount": 200
   }

第二页请求

{
  "exportStatus": true,
  "index": 20,
  "count": 20
}

后端处理

1. 总量不变:totalA=150, totalB=50, total=200

2. 分配:
   - countA = 15 条
   - countB = 5 条

3. 计算起始位置:
   - indexA = 20 * 0.75 = 15  (从 A 的第 15 条开始)
   - indexB = 20 * 0.25 = 5   (从 B 的第 5 条开始)

4. 并发请求:
   - 向 A 请求:GetTapes(exportStatus=true, index=15, count=15)
   - 向 B 请求:GetTapes(exportStatus=true, index=5, count=5)

5. 返回 20 条
最后修改:2026 年 03 月 07 日
如果觉得我的文章对你有用,请随意赞赏