MySQL主备复制原理
MySQL master将数据变更写入二进制日志(binary log, 其中记录叫做二进制日志事件binary log events,可以通过show binlog events进行查看)MySQL slave将 master 的binary log events拷贝到它的中继日志(relay log)MySQL slave重放relay log中事件,将数据变更反映它自己的数据
Canal原理
- canal 模拟
MySQL slave的交互协议,伪装自己为 MySQL slave ,向MySQL master发送dump 协议 MySQL master收到 dump 请求,开始推送binary log给 slave (即 canal )- canal 解析
binary log对象(原始为 byte 流)

MySQL版本信息
version 8.0.18
操作记录
创建账户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
修改Binlog保存时间

启动服务端
内网地址
rm-8vbnaph28z6412077.mysql.zhangbei.rds.aliyuncs.com
外网地址
rm-8vbnaph28z64120776o.mysql.zhangbei.rds.aliyuncs.com
linhuanran Lindashu1
docker run -d -it -e canal.auto.scan=false \
-e canal.destinations=trial \
-e canal.instance.master.address=rm-8vbnaph28z64120776o.mysql.zhangbei.rds.aliyuncs.com:3306 \
-e canal.instance.dbUsername=linhuanran \
-e canal.instance.dbPassword=Lindashu1 \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=false \
-e canal.instance.gtidon=false \
-e canal.instance.parser.parallelThreadSize=16 \
-p 11101:11111 --name=canal-server -m 4096m canal/canal-server
进入容器中
tail -f canal-server/logs/canal/canal.log
没有报错出现 即为成功
启动客户端
<?php
namespace xingwenge\canal_php\sample;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;
require_once __DIR__. '/../../vendor/autoload.php';
ini_set('display_errors', 'On');
error_reporting(E_ALL);
try {
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
$client->connect("39.103.192.215", 11101);
$client->checkValid();
// $client->subscribe("1001", "trial", ".*\\..*");
$client->subscribe("1001", "trial", "trial.*"); # 设置过滤
while (true) {
$message = $client->get(100);
if ($entries = $message->getEntries()) {
foreach ($entries as $entry) {
Fmt::println($entry);
}
}
sleep(1);
}
$client->disConnect();
} catch (\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}
查看监听

SQL日志结构
index 主键
source 日志来源 文件+偏移行
version 版本号
db_name 库名
tb_name 表名
event 事件 1:INSERT 2:UPDATE 3:DELETE 4:CREATE 5:ALTER 6:ERASE 等
before 之前的数据
after 之后的数据
time 执行时间
应用层修改
在 Db/Eloquent 复写 newEloquentBuilder 方法,让其返回自定义的 Builder 。
<?php
namespace Db;
use Db\Builder;
use Illuminate\Database\Eloquent\Model;
class Eloquent extends Model {
/**
* Create a new Eloquent query builder for the model.
*
* @param \Illuminate\Database\Query\Builder $query
* @return \Illuminate\Database\Eloquent\Builder|static
*/
public function newEloquentBuilder($query)
{
return new Builder($query);
}
}
添加 自定义 Builder 方法,在其中对常用的修改 添加 删除 方法进行复写,维护 sql_version 的更新。
<?php
namespace Db;
use Illuminate\Database\Eloquent\Builder as BaseBuilder;
class Builder extends BaseBuilder
{
/**
* @param array $values
* @return mixed
*/
public function update(array $values)
{
$this->addVersion($values);
return parent::update($values);
}
/**
* @param array $values
* @return bool
*/
public function insert(array $values)
{
$this->addVersion($values);
return parent::insert($values);
}
/**
* @param array $values
* @return int
*/
public function insertGetId(array $values){
$this->addVersion($values);
return parent::insertGetId($values);
}
/**
* @param $values
*/
private function addVersion(&$values){
$values['sql_version'] = $_SERVER['HTTP_REQUEST_ID'] ?? 0;
}
}