Problem
In a project we needed to migrate the full contents of several tables. The tables are large, typically in the millions to tens of millions of rows.
Solutions
LIMIT approach
The first and most obvious approach is a plain LIMIT query. It works for small amounts of data, but once the offset grows large, limit x,y becomes very slow: MySQL still reads and discards every row before the offset, which amounts to scanning most of the table. For example:
select * from table limit 150000,1000;
- Pros
  - Simple to implement (see the sketch after this list)
- Cons
  - Very inefficient once the LIMIT offset grows large
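For illustration, a minimal LIMIT-based pagination sketch, assuming a Spring JdbcTemplate; the class name, constructor and the batch-write comment are placeholders, not the project's actual code:
import java.util.List;
import java.util.Map;
import org.springframework.jdbc.core.JdbcTemplate;

public class LimitMigrator {
    private final JdbcTemplate jdbcTemplate;

    public LimitMigrator(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public void migrate(int pageSize) {
        int offset = 0;
        while (true) {
            // Each page gets slower: MySQL reads and discards all `offset` rows first.
            List<Map<String, Object>> rows = jdbcTemplate.queryForList(
                    "select * from table limit " + offset + "," + pageSize);
            if (rows.isEmpty()) {
                break;
            }
            // write the batch to the target table here
            offset += pageSize;
        }
    }
}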
Index (ID range) approach
This approach first looks up the minimum and maximum IDs (the primary key) that satisfy the condition, then fetches the data in batches with BETWEEN ... AND over consecutive ID ranges, as sketched below.
- Pros
  - Every query goes through the primary-key index, so it executes quickly
- Cons
  - Looking up min(id) and max(id) can trigger a full table scan, which is slow
  - The ID sequence is not contiguous, so some batches match no rows and come back empty
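A minimal sketch of that bounds lookup, assuming a Spring JdbcTemplate; the class and method names are illustrative, and the returned min/max values feed the x and y placeholders in getData() below:
import java.util.Map;
import org.springframework.jdbc.core.JdbcTemplate;

public class IdBounds {
    // Returns {min, max} of the primary key; both 0 when the table is empty.
    public static long[] query(JdbcTemplate jdbcTemplate) {
        Map<String, Object> row = jdbcTemplate.queryForMap(
                "select min(id) as min_id, max(id) as max_id from table");
        Number min = (Number) row.get("min_id");
        Number max = (Number) row.get("max_id");
        return new long[] { min == null ? 0 : min.longValue(),
                max == null ? 0 : max.longValue() };
    }
}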
public void getData() {
    // x and y are the primary-key bounds, i.e. the results of select min(id) / max(id) from table
    long min = x;
    long max = y;
    if (min == 0 || max == 0) {
        return;
    }
    long size = max - min + 1;
    int batchNum = 5000;
    // number of batches, rounded up
    int pageNum = size % batchNum == 0 ? (int) (size / batchNum) : (int) (size / batchNum) + 1;
    ExecutorService es = Executors.newFixedThreadPool(5);
    Future<Boolean>[] result = new Future[pageNum];
    for (int offset = 0; offset < pageNum; offset++) {
        Future<Boolean> future = es.submit(new DataFuture(min, batchNum, offset));
        result[offset] = future;
    }
    // wait for every batch to finish
    for (Future<Boolean> future : result) {
        try {
            future.get();
        } catch (Exception e) {
            logger.error("callBack error", e);
        }
    }
    es.shutdown();
}
DataFuture fetches one batch of data:
class DataFuture implements Callable<Boolean> {

    private final long min;
    private final int batchNum;
    private final int offset;

    public DataFuture(long min, int batchNum, int offset) {
        this.min = min;
        this.batchNum = batchNum;
        this.offset = offset;
    }

    @Override
    public Boolean call() throws Exception {
        // ID range for this batch (cast avoids int overflow on very large tables)
        long start = (long) offset * batchNum + min;
        long end = start + batchNum - 1;
        // executes: select * from table where id BETWEEN start AND end
        // (a sketch of this query method follows the class)
        Set<String> querySet = getData(start, end);
        // process / write querySet to the target table here
        return true;
    }
}
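The getData(start, end) call is not shown in the original. A possible shape for it, assuming a Spring JdbcTemplate and that only the id column needs to be collected; everything here is an assumption, not the project's code:
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.jdbc.core.JdbcTemplate;

public class BatchQuery {
    private final JdbcTemplate jdbcTemplate;

    public BatchQuery(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    // Fetches one ID range; gaps in the ID sequence simply yield fewer (or zero) rows.
    public Set<String> getData(long start, long end) {
        List<Map<String, Object>> rows = jdbcTemplate.queryForList(
                "select * from table where id between ? and ?", start, end);
        Set<String> result = new HashSet<>();
        for (Map<String, Object> row : rows) {
            result.add(String.valueOf(row.get("id")));
        }
        return result;
    }
}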
ResultSet (streaming) approach
Fetch the data as a stream. The MySQL driver can stream a result set in three ways:
- When the statement is configured with the properties below, the driver streams the result set: it receives only part of the data from the server at a time, until everything has been processed, so the JVM will not run out of memory (OOM).
  setResultSetType(ResultSet.TYPE_FORWARD_ONLY); setFetchSize(Integer.MIN_VALUE);
- Call the statement's enableStreamingResults method; internally it simply applies the same settings as option 1.
- Set the connection property useCursorFetch=true (supported since the 5.0 driver), open the statement with TYPE_FORWARD_ONLY, and set a positive fetch size. This uses a server-side cursor that returns fetch_size rows per round trip. A sketch of this variant follows the list.
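A minimal sketch of option 3 (server-side cursor via useCursorFetch); the JDBC URL, database name and credentials are placeholders. The full getSubData example below uses option 1 instead.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CursorFetchExample {
    public static void main(String[] args) throws Exception {
        // useCursorFetch=true asks the server to open a cursor and return rows in chunks
        String url = "jdbc:mysql://localhost:3306/db?useCursorFetch=true";
        try (Connection con = DriverManager.getConnection(url, "user", "password");
             PreparedStatement ps = con.prepareStatement("select * from table",
                     ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            ps.setFetchSize(1000); // rows fetched from the server per round trip
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // process one row at a time; memory stays bounded by the fetch size
                }
            }
        }
    }
}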
public void getSubData(final CallBack<List<SubData>> cb) {
    String sql = "select * from table";
    Connection con = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
        con = subJdbcTemplate.getDataSource().getConnection();
        // TYPE_FORWARD_ONLY + CONCUR_READ_ONLY + fetchSize = Integer.MIN_VALUE enables streaming
        ps = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ps.setFetchSize(Integer.MIN_VALUE);
        rs = ps.executeQuery();
        int batchSize = 1000;
        int currLineNum = 0;
        List<SubData> result = Lists.newArrayList();
        while (rs.next()) {
            currLineNum = currLineNum + 1;
            int cp = rs.getInt(1);
            String appId = rs.getString(2);
            String deviceId = rs.getString(3);
            String token = rs.getString(4);
            String alias = rs.getString(5);
            result.add(new SubData(cp, appId, deviceId, token, alias));
            // hand a full batch to the callback, then start a new one
            if (currLineNum % batchSize == 0) {
                List<SubData> copyList = Lists.newArrayList(result);
                result.clear();
                cb.callBack(copyList);
            }
        }
        // flush the final, partially filled batch
        if (CollectionUtils.isNotEmpty(result)) {
            List<SubData> copyList = Lists.newArrayList(result);
            result.clear();
            cb.callBack(copyList);
        }
    } catch (SQLException e) {
        logger.error("getSubData error", e);
    } finally {
        try {
            if (rs != null) {
                rs.close();
            }
        } catch (SQLException e) {
            logger.error("getSubData error", e);
        }
        try {
            if (ps != null) {
                ps.close();
            }
        } catch (SQLException e) {
            logger.error("getSubData error", e);
        }
        try {
            if (con != null) {
                con.close();
            }
        } catch (SQLException e) {
            logger.error("getSubData error", e);
        }
    }
}
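For reference, a hedged sketch of how the callback side might look; the CallBack interface definition and the targetDao.batchInsert call are assumptions standing in for however the project actually writes the batches:
// Hypothetical callback interface matching the usage above (the original definition is not shown).
interface CallBack<T> {
    void callBack(T data);
}

// Each 1000-row batch is handed over as it streams in, so memory stays bounded
// regardless of the table size.
public void migrate() {
    getSubData(new CallBack<List<SubData>>() {
        @Override
        public void callBack(List<SubData> batch) {
            // write the batch into the target table, e.g. via a batch insert (hypothetical DAO)
            targetDao.batchInsert(batch);
        }
    });
}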
Inside the driver's PreparedStatement (MySQL Connector/J), the check that decides whether to stream looks like this:
/**
* We only stream result sets when they are forward-only, read-only, and the
* fetch size has been set to Integer.MIN_VALUE
* @return true if this result set should be streamed row at-a-time, rather
* than read all at once.
*
*/
protected boolean createStreamingResultSet() {
    try {
        synchronized (checkClosed()) {
            return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY)
                    && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY)
                    && (this.fetchSize == Integer.MIN_VALUE));
        }
    } catch (SQLException e) {
        // we can't break the interface, having this be no-op in case of error is ok
        return false;
    }
}
- Pros
  - Streams the data with a single query; efficient, and no risk of OOM
- Cons
  - If the streaming query fails partway through, it has to start over from the beginning
Summary
Personally, I recommend the ResultSet streaming approach for pulling the data.