一、运行环境
以下所有的描述都是基于Activiti的5.20.0.1版本
public interface ProcessEngine extends EngineServices {
/** the version of the activiti library */
public static String VERSION = "5.20.0.1";
/** The name as specified in 'process-engine-name' in
* the activiti.cfg.xml configuration file.
* The default name for a process engine is 'default */
String getName();
void close();
}
二、Activiti不支持分布的原因分析
- 在Activiti工作流的act_ge_property表中通常情况下有3条记录:
1、 next.dbid
2、 schema.history
3、 schema.version
其中next.dbid对应的值为数据库中当前最近一次增长后的最大记录id,每次增长的步长为2500,
1 protected int idBlockSize = 2500; (在ProcessEngineConfiguration类中)
- Activiti中所有的id(如:Task的id,Execution的id,ProcessInstance的id等)都是通过IdGenerator来生成的
/**
* generates {@link IdBlock}s that are used to assign ids to new objects.
*
* The scope of an instance of this class is process engine,
* which means that there is only one instance in one process engine instance.
*
* @author Tom Baeyens
* @author Joram Barrez
*/
public interface IdGenerator {
String getNextId();
}
- IdGenerator的默认实现是
/**
* @author Tom Baeyens
*/
public class DbIdGenerator implements IdGenerator {
protected int idBlockSize;
protected long nextId = 0;
protected long lastId = -1;
protected CommandExecutor commandExecutor;
protected CommandConfig commandConfig;
public synchronized String getNextId() {
if (lastId<nextId) {
getNewBlock();
}
long _nextId = nextId++;
return Long.toString(_nextId);
}
protected synchronized void getNewBlock() {
IdBlock idBlock = commandExecutor.execute(commandConfig, new GetNextIdBlockCmd(idBlockSize));
this.nextId = idBlock.getNextId();
this.lastId = idBlock.getLastId();
}
从上面的代码可以看出,获取下一个id的方法是加锁的,也就是在一台服务器上id的增长是没有问题的,但是如果将Activiti部署在多台服务器上就会有两个问题
1、 从代码的第17,18行可以看出id是本地自增,如果有多台服务器就会出现id相同的情况(由并发写造成的);
2、 获取lastId的方法是操作同一个数据库的,会有问题,代码22中通过执行GetNextIdBlockCmd来获取数据库中的next.dbid的值,如果在多台服务器上由于一台服务器修改后,其他服务器无法知道
/**
* @author Tom Baeyens
*/
public class GetNextIdBlockCmd implements Command<IdBlock> {
private static final long serialVersionUID = 1L;
protected int idBlockSize;
public GetNextIdBlockCmd(int idBlockSize) {
this.idBlockSize = idBlockSize;
}
public IdBlock execute(CommandContext commandContext) {
PropertyEntity property = (PropertyEntity) commandContext
.getPropertyEntityManager()
.findPropertyById("next.dbid");
long oldValue = Long.parseLong(property.getValue());
long newValue = oldValue+idBlockSize;
property.setValue(Long.toString(newValue));
return new IdBlock(oldValue, newValue-1);
}
}
三、解决方案
要想解决Activiti分布式的问题,就需要解决id生成的问题,也就是要自己实现IdGenerator接口,因此要有一个地方来生成一个全局唯一的id才行。
我在实际工作中是通过redis来实现的,redis也可以做集群,因此不需要考虑redis单点的问题,具体方案如下:
/**
* 分布式id生成器
*
* @version 1.0
* @author Pin Xiong
* @date 创建时间:2016年8月12日 下午3:22:09
*/
public class DistributedIdGenerator implements IdGenerator {
public DistributedIdGenerator(RedisService redisService) {
this.redisService = redisService;
}
private RedisService redisService;
@Override
public String getNextId() {
return String.format("%sX%s", D.formatDate(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_PREFIX),this.redisService.incrby(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY, 1L));
}
}
其中,D.formatDate(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_PREFIX)是通过服务器时间来生成id的前缀,
重点是后面的this.redisService.incrby(MainRK.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY, 1L)
该方法是在redis中获取key (也就是代码中Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY)对应的值,并自增1
在实际工作中通过该方案可以解决Activiti分布式问题。
如果其他同学有更好的方案,也希望可以一起分享,谢谢!