*Redis 模块中的阻塞命令

Redis 在内置命令集中有一些阻塞命令。其中最常用的是 BLPOP(或对称的 BRPOP),它会阻塞等待列表中的元素到达。

阻塞命令的一个有趣事实是,它们不会阻塞整个服务器,而只会阻塞调用它们的客户端。通常阻塞的原因是,我们期望某些外部事件发生:这可能是 Redis 数据结构中的某些变化,如 BLPOP 的情况,线程中发生的长时间计算,从网络接收某些数据等等。

Redis 模块也能够实现阻塞命令,本文档展示了 API 的工作原理,并描述了一些可用于建模阻塞命令的模式。

注意:此 API 目前为 实验性,因此只有在定义了宏 REDISMODULE_EXPERIMENTAL_API 时才能使用。这是必需的,因为这些调用仍处于设计的最终阶段,因此可能会在未来发生变化,某些部分可能会被弃用等等。

要使用模块 API 的这一部分,请像这样包含模块头文件:

#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"

*阻塞和恢复的工作原理

注意:你可能想查看 Redis 源码树中 src/modules 目录下的 helloblock.c 示例,以了解如何应用阻塞 API 的简单易懂示例。

在 Redis 模块中,命令由 Redis 核心在用户调用特定命令时调用的回调函数实现。通常回调通过向客户端发送一些回复来终止其执行。使用以下函数,实现模块命令的函数可以请求将客户端置于阻塞状态:

RedisModuleBlockedClient *RedisModule_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);

该函数返回一个 RedisModuleBlockedClient 对象,稍后用于解除客户端阻塞。参数含义如下:

  • ctx 是命令执行上下文,与 API 其余部分一样。
  • reply_callback 是回调函数,具有与普通命令函数相同的原型,在客户端被解除阻塞时调用,以便向客户端返回回复。
  • timeout_callback 是回调函数,具有与普通命令函数相同的原型,在客户端达到 ms 超时时调用。
  • free_privdata 是为了释放私有数据而调用的回调函数。私有数据是指针,指向在用于解除阻塞客户端的 API 与将回复发送给客户端的回调之间传递的某些数据。我们将在本文档后面看到这个机制的工作原理。
  • ms 是以毫秒为单位的超时。当达到超时时,将调用超时回调函数,客户端将自动中止。

一旦客户端被阻塞,可以使用以下 API 解除阻塞:

int RedisModule_UnblockClient(RedisModuleBlockedClient *bc, void *privdata);

该函数以先前调用 RedisModule_BlockClient() 返回的阻塞客户端对象为参数,并解除客户端阻塞。在客户端被解除阻塞之前,客户端被阻塞时指定的 reply_callback 函数会被调用:此函数可以访问此处使用的 privdata 指针。

重要:上述函数是线程安全的,可以在执行某些工作的线程中调用,以实现阻塞客户端的命令。

当客户端被解除阻塞时,privdata 数据将自动使用 free_privdata 回调函数释放。这很有用 因为回复回调可能永远不会被调用(如果客户端超时或断开与服务器连接),因此由外部函数负责在需要时释放传递的数据。

为了更好地理解 API 的工作原理,我们可以想象编写一个阻塞客户端一秒钟,然后发送 "Hello!" 作为回复的命令。

注意:为了示例简单,未实现元数检查和其他不重要的内容。

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    pthread_create(&tid,NULL,threadmain,bc);

    return REDISMODULE_OK;
}

void *threadmain(void *arg) {
    RedisModuleBlockedClient *bc = arg;

    sleep(1); /* 等待一秒钟然后解除阻塞。 */
    RedisModule_UnblockClient(bc,NULL);
}

上述命令尽快阻塞客户端,生成一个线程,该线程等待一秒钟并解除客户端阻塞。让我们检查回复和超时回调,在我们的情况下它们非常相似,因为它们只是用不同的回复类型回复客户端。

int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    return RedisModule_ReplyWithSimpleString(ctx,"Hello!");
}

int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    return RedisModule_ReplyWithNull(ctx);
}

回复回调只是向客户端发送 "Hello!" 字符串。重要的是,回复回调在客户端从线程中解除阻塞时被调用。

超时命令返回 NULL,这在实际的 Redis 阻塞命令超时时经常发生。

*解除阻塞时传递回复数据

上面的示例简单易懂,但缺乏实际阻塞命令实现的一个重要方面:通常回复函数需要知道回复客户端什么,而这信息通常在客户端被解除阻塞时提供。

我们可以修改上面的示例,使线程在等待一秒钟后生成一个随机数。你可以将其视为某种昂贵的操作。然后这个随机数可以传递给回复函数,以便我们将其返回给命令调用者。为了实现这一点,我们修改函数如下:

void *threadmain(void *arg) {
    RedisModuleBlockedClient *bc = arg;

    sleep(1); /* 等待一秒钟然后解除阻塞。 */

    long *mynumber = RedisModule_Alloc(sizeof(long));
    *mynumber = rand();
    RedisModule_UnblockClient(bc,mynumber);
}

如你所见,现在解除阻塞调用传递了一些私有数据,即 mynumber 指针,给回复回调。为了获得这些私有数据,回复回调将使用以下函数:

void *RedisModule_GetBlockedClientPrivateData(RedisModuleCtx *ctx);

因此我们的回复回调被修改成这样:

int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
    /* 重要:不要在这里释放 mynumber,而是在
     * free privdata 回调中释放。 */
    return RedisModule_ReplyWithLongLong(ctx,mynumber);
}

注意,在使用 RedisModule_BlockClient() 阻塞客户端时,我们还需要传递一个 free_privdata 函数,因为分配的 long 值必须被释放。我们的回调将如下所示:

void free_privdata(void *privdata) {
    RedisModule_Free(privdata);
}

注意:需要强调的是,私有数据最好在 free_privdata 回调中释放,因为如果客户端断开连接或超时,回复函数可能不会被调用。

还要注意,私有数据也可以从超时回调中访问,始终使用 GetBlockedClientPrivateData() API。

*中止客户端的阻塞

有时会出现一个问题,即我们需要分配资源来实现非阻塞命令。因此我们先阻塞客户端,然后例如尝试创建一个线程,但线程创建函数返回错误。在这种情况下,为了恢复,我们该怎么办?我们不想让客户端保持阻塞状态,也不想调用 UnblockClient(),因为这会触发回复回调被调用。

在这种情况下,最好的做法是使用以下函数:

int RedisModule_AbortBlock(RedisModuleBlockedClient *bc);

实际使用方式如下:

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx,"抱歉,无法创建线程");
    }

    return REDISMODULE_OK;
}

客户端将被解除阻塞,但不会调用回复回调。

*使用单个函数实现命令、回复和超时回调

以下函数可用于使用实现主命令函数的同一个函数来实现回复和回调:

int RedisModule_IsBlockedReplyRequest(RedisModuleCtx *ctx);
int RedisModule_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);

因此我可以重写示例命令,而不使用单独的回复和超时回调:

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    if (RedisModule_IsBlockedReplyRequest(ctx)) {
        long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
        return RedisModule_ReplyWithLongLong(ctx,mynumber);
    } else if (RedisModule_IsBlockedTimeoutRequest) {
        return RedisModule_ReplyWithNull(ctx);
    }

    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx,"抱歉,无法创建线程");
    }

    return REDISMODULE_OK;
}

功能上是相同的,但有些人会喜欢将大部分命令逻辑集中在单个函数中的不太冗长的实现。

*在线程内处理数据副本

一个有趣的线程模式,用于实现命令的慢速部分,是处理数据的副本,以便在对某个键执行某些操作时,用户继续看到旧版本。然而当线程完成其工作时,表示会被交换,新的、处理过的版本会被使用。

这种方法的一个例子是 Neural Redis 模块,其中神经网络在不同的线程中训练,而用户仍然可以执行和检查它们的旧版本。

*未来工作

目前正在进行一项 API 工作,以允许 Redis 模块 API 以线程安全的方式被调用,以便线程化命令可以访问数据空间并执行增量操作。

此功能没有预计发布时间,但可能会在 Redis 4.0 发布过程中的某个时候出现。