Qorus Integration Engine® Enterprise Edition 6.0.16_prod
Loading...
Searching...
No Matches
OMQ::DbRemoteSend Class Reference

Database stream for outbound/writing data to the remote instance. More...

Inheritance diagram for OMQ::DbRemoteSend:
[legend]
Collaboration diagram for OMQ::DbRemoteSend:
[legend]

Public Member Methods

 constructor (DbRemoteBase remote, string stream, string table_name, *hash< auto > options)
 constructor taking a OMQ::DbRemoteBase object for the remote server More...
 
 constructor (string remote, string datasource, string stream, string table_name, *hash< auto > options)
 constructor taking a string giving the name of the remote connection for the remote server More...
 
 constructor (QorusSystemRestHelper remote, string datasource, string stream, string table_name, *hash< auto > options)
 constructor taking an OMQ::QorusSystemRestHelper object for the remote connection More...
 
 openStream (string stream, string table_name, *hash< auto > options)
 flushes the current stream and reopens a new stream with the same remote connection and in the same datasources More...
 
 commit ()
 Commit the remote transaction. It also sends any pending data in the data queue before the commit.
 
 rollback (bool action=DO_DISCONNECT)
 Rollback remote transaction. More...
 
 append (list< auto > data)
 Append data to the ready queue: list. More...
 
 append (hash< auto > new_data)
 Append data to the ready queue: hash. More...
 
 append (Mapper::AbstractMapperIterator it)
 Append data to the ready queue from a AbstractMapperIterator object. More...
 
 append (Qore::AbstractIterator it)
 Append data to the ready queue from a AbstractIterator object. More...
 
 appendCommit (Qore::AbstractIterator it)
 Append data to the ready queue from a AbstractIterator object and commit the transaction. More...
 
OMQ::StreamConfig config ()
 returns the configuration object
 
- Public Member Methods inherited from OMQ::AbstractParallelWriteStream
 destructor ()
 waits for any I/O to complete and then returns
 
nothing cleanup ()
 called when the thread resource is still allocated and the thread exits or one of Qore::throw_thread_resource_exceptions() or Qore::throw_thread_resource_exceptions_to_mark() is called
 
 disconnect ()
 Disconnects the connection.
 
 flush ()
 flushes all remote I/O, signals the I/O thread to terminate and waits for confirmation, then checks for background I/O errors, in which case an exception is thrown in this thread More...
 
abstract private terminateIoWait (string action)
 flushes all remote I/O, signals the I/O thread the terminate and waits for confirmation, then checks for background I/O errors, in which case an exception is thrown in this thread
 
abstract private stopIoThreadImpl ()
 signal the I/O thread to top sending data
 
abstract private bool submitImpl ()
 puts data on the queue for sending in the background I/O thread More...
 

Private Member Methods

 startStreamImpl ()
 opens the remote transaction and starts the background send thread
 
 socketThreadImpl ()
 this method implements the background thread that handles the actual DataStream socket sends
 
 stopIoThreadImpl ()
 signals the I/O thread to stop sending
 
 terminateIoWait (string action)
 flushes all remote I/O, terminates the I/O thread, and checks for background I/O errors, in which case an exception is thrown
 
nothing setupData (hash< auto > row)
 scans the first row for SqlUtil operator hashes and makes a list<auto> of "normal" columns assigned to cols
 
bool submitImpl ()
 submits any pending data on the send queue if the I/O thread is running More...
 
auto sendDataImpl ()
 reimplemented from DataStreamSendMessage More...
 
- Private Member Methods inherited from OMQ::AbstractParallelWriteStream
bool terminateIoWaitIntern (string action)
 flushes all remote I/O, signals the I/O thread the terminate and waits for confirmation, then checks for background I/O errors, in which case an exception is thrown in this thread More...
 

Private Attributes

hash< auto > m_data
 data to be sent in column format
 
DbQueuedBlockStreamConfig m_config
 stream configuration
 
list< auto > cols = ()
 non-operator column list<auto> when processing column-formatted data (hash of lists)
 

Detailed Description

Database stream for outbound/writing data to the remote instance.

Transaction Management
The DbRemoteSend object automatically starts or continues a remote transaction by including the "Qorus-Connection: Continue-Persistent" header when opening the stream. Remote transactions must be explicitly committed (by calling DbRemoteSend::commit() for example) or aborted (by calling DbRemoteSend::disconnect() for example).

Multiple write operations with the same remote connection and the same datasource can be made in the same remote transaction by calling the DbRemoteSend::openStream() method for each new action as in the following example:
DbRemoteSend out(qrest, "omquser", "insert", "table1");
# in case of error, disconnect which will cause a rollback on the remote end
# due to the fact that the DataStream protocol relies on HTTP chunked transfer
# encoding, the socket could be in the middle of a chunked transfer when an
# error occurs, therefore it's better to simply disconnect than to try to
# execute a manual rollback when errors occur
on_error out.disconnect();
on_success out.commit();
out.append(Data1.iterator());
out.openStream("insert", "table2");
out.append(Data2.iterator());

Additionally, the DbRemoteSend::flush() method can be used to flush background I/O to the server before continuing.

This class will start a remote transaction if none is already in place; otherwise any existing remote transaction is continued.

To explicitly start a transaction before stream operations, see AbstractParallelStream::beginTransaction()
See also
Data Format
This class serializes data in column format, meaning a hash (with keys giving column names) of lists (giving the row values for each column). This is the most efficient data serialization technique from the point of view of the packet size and therefore of network performance. Additionally, this format is the native format to use with bulk DML on the remote end.

For this reason, it's best to use the DbRemoteSend::append(hash) method with data already in this format for maximum efficiency (for example as selected by Qore::SQL::SQLStatement::fetchColumns() or SqlUtil::AbstractTable::select()).
Update Stream Data Format
When using the "update" stream, the DbRemoteSend::append() methods take a hash with "set" and "cond" keys, where each of these keys must be assigned a list<auto> of hashes giving the corresponding arguments to the SqlUtil::AbstractTable::update() method.
Threaded I/O Implementation
This class creates a background thread that handles the socket I/O so that socket I/O can be executed in parallel to the main data acquisition thread (the thread that creates the object).

There is a Queue object that passes aggregated row data to the send thread; the default Queue size is 2 (representing a number of blocks), which allows 2 blocks of data to be queued before the internal private method DbRemoteSend::submitImpl() (called implicitly by DbRemoteSend::append()) will block.

If the Queue size is 2 and the block size is 1000 then at most 2000 rows will be queued for sending before the main thread will block in the internal private method DbRemoteSend::submitImpl().

This allows the main data acquisition thread to stay loosely in sync with the I/O thread so that memory usage is optimized and furthermore serves to avoid stalling the I/O thread due to lack of data.

After sending all data, one of the following methods should be called:
  • DbRemoteSend::commit(): waits for background I/O to be sent and commits the remote transaction
  • DbRemoteSend::rollback(): aborts the background I/O as soon as possible and rolls the remote transaction back
  • DbRemoteSend::flush(): waits for background I/O to be sent and then returns after the I/O thread has exited; use this method when a remote transaction will be continued
Note
  • This class assumes that data submitted with the append() methods is stable; that is, it has the same format and same hash keys, additionally, if there are any SqlUtil operator hashes, that they are identical in every row
  • This class is not thread-safe; it's designed to be used from a single thread; calling this class's methods from multiple threads simultaneously will result in unpredictable behavior

Member Function Documentation

◆ append() [1/4]

OMQ::DbRemoteSend::append ( hash< auto >  new_data)

Append data to the ready queue: hash.

Parameters
new_dataa hash with data to be added; can be a single row or multiple rows in column format

Data are sent whenever the row count reaches the block size.

The DB transaction is left open.

Note
  • This class assumes that data submitted with the append() methods is stable; that is, it has the same format and same hash keys, additionally, if there are any SqlUtil operator hashes, that they are identical in every row
  • When submitting bulk data (hashes of lists), any constant data must be submitted in trailing keys; the first key value will determine if the data is handled as bulk (hash of lists) or not, furthermore it is an error to pass a key with a constant value and then in another call as a list of values or vice versa

◆ append() [2/4]

OMQ::DbRemoteSend::append ( list< auto >  data)

Append data to the ready queue: list.

Parameters
dataa list<auto> with row data to be added. It should follow the expected target format (list<auto> of hashes, each list<auto> element represents a row and each has the row data).

Data are sent whenever their count reaches the block size.

The DB transaction is left open.

Note
This class assumes that data submitted with the append() methods is stable; that is, it has the same format and same hash keys, additionally, if there are any SqlUtil operator hashes, that they are identical in every row

◆ append() [3/4]

OMQ::DbRemoteSend::append ( Mapper::AbstractMapperIterator  it)

Append data to the ready queue from a AbstractMapperIterator object.

Parameters
ita AbstractMapperIterator object with data to be added. Data provided by the iterator should follow the expected format (ie hashes of lists for column_format, otherwise simple hashes of row data for non-column_format).

This method processes the whole iterator in one call.

The DB transaction is left open.

Note
SqlUtil operator functions are assumed to be identical for all rows and are only sent once; the input data is assumed to be uniform in this regard also across append(hash) calls

◆ append() [4/4]

OMQ::DbRemoteSend::append ( Qore::AbstractIterator  it)

Append data to the ready queue from a AbstractIterator object.

Parameters
ita AbstractIterator object with data to be added. Data provided by the iterator should follow the expected format (ie hashes of lists for column_format, otherwise simple hashes of row data for non-column_format).

This method processes the whole iterator in one call.

The DB transaction is left open.

Note
SqlUtil operator functions are assumed to be identical for all rows and are only sent once; the input data is assumed to be uniform in this regard also across append(hash) calls

◆ appendCommit()

OMQ::DbRemoteSend::appendCommit ( Qore::AbstractIterator  it)

Append data to the ready queue from a AbstractIterator object and commit the transaction.

Parameters
ita AbstractIterator object with data to be added. Data provided by the iterator should follow the expected format (ie hashes of lists for column_format, otherwise simple hashes of row data for non-column_format).

This method processes the whole iterator in one call.

The DB transaction is committed immediately after all data are sent to the server.

◆ constructor() [1/3]

OMQ::DbRemoteSend::constructor ( DbRemoteBase  remote,
string  stream,
string  table_name,
*hash< auto >  options 
)

constructor taking a OMQ::DbRemoteBase object for the remote server

Parameters
remotea DbRemoteBase object
streamthe system.sqlutil service stream to be used
table_namea string with remote table name located in the remote datasource
optionsoptional Streaming API Constructor Options as follows:
  • "block": data block size
  • "loglevel": a default log level option for logging; see Log Levels for valid value
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "omit_update": a list<auto> of columns to omit when updating for asymmetrical upserts; only valid with the "upsert" stream; if this option is given with other streams, it will be ignored
  • "queue_block_size": the number of blocks to queue for sending before the main data thread will block (default: 2)
  • "queue_timeout": the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "upsert_strategy": an upsert strategy code; only valid with the "upsert" stream; if this option is given with other streams, it will be ignored
Note
the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ constructor() [2/3]

OMQ::DbRemoteSend::constructor ( QorusSystemRestHelper  remote,
string  datasource,
string  stream,
string  table_name,
*hash< auto >  options 
)

constructor taking an OMQ::QorusSystemRestHelper object for the remote connection

Parameters
remotean OMQ::QorusSystemRestHelper object
datasourcea string with name of the remote datasource to use
streamthe system.sqlutil service stream stream to be used
table_namea string with remote table name located in the remote datasource
optionsoptional Streaming API Constructor Options as follows:
  • "block": data block size giving the number of rows per block
  • "loglevel": a default log level option for logging; see Log Levels for valid value
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "omit_update": a list<auto> of columns to omit when updating for asymmetrical upserts; only valid with the "upsert" stream; if this option is given with other streams, it will be ignored
  • "queue_block_size": the number of blocks to queue for sending before the main data thread will block (default: 2)
  • "queue_timeout": the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "upsert_strategy": an upsert strategy code; only valid with the "upsert" stream; if this option is given with other streams, it will be ignored
Note
the explicit or default timeout value here overrides any socket I/O timeout set for the remote dconnection object

◆ constructor() [3/3]

OMQ::DbRemoteSend::constructor ( string  remote,
string  datasource,
string  stream,
string  table_name,
*hash< auto >  options 
)

constructor taking a string giving the name of the remote connection for the remote server

Parameters
remotea string giving the name of the remote connection for the remote server
datasourcea string with name of the remote datasource to use
streamthe system.sqlutil service stream stream to be used
table_namea string with remote table name located in the remote datasource
optionsoptional Streaming API Constructor Options as follows:
  • "block": data block size
  • "loglevel": a default log level option for logging; see Log Levels for valid value
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "omit_update": a list<auto> of columns to omit when updating for asymmetrical upserts; only valid with the "upsert" stream; if this option is given with other streams, it will be ignored
  • "queue_block_size": the number of blocks to queue for sending before the main data thread will block (default: 2)
  • "queue_timeout": the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "upsert_strategy": an upsert strategy code; only valid with the "upsert" stream; if this option is given with other streams, it will be ignored
Note
the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ openStream()

OMQ::DbRemoteSend::openStream ( string  stream,
string  table_name,
*hash< auto >  options 
)

flushes the current stream and reopens a new stream with the same remote connection and in the same datasources

Example:
out.openStream("update", "table2");
Parameters
streamthe system.sqlutil service stream to be used
table_namea string with remote table name located in the remote datasource
optionsoptional Streaming API Constructor Options as follows:
  • "block": data block size giving the number of rows per block
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid value
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "queue_block_size": the number of blocks to queue for sending before the main data thread will block (default: 2)
  • "queue_timeout": the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception

◆ rollback()

OMQ::DbRemoteSend::rollback ( bool  action = DO_DISCONNECT)

Rollback remote transaction.

Note
it's normally better to disconnect the connection if an error occurs rather than call rollback() when streaming because if a chunked transfer is interrupted, then HTTP calls will fail anyway, and the remote end will rollback the transaction in any case unless an explicit commit is executed
Parameters
actionan optional bool value to keep connection open or close the connection. Connection closing is the default
See also
DO_DISCONNECT
DONT_DISCONNECT
disconnect()

◆ sendDataImpl()

auto OMQ::DbRemoteSend::sendDataImpl ( )
privatevirtual

reimplemented from DataStreamSendMessage

Returns
the data to send
Exceptions
STREAM-TERMINATEDif the stream was terminated in the main thread, this exception will be thrown here to terminate the I/O thread

Implements DataStreamClient::DataStreamSendMessage.

◆ submitImpl()

bool OMQ::DbRemoteSend::submitImpl ( )
privatevirtual

submits any pending data on the send queue if the I/O thread is running

Returns
True if the I/O thread is running, False if not
Exceptions
IO-ERRORthe I/O thread is not running so no data can be submitted

Implements OMQ::AbstractParallelWriteStream.


The documentation for this class was generated from the following file: