WEBKT

PostgreSQL 性能优化:手撸一个高性能行级触发器扩展

190 0 0 0

你好,我是那个喜欢折腾的程序员老王。

咱们今天来聊聊 PostgreSQL 里的触发器。你肯定用过触发器,这玩意儿在数据变更时自动执行一些操作,挺方便的。但,你有没有遇到过这种情况:数据批量更新时,触发器导致性能急剧下降?特别是行级触发器,每改一行数据就触发一次,简直是性能杀手!

别担心,今天咱们就来一起手撸一个 PostgreSQL 扩展,实现一个高性能的行级触发器。通过批量缓存变更数据、异步写入等方式,大幅减少触发器带来的开销,让你的数据库飞起来!

为什么需要高性能触发器?

在深入代码之前,咱们先来聊聊为什么需要高性能触发器。PostgreSQL 原生的触发器在处理大量数据变更时,性能瓶颈主要体现在以下几个方面:

  1. 频繁的上下文切换: 行级触发器为每一行数据的变更都会触发一次,这意味着数据库需要在 PostgreSQL 主进程和触发器函数执行环境之间频繁切换。这种切换是有开销的,尤其是在高并发场景下。
  2. 同步执行: 默认情况下,触发器是同步执行的。也就是说,在触发器函数执行完成之前,数据变更操作会被阻塞。如果触发器函数执行时间较长,会严重影响数据库的整体性能。
  3. 缺乏批量处理: 原生触发器没有批量处理机制,每次只处理一行数据。如果触发器函数内部需要执行一些复杂的操作,比如写入其他表、调用外部服务等,那么每一行数据都会触发一次这些操作,效率极低。

为了解决这些问题,我们需要一个高性能的触发器。这个触发器应该具备以下特性:

  • 批量处理: 将多次数据变更合并成一批进行处理,减少触发器函数的调用次数。
  • 异步执行: 将触发器函数的执行与数据变更操作分离,避免阻塞主进程。
  • 可配置: 提供灵活的配置选项,允许用户根据实际需求调整触发器的行为。

设计思路

我们的高性能触发器扩展将采用以下设计思路:

  1. 共享内存缓冲区: 使用 PostgreSQL 的共享内存机制,创建一个缓冲区用于存储变更的数据。所有的数据变更操作都会先将变更数据写入缓冲区,而不是直接触发触发器函数。
  2. 后台工作进程 (Background Worker): 创建一个后台工作进程,负责从缓冲区读取变更数据,并批量执行触发器函数。后台工作进程与 PostgreSQL 主进程异步执行,不会阻塞数据变更操作。
  3. 信号机制: 当缓冲区中的数据达到一定数量或一定时间间隔后,后台工作进程会向 PostgreSQL 主进程发送信号,通知主进程有新的数据需要处理。主进程收到信号后,会唤醒后台工作进程。
  4. 自定义触发器函数: 提供一个自定义的触发器函数,用于替换原生的触发器函数。这个自定义函数会将变更数据写入共享内存缓冲区,而不是直接执行用户定义的逻辑。
  5. 控制函数: 提供创建/销毁 触发器, 启动/停止 后台工作进程的控制函数. 便于使用.

代码实现 (C 语言)

下面,我们将逐步实现这个高性能触发器扩展。为了简化代码,我们只实现核心功能,并省略一些错误处理和边界检查。

1. 定义扩展

首先,我们需要定义一个 PostgreSQL 扩展。创建一个名为 pg_hll_trigger 的目录,并在其中创建以下文件:

  • pg_hll_trigger.control:
# pg_hll_trigger extension
comment = 'High-performance row-level trigger for PostgreSQL'
default_version = '1.0'
module_pathname = '$libdir/pg_hll_trigger'
relocatable = true
  • pg_hll_trigger--1.0.sql:
-- 定义共享内存结构
CREATE TYPE hll_trigger_data AS (
    tg_op   text,
    tg_table_name  text,
    new_row jsonb,
    old_row jsonb
);

-- 创建控制函数
CREATE FUNCTION hll_trigger_create(trigger_name text, table_name text, function_name text, buffer_size int DEFAULT 100, flush_interval int DEFAULT 10) RETURNS void
AS '$libdir/pg_hll_trigger', 'hll_trigger_create'
LANGUAGE C STRICT;

CREATE FUNCTION hll_trigger_drop(trigger_name text) RETURNS void
AS '$libdir/pg_hll_trigger', 'hll_trigger_drop'
LANGUAGE C STRICT;

CREATE FUNCTION hll_trigger_start() RETURNS void 
AS '$libdir/pg_hll_trigger','hll_trigger_start'
LANGUAGE C STRICT;

CREATE FUNCTION hll_trigger_stop() RETURNS void 
AS '$libdir/pg_hll_trigger','hll_trigger_stop'
LANGUAGE C STRICT;

-- 内部使用的触发器函数
CREATE FUNCTION hll_trigger_handler() RETURNS trigger
AS '$libdir/pg_hll_trigger', 'hll_trigger_handler'
LANGUAGE C STRICT;

2. 编写 C 代码

创建一个名为 pg_hll_trigger.c 的文件,并编写以下 C 代码:

#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "executor/spi.h"
#include "commands/trigger.h"
#include "utils/memutils.h"
#include "utils/guc.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/shmem.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "postmaster/bgworker.h"
#include "access/xact.h"
#include <time.h>

PG_MODULE_MAGIC;

// 共享内存结构
typedef struct {
    LWLock  lock;           // 轻量级锁,用于保护缓冲区
    int     buffer_size;    // 缓冲区大小(记录条数)
    int     flush_interval; // 刷新间隔(秒)
    int     count;          // 当前缓冲区中的记录数
    time_t  last_flush;    // 上次刷新的时间
    char    data[FLEXIBLE_ARRAY_MEMBER]; // 存储触发器数据的数组. hll_trigger_data
} SharedBuffer;

static SharedBuffer *shared_buffer = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static bool hll_trigger_worker_running = false;


// 获取共享内存中记录的结构体
static inline hll_trigger_data* get_hll_trigger_data(int index) {
  return (hll_trigger_data*)(shared_buffer->data + index * sizeof(hll_trigger_data));
}

void hll_trigger_start(void);
void hll_trigger_stop(void);

// 共享内存初始化
static void hll_trigger_shmem_startup(void);

// 后台工作进程主函数
void hll_trigger_main(Datum main_arg);

// 信号处理函数
static void hll_trigger_sigterm(SIGNAL_ARGS);


// 扩展初始化函数
void _PG_init(void) {
    if (!process_shared_preload_libraries_in_progress) {
        return;
    }

    RequestAddinShmemSpace(sizeof(SharedBuffer));
    RequestNamedLWLockTranche("hll_trigger", 1);

    prev_shmem_startup_hook = shmem_startup_hook;
    shmem_startup_hook = hll_trigger_shmem_startup;

     BackgroundWorker worker = {
        .bgw_name = "HLL Trigger Worker",
        .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
        .bgw_start_time = BgWorkerStart_ConsistentState,
        .bgw_restart_time = BGW_NEVER_RESTART, // 发生错误不自动重启, 简化流程.
        .bgw_main = hll_trigger_main,
        .bgw_notify_pid = 0
    };

    RegisterBackgroundWorker(&worker);

}

// 共享内存初始化
static void hll_trigger_shmem_startup(void) {
    bool found;

    if (prev_shmem_startup_hook) {
        prev_shmem_startup_hook();
    }

    shared_buffer = (SharedBuffer *)ShmemInitStruct("HLL Trigger Buffer", sizeof(SharedBuffer), &found);

    if (!found) {
        LWLockInitialize(&shared_buffer->lock, 0);
        shared_buffer->buffer_size = 100; // 默认缓冲区大小
        shared_buffer->flush_interval = 10; // 默认刷新间隔
        shared_buffer->count = 0;
        shared_buffer->last_flush = time(NULL);
    }
}


PG_FUNCTION_INFO_V1(hll_trigger_create);
Datum hll_trigger_create(PG_FUNCTION_ARGS) {

    text *trigger_name_text = PG_GETARG_TEXT_P(0);
    text *table_name_text = PG_GETARG_TEXT_P(1);
    text *function_name_text = PG_GETARG_TEXT_P(2);
    int32 buffer_size = PG_GETARG_INT32(3);
    int32 flush_interval = PG_GETARG_INT32(4);

    char *trigger_name = text_to_cstring(trigger_name_text);
    char *table_name = text_to_cstring(table_name_text);
    char *function_name = text_to_cstring(function_name_text);

    if(!hll_trigger_worker_running){
        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("hll_trigger worker is not running, please call hll_trigger_start()")));
    }

    // 检查是否已经存在同名触发器
    if (SPI_connect() != SPI_OK_CONNECT) {
        ereport(ERROR, (errmsg("SPI_connect failed")));
    }

    char query[512];
    snprintf(query, sizeof(query), "SELECT 1 FROM pg_trigger WHERE tgname = '%s'", trigger_name);

    if (SPI_execute(query, true, 0) != SPI_OK_SELECT) {
         ereport(ERROR, (errmsg("SPI_execute failed")));
    }

    if(SPI_processed > 0 ) {
        ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("trigger \"%s\" already exists", trigger_name)));
    }

    // 修改共享内存配置
    LWLockAcquire(&shared_buffer->lock, LW_EXCLUSIVE);
    shared_buffer->buffer_size = buffer_size;
    shared_buffer->flush_interval = flush_interval;
    LWLockRelease(&shared_buffer->lock);


    // 创建触发器
    snprintf(query, sizeof(query),
             "CREATE TRIGGER %s AFTER INSERT OR UPDATE OR DELETE ON %s FOR EACH ROW EXECUTE PROCEDURE hll_trigger_handler()",
             trigger_name, table_name);


     if (SPI_execute(query, false, 0) != SPI_OK_UTILITY) {
          ereport(ERROR, (errmsg("SPI_execute failed")));
     }

    //记录function_name, 在后台进程中使用.

    SPI_finish();

    PG_RETURN_VOID();
}


PG_FUNCTION_INFO_V1(hll_trigger_drop);
Datum hll_trigger_drop(PG_FUNCTION_ARGS) {
    text *trigger_name_text = PG_GETARG_TEXT_P(0);
    char *trigger_name = text_to_cstring(trigger_name_text);

    if (SPI_connect() != SPI_OK_CONNECT) {
         ereport(ERROR, (errmsg("SPI_connect failed")));
    }

    // 删除触发器
    char query[256];
    snprintf(query, sizeof(query), "DROP TRIGGER IF EXISTS %s ON %s", trigger_name,quote_ident("table_xxx")); //quote_ident 防止sql注入

     if (SPI_execute(query, false, 0) != SPI_OK_UTILITY) {
         ereport(ERROR, (errmsg("SPI_execute failed")));
     }

    SPI_finish();

    PG_RETURN_VOID();
}

PG_FUNCTION_INFO_V1(hll_trigger_handler);
Datum hll_trigger_handler(PG_FUNCTION_ARGS) {
    TriggerData *trigdata = (TriggerData *) fcinfo->context;
    HeapTuple   new_row = trigdata->tg_newtuple;
    HeapTuple   old_row = trigdata->tg_trigtuple;

    if (!CALLED_AS_TRIGGER(fcinfo)) {
        ereport(ERROR, (errmsg("not called as trigger")));
    }

    // 获取操作类型
    char *tg_op;
    switch (trigdata->tg_event & TRIGGER_EVENT_OPMASK) {
        case TRIGGER_EVENT_INSERT:
            tg_op = "INSERT";
            break;
        case TRIGGER_EVENT_UPDATE:
            tg_op = "UPDATE";
            break;
        case TRIGGER_EVENT_DELETE:
            tg_op = "DELETE";
            break;
        default:
            tg_op = "UNKNOWN";
    }


    LWLockAcquire(&shared_buffer->lock, LW_EXCLUSIVE);

    // 检查缓冲区是否已满
    if (shared_buffer->count >= shared_buffer->buffer_size) {
         LWLockRelease(&shared_buffer->lock);
          //这里可以根据需要选择是否阻塞. 简化处理,直接返回.
         ereport(WARNING,(errmsg("hll_trigger buffer is full, discarding trigger data")));
        return PointerGetDatum(NULL);
    }

    // 将数据写入缓冲区
    hll_trigger_data* trigger_data =  get_hll_trigger_data(shared_buffer->count);


    trigger_data->tg_op = pstrdup(tg_op);
    trigger_data->tg_table_name = pstrdup(SPI_getrelname(trigdata->tg_relation));

    if (new_row) {
      trigger_data->new_row = SPI_rowtojsonb(new_row);
    } else {
      trigger_data->new_row = NULL;
    }

     if (old_row) {
        trigger_data->old_row = SPI_rowtojsonb(old_row);
    } else {
        trigger_data->old_row = NULL;
    }

    shared_buffer->count++;

    LWLockRelease(&shared_buffer->lock);

    return PointerGetDatum(NULL);
}


// 后台工作进程主函数
void hll_trigger_main(Datum main_arg) {
    pqsignal(SIGTERM, hll_trigger_sigterm);

    BackgroundWorkerUnblockSignals();

    BackgroundWorkerInitializeConnection("postgres", NULL,0);

    while (true) {
        CHECK_FOR_INTERRUPTS();
        // 检查是否需要退出
        if (IsBackgroundWorkerShutdownRequested()) {
          proc_exit(0);
        }

        LWLockAcquire(&shared_buffer->lock, LW_SHARED);
        int count = shared_buffer->count;
        time_t last_flush = shared_buffer->last_flush;
        LWLockRelease(&shared_buffer->lock);

        // 检查是否需要刷新缓冲区
        if (count > 0 && (count >= shared_buffer->buffer_size || (time(NULL) - last_flush) >= shared_buffer->flush_interval)) {
            // 复制数据
            hll_trigger_data *data_copy = (hll_trigger_data *) palloc(count * sizeof(hll_trigger_data));

             LWLockAcquire(&shared_buffer->lock, LW_EXCLUSIVE);
              for(int i=0; i< count; i++){
                data_copy[i].tg_op = pstrdup(get_hll_trigger_data(i)->tg_op);
                data_copy[i].tg_table_name = pstrdup(get_hll_trigger_data(i)->tg_table_name);
                data_copy[i].new_row = get_hll_trigger_data(i)->new_row ? jsonb_copy(get_hll_trigger_data(i)->new_row): NULL;
                data_copy[i].old_row = get_hll_trigger_data(i)->old_row ? jsonb_copy(get_hll_trigger_data(i)->old_row) : NULL;
              }
              shared_buffer->count = 0;
              shared_buffer->last_flush = time(NULL);
            LWLockRelease(&shared_buffer->lock);

            // 处理数据 (这里只是简单的打印, 应该调用用户定义的函数)

            if (SPI_connect() != SPI_OK_CONNECT) {
                ereport(ERROR, (errmsg("SPI_connect failed")));
            }

            for (int i = 0; i < count; i++) {
                ereport(LOG,(
                    errmsg("table: %s, op: %s, new: %s, old: %s",
                        data_copy[i].tg_table_name,
                        data_copy[i].tg_op,
                        data_copy[i].new_row ? TextDatumGetCString(JsonbToCString(NULL,data_copy[i].new_row, -1)): "null",
                        data_copy[i].old_row ? TextDatumGetCString(JsonbToCString(NULL,data_copy[i].old_row, -1)) : "null"
                    )
                ));
                //TODO: 调用用户函数.
                //pfree(data_copy[i]);
            }

            SPI_finish();

            pfree(data_copy);
        }

        // 休眠一段时间
        pg_usleep(1000000L); // 1 second
    }
}

// 信号处理函数, 处理关闭信号.
static void hll_trigger_sigterm(SIGNAL_ARGS) {
    int save_errno = errno;
    SetLatch(MyLatch);
    if(MyProc) {
      MyProc->procLatch = save_errno;
    }

}

PG_FUNCTION_INFO_V1(hll_trigger_start);
Datum hll_trigger_start(PG_FUNCTION_ARGS) {
  if(hll_trigger_worker_running) {
    ereport(WARNING, (errmsg("hll_trigger worker already running")));
    PG_RETURN_VOID();
  }

   BackgroundWorker worker = {
        .bgw_name = "HLL Trigger Worker",
        .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
        .bgw_start_time = BgWorkerStart_ConsistentState,
        .bgw_restart_time = BGW_NEVER_RESTART, // 发生错误不自动重启, 简化流程.
        .bgw_main = hll_trigger_main,
        .bgw_notify_pid = 0
    };

    RegisterBackgroundWorker(&worker);
    hll_trigger_worker_running = true;
    PG_RETURN_VOID();
}


PG_FUNCTION_INFO_V1(hll_trigger_stop);
Datum hll_trigger_stop(PG_FUNCTION_ARGS) {
   //TODO: 向后台进程发送关闭信号.
   hll_trigger_worker_running = false;
    PG_RETURN_VOID();
}

3. 编译和安装

要编译这个扩展,你需要安装 PostgreSQL 开发包,并确保 pg_config 在你的 PATH 中。然后,你可以使用以下命令编译和安装扩展:

make
make install

4. 使用扩展

安装扩展后,你可以在 PostgreSQL 中使用以下步骤创建和使用高性能触发器:

  1. 加载扩展:
CREATE EXTENSION pg_hll_trigger;
  1. 启动后台工作进程:
SELECT hll_trigger_start();
  1. 创建触发器:
-- 假设你有一个名为 my_table 的表,和一个名为 my_trigger_function 的触发器函数
CREATE TABLE my_table (id serial, data text);

CREATE OR REPLACE FUNCTION my_trigger_function() RETURNS trigger AS $$
BEGIN
    -- 这里是你的触发器逻辑
    RAISE NOTICE 'Trigger function called for table % with operation %', TG_TABLE_NAME, TG_OP;
     IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
       RAISE NOTICE 'NEW: %', row_to_json(NEW);
    END IF;
     IF TG_OP = 'DELETE' OR TG_OP = 'UPDATE' THEN
        RAISE NOTICE 'OLD: %', row_to_json(OLD);
     END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- 使用 hll_trigger_create 创建高性能触发器, 1000条 或 60s 触发
SELECT hll_trigger_create('my_hll_trigger', 'my_table', 'my_trigger_function',1000, 60);
  1. 测试触发器:
-- 插入一些数据
INSERT INTO my_table (data) VALUES ('data1'), ('data2'), ('data3');

-- 更新一些数据
UPDATE my_table SET data = 'updated_data' WHERE id = 1;

-- 删除一些数据
DELETE FROM my_table WHERE id = 2;

-- 查看日志输出, 确认触发器是否被批量执行
  1. 停止后台进程
SELECT hll_trigger_stop();
  1. 删除触发器:
SELECT hll_trigger_drop('my_hll_trigger');

总结

通过以上步骤,我们就实现了一个简单的 PostgreSQL 高性能行级触发器扩展。这个扩展通过共享内存缓冲区、后台工作进程和自定义触发器函数,实现了批量处理和异步执行,从而减少了触发器带来的性能开销。

当然,这只是一个简单的示例,还有很多可以优化和完善的地方,比如:

  • 更精细的锁机制: 使用更精细的锁机制,减少锁竞争,提高并发性能。
  • 错误处理: 添加更完善的错误处理机制,提高扩展的健壮性。
  • 动态注册/注销后台进程: 根据需要动态注册和注销后台进程,减少资源占用。
  • 支持更多触发器事件: 支持更多的触发器事件,比如 TRUNCATE 等。
  • 用户自定义函数调用: 目前只是打印日志, 需要修改代码, 调用用户创建触发器时传入的函数.
  • 完善 hll_trigger_stop 函数: 停止后台工作进程, 避免异常.

希望这个示例能帮助你更好地理解 PostgreSQL 触发器的工作原理,并为你的数据库性能优化提供一些思路。记住,性能优化是一个持续的过程,需要不断地分析和调整。如果你有任何问题或建议,欢迎随时交流!

爱折腾的老王 PostgreSQL触发器性能优化

评论点评