Qorus Integration Engine®  4.0.3_git
sqlutil-v4.0.qsd File Reference

provides access to SqlUtil functionality from system datasources More...

Classes

class  QorusSystemSqlutilService
 the main sqlutil service class More...
 

Detailed Description

provides access to SqlUtil functionality from system datasources

sqlutil Streams

The sqlutil service supports several data streams, allowing data to be efficiently selected, updated, inserted, deleted, or "upserted" (ie merged) from a remote system, allowing large volumes of data to be processed in a remote database with minimal intermediate state in Qorus.

Note
See Work with Data Streaming for easy to use functionality.

In client code, data streams exposed by the sqlutil service are accessed through a QorusSystemRestHelper object as returned by a call to OMQ::get_remote_rest_connection() as in the following example:

QorusSystemRestHelper nodea = get_remote_rest_connection("node-a");

In the above example, "node-a" is assumed to be a remote connection configured in the Qorus to Qorus Connections

The data streams in the following table are provided.

SqlUtil Streams

Stream Dir HTTP Method Args Description
select out GET datasource=name
table=name
timeout=ms
[block=integer]
[column_format=bool]
options:
block: the number of rows to send in each chunk
column_format: gives the format of the encoded data; if True then each chunk is a hash of lists, otherwise it is a list of hashes

All other arguments are passed to AbstractTable::getRowIterator(). Requires DATASOURCE-CONTROL or SQLUTIL-READ permissions to access
insert in POST datasource=name
table=name
timeout=ms
Streamed data (as hashes representing rows or lists of hashes representing blocks of rows) are inserted into the target table in an atomic transaction.
For DBI drivers supporting bulk DML, efficient bulk DML queries are executed with one round trip to the server for each block of row data sent to the stream.
Requires DATASOURCE-CONTROL or SQLUTIL-WRITE permissions to access
update in POST datasource=name
table=name
timeout=ms
Streamed data (as hashes with 'set' and 'cond' keys (with lists of hashes giving the 'set' and 'cond' parameters) or lists of such hashes and uses this information to update data in the target table in a single atomic transaction. All values sent to this stream must have the same hash keys in the same order.
For DBI drivers supporting bulk DML, efficient bulk DML queries are executed with one round trip to the server for each block of row data sent to the stream.
Requires DATASOURCE-CONTROL or SQLUTIL-WRITE permissions to access
delete in POST datasource=name
table=name
timeout=ms
Streamed data (as hashes with 'cond' keys or lists of such hashes and uses this information to delete data from the target table in a single atomic transaction.
Requires DATASOURCE-CONTROL or SQLUTIL-WRITE permissions to access
upsert in POST datasource=name
table=name
timeout=ms
[upsert_strategy=integer]
[delete_others=boolean]
[omit_update=list]
Streamed data (as hashes representing rows or lists of hashes representing blocks of rows) are merged or "upserted" into the target table in an atomic transaction; see Upserting or Merging Data for more information.
For DBI drivers supporting bulk DML, efficient bulk DML queries are executed with one round trip to the server for each block of row data sent to the stream.
Requires DATASOURCE-CONTROL or SQLUTIL-WRITE permissions to access

The following are pseudo-streams in that no stream data is accepted or returned (and therefore these "streams" can be accessed with standard REST commands), but they are accessed otherwise like normal streams; see examples below the following table and see Transaction Management with the sqlutil Service for more information.
SqlUtil Transaction-Management Streams

Stream Dir HTTP Method Args Description
beginTransaction n/a POST datasource=name
timeout=ms
in order for this call to work, the following HTTP header must be sent: Qorus-Connection: Persistent
While a persistent remote transaction is in progress, even "normal" sqlutil methods called over the same HTTP connection with REST calls will be made in the same transaction, meaning that these must be explicitly committed with a call to the "commit" pseudo-stream.
Requires DATASOURCE-CONTROL or SQLUTIL-WRITE permissions to access.
commit timeout=ms POST n/a Commits an existing transaction; can only be executed in a persistent connection while a transaction is in progress.
Requires DATASOURCE-CONTROL or SQLUTIL-WRITE permissions to access
rollback timeout=ms POST n/a Rolls an existing transaction back; can only be executed in a persistent connection while a transaction is in progress.
Requires DATASOURCE-CONTROL or SQLUTIL-WRITE permissions to access

sqlutil Client Examples

Please see the following high-level classes for easy-to-use APIs based on the sqlutil service:

The following are low-level examples of how to use sqlutil streams in Qorus client code.

Get Connection Example
Get a connection to remote Qorus instance "node-a" defined in the Qorus to Qorus Connections :
OMQ::QorusSystemRestHelper nodea = get_remote_rest_connection("node-a");
Start Remote Transaction Example
Start or continue a remote transaction (see Transaction Management with the sqlutil Service for more information):
nodea.setPersistent();
nodea.post("services/sqlutil?action=stream;stream=beginTransaction;datasource=omquser", NOTHING, ("Qorus-Connection": "Continue-Persistent"));
# commit the transaction if there are no errors
on_success
nodea.post("services/sqlutil?action=stream;stream=commit");
# in case of error, disconnect which will cause a rollback on the remote end
# due to the fact that the DataStream protocol relies on HTTP chunked transfer
# encoding, the socket could be in the middle of a chunked transfer when an
# error occurs, therefore it's better to simply disconnect than to try to
# execute a manual rollback when errors occur
on_error
nodea.disconnect();
Select Stream Example
Select data from a table in a new transaction in the remote database and log the results:
class DataStreamReceiveLogger inherits DataStreamRecvMessage {
private nothing recvDataImpl(any d) {
ServiceApi::logInfo("row: %y", d);
}
}
sub example() {
OMQ::QorusSystemRestHelper nodea = get_remote_rest_connection("node-a");
nodea.setPersistent();
nodea.post("services/sqlutil?action=stream;stream=beginTransaction;datasource=omquser", NOTHING, ("Qorus-Connection": "Persistent"));
# in case of error, disconnect which will ensure that a remote rollback is performed
on_error
nodea.disconnect();
DataStreamReceiveLogger recv();
hash args = (
"action": "stream",
"stream": "select",
"table": "example_table",
"where": ("last_update_date": op_ge(2014-11-01), "1:last_update_date": op_lt(2014-11-17)),
"orderby": "last_update_date",
);
nodea.recvDataStream(recv, "GET", "services/sqlutil", args);
nodea.post("services/sqlutil?action=stream;stream=commit");
}
Note
  • The "GET" call does not require a "datasource" parameter because a persistent connection was made in the stream call to "beginTransaction" above
  • Note the use of column prefixes to allow the use of the same column with multiple criteria in the where clause of the first select stream; see Where Clauses for more information
  • See Transaction Management with the sqlutil Service for more information on remote transaction management with the sqlutil service

Transaction Management with the sqlutil Service

Before initiating remote transaction management, it's critical to call Qore::HTTPClient::setPersistent() on the QorusSystemRestHelper object to ensure that any break in the HTTP session is caught and an exception is thrown, otherwise the client object would silently reconnect and the transaction state would be lost on the remote end.

Note
This is necessary because the remote transaction is based on a single HTTP session; if the HTTP connection is broken before either the commit or rollback stream can be called, then the remote transaction is automatically rolled back on the remote end. Therefore if the QorusSystemRestHelper object would lose the connection and silently reconnect, the remote transaction would be rolled back and the local QorusSystemRestHelper object would not be aware of this fact, therefore it could appear to the user that the remote transaction was successfully executed when in fact it was not.

After disabling automatic reconnections for this HTTP session, transaction management is initiated by calling the "beginTransaction" stream with a "datasource" argument with a "POST" REST call and the following header included: "Qorus-Connection: Persistent" (meaning start a new transaction unconditionally; if any if currently in progress then it will be rolled back and logged in the sqlutil service log), or "Qorus-Connection: Continue-Persistent" (start a new transaction if none is in progress, continue any existing transaction).

Note that the following helper classes start or continue transactions:

  • DbRemote: always starts or continues a remote transaction
  • DbRemoteReceive: will start or continue a remote transaction if the "transaction" option is set in the constructor
  • DbRemoteSend: always starts or continues a remote transaction

Additionally, the OMQ::AbstractParallelStream::beginTransaction() static method will connect to the remote server, set a persistent connection, and start or continue a remote transaction as described above. This is the easiest way to start a remote transaction when not using one of the DB data streaming classes listed above.

Once a remote transaction is in progress, then even calls to non-stream service methods in the same HTTP connection in the same datasource over the REST infrastructure will take part in the transaction, meaning that no implicit commits are made to non-stream methods called on the same datasource in the same HTTP connection as long as the transaction is in progress.

Transaction example
The following example starts or continues a remote transaction and commits it in a remote datasource:
OMQ::QorusSystemRestHelper nodea = get_remote_rest_connection("node-a");
OMQ::AbstractParallelStream::beginTransaction(nodea, "omquser");
on_success
nodea.post("services/sqlutil?action=stream;stream=commit");
on_error
nodea.disconnect();
# some code here in the remote transaction
This is equivalent to the following code:
OMQ::QorusSystemRestHelper nodea = get_remote_rest_connection("node-a");
nodea.connect();
nodea.setPersistent();
nodea.post("services/sqlutil?action=stream;stream=beginTransaction;datasource=omquser", NOTHING, ("Qorus-Connection": "Continue-Persistent"));
on_success
nodea.post("services/sqlutil?action=stream;stream=commit");
on_error
nodea.disconnect();
# some code here in the remote transaction