A class for receiving (inbound) data from a remote instance. I/O is handled in a background thread, and the received data is made available through the getData() and getAllData() methods.
|
| constructor (DbRemoteBase remote, string stream, string table_name, *hash< auto > options) |
| constructor taking an OMQ::DbRemoteBase object for the remote connection
|
|
| constructor (DbRemoteBase remote, string table_name, *hash< auto > options) |
| constructor taking an OMQ::DbRemoteBase object for the remote connection and assuming the "select" stream
|
|
| constructor (string remote, string datasource, string stream, string table_name, *hash< auto > options) |
| constructor taking a string giving the name of the remote connection for the remote server
|
|
| constructor (string remote, string datasource, string table_name, *hash< auto > options) |
| constructor taking a string giving the name of the remote connection for the remote server and assuming the "select" stream
|
|
| constructor (QorusSystemRestHelper remote, string datasource, string stream, string table_name, *hash< auto > options) |
| constructor taking an OMQ::QorusSystemRestHelper object for the remote connection
|
|
| constructor (QorusSystemRestHelper remote, string datasource, string table_name, *hash< auto > options) |
| constructor taking an OMQ::QorusSystemRestHelper object for the remote connection and assuming the "select" stream
|
|
| openStream (string stream, string table_name, *hash< auto > options) |
| flushes the current stream and reopens a new stream with the same remote connection and in the same datasource
|
|
| commit () |
| Commit remote transaction.
|
|
| rollback (bool action=DO_DISCONNECT) |
| Rollback remote transaction.
|
|
| disconnect () |
| Disconnects the connection.
|
|
*hash< auto > | getData (*timeout timeout_ms) |
| returns queued data as soon as it is available; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue. Once this method returns NOTHING, the end-of-stream condition has been reached; do not call this method again after it returns NOTHING
|
|
*hash< auto > | getAllData (*timeout timeout_ms) |
| returns all data received by the object in a single call; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue
|
|
*list< auto > | getDataRows (*timeout timeout_ms) |
| returns queued data as a list<auto> of rows as soon as it is available; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue. Once this method returns NOTHING, the end-of-stream condition has been reached; do not call this method again after it returns NOTHING
|
|
*list< auto > | getAllDataRows (*timeout timeout_ms) |
| returns all data received by the object in a single call as a list<auto> of rows; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue
|
|
OMQ::StreamConfig | config () |
| returns the configuration object
|
|
| destructor () |
| disconnects and aborts the I/O thread if it's still running
|
|
nothing | cleanup () |
| called when the thread resource is still allocated and the thread exits or one of Qore::throw_thread_resource_exceptions() or Qore::throw_thread_resource_exceptions_to_mark() is called
|
|
abstract private | throwAbortedExceptionImpl (string meth) |
| throw a user-friendly exception about why the I/O thread was aborted and the connection forcibly closed and how to avoid such situations in the future
|
|
| destructor () |
| wait for any I/O in progress to complete
|
|
nothing | cleanupBase () |
| called by child classes when the thread resource is still allocated and the thread exits or one of Qore::throw_thread_resource_exceptions() or Qore::throw_thread_resource_exceptions_to_mark() is called
|
|
bool | inTransaction () |
| Returns True if a remote transaction is in progress.
|
|
OMQ::StreamInfo | getInfo () |
| returns performance information about the stream
|
|
abstract private | socketThreadImpl () |
| this method implements the background thread that handles the actual DataStream socket sends
|
|
abstract private | startStreamImpl () |
| opens the remote transaction before starting the background I/O socket thread
|
|
abstract OMQ::StreamConfig | config () |
| returns the configuration object
|
|
|
| constructor (*hash< auto > options, DbRemoteBase remote, string sql, *list< auto > args) |
| Private constructor.
|
|
| constructor (*hash< auto > options, string remote, string datasource, string sql, *list< auto > args) |
| Private constructor.
|
|
| constructor (*hash< auto > options, QorusSystemRestHelper remote, string datasource, string sql, *list< auto > args) |
| Private constructor.
|
|
| throwAbortedExceptionImpl (string meth) |
| throw a user-friendly exception about why the I/O thread was aborted and the connection forcibly closed and how to avoid such situations in the future
|
|
| startStreamImpl () |
| opens the remote transaction if necessary
|
|
| socketThreadImpl () |
| reimplemented to receive data from the stream in the background I/O thread
|
|
nothing | recvDataImpl (auto rdata) |
| An abstract method to handle incoming data.
|
|
nothing | recvDataDoneImpl (*string err) |
| called when all data has been received or the background I/O operation terminates due to an error
|
|
| checkComplete (string meth) |
| checks if data is requested after the transfer is complete
|
|
| abortInternMethod (string meth) |
| if the I/O thread is running, then abort it, disconnect the connection, and throw a user-friendly exception explaining what happened and how to avoid such situations in the future
|
|
| abortIntern (string meth, string fmt) |
| if the I/O thread is running, then abort it, disconnect the connection, and throw a user-friendly exception explaining what happened and how to avoid such situations in the future
|
|
Qore::Thread::Counter | cnt () |
| confirmation counter for the socket I/O thread
|
|
OMQ::StreamInfo | m_info () |
| stream information
|
|
| waitIoComplete () |
| waits for the I/O thread to complete and returns
|
|
| markIoInProgress () |
| marks the current connection as in progress; the caller must have already checked that it is not in progress
|
|
| markIoComplete () |
| performs thread cleanup on the background I/O thread if any is registered and running
|
|
| checkError (string cmd) |
| checks for an error in the background send thread; if there is one, then rethrows the exception in the main application / data thread
|
|
| startStream () |
| opens the remote transaction by calling startStreamImpl() and starts the background I/O socket thread
|
|
| socketThread (*ThreadLocalData my_tld) |
| sets up thread-local data if possible and calls socketThreadImpl() that implements the background socket I/O for the object
|
|
| abortIo (string reason) |
| signals the end of queue operations
|
|
bool | abortIoWait (string reason) |
| signals the end of queue operations and waits until the socket thread terminates
|
|
| beginTransaction (OMQ::DbStreamConfig m_config) |
| starts or continues a remote transaction
|
|
| commit (OMQ::DbStreamConfig m_config) |
| commits a remote transaction
|
|
| rollback (OMQ::DbStreamConfig m_config) |
| rolls back a remote transaction
|
|
- Transaction Management
- The DbRemoteReceive object automatically starts or continues a remote transaction by including the "Qorus-Connection: Continue-Persistent" header when opening the stream if the "transaction" constructor() option is set to True. Remote transactions must be explicitly committed (by calling DbRemoteReceive::commit() for example) or aborted (by calling DbRemoteReceive::disconnect() for example); otherwise the remote transaction status is left unchanged by the DbRemoteReceive object.
To explicitly start a transaction before stream operations, see AbstractParallelStream::beginTransaction()
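The commit-or-abort rule above could be applied roughly as follows. This is a minimal sketch, not a prescribed pattern: it assumes that qrest is an existing QorusSystemRestHelper object for the remote instance, that table_name names a table in the remote "omquser" datasource, and that processData() is a hypothetical application function.

```qore
# assumptions: qrest, table_name, and processData() are defined by the caller
DbRemoteReceive recv(qrest, "omquser", table_name, {"transaction": True});
try {
    # getData() returns NOTHING at the end of the stream
    while (*hash<auto> h = recv.getData()) {
        processData(h);
    }
    # the remote transaction must be committed explicitly
    recv.commit();
} catch (hash<ExceptionInfo> ex) {
    # roll back the remote transaction before propagating the error
    recv.rollback();
    rethrow;
}
```

Called without arguments, rollback() uses its default DO_DISCONNECT action, as given in the method signature.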
- Data Format
- This class serializes data in column format, meaning a hash (with keys giving column names) of lists (giving the row values for each column). This is the most efficient data serialization technique from the point of view of the packet size and therefore of network performance. Additionally, this format is the native format to use with bulk DML on the local end.
Data in row format (lists of hashes) can be received by using the DbRemoteReceive::getDataRows() and DbRemoteReceive::getAllDataRows() methods.
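The two layouts can be compared in a short sketch. The column names "id" and "name" are made up for illustration, processRow() is a hypothetical application function, and qrest and table_name are assumed to be defined by the caller as in the usage example for this class.

```qore
# column format, as returned by getData(): a hash of lists
#   {"id": (1, 2), "name": ("a", "b")}
# row format, as returned by getDataRows(): a list of hashes
#   ({"id": 1, "name": "a"}, {"id": 2, "name": "b"})

# assumptions: qrest, table_name, and processRow() are defined by the caller
DbRemoteReceive recv(qrest, "omquser", table_name, {"block": 2000});
# getDataRows() returns NOTHING at the end of the stream
while (*list<auto> rows = recv.getDataRows()) {
    foreach hash<auto> row in (rows) {
        processRow(row);
    }
}
```

Row format is convenient when each record is handled on its own; column format remains the better fit when the data is passed on to bulk DML.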
- Threaded I/O Implementation
- This class creates a background thread that handles the socket I/O so that socket I/O can be executed in parallel to the main data handling thread (the thread that creates the object).
There is a Queue object that receives row data from the socket I/O thread; the default Queue size is 2 (representing a number of blocks), which allows two blocks of data to be queued before the I/O thread blocks.
For example, if the Queue size is 2 and the block size is 1000, then at most 2000 rows are queued before the I/O thread blocks in the internal DbRemoteReceive::recvDataImpl() method; DbRemoteReceive::getData() blocks only while no data is available on the queue.
This allows the main data handling thread to stay loosely in sync with the I/O thread so that memory usage is optimized and furthermore serves to avoid stalling the main data handling thread due to lack of data.
- DbRemoteReceive Usage
- The stream is set up in the constructor, which also starts the background socket I/O thread, which posts data on an internal queue. The data received is then made available in the getData(), getAllData(), getDataRows(), or getAllDataRows() methods.
- Example:
DbRemoteReceive recv(qrest, "omquser", table_name, {"block": 2000});
while (*hash<auto> h = recv.getData()) {
    processData(h);
}
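When the complete result set is small enough to hold in memory, the loop can be replaced by a single call to getAllData(). This is a sketch under the same assumptions as the example above (qrest, table_name, and processData() are supplied by the caller); the options argument is omitted because it is optional in the constructor signature.

```qore
# assumptions: qrest, table_name, and processData() are defined by the caller
DbRemoteReceive recv(qrest, "omquser", table_name);
# no timeout is given, so the call blocks until the data is available
*hash<auto> h = recv.getAllData();
if (h) {
    # h is in column format: a hash of lists keyed by column name
    processData(h);
}
```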
- Note
- a remote transaction is only started in this object's constructor if the "transaction" option is set; otherwise the remote transaction status is left unchanged