Link Search Menu Expand Document

データ永続化

このチュートリアルでは、DBのデータに対する永続化の方法について紹介します。

サンプルコード

このチュートリアルで紹介するサンプルコードです。
サンプルコードのファイルは、こちらにあります。
永続化したDBについて、再度DBを参照する場合や、ファイル上のデータを参照したい場合は、こちらを参照ください。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/stat.h>
#include "speedbee.h"
#include "sdts_spcf.h"

/*
 * sample 11 データ永続化(HI MI LO)
 */

/* 固定値  */
#define COUNT_OF_ELEMENTS(arr) (sizeof(arr)/sizeof(arr[0]))
#define BSZ			4096
#define ONE_SEC    (1000000000LL)
#define HI_DATA_SIZE (1000)
#define DBDIR "sample11-12db.sdts"
const sdntime_t DURATION = ONE_SEC;
const sdntime_t BASE_TIME = 1589896352381000000LL;   // 2020-05-19T13:52:32.381Z


/* 静的グローバル変数  */
static sdtsdb_t db;
static int hi_data[HI_DATA_SIZE];
static int running = 0;
static struct { const char *name; const char *param; sdtscid_t cid; } col_params[] = {
  { "ch_hi", "COL_TYPE=H;DATA_SIZE=4;SMPL_RATE=10000;SAVE_COUNT=200000;WR_DATA=Yes", 0 },
  { "ch_mi", "COL_TYPE=M;DATA_SIZE=4;SMPL_RATE=200;SAVE_COUNT=8000;WR_DATA=Yes;MM_COUNT=300", 0 },
  { "ch_lo_fix", "COL_TYPE=L;DATA_SIZE=4;SAVE_COUNT=1000;WR_DATA=Yes", 0 },
  { "ch_lo_var", "COL_TYPE=L;SAVE_COUNT=1000;WR_DATA=Yes", 0 },
};

/* static関数プロトタイプ  */
static void insert_hi(sdtsdb_t db, sdtscid_t cid, uint64_t ts, uint64_t duration, double rate);
static void insert_mi(sdtsdb_t db, sdtscid_t cid, uint64_t ts, uint64_t duration, double rate);
static void insert_lo(sdtsdb_t db, sdtscid_t cid, uint64_t ts, uint64_t duration, double rate);
static void init_hi_data(void);
static void *sync_proc(void*p);
static void stopHandler(int sig);
static sdntime_t getBaseTime(void);

int checkDbExists(const char *dbdir) {
	struct stat st;
	if (stat(dbdir, &st) == 0 && S_ISDIR(st.st_mode)) {
		return 1;
	} else {
		return 0;
	}
}

int main(int argc, char *argv[])
{
	pthread_t thrid;
	sdntime_t ts;
	int db_exists;

	db_exists = checkDbExists(DBDIR);

	if (sd_init(NULL) < 0) {
		printf("error sd_init [%d]\n", sd_get_err());
		return 1;
	}
	printf("-- success sd_init --\n");

	if ((db = sdts_open_db("DB_PATH=" DBDIR)) == NULL) {
		printf("error sdts_open_db [%d]\n", sd_get_err());
		sd_end();
		return 1;
	}
	printf("-- success sdts_open_db --\n");

	if (db_exists) {
		col_params[0].cid = sdts_get_col(db, "ch_hi");
		col_params[1].cid = sdts_get_col(db, "ch_mi");
		col_params[2].cid = sdts_get_col(db, "ch_lo_fix");
		col_params[3].cid = sdts_get_col(db, "ch_lo_var");
	} else {
		for (int i=0; i < COUNT_OF_ELEMENTS(col_params); i++) {
			sdtscid_t cid;
			if ((cid = sdts_create_col(db, (sdid_t)col_params[i].name, (char *)col_params[i].param)) < 0) {
				printf("error sdts_create_col [%d]\n", sd_get_err());
				(void)sdts_close_db(db);
				sd_end();
				return 1;
			}
			col_params[i].cid = cid;
			printf("-- success sdts_create_col [%d] --\n", cid);
		}
	}


	// ^Cなどでキルされた場合に正常に終了する
	signal(SIGINT, stopHandler);
	signal(SIGTERM, stopHandler);

	// 同期スレッド起動
	running = true;
	if (pthread_create(&thrid, NULL, sync_proc, NULL)) {
		sd_end();
		exit(EXIT_FAILURE);
	}

	init_hi_data();

	// メインループ
	ts = getBaseTime();
	while (running) {
		insert_hi(db, col_params[0].cid, ts,              DURATION,   10000);
		insert_mi(db, col_params[1].cid, ts,              DURATION/2, 200);
		insert_mi(db, col_params[1].cid, ts+(DURATION/2), DURATION/2, 100);
		insert_lo(db, col_params[2].cid, ts,              DURATION,   10);
		insert_lo(db, col_params[3].cid, ts,              DURATION,   10);

		sleep(DURATION/ONE_SEC);
		ts += DURATION;
	}
	running = false;

	pthread_join(thrid, NULL);

	if (sdts_close_db(db) < 0) {
		printf("error sdts_close_db [%d]\n", sd_get_err());
		sd_end();
		return 1;
	}
	printf("-- success sdts_close_db --\n");

	if (sd_end() < 0) {
		printf("error sd_end [%d]\n", sd_get_err());
		return 1;
	}
	printf("-- success sd_end --\n");

	return 0;
}

static void init_hi_data()
{
	srandom(0);
	for (int i=0; i<HI_DATA_SIZE; i+=2) {
		hi_data[i+0] = i;
		hi_data[i+1] = random();
	}
}

static void insert_hi(sdtsdb_t db, sdtscid_t cid, uint64_t ts, uint64_t duration, double rate)
{
	int i;
	static int flag = 0;
	int count = duration * rate / ONE_SEC;

	for (i = 0; i < count; i+=HI_DATA_SIZE) {
		// 登録
		int size = (count-i<HI_DATA_SIZE)?count-1:HI_DATA_SIZE;
		if (sdts_insert(
					db, cid, (flag)?0:ts,
					(char *)&hi_data[0],
					size) != size) {
			printf("error sdts_insert hi [%d]\n", sd_get_err());
			(void)sdts_close_db(db);
			sd_end();
			exit(EXIT_FAILURE);
		}

		// 初回以降、タイムスタンプは0を指定する。
		// Hi において 0以外である場合、データの連続性を取り消し、
		// 再スタートの意味がある。 データのリセットする。
		flag = 1;
	}
	//printf("-- success sdts_insert hi [%d] --\n", i);
}

static void insert_mi(sdtsdb_t db, sdtscid_t cid, uint64_t ts, uint64_t duration, double rate)
{
	int i;

	// サンプリングレートを 200Hzに変更
	if (sdts_set_smpl_rate(db, (sdid_t)"ch_mi", rate) < 0) {
		printf("error sdts_set_smpl_rate[%d]\n", sd_get_err());
		sdts_close_db(db);
		sd_end();
		exit(EXIT_FAILURE);
	}

	// durationの時間分だけ追加
	int count = duration * rate / ONE_SEC;
	for (i=0; i < count; i++) {
		// 登録
		if (sdts_insert(db, cid, ts, (char *)&i, 1) != 1) {
			printf("error sdts_insert mi [%d]\n", sd_get_err());
			(void)sdts_close_db(db);
			sd_end();
			exit(EXIT_FAILURE);
		}
		ts = 0;
	}
	//printf("-- success sdts_insert mi [%d] --\n", i);
}

static void insert_lo(sdtsdb_t db, sdtscid_t cid, uint64_t ts, uint64_t duration, double rate)
{
	const uint64_t t = ONE_SEC / rate;
	const int count = duration * rate / ONE_SEC;
	int i;
	static int D = 0;

	for (i = 0; i < count; i++) {
		// 登録
		if (sdts_insert(db, cid, ts, (char *)&D, 1) != 1) {
			printf("error sdts_insert lo [%d]\n", sd_get_err());
			(void)sdts_close_db(db);
			sd_end();
			exit(EXIT_FAILURE);
		}
		// タイムスタンプ設定  0を指定した場合システム時間が設定される。
		ts += t;
		D++;
	}
	//printf("-- success sdts_insert lo [%d] --\n", i);
}

static void *sync_proc(void*p)
{
	int counter = 0;
	while (running) {
		usleep(ONE_SEC/1000/3);
		if (counter%10 == 0) {
			fprintf(stderr, "\ntrying to sync..");
			if (sdts_sync_db(db, SDTS_SYNC_CMD_REG)) {
				fprintf(stderr, "error sdts_sync_db [%d]\n", sd_get_err());
			} else {
				fprintf(stderr, "sync success.\n");
			}
		}
		counter++;
	}
	return NULL;
}

static void stopHandler(int sig)
{
	running = 0;
}

static sdntime_t getBaseTime(void)
{
	time_t t;

	t = time(NULL);
	if (t < 0) {
		return BASE_TIME;
	} else {
		return t * ONE_SEC;
	}
}

永続化DBの作成(sdts_open_db)

sdts_open_db APIにて、オプションのDB_PATHを指定することで、永続化DBを作成します。
DB_PATHを指定しない場合は、メモリDBのみになります。
DB_PATHに指定した場所にDBファイルがない場合は、新規にDBのファイル構造を作成します。
既にあるDBファイルを指定した場合は、永続化されたDBを読み込みます。

if ((db = sdts_open_db("DB_PATH=" DBDIR)) == NULL) {
    printf("error sdts_open_db [%d]\n", sd_get_err());
    sd_end();
    return 1;
}

DBの永続化同期処理(sdts_sync_db)

メモリ上にあるデータを永続化するために同期処理を行います。

if (sdts_sync_db(db, SDTS_SYNC_CMD_REG)) {
	fprintf(stderr, "error sdts_sync_db [%d]\n", sd_get_err());
} else {
	fprintf(stderr, "sync success.\n");
}

Back to top

Copyright © 2020 SALTYSTER Inc. All rights reserved.

Page last modified: Jul 13 2020 at 12:00 PM.