十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
今天就跟大家聊聊在 Canal 1.1.4 中怎么使用 RocketMQ 将 MySQL 的数据同步到 Redis。可能很多人对此还不太了解,为了帮助大家更好地理解,小编总结了以下内容,希望大家读完这篇文章后能有所收获。
为蒙自等地区用户提供了全套网页设计制作服务,及蒙自网站建设行业解决方案。主营业务为成都网站设计、做网站、蒙自网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!
Canal结合RocketMQ同步MySQL
略
略
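<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>javax.persistence</groupId>
    <artifactId>persistence-api</artifactId>
</dependency>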
SQLType.java
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
 * Names of the SQL statement types that the Canal listener distinguishes.
 *
 * <p>This class only holds constants; Lombok generates a private no-args
 * constructor so it cannot be instantiated from outside.
 *
 * @author Yu
 * @date 2019/09/08 00:18
 **/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class SQLType {

    /** Row insertion. */
    public static final String INSERT = "INSERT";

    /** Row update. */
    public static final String UPDATE = "UPDATE";

    /** Row deletion. */
    public static final String DELETE = "DELETE";
}
User.java
import lombok.Data;

import javax.persistence.Id;
import java.io.Serializable;

/**
 * User persistence object.
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}. The field annotated with {@link Id}
 * is the entity identifier.
 *
 * @author Yu
 * @date 2019/09/08 14:13
 **/
@Data
public class User implements Serializable {

    private static final long serialVersionUID = -6845801275112259322L;

    /** Identifier of the user; the only field carrying {@code @Id}. */
    @Id
    private Integer uid;

    /** Login name. */
    private String username;

    /** Password value as stored in the source row. */
    private String password;

    /** Sex of the user. */
    private String sex;
}
CanalSynService.java
import com.alibaba.otter.canal.protocol.FlatMessage;

import java.util.Collection;

/**
 * Canal synchronisation service.
 *
 * <p>Defines the callbacks used to apply a change message delivered by Canal
 * through the message queue. The type parameter restores the generic
 * signature: implementations iterate the collections as elements of the
 * entity type, so the collections must be typed rather than raw.
 *
 * @param <T> entity type that the changed rows are mapped to
 * @author Yu
 * @date 2019/09/08 00:00
 **/
public interface CanalSynService<T> {

    /**
     * Handles one message received from the Canal MQ topic.
     *
     * @param flatMessage Canal MQ payload
     */
    void process(FlatMessage flatMessage);

    /**
     * Handles a DDL statement.
     *
     * @param flatMessage Canal MQ payload describing the DDL change
     */
    void ddl(FlatMessage flatMessage);

    /**
     * Applies inserted rows.
     *
     * @param list newly inserted entities
     */
    void insert(Collection<T> list);

    /**
     * Applies updated rows.
     *
     * @param list updated entities
     */
    void update(Collection<T> list);

    /**
     * Applies deleted rows.
     *
     * @param list deleted entities
     */
    void delete(Collection<T> list);
}
AbstractCanalMQ2RedisService.java
import com.alibaba.otter.canal.protocol.FlatMessage; import com.google.common.collect.Sets; import com.taco.springcloud.canal.constant.SQLType; import com.taco.springcloud.core.component.ApplicationContextHolder; import com.taco.springcloud.core.exception.BizException; import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum; import com.taco.springcloud.core.utils.JsonUtil; import com.taco.springcloud.redis.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.util.ReflectionUtils; import javax.annotation.Resource; import javax.persistence.Id; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.util.*; /** * 抽象CanalMQ通用处理服务 * * @author Yu * @date 2019/09/08 00:05 **/ @Slf4j public abstract class AbstractCanalMQ2RedisServiceimplements CanalSynService { @Resource private RedisTemplate redisTemplate; @Resource private RedisUtils redisUtils; private Class cache; /** * 获取Model名称 * * @return Model名称 */ protected abstract String getModelName(); @Override public void process(FlatMessage flatMessage) { if(flatMessage.getIsDdl()) { ddl(flatMessage); return; } Set data = getData(flatMessage); if(SQLType.INSERT.equals(flatMessage.getType())) { insert(data); } if(SQLType.UPDATE.equals(flatMessage.getType())) { update(data); } if(SQLType.DELETE.equals(flatMessage.getType())) { delete(data); } } @Override public void ddl(FlatMessage flatMessage) { //TODO : DDL需要同步,删库清空,更新字段处理 } @Override public void insert(Collection list) { insertOrUpdate(list); } @Override public void update(Collection list) { insertOrUpdate(list); } private void insertOrUpdate(Collection list) { redisTemplate.executePipelined( (RedisConnection redisConnection) -> { for (T data : list) { String key = getWrapRedisKey(data); RedisSerializer keySerializer 
= redisTemplate.getKeySerializer(); RedisSerializer valueSerializer = redisTemplate.getValueSerializer(); redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data)); } return null; }); } @Override public void delete(Collection list) { Set keys = Sets.newHashSetWithExpectedSize(list.size()); for (T data : list) { keys.add(getWrapRedisKey(data)); } //Set keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet()); redisUtils.delAll(keys); } /** * 封装redis的key * * @param t 原对象 * @return key */ protected String getWrapRedisKey(T t) { return new StringBuilder() .append(ApplicationContextHolder.getApplicationName()) .append(":") .append(getModelName()) .append(":") .append(getIdValue(t)) .toString(); } /** * 获取类泛型 * * @return 泛型Class */ protected Class getTypeArguement() { if(cache == null) { cache = (Class ) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return cache; } /** * 获取Object标有@Id注解的字段值 * * @param t 对象 * @return id值 */ protected Object getIdValue(T t) { Field fieldOfId = getIdField(); ReflectionUtils.makeAccessible(fieldOfId); return ReflectionUtils.getField(fieldOfId, t); } /** * 获取Class标有@Id注解的字段名称 * * @return id字段名称 */ protected Field getIdField() { Class clz = getTypeArguement(); Field[] fields = clz.getDeclaredFields(); for (Field field : fields) { Id annotation = field.getAnnotation(Id.class); if (annotation != null) { return field; } } log.error("PO类未设置@Id注解"); throw new BizException(BaseApiCodeEnum.FAIL); } /** * 转换Canal的FlatMessage中data成泛型对象 * * @param flatMessage Canal发送MQ信息 * @return 泛型对象集合 */ protected Set getData(FlatMessage flatMessage) { List
TestUsersConsumer.java
import com.alibaba.otter.canal.protocol.FlatMessage; import com.taco.springcloud.canal.model.User; import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "test_users", consumerGroup = "users") public class TestUsersConsumer extends AbstractCanalMQ2RedisServiceimplements RocketMQListener { @Getter private String modelName = "user"; @Override public void onMessage(FlatMessage s) { process(s); } }
看完上述内容,你们对在 Canal 1.1.4 中怎么使用 RocketMQ 将 MySQL 同步到 Redis 是否有了进一步的了解?如果还想了解更多相关知识或内容,请关注创新互联行业资讯频道,感谢大家的支持。