果你不了解python,可以先了解python的簡單用法。不過人郵君相信,在座的各位都是大佬,我們直接介紹操作。
python 與 mysql 實現交互的過程,通常分為:建立連接、把sql語句定義為字符串,提交指令、關閉連接。
核心的技能在于 sql語句;除了定義sql語句,其余3個處理都是固定的寫法。接下來,人郵君結合《MySQL是怎樣運行的》這本書,以Linux環境為主,為大家進行說明。
MySQL是怎樣運行的 從根兒上理解MySQL
首先來看第一步,安裝 MySQL 數據庫:
如果你想要使用python操作MySQL數據庫,就必須先要安裝pymysql庫,這個庫的安裝很簡單;
第二步,pymysql 模塊安裝與使用:
MySQL-python驅動,是python 操作mysql必不可少的模塊。
下載MySQL-python-1.2.5.zip 文件之后直接解壓。進入MySQL-python-1.2.5目錄:
>>python setup.py install
下載地址:https://pypi.python.org/pypi/MySQL-python/
第三步,python與mysql的交互實現:
1)連接
pymysql .connect () 函數:連接數據庫
使用 pymysql 的 connect() 方法連接數據庫,涉及到幾個參數,具體代表意義如下:
host:MySQL服務的地址,若數據庫在本地上,使用 localhost 或者127.0.0.1。如果在其它服務器上,則寫對應的 IP地址
port:服務的端口號,默認為3306,不寫則為默認值。
user:登錄數據庫的用戶名
passwd:登錄 MySQL 的密碼
db:數據庫名
charset:設置為 utf8 編碼,解決存漢字亂碼問題
eg:
# 導入模塊
import pymysql
# 打開數據庫連接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
print(conn)
print(type(conn))
輸出結果顯示如下:表面數據庫連接成功
詳細可以參考
https://www.cnblogs.com/qjj19931230/p/12550384.html?utm_source=tuicool
這里要強調的是,除了上面的連接方式,還有其他的連接。在《MySQL是怎樣運行的》這本書中,介紹到,mysql連接分為內連接和外連接。內外連接的根本區別是在驅動表中記錄不符合ON子句中的連接條件時,內連接不會把該記錄加入到最后的結果集中,而外連接會。外連接分為左(外)連接和右(外)連接。
三種鏈接方式如下圖所示:
2)獲取游標
conn.cursor():獲取游標
對數據庫進行操作,只連接數據庫是不夠的,還需要獲取操作數據庫的游標,才能進行后續的操作。游標的主要作用是用來接收數據庫操作后的返回結果,比如數據查詢、插入和刪除等。通過獲取到的數據庫連接實例 conn 下的 cursor() 方法來創建游標,如下:
# 導入模塊
import pymysql
# 打開數據庫連接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# print(conn)
# print(type(conn))
# 獲取連接下的游標
cursor_test=conn.cursor()
print(cursor_test)
3)數據庫操作
import pymysql
# 打開數據庫連接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# 獲取連接下的游標
cursor_test=conn.cursor()
# 使用 execute() 方法執行 SQL,如果表存在則刪除
cursor_test.execute("DROP TABLE IF EXISTS EMPLOYEE")
# 使用預處理語句創建表
sql="""CREATE TABLE user1 (
FIRST_NAME CHAR(20) NOT NULL,
LAST_NAME CHAR(20),
AGE INT,
SEX CHAR(1),
INCOME FLOAT )"""
cursor_test.execute(sql)
# 關閉數據庫連接
conn.close()
如下所示數據庫表創建成功:
mysql> desc user1;
+------------+----------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------+----------+------+-----+---------+-------+
| FIRST_NAME | char(20) | NO | | NULL | |
| LAST_NAME | char(20) | YES | | NULL | |
| AGE | int(11) | YES | | NULL | |
| SEX | char(1) | YES | | NULL | |
| INCOME | float | YES | | NULL | |
+------------+----------+------+-----+---------+-------+
5 rows in set (0.00 sec)
import pymysql
# 打開數據庫連接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# 獲取連接下的游標
cursor_test=conn.cursor()
# 使用預處理語句創建表
sql="""INSERT INTO user1(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Fei', 'Fei', 20, 'M', 1000)"""
try:
# 執行sql語句
cursor_test.execute(sql)
# 提交到數據庫執行
conn.commit()
except:
# 如果發生錯誤則回滾
conn.rollback()
# 關閉數據庫連接
conn.close()
import pymysql
# 打開數據庫連接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# 獲取連接下的游標
cursor_test=conn.cursor()
sql="""
select * from user1"""
try:
# 執行 sql 語句
cursor_test.execute(sql)
# 顯示出所有數據
data_result=cursor_test.fetchall()
for row in data_result:
fname=row[0]
lname=row[1]
age=row[2]
sex=row[3]
income=row[4]
# 打印結果
print("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
(fname, lname, age, sex, income))
except:
print("Error: unable to fetch data")
# 關閉數據庫連接
conn.close()
# 導入模塊
import pymysql
# 打開數據庫連接
conn=pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="test_db",
charset="utf8")
# print(conn)
# print(type(conn))
# 獲取連接下的游標
cursor_test=conn.cursor()
sql="DELETE * FROM user1"
try:
# 執行SQL語句
cursor_test.execute(sql)
# 提交到數據庫執行
conn.commit()
except:
# 發生錯誤時回滾
conn.rollback()
# 關閉數據庫連接
conn.close()
在《MySQL是怎樣運行的》,作者小孩子4919強調,嵌套循環連接算法是指驅動表只訪問一次,但被驅動表卻可能會訪問多次,訪問次數取決于驅動表執行單表查詢后的結果集中有多少條記錄,大致過程如下:
步驟1,選取驅動表,使用與驅動表相關的過濾條件,選取代價最低的單表訪問方法來執行對驅動表的單表查詢;
步驟2,對步驟1中查詢驅動表得到的結果集中的每一條記錄,都分別到被驅動表中查找匹配的記錄。
由于被驅動表可能會訪問多次,因此可以為被驅動表建立合適的索引以加快查詢速度。
所以,如果被驅動表非常大,即需要完成大量的數據交換,多次訪問被驅動表可能導致很多次的磁盤I/O讀取操作,此時可以使用基于塊的嵌套循環連接算法來緩解由此造成的性能損耗。Mysql的設計者,提出了名為Join Buffer(連接緩沖區)的概念:
有興趣的同學,建議根據書中詳細描述走一遍。
此外,人郵君特別建議大家看看《MySQL是怎樣運行的》,它解決了“為什么這個SQL語句執行得這么慢?為什么我明明建立了索引,但是查詢計劃顯示沒用?為什么IN查詢中的參數一多就不使用索引了?為什么我的數據顯示成了亂碼?”等等每一位DBA和后端開發人員在與MySQL打交道時,所遇到的很多常見問題。除此之外,索引結構、MVCC、隔離級別的實現、鎖的使用等知識,也是求職人員在MySQL面試中躲不過去的高頻問題,作者都在書中給出了很詳細的介紹。
MySQL是怎樣運行的 從根兒上理解MySQL
這本書的初稿最初是以小冊的形式發布在掘金平臺上的,一經發布便得到大家的青睞,十分火爆!歷經兩年,現在終于成書,有興趣的小伙伴也可以去掘金圍觀~(小孩子4919 的個人主頁)
從底層到應用,從基礎到進階,關于MySQL的一切,作者都在書中講解得非常清楚,幫助你從根兒上理解MySQL。
在現代Web應用程序中,一個強大的后端服務器是必不可少的。而Node.js作為一種輕量級高效的后端開發語言,已經被廣泛運用于各種應用場景。
在本文中,我們將探討如何使用Node.js和MySQL數據庫進行交互,以便構建一個強大的后端服務器,并使其能夠支持復雜的數據操作。
在開始之前,我們需要確保已經安裝了Node.js和MySQL服務器。如果你還沒有安裝,可以按照以下步驟進行:
完成以上兩個步驟后,我們需要創建一個新的Node.js項目。在終端中執行以下命令來創建一個空白的Node.js項目:
bash復制代碼mkdir my-project
cd my-project
npm init -y
接下來,我們需要安裝一些必要的依賴項,包括mysql和express:
bash復制代碼npm install mysql express
在這里,我們使用了Express框架來創建我們的Web服務器。而MySQL則是用于連接和操作數據庫的一種流行的關系型數據庫。
首先,我們需要創建一個MySQL數據庫并創建一張表來存儲一些數據。在終端中執行以下命令來連接到MySQL服務器:
bash復制代碼mysql -u root -p
這里,-u參數用于指定用戶名,而-p參數則用于提示輸入密碼。如果你的MySQL服務器沒有設置密碼,則可以省略-p參數。
接下來,我們可以創建一個新的數據庫和表。在MySQL客戶端中執行以下命令:
sql復制代碼CREATE DATABASE my_database;
USE my_database;
CREATE TABLE users (
id INT(11) NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
password VARCHAR(255) NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
這個表將用于存儲用戶信息。它有三個字段:name、email和password。
現在,我們已經準備好連接到MySQL服務器并開始使用它了。在我們的Node.js應用程序中,我們需要使用以下代碼來連接到MySQL:
js復制代碼const mysql=require('mysql');
const connection=mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'your_password',
database: 'my_database'
});
connection.connect((error)=> {
if (error) {
console.error('Error connecting to MySQL: ' + error.stack);
return;
}
console.log('Connected to MySQL as ID ' + connection.threadId);
});
在這里,我們使用createConnection()函數來創建一個新的MySQL連接對象,并傳入一些必要的參數。這些參數包括MySQL服務器的主機名、用戶名、密碼和數據庫名稱。
一旦連接到MySQL服務器,我們可以使用connection.query()函數來向服務器發送SQL查詢。接下來我們將會探討如何使用connection.query()函數來執行常用的SQL操作。
現在,我們已經連接到MySQL服務器了,接下來讓我們看看如何從數據庫中獲取數據。在這里,我們將使用SELECT語句來查詢用戶信息。
js復制代碼app.get('/users', (req, res)=> {
connection.query('SELECT * FROM users', (error, results, fields)=> {
if (error) throw error;
res.json(results);
});
});
在這里,我們使用Express框架創建了一個路由處理程序,該處理程序將處理HTTP GET請求并返回所有用戶數據。當我們收到GET請求時,我們將使用connection.query()函數來執行SELECT * FROM users語句,并將結果作為JSON格式發送回客戶端。如果發生錯誤,我們將拋出一個異常。
results參數包含從MySQL服務器返回的結果集。這是一個數組,其中每個元素都表示表中的一行數據。而fields參數則包含結果集的字段信息。
現在,我們可以使用瀏覽器或其他HTTP客戶端來訪問/users路由,并獲得所有用戶數據了。
除了查詢數據外,我們還需要能夠向數據庫中插入新數據。在這里,我們將使用INSERT INTO語句來插入新用戶數據。
js復制代碼app.post('/users', (req, res)=> {
const { name, email, password }=req.body;
connection.query('INSERT INTO users SET ?', { name, email, password }, (error, results, fields)=> {
if (error) throw error;
res.send('User added successfully');
});
});
在這里,我們創建了一個路由處理程序,該處理程序將處理HTTP POST請求并將新用戶數據插入到數據庫中。當收到POST請求時,我們首先從請求正文中提取name、email和password數據。然后,我們使用connection.query()函數執行INSERT INTO語句,并將新用戶數據作為對象傳遞給它。最后,我們向客戶端發送一條成功消息。
更新數據與插入數據類似。在這里,我們使用UPDATE語句來更新數據庫中的現有數據。
javascript復制代碼js
app.put('/users/:id', (req, res)=> {
const { name, email, password }=req.body;
const id=req.params.id;
connection.query('UPDATE users SET name=?, email=?, password=? WHERE id=?', [name, email, password, id], (error, results, fields)=> {
if (error) throw error;
res.send('User updated successfully');
});
});
在這里,我們創建了一個路由處理程序,該處理程序將處理HTTP PUT請求并更新指定ID的用戶數據。我們首先從請求正文中提取name、email和password數據,然后從URL參數中提取用戶ID。接下來,我們使用connection.query()函數執行UPDATE語句,并將新的用戶數據和用戶ID作為參數傳遞給它。
最后,我們需要能夠從數據庫中刪除現有數據。在這里,我們將使用DELETE語句來刪除指定ID的用戶數據。
js復制代碼app.delete('/users/:id', (req, res)=> {
const id=req.params.id;
connection.query('DELETE FROM users WHERE id=?', [id], (error, results, fields)=> {
if (error) throw error;
res.send('User deleted successfully');
});
});
在這里,我們創建了一個路由處理程序,該處理程序將處理HTTP DELETE請求并刪除指定ID的用戶數據。我們從URL參數中提取用戶ID,并使用connection.query()函數執行DELETE語句,將用戶ID作為參數傳遞給它。
在本文中,我們探討了如何使用Node.js和MySQL數據庫進行交互,以便構建一個強大的后端服務器,并使其能夠支持復雜的數據操作。我們學習了一些常用的SQL操作,包括查詢、插入、更新和刪除數據。通過將這些技術應用于你的下一個項目中,你將能夠創建出一個高效、可靠并且易于維護的Web應用程序。
距離上一篇文章發布又過去了兩周,這次先填掉上一篇秒殺系統文章結尾處開的坑,介紹一下數據庫中間件Canal的使用。
「Canal用途很廣,并且上手非常簡單,小伙伴們在平時完成公司的需求時,很有可能會用到。」
舉個例子:
公司目前有多個開發人員正在開發一套服務,為了縮短調用延時,對部分接口數據加入了緩存。一旦這些數據在數據庫中進行了更新操作,緩存就成了舊數據,必須及時刪除。
刪除緩存的代碼「理所當然可以寫在更新數據的業務代碼里」,但有時候者寫操作是在別的項目代碼里,你可能無權修改,亦或者別人不愿你在他代碼里寫這種業務之外的代碼。(畢竟多人協作中間會產生各種配合問題)。又或者就是單純的刪除緩存的操作失敗了,緩存依然是舊數據。
正如上篇文章緩存與數據庫雙寫一致性實戰里面所說,我們可以將緩存更新操作完全獨立出來,形成一套單獨的系統。「Canal正是這么一個很好的幫手。」 能幫我們實現像下圖這樣的系統:
「本篇文章的要點如下:」
?
歡迎關注我的個人公眾號獲取最全的原創文章:「后端技術漫談」(二維碼見文章底部)
?
眾所周知,阿里是國內比較早地大量使用MySQL的互聯網企業(去IOE化:去掉IBM的小型機、Oracle數據庫、EMC存儲設備,代之以自己在開源軟件基礎上開發的系統),并且基于阿里巴巴/淘寶的業務,從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。
Canal應運而生,它通過偽裝成數據庫的從庫,讀取主庫發來的binlog,用來實現「數據庫增量訂閱和消費業務需求」。
「Canal用途:」
開源項目地址:
https://github.com/alibaba/canal
在這里就不再摘抄項目簡介了,提煉幾個值得注意的點:
Canal實際是將自己偽裝成數據庫的從庫,來讀取Binlog。我們先補習下關于「MySQL數據庫主從數據庫」的基礎知識,這樣就能更快的理解Canal。
為了應對高并發場景,MySQL支持把一臺數據庫主機分為單獨的一臺寫主庫(主要負責寫操作),而把讀的數據庫壓力分配給讀的從庫,而且讀從庫可以變為多臺,這就是讀寫分離的典型場景。
實現數據庫的讀寫分離,是通過數據庫主從同步,讓從數據庫監聽主數據庫Binlog實現的。大體流程如下圖:
?
MySQL master 將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)
MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據
?
詳細主從同步原理在這里就不展開細說了。
可以看到,這種架構下會有一個問題,「數據庫主從同步會存在延遲,那么就會有短暫的時間,主從數據庫的數據是不一致的。」
這種不一致大多數情況下非常短暫,很多時候我們可以忽略他。
但一旦要求數據一致,就會引申出如何解決這個問題的思考。
我們通常使用MySQL主從復制來解決MySQL的單點故障問題,其通過邏輯復制的方式把主庫的變更同步到從庫,主備之間無法保證嚴格一致的模式,
于是,MySQL的主從復制帶來了主從“數據一致性”的問題。「MySQL的復制分為:異步復制、半同步復制、全同步復制。」
MySQL默認的復制即是異步復制,主庫在執行完客戶端提交的事務后會立即將結果返給給客戶端,并不關心從庫是否已經接收并處理,這樣就會有一個問題,「主如果crash掉了,此時主上已經提交的事務可能并沒有傳到從庫上,如果此時,強行將從提升為主,可能導致新主上的數據不完整。」
?
主庫將事務 Binlog 事件寫入到 Binlog 文件中,此時主庫只會通知一下 Dump 線程發送這些新的 Binlog,然后主庫就會繼續處理提交操作,而此時不會保證這些 Binlog 傳到任何一個從庫節點上。
?
指當主庫執行完一個事務,所有的從庫都執行了該事務才返回給客戶端。「因為需要等待所有從庫執行完該事務才能返回」,所以全同步復制的性能必然會收到嚴重的影響。
?
當主庫提交事務之后,所有的從庫節點必須收到、APPLY并且提交這些事務,然后主庫線程才能繼續做后續操作。但缺點是,主庫完成一個事務的時間會被拉長,性能降低。
?
是介于全同步復制與全異步復制之間的一種,「主庫只需要等待至少一個從庫節點收到」并且 Flush Binlog 到 Relay Log 文件即可,主庫不需要等待所有從庫給主庫反饋。同時,「這里只是一個收到的反饋,而不是已經完全完成并且提交的反饋」,如此,節省了很多時間。
?
介于異步復制和全同步復制之間,主庫在執行完客戶端提交的事務后不是立刻返回給客戶端,而是等待至少一個從庫接收到并寫到relay log中才返回給客戶端。相對于異步復制,半同步復制提高了數據的安全性,「同時它也造成了一定程度的延遲,這個延遲最少是一個TCP/IP往返的時間。所以,半同步復制最好在低延時的網絡中使用。」
?
「事實上,半同步復制并不是嚴格意義上的半同步復制,MySQL半同步復制架構中,主庫在等待備庫ack時候,如果超時會退化為異步后,也可能導致“數據不一致”。」
?
當半同步復制發生超時時(由rpl_semi_sync_master_timeout參數控制,單位是毫秒,默認為10000,即10s),會暫時關閉半同步復制,轉而使用異步復制。當master dump線程發送完一個事務的所有事件之后,如果在rpl_semi_sync_master_timeout內,收到了從庫的響應,則主從又重新恢復為半同步復制。
?
關于半同步復制的詳細原理分析可以看這篇引申文章,在此不展開:
https://www.cnblogs.com/ivictor/p/5735580.html
回顧了數據庫從庫的數據同步原理,理解Canal十分簡單,直接引用官網原文:
這個步驟我在之前的文章教你使用Binlog日志恢復誤刪的MySQL數據已經提到過,這里完善了一下,再貼一下,方便大家。
首先進入數據庫控制臺,運行指令:
mysql> show variables like'log_bin%';
+---------------------------------+-------+
| Variable_name | Value |
+---------------------------------+-------+
| log_bin | OFF |
| log_bin_basename | |
| log_bin_index | |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
+---------------------------------+-------+
5 rows in set (0.00 sec)
可以看到我們的binlog是關閉的,都是OFF。接下來我們需要修改Mysql配置文件,執行命令:
sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf
在文件末尾添加:
log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW
保存文件,重啟mysql服務:
sudo service mysql restart
重啟完成后,查看下mysql的狀態:
systemctl status mysql.service
這時,如果你的mysql版本在5.7或更高版本,就會報錯:
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190791Z 0 [Warning] Changed limits: max_open_files: 1024 (requested 5000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190839Z 0 [Warning] Changed limits: table_open_cache: 431 (requested 2000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.359713Z 0 [Warning] TIMESTAMP with implicit DEFAULT value is deprecated. Please use --explicit_defaults_for_timestamp server option (se
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.361395Z 0 [Note] /usr/sbin/mysqld (mysqld 5.7.28-0ubuntu0.16.04.2-log) starting as process 5930 ...
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363017Z 0 [ERROR] You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363747Z 0 [ERROR] Aborting
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363922Z 0 [Note] Binlog end
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.364108Z 0 [Note] /usr/sbin/mysqld: Shutdown complete
Jan 06 15:49:58 VM-0-11-ubuntu systemd[1]: mysql.service: Main process exited, code=exited, status=1/FAILURE
「You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server」
之前我們的配置,對于5.7以下版本應該是可以的。但對于高版本,我們需要指定server-id。
我們給這個MySQL指定為2(只要不與其他庫id重復):
server-id=2
mysql> select user, host from user;
+------------------+-----------+
| user | host |
+------------------+-----------+
| root | % |
| debian-sys-maint | localhost |
| mysql.session | localhost |
| mysql.sys | localhost |
| root | localhost |
+------------------+-----------+
5 rows in set
CREATE USER canal IDENTIFIED BY 'xxxx'; (填寫密碼)
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
show grants for 'canal'
去Github下載最近的Canal穩定版本包:
解壓縮:
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
配置文件設置:
主要有兩個文件配置,一個是conf/canal.properties一個是conf/example/instance.properties。
為了快速運行Demo,只修改conf/example/instance.properties里的數據庫連接賬號密碼即可
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=xxxxxxx
canal.instance.connectionCharset=UTF-8
請先確保機器上有JDK,接著運行Canal啟動腳本:
sh bin/startup.sh
下圖即成功運行:
我在秒殺系統系列文章的代碼倉庫里(miaosha-job)編寫了如下客戶端代碼
倉庫源碼地址:https://github.com/qqxx6661/miaosha
package job;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class CanalClient {
private static final Logger LOGGER=LoggerFactory.getLogger(CanalClient.class);
public static void main(String[] args) {
// 第一步:與canal進行連接
CanalConnector connector=CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
"example", "", "");
connector.connect();
// 第二步:開啟訂閱
connector.subscribe();
// 第三步:循環訂閱
while (true) {
try {
// 每次讀取 1000 條
Message message=connector.getWithoutAck(1000);
long batchID=message.getId();
int size=message.getEntries().size();
if (batchID==-1 || size==0) {
LOGGER.info("當前暫時沒有數據,休眠1秒");
Thread.sleep(1000);
} else {
LOGGER.info("-------------------------- 有數據啦 -----------------------");
printEntry(message.getEntries());
}
connector.ack(batchID);
} catch (Exception e) {
LOGGER.error("處理出錯");
} finally {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 獲取每條打印的記錄
*/
public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
// 第一步:拆解entry 實體
Header header=entry.getHeader();
EntryType entryType=entry.getEntryType();
// 第二步: 如果當前是RowData,那就是我需要的數據
if (entryType==EntryType.ROWDATA) {
String tableName=header.getTableName();
String schemaName=header.getSchemaName();
RowChange rowChange=null;
try {
rowChange=RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
EventType eventType=rowChange.getEventType();
LOGGER.info(String.format("當前正在操作表 %s.%s, 執行操作=%s", schemaName, tableName, eventType));
// 如果是‘查詢’ 或者 是 ‘DDL’ 操作,那么sql直接打出來
if (eventType==EventType.QUERY || rowChange.getIsDdl()) {
LOGGER.info("執行了查詢語句:[{}]", rowChange.getSql());
return;
}
// 第三步:追蹤到 columns 級別
rowChange.getRowDatasList().forEach((rowData) -> {
// 獲取更新之前的column情況
List<Column> beforeColumns=rowData.getBeforeColumnsList();
// 獲取更新之后的 column 情況
List<Column> afterColumns=rowData.getAfterColumnsList();
// 當前執行的是 刪除操作
if (eventType==EventType.DELETE) {
printColumn(beforeColumns);
}
// 當前執行的是 插入操作
if (eventType==EventType.INSERT) {
printColumn(afterColumns);
}
// 當前執行的是 更新操作
if (eventType==EventType.UPDATE) {
printColumn(afterColumns);
// 進行刪除緩存操作
deleteCache(afterColumns, tableName, schemaName);
}
});
}
}
}
/**
* 每個row上面的每一個column 的更改情況
* @param columns
*/
public static void printColumn(List<Column> columns) {
columns.forEach((column) -> {
String columnName=column.getName();
String columnValue=column.getValue();
String columnType=column.getMysqlType();
// 判斷 該字段是否更新
boolean isUpdated=column.getUpdated();
LOGGER.info(String.format("數據列:columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated));
});
}
/**
* 秒殺下單接口刪除庫存緩存
*/
public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
AtomicInteger id=new AtomicInteger();
columns.forEach((column) -> {
String columnName=column.getName();
String columnValue=column.getValue();
if ("id".equals(columnName)) {
id.set(Integer.parseInt(columnValue));
}
});
// TODO: 刪除緩存
LOGGER.info("Canal刪除stock表id:[{}] 的庫存緩存", id);
}
}
}
代碼中有詳細的注釋,就不做解釋了。
我們跑起代碼,緊接著我們在數據庫中進行更改UPDATE操作,把法外狂徒張三改成張三1,然后再改回張三,見下圖。
Canal成功收到了兩條更新操作:
緊接著我們模擬一個刪除Cache緩存的業務,在代碼中有:
/**
* 秒殺下單接口刪除庫存緩存
*/
public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
AtomicInteger id=new AtomicInteger();
columns.forEach((column) -> {
String columnName=column.getName();
String columnValue=column.getValue();
if ("id".equals(columnName)) {
id.set(Integer.parseInt(columnValue));
}
});
// TODO: 刪除緩存
LOGGER.info("Canal刪除stock表id:[{}] 的庫存緩存", id);
}
}
「在上面的代碼中,在收到m4a_miaosha.stock表的更新操作后,我們刷新庫存緩存。效果如下:」
簡單的Canal使用就介紹到這里,剩下的發揮空間留給各位讀者大大們。
本文總結了Canal的基本原理和簡單的使用。
「總結如下幾點:」
「希望大家多多支持我的原創技術文章公眾號:后端技術漫談,我最全的原創文章都在這里首發。」
我是一名后端開發工程師。主要關注后端開發,數據安全,爬蟲,物聯網,邊緣計算等方向,歡迎交流。
個人公眾號:后端技術漫談
「如果文章對你有幫助,不妨收藏,轉發,在看起來~」
*請認真填寫需求信息,我們會在24小時內與您取得聯系。