阿里云开发者社区,千万开发者的选择
阿里云开发者社区,百万精品技术内容、千节免费系统课程、丰富的体验场景、活跃的社群活动、行业专家分享交流,欢迎点击【阅读原文】加入我们。
阿里妹导读
简介
原理
1.Redis内核会暴露出/导出很多API给module使用(如内存分配接口、redis核心db结构的操作接口),注意这些API是redis自己解析绑定的,而不是靠动态连接器解析的。
加载
int moduleLoad(const char *path, void **module_argv, int module_argc, int is_loadex) {
int (*onload)(void *, void **, int);
void *handle;
struct stat st;
if (stat(path, &st) == 0) {
/* This check is best effort */
if (!(st.st_mode & (S_IXUSR | S_IXGRP | S_IXOTH))) {
serverLog(LL_WARNING, "Module %s failed to load: It does not have execute permissions.", path);
return C_ERR;
}
}
// 打开module so
handle = dlopen(path,RTLD_NOW|RTLD_LOCAL);
if (handle == NULL) {
serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror());
return C_ERR;
}
// 获取module中的onload函数符号地址
onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad");
if (onload == NULL) {
dlclose(handle);
serverLog(LL_WARNING,
"Module %s does not export RedisModule_OnLoad() "
"symbol. Module not loaded.",path);
return C_ERR;
}
RedisModuleCtx ctx;
moduleCreateContext(&ctx, NULL, REDISMODULE_CTX_TEMP_CLIENT); /* We pass NULL since we don't have a module yet. */
// 调用onload对module进行初始化
if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) {
serverLog(LL_WARNING,
"Module %s initialization failed. Module not loaded",path);
if (ctx.module) {
moduleUnregisterCommands(ctx.module);
moduleUnregisterSharedAPI(ctx.module);
moduleUnregisterUsedAPI(ctx.module);
moduleRemoveConfigs(ctx.module);
moduleFreeModuleStructure(ctx.module);
}
moduleFreeContext(&ctx);
dlclose(handle);
return C_ERR;
}
/* Redis module loaded! Register it. */
//... 无关代码省略 ...
moduleFreeContext(&ctx);
return C_OK;
}
API 绑定
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (RedisModule_Init(ctx, "helloworld", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
// ... 无关代码省略 ...
}
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
void *getapifuncptr = ((void**)ctx)[0];
RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
// 绑定redis导出的api
REDISMODULE_GET_API(Alloc);
REDISMODULE_GET_API(TryAlloc);
REDISMODULE_GET_API(Calloc);
REDISMODULE_GET_API(Free);
REDISMODULE_GET_API(Realloc);
REDISMODULE_GET_API(Strdup);
REDISMODULE_GET_API(CreateCommand);
REDISMODULE_GET_API(GetCommand);
// ... 无关代码省略 ...
}
RedisModule_GetApi("RedisModule_"
void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_flags) {
memset(out_ctx, 0 ,sizeof(RedisModuleCtx));
// 这里把GetApi地址传递给module
out_ctx->getapifuncptr = (void*)(unsigned long)&RM_GetApi;
out_ctx->module = module;
out_ctx->flags = ctx_flags;
// ... 无关代码省略 ...
}
void *getapifuncptr = ((void**)ctx)[0];
RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
struct RedisModuleCtx {
// getapifuncptr是第一个成员
void *getapifuncptr; /* NOTE: Must be the first field. */
struct RedisModule *module; /* Module reference. */
client *client; /* Client calling a command. */
// ... 无关代码省略 ...
};
搞清楚了RM_GetApi是怎么被导出的原理后,我们来接着看下RM_GetApi内部在做什么:
int RM_GetApi(const char *funcname, void **targetPtrPtr) {
/* Lookup the requested module API and store the function pointer into the
* target pointer. The function returns REDISMODULE_ERR if there is no such
* named API, otherwise REDISMODULE_OK.
*
* This function is not meant to be used by modules developer, it is only
* used implicitly by including redismodule.h. */
dictEntry *he = dictFind(server.moduleapi, funcname);
if (!he) return REDISMODULE_ERR;
*targetPtrPtr = dictGetVal(he);
return REDISMODULE_OK;
}
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) {
server.moduleapi = dictCreate(&moduleAPIDictType);
server.sharedapi = dictCreate(&moduleAPIDictType);
// 向全局哈希表中注册函数
REGISTER_API(Alloc);
REGISTER_API(TryAlloc);
REGISTER_API(Calloc);
REGISTER_API(Realloc);
REGISTER_API(Free);
REGISTER_API(Strdup);
REGISTER_API(CreateCommand);
// ... 无关代码省略 ...
}
int moduleRegisterApi(const char *funcname, void *funcptr) {
return dictAdd(server.moduleapi, (char*)funcname, funcptr);
}
moduleRegisterApi("RedisModule_"
一些最佳实践
入口函数禁用c++ mangle
extern "C" __attribute__((visibility("default"))) int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
// Init code and command register
return REDISMODULE_OK;
}
接管内存统计
REDISMODULE_API void * (*RedisModule_Alloc)(size_t bytes) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_Realloc)(void *ptr, size_t bytes) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_Free)(void *ptr) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_Calloc)(size_t nmemb, size_t size) REDISMODULE_ATTR;
new/operator new/placement new
1.分配空间(使用operator new)
2.初始化对象(使用placement new或者类型强转),即调用对象的构造函数
void * operator new(size_t, void *location) {
return location;
}
可见,要想实现修改new默认使用的内存分配,我们可以使用两种方式。
Object *p=(Object*)RedisModule_Alloc(sizeof(Object));
new (p)Object();
同时注意析构时也需要特殊处理:
p->~Object();
RedisModule_Free(p);
_GLIBCXX_WEAK_DEFINITION void *
operator new (std::size_t sz) _GLIBCXX_THROW (std::bad_alloc)
{
void *p;
/* malloc (0) is unpredictable; avoid it. */
if (sz == 0)
sz = 1;
while (__builtin_expect ((p = malloc (sz)) == 0, false))
{
new_handler handler = std::get_new_handler ();
if (! handler)
_GLIBCXX_THROW_OR_ABORT(bad_alloc());
handler ();
}
return p;
}
void *operator new(std::size_t size) {
return RedisModule_Alloc(size);
}
void operator delete(void *ptr) noexcept {
RedisModule_Free(ptr);
}
静态链接/动态链接c++标准库
使用block机制提高并发处理能力
图1 典型的异步处理模型
block虽然看上去很美好很强大,但是需要小心处理一些坑,如:
如果采用线程池,需要注意相同key在线程池中的保序执行问题(即相同key的处理不能乱序);
因为redis可以同时加载多个module,这些module可能来自不同的团队和个人,因此存在一定的概率,不同的module会定义相同的函数名。为了避免符号冲突导致的未定义行为,建议每个module都把除了Onload和Unload函数之外的符号都隐藏掉,可以在给编译器传递一些flag实现。如gcc:
-fvisibility=hidden
小心Fork陷阱
如果module采用异步执行模型(参看前文block一节),那么当redis做aofrewrite或bgsave时,在redis fork子进程的瞬间,如果还有一些命令处于inflight状态,那么此时新产生的base aof或者rdb可能并不会包含这些inflight时的数据,虽然这个看上去也没有太大问题,因为inflight的命令最终完成时也会把命令写入增量的aof中。但是,为了和redis原来的行为兼容(即fork时一定没有处于inflight状态的命令,是一个静止的状态),module最好还是保证所有的inflight状态的命令都执行完了再执行fork。
在module中可以通过redis暴露的RedisModuleEvent_ForkChild事件,在fork执行之前执行一个我们传入的回调函数。
RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ForkChild, waitAllInflightTaskFinish);
比如在waitAllInflightTaskFinish中等待队列为空(即所有task都执行结束):
static void waitAllInflightTaskFinish() {
while (!thread_pool->idle())
;
}
int pthread_atfork(void (*prepare)(void), void (*parent)void(), void (*child)(void));
我们知道通过fork创建的一个子进程几乎但不完全与父进程相同。子进程得到与父进程用户级虚拟地址空间相同的(但是独立的)一份拷贝,包括文本、数据和bss段、堆以及用户栈等。子进程还获得与父进程任何打开文件描述符相同的拷贝,这就意味着子进程可以读写父进程中任何打开的文件,父进程和子进程之间最大的区别在于它们有着不同的PID。
但是有一点需要注意的是,在Linux中,fork的时候只复制当前线程到子进程,在fork(2)-Linux Man Page中有着这样一段相关的描述:
The child process is created with a single thread--the one that called fork(). The entire virtual address space of the parent is replicated in the child, including the states of mutexes, condition variables, and other pthreads objects; the use of pthread_atfork(3) may be helpful for dealing with problems that this can cause.
也就是说除了调用fork的线程外,其他线程在子进程中“蒸发”了。
因此,如果在一些异步线程中持有了一些资源的锁,那么在子进程中,因为这些线程消失了,那么子进程可能会发生死锁的问题。
解决方法和解决inflight一样,保证在fork之前所有的锁都释放掉即可。(其实只要所有inflight状态的命令都执行完了,一般锁也就都释放了)
确保向备库复制的AOF保持语义幂等
Redis的主备复制首要目标就是保证主备的一致性。因此备库要做的就是无条件接收来自主库的复制内容,并严格保持一致。但是对于一些比较特殊的命令而言,需要小心处理。
以Tair暴露的Tair String为例,支持给数据设置版本号,比如用户写入:
EXSET key value VER 10
EXSET key value ABS 11
支持graceful shutdown
Module内部可能会启动一些异步线程或者管理一些异步资源,这些资源需要在redis shutdown时被处理(如停止、析构、写磁盘等),否则redis在退出时可能发生coredump。
在redis中,可以注册RedisModuleEvent_Shutdown事件实现,当redis关机时会回调我们传入的ShutdownCallback。
当然,在较新的redis版本中,module也可以通过暴露unload函数来实现类似的功能。
RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Shutdown, ShutdownCallback);
RedisModule_EmitAOF中如果包含array类型的参数(即使用'v' flag传递的参数),则array的长度一定要使用size_t类型,否则可能会遇到诡异的错误;
RDB编码具有向后兼容能力
RDB是二进制格式的序列化和反序列化,因此相对而言比较简单。但是需要注意的是,如果数据结构以后的序列化方式可能会改变,则最好加上编解码的版本,这样在升级的时候可以保证兼容性,如下:
void *xxx_RdbLoad(RedisModuleIO *rdb, int encver) {
if (encver == version1 ) {
/* version1 format */
} else if (encver == version2 ){
/* version2 format */
}
}
一些命令实现的建议
全局索引、结构:module中如果有自己维护的全局索引,需要谨慎索引中是否包含dbid、key等信息,因为redis的move、rename、swapdb等命令会“偷梁换柱”式的更换key的名字、交换两个dbid,因此此时如果索引没有同步更新,将得到意想不到的错误
根据角色来确定动作:module本身运行的redis可能是一个主也可能是一个备,module内部可以使用RedisModule_GetContextFlags来判断当前redis的角色,并根据不同的角色来采取不同的行为(如是否进行主动过期处理等)
总结
Tair当前支持了非常多的扩展数据结构(其中redis 5.x企业版使用module方式,Tair自研企业版 6.x使用builtin方式),基本涵盖了各种应用场景(具体见介绍文档),其中既有像TairString和TairHash等小而美的数据结构(已经开源),也有像Tair Search和Vector等更为复杂和强大的计算型数据结构,充分满足AIGC背景下各种业务场景,欢迎使用。
介绍文档:https://help.aliyun.com/zh/redis/developer-reference/extended-data-structures-of-apsaradb-for-redis-enhanced-edition
fork(2)-Linux Man Page:http://linux.die.net/man/2/fork
阿里云开发者社区,千万开发者的选择
阿里云开发者社区,百万精品技术内容、千节免费系统课程、丰富的体验场景、活跃的社群活动、行业专家分享交流,欢迎点击【阅读原文】加入我们。