Qorus Integration Engine®  5.0.6_git
Examples

Working With Databases

To work with a database, a datasource must be defined and known by the system.

For example, to acquire a shared DatasourcePool object for transaction management on the "billing" datasource, use the UserApi::getDatasourcePool() method as in the following example:

# acquire a DatasourcePool object for datasource "billing"
DatasourcePool billing = UserApi::getDatasourcePool("billing");

If Qore user code enters a transaction with a Datasource or DatasourcePool object and the thread terminates (for example at the end of a workflow step, or of a service call made through the network API) without committing or rolling back the transaction, an exception is raised automatically and the transaction is rolled back.
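To avoid leaving a transaction open, the commit and rollback handlers can be registered as soon as the datasource is acquired, as the streaming examples later on this page also do. The following is a minimal sketch only: the sub name, the "invoices" table, and its columns are hypothetical and are not part of the original examples.

```qore
# hypothetical sub; "billing" is the datasource used in the example above
sub mark_invoice_paid(int invoice_id) {
    DatasourcePool billing = UserApi::getDatasourcePool("billing");
    # register the handlers before the first SQL statement so that the
    # transaction is always closed when this block exits
    on_success billing.commit();
    on_error billing.rollback();
    # hypothetical table and column names
    billing.exec("update invoices set status = %v where invoice_id = %v", "PAID", invoice_id);
}
```

With this pattern the transaction is committed when the sub returns normally and rolled back when an exception is thrown, so the thread never terminates with an open transaction.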

Executing Stored Procedures and Functions

The following is an example of executing a stored procedure on an Oracle database and retrieving the results:

# execute stored procedure "pkg.get_imsi" with iccid as argument,
# retrieving a hash as a result.
# The resulting hash will contain the following keys:
# imsi, status_code, error_code, error_desc
hash<auto> h = ds.exec("begin pkg.get_imsi('CRM', %v, :imsi, :status_code, :error_code, :error_desc); end;", iccid);

The following is an example of executing a function in an Oracle database and retrieving the result (note that you have to declare the variable type in the SQL):

# execute function create_blacklist_entry() with msisdn as argument,
# retrieving a hash with a single key "code" as a result.
# note that the hash is dereferenced with ".code" after the exec()
string code = ds.exec("declare code varchar2(30) := null; begin :code := create_blacklist_entry(%v); end;", Type::String, msisdn).code;

The Qore Oracle driver needs to know the buffer type for all placeholder bind operations in advance. The placeholders can be found in the SQL strings in the two examples above as the text prefixed by ":". When no buffer type is given, the Oracle driver assumes a string buffer. To declare another buffer type, use the Qore Type constants as arguments in the position corresponding to the placeholder position in the SQL string.

The following example is similar to the first example above but uses different buffer types for the return values:

# execute stored procedure "pkg.get_imsi" with iccid as argument,
# retrieving a hash as a result.
# The resulting hash will contain one key per placeholder:
# date, blob, clob, integer
hash<auto> h = ds.exec("begin pkg.get_imsi('CRM', %v, :date, :blob, :clob, :integer); end;", iccid, Type::Date, Type::Binary, SQL::CLOB, Type::Integer);

Note that the MySQL, PostgreSQL, Sybase, and FreeTDS drivers do not require placeholder buffer specifications when executing stored procedures. For more examples of stored procedure calls and more information about connecting to other databases, see the Qore Programmer's Reference Manual in your Qorus Integration Engine installation directory.

Working With Data Streaming

The data streaming APIs provide high-level interfaces to the DataStream Protocol, so user code does not need to handle low-level protocol interaction. DataStream socket I/O always takes place in a background thread, which allows the main thread to handle data operations in parallel with network I/O.

Streaming APIs are available for Qorus server objects (workflows, services, jobs) and also in the Qorus client.

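The classes referenced in this section that provide APIs to stream data to/from remote Qorus instances are OMQ::DbRemote, OMQ::DbRemoteSend, OMQ::DbRemoteReceive, and OMQ::FsRemoteSend.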

Streaming API Constructor Options

| Key | Default | Description |
|---|---|---|
| block | 1000 (DB rows) or 16384 (FS bytes) | a block size for DataStream transmission chunks giving the row count for DB streams or the bytes count for filesystem streams |
| encoding | "UTF-8" | the encoding of the target file; used in OMQ::FsRemoteSend |
| loglevel | LL_INFO | the default logging level used in Qorus user code |
| mode | 0644 | the file's creation mode as used in Qore::File::open2(); used in OMQ::FsRemoteSend |
| queue_block_size | 2 | the number of blocks to queue for sending before the main data thread will block; used in OMQ::DbRemoteSend and OMQ::FsRemoteSend |
| queue_size | block * 2 | the number of rows to queue before the main data thread will block; used in OMQ::DbRemoteReceive |
| select | NOTHING | Complex Select Criteria structure used in OMQ::DbRemoteReceive |
| timeout | 60s | a timeout in milliseconds for HTTP operations (ex: 120s) |
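As an illustration of how these options might be supplied, the sketch below overrides three defaults for a send stream. It assumes that the options hash is passed as the final constructor argument; that position is not stated in this section, so check it against the class reference. The sub name, the remote connection, the datasource, and the table are hypothetical.

```qore
# hypothetical sub; "my-remote", "omquser", and "my_table" are placeholder names
sub send_with_options() {
    hash<auto> opts = {
        # send 500 rows per DataStream chunk instead of the default 1000
        "block": 500,
        # queue up to 4 blocks before the main data thread blocks
        "queue_block_size": 4,
        # HTTP timeout given as a relative time value
        "timeout": 120s
    };
    # assumption: the options hash is the last constructor argument
    DbRemoteSend send("my-remote", "omquser", "insert", "my_table", opts);
    on_success send.commit();
    on_error send.disconnect();
    send.append({"id": 1, "column1": "foo"});
}
```

A smaller block size reduces the amount of data held in memory per chunk at the cost of more chunks on the wire; suitable values depend on row width and network latency.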

Streaming API Transaction Management

A remote transaction can be performed in an external DB connected to a remote Qorus instance by using the system.sqlutil service to begin the transaction and then commit or abort it.

The OMQ::DbRemoteSend and OMQ::DbRemote classes will automatically start or continue a remote transaction by sending the "Qorus-Connection: Continue-Persistent" header when opening the stream. The OMQ::DbRemoteReceive class will do the same if the "transaction" option is set in the constructor() call.

Additionally, a call to OMQ::AbstractParallelStream::beginTransaction() can be made to explicitly start or continue a remote transaction in a remote database independently of any remote stream operations.

Remote transactions must be explicitly committed, either with a call to the sqlutil commit stream or with a call to OMQ::DbRemoteSend::commit(), OMQ::DbRemoteReceive::commit(), or OMQ::DbRemote::commit().

To abort a remote transaction, it's recommended to simply close the socket connection, particularly because the HTTP connection could be in the middle of a stream action, which would make HTTP messages impossible to send until the stream is completely sent or received. When the connection is closed, any streams in progress are immediately terminated and the remote transaction is automatically rolled back.
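The commit and abort rules above can be sketched for a receive stream as follows. This is an illustration under stated assumptions, not a definitive implementation: it assumes that the "transaction" option is passed in an options hash as the final constructor argument, and the sub name, remote connection, datasource, and table name are hypothetical.

```qore
# hypothetical sub; "my-remote", "omquser", and "my_table" are placeholder names
sub receive_in_transaction() {
    # assumption: the options hash is the last constructor argument
    DbRemoteReceive recv("my-remote", "omquser", "select", "my_table", {"transaction": True});
    # commit the remote transaction explicitly on success
    on_success recv.commit();
    # on error, close the connection so that the remote transaction is rolled back
    on_error recv.disconnect();
    while (auto row = recv.getData()) {
        UserApi::logInfo("row: %y", row);
    }
}
```

Closing the connection in the error handler, instead of sending an explicit rollback request, follows the recommendation above for the case where the error occurs in the middle of a stream.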

See also: Transaction Management with the sqlutil Service for more information.

Database: Single DML Statements

DbRemote db("my-remote", "omquser");
on_success db.commit();
on_error db.rollback();
# get list of available tables
*list all_tables = db.list_tables();
# single insert
db.insert("my_table", ("id" : 11, "foo" : "bar"));

Database: Simple Data Sending

DbRemoteSend send("my-remote", "omquser", "insert", "my_table");
on_success send.commit();
on_error send.disconnect();
send.append( ("id" : 1, "column1" : "foo") );

# type: STEP
# version: 1.0
# desc: stream sending example
# author: Petr Vanek (Qore Technologies, sro)
%new-style
%require-types
%strict-args
%enable-all-warnings
%requires SqlUtil
sub rem_stream_send() {
    DatasourcePool dsstage = UserApi::getDatasourcePool("isepl");
    on_success dsstage.commit();
    on_error dsstage.rollback();
    # just an sql logging
    string sql;
    on_exit UserApi::logInfo("sql: %s", sql);
    # select source data
    SqlUtil::AbstractTable t = UserApi::getSqlCache(dsstage, "h3g_je_idt");
    hash sc_select = ( "columns" : ( "id", "entered_dr", "entered_cr" ), );
    SQLStatement stmt = t.getRowIterator(sc_select, \sql);
    DbRemoteSend out("test", "omquser", "insert", "h3g_je_idt_test");
    on_success out.commit();
    on_error out.disconnect();
    out.append(stmt);
}
# END

Database: Simple Data Receiving

DbRemoteReceive recv("my-remote", "omquser", "select", "my_table");
on_success recv.commit();
on_error recv.disconnect();
# each received row is logged
while (auto d = recv.getData()) UserApi::logInfo("row: %y", d);

# type: STEP
# version: 1.0
# desc: example of simple select
# author: Petr Vanek (Qore Technologies, sro)
%new-style
%require-types
%strict-args
%enable-all-warnings
sub rem_stream_recv() {
    DbRemoteReceive recv("test", "omquser", "select", "h3g_je_idt_test");
    while (auto d = recv.getData()) UserApi::logInfo("row: %y", d);
}
# END

Database: Complex Transactions (mixed mode)

# type: STEP
# version: 1.0
# desc: more complex transaction handling using remote streams
# author: Petr Vanek (Qore Technologies, sro)
%new-style
%require-types
%strict-args
%enable-all-warnings
class SelectInsert {
    private {
        DbRemoteSend m_send;
        DbRemoteReceive m_recv;
    }
    constructor(string r) {
        m_recv = new DbRemoteReceive(r, "omquser", "select", "h3g_je_idt_test", \received());
        m_send = new DbRemoteSend(r, "omquser", "insert", "h3g_je_idt_test1");
        m_recv.receive();
    }
    commit() { m_send.commit(); }
    disconnect() { m_send.disconnect(); }
    nothing received(auto row) {
        UserApi::logInfo("received: %y", row);
        m_send.append(row);
    }
}
sub rem_stream_comb() {
    SelectInsert si("test");
    on_success si.commit();
    on_error si.disconnect();
}
# END

Filesystem: Qorus client in action

%new-style
%require-types
%strict-args
%enable-all-warnings
%requires QorusClientCore
qorus_client_init2();
FsRemoteSend fs("test", "/tmp/qorus-client-stream.txt");
fs.append("lorem ipsum\n");
fs.commit();