譯:西風微雨
預估稿費:100RMB(不服你也來投稿啊!)
投稿方式:發送郵件至linwei#360.cn,或登陸網頁版在線投稿
0x00 漏洞背景
e107 CMS是一個基于PHP、Bootstrap、Mysql的網站內容管理系統,可廣泛用于個人博客、企業建站,在全球范圍內使用較為廣泛。
0x01 漏洞影響版本
version <=2.1.2
0x02 漏洞分析環境
運行環境:macOS10.12.2 + apache2.4.23 + PHP5.6.27 + Mysql5.7.16
e107 CMS版本:v2.1.2
0x03 漏洞詳情
首先我們從rips的掃描報告https://blog.ripstech.com/2016/e107-sql-injection-through-object-injection/中可以大致知道整個漏洞的觸發是利用反序列化漏洞來進行數據庫數據修改,進一步進行權限提升。 接下來,我們就來對整個觸發流程進行分析:
1.首先我們注冊普通用戶test2,原始郵箱地址為test22@1.com;我們可以看到user_admin字段為0(e107 CMS以user_admin字段標示用戶權限,1為管理員,0為普通用戶),因此test2是普通用戶;接下來我們進入/e107/usersettings.php修改郵箱
2.反序列化漏洞及數據庫注入漏洞代碼跟蹤
變量關系注釋:$_POST[‘updated_data’]為base64編碼的值,$new_data是base64解碼后的值是一個序列化的值,$changedUserData為反序列化后的值,是一個數組。
首先跟進usersettings.php 353-387行的代碼
123 | 353 $new_data=base64_decode($_POST['updated_data']); ... 387 $changedUserData=unserialize($new_data); |
353行中用戶可控變量$_POST['updated_data']未經進一步處理就直接在387行中進行了反序列化,并將數據賦值給$changedUserData變量,以便進一步操作.
繼續跟進$changedUserData變量
123 | 455 $changedData['data']=$changedUserData; ... 460 if (FALSE===$sql->update('user', $changedData)) |
$changedUserData變量在460行進入mysql類方法,跟進/e107_handlers/mysql_class.php中的update函數
1234 | 1160 function update($tableName, $arg, $debug=FALSE, $log_type='', $log_remark='') { 1162 $arg=$this->_prepareUpdateArg($tableName, $arg); ... 1183 $result=$this->mySQLresult=$this->db_Query($query, NULL, 'db_Update'); |
跟進_prepareUpdateArg函數
123456789 | 1083 private function _prepareUpdateArg($tableName, $arg) { 1084 ... 1085 foreach ($arg[‘data’] as $fn=> $fv) { 1086 $new_data .=($new_data ? ', ' : ''); 1087 $ftype=isset($fieldTypes[$fn]) ? $fieldTypes[$fn] : 'str'; 1088 $new_data .="{$fn}=".$this->_getFieldValue($fn, $fv, $fieldTypes); 1089 ... 1090 } 1091 return $new_data .(isset($arg[‘WHERE’]) ? ' WHERE '. $arg['WHERE'] : ''); |
跟進_getFieldValue函數
123456 | 1247 function _getFieldValue($fieldKey, $fieldValue, &$fieldTypes) { 1248 $type=isset($fieldTypes[$fieldKey]) ? $fieldTypes[$fieldKey] : $fieldTypes['_DEFAULT']; 1249 switch ($type) { 1250 case 'str': 1251 case 'string': 1252 return "'".$this->escape($fieldValue, false)."'"; |
可以看出$changedUserData變量僅僅被拆分開來,而沒有做進一步校驗是否有惡意參數,因此只要$changedUserData中包含惡意的user表字段,便能夠任意修改數據表中的值。
3.漏洞利用
首先我們來看看測試正常修改郵箱的數據格式,測試更改郵箱為22test2@1.com
這里就可以清楚地看到,$new_data變量為被修改數據序列化的值,$changedUserData為$new_data反序列化后的值,數據校驗成功后,$changedUserData就會被拆分,然后進入$sql->update函數執行,進而任意修改數據庫數據。
那么,我們如何利用這個漏洞鏈呢?
要做到提權操作,我們就需要更新test2用戶的user_admin字段,并且在修改$new_data變量的值后,必須順利通過usersetings.php的兩個if語句檢查:
123 | 358 if (md5($new_data) !=$_POST['updated_key'] || ($userMethods->hasReadonlyField($new_data) !==false)) ... 366 if (md5($new_extended) !=$_POST['extended_key']) |
從358行來看,我們在抓包修改$_POST['updated_data']的同時需要修改掉$_POST['updated_key'],使之滿足md5值校驗。 我使用如下的php代碼生成update_key和updated_data
123456 | /* php code */ $a=array('user_email'=>'2test2@1.com','user_admin'=>1); $b=serialize($a); echo 'updated_data is: '.$b; echo 'update_key is : '.md5($b); /* php code */ |
接下來使用burpsuite抓包修改$_POST['updated_data']為以及$_POST['update_key'](注意:e107 在修改郵箱時會驗證密碼,我們只修改校驗了密碼之后的數據包,如下圖:)
成功反序列化:
查看數據庫字段,發現test2用戶的user_admin字段已經被成功修改為1,權限提升成功
test2用戶成功進入后臺管理面板:
0x04 漏洞修復
升級e107 CMS至2.1.3版本
0x05 漏洞總結
此漏洞的修復過程也有些許奇妙,Kacper Szurek安全研究員早在2016年6月就在2.1.1版本發現了此漏洞,官方多次修復均被饒過,并且在2.1.2版本中仍未修復,或許官方暫未找到更好的修復方法,此漏洞便一度被擱置;直到2016年11月RIPS再次報告漏洞,官方終于在2.1.3版本的修復中完全重寫了usersettings.php文件,以修復包括此漏洞在內的多個漏洞。 另外,此篇文章在我的個人博客中也有備份:https://lightrains.org/e107-cms-privilege-escalation/。
0x06 參考鏈接
https://blog.ripstech.com/2016/e107-sql-injection-through-object-injection/
http://security.szurek.pl/e107-cms-211-privilege-escalation.html
https://github.com/e107inc/e107/commit/6a306323d4a14045d9ee4fe80f0153a9555fadff#diff-dbac6e5a7c66d48e23884c0968e6dad7
https://github.com/e107inc/e107/commit/0af67301ea2743536ba8f3fe74751e000e3f495d#diff-dbac6e5a7c66d48e23884c0968e6dad7
https://github.com/e107inc/e107/commit/dd2cebbb3ccc6b9212d64ce0ec4acd23e14c527
2017年6月份微軟補丁發布了一個針對Windows系統處理LNK文件過程中發生的遠程代碼執行漏洞,通用漏洞編號CVE-2017-8464。 當存在該漏洞的電腦被插上存在漏洞文件的U盤時,不需要任何額外操作,漏洞攻擊程序就可以借此完全控制用戶的電腦系統。同時,該漏洞也可借由用戶訪問網絡共享、從互聯網下載、拷貝文件等操作被觸發和利用攻擊。
與2015年的CVE-2015-0096上一代相比,CVE-2017-8464利用觸發更早,更隱蔽。
早,指的是U盤插入后即觸發,而前代需要在U盤插入后瀏覽到.lnk文件。
隱蔽,指的是本代.lnk文件可以藏在層層(非隱藏的)文件夾中,不需要暴露給受害人見到。
程序層面講,CVE-2015-0096利用點是在explorer需要渲染.lnk文件圖標時,而CVE-2017-8464利用點在于.lnk文件本身被預加載時顯示名的解析過程中。
本文中,筆者將對這兩個漏洞從漏洞的復現和反漏洞技術檢測的防御角度進行剖析。本文是筆者在2017年6月份,沒有任何PoC的情況下作的一個探索。
CVE-2017-8464利用能夠成功實現基于以下3點:
本次利用原理就是由于在解碼特殊文件夾時,能夠有機會按上述3點完成觸發。
細節見0x02節。
(顯示名解析,參見IShellFolder:: ParseDisplayName, 以及shell對外的接口SHParseDisplayName。)
首先,猜下問題點出現在 shell32.dll 中。
通過diff比對分析,可以得知問題點有極大概率是存在于函數 CControlPanelFolder::_GetPidlFromAppletId 中的如下代碼:
易知 CControlPanelFolder::_GetPidlFromAppletId 的上層函數是 CControlPanelFolder::ParseDisplayName。
看名字大約理解為解析顯示名,這很容易關聯到shell提供的接口 SHParseDisplayName,查MSDN可知此函數的功能是把shell名字空間對象的顯示名(字符串)轉換成PIDL(項目標識符列表的指針,我更喜歡稱其為對象串燒)。
(那么PIDL大約長這樣子:2-bytes length, (length-2) bytes contents, 2-bytes length, (length-2) bytes contents, …, 2-bytes length(=0)。實例:04 00 41 41 03 00 41 00 00 )
shell32.dll 中調用 SHParseDisplayName 的地方有很多,先驗證下從 SHParseDisplayName 能否連通到目標 CControlPanelFolder::ParseDisplayName。(另外 shell32里還有個 ParseDisplayNameChild 效用也是差不多)
建立一個例子小程序工程,代碼大概如下:
至于填充names的素材,網上可以搜索到很多,注冊表里也容易找到不少:
這個地方似乎有不錯的貨源:https://wikileaks.org/ciav7p1/cms/page_13762807.html
調試發現類似這樣的名字可以滿足要求:
L"::{20D04FE0-3AEA-1069-A2D8-08002B30309D}\::{21EC2020-3AEA-1069-A2DD-08002B30309D}\C:\stupid.dll"
如第一張圖片中,把想要加載的動態庫路徑傳入到 CPL_LoadCPLModule 就成功了。但這里,雖然從 SHParseDisplayName 出發,就能把文件路徑送到 CControlPanelFolder::ParseDisplayName -> CControlPanelFolder::_GetPidlFromAppletId。但 CControlPanelFolder::_GetPidlFromAppletId 之前還有 CControlPanelFolder::_GetAppletPathForTemporaryAppId 這一頭攔路虎:
這段代碼的大概意思是要檢查一下傳過來的名字是否在它的臨時應用識別列表里面,若是則返個對應的路徑名回來(顯示名<->實際路徑)。
跟一下,發現它要對比的檢查項,是一個GUID。
通過 CControlPanelFolder::s_dsaTemporaryAppId 這個標識符,容易得知,這個GUID是僅在 CControlPanelFolder::_GetTemporaryAppIdForApplet 中隨機生成的:
這就尷尬了,也就是說,我們用 SHParseDisplayName 把動態庫路徑直接傳到這里是不行的。我們需要先去觸發CControlPanelFolder::_GetTemporaryAppIdForApplet函數,然后再把GUID替換掉動態庫路徑,再傳過來。
就是說,如果我們先調用某個函數以參數L"::{20D04FE0-3AEA-1069-A2D8-08002B30309D}\::{21EC2020-3AEA-1069-A2DD-08002B30309D}\C:\stupid.dll" 觸發 CControlPanelFolder::_GetTemporaryAppIdForApplet,并從explorer內存中”偷”到那個隨機GUID。再以 L"::{20D04FE0-3AEA-1069-A2D8-08002B30309D}\::{21EC2020-3AEA-1069-A2DD-08002B30309D}\{{GUID}}" 為參數調用 SHParseDisplayName,就可以成功加載stupid.dll(如果C盤根目錄真的有)了。
好吧,那么就來看看哪個函數可以先行觸發CControlPanelFolder::_GetTemporaryAppIdForApplet 來添加隨機GUID。
容易得到它的上層函數是 CControlPanelFolder::GetDetailsEx。
在之前的分析過程中,有個猜測: CRegFolder 似乎是一系列 CxxxFolder 類的分發類,可以在 CControlPanelFolder::GetDetailsEx 和 CRegFolder 同名類函數上下斷,搞幾下就能得到一票撞過來的斷點。
棧回溯中最惹眼的顯然就是DisplayNameOfW了。
深入一下,發現它確實就是我們要找的火雞!(或者SHGetNameAndFlagsW?先不關注)
那么,現在如果能結合 DisplayNameOfW 和 SHParseDisplayName,應該就能實現我們的目標,把.lnk中指定的.dll跑起來了。
不妨寫個小程序驗證一下是否屬實:
其中ucIDList就是L"::{20D04FE0-3AEA-1069-A2D8-08002B30309D}\::{21EC2020-3AEA-1069-A2DD-08002B30309D}\C:\stupid.dll" 轉換成PIDL的樣子。
DisplayNameOfW 參數 0x8001 表示返回目標路徑,0x8000 表示返回全路徑。
跑起來有點小意外,stupid并沒有被加載。
原因是加載之前有一段代碼檢測 PSGetNameFromPropertyKey(&PKEY_Software_AppId, &ppszCanonicalName); 是否成功。在explorer里這句是成功的,自己的小程序load shell32.dll跑則失敗。
好吧,這不是重點。那么把這段程序load到explorer里去跑下,果然成功了,stupid.dll被加載。或者在 PSGetNameFromPropertyKey 下斷,把返回值改為0,也可以成功跑出stupid。
至此,我們知道,只要能來一發 DisplayNameOfW + SHParseDisplayName 連續技,就可以成功利用。
接下來就是尋找哪里可以觸發連續技。
DisplayNameOfW的調用點也是蠻多,排除掉一眼看上去就不靠譜的,再把二眼看上去猶疑的踢到次級優先梯隊,還剩下這么些需要深入排查的:
然而逐一鑒定后,發現一個都不好使,再把第二梯隊拉出來溜一圈,依然不好使。
那么,再看看有關聯但之前暫不關注的SHGetNameAndFlagsW吧,另外又一個功能也差不多的DisplayNameOfAsString 也一并進入視野(在分析CShellLink::_ResolveIDList時,這里面就能看到DisplayNameOfAsString,也有 ParseDisplayNameChild。這里面花了很大功夫,然而這里的GetAttributesWithFallback 函數要求滿足屬性值存在0x40000000位這個條件無法通過。最后不得不轉移陣地。另外其實即使這里能跑通,這個函數也不是插入U盤就能立刻觸發的,還是需要一定操作。)。
SHGetNameAndFlagsW,這個函數調用點很多,又花了很多時間,然而并沒有驚喜。
好在DisplayNameOfAsString的調用點不多,才十多個,并且終于在這里見到了彩頭。
可以回溯了:
DisplayNameOfAsString <- ReparseRelativeIDList <- TranslateAliasWithEvent <- TranslateAlias<- CShellLink::_DecodeSpecialFolder <- CShellLink::_LoadFromStream <- CShellLink::Load
就是說,加載 .lnk 文件即觸發!
一如既往,再寫個小程序測試一下。如料觸發:
接下來,按 CShellLink::_LoadFromStream 和 CShellLink::_DecodeSpecialFolder中的判斷,制作出 .lnk 文件,就比較輕松愉快了。
研究發現,目前多數安全軟件對利用的檢測還不夠完善,幾種變形手段都可以逃過包括微軟 Win10 Defender 在內的安全軟件的檢測。
1、 LinkFlag域變形
可以添加和改變各bit位包括unused位來逃避固定值檢測。
事實上,所有高依賴此域的檢測,都是可以被繞過的。
2、 LinkTargetIDList域變形
::{20D04FE0-3AEA-1069-A2D8-08002B30309D}(我的電腦)由SHParseDisplayName解析對應的 PIDL 內容是 0x14, 0x00, 0x1f,0x50, 0xe0, 0x4f, 0xd0, 0x20, 0xea, 0x3a, 0x69, 0x10, 0xa2, 0xd8, 0x08,0x00, 0x2b, 0x30, 0x30, 0x9d
因此 .lnk 利用文件LinkInfo域通常第一項IDList項就是這個值,但其實第[3]號字節值是可以改的,并且不影響結果。
小程序一試便知:
同理,第二段 ::{21EC2020-3AEA-1069-A2DD-08002B30309D}(控制面板項)對應的PIDL內容也可以這樣變形。
這樣,所有精確檢測LinkInfo域的安全軟件也被繞過了。
3、 SpecialFolderDataBlock域變形
研究發現有的安全軟件會檢查SpecialFolderID值,然而這個值也是可以變的。
4、 去掉LinkTargetIDList
研究發現,LinkFlag bit0 位清0,這讓所有以此為必要條件的安全軟件都失效了。但這個方法在Vista及更高版本的Windows系統才有效。
那么,安全軟件應該如何檢測?
1、 對PIDL的檢測要mask掉特殊項的[3]號字節。
更為穩妥的方法是調用 DisplayNameOf 檢測其結果(相當于檢查DisplayName,也就是那個”::{…..}” 字符串)。
2、 LinkFlag域只看bit0和bit7位。bit0位為1檢查LinkTargetIDList,為0檢查 VistaAndAboveIDListDataBlock。
簡單回顧下前代CVE-2015-0096利用:
與CVE-2015-0096比較,CVE-2017-8464 的分析過程沒有特別難點,就作業量而言,CVE-2015-0096 要小很多,但需要靈光一現,巧用一長名一短名雙文件和恰好的切分過3處檢測。
問題在這里:
CControlPanelFolder::GetUIObjectOf函數中這段處理不當,Start長度限定在0x104,但v15為0x220,在ControlExtractIcon_CreateInstance中進行CCtrlExtIconBase::CCtrlExtIconBase初始化時又會截斷為0x104,并且里面沒有判斷返回值。
意味著v14以%d輸入的 “-1″值,我們可以通過增加 Start的長度到0x101,使得CCtrlExtIconBase初始化對象名最終尾部變成”xxxxx,-“的樣子。
但這里的CControlPanelFolder::GetModuleMapped函數判斷了大長名文件的存在性,所以這個文件一定要真的存在才行。
這樣就能通過 CCtrlExtIconBase::_GetIconLocationW 中的檢測,因為StrToIntW(L”-“)=0,從而調用到CPL_FindCPLInfo:
接著,在CPL_FindCPLInfo -> CPL_ParseCommandLine -> CPL_ParseToSeparator 中我們又可以將上面使用過的大長文件名截斷為短文件名,因為 CPL_ParseToSeparator 中除了使用”,”作為分割符,也是包含了空格符。
切成短名字,是為了過 CPL_LoadCPLModule 函數中的:
這里有返回值檢查,超長的話就返回了。
我們的0x101長度名字,是不能在尾部附加一串”.manifest”的。
過了它,我們的短名dll(如果存在的話)就真的被加載起來了。
所以,這個利用需要用到一長名一短名雙文件技巧。
長名文件任意內容,0字節都可以,只是被檢測一下存在性。
比如:
3.dll
3333333300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.000
(注意dll后面有個空格)
短名文件(真正加載的就是它了):3.dll
.lnk里指定那個長名字就好了。
Hf,全文完!
原文鏈接:https://www.anquanke.com/post/id/202705
ebRTC的線程管理
為什么是從線程開始切入整個WebRTC源碼?相信只要對WebRTC有一定的了解的都清楚WebRTC內部有著自己的一套線程管理機制,WebRTC通過這套線程管理機制,非常簡單就達到了多線程安全編碼的目的,并且給每個線程劃分屬于自己的職責,方便后續維護、閱讀代碼 (當然,WebRTC的線程管理和Chromium、Flutter都非常相似),如果你不了解WebRTC的這套線程管理機制,閱讀WebRTC代碼會很懵逼,又因為線程管理并不會涉及到一些專業性知識,非常適合作為切入WebRTC源碼的起點。
WebRTC代碼邏輯主要通過三個線程管理(這里不介紹一些編解碼線程):
WebRTC線程之間的任務投遞
WebRTC線程之間的任務(這里的任務主要指的是函數)投遞主要有兩種方式
Invoke機制,代碼如下:
// 比如NeedsIceRestart函數是在工作者線程被調用,那么network_thread()->Invoke將會將
// lambda匿名函數從工作者線程派遣到網絡線程,并等待執行完成
bool PeerConnection::NeedsIceRestart(const std::string& content_name) const {
return network_thread()->Invoke<bool>(RTC_FROM_HERE, [this, &content_name] {
RTC_DCHECK_RUN_ON(network_thread());
return transport_controller_->NeedsIceRestart(content_name);
});
}
PostTask機制,代碼如下:
// 同Invoke機制不同的是,調用完PostTask之后不用等待任務執行完成
void EmulatedNetworkManager::EnableEndpoint(EmulatedEndpointImpl* endpoint) {
network_thread_->PostTask(RTC_FROM_HERE, [this, endpoint]() {
endpoint->Enable();
UpdateNetworksOnce();
});
}
注意:源碼版本 M92
先從WebRTC的信令、工作者、網絡線程的創建開始
file://src/http://pc_connection_context.cc:81
ConnectionContext::ConnectionContext(
PeerConnectionFactoryDependencies* dependencies)
: network_thread_(MaybeStartThread(dependencies->network_thread,
"pc_network_thread",
true,
owned_network_thread_)),
worker_thread_(MaybeStartThread(dependencies->worker_thread,
"pc_worker_thread",
false,
owned_worker_thread_)),
signaling_thread_(MaybeWrapThread(dependencies->signaling_thread,
wraps_current_thread_)) {
}
通過MabeStartThread函數初始化了工作者、網絡線程,信令線程比較特殊一點,是由于信令線程可以直接托管進程中的主線程(準確來說應該是當前調用線程),所以調用的函數是MaybeWrapThread
MaybeStartThread
file://src/http://pc_connection_context.cc:27
rtc::Thread* MaybeStartThread(rtc::Thread* old_thread,
const std::string& thread_name,
bool with_socket_server,
std::unique_ptr<rtc::Thread>& thread_holder) {
if (old_thread) {
return old_thread;
}
if (with_socket_server) {
thread_holder=rtc::Thread::CreateWithSocketServer();
} else {
thread_holder=rtc::Thread::Create();
}
thread_holder->SetName(thread_name, nullptr);
thread_holder->Start();
return thread_holder.get();
}
暫時忽略with_socket_server,后面會說明CreateWithSocketServer,MaybeStartThread整體流程
相關學習資料推薦,點擊下方鏈接免費報名,先碼住不迷路~】
音視頻免費學習地址:FFmpeg/WebRTC/RTMP/NDK/Android音視頻流媒體高級開發
【免費分享】音視頻學習資料包、大廠面試題、技術視頻和學習路線圖,資料包括(C/C++,Linux,FFmpeg webRTC rtmp hls rtsp ffplay srs 等等)有需要的可以點擊788280672加群免費領取~
file://src/http://pc_connection_context.cc:44
rtc::Thread* MaybeWrapThread(rtc::Thread* signaling_thread,
bool& wraps_current_thread) {
wraps_current_thread=false;
if (signaling_thread) {
return signaling_thread;
}
auto this_thread=rtc::Thread::Current();
if (!this_thread) {
// If this thread isn't already wrapped by an rtc::Thread, create a
// wrapper and own it in this class.
this_thread=rtc::ThreadManager::Instance()->WrapCurrentThread();
wraps_current_thread=true;
}
return this_thread;
}
如果外部沒有傳入signaling_thread,內部將會獲取當前線程作為signaling_thread
rtc::Thread::Start流程
ProcessMessage
file://src/rtc_base/http://thread.cc:1132
bool Thread::ProcessMessages(int cmsLoop) {
//...
int64_t msEnd=(kForever==cmsLoop) ? 0 : TimeAfter(cmsLoop);
int cmsNext=cmsLoop;
while (true) {
#if defined(WEBRTC_MAC)
ScopedAutoReleasePool pool;
#endif
Message msg;
if (!Get(&msg, cmsNext))
return !IsQuitting();
Dispatch(&msg);
if (cmsLoop !=kForever) {
cmsNext=static_cast<int>(TimeUntil(msEnd));
if (cmsNext < 0)
return true;
}
}
}
主要邏輯如下:函數通過一個while循環處理消息,每次循環都會通過Get獲取一個可用的Message,然后調用Dispatch派遣獲取到的Message,兩個主要函數Dispatch、Get。到這里整個WebRTC線程的初始化和啟動流程就介紹完了
消息獲取、派遣、投遞分析
上面的ProcessMessages,可以把它當成一個消息循環,循環中每次都會通過Get函數去獲取消息
Get (消息獲取)
file://src/rtc_base/http://thread.cc:472
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
// ......
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
int64_t cmsTotal=cmsWait;
int64_t cmsElapsed=0;
int64_t msStart=TimeMillis();
int64_t msCurrent=msStart;
while (true) {
// Check for posted events
int64_t cmsDelayNext=kForever;
bool first_pass=true;
while (true) {
// All queue operations need to be locked, but nothing else in this loop
// (specifically handling disposed message) can happen inside the crit.
// Otherwise, disposed MessageHandlers will cause deadlocks.
{
CritScope cs(&crit_);
// On the first pass, check for delayed messages that have been
// triggered and calculate the next trigger time.
if (first_pass) {
first_pass=false;
while (!delayed_messages_.empty()) {
if (msCurrent < delayed_messages_.top().run_time_ms_) {
cmsDelayNext= TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
break;
}
messages_.push_back(delayed_messages_.top().msg_);
delayed_messages_.pop();
}
}
// Pull a message off the message queue, if available.
if (messages_.empty()) {
break;
} else {
*pmsg=messages_.front();
messages_.pop_front();
}
} // crit_ is released here.
// If this was a dispose message, delete it and skip it.
if (MQID_DISPOSE==pmsg->message_id) {
RTC_DCHECK(nullptr==pmsg->phandler);
delete pmsg->pdata;
*pmsg=Message();
continue;
}
return true;
}
if (IsQuitting())
break;
// Which is shorter, the delay wait or the asked wait?
int64_t cmsNext;
if (cmsWait==kForever) {
cmsNext=cmsDelayNext;
} else {
cmsNext=std::max<int64_t>(0, cmsTotal - cmsElapsed);
if ((cmsDelayNext !=kForever) && (cmsDelayNext < cmsNext))
cmsNext=cmsDelayNext;
}
{
// Wait and multiplex in the meantime
if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
return false;
}
// If the specified timeout expired, return
msCurrent=TimeMillis();
cmsElapsed=TimeDiff(msCurrent, msStart);
if (cmsWait !=kForever) {
if (cmsElapsed >=cmsWait)
return false;
}
}
return false;
}
核心是通過一個循環來獲取一個有效的消息,循環會在Get成功、失敗或者外部調用了Stop停止了線程時結束。
消息的獲取機制
可能在一開始看代碼會對獲取可用延遲消息產生疑問,為什么只判斷延遲消息隊列的第一個元素的運行時間有沒有到達,難道隊列后面的消息不會有比這個頂部消息的運行時間更小的嗎?
while (!delayed_messages_.empty()) {
if (msCurrent < delayed_messages_.top().run_time_ms_) {
cmsDelayNext= TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
break;
}
messages_.push_back(delayed_messages_.top().msg_);
delayed_messages_.pop();
}
進一步查看delayed_messages_的定義PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
// with the same trigger time are processed in num_ (FIFO) order.
class DelayedMessage {
public:
DelayedMessage(int64_t delay,
int64_t run_time_ms,
uint32_t num,
const Message& msg)
: delay_ms_(delay),
run_time_ms_(run_time_ms),
message_number_(num),
msg_(msg) {}
bool operator<(const DelayedMessage& dmsg) const {
return (dmsg.run_time_ms_ < run_time_ms_) ||
((dmsg.run_time_ms_==run_time_ms_) &&
(dmsg.message_number_ < message_number_));
}
int64_t delay_ms_; // for debugging
int64_t run_time_ms_;
// Monotonicaly incrementing number used for ordering of messages
// targeted to execute at the same time.
uint32_t message_number_;
Message msg_;
};
class PriorityQueue : public std::priority_queue<DelayedMessage> {
public:
container_type& container() { return c; }
void reheap() { make_heap(c.begin(), c.end(), comp); }
};
延遲消息隊列其實就是一個大項堆的優先級消息隊列,也就是使用降序排序,DelayedMessage的大小比較是通過run_time_ms_參數,如果run_time_ms_越小其實DelayedMessage越大,如果run_time_ms_ 相等就使用message_number來比較,通俗說就是延遲時間越小在隊列中越靠前。
Message介紹
在介紹消息派遣處理之前需要先弄清楚Message
file://src/rtc_base/thread_message.h
struct Message {
Message() : phandler(nullptr), message_id(0), pdata(nullptr) {}
inline bool Match(MessageHandler* handler, uint32_t id) const {
return (handler==nullptr || handler==phandler) &&
(id==MQID_ANY || id==message_id);
}
Location posted_from;
MessageHandler* phandler;
uint32_t message_id;
MessageData* pdata;
};
主要看兩個數據phander和pdata,對應類如下
class RTC_EXPORT MessageHandler {
public:
virtual ~MessageHandler() {}
virtual void OnMessage(Message* msg)=0;
};
class MessageData {
public:
MessageData() {}
virtual ~MessageData() {}
};
兩個虛基類,MesageData用來存儲消息的內容,MesageHandler用來處理消息,使用者可以自定義屬于自己的MessageHanlder和MessageData,比如我們自定義一個自己的MessageData如下:
// 定義了一個自己的MyMessageTask,其中保存了一個function,并且對外提供了一個Run方法
template <class FunctorT>
class MyMessageTask final : public MessageData {
public:
explicit MessageWithFunctor(FunctorT&& functor)
: functor_(std::forward<FunctorT>(functor)) {}
void Run() { functor_(); }
private:
~MessageWithFunctor() override {}
typename std::remove_reference<FunctorT>::type functor_;
};
在自己定義一個MessageHandler用來處理消息
// OnMessage函數會在派遣消息的時候被調用,里面的msg存放著一個MessageData對象,這個MessageData對象就是我們自定義的MyMessageTask,獲取到這個對象直接調用我們剛剛寫好的Run函數運行。
class MyMessageHandlerWithTask : public MessageHandler {
public:
void OnMessage(Message* msg) overrider {
static_cast<MyMesageTask*>(msg->pdata)->Run();
delete msg->pdata;
}
}
上面我們定義了一個handler和data,主要用來在收到派遣過來的消息時通過handler處理消息,來看看如何使用我們自定義的handler和data吧
// Thread::Post原型
virtual void Post(const Location& posted_from,
MessageHandler* phandler,
uint32_t id=0,
MessageData* pdata=nullptr,
bool time_sensitive=false);
// 注意看Post函數里面有需要我們傳入MessageHandler和MessageData,我們只需要將自定義
// 的MessageHandler和MessageData傳入即可
static MyMessageHandlerWithTask* myhandler=new MyMessageHandlerWithTask;
MyMessageTask* mytask=new MyMessageTask([]() {int c=a+b;});
Post(FROME_HERE, myhandler, 0, mytask);
執行完上面的Post,MyMessageTask里面的匿名函數將被執行
Dispatch (消息派遣)
介紹完Message,就可以看看Dispatch是如何將消息派遣到MessageHandler去處理的
file://src/rtc_base/http://thread.cc
void Thread::Dispatch(Message* pmsg) {
TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
pmsg->posted_from.file_name(), "src_func",
pmsg->posted_from.function_name());
RTC_DCHECK_RUN_ON(this);
int64_t start_time=TimeMillis();
pmsg->phandler->OnMessage(pmsg);
int64_t end_time=TimeMillis();
int64_t diff=TimeDiff(end_time, start_time);
if (diff >=dispatch_warning_ms_) {
RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
<< "ms to dispatch. Posted from: "
<< pmsg->posted_from.ToString();
// To avoid log spew, move the warning limit to only give warning
// for delays that are larger than the one observed.
dispatch_warning_ms_=diff + 1;
}
}
Dispatch函數非常簡單,抓住重點就是調用了傳入的Message的OnMessage,將消息傳遞給MessageHandler去處理
消息的投遞
前面有看了消息獲取的實現原理,如果沒有消息將會調用Wait進行等待,既然有Wait,那么肯定就有地方觸發WaitUp,沒錯,就是在外部投遞消息的時候會觸發WaitUp, 在 WebRTC線程之間的任務投遞中有介紹了兩種方式,一種同步Invoke,一種異步Post
file://src/rtc_base/thread.h:449
template <class FunctorT>
void PostTask(const Location& posted_from, FunctorT&& functor) {
Post(posted_from, GetPostTaskMessageHandler(), /*id=*/0,
new rtc_thread_internal::MessageWithFunctor<FunctorT>(
std::forward<FunctorT>(functor)));
}
PostTask核心還是調用了Post函數,并且傳入了屬于自己的MessageData和MessageHandler
file://src/rtc_base/http://thread.cc:563
void Thread::Post(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata,
bool time_sensitive) {
RTC_DCHECK(!time_sensitive);
if (IsQuitting()) {
delete pdata;
return;
}
// Keep thread safe
// Add the message to the end of the queue
// Signal for the multiplexer to return
{
CritScope cs(&crit_);
Message msg;
msg.posted_from=posted_from;
msg.phandler=phandler;
msg.message_id=id;
msg.pdata=pdata;
messages_.push_back(msg);
}
WakeUpSocketServer();
}
void Thread::WakeUpSocketServer() {
ss_->WakeUp();
}
Post函數實現非常簡單清晰,構造一個Message添加到隊列,然后調用ss_->WakeUp()喚醒Wait,ss_是一個SocketServer對象,后面在分析, 先看同步Invoke
file://src/rtc_base/thread.h:388
template <
class ReturnT,
typename=typename std::enable_if<!std::is_void<ReturnT>::value>::type>
ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
ReturnT result;
InvokeInternal(posted_from, [functor, &result] { result=functor(); });
return result;
}
template <
class ReturnT,
typename=typename std::enable_if<std::is_void<ReturnT>::value>::type>
void Invoke(const Location& posted_from, FunctionView<void()> functor) {
InvokeInternal(posted_from, functor);
}
兩個重載函數一個有返回結果,一個沒有,內部都調用InvokeInternal完成,InvokeInternal緊接著調用了Send函數
file://src/rtc_base/http://thread.cc:914
void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
RTC_DCHECK(!IsQuitting());
if (IsQuitting())
return;
// Sent messages are sent to the MessageHandler directly, in the context
// of "thread", like Win32 SendMessage. If in the right context,
// call the handler directly.
Message msg;
msg.posted_from=posted_from;
msg.phandler=phandler;
msg.message_id=id;
msg.pdata=pdata;
if (IsCurrent()) {
#if RTC_DCHECK_IS_ON
RTC_DCHECK_RUN_ON(this);
could_be_blocking_call_count_++;
#endif
msg.phandler->OnMessage(&msg);
return;
}
AssertBlockingIsAllowedOnCurrentThread();
Thread* current_thread=Thread::Current();
#if RTC_DCHECK_IS_ON
if (current_thread) {
RTC_DCHECK_RUN_ON(current_thread);
current_thread->blocking_call_count_++;
RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
this);
}
#endif
// Perhaps down the line we can get rid of this workaround and always require
// current_thread to be valid when Send() is called.
std::unique_ptr<rtc::Event> done_event;
if (!current_thread)
done_event.reset(new rtc::Event());
bool ready=false;
PostTask(webrtc::ToQueuedTask(
[&msg]() mutable { msg.phandler->OnMessage(&msg); },
[this, &ready, current_thread, done=done_event.get()] {
if (current_thread) {
CritScope cs(&crit_);
ready=true;
current_thread->socketserver()->WakeUp();
} else {
done->Set();
}
}));
if (current_thread) {
bool waited=false;
crit_.Enter();
while (!ready) {
crit_.Leave();
current_thread->socketserver()->Wait(kForever, false);
waited=true;
crit_.Enter();
}
crit_.Leave();
// Our Wait loop above may have consumed some WakeUp events for this
// Thread, that weren't relevant to this Send. Losing these WakeUps can
// cause problems for some SocketServers.
//
// Concrete example:
// Win32SocketServer on thread A calls Send on thread B. While processing
// the message, thread B Posts a message to A. We consume the wakeup for
// that Post while waiting for the Send to complete, which means that when
// we exit this loop, we need to issue another WakeUp, or else the Posted
// message won't be processed in a timely manner.
if (waited) {
current_thread->socketserver()->WakeUp();
}
} else {
done_event->Wait(rtc::Event::kForever);
}
}
Send函數的代碼比較多,不過整體思路還是很清晰
消息投遞、派遣、獲取狀態轉移圖
為了更加清楚的了解WebRTC的消息投遞、派遣、獲取機制,我自己定義了4種狀態,方便理解
提出疑問點:如果我想要在代碼任意位置獲取當前線程的Thread對象,要怎么做?單例?
看看WebRTC Thread的Current函數原型:
class Thread {
public:
//......
static Thread* Current();
}
當我們在線程A調用Thread::Current將會獲得一個線程A的Thread對象,在線程B調用Thread::Current將會獲取一個線程B的Thread對象, 來看看內部實現
// static
Thread* Thread::Current() {
ThreadManager* manager=ThreadManager::Instance();
Thread* thread=manager->CurrentThread();
#ifndef NO_MAIN_THREAD_WRAPPING
// Only autowrap the thread which instantiated the ThreadManager.
if (!thread && manager->IsMainThread()) {
thread=new Thread(CreateDefaultSocketServer());
thread->WrapCurrentWithThreadManager(manager, true);
}
#endif
return thread;
}
核心實現都在ThreadManager中,ThreadManager是針對WebRTC Thread提供的一個管理類,里面會存放所有外部創建的Thread
Thread* ThreadManager::CurrentThread() {
return static_cast<Thread*>(TlsGetValue(key_));
}
ThreadManager::CurrentThread實現很簡單,通過TlsGetValue獲取了私有變量key_,那這個key_肯定有Set操作,沒錯,這個key_的Set操作,是在Thread的構造函數中進行的 Thraed() -> DoInit() -> ThreadManager::SetCurrentThread -> ThreadManager::SetCurrentThreadInternal
void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
TlsSetValue(key_, thread);
}
TlsSetValue和TlsGetValue是什么意思? 這里涉及到了一個知識點,也就是TLS
TLS介紹
TLS全稱是Thread Local Storage 線程局部變量或者線程私有變量,私有的意思是每個線程都將獨自擁有這個變量
詳細鏈接:
www.notion.so/TLS-78870a0…
www.notion.so/TLS-78870a0…
回歸Current函數實現,它就是借助了TLS技術得以實現在不同線程存儲屬于自己的私有變量(這個私有變量就是Thread*),然后再對應線程調用Current獲取到的Thread*也就是當前線程的了
WebRTC線程Proxy機制
前面有提到,WebRTC對外暴露的API比如PeerConnectionInterface在內部都一層代理機制,來確保每一個API調用在正確的線程,先看PeerConnectiontProxy
file://src/api/peer_connection_proxy.h
BEGIN_PROXY_MAP(PeerConnection)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams)
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams)
PROXY_METHOD1(bool, AddStream, MediaStreamInterface*)
PROXY_METHOD1(void, RemoveStream, MediaStreamInterface*)
PROXY_METHOD2(RTCErrorOr<rtc::scoped_refptr<RtpSenderInterface>>,
AddTrack,
rtc::scoped_refptr<MediaStreamTrackInterface>,
const std::vector<std::string>&)
// ......
// This method will be invoked on the network thread. See
// PeerConnectionFactory::CreatePeerConnectionOrError for more details.
PROXY_SECONDARY_METHOD1(rtc::scoped_refptr<DtlsTransportInterface>,
LookupDtlsTransportByMid,
const std::string&)
// This method will be invoked on the network thread. See
// PeerConnectionFactory::CreatePeerConnectionOrError for more details.
PROXY_SECONDARY_CONSTMETHOD0(rtc::scoped_refptr<SctpTransportInterface>,
GetSctpTransport)
上面的一堆宏,會生成一個PeerConnectionProxyWithInternal類,我們主要看三個宏 BEGIN_PROXY_MAP、PROXY_METHOD0、PROXY_SECONDARY_METHOD1
#define BEGIN_PROXY_MAP(c) \
PROXY_MAP_BOILERPLATE(c) \
SECONDARY_PROXY_MAP_BOILERPLATE(c) \
REFCOUNTED_PROXY_MAP_BOILERPLATE(c) \
public: \
static rtc::scoped_refptr<c##ProxyWithInternal> Create( \
rtc::Thread* primary_thread, rtc::Thread* secondary_thread, \
INTERNAL_CLASS* c) { \
return rtc::make_ref_counted<c##ProxyWithInternal>(primary_thread, \
secondary_thread, c); \
}
// Helper macros to reduce code duplication.
#define PROXY_MAP_BOILERPLATE(c) \
template <class INTERNAL_CLASS> \
class c##ProxyWithInternal; \
typedef c##ProxyWithInternal<c##Interface> c##Proxy; \
template <class INTERNAL_CLASS> \
class c##ProxyWithInternal : public c##Interface { \
protected: \
typedef c##Interface C; \
\
public: \
const INTERNAL_CLASS* internal() const { return c_; } \
INTERNAL_CLASS* internal() { return c_; }
看重點, 第一typedef c##ProxyWithInternal<c##Interface> c##Proxy;, 也就是外部使用的類名采用PeerConnectionProxy, c##ProxyWithInternal: public c##Interface,也就是繼承自PeerConnectionInterface類,也就是我們在外部拿到的PeerConnect指針對象,其實是PeerConnectionProxyWithInternal對象, 重點2 , Create函數,這個Create函數會在什么時候調用,并且primary_thread和secondary_thread分別對應著什么線程,看下面代碼
RTCErrorOr<rtc::scoped_refptr<PeerConnectionInterface>>
PeerConnectionFactory::CreatePeerConnectionOrError(
const PeerConnectionInterface::RTCConfiguration& configuration,
PeerConnectionDependencies dependencies) {
rtc::scoped_refptr<PeerConnectionInterface> result_proxy=PeerConnectionProxy::Create(signaling_thread(), network_thread(),
result.MoveValue());
return result_proxy;
}
通過上面的代碼可以確定,在PeerConnectionProxy類中primary_thread對應的就是signaling_thread,secondary_thread線程就是network_thread線程
#define PROXY_METHOD0(r, method) \
r method() override { \
MethodCall<C, r> call(c_, &C::method); \
return call.Marshal(RTC_FROM_HERE, primary_thread_); \
}
創建MethodCall類,并調用Marshal,注意調用Marshal傳入的參數primary_thread_ ,在PeerConnectionProxy中也就是,signaling_thread
#define PROXY_SECONDARY_METHOD1(r, method, t1) \
r method(t1 a1) override { \
MethodCall<C, r, t1> call(c_, &C::method, std::move(a1)); \
return call.Marshal(RTC_FROM_HERE, secondary_thread_); \
}
與PROXY_METHOD不同的是在調用Marshal時傳入的是secondary_thread_,在PeerConnectionProxy也就是network_thread
template <typename C, typename R, typename... Args>
class MethodCall : public QueuedTask {
public:
typedef R (C::*Method)(Args...);
MethodCall(C* c, Method m, Args&&... args)
: c_(c),
m_(m),
args_(std::forward_as_tuple(std::forward<Args>(args)...)) {}
R Marshal(const rtc::Location& posted_from, rtc::Thread* t) {
if (t->IsCurrent()) {
Invoke(std::index_sequence_for<Args...>());
} else {
t->PostTask(std::unique_ptr<QueuedTask>(this));
event_.Wait(rtc::Event::kForever);
}
return r_.moved_result();
}
private:
bool Run() override {
Invoke(std::index_sequence_for<Args...>());
event_.Set();
return false;
}
template <size_t... Is>
void Invoke(std::index_sequence<Is...>) {
r_.Invoke(c_, m_, std::move(std::get<Is>(args_))...);
}
C* c_;
Method m_;
ReturnType<R> r_;
std::tuple<Args&&...> args_;
rtc::Event event_;
};
主要看Marshal函數,如果是在當前線程直接調用Invoke,否則調用PostTask將任務投遞到指定線程,并等待運行完成. 關于std::tuple 的使用可以查看官方文檔,上面的代碼用到了兩個C++14的新特性 std::index_sequence_for和 std::get 來輔助tuple的使用
作者:spider集控團隊
原文 WebRTC線程管理學習 - 掘金
*請認真填寫需求信息,我們會在24小時內與您取得聯系。