目前我们处理消息的同步,一般是落地到DB后,再同过异步的方式做数据的聚合和处理。至于DB的操作为了简单直接用了Hibernate提供的一套JPA接口,(老实说真的是不喜欢JPA,一是sql log不好分析无法优化,二是必须非常了解JPA的所有关键字含义,不然就要出问题,所以我一直喜欢用mybatis这种更轻量的甚至spring-jdbc)。
那么使用JPA的过程就遇到了一些问题,见招拆招一件一件来。
问题1 遇到的第一个问题就非常的要命,我们的系统是一张单表需要支持multi-tenant多租户,简单说就是表中有个tenantId的字段来区分租户,这是比较常见的设计。那么在对DB做操作的时候,ORM框架应该提供分租户的CURD接口,而不需要开发人员都自己在where
中加tenantId=***
。
解决 这个问题其实没有解决,因为Hibernate还没有实现单表的Multi-tenant(真是相当的坑)。官网文档 中说了这样三种情况
SCHEMA Correlates to the separate schema approach. It is an error to attempt to open a session without a tenant identifier using this strategy. Additionally, a MultiTenantConnectionProvider must be specified.
DATABASE Correlates to the separate database approach. It is an error to attempt to open a session without a tenant identifier using this strategy. Additionally, a MultiTenantConnectionProvider must be specified.
DISCRIMINATOR Correlates to the partitioned (discriminator) approach. It is an error to attempt to open a session without a tenant identifier using this strategy. This strategy is not yet implemented in Hibernate as of 4.0 and 4.1. Its support is planned for 5.0.
可以看到最后一种还不支持呢。没办法只有手写where啦。
问题2 由于是处理消息,即使收到DELETE的message也不能真的删除,因为消息是乱序 的,如果先来了DELETE再来UPDATE怎么办,实际上是先UPDATE再DELETE,但由于处理效率不一致,所以收到的消息顺序也是无法确定的。基于这点,为了保证数据的最终一致性,所以操作都作为UPDATE处理。删除操作必须是soft delete
解决 可以写一个BaseEntity,都有isactive这个字段,默认都是true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @MappedSuperclass public class BaseEntity { @Column (name="isactive" , columnDefinition="boolean DEFAULT true" ) private Boolean active = true ; public Boolean getActive () { return active; } public void setActive (Boolean active) { this .active = active; } }
然后继承一下
1 2 3 4 5 @Entity @Inheritance (strategy = InheritanceType.JOINED)@Table (name = "Product" )@Where (clause="isactive = 1" )public class ProductEntity extends BaseEntity {}
注意@Where
就是所有操作都会拼上这个condition从而实现soft delete。
问题3 在处理类似外键关联这种数据的时候,例如Product上有个CategoryId字段,那么数据库设计是一张Category表,一张Product表,Product表上CategoryId字段作为外键关联到Category表的ID字段。那么作为一个JPA的Entity,大家知道Entity是OO的,Product Entity下应该包含一个Category Entity,关系是oneToMany的。
1 2 3 4 5 6 7 public class ProductEntity extends BaseEntity { @ManyToOne (fetch = FetchType.EAGER) @JoinColumn (name = "categoryId" ) private CategoryEntity category; }
(这里要插一句,其实如果只是把Category当普通字段,存一个CategoryId也是没有问题的,但是在查询的时候就需要用这个Product.CategoryId再去Category里查一次。用了JPA之后,为了减少一次查询,有时候事情反而会复杂)。
至于消息,比如先收到Product的CREATE事件,这时候会拿消息体里的categoryId去category表查一下有没有这个Category Entity,如果有直接得到后塞到Product的Category属性上去,但是如果没有这个Category怎么办?
解决 如果没有的话,按照JPA的外键关联原则,我们需要建立一个虚拟的Category,也就是说插入一条占位数据到Category表中,只有ID有值。所以对ProductEntity做些改造。
1 2 3 4 5 6 7 8 public class ProductEntity extends BaseEntity { @ManyToOne (cascade = {CascadeType.PERSIST}, fetch = FetchType.EAGER) @NotFound (action= NotFoundAction.IGNORE) @JoinColumn (name = "categoryId" ) private CategoryEntity category; }
注意加了两点,一是cascade = {CascadeType.PERSIST}
,意思是如果Persist了Product的话,发现categoryId不为空而category表中又没有该Category,那么级联插入这条数据(只有ID)。二是@NotFound(action= NotFoundAction.IGNORE)
,加这条是防止当收到一个Category.DELETE事件后软删除了Category,而读取Product的时候就会Eager地获得Category,一旦获取不到JPA会抛出EntityNotExist
的异常。加了这个注解,Product里的category就为null,不会出异常。
问题4 这实际上是问题3的衍生,解决3的时候我们使用了Cascade=PERSIST
,那么在发现Category不存在的时候,JPA会发起一个insert,当然数据只有ID,其他的字段等待真正的Category的CREATE事件来了再填充。但是并发的问题就出现了,如果正好就在发起insert之前,Category的CREATE事件来了(另一个Worker在处理),那里也发现表里没有这个Category,所以也随即发起一个insert操作。conflict就这样发生了,主键冲突!这时候怎么办?
解决 我采取了一种比较粗暴的方式,就是retry,首先每次收到事件后的写操作,都是查Entity是否存在,存在就Update,不存在就Insert。当两个Worker同时做写入操作,肯定一个成功一个失败,失败的只要retry一次就会发现Entity有了(另一个Worker写入的),这时候变成Update操作就不会有conflict。
因为项目中依赖Spring,所以恰好有了spring-retry这个包,直接用起来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class RetryTemplateBuilder { protected RetryTemplate buildable; protected RetryTemplateBuilder builder; public RetryTemplateBuilder () { buildable = createBuildable(); builder = getBuilder(); } public static RetryTemplateBuilder retryTemplate () { return new RetryTemplateBuilder(); } public RetryTemplateBuilder withPolicies (RetryPolicy... policies) { CompositeRetryPolicy compositePolicy = new CompositeRetryPolicy(); compositePolicy.setPolicies(policies); buildable.setRetryPolicy(compositePolicy); return this ; } public RetryTemplateBuilder withPolicies (RetryPolicy retryPolicy, BackOffPolicy backOffPolicy) { buildable.setRetryPolicy(retryPolicy); buildable.setBackOffPolicy(backOffPolicy); return this ; } public RetryTemplateBuilder withPolicies (BackOffPolicy backOffPolicy) { buildable.setBackOffPolicy(backOffPolicy); return this ; } public RetryTemplate build () { return buildable; } protected RetryTemplate createBuildable () { return new RetryTemplate(); } protected RetryTemplateBuilder getBuilder () { return this ; } }
这是一个TemplateBuilder,可以理解成retry的模板,一个retryTemplate可以包含多个policy。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class SimpleRetryPolicyBuilder { protected SimpleRetryPolicy buildable; protected SimpleRetryPolicyBuilder builder; public SimpleRetryPolicyBuilder () { buildable = createBuildable(); builder = getBuilder(); } public static SimpleRetryPolicyBuilder simpleRetryPolicy () { return new SimpleRetryPolicyBuilder(); } public static SimpleRetryPolicy simpleRetryPolicyWithRetryableExceptions (int maxAttempts, Map<Class<? extends Throwable>, Boolean> exception) { return new SimpleRetryPolicy(maxAttempts, exception); } public SimpleRetryPolicyBuilder withMaxAttempts (int maxAttempts) { buildable.setMaxAttempts(maxAttempts); return this ; } public SimpleRetryPolicy build () { return buildable; } protected SimpleRetryPolicy createBuildable () { return new SimpleRetryPolicy(); } protected SimpleRetryPolicyBuilder getBuilder () { return this ; } }
比如这种Policy,就是可以定义需要重试几次,在哪些异常发生的时候重试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class FixedBackOffPolicyBuilder { protected FixedBackOffPolicy buildable; protected FixedBackOffPolicyBuilder builder; private FixedBackOffPolicyBuilder () { buildable = createBuildable(); builder = getBuilder(); } public static FixedBackOffPolicyBuilder fixedBackOffPolicy () { return new FixedBackOffPolicyBuilder(); } public FixedBackOffPolicyBuilder withDelay (long delay) { buildable.setBackOffPeriod(delay); return this ; } public FixedBackOffPolicy build () { return buildable; } protected FixedBackOffPolicy createBuildable () { return new FixedBackOffPolicy(); } protected FixedBackOffPolicyBuilder getBuilder () { return this ; } }
还有这种可以定义retry的间隔时间。
最后用起来就手到擒来了,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Map<Class<? extends Throwable>, Boolean> retryFor = new HashMap<>(); retryFor.put(DataIntegrityViolationException.class , Boolean .TRUE ) ; retryFor.put(ConstraintViolationException.class , Boolean .TRUE ) ; RetryTemplate retryTemplate = RetryTemplateBuilder.retryTemplate() .withPolicies( SimpleRetryPolicyBuilder.simpleRetryPolicyWithRetryableExceptions(MAX_ATTEMPTS, retryFor), FixedBackOffPolicyBuilder.fixedBackOffPolicy().withDelay(RETRY_DELAY).build()) .build(); retryTemplate.execute(new RetryCallback() { public Void doWithRetry (RetryContext context) { log.info("Attempt times [" + context.getRetryCount() + "]" ); return null ; } });
在生产环境测试,99%的情况一次retry就可以解决问题,所以我的经验值是设置了3次最大重试次数。