阿里妹导读
背景
跟着 iLogtail 回顾多线程基础概念
线程模型
如何确保代码按照预期执行
volatile 不保证原子性,对 volatile 变量的操作可能不是线程安全的。
volatile 不防止由于 CPU 缓存导致的可见性问题,因此不足以处理多线程中的内存顺序问题。
全屏障(Full Barrier):确保所有先于屏障的读写操作完成后,才执行屏障后的读写操作。
读屏障(Read Barrier):确保所有先于屏障的读操作完成后,才执行屏障后的读操作。
写屏障(Write Barrier):确保所有先于屏障的写操作完成后,才执行屏障后的写操作。
// Test-and-set spin lock built on std::atomic_flag (guaranteed lock-free).
// The acquire on lock() pairs with the release on unlock(), so writes made
// inside the critical section are visible to the next owner.
class SpinLock {
    std::atomic_flag flag_ = ATOMIC_FLAG_INIT;

public:
    SpinLock() {}

    // Single attempt; returns true when the lock was free and is now held.
    bool try_lock() { return !flag_.test_and_set(std::memory_order_acquire); }

    // Busy-waits until the lock is acquired, backing off progressively via
    // boost's yield helper (spin → pause → sched_yield as the count grows).
    void lock() {
        unsigned attempt = 0;
        while (!try_lock()) {
            boost::detail::yield(attempt++);
        }
    }

    // Releases the lock and publishes the critical section's writes.
    void unlock() { flag_.clear(std::memory_order_release); }
};

// RAII guard: locks on construction, unlocks on destruction.
using ScopedSpinLock = std::lock_guard<SpinLock>;
把对应的变量声明为 volatile 的,C++ 标准保证对 volatile 变量间的访问编译器不会进行重排,不过仅仅是 volatile 变量之间, volatile 变量和其他变量间还是有可能会重排的;
在需要的地方手动添加合适的 Memory Barrier 指令,Memory Barrier 指令的语义保证了编译器不会进行错误的重排操作;
把对应变量声明为 atomic 的, 与 volatile 类似,C++ 标准也保证 atomic 变量间的访问编译器不会进行重排。
// Flag modified by the interrupt service routine and polled by main().
// volatile forces the compiler to re-read it on every access instead of
// caching it in a register. NOTE(review): volatile provides no atomicity
// and no inter-thread ordering — it is only adequate for this classic
// single-core ISR/main-loop pattern, not for general multithreading.
volatile bool data_ready = false;

// Interrupt service routine.
void ISR() {
    // Data is ready; raise the flag for the main loop.
    data_ready = true;
}

int main() {
    // Main program loop: busy-polls the flag.
    while (true) {
        if (data_ready) {
            // Process the data.
            // ...
            // Reset the flag so the next interrupt can be observed.
            data_ready = false;
        }
        // Perform other tasks...
    }
}
// Demonstration of std::memory_order_relaxed: relaxed operations impose no
// ordering between the store and the load within each thread, so A1/A2 (and
// B1/B2) may be reordered by the compiler or CPU. All four outcomes for
// (a, b) are permitted by the C++ memory model, including a == 0 && b == 0.
std::atomic<bool> x(false), y(false);
int a = 0, b = 0;

void thread1() {
    x.store(true, std::memory_order_relaxed); // A1
    a = y.load(std::memory_order_relaxed); // A2
}

void thread2() {
    y.store(true, std::memory_order_relaxed); // B1
    b = x.load(std::memory_order_relaxed); // B2
}

int main() {
    std::thread t1(thread1);
    std::thread t2(thread2);
    t1.join();
    t2.join();
    // Prints whichever of the allowed outcomes this particular run produced.
    std::cout << "a: " << a << ", b: " << b << std::endl;
    return 0;
}
锁的概念
// Aggregates per-region collection statistics. A single coarse-grained
// mutex (mStatisticLock) guards every access to mAllStatisticsMap.
class LogFileProfiler {
private:
    // key: "project_category_config_hostLogPath" (see AddProfilingData),
    // value: statistics for that logstore.
    typedef std::unordered_map<std::string, LogStoreStatistic*> LogstoreSenderStatisticsMap;
    // key : region, value : unordered_map<std::string, LogStoreStatistic*>
    std::map<std::string, LogstoreSenderStatisticsMap*> mAllStatisticsMap;
    // Protects mAllStatisticsMap and the maps it points to.
    std::mutex mStatisticLock;
};
// Signaled by Stop() to wake the alarm-sending loop before its 3s timeout.
std::condition_variable mStopCV;

// Requests shutdown of the alarm loop: clears the running flag while holding
// the mutex (so the loop's predicate check cannot miss the transition), then
// wakes the waiting thread.
void LogtailAlarm::Stop() {
    {
        lock_guard<mutex> lock(mThreadRunningMux);
        mIsThreadRunning = false;
    }
    // Notify after releasing the lock; the waiter re-acquires it anyway.
    mStopCV.notify_one();
}
// Background loop: sends alarms for every region roughly every 3 seconds
// until Stop() clears mIsThreadRunning, then performs one final flush.
bool LogtailAlarm::SendAlarmLoop() {
    {
        unique_lock<mutex> lock(mThreadRunningMux);
        while (mIsThreadRunning) {
            SendAllRegionAlarm();
            // Wait up to 3s. wait_for returns true only when the predicate
            // (!mIsThreadRunning) holds, i.e. Stop() was called; timeouts
            // and spurious wakeups simply loop for another send round.
            if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) {
                break;
            }
        }
    }
    // Final send after shutdown so alarms queued since the last tick are not lost.
    SendAllRegionAlarm();
    return true;
}
// Serializes the read-side metrics list into per-region LogGroups.
// Takes the shared (read) lock, so many readers may traverse concurrently;
// only the writers (UpdateMetrics/Clear) take the exclusive lock.
void ReadMetrics::ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const {
    ReadLock lock(mReadWriteLock);
    // Walk the singly linked list under the read lock; multiple readers allowed.
    MetricsRecord* tmp = mHead;
    while (tmp) {
        ...
        tmp = tmp->GetNext();
    }
}
void ReadMetrics::Clear() {
WriteLock lock(mReadWriteLock);
// 改链表,加写锁
while (mHead) {
MetricsRecord* toDelete = mHead;
mHead = mHead->GetNext();
delete toDelete;
}
}
自旋锁是一种死等的锁机制。当发生访问资源冲突的时候,可以有两个选择:一个是自旋等待,一个是挂起当前进程,调度其他进程执行。自旋锁当前的执行线程会不断的重新尝试直到获取锁进入临界区。
只允许一个线程进入。一次只能有一个线程获取锁并进入临界区,其他的线程都是在门口不断的尝试。
执行时间短。由于自旋锁等这种特性,因此它适用在那些代码不是非常复杂的临界区,如果临界区执行时间太长,那么不断在临界区门口“死等”的那些线程会浪费大量的CPU资源。
可以在中断上下文执行。由于不睡眠,因此自旋锁可以在中断上下文中适用。
// Finds every file-discovery config matching (path, name). A cache guarded
// by a spin lock front-ends the expensive scan; both critical sections are
// tiny (one map lookup / one insert), which is what makes a spin lock a
// reasonable choice here.
int32_t ConfigManager::FindAllMatch(vector<FileDiscoveryConfig>& allConfig,
const std::string& path,
const std::string& name /*= ""*/) {
...
{
    // Spin lock held for the cache lookup only.
    ScopedSpinLock cachedLock(mCacheFileAllConfigMapLock);
    auto iter = mCacheFileAllConfigMap.find(cachedFileKey);
    if (iter != mCacheFileAllConfigMap.end()) {
        // second.second == 0 means no alarm timestamp was recorded; otherwise
        // the cached entry is honored only while younger than the alarm interval.
        if (iter->second.second == 0
            || time(NULL) - iter->second.second < INT32_FLAG(multi_config_alarm_interval)) {
            allConfig = iter->second.first;
            return (int32_t)allConfig.size();
        }
    }
}
...
{
    // Spin lock held for the cache insert only (slow path recomputed above).
    ScopedSpinLock cachedLock(mCacheFileAllConfigMapLock);
    mCacheFileAllConfigMap[cachedFileKey] = std::make_pair(allConfig, alarmFlag ? (int32_t)time(NULL) : (int32_t)0);
}
return (int32_t)allConfig.size();
}
谈谈 iLogtail 中的无锁化(Lock-Free)编程实践
原子类型
class Application {
private:
    Application();
    ~Application() = default;
    // Flag recording whether a SIGTERM signal has been received; polled by
    // the loop in Start(). std::atomic_bool makes the cross-thread
    // set/read race-free without a lock.
    std::atomic_bool mSigTermSignalFlag = false;
};
// Main run loop: each iteration polls the SIGTERM flag and exits cleanly
// once it has been set.
void Application::Start() {
    。。。
    while (true) {
        。。。
        // Lock-free atomic read (default seq_cst load) of the termination flag.
        if (mSigTermSignalFlag.load()) {
            LOG_INFO(sLogger, ("received SIGTERM signal", "exit process"));
            Exit();
        }
    }
}
降低锁的粒度
// Accumulates collection statistics for one config/file combination.
// The entire update is serialized by the single global mStatisticLock.
void LogFileProfiler::AddProfilingData(const std::string& configName,
const std::string& region,
const std::string& projectName,
const std::string& category,
const std::string& convertedPath,
const std::string& hostLogPath,
const std::vector<sls_logs::LogTag>& tags,
uint64_t readBytes,
uint64_t skipBytes,
uint64_t splitLines,
uint64_t parseFailures,
uint64_t regexMatchFailures,
uint64_t parseTimeFailures,
uint64_t historyFailures,
uint64_t sendFailures,
const std::string& errorLine) {
    // Composite key uniquely identifying this config's stats for this file.
    string key = projectName + "_" + category + "_" + configName + "_" + hostLogPath;
    // Coarse-grained lock: every profiling update contends on this mutex.
    std::lock_guard<std::mutex> lock(mStatisticLock);
    // Fetch this region's statistics map from the global map (the helper's
    // name suggests it is created on demand — confirm in its definition).
    LogstoreSenderStatisticsMap& statisticsMap = *MakesureRegionStatisticsMapUnlocked(region);
    // Locate the statistics entry for the current collection config.
    std::unordered_map<string, LogStoreStatistic*>::iterator iter = statisticsMap.find(key);
    // Metric computation.
    ...
}
class Plugin {
protected:
    // Per-plugin metric storage handle; mutable so metrics can still be
    // recorded from const member functions.
    mutable MetricsRecordRef mMetricsRecordRef;
};
每个 LogProcess 线程处理数据的时候,实际上调用的是ProcessorInstance::Process 函数,这个函数内部会去进行自己实例的指标计算
// Runs the wrapped plugin over a batch of event groups and records its own
// metrics: input event count, wall-clock processing time, output event count.
void ProcessorInstance::Process(std::vector<PipelineEventGroup>& logGroupList) {
    if (logGroupList.empty()) {
        return;
    }

    // Input-side event count, before the plugin touches the batch.
    for (const auto& group : logGroupList) {
        mProcInRecordsTotal->Add(group.GetEvents().size());
    }

    // Time only the actual plugin work.
    const uint64_t processStartUs = GetCurrentTimeInMicroSeconds();
    mPlugin->Process(logGroupList);
    const uint64_t elapsedUs = GetCurrentTimeInMicroSeconds() - processStartUs;
    mProcTimeMS->Add(elapsedUs);

    // Output-side event count (the plugin may have added or dropped events).
    for (const auto& group : logGroupList) {
        mProcOutRecordsTotal->Add(group.GetEvents().size());
    }
}
// Named counter metric. The value is an std::atomic_long, so concurrent
// Add() calls from multiple worker threads need no external lock.
class Counter {
private:
    std::string mName;      // metric name
    std::atomic_long mVal;  // current value, updated atomically
public:
    Counter(const std::string& name, uint64_t val);
    uint64_t GetValue() const;
    const std::string& GetName() const;
    // Atomically adds val to the counter.
    void Add(uint64_t val);
    // Returns a copy carrying the current value and resets this counter —
    // used when snapshotting metrics.
    Counter* CopyAndReset();
};
双Buffer切换实现读写分离
// Write side of the double-buffered metrics store: plugins register records
// here, and DoSnapshot() detaches them for the read side. Singleton; mMutex
// guards the linked list headed by mHead.
class WriteMetrics {
private:
    WriteMetrics() = default;
    std::mutex mMutex;               // guards mHead and the list links
    MetricsRecord* mHead = nullptr;  // singly linked list of live records
    void Clear();
    MetricsRecord* GetHead();
public:
    ~WriteMetrics();
    // Lazily constructed singleton; the heap pointer is never deleted here.
    static WriteMetrics* GetInstance() {
        static WriteMetrics* ptr = new WriteMetrics();
        return ptr;
    }
    // Creates and links a new record into the write list.
    void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels);
    // Detaches a snapshot of the current records for ReadMetrics.
    MetricsRecord* DoSnapshot();
};
// Read side of the double-buffered metrics store: holds the latest snapshot
// taken from WriteMetrics. Readers take the read lock; UpdateMetrics() and
// Clear() take the write lock, so readers and the writer rarely contend.
class ReadMetrics {
private:
    ReadMetrics() = default;
    // mutable so const readers (ReadAsLogGroup) can still take the read lock.
    mutable ReadWriteLock mReadWriteLock;
    MetricsRecord* mHead = nullptr;  // current snapshot list
    void Clear();
    MetricsRecord* GetHead();
public:
    ~ReadMetrics();
    // Lazily constructed singleton; the heap pointer is never deleted here.
    static ReadMetrics* GetInstance() {
        static ReadMetrics* ptr = new ReadMetrics();
        return ptr;
    }
    // Serializes the snapshot into LogGroups (shared/read lock).
    void ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const;
    // Swaps in a fresh snapshot from WriteMetrics (exclusive/write lock).
    void UpdateMetrics();
};
// Creates a new metrics record for `labels`, hands it to the caller through
// `ref`, and splices it onto the head of the write-side list.
//
// Fix: `labels` arrives by rvalue reference, so move it into the shared_ptr
// instead of copying it (the original copied despite taking &&).
// Allocation and ref wiring happen before the lock; only the two pointer
// writes that link the node into the list are serialized by mMutex.
void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels) {
    MetricsRecord* cur = new MetricsRecord(std::make_shared<MetricLabels>(std::move(labels)));
    ref.SetMetricsRecord(cur);
    std::lock_guard<std::mutex> lock(mMutex);
    cur->SetNext(mHead);
    mHead = cur;
}
void ReadMetrics::UpdateMetrics() {
MetricsRecord* snapshot = WriteMetrics::GetInstance()->DoSnapshot();
MetricsRecord* toDelete;
{
// Only lock when change head
WriteLock lock(mReadWriteLock);
toDelete = mHead;
mHead = snapshot;
}
// delete old linklist
while (toDelete) {
MetricsRecord* obj = toDelete;
toDelete = toDelete->GetNext();
delete obj;
}
}
延迟释放(Deferred Reclamation)
// On destruction of the reference, only mark the underlying record as
// logically deleted; physical reclamation is deferred to the write side's
// snapshot pass (deferred reclamation, see WriteMetrics::DoSnapshot).
MetricsRecordRef::~MetricsRecordRef() {
    if (!mMetrics) {
        return;
    }
    mMetrics->MarkDeleted();
}
// Detaches the current records as a snapshot for the read side and reclaims
// the records whose owning MetricsRecordRef has been destroyed (marked via
// MarkDeleted). Deletion happens here rather than in ~MetricsRecordRef, so
// concurrent traversals never race with a freeing destructor.
MetricsRecord* WriteMetrics::DoSnapshot() {
    // new read head
    MetricsRecord* toDeleteHead = nullptr;
    // Walk the list, moving records marked deleted onto toDeleteHead.
    // Perform the actual deletion.
    while (toDeleteHead) {
        MetricsRecord* toDelete = toDeleteHead;
        toDeleteHead = toDeleteHead->GetNext();
        delete toDelete;
        writeMetricsDeleteTotal ++;
    }
    return snapshot;
}
// Swaps in the snapshot produced by WriteMetrics::DoSnapshot() and frees the
// replaced list. The write lock covers only the pointer swap.
void ReadMetrics::UpdateMetrics() {
    MetricsRecord* snapshot = WriteMetrics::GetInstance()->DoSnapshot();
    MetricsRecord* toDelete;
    {
        // Only lock while changing the head pointer.
        WriteLock lock(mReadWriteLock);
        toDelete = mHead;
        mHead = snapshot;
    }
    // Delete the old linked list outside the lock so readers are not blocked.
    while (toDelete) {
        MetricsRecord* obj = toDelete;
        toDelete = toDelete->GetNext();
        delete obj;
    }
}
1、内存模型与c++中的memory order:https://www.cnblogs.com/ishen/p/13200838.html
2、volatile与内存屏障总结:https://zhuanlan.zhihu.com/p/43526907
3、X86/GCC memory fence的一些见解:https://zhuanlan.zhihu.com/p/41872203