HowTo: Synchronize Database Data Between Databases

This HowTo explains how to implement inter-database data exchange interfaces in Qorus. By using Bulk DML, SqlUtil, and the DataStream protocol, Qorus provides infrastructure capable of transferring large amounts of data between databases from disparate vendors with high performance and a small memory footprint, even in geographically-separated networking environments.

1. Local Source Database

With a locally connected source database, you need a datasource defined in the local system.

You can create an SQLStatement object manually by acquiring a DatasourcePool object from a datasource with get_datasource_pool() as in the following example:
DatasourcePool dsp = get_datasource_pool("source-db");
SQLStatement stmt(dsp);
stmt.prepare("select id, name, amount from source_table where type = %v and amount >= 100", current_type);

Alternatively, you can use SqlUtil: acquire an AbstractTable object by calling get_sql_table(), then call AbstractTable::getRowIterator() to create the SQLStatement object. Once you have an input iterator for the input data, you can use Bulk DML (Qore wiki) to retrieve the input data with SQLStatement::fetchColumns [1] as in the following example:
# get the source table object
AbstractTable source_table = get_sql_table("source_datasource", "source_table");

# get a select iterator from the source table
hash sh = (
    "columns": ("id", "name", "amount"),
    "where": ("type": current_type, "amount": op_ge(100)),
);
string sql;
SQLStatement stmt = source_table.getRowIterator(sh, \sql);
if (opt.verbose)
    log(LL_DETAIL_2, "input SQL: %y\n", sql);

# even though we only select from the SQLStatement, we have to release the transaction lock when we're done
on_error source_table.rollback();
on_success source_table.commit();

# use a block size of 1000 to select the source data
while (*hash h = stmt.fetchColumns(1000)) {
    # ... process the data
    map process_row($1), h.contextIterator();
}

2. Remote Source Database

If the database is connected to a remote Qorus instance, you can use the DbRemoteReceive class to stream the remote data to the local system from the remote datasource.
# get the remote connection object
QorusSystemRestHelper remote = get_remote_rest_connection("remote");

# set up the select hash arguments for the remote source table
hash sh = (
    "columns": ("id", "name", "amount"),
    "where": ("type": current_type, "amount": op_ge(100)),
);

# create the remote select stream object
DbRemoteReceive recv(remote, "remote_source_datasource_name", "select", table_name, ("select": sh));

# DbRemoteReceive::getData() returns data in a "hash of lists" format (column format)
while (*hash h = recv.getData()) {
    # ... process the data
    map process_row($1), h.contextIterator();
}

The DbRemoteReceive class uses the DataStream protocol to transfer the data, which is selected from the remote database using the sqlutil service in the remote Qorus instance. The DbRemoteReceive class uses a block size of 1000 by default; this and other options can be set in the DbRemoteReceive::constructor() call (in the above example, only the select option is used).
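For example, the default block size could be reduced for very wide rows as in the following sketch (this assumes the block size option is named "block"; check the DbRemoteReceive documentation for the options supported by your Qorus version):

```qore
# sketch: pass a smaller block size along with the select option;
# the "block" option name is an assumption here
DbRemoteReceive recv(remote, "remote_source_datasource_name", "select", table_name,
    ("select": sh, "block": 500));
```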
Note: The DataStream protocol requires a direct point-to-point connection or HTTP infrastructure that allows HTTP chunked transfers to pass through any intermediate nodes without modifying the HTTP traffic. If there is an HTTP proxy between the two Qorus instances that modifies HTTP chunked data sent through it, the DataStream protocol may not be usable.

3. Local Target Database

With a locally connected target database, you can use the BulkUpsertOperation or BulkInsertOperation classes to perform upserts (SQL merging or data alignment) or inserts in the target database, respectively.

These classes require an AbstractTable object for the target table, which can be acquired by calling get_sql_table(). The data to be upserted/merged or inserted can then be passed to the object with the AbstractBulkOperation::queueData() method. Note that this method takes raw data in column format (a hash of lists), as returned by SQLStatement::fetchColumns() or DbRemoteReceive::getData().

3.1. Local Source

Example with local source:
# get the local source datasource object
AbstractTable source_table = get_sql_table("source_datasource", "source_table");

# get a select iterator from the source table
hash sh = (
    "columns": ("id", "name", "amount"),
    "where": ("type": current_type, "amount": op_ge(100)),
);
string sql;
SQLStatement stmt = source_table.getRowIterator(sh, \sql);
if (opt.verbose)
    log(LL_DETAIL_2, "input SQL: %y\n", sql);

# even though we only select from the SQLStatement, we have to release the transaction lock when we're done
on_error source_table.rollback();
on_success source_table.commit();

# get the target table object
AbstractTable target_table = get_sql_table("target_datasource", "target_table");

# create the bulk upsert operation object
BulkUpsertOperation upsert(target_table);

# perform bulk API and transaction handling on exit
on_error {
    upsert.discard();
    target_table.rollback();
}
on_success {
    upsert.flush();
    target_table.commit();
}

# use a block size of 1000 to select the source data
while (*hash h = stmt.fetchColumns(1000)) {
    # BulkUpsertOperation::queueData() accepts data in "column format" (a hash of lists)
    upsert.queueData(h);
}

3.2. Remote Source

Example with a remote source:
# get the remote connection object
QorusSystemRestHelper remote = get_remote_rest_connection("remote");

hash sh = (
    "columns": ("id", "name", "amount"),
    "where": ("type": current_type, "amount": op_ge(100)),
);

# open the remote DB select stream
DbRemoteReceive recv(remote, "remote_source_datasource_name", "select", source_table_name, ("select": sh));

# get the target table object
AbstractTable target_table = get_sql_table("target_datasource", "target_table");

# create the bulk upsert operation object
BulkUpsertOperation upsert(target_table);

# perform bulk API and transaction handling on exit
on_error {
    upsert.discard();
    target_table.rollback();
}
on_success {
    upsert.flush();
    target_table.commit();
}

# use a block size of 1000 to select the source data
while (*hash h = recv.getData()) {
    # BulkUpsertOperation::queueData() accepts data in "column format" (a hash of lists)
    upsert.queueData(h);
}

Note: upserting/merging only works if the target table has a unique key that can be used to perform the merge; see upserting or merging data for more information.

4. Remote Target Database

With a remote target database, you can use the DbRemoteSend class to stream upserts (SQL merge statements) or inserts to the target database.

The DbRemoteSend class uses the DataStream protocol to transfer the data, which is then upserted/merged or inserted into the remote database using the sqlutil service in the remote Qorus instance. The DbRemoteSend class uses a block size of 1000 by default; this and other options can be set in the DbRemoteSend::constructor() call (in the example below no options are used).
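As with DbRemoteReceive, a non-default block size could be passed as a constructor option, as in the following sketch (the "block" option name is an assumption here; check the DbRemoteSend documentation for your Qorus version):

```qore
# sketch: start the remote "upsert" stream with a smaller block size;
# the "block" option name is an assumption here
DbRemoteSend out(remote, "remote_target_datasource_name", "upsert", "target_table",
    ("block": 500));
```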

Example with local source:
# get the source table object
AbstractTable source_table = get_sql_table("source_datasource", "source_table");

# get a select iterator from the source table
hash sh = (
    "columns": ("id", "name", "amount"),
    "where": ("type": current_type, "amount": op_ge(100)),
);
string sql;
SQLStatement stmt = source_table.getRowIterator(sh, \sql);
if (opt.verbose)
    log(LL_DETAIL_2, "input SQL: %y\n", sql);

# even though we only select from the SQLStatement, we have to release the transaction lock when we're done
on_error source_table.rollback();
on_success source_table.commit();

# get the remote connection object
QorusSystemRestHelper remote = get_remote_rest_connection("remote");

# start the remote "upsert" stream
DbRemoteSend out(remote, "remote_target_datasource_name", "upsert", "target_table");

# in case of error, disconnect, which will cause a rollback on the remote end;
# because the DataStream protocol relies on HTTP chunked transfer encoding,
# the socket could be in the middle of a chunked transfer when an error
# occurs, so it's better to simply disconnect than to try to execute a
# manual rollback
on_error out.disconnect();
on_success out.commit();

# use a block size of 1000 to select the source data
while (*hash h = stmt.fetchColumns(1000)) {
    # DbRemoteSend::append(hash) accepts data in "column format" (a hash of lists)
    out.append(h);
}


Example with a remote source:
# get the remote source connection object
QorusSystemRestHelper remote_source = get_remote_rest_connection("remote_source");

hash sh = (
    "columns": ("id", "name", "amount"),
    "where": ("type": current_type, "amount": op_ge(100)),
);

# open the remote DB select stream
DbRemoteReceive recv(remote_source, "remote_source_datasource_name", "select", source_table_name, ("select": sh));

# get the remote target connection object
QorusSystemRestHelper remote_target = get_remote_rest_connection("remote_target");

# start the remote "upsert" stream
DbRemoteSend out(remote_target, "remote_target_datasource_name", "upsert", "target_table");

# in case of error, disconnect, which will cause a rollback on the remote end;
# because the DataStream protocol relies on HTTP chunked transfer encoding,
# the socket could be in the middle of a chunked transfer when an error
# occurs, so it's better to simply disconnect than to try to execute a
# manual rollback
on_error out.disconnect();
on_success out.commit();

# use a block size of 1000 to select the source data
while (*hash h = recv.getData()) {
    # DbRemoteSend::append(hash) accepts data in "column format" (a hash of lists)
    out.append(h);
}

Note:
  • If the source and target are both remote, as in the above example, the data is transferred through the instance executing the code; this is only recommended if no direct connection between the source and target is possible.
  • Upserting/merging only works if the target table has a unique key that can be used to perform the merge; see upserting or merging data for more information.


[1] The "hash of lists" format (hash keys are column names, values are lists of row values for that column) returned by SQLStatement::fetchColumns() is the most memory-efficient representation for SQL data, which is why SQLStatement::fetchColumns() is recommended over SQLStatement::fetchRows().
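For illustration, here is the same two-row result set in both representations; the values are made up, but the column names match the examples above:

```qore
# row format (a list of hashes), as returned by SQLStatement::fetchRows():
# each hash key is repeated once per row
list rows = (
    ("id": 1, "name": "a", "amount": 150),
    ("id": 2, "name": "b", "amount": 225),
);

# column format (a hash of lists), as returned by SQLStatement::fetchColumns():
# each hash key appears once, with one list element per row
hash cols = (
    "id": (1, 2),
    "name": ("a", "b"),
    "amount": (150, 225),
);
```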