分页-跨服务合并&过滤&不排序
方案选择判断
| 数据量规模 | 推荐方案 |
|---|---|
| 总量 < 500 条 | 方案一:全量拉取后内存分页 |
| 总量 > 500 条 | 方案二:服务侧分页 + 轮询合并 |
方案一:全量拉取后内存分页(简单高效)
适用场景
-
总数不多(比如 < 500 条)
-
查询频率不高
-
对实时性要求高
实现流程
并发拉取全量数据
flowchart TD
A([请求: index, count, 过滤条件]) --> B[并发请求服务 A 全量]
A --> C[并发请求服务 B 全量]
B --> D{errA?}
C --> E{errB?}
D -->|有误| F([返回错误])
E -->|有误| F
D -->|无误| G[合并 allTapes = A + B]
E -->|无误| G
G --> H[计算 start/end 边界]
H --> I[内存分页切片 start:end]
I --> J([返回 pagedTapes + totalCount])
func (s *Service) QueryTapeInfoList(req QueryTapeRequest) (*QueryTapeResponse, error) {
// 1. 并发请求 A、B 的全量磁带数据(带过滤条件)
var wg sync.WaitGroup
var tapesA, tapesB []TapeInfo
var errA, errB error
wg.Add(2)
// 请求服务 A
go func() {
defer wg.Done()
tapesA, errA = s.serviceA.GetAllTapes(TapeFilter{
ExportStatus: req.ExportStatus, // 下推过滤条件
})
}()
// 请求服务 B
go func() {
defer wg.Done()
tapesB, errB = s.serviceB.GetAllTapes(TapeFilter{
ExportStatus: req.ExportStatus,
})
}()
wg.Wait()
if errA != nil {
return nil, errA
}
if errB != nil {
return nil, errB
}
// 2. 合并结果(简单 append)
allTapes := append(tapesA, tapesB...)
// 3. 内存分页
totalCount := len(allTapes)
start := req.Index
end := req.Index + req.Count
if start > int64(totalCount) {
start = int64(totalCount)
}
if end > int64(totalCount) {
end = int64(totalCount)
}
pagedTapes := allTapes[start:end]
return &QueryTapeResponse{
TapeInfos: pagedTapes,
TotalCount: int64(totalCount),
}, nil
}
具体例子
场景: 查询"已导出"的磁带,每页 20 条
第一页请求
{
"exportStatus": true,
"index": 0,
"count": 20
}
后端处理
1. 并发请求:
- 服务A:GetAllTapes(exportStatus=true)
返回:150 条已导出磁带
- 服务B:GetAllTapes(exportStatus=true)
返回:80 条已导出磁带
2. 合并:150 + 80 = 230 条
3. 内存分页:取 [0:20]
4. 返回:
{
"tapeInfos": [
{tapeLabel: "tape-A-001", exportStatus: true, mediaSet: "MS-01", tapeLibrary: "LIB-1"},
{tapeLabel: "tape-A-002", exportStatus: true, mediaSet: "MS-01", tapeLibrary: "LIB-1"},
...
{tapeLabel: "tape-B-001", exportStatus: true, mediaSet: "MS-02", tapeLibrary: "LIB-2"}
], // 20条
"totalCount": 230
}
第二页请求
{
"exportStatus": true,
"index": 20,
"count": 20
}
后端处理
1. 再次请求 A、B(拉全量)
2. 合并:230 条
3. 内存分页:取 [20:40]
4. 返回 20 条 + totalCount=230
方案二:服务侧分页 + 轮询合并(数据量大时)
适用场景
-
总数很多(> 1000 条)
-
全量拉取耗时/占内存
前提条件
服务 A、B 必须各自支持分页接口
// 服务 A 的接口
GetTapes(filter TapeFilter, index int64, count int64) ([]TapeInfo, int64)
// 服务 B 的接口
GetTapes(filter TapeFilter, index int64, count int64) ([]TapeInfo, int64)
实现:轮询采样策略
核心思路
因为不需要排序,可以用平均分配的方式:
-
需要 20 条,向 A 请求 10 条,向 B 请求 10 条
-
根据 A、B 的数据量占比动态调整
流程图
flowchart TD
A([请求: index, count, 过滤条件]) --> B[并发获取 totalA / totalB]
B --> C[totalCount = totalA + totalB]
C --> D[按比例计算 countA, countB]
D --> E[按比例计算 indexA, indexB]
E --> F[并发向 A 请求 GetTapes]
E --> G[并发向 B 请求 GetTapes]
F --> H[合并 result = tapesA + tapesB]
G --> H
H --> I([返回 result + totalCount])
func (s *Service) QueryTapeInfoList(req QueryTapeRequest) (*QueryTapeResponse, error) {
// 1. 先获取 A、B 的总量(用于计算比例)
var wg sync.WaitGroup
var totalA, totalB int64
wg.Add(2)
go func() {
defer wg.Done()
totalA = s.serviceA.GetTotalCount(req.ExportStatus)
}()
go func() {
defer wg.Done()
totalB = s.serviceB.GetTotalCount(req.ExportStatus)
}()
wg.Wait()
totalCount := totalA + totalB
// 2. 计算 A、B 各自应该提供多少条
ratioA := float64(totalA) / float64(totalCount)
countA := int64(float64(req.Count) * ratioA)
countB := req.Count - countA
// 3. 计算各自的起始位置
indexA := int64(float64(req.Index) * ratioA)
indexB := req.Index - indexA
// 4. 并发请求分页数据
var tapesA, tapesB []TapeInfo
wg.Add(2)
go func() {
defer wg.Done()
tapesA, _ = s.serviceA.GetTapes(TapeFilter{
ExportStatus: req.ExportStatus,
}, indexA, countA)
}()
go func() {
defer wg.Done()
tapesB, _ = s.serviceB.GetTapes(TapeFilter{
ExportStatus: req.ExportStatus,
}, indexB, countB)
}()
wg.Wait()
// 5. 合并结果
result := append(tapesA, tapesB...)
return &QueryTapeResponse{
TapeInfos: result,
TotalCount: totalCount,
}, nil
}
具体例子
场景: 查询"已导出"磁带,每页 20 条
数据分布:
-
服务 A:已导出磁带 150 条
-
服务 B:已导出磁带 50 条
-
总计:200 条
第一页请求
{
"exportStatus": true,
"index": 0,
"count": 20
}
后端处理
1. 获取总量:
- totalA = 150
- totalB = 50
- total = 200
2. 计算占比:
- ratioA = 150/200 = 0.75
- ratioB = 50/200 = 0.25
3. 分配请求量:
- countA = 20 * 0.75 = 15 条
- countB = 20 * 0.25 = 5 条
4. 计算起始位置:
- indexA = 0 * 0.75 = 0
- indexB = 0 * 0.25 = 0
5. 并发请求:
- 向 A 请求:GetTapes(exportStatus=true, index=0, count=15)
返回 15 条
- 向 B 请求:GetTapes(exportStatus=true, index=0, count=5)
返回 5 条
6. 合并返回:
{
"tapeInfos": [
...15 条来自 A,
...5 条来自 B
], // 共 20 条
"totalCount": 200
}
第二页请求
{
"exportStatus": true,
"index": 20,
"count": 20
}
后端处理
1. 总量不变:totalA=150, totalB=50, total=200
2. 分配:
- countA = 15 条
- countB = 5 条
3. 计算起始位置:
- indexA = 20 * 0.75 = 15 (从 A 的第 15 条开始)
- indexB = 20 * 0.25 = 5 (从 B 的第 5 条开始)
4. 并发请求:
- 向 A 请求:GetTapes(exportStatus=true, index=15, count=15)
- 向 B 请求:GetTapes(exportStatus=true, index=5, count=5)
5. 返回 20 条