当我们有业务需要在事务提交过后进行某一项或者某一系列的业务操作时,我们就可以使用TransactionSynchronizationManager。
通过Spring的AOP机制,将需要进行后置业务处理的操作提交给Spring的处理机制,并且切入到事务处理的后面。
TransactionSynchronizationManager这个类中有一系列的ThreadLocal,我们需要关注的是synchronizations,在后面使用到的TransactionSynchronizationManager.isSynchronizationActive()、TransactionSynchronizationManager.registerSynchronization()和new TransactionSynchronizationAdapter(),都与它密切相关。
Spring在开启数据库事务(无论是使用@Transactional注解,还是用xml配置)时,都会向其中写入一个实例,用于自动处理Connection的获取、提交或回滚等操作。

再看isSynchronizationActive()方法,它判断了synchronizations中是否有数据(Set<TransactionSynchronization>非null即可,并不要求其中有TransactionSynchronization实例)。

再看registerSynchronization()方法,首先调用isSynchronizationActive()做一个校验;然后将入参synchronization添加到synchronizations中。入参synchronization中的方法不会在这里执行,而是要等到事务执行到特定阶段时才会被调用。

TransactionSynchronizationAdapter是一个适配器:它实现了TransactionSynchronization接口,并为每一个接口方法提供了一个空的实现。这类适配器的基本思想是:接口中定义了很多方法,然而业务代码往往只需要实现其中一小部分。利用这种“空实现”适配器,我们可以专注于业务上需要处理的回调方法,而不用在业务类中放大量而且重复的空方法。
结合TransactionSynchronizationManager和TransactionSynchronizationAdapter,利用ThreadPoolExecutor实现一个事务后多线程处理功能:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
 * Thread pool that postpones work until the surrounding Spring transaction has committed.
 *
 * <p>When no transaction synchronization is active on the calling thread, tasks are handed to the
 * underlying {@link ThreadPoolExecutor} immediately. Otherwise they are held back and only
 * dispatched once the transaction completes with {@code STATUS_COMMITTED}; tasks belonging to a
 * transaction that does not commit are never run.
 *
 * @author ly
 */
public class TransactionAfterCommitExecutor extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionAfterCommitExecutor.class);

    /** See {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)}. */
    public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /** See {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory)}. */
    public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    /** See {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, RejectedExecutionHandler)}. */
    public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    /** See {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory, RejectedExecutionHandler)}. */
    public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Creates an executor with the default configuration: 50 core threads, at most 500 threads,
     * 500 ms keep-alive, a bounded queue of 1024 entries and threads named
     * {@code transaction-after-commit-call-pool-%d}.
     *
     * <p>The default rejection policy discards the oldest queued task. A task discarded this way
     * never runs, and a {@link Future} obtained for it never completes.
     */
    public TransactionAfterCommitExecutor() {
        this(
                50, 500,
                500L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024),
                new ThreadFactoryBuilder().setNameFormat("transaction-after-commit-call-pool-%d").build(),
                new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /**
     * Runs the task on the pool, deferring it until after commit when called inside a transaction.
     *
     * <p>All tasks deferred within the same transaction are batched and executed sequentially, in
     * submission order, by a single pool thread once the transaction has committed.
     *
     * @param runnable the task to run
     * @throws NullPointerException if {@code runnable} is null
     */
    @Override
    public void execute(final Runnable runnable) {
        if (runnable == null) {
            // Fail fast on both paths; previously a null was silently queued inside a transaction.
            throw new NullPointerException("runnable");
        }
        // No active synchronization: treat the transaction as already committed and run right away.
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            super.execute(runnable);
            return;
        }
        // Tasks of the same transaction are merged into one batch.
        currentTransactionBatch().add(runnable);
    }

    /**
     * Submits the task, deferring it until after commit when called inside a transaction.
     *
     * <p>If the transaction does not commit, the returned future is cancelled so that callers
     * waiting on {@link Future#get()} are released instead of blocking forever.
     *
     * @param runnable the task to run
     * @return a future that completes when the task has run, or is cancelled when the surrounding
     *         transaction did not commit or the pool rejected the task
     */
    @Override
    public Future<?> submit(final Runnable runnable) {
        // No active synchronization: treat the transaction as already committed and run right away.
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            return super.submit(runnable);
        }
        final RunnableFuture<Void> ftask = newTaskFor(runnable, null);
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCompletion(int status) {
                if (status != STATUS_COMMITTED) {
                    // Rolled back or unknown outcome: the task must not run.
                    ftask.cancel(false);
                    return;
                }
                try {
                    // Call the pool's own execute() directly: going through submit() would wrap the
                    // future a second time and re-enter the overridden execute().
                    TransactionAfterCommitExecutor.super.execute(ftask);
                } catch (RejectedExecutionException e) {
                    ftask.cancel(false);
                    throw e;
                }
            }
        });
        return ftask;
    }

    /**
     * Returns the batch registered by this executor with the current transaction, registering a
     * new one if there is none yet.
     *
     * <p>The batch is looked up among the current transaction's synchronizations rather than kept
     * in a thread-local, so that a nested transaction with its own synchronization set gets its
     * own batch and its tasks follow that transaction's outcome.
     */
    private AfterCommitTransactionSynchronizationAdapter currentTransactionBatch() {
        for (Object synchronization : TransactionSynchronizationManager.getSynchronizations()) {
            if (synchronization instanceof AfterCommitTransactionSynchronizationAdapter) {
                AfterCommitTransactionSynchronizationAdapter candidate =
                        (AfterCommitTransactionSynchronizationAdapter) synchronization;
                if (candidate.belongsTo(this)) {
                    return candidate;
                }
            }
        }
        AfterCommitTransactionSynchronizationAdapter created = new AfterCommitTransactionSynchronizationAdapter();
        TransactionSynchronizationManager.registerSynchronization(created);
        return created;
    }

    /** Collects the tasks deferred within one transaction and dispatches them after commit. */
    private class AfterCommitTransactionSynchronizationAdapter extends TransactionSynchronizationAdapter {

        /** Tasks deferred so far, in submission order. */
        private final List<Runnable> tasks = new ArrayList<>(5);

        /** Tells whether this batch was created by the given executor instance. */
        boolean belongsTo(TransactionAfterCommitExecutor executor) {
            return TransactionAfterCommitExecutor.this == executor;
        }

        /** Appends a task to the batch. */
        void add(Runnable task) {
            tasks.add(task);
        }

        @Override
        public void afterCompletion(int status) {
            if (status != STATUS_COMMITTED) {
                return;
            }
            final List<Runnable> txRunables = new ArrayList<>(tasks);
            TransactionAfterCommitExecutor.super.execute(new Runnable() {
                @Override
                public void run() {
                    for (Runnable runnable : txRunables) {
                        try {
                            runnable.run();
                        } catch (Exception e) {
                            // One failing task must not prevent the remaining ones from running.
                            LOGGER.error("ex:", e);
                        }
                    }
                }
            });
        }
    }
}
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
/**
 * Saves a new {@code Zexample1Model} built from the arguments and arranges for a follow-up
 * action (here: printing a "send email" message) to run after the transaction commits.
 *
 * <p>If no transaction synchronization is active on the calling thread, the follow-up action runs
 * immediately instead of failing with an {@code IllegalStateException} after the row has already
 * been saved.
 *
 * @param name   base name; an underscore is appended before saving
 * @param age    age to store
 * @param amount amount to store
 */
@Transactional(readOnly = false,propagation=Propagation.REQUIRED)// open a transaction
public void save(String name,Integer age ,BigDecimal amount){
    Zexample1Model zexample1Model = new Zexample1Model();
    zexample1Model.setName(name+"_");
    zexample1Model.setAge(age);
    zexample1Model.setAmount(amount);
    zexample1Model.setAddTime(new Date());
    zexample1Model.setStatus(1);
    zexample1Dao.save(zexample1Model);
    System.out.println("id="+zexample1Model.getId());

    TransactionSynchronizationAdapter afterCommitHook = new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            System.out.println("send email after transaction commit...");
        }
    };
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        // Invoked later by Spring, once the transaction has committed.
        TransactionSynchronizationManager.registerSynchronization(afterCommitHook);
    } else {
        // Nothing to wait for: registerSynchronization would throw here.
        afterCommitHook.afterCommit();
    }
    System.out.println("this method complete....");
}
或者用于切面的事务处理:
import java.lang.reflect.Field;
import java.util.Objects;
import com.my.data.multisource.redismanager.RedisBean;
import com.my.data.utils.ThreadLocalUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Resource;
/**
 * Aspect applied to public methods under {@code com.my.data.service}. Before each call it picks
 * up the target's {@code redis} field (if any); after a normal return it calls
 * {@code RedisConnectionUtils.unbindConnection} for that template's connection factory when no
 * resource is bound for the factory.
 *
 * <p>NOTE(review): {@link #redis} is a mutable field that {@link #doBefore} overwrites on every
 * call. If this aspect is a shared singleton, concurrent or nested service calls can make
 * {@link #doAfterReturning} act on a template set by a different call; confirm and consider
 * resolving the template per invocation.
 */
@Aspect
@Component
public class RedisAspect {

    private static final Logger logger = LogManager.getLogger(RedisAspect.class);

    /**
     * Pointcut matching every public method in {@code com.my.data.service} and its sub-packages.
     */
    @Pointcut("execution(public * com.my.data.service..*.*(..))")
    public void redisPointcut() {
    }

    /** Template used by the after-returning advice; injected default, replaced by {@link #doBefore}. */
    @Resource(name = RedisBean.defaultStringRedis)
    private StringRedisTemplate redis;

    /**
     * Before advice: reads the declared field named {@code redis} from the target object via
     * reflection and, when it is non-null, remembers it as the template to clean up after the call.
     * A target without such a field is not an error; any other failure is logged and swallowed so
     * that the advised method still runs.
     *
     * @param joinPoint the intercepted call
     * @throws Throwable declared for the advice signature; failures are caught and logged here
     */
    @Before("redisPointcut()")
    public void doBefore(JoinPoint joinPoint) throws Throwable {
        try {
            Object target = joinPoint.getTarget();
            Field field = target.getClass().getDeclaredField("redis");
            field.setAccessible(true);
            Object targetRedis = field.get(target);
            if (targetRedis != null) {
                this.redis = (StringRedisTemplate) targetRedis;
                logger.info("redis : {}", redis.hashCode());
            }
        } catch (NoSuchFieldException e) {
            logger.info("not found redis");
        } catch (Exception e) {
            // Previously logged as "doAfterReturning error.", which pointed at the wrong advice.
            logger.error("doBefore error.", e);
        }
    }

    /**
     * After-returning advice: when no resource is bound for the template's connection factory,
     * asks {@code RedisConnectionUtils} to unbind the connection. Failures are logged and
     * swallowed.
     *
     * @param ret the value returned by the advised method (unused)
     * @throws Throwable declared for the advice signature; failures are caught and logged here
     */
    @AfterReturning(returning = "ret", pointcut = "redisPointcut()")
    public void doAfterReturning(Object ret) throws Throwable {
        try {
            if (redis != null) {
                logger.info("redis : {}", redis.hashCode());
                Object bindResource = TransactionSynchronizationManager.getResource(redis.getConnectionFactory());
                // NOTE(review): unbinding only when nothing is bound looks inverted; confirm
                // against the RedisConnectionUtils version in use before changing it.
                if (null == bindResource) {
                    RedisConnectionUtils.unbindConnection(redis.getConnectionFactory());
                }
            }
        } catch (Exception e) {
            logger.error("doAfterReturning error.", e);
        }
    }
}