Dinky 是一個開箱即用、易擴展,以 Apache Flink 為基礎,連接 OLAP 和數據湖等眾多框架的一站式實時計算平臺,致力于流批一體和湖倉一體的探索與實踐。
Dinky 作為 Apache Flink 的 FlinkSQL 的實時計算平臺,具有以下核心特點。
支持 Flink 原生語法、連接器、UDF 等: 幾乎零成本將 Flink 作業遷移至 Dinky。
增強 FlinkSQL 語法: 表值聚合函數、全局變量、CDC多源合并、執行環境、語句合并等。
支持 Flink 多版本: 支持作為多版本 FlinkSQL Server 的能力以及 OpenApi。
支持外部數據源的 DB SQL 操作: 如 ClickHouse、Doris、Hive、Mysql、Oracle、Phoenix、PostgreSql、Presto、SqlServer、StarRocks 等。
支持實時任務運維: 作業上線下線、作業信息、集群信息、作業快照、異常信息、作業日志、數據地圖、即席查詢、歷史版本、報警記錄等。
Dinky 實時計算平臺開發模塊包括 數據開發、運維中心、注冊中心 和 系統設置 四大模塊。
數據開發包括作業管理、作業配置和運維管理等
注冊中心包括集群管理、Jar管理、數據源管理、報警管理和文檔管理
系統設置包括用戶管理和Flink設置
通過 dinky-mysql-server 和 dinky-standalone-server 鏡像快速體驗 Flink 實時計算平臺。
-啟動該鏡像提供 Dinky 的 Mysql 業務庫能力。
docker run --name dinky-mysql dinkydocker/dinky-mysql-server:0.7.2
-啟動該鏡像提供 Dinky 實時計算平臺。
docker run --restart=always -p 8888:8888 -p 8081:8081 -e MYSQL_ADDR=dinky-mysql:3306 --name dinky --link dinky-mysql:dinky-mysql dinkydocker/dinky-standalone-server:0.7.2-flink14
IP:8888 地址打開平臺并 admin/admin 登錄,創建 功能示例 目錄,創建 HelloWorld 的 FlinkSQL 作業。
執行模式選擇 Local 并輸入以下語句:
CREATE TABLE Orders ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'number-of-rows'='50' ); select order_number,price,first_name,last_name,order_time from Orders
點擊 執行按鈕(執行當前的SQL),下方切換至 結果 選項卡,點擊 獲取最新數據 ,即可查看 Select 語句的執行結果。
-FlinkSQL 操作步驟?
1.進入 Dinky 的 Data Studio
2.在左側菜單欄,右鍵 目錄
3.新建目錄或作業
4.在新建文件的對話框,填寫作業信息
參數說明備注作業名稱作業名稱在當前項目中必須保持唯一作業類型流作業和批作業均支持以下作業類型:FlinkSQL:支持SET、DML、DDL語法FlinkSQLEnv:支持SET、DDL語法FlinkSQLEnv 場景適用于所有作業的SET、DDL語法統一管理的場景,當前FlinkSQLEnv 在SQL編輯器的語句限制在1000行以內
5.在作業開發 SQL 編輯器,編寫 DDL 和 DML 代碼
示例代碼如下:
-創建源表datagen_source CREATE TABLE datagen_source( id BIGINT, name STRING ) WITH ( 'connector'='datagen' ); -創建結果表blackhole_sink CREATE TABLE blackhole_sink( id BIGINT, name STRING ) WITH ( 'connector'='blackhole' ); -將源表數據插入到結果表 INSERT INTO blackhole_sink SELECT id , name from datagen_source;
新建作業如下圖:
6.在作業開發頁面右側 執行配置,填寫配置信息
類型配置項備注作業配置執行模式區別詳見用戶手冊數據開發中的作業概述作業配置集群實例Standalone 和 Session 執行模式需要選擇集群實例,詳見集群實例管理作業配置集群配置Per-Job 和 Application 執行模式需要選擇集群配置,詳見集群配置管理作業配置FlinkSQL 環境選擇已創建的 FlinkSQLEnv,如果沒有則不選作業配置任務并行度指定作業級任務并行度,默認為 1作業配置Insert 語句集默認禁用,開啟后將 SQL編輯器中編寫的多個 Insert 語句合并為一個 JobGraph 進行提交作業配置全局變量默認禁用,開啟后可以使用數據源連接配置變量、自定義變量等作業配置批模式默認禁用,開啟后啟用 Batch Mode作業配置SavePoint 策略默認禁用,策略包括:最近一次最早一次指定一次作業配置報警組報警組配置詳見報警管理作業配置其他配置其他的 Flink 作業配置,具體可選參數,請參考 Flink 官網
作業配置如下圖:
-功能說明
執行當前的 SQL: 提交執行未保存的作業配置,并可以同步獲取 SELECT、SHOW 等執行結果,常用于 Local、Standalone、Session 執行模式;
異步提交: 提交執行最近保存的作業配置,可用于所有執行模式;
發布 發布當前作業的最近保存的作業配置,發布后無法修改;
上線 提交已發布的作業配置,可觸發報警;
下線 停止已上線的作業,并觸發 SavePoint;
停止 只停止已提交的作業;
維護 使已發布的作業進入維護狀態,可以被修改;
注銷 標記作業為注銷不可用狀態。
-常用場景
查看 FlinkSQL 執行結果: 執行當前的 SQL。
提交作業: 異步提交。
上線作業: SavePoint 最近一次 + 上線。
下線作業: 下線。
升級作業: 下線后上線。
全新上線作業: SavePoint 禁用 + 上線。
-Flink作業啟動操作步驟
1.首先登錄Dinky數據開發控制臺
2.在左側菜單欄選擇目錄 > 作業名稱 > 檢查當前的FlinkSql > 發布 > 上線
或者選擇目錄 > 作業名稱 > 檢查當前的FlinkSql > 異步提交
作業啟動異步提交如下圖:
作業啟動發布上線如下圖:
可以選擇使用 Standalone 或 Session 集群在開發測試環境對作業調試,如作業運行、檢查結果等。配置 Standalone 或 Session 集群請參考注冊中心中集群管理的集群實例管理。
也可以調試普通的 DB SQL 作業。
FlinkSQL作業調試步驟
1.進入 Data Studio
2.點擊 目錄 > 新建目錄 > 新建作業
3.填寫完作業信息后,單擊 確認,作業類型選擇 FlinkSQL
4.編寫完整的 FlinkSQL 語句,包含 CREATE TABLE 等
示例代碼如下:
-創建源表datagen_source CREATE TABLE datagen_source( id BIGINT, name STRING ) WITH ( 'connector'='datagen' ); -將源表數據插入到結果表 SELECT id BIGINT, name STRING from datagen_source
5.單擊保存
6.單擊語法檢查
7.配置執行配置
配置項 說明
預覽結果 默認開啟,可預覽 FlinkSQL 的執行結果
打印流 默認禁用,開啟后將展示 ChangeLog
最大行數 默認 100,可預覽的執行結果最大的記錄數
自動停止 默認禁用,開啟后達到最大行數將停止作業
注意: 預覽聚合結果如 count 等操作時,關閉打印流可合并最終結果。
8.單擊執行當前的SQL
9.結果 或者 歷史 > 預覽數據 可以手動查詢最新的執行結果
1.執行模式必須是 Local、Standalone、Yarn Session、Kubernetes Session 其中的一種;
2.必須關閉 Insert 語句集;
3.除 SET 和 DDL 外,必須只提交一個 SELECT 或 SHOW 或 DESC 語句;
4.必須開啟 預覽結果;
5.作業必須是提交成功并且返回 JID,同時在遠程集群可以看到作業處于 RUNNING 或 FINISHED 狀態;
6.Dinky 重啟后,之前的預覽結果將失效
選擇對應數據源,并書寫其 sql 執行即可。
Dinky 是基于 Flink 的流批一體化數據匯聚、數據同步的實時計算平臺,通過閱讀本文檔,您將可以零基礎上手實時計算平臺 Dinky 。
首先,登錄 Dlinky,選擇注冊中心>>集群管理>>集群實例管理或集群配置管理,點擊新建 Flink 集群
-創建作業
選擇數據開發>>目錄,首先點擊創建目錄,點擊創建好的目錄右鍵即可創建作業
Dinky 推薦您在使用 Yarn Session、K8s Session、StandAlone 采用集群實例的方式注冊集群。
1.可通過數據開發中的快捷引導注冊集群實例?;蛘咄ㄟ^注冊中心中的集群管理注冊集群實例。
2.添加 Flink 集群
-集群配置
Dinky 推薦您在使用 Yarn Per Job、Yarn Application、K8s Application 采用集群配置的方式注冊集群。
1.可通過數據開發中的快捷引導注冊集群配置。或者通過注冊中心中的集群管理注冊集群配置。
2.添加集群配置
-創建集群完成后,就可進一步開發 FlinkSQL 作業
-腳本選用 Flink 官網提供的 SQL 腳本,參考鏈接如下:
https://github.com/ververica/flink-sql-cookbook
-下載 flink-faker 放入$FLINK_HOME/lib下及Dlinky的plugins下
https://github.com/knaufk/flink-faker/releases
下面創建一個作業名稱為"test66"的作業
創建完成后,即可在"test66"作業下寫 SQL 及 配置作業參數
FlinkSQL 作業編寫,分為三部分內容,分別是 SET 參數設置、DDL 語句編寫、DML 語句編寫。下面以Inserting Into Tables 為例。
當 FlinkSQL 編寫完成后,即可進行作業的配置。作業配置的詳細說明詳見用戶手冊的作業基礎配置
在作業配置中,您可以選擇作業執行模式、Flink 集群、SavePoint策略等配置,對作業進行提交前的配置。
上述 FlinkSQL 作業配置完成后,可以對 SQL 做查詢預覽。
點擊執行配置,開啟打印流,保存。點擊執行當前的SQL。即可獲取到最新結果。
在數據寫入 Sink 端時,Dlinky 提供了異步提交 和 上線發布功能,將其作業提交到遠程集群
當作業提交到遠程集群后,您可以在運維中心查看作業的運行情況。
選擇注冊中心>>數據源管理>>新建,假設您連接Doris。
測試創建成功后,顯示如下
點擊數據開發>>目錄>>右鍵,出現創建作業菜單。作業類型選擇Doris
作業創建完成后,在最右側會出現數據源,選擇連接的數據源
外部數據源可以創建 DDL、DML語句對其進行ETL開發。
當 ETL 開發結束 或者做即席查詢時,可以點擊保存>>語法檢查>>運行當前的SQL 將 SQL 提交。
目前通過 FlinkCDC 進行會存在諸多問題,如需要定義大量的 DDL 和編寫大量的 INSERT INTO,更為嚴重的是會占用大量的數據庫連接,對 Mysql 和網絡造成壓力。
Dinky 定義了 CDCSOURCE 整庫同步的語法,該語法和 CDAS 作用相似,可以直接自動構建一個整庫入倉入湖的實時任務,并且對 source 進行了合并,不會產生額外的 Mysql 及網絡壓力,支持對任意 sink 的同步,如 kafka、doris、hudi、jdbc 等等
?
?Dinky 采用的是只構建一個 source,然后根據 schema、database、table 進行分流處理,分別 sink 到對應的表。
Dinky 是通過自身的數據源中心的元數據功能捕獲源庫的元數據信息,并同步構建 sink 階段 datastream 或 tableAPI 所使用的 FlinkDDL。
Dinky 提供了各式各樣的 sink 方式,通過修改語句參數可以實現不同的 sink 方式。Dinky 支持通過 DataStream 來擴展新的 sink,也可以使用 FlinkSQL 無需修改代碼直接擴展新的 sink。
禁用全局變量、禁用語句集、禁用批模式。
目前 dlink-client-1.14 內的整庫同步能力最多且主要維護,如果要使用其他 flink 版本的整庫同步,如果 SQLSink 不滿足需求,需要DataStreamSink 支持,請手動仿照 dlink-client-1.14 擴展相應代碼實現,很簡單。
目前 dlink-client-1.14 內默認實現常用的 Flink CDC,如 MysqlCDC、OracleCDC、PostgresCDC 和 SQLServerCDC,如果要使用其他 FlinkCDC,請在 Dinky 源碼中仿照 MysqlCDC 進行擴展,很簡單。
由于 CDCSOURCE 是 Dinky 封裝的新功能,Apache Flink 源碼不包含,非 Application 模式提交需要在遠程 Flink 集群所使用的依賴里添加一下依賴:
# 將下面 Dinky根目錄下 整庫同步依賴包放置 $FLINK_HOME/lib下
lib/dlink-client-base-${version}.jar
lib/dlink-common-${version}.jar
plugins/flink-${flink-version}/dlink-client-${version}.jar
目前已經支持 application ,需提前準備好相關jar包,或者和 add jar語法并用。以 mysqlcdc-2.3.0 和 flink-1.14 為例,需要以下 jar
flink-shaded-guava-18.0-13.0.jar
HikariCP-4.0.3.jar
druid-1.2.8.jar
dlink-metadata-mysql-0.7.2.jar
dlink-metadata-base-0.7.2.jar
jackson-datatype-jsr310-2.13.4.jar
flink-sql-connector-mysql-cdc-2.3.0.jar
dlink-client-1.14-0.7.2.jar
cdcsource_example.png
一個 FlinkSQL 任務只能寫一個 CDCSOURCE,CDCSOURCE 前可寫 set、add jar 和 ddl 語句。
配置項中的英文逗號前不能加空格,需要緊隨右單引號。
配置項 是否必須 默認值 說明 connector 是 無 指定要使用的連接器 hostname 是 無 數據庫服務器的 IP 地址或主機名 port 是 無 數據庫服務器的端口號 username 是 無 連接到數據庫服務器時要使用的數據庫的用戶名 password 是 無 連接到數據庫服務器時要使用的數據庫的密碼 scan.startup.mode 否 latest-offset 消費者的可選啟動模式,有效枚舉為“initial”和“latest-offset” database-name 否 無 此參數非必填 table-name 否 無 只支持正則,示例:"test\.student,test\.score",所有表示例:"test\..*" source.* 否 無 指定個性化的 CDC 配置,如 source.server-time-zone 即為 server-time-zone 配置參數。 checkpoint 否 無 單位 ms parallelism 否 無 任務并行度 sink.connector 是 無 指定 sink 的類型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 開頭的為 DataStream 的實現方式 sink.sink.db 否 無 目標數據源的庫名,不指定時默認使用源數據源的庫名 sink.table.prefix 否 無 目標表的表名前綴,如 ODS 即為所有的表名前拼接 ODS sink.table.suffix 否 無 目標表的表名后綴 sink.table.upper 否 false 目標表的表名全大寫 sink.table.lower 否 false 目標表的表名全小寫 sink.auto.create 否 false 目標數據源自動建表,目前只支持 Mysql,其他可自行擴展 sink.timezone 否 UTC 指定目標數據源的時區,在數據類型轉換時自動生效 sink.column.replace.line-break 否 false 指定是否去除換行符,即在數據轉換中進行 REGEXP_REPLACE(column, '\n', '') sink.* 否 無 目標數據源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入經過處理的源表名 sink[N].* 否 無 N代表為多數據源寫入, 默認從0開始到N, 其他配置參數信息參考sink.*的配置.
該示例為將 mysql 整庫同步到另一個 mysql 數據庫,寫入 test 庫,表名前綴 test_,表名全小寫,開啟自動建表。
EXECUTE CDCSOURCE cdc_mysql WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='jdbc', 'sink.url'='jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false', 'sink.username'='root', 'sink.password'='123456', 'sink.sink.db'='test', 'sink.table.prefix'='test_', 'sink.table.lower'='true', 'sink.table-name'='${tableName}', 'sink.driver'='com.mysql.jdbc.Driver', 'sink.sink.buffer-flush.interval'='2s', 'sink.sink.buffer-flush.max-rows'='100', 'sink.sink.max-retries'='5', 'sink.auto.create'='true'
該示例將 Oracle 數據庫 TEST 下所有表同步到該數據庫的 TEST2下。
EXECUTE CDCSOURCE cdc_oracle WITH ( 'connector'='oracle-cdc', 'hostname'='127.0.0.1', 'port'='1521', 'username'='root', 'password'='123456', 'database-name'='ORCL', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='TEST\..*', 'connector'='jdbc', 'url'='jdbc:oracle:thin:@127.0.0.1:1521:orcl', 'username'='root', 'password'='123456', 'table-name'='TEST2.${tableName}' )
匯總到一個 topic
當指定 sink.topic 參數時,所有 Change Log 會被寫入這一個 topic。
EXECUTE CDCSOURCE cdc_kafka_one WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='datastream-kafka', 'sink.topic'='cdctest', 'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092' )
同步到對應 topic?
當不指定 sink.topic 參數時,所有 Change Log 會被寫入對應庫表名的 topic。
EXECUTE CDCSOURCE cdc_kafka_mul WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='datastream-kafka', 'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092' )
使用 FlinkSQL 同步到對應 topic
EXECUTE CDCSOURCE cdc_upsert_kafka WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='upsert-kafka', 'sink.topic'='${tableName}', 'sink.properties.bootstrap.servers'='bigdata2:9092,bigdata3:9092,bigdata4:9092', 'sink.key.format'='avro', 'sink.value.format'='avro' )
EXECUTE CDCSOURCE cdc_clickhouse WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='bigdata\.products,bigdata\.orders', 'sink.connector'='clickhouse', 'sink.url'='clickhouse://127.0.0.1:8123', 'sink.username'='default', 'sink.password'='123456', 'sink.sink.db'='test', 'sink.table.prefix'='test_', 'sink.table.lower'='true', 'sink.database-name'='test', 'sink.table-name'='${tableName}', 'sink.sink.batch-size'='500', 'sink.sink.flush-interval'='1000', 'sink.sink.max-retries'='3' )
EXECUTE CDCSOURCE jobname WITH ( 'connector'='mysql-cdc', 'hostname'='127.0.0.1', 'port'='3306', 'username'='root', 'password'='123456', 'checkpoint'='3000', 'scan.startup.mode'='initial', 'parallelism'='1', 'table-name'='test\.student,test\.score', 'sink[0].connector'='doris', 'sink[0].fenodes'='127.0.0.1:8030', 'sink[0].username'='root', 'sink[0].password'='dw123456', 'sink[0].sink.batch.size'='1', 'sink[0].sink.max-retries'='1', 'sink[0].sink.batch.interval'='60000', 'sink[0].sink.db'='test', 'sink[0].table.prefix'='ODS_', 'sink[0].table.upper'='true', 'sink[0].table.identifier'='${schemaName}.${tableName}', 'sink[0].sink.label-prefix'='${schemaName}_${tableName}_1', 'sink[0].sink.enable-delete'='true', 'sink[1].connector'='datastream-kafka', 'sink[1].topic'='cdc', 'sink[1].brokers'='127.0.0.1:9092' )
在搭建Dinky開發環境之前請確保你已經安裝如下軟件
環境版本npm7.19.0node.js14.17.0jdk1.8maven3.6.0+lombokIDEA插件安裝mysql5.7+
請通過 git 管理工具從 GitHub 中拉取 Dinky 源碼
mkdir workspace
cd workspace
git clone https://github.com/DataLinkDC/dlink.git
IDEA 提供了插件設置來安裝 Lombok 插件。如果尚未安裝,請在導入 Dlink 之前按照以下說明來進行操作以啟用對 Lombok 注解的支持:
安裝 node.js, 安裝 npm
因 node.js 安裝后 npm 版本較高,因此需要可用版本 7.19.0,升級npm命令如下:
npm install npm@7.19.0 -g
初始化依賴
npm install --force
IDEA 里 Build → Build Project
mvn clean install -Dmaven.test.skip=true
# 如若修改版本,按以下指定即可。flink可支持多版本(1.11-1.16)
mvn clean install -Dmaven.test.skip=true -P pord,scala-2.11,flink-1.14,flink-1.15
# 如若不需要web編譯,-P 后面加: `!web`
mvn clean install -Dmaven.test.skip=true -P !web,pord,scala-2.11,flink-1.14,flink-1.15
需要修改 dlink根目錄下的pom文件,下面以本地開發為例,修改如下:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <!-- `provided` for product environment ,`compile` for dev environment --> <scope.runtime>compile</scope.runtime> </properties>
修改dlink根目錄下/dlink-admin/src/main/resources/application.ym文件
配置數據庫連接信息:
spring: datasource: url: jdbc:mysql://127.0.0.1:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true username: dlink password: dlink driver-class-name: com.mysql.cj.jdbc.Driver
在MySQL數據庫創建 dlink 用戶并在 dlink 數據庫中執行 script/sql/dinky-mysql.sql 文件。此外 script/sql/upgrade 目錄下存放了了各版本的升級 sql 請依次按照版本號執行。
啟動 dlink-admin 下的 Dlink 啟動類,可見 8888 端口。
稍微等待一會,即可訪問 127.0.0.1:8888 可見登錄頁。
登錄用戶/密碼: admin/admin
下載最新的編譯包,對比一下 安裝目錄/config 下的文件,主要是 application.yml ,如果沒用到最新特性,在最新版中修改一下mysql連接配置即可, 如果需要使用新特性,相關配置看相關文檔描述即可
-- 注意: 按照版本號依次升級 切不可跨版本升級 ${version} 代表的是你目前的 dinky版本+1 依次往下執行
- 其中 /opt/dinky 是你dinky安裝的目錄
mysql> source /opt/dinky/sql/upgrade/${version}_schema/mysql/dinky_ddl.sql
-- 表的ddl
mysql> source /opt/dinky/sql/upgrade/${version}_schema/mysql/dinky_dml.sql
-- 表初始化數據 (部分版本無)
完
超市、地鐵、車站等很多場景中,人臉識別已經被廣泛應用,但是這個功能究竟是怎么實現的?
在本文中,將以 pico.js 庫為例,分享實現輕量級人臉識別功能的具體開發過程 。
pico.js 是一個只有 200 行純 JavaScript 代碼的人臉檢測庫,具備實時檢測功能(在實際環境中可達到200+ FPS),壓縮后僅 2kB 。
開源代碼地址:https://github.com/tehnokv/picojs;
簡介
本文將介紹pico.js,這一由JavaScript編寫的用于人臉檢測的代碼庫,并展示其工作原理。盡管現已有類似的項目,但我們的目標是提供更小、計算效率更高的替代方案。
在深入考究其細節前,建議各位用計算機的網絡攝像頭體驗一下人臉檢測的實時演示(也適用于移動設備)。注意,所有進程都是在客戶端完成的,即不向服務端發送圖像。因此,各位無需擔心在運行這段代碼時的隱私問題。
在接下來的篇幅里,我將闡述pico.js的理論背景及其工作原理。
Pico對象監測框架
2013年,Markus團隊在一個技術報告中介紹了這一由JavaScript實現的pico.js代碼庫。它是參考C語言實現的,我們可在GitHub上獲取其源碼:https://github.com/nenadmarkus/pico。我們密切關注其實現方法,因為我們不打算復制學習過程,而僅關注它的運行。這背后的原因是,我們最好學習帶有官方代碼的檢測器,將其加載到JavaScript中并執行進程,如此就帶有獨特的優勢(比如跨操作系統與設備的強大的可移植性)。
Pico對象檢測框架是流行的Viola-Jones方法的一個改進。
Viola-Jones方法是基于區域分類的概念。這意味著在圖像的每個合理位置和尺度上都使用分類器。這個區域枚舉過程的可視化如下圖所示:
該分類器試圖判斷當前區域是否存在人臉。最后,獲取到的人臉區域將根據重疊程度進行聚類。鑒于每張圖像都有很多區域,在這實時進程中有兩個小技巧:
分類級聯由一系列分類器組成。這些分類器中的每一個都能正確識別幾乎所有的人臉,并丟棄一小部分非人臉區域。如果一個圖像區域通過了級聯的所有成員,那么它就被認定為人臉。通過(設計)序列中靠前的分類器比靠后的分類器更簡單,這種效果得到了進一步放大。級聯分類算法如下圖所示:
每個階段包括一個分類器Cn,它既可以拒絕圖像區域(R),也可以接受圖像區域(A)。一旦被拒絕,該區域將不會進入下一級聯成員。如果沒有一個分類器拒絕該區域,我們認為它是一張人臉。
在Viola-Jones框架中,每個分類器Cn都基于Haar-like特性。這使得每個區域可通過名為積分圖像的預算結構來進行O(1)計算時間。
然而,積分圖像也有一些缺點。最明顯的缺點是,這種數據結構需要額外的內存來儲存:通常是unit8輸入圖像的4倍。另外一個問題是構建一個完整的圖像所需的時間(也與輸入的像素數有關)。在功能有限的小型硬件上處理大的圖像也可能會有問題。這種方法的一個更微妙的問題是它的優雅性:隨之而來的問題是我們是否能夠創建一個不需要這種結構、并且具有所有重要屬性的框架。
Pico框架對每個分類器Cn用像素對比測試取代了Haar-like特性,形式如下:
其中R是一個圖像區域,(Xi,Yi)表示用于比較像素值的位置。注意,這種測試可以應用于各種尺寸的區域,而不需要任何專門的數據結構,這與Haar-like的特性不同。這是通過將位置(Xi,Yi)存儲在標準化坐標中(例如,(Xi,Yi)在[?1,1]×[?1,1]中),并乘以當前區域的比例。這就是pico實現多尺度檢測功能的思路。
由于此類測試很簡單,又因混疊和噪聲而存在潛在問題,我們有必要將大量測試應用于該區域,以便對其內容進行推理。在pico框架中,這是通過
這可以用數學符號表示,如下:
其中Tt(R)表示決策樹Tt在輸入區域R上生成的標量輸出。由于每個決策樹都由若干個像素比較測試組成,這些測試可以根據需要調整大小,因此運行分類階段Cn的計算復雜度與區域大小無關。
每個Cn決策樹都是AdaBoost的變體。接下來以這種方式將閾值設置為Cn的輸出,以獲取期望的真陽率(例如0.995)。所有得分低于這個閾值的區域都不認為是人臉。添加級聯的新成員,直到達到預期的假陽率。請參閱原出版物學習相關細節內容。
正如簡介中說的那樣,我們不會復制pico的學習過程,而僅關注它的運行。如果您想學習自定義對象/人臉檢測器,請使用官方的實現方法。Pico.js能夠加載二進制級聯文件并有效地處理圖像。接下來的小節將解釋如何使用pico.js來檢測圖像中的人臉。
pico.js的組件
庫的組成部分如下:
通過<script src="pico.js"></script>(或它的壓縮版本) 引入并進行一些預處理后,就可以使用這些工具了。我們將討論對圖像進行人臉檢測的JS代碼(GitHub repo中的代碼)。但愿這能詳盡說明使用該庫的方法。實時演示也有說明。
實例化區域分類器
區域分類器應識別圖像區域是否為人臉。其思路是在整個圖像中運行這個分類器,以獲得其中的所有面孔(稍后詳細介紹)。Pico.js的區域分類過程封裝在一個函數中,其原型如下:
function(r, c, s, pixels, ldim) { /* ... */} /* ... */ }
前三個參數(r、c和s)指定區域的位置(其中心的行和列)及其大小。pixels陣列包含圖像的灰度強度值。參數ldim規定從圖像的一行移動到下一行的方式(在諸如OpenCV的庫中稱為stride)。也就是說,從代碼中可以看出(r,c)位置的像素強度為[r*ldim + c]像素。該函數會返回一個浮點值,表示該區域的得分。如果分數大于或等于0.0,則該區域認定為人臉。如果分數低于0.0,則該區域認定為非人臉,即屬于背景類。
Pico.js中pico.unpack_cascade過程將二進制的級聯作為參數,將其解壓并返回一個帶有分類過程和分類器數據的閉包函數。我們用它初始化區域分類過程,以下是詳細說明。
官方pico的人臉檢測級聯稱為facefinder。它由近450個決策樹組成,每個決策樹的深度為6,它們集成一個25級聯。該級聯將在我們是實驗中用到,它能對正臉圖像以適當的檢測速率進行實時處理,正如實時演示看到的那樣。
facefinder級聯可以直接從官方的github庫上下載,代碼寫為:
var facefinder_classify_region=function(r, c, s, pixels, ldim) {return -1.0;};var cascadeurl='https://raw.githubusercontent.com/nenadmarkus/pico/c2e81f9d23cc11d1a612fd21e4f9de0921a5d0d9/rnt/cascades/facefinder';fetch(cascadeurl).then(function(response) { response.arrayBuffer().then(function(buffer) { var bytes=new Int8Array(buffer); facefinder_classify_region=pico.unpack_cascade(bytes); console.log('* cascade loaded'); })})function(r, c, s, pixels, ldim) {return -1.0;}; var cascadeurl='https://raw.githubusercontent.com/nenadmarkus/pico/c2e81f9d23cc11d1a612fd21e4f9de0921a5d0d9/rnt/cascades/facefinder'; fetch(cascadeurl).then(function(response) { response.arrayBuffer().then(function(buffer) { var bytes=new Int8Array(buffer); facefinder_classify_region=pico.unpack_cascade(bytes); console.log('* cascade loaded'); }) })
首先,將facefinder_classify_region初始化,即任何圖像區域先認定為非人臉(它總是返回-1.0)。接下來,我們使用Fetch API從cascadeurl URL中獲取級聯二進制數據。這是一個異步調用,我們不能即刻獲取到數據。最后,在獲取到響應數據后,將其轉換為int8數組并傳遞給pico.unpack_cascade,然后pico.unpack_cascade生成正確的facefinder_classify_region函數。
將facefinder_classify_region函數應用于圖像中每個區域的合理位置和等級以便檢測到所有的人臉。這個過程將在下一小節中解釋。
在圖像上運行分類器
假定HTML body內有一個canvas元素,一個image標簽和一個帶有onclick回調的button標簽。用戶一旦點擊了人臉檢測按鈕,檢測過程就開始了。
下面的JS代碼用于繪制內容和圖像,并獲取原始像素值(紅、綠、藍+ alpha的格式):
var img=document.getElementById('image');var ctx=document.getElementById('canvas').getContext('2d');ctx.drawImage(img, 0, 0);var rgba=ctx.getImageData(0, 0, 480, 360).data; // the size of the image is 480x360 (width x height)document.getElementById('image'); var ctx=document.getElementById('canvas').getContext('2d'); ctx.drawImage(img, 0, 0); var rgba=ctx.getImageData(0, 0, 480, 360).data; // the size of the image is 480x360 (width x height)
下面,我們編寫一個輔助函數,將輸入的RGBA數組轉換為灰度:
function rgba_to_grayscale(rgba, nrows, ncols) { var gray=new Uint8Array(nrows*ncols); for(var r=0; r<nrows; ++r) for(var c=0; c<ncols; ++c) // gray=0.2*red + 0.7*green + 0.1*blue gray[r*ncols + c]=(2*rgba[r*4*ncols+4*c+0]+7*rgba[r*4*ncols+4*c+1]+1*rgba[r*4*ncols+4*c+2])/10; return gray;} var gray=new Uint8Array(nrows*ncols); for(var r=0; r<nrows; ++r) for(var c=0; c<ncols; ++c) // gray=0.2*red + 0.7*green + 0.1*blue gray[r*ncols + c]=(2*rgba[r*4*ncols+4*c+0]+7*rgba[r*4*ncols+4*c+1]+1*rgba[r*4*ncols+4*c+2])/10; return gray; }
現在我們準備調用這個過程,它將在整個圖像中運行facefinder_classify_region函數:
image={ "pixels": rgba_to_grayscale(rgba, 360, 480), "nrows": 360, "ncols": 480, "ldim": 480}params={ "shiftfactor": 0.1, // move the detection window by 10% of its size "minsize": 20, // minimum size of a face "maxsize": 1000, // maximum size of a face "scalefactor": 1.1 // for multiscale processing: resize the detection window by 10% when moving to the higher scale}// run the cascade over the image// dets is an array that contains (r, c, s, q) quadruplets// (representing row, column, scale and detection score)dets=pico.run_cascade(image, facefinder_classify_region, params);"pixels": rgba_to_grayscale(rgba, 360, 480), "nrows": 360, "ncols": 480, "ldim": 480 } params={ "shiftfactor": 0.1, // move the detection window by 10% of its size "minsize": 20, // minimum size of a face "maxsize": 1000, // maximum size of a face "scalefactor": 1.1 // for multiscale processing: resize the detection window by 10% when moving to the higher scale } // run the cascade over the image // dets is an array that contains (r, c, s, q) quadruplets // (representing row, column, scale and detection score) dets=pico.run_cascade(image, facefinder_classify_region, params);
注意,人臉的最小尺寸默認設置為20。這太小了,對于大部分應用程序來說都是不必要的。但還需要注意的是,運行速度在很大程度上取決于此參數。對于實時應用程序,應該將此值設置為100。但是,設置的最小尺寸需匹配示例圖像。
檢測過程完成后,數組dets包含表單(r,c,s,q),其中r,c,s指定人臉區域的位置(行,列)和大小,q表示檢測分數。該地區得分越高,越有可能是人臉。
我們可以將得到的檢測結果渲染到畫布上:
qthresh=5.0for(i=0; i<dets.length; ++i) // check the detection score // if it's above the threshold, draw it if(dets[i][3]>qthresh) { ctx.beginPath(); ctx.arc(dets[i][1], dets[i][0], dets[i][2]/2, 0, 2*Math.PI, false); ctx.lineWidth=3; ctx.strokeStyle='red'; ctx.stroke(); }<dets.length; ++i) // check the detection score // if it's above the threshold, draw it if(dets[i][3]>qthresh) { ctx.beginPath(); ctx.arc(dets[i][1], dets[i][0], dets[i][2]/2, 0, 2*Math.PI, false); ctx.lineWidth=3; ctx.strokeStyle='red'; ctx.stroke(); }
我們需要根據經驗設置變量qthresh(5.0剛好,適用于facefinder級聯和靜止圖像中的人臉檢測)。典型的檢測結果是這樣的:
我們可以看到每張臉周圍都有多個探測器。這個問題用非極大值抑制來解決,在下一小節中解釋。
原始檢測的非極大值抑制(聚類)
非極大值抑制聚類的目的是將重疊的人臉區域融合在一起。每個集群的代表是其中得分最高的一次檢測(該方法因此而得名)。它的分數更新為集群中所有檢測分數的總和。
pico.js中的實現方式是:
dets=pico.cluster_detections(dets, 0.2); // set IoU threshold to 0.20.2); // set IoU threshold to 0.2
IoU閾值設置為0.2。這意味著兩個重疊大于該值的檢測將合并在一起。
現在的結果是這樣的:
我們已經學習了使用pico.js檢測靜止圖像中人臉的基本知識。值得注意的是,pico方法不如基于深度學習的現代人臉檢測器強大。然而,pico非常快,這使得它成為許多應用程序的首選,比如那些需要實時處理的應用程序。
在視頻中使用pico.js進行實時人臉檢測
由于pico.js產生的檢測噪聲比較大,我們開發了一種時間記憶模塊,在處理實時視頻時可減輕少此問題。該方法用于上述實時演示中,顯著提高了主觀檢測質量。
其思想是將幾個連續幀的檢測結合起來,以準確判斷給定區域是否為人臉。這是通過實例化一個電路緩沖區來實現的,該緩沖區包含從最后一個f幀檢測到的信號:
var update_memory=pico.instantiate_detection_memory(5); // f is set to 5 in this example// f is set to 5 in this example
update_memory閉包封裝了電路緩沖區和刷新數據的代碼。返回的數組包含來自最后f幀的檢測。
現在我們不再從單幀中檢測聚類,而是在聚類之前進行累加:
dets=pico.run_cascade(image, facefinder_classify_region, params);dets=update_memory(dets); // accumulates detections from last f framesdets=pico.cluster_detections(dets, 0.2); // set IoU threshold to 0.2 dets=update_memory(dets); // accumulates detections from last f frames dets=pico.cluster_detections(dets, 0.2); // set IoU threshold to 0.2
最終的分類閾值qthresh會顯著提高,這會減少假陽性的數量,而不會顯著影響到真陽率。
轉載自:https://blog.csdn.net/csdnnews/article/details/92841099
隨著 Web 的發展,用戶對于 Web 的實時推送要求也越來越高 ,比如,工業運行監控、Web 在線通訊、即時報價系統、在線游戲等,都需要將后臺發生的變化主動地、實時地傳送到瀏覽器端,而不需要用戶手動地刷新頁面。本文對過去和現在流行的 Web 實時推送技術進行了比較與總結。
HTTP 協議有一個缺陷:通信只能由客戶端發起。舉例來說,我們想了解今天的天氣,只能是客戶端向服務器發出請求,服務器返回查詢結果。HTTP 協議做不到服務器主動向客戶端推送信息。這種單向請求的特點,注定了如果服務器有連續的狀態變化,客戶端要獲知就非常麻煩。在WebSocket協議之前,有三種實現雙向通信的方式:輪詢(polling)、長輪詢(long-polling)和iframe流(streaming)。
1.輪詢(polling)
輪詢是客戶端和服務器之間會一直進行連接,每隔一段時間就詢問一次。其缺點也很明顯:連接數會很多,一個接受,一個發送。而且每次發送請求都會有Http的Header,會很耗流量,也會消耗CPU的利用率。
// 1.html
<div id="clock"></div>
<script>
let clockDiv=document.getElementById('clock');
setInterval(function(){
let xhr=new XMLHttpRequest;
xhr.open('GET','/clock',true);
xhr.onreadystatechange=function(){
if(xhr.readyState==4 && xhr.status==200){
console.log(xhr.responseText);
clockDiv.innerHTML=xhr.responseText;
}
}
xhr.send();
},1000);
</script>
//輪詢 服務端
let express=require('express');
let app=express();
app.use(express.static(__dirname));
app.get('/clock',function(req,res){
res.end(new Date().toLocaleString());
});
app.listen(8080);
啟動本地服務,打開http://localhost:8080/1.html,得到如下結果:
2.長輪詢(long-polling)
長輪詢是對輪詢的改進版,客戶端發送HTTP給服務器之后,看有沒有新消息,如果沒有新消息,就一直等待。當有新消息的時候,才會返回給客戶端。在某種程度上減小了網絡帶寬和CPU利用率等問題。由于http數據包的頭部數據量往往很大(通常有400多個字節),但是真正被服務器需要的數據卻很少(有時只有10個字節左右),這樣的數據包在網絡上周期性的傳輸,難免對網絡帶寬是一種浪費。
// 2.html 服務端代碼同上
<div id="clock"></div>
<script>
let clockDiv=document.getElementById('clock')
function send() {
let xhr=new XMLHttpRequest()
xhr.open('GET', '/clock', true)
xhr.timeout=2000 // 超時時間,單位是毫秒
xhr.onreadystatechange=function() {
if (xhr.readyState==4) {
if (xhr.status==200) {
//如果返回成功了,則顯示結果
clockDiv.innerHTML=xhr.responseText
}
send() //不管成功還是失敗都會發下一次請求
}
}
xhr.ontimeout=function() {
send()
}
xhr.send()
}
send()
</script>
3.iframe流(streaming)
iframe流方式是在頁面中插入一個隱藏的iframe,利用其src屬性在服務器和客戶端之間創建一條長連接,服務器向iframe傳輸數據(通常是HTML,內有負責插入信息的javascript),來實時更新頁面。
// 3.html
<body>
<div id="clock"></div>
<iframe src="/clock" style="display:none"></iframe>
</body>
//iframe流
let express=require('express')
let app=express()
app.use(express.static(__dirname))
app.get('/clock', function(req, res) {
setInterval(function() {
let date=new Date().toLocaleString()
res.write(`
<script type="text/javascript">
parent.document.getElementById('clock').innerHTML="${date}";//改變父窗口dom元素
</script>
`)
}, 1000)
})
app.listen(8080)
啟動本地服務,打開http://localhost:8080/3.html,得到如下結果:
上述代碼中,客戶端只請求一次,然而服務端卻是源源不斷向客戶端發送數據,這樣服務器維護一個長連接會增加開銷。
以上我們介紹了三種實時推送技術,然而各自的缺點很明顯,使用起來并不理想,接下來我們著重介紹另一種技術--websocket,它是比較理想的雙向通信技術。
1.什么是websocket
WebSocket是一種全新的協議,隨著HTML5草案的不斷完善,越來越多的現代瀏覽器開始全面支持WebSocket技術了,它將TCP的Socket(套接字)應用在了webpage上,從而使通信雙方建立起一個保持在活動狀態連接通道。
一旦Web服務器與客戶端之間建立起WebSocket協議的通信連接,之后所有的通信都依靠這個專用協議進行。通信過程中可互相發送JSON、XML、HTML或圖片等任意格式的數據。由于是建立在HTTP基礎上的協議,因此連接的發起方仍是客戶端,而一旦確立WebSocket通信連接,不論服務器還是客戶端,任意一方都可直接向對方發送報文。
初次接觸 WebSocket 的人,都會問同樣的問題:我們已經有了 HTTP 協議,為什么還需要另一個協議?
2.HTTP的局限性
3.WebSocket的特點
相對于傳統的HTTP每次請求-應答都需要客戶端與服務端建立連接的模式,WebSocket是類似Socket的TCP長連接的通訊模式,一旦WebSocket連接建立后,后續數據都以幀序列的形式傳輸。在客戶端斷開WebSocket連接或Server端斷掉連接前,不需要客戶端和服務端重新發起連接請求。在海量并發和客戶端與服務器交互負載流量大的情況下,極大的節省了網絡帶寬資源的消耗,有明顯的性能優勢,且客戶端發送和接受消息是在同一個持久連接上發起,實時性優勢明顯。
接下來我看下websocket如何實現客戶端與服務端雙向通信:
// websocket.html
<div id="clock"></div>
<script>
let clockDiv=document.getElementById('clock')
let socket=new WebSocket('ws://localhost:9999')
//當連接成功之后就會執行回調函數
socket.onopen=function() {
console.log('客戶端連接成功')
//再向服務 器發送一個消息
socket.send('hello') //客戶端發的消息內容 為hello
}
//綁定事件是用加屬性的方式
socket.onmessage=function(event) {
clockDiv.innerHTML=event.data
console.log('收到服務器端的響應', event.data)
}
</script>
// websocket.js
let express=require('express')
let app=express()
app.use(express.static(__dirname))
//http服務器
app.listen(3000)
let WebSocketServer=require('ws').Server
//用ws模塊啟動一個websocket服務器,監聽了9999端口
let wsServer=new WebSocketServer({ port: 9999 })
//監聽客戶端的連接請求 當客戶端連接服務器的時候,就會觸發connection事件
//socket代表一個客戶端,不是所有客戶端共享的,而是每個客戶端都有一個socket
wsServer.on('connection', function(socket) {
//每一個socket都有一個唯一的ID屬性
console.log(socket)
console.log('客戶端連接成功')
//監聽對方發過來的消息
socket.on('message', function(message) {
console.log('接收到客戶端的消息', message)
socket.send('服務器回應:' + message)
})
})
啟動本地服務,打開http://localhost:3000/websocket.html,得到如下結果:
綜上所述:Websocket協議不僅解決了HTTP協議中服務端的被動性,即通信只能由客戶端發起,也解決了數據同步有延遲的問題,同時還帶來了明顯的性能優勢,所以websocket 是Web 實時推送技術的比較理想的方案,但如果要兼容低版本瀏覽器,可以考慮用輪詢來實現。
*請認真填寫需求信息,我們會在24小時內與您取得聯系。