统一采集是数据产品部的数据入口,它承载了数据接收采集工作。日常流量大,国内高峰时几万QPS,即使有1秒钟出问题,意味着有几万条数据丢失。所以它的高性能,高可用显得尤为重要。
高级新技术往往是经过多层封装,功能丰富,开发一般只熟悉应用方法,难于了解低层的技术实现。我们希望是用简单的低级技术实现,开发能全盘了解各项技术细节。出问题能快速定位。
最终技术选型为Openresty lua接收数据、filebeat传输数据、kafka存储数据。数据流程如下:
因为数据接收到存入Kafka是需要经过网络,可预见的下游故障有:网络故障,kafka故障。即使Openresty异步写能在内存缓存数据,但内存大小毕竟有限,一旦下游故障时间长一点,内存很快就放不下。所以为了高可用,实现了数据先落盘,再异步传到kafka。数据接收、传输存储两者解耦。
OpenResty并未提供数据写文件功能。唯一与文件相关的函数是ngx.log(),但这主要用于记录日志,而不适用于写入数据文件。在网上找到的解决方案大多是在接收数据请求时,调用lua的io文件处理函数,连续调用open、write、flush、close函数。虽然可行,但效率低下,每个请求都需要反复打开和关闭文件。
由于OpenResty采用多进程处理高并发的架构设计,实现高性能写入数据文件功能需要解决三大问题:
1.如何在高并发下避免写入同一文件导致数据混乱;
2.如何在高并发下高效地完成写入数据文件;
3如何在文件切割过程中,让业务平稳无感,不受影响;
为了解决这些问题,我们设计了一套OpenResty高性能写入数据文件的解决方案。
首先,我们利用lua_shared_dict开辟一块共享内存,供各个worker共享,以此实现排它锁,协调不同worker并发写入同一文件,从而解决数据混乱问题。核心代码如下:
在nginx.conf 文件,增加"lua_shared_dict file_locks 1m;"
在业务系统lua文件里,这样使用:
............
local lock, err = resty_lock:new("file_locks")
if not lock then
ngx.log(ngx.WARN, "failed to create lock: " .. err)
end
............
local elapsed, err = lock:lock(topic)
if not elapsed then
ngx.log(ngx.WARN, "failed to lock: " .. err)
end
/*这里写业务逻辑*/
local ok, err = lock:unlock()
if not ok then
ngx.log(ngx.WARN, "failed to unlock: " .. err)
end
............
其次,我们利用init_worker_by_lua_block在worker启动时定义全局变量fileHandleList,用于存放打开文件的句柄,以避免同一文件被反复打开,以此提高效率。
核心代码如下:
在nginx.conf 文件,增加"init_worker_by_lua_block {fileHandleList = {}};"
在业务系统lua文件里,这样使用:
............
local filePath = "/data/" .. filename .. ".log"
if io.type(fileHandleList[filePath]) ~= 'file' then
fileHandleList[filePath] = assert(io.open(filePath, "a"))
end
local fileHandle = fileHandleList[filePath]
............
fileHandle:write(message, "\n")
fileHandle:flush()
............
最后,当数据文件写入到一定程度,需要进行切割,例如每天一个文件。这与日志切割的原理相同。OpenResty内置了USR1信号处理,收到信号会重新打开日志,但这只限于日志。对于我们实现的数据文件,需要用到HUP信号,即nginx -s reload。OpenResty会重新加载配置,并重新打开数据文件,从而实现在文件切割过程中,业务平稳无感,不受影响。
核心代码如下:
data_path="/openresty/nginx/data"
olddata_path="/openresty/nginx/olddata"
mkdir -p ${olddata_path}/$(date -d "yesterday" +"%Y%m%d")/
data_name=`ls ${data_path} |grep '\.json$' |awk -F'.json$' '{print $1}'|sort|uniq|xargs`
for loop in ${data_name}
do
mv ${data_path}/${loop}.json ${olddata_path}/$(date -d "yesterday" +"%Y%m%d"/${loop}.json
done
/openresty/nginx/sbin/nginx -s reload
在数据文件切割移动时,需要注意的是,mv的目标路径需要与原路径在同一文件系统(同一磁盘),这样文件的inode号不变,切割过程不会影响新数据的写入。
通过测试对比我们的方案对比网上方案,性能提升13.5倍。
扫码关注 了解更多