背景介紹
1,最近有一個大數(shù)據(jù)量插入的操作入庫的業(yè)務場景,需要先做一些其他修改操作,然后在執(zhí)行插入操作,由于插入數(shù)據(jù)可能會很多,用到多線程去拆分數(shù)據(jù)并行處理來提高響應時間,如果有一個線程執(zhí)行失敗,則全部回滾。
2,在spring中可以使用@Transactional
注解去控制事務,使出現(xiàn)異常時會進行回滾,在多線程中,這個注解則不會生效,如果主線程需要先執(zhí)行一些修改數(shù)據(jù)庫的操作,當子線程在進行處理出現(xiàn)異常時,主線程修改的數(shù)據(jù)則不會回滾,導致數(shù)據(jù)錯誤。
3,下面用一個簡單示例演示多線程事務。
基于 Spring Boot + MyBatis Plus + Vue & Element 實現(xiàn)的后臺管理系統(tǒng) + 用戶小程序,支持 RBAC 動態(tài)權限、多租戶、數(shù)據(jù)權限、工作流、三方登錄、支付、短信、商城等功能
-
項目地址:https://github.com/YunaiV/ruoyi-vue-pro
-
視頻教程:https://doc.iocoder.cn/video/
公用的類和方法
/**
*平均拆分list方法.
*@paramsource
*@paramn
*@param
*@return
*/
publicstaticList>averageAssign(Listsource,intn){
List>result=newArrayList>();
intremaider=source.size()%n;
intnumber=source.size()/n;
intoffset=0;//偏移量
for(inti=0;ivalue=null;
if(remaider>0){
value=source.subList(i*number+offset,(i+1)*number+offset+1);
remaider--;
offset++;
}else{
value=source.subList(i*number+offset,(i+1)*number+offset);
}
result.add(value);
}
returnresult;
}
/**線程池配置
*@versionV1.0
*/
publicclassExecutorConfig{
privatestaticintmaxPoolSize=Runtime.getRuntime().availableProcessors();
privatevolatilestaticExecutorServiceexecutorService;
publicstaticExecutorServicegetThreadPool(){
if(executorService==null){
synchronized(ExecutorConfig.class){
if(executorService==null){
executorService=newThreadPool();
}
}
}
returnexecutorService;
}
privatestaticExecutorServicenewThreadPool(){
intqueueSize=500;
intcorePool=Math.min(5,maxPoolSize);
returnnewThreadPoolExecutor(corePool,maxPoolSize,10000L,TimeUnit.MILLISECONDS,
newLinkedBlockingQueue<>(queueSize),newThreadPoolExecutor.AbortPolicy());
}
privateExecutorConfig(){}
}
/**獲取sqlSession
*@author86182
*@versionV1.0
*/
@Component
publicclassSqlContext{
@Resource
privateSqlSessionTemplatesqlSessionTemplate;
publicSqlSessiongetSqlSession(){
SqlSessionFactorysqlSessionFactory=sqlSessionTemplate.getSqlSessionFactory();
returnsqlSessionFactory.openSession();
}
}
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現(xiàn)的后臺管理系統(tǒng) + 用戶小程序,支持 RBAC 動態(tài)權限、多租戶、數(shù)據(jù)權限、工作流、三方登錄、支付、短信、商城等功能
-
項目地址:https://github.com/YunaiV/yudao-cloud
-
視頻教程:https://doc.iocoder.cn/video/
示例事務不成功操作
/**
*測試多線程事務.
*@paramemployeeDOList
*/
@Override
@Transactional
publicvoidsaveThread(ListemployeeDOList){
try{
//先做刪除操作,如果子線程出現(xiàn)異常,此操作不會回滾
this.getBaseMapper().delete(null);
//獲取線程池
ExecutorServiceservice=ExecutorConfig.getThreadPool();
//拆分數(shù)據(jù),拆分5份
List>lists=averageAssign(employeeDOList,5);
//執(zhí)行的線程
Thread[]threadArray=newThread[lists.size()];
//監(jiān)控子線程執(zhí)行完畢,再執(zhí)行主線程,要不然會導致主線程關閉,子線程也會隨著關閉
CountDownLatchcountDownLatch=newCountDownLatch(lists.size());
AtomicBooleanatomicBoolean=newAtomicBoolean(true);
for(inti=0;iif(i==lists.size()-1){
atomicBoolean.set(false);
}
Listlist=lists.get(i);
threadArray[i]=newThread(()->{
try{
//最后一個線程拋出異常
if(!atomicBoolean.get()){
thrownewServiceException("001","出現(xiàn)異常");
}
//批量添加,mybatisPlus中自帶的batch方法
this.saveBatch(list);
}finally{
countDownLatch.countDown();
}
});
}
for(inti=0;i//當子線程執(zhí)行完畢時,主線程再往下執(zhí)行
countDownLatch.await();
System.out.println("添加完畢");
}catch(Exceptione){
log.info("error",e);
thrownewServiceException("002","出現(xiàn)異常");
}finally{
connection.close();
}
}
數(shù)據(jù)庫中存在一條數(shù)據(jù):
//測試用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes={ThreadTest01.class,MainApplication.class})
publicclassThreadTest01{
@Resource
privateEmployeeBOemployeeBO;
/**
*測試多線程事務.
*@throwsInterruptedException
*/
@Test
publicvoidMoreThreadTest2()throwsInterruptedException{
intsize=10;
ListemployeeDOList=newArrayList<>(size);
for(inti=0;inewEmployeeDO();
employeeDO.setEmployeeName("lol"+i);
employeeDO.setAge(18);
employeeDO.setGender(1);
employeeDO.setIdNumber(i+"XX");
employeeDO.setCreatTime(Calendar.getInstance().getTime());
employeeDOList.add(employeeDO);
}
try{
employeeBO.saveThread(employeeDOList);
System.out.println("添加成功");
}catch(Exceptione){
e.printStackTrace();
}
}
}
測試結果:
可以發(fā)現(xiàn)子線程組執(zhí)行時,有一個線程執(zhí)行失敗,其他線程也會拋出異常,但是主線程中執(zhí)行的刪除操作,沒有回滾,@Transactional
注解沒有生效。
使用sqlSession
控制手動提交事務
@Resource
SqlContextsqlContext;
/**
*測試多線程事務.
*@paramemployeeDOList
*/
@Override
publicvoidsaveThread(ListemployeeDOList)throwsSQLException{
//獲取數(shù)據(jù)庫連接,獲取會話(內(nèi)部自有事務)
SqlSessionsqlSession=sqlContext.getSqlSession();
Connectionconnection=sqlSession.getConnection();
try{
//設置手動提交
connection.setAutoCommit(false);
//獲取mapper
EmployeeMapperemployeeMapper=sqlSession.getMapper(EmployeeMapper.class);
//先做刪除操作
employeeMapper.delete(null);
//獲取執(zhí)行器
ExecutorServiceservice=ExecutorConfig.getThreadPool();
List>callableList=newArrayList<>();
//拆分list
List>lists=averageAssign(employeeDOList,5);
AtomicBooleanatomicBoolean=newAtomicBoolean(true);
for(inti=0;iif(i==lists.size()-1){
atomicBoolean.set(false);
}
Listlist=lists.get(i);
//使用返回結果的callable去執(zhí)行,
Callablecallable=()->{
//讓最后一個線程拋出異常
if(!atomicBoolean.get()){
thrownewServiceException("001","出現(xiàn)異常");
}
returnemployeeMapper.saveBatch(list);
};
callableList.add(callable);
}
//執(zhí)行子線程
List>futures=service.invokeAll(callableList);
for(Futurefuture:futures){
//如果有一個執(zhí)行不成功,則全部回滾
if(future.get()<=0){
connection.rollback();
return;
}
}
connection.commit();
System.out.println("添加完畢");
}catch(Exceptione){
connection.rollback();
log.info("error",e);
thrownewServiceException("002","出現(xiàn)異常");
}finally{
connection.close();
}
}
//sql
"saveBatch"parameterType="List">
INSERTINTO
employee(employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
values
="list"item="item"index="index"separator=",">
(
#{item.employeeId},
#{item.age},
#{item.employeeName},
#{item.birthDate},
#{item.gender},
#{item.idNumber},
#{item.creatTime},
#{item.updateTime},
#{item.status}
)
數(shù)據(jù)庫中一條數(shù)據(jù):
測試結果:拋出異常,
刪除操作的數(shù)據(jù)回滾了,數(shù)據(jù)庫中的數(shù)據(jù)依舊存在,說明事務成功了。
成功操作示例:
@Resource
SqlContextsqlContext;
/**
*測試多線程事務.
*@paramemployeeDOList
*/
@Override
publicvoidsaveThread(ListemployeeDOList)throwsSQLException{
//獲取數(shù)據(jù)庫連接,獲取會話(內(nèi)部自有事務)
SqlSessionsqlSession=sqlContext.getSqlSession();
Connectionconnection=sqlSession.getConnection();
try{
//設置手動提交
connection.setAutoCommit(false);
EmployeeMapperemployeeMapper=sqlSession.getMapper(EmployeeMapper.class);
//先做刪除操作
employeeMapper.delete(null);
ExecutorServiceservice=ExecutorConfig.getThreadPool();
List>callableList=newArrayList<>();
List>lists=averageAssign(employeeDOList,5);
for(inti=0;ilist=lists.get(i);
Callablecallable=()->employeeMapper.saveBatch(list);
callableList.add(callable);
}
//執(zhí)行子線程
List>futures=service.invokeAll(callableList);
for(Futurefuture:futures){
if(future.get()<=0){
connection.rollback();
return;
}
}
connection.commit();
System.out.println("添加完畢");
}catch(Exceptione){
connection.rollback();
log.info("error",e);
thrownewServiceException("002","出現(xiàn)異常");
//thrownewServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
}
}
測試結果:
數(shù)據(jù)庫中數(shù)據(jù):
刪除的刪除了,添加的添加成功了,測試成功。
審核編輯 :李倩
評論