Filesystem stream for outbound/writing data to the remote instance.
|
| constructor (string remote, string path, *hash< auto > options) |
| constructor taking a string giving the name of the remote connection for the remote server
|
|
| constructor (string remote, string conn, string path, *hash< auto > options) |
| constructor taking a string giving the name of the remote connection for the remote server
|
|
| constructor (QorusSystemRestHelper remote, string path, *hash< auto > options) |
| constructor taking an OMQ::QorusSystemRestHelper object for the remote connection
|
|
| constructor (QorusSystemRestHelper remote, string conn, string path, *hash< auto > options) |
| constructor taking an OMQ::QorusSystemRestHelper object for the remote connection
|
|
| commit () |
| this method flushes all buffered I/O to the remote server and returns when all data have been sent to the server and the socket I/O thread has terminated
|
|
| cancel () |
| this method purges any queued I/O and stops the socket thread and returns when the socket I/O thread has terminated
|
|
| append (data data) |
| Appends the given data to the ready queue.
|
|
| append (Qore::FileLineIterator it) |
| Append data to the ready queue using a FileLineIterator object for input.
|
|
nothing | rename (string target, softbool man=False) |
| executes an implicit commit() and renames/moves the file given in path in the constructor()
|
|
nothing | del () |
| executes an implicit commit() and deletes the file given in path in the constructor()
|
|
OMQ::StreamConfig | config () |
| returns the configuration object
|
|
| destructor () |
| waits for any I/O to complete and then returns
|
|
nothing | cleanup () |
| called when the thread resource is still allocated and the thread exits or one of Qore::throw_thread_resource_exceptions() or Qore::throw_thread_resource_exceptions_to_mark() is called
|
|
| disconnect () |
| Disconnects the connection.
|
|
| flush () |
| flushes all remote I/O, signals the I/O thread to terminate and waits for confirmation, then checks for background I/O errors, in which case an exception is thrown in this thread
|
|
| destructor () |
| wait for any I/O in progress to complete
|
|
nothing | cleanupBase () |
| called by child classes when the thread resource is still allocated and the thread exits or one of Qore::throw_thread_resource_exceptions() or Qore::throw_thread_resource_exceptions_to_mark() is called
|
|
bool | inTransaction () |
| Returns True if a remote transaction is in progress.
|
|
OMQ::StreamInfo | getInfo () |
| returns performance information about the stream
|
|
|
| stopIoThreadImpl () |
| signal the I/O thread to stop sending data
|
|
| terminateIoWait (string action) |
| flushes all remote I/O, terminates the I/O thread, and checks for background I/O errors, in which case an exception is thrown
|
|
| startStreamImpl () |
| no implementation in this class
|
|
| socketThreadImpl () |
| opens the remote socket stream in the socket I/O thread and starts streaming the data
|
|
bool | submitImpl () |
| queues buffered data to be sent in the socket I/O thread
|
|
auto | sendDataImpl () |
| reimplemented from DataStreamClient::DataStreamSendMessage
|
|
bool | terminateIoWaitIntern (string action) |
| flushes all remote I/O, signals the I/O thread to terminate and waits for confirmation, then checks for background I/O errors, in which case an exception is thrown in this thread
|
|
Qore::Thread::Counter | cnt () |
| confirmation counter for the socket I/O thread
|
|
OMQ::StreamInfo | m_info () |
| stream information
|
|
| waitIoComplete () |
| waits for the I/O thread to complete and returns
|
|
| markIoInProgress () |
| marks the current connection as in progress; the caller must already have verified that no I/O is in progress
|
|
| markIoComplete () |
| performs thread cleanup on the background I/O thread if any is registered and running
|
|
| checkError (string cmd) |
| checks for an error in the background send thread; if there is one, then rethrows the exception in the main application / data thread
|
|
| startStream () |
| opens the remote transaction by calling startStreamImpl() and starts the background I/O socket thread
|
|
| socketThread (*ThreadLocalData my_tld) |
| sets up thread-local data if possible and calls socketThreadImpl() that implements the background socket I/O for the object
|
|
| abortIo (string reason) |
| signals the end of queue operations
|
|
bool | abortIoWait (string reason) |
| signals the end of queue operations and waits until the socket thread terminates
|
|
| beginTransaction (OMQ::DbStreamConfig m_config) |
| starts or continues a remote transaction
|
|
| commit (OMQ::DbStreamConfig m_config) |
| commits a remote transaction
|
|
| rollback (OMQ::DbStreamConfig m_config) |
| rolls back a remote transaction
|
|
|
*string | m_connection |
| optional name of the user connection. It can be NOTHING if the non-connection methods should be called
|
|
data | m_data |
| waiting data buffer
|
|
FsSendStreamConfig | m_config |
| stream configuration
|
|
Qore::Thread::Queue | dataq |
| Queue for the socket I/O thread.
|
|
*hash< auto > | thread_ex |
| exception info for send thread to communicate errors to the primary data thread
|
|
string | objid |
| stream object identifier
|
|
const | StreamTerminatedError = "STREAM-TERMINATED" |
| stream terminated error
|
|
const | ThreadKey = "_STREAM_" |
| thread-local data key
|
|
|
static | staticCleanup () |
| performs static thread resource cleanup on I/O thread resources
|
|
static | checkStream (QorusSystemRestHelper helper, string op) |
| checks if there is a stream operation running in the background, if so, a user-friendly exception is thrown
|
|
const | DO_DISCONNECT = True |
| A constant for rollback() methods to perform real disconnect.
|
|
const | DONT_DISCONNECT = False |
| A constant for rollback() methods to keep the network connection alive.
|
|
Filesystem stream for outbound/writing data to the remote instance.
- FsRemoteSend Background Socket I/O
- This class creates a background thread that handles the socket I/O so that socket I/O can be executed in parallel to the main data acquisition thread (the thread that creates the object).
A Queue object passes buffered data blocks to the send thread; the default Queue size is 2 (a number of blocks), which allows 2 blocks of data to be queued before the internal private method FsRemoteSend::submitImpl() (called implicitly by FsRemoteSend::append()) blocks.
If the Queue size is 2 and the block size is 16384 then at most 32768 bytes will be queued for sending before the main thread will block in the internal private method FsRemoteSend::submitImpl().
This allows the main data acquisition thread to stay loosely in sync with the I/O thread so that memory usage is optimized and furthermore serves to avoid stalling the I/O thread due to lack of data.
The OMQ::FsRemote class can be used for synchronous calls to the remote system.fs service.
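The back-pressure described above can be modeled with a plain bounded Queue. This is a minimal standalone sketch, not the FsRemoteSend implementation: the `consumer()` function, the `"EOF"` marker, and the loop count are hypothetical, and only the block size (16384) and queue size (2) come from this page.

```qore
%new-style

# illustrative model only; this is not FsRemoteSend's source code
const BlockSize = 16384;   # default "block" option per this page
const QueueBlocks = 2;     # default "queue_block_size" option per this page

# stand-in for the socket I/O thread: takes blocks until the "EOF" marker arrives
sub consumer(Queue q, Counter done) {
    on_exit done.dec();
    while (True) {
        auto block = q.get();
        if (block === "EOF") {
            break;
        }
        printf("I/O thread took a block of %d bytes\n", block.size());
    }
}

# at most QueueBlocks entries can wait; a further push() blocks the producer
Queue q(QueueBlocks);
Counter done(1);
background consumer(q, done);

# stand-in for the main data acquisition thread
for (int i = 0; i < 5; ++i) {
    q.push(strmul("x", BlockSize));
}
q.push("EOF");
done.waitForZero();

# upper bound on data waiting in the queue: 16384 * 2 = 32768 bytes
printf("maximum bytes queued: %d\n", BlockSize * QueueBlocks);
```

Because the producer blocks in `push()` whenever two blocks are already waiting, memory use stays bounded no matter how fast the data source is.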
- FsRemoteSend Usage
- Call FsRemoteSend::append() to queue data for sending, then call FsRemoteSend::commit() to ensure that all data have been sent to the remote server before continuing.
In case of an error in the sending thread, FsRemoteSend::cancel() should be called to cancel the socket I/O send operation and shut down the socket thread. Note that in this case the file is not automatically deleted; to delete the file, call omq.system.service.fs.del() manually.
- FsRemoteSend Example
FsRemoteSend fs(qrest, remote_fs_connection, relative_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Note
- This class is not thread-safe. It is designed to be used from a single thread; calling its methods from multiple threads simultaneously results in unpredictable behavior.
◆ append() [1/2]
OMQ::FsRemoteSend::append |
( |
data |
data | ) |
|
Appends the given data to the ready queue.
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
Data are queued for sending when the buffered byte count reaches the block size.
- Note
- make sure to call either commit() (to flush all data to the server) or cancel() (to cancel the I/O operation) before continuing (see the example above)
- exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
◆ append() [2/2]
Append data to the ready queue using a FileLineIterator object for input.
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
fs.append(i);
- Parameters
-
This method processes the whole iterator in one call.
- Note
- make sure to call either commit() (to flush all data to the server) or cancel() (to cancel the I/O operation) before continuing (see the example above)
- exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
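The example above uses an iterator `i` without showing where it comes from. A minimal sketch, assuming Qorus interface code where a QorusSystemRestHelper object is at hand; the function name `upload_lines()` and its parameters are hypothetical:

```qore
# sketch only: upload_lines() and its arguments are illustrative names
sub upload_lines(QorusSystemRestHelper qrest, string local_path, string target_path) {
    # iterate over the lines of a local file
    FileLineIterator i(local_path);

    # the options argument is optional, so it is omitted here
    FsRemoteSend fs(qrest, target_path);
    on_success fs.commit();
    on_error fs.cancel();

    # processes the whole iterator in one call
    fs.append(i);
}
```

How line endings are handled depends on how the iterator is created and on append() itself; this page does not specify it, so verify the remote file's contents if line endings matter.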
◆ cancel()
OMQ::FsRemoteSend::cancel |
( |
| ) |
|
this method purges any queued I/O and stops the socket thread and returns when the socket I/O thread has terminated
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Note
- either this method or commit() should be called before continuing so that the socket I/O thread is shut down; unlike commit(), this method purges queued data instead of sending it
◆ commit()
OMQ::FsRemoteSend::commit |
( |
| ) |
|
this method flushes all buffered I/O to the remote server and returns when all data have been sent to the server and the socket I/O thread has terminated
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
Log messages are created with performance information (bytes sent, time of transfer, bytes / second, etc).
- Note
- either this method, flush() or cancel() should be called to ensure that all data has been sent
- exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
- equivalent to flush()
- See also
- flush()
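The `on_success` / `on_error` pattern in the example can also be written with explicit exception handling, which makes the rethrow behavior noted above visible. A hedged sketch: `send_checked()` and its parameters are hypothetical names, and what to do in the handler beyond calling cancel() is left to the caller.

```qore
# sketch only: send_checked() and its arguments are illustrative names
sub send_checked(QorusSystemRestHelper qrest, string target_path, list<auto> chunks) {
    FsRemoteSend fs(qrest, target_path);
    try {
        # errors raised in the socket I/O thread are rethrown here or in commit()
        map fs.append($1), chunks;
        fs.commit();
    } catch (hash<ExceptionInfo> ex) {
        # shut down the socket I/O thread and discard queued data
        fs.cancel();
        # pass the original error on to the caller
        rethrow;
    }
}
```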
◆ constructor() [1/4]
OMQ::FsRemoteSend::constructor |
( |
QorusSystemRestHelper |
remote, |
|
|
string |
conn, |
|
|
string |
path, |
|
|
*hash< auto > |
options |
|
) |
| |
constructor taking an OMQ::QorusSystemRestHelper object for the remote connection
- Example:
FsRemoteSend fs(qrest, remote_connection, relative_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
remote | an OMQ::QorusSystemRestHelper object |
conn | a user connection |
path | a string with the relative path of the target file (including the file name) under conn |
options | optional Streaming API Constructor Options as follows:
"timeout" : an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
"loglevel" : a default log level option for logging; see Log Levels for valid values
"block" : data block size (default 16384, minimum 4096); the number of bytes sent in each DataStream protocol chunk
"queue_block_size" : the number of blocks to queue for sending before the main data thread will block (default: 2)
"queue_timeout" : the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
"no_remote_timeout" : if True the "timeout" option will not be sent to the remote
|
- Note
- the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object
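A sketch of passing constructor options, assuming Qorus interface code. The function name `send_with_options()`, its parameters, and the option values are hypothetical and chosen only for illustration; the option keys are the ones listed above.

```qore
# sketch only: send_with_options() and the option values are illustrative
sub send_with_options(QorusSystemRestHelper qrest, string conn, string path, data payload) {
    hash<auto> opts = {
        "timeout": 120000,        # socket timeout in milliseconds
        "block": 32768,           # bytes per DataStream chunk (minimum 4096)
        "queue_block_size": 4,    # blocks queued before append() blocks
        "queue_timeout": 30000    # milliseconds before QUEUE-TIMEOUT is thrown
    };

    FsRemoteSend fs(qrest, conn, path, opts);
    on_success fs.commit();
    on_error fs.cancel();
    fs.append(payload);
}
```

With these values at most 32768 * 4 = 131072 bytes wait in the queue for the socket I/O thread.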
◆ constructor() [2/4]
constructor taking an OMQ::QorusSystemRestHelper object for the remote connection
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
remote | an OMQ::QorusSystemRestHelper object |
path | a string with the full or relative path of the target file (including the file name) |
options | optional Streaming API Constructor Options as follows:
"timeout" : an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
"loglevel" : a default log level option for logging; see Log Levels for valid values
"block" : data block size (default 16384, minimum 4096); the number of bytes sent in each DataStream protocol chunk
"queue_block_size" : the number of blocks to queue for sending before the main data thread will block (default: 2)
"queue_timeout" : the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
"no_remote_timeout" : if True the "timeout" option will not be sent to the remote
|
- Note
- the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object
◆ constructor() [3/4]
OMQ::FsRemoteSend::constructor |
( |
string |
remote, |
|
|
string |
conn, |
|
|
string |
path, |
|
|
*hash< auto > |
options |
|
) |
| |
constructor taking a string giving the name of the remote connection for the remote server
- Example:
FsRemoteSend fs(remote_name, target_connection, relative_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
remote | a string giving the name of the remote connection for the remote server |
conn | a user connection |
path | a string with the relative path of the target file (including the file name) under conn |
options | optional Streaming API Constructor Options as follows:
"timeout" : an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
"loglevel" : a default log level option for logging; see Log Levels for valid values
"block" : data block size (default 16384, minimum 4096); the number of bytes sent in each DataStream protocol chunk
"queue_block_size" : the number of blocks to queue for sending before the main data thread will block (default: 2)
"queue_timeout" : the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
"no_remote_timeout" : if True the "timeout" option will not be sent to the remote
|
- Note
- the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object
◆ constructor() [4/4]
OMQ::FsRemoteSend::constructor |
( |
string |
remote, |
|
|
string |
path, |
|
|
*hash< auto > |
options |
|
) |
| |
constructor taking a string giving the name of the remote connection for the remote server
- Example:
FsRemoteSend fs(remote_name, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
remote | a string giving the name of the remote connection for the remote server |
path | a string with the full or relative path of the target file (including the file name) |
options | optional Streaming API Constructor Options as follows:
"timeout" : an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
"loglevel" : a default log level option for logging; see Log Levels for valid values
"block" : data block size (default 16384, minimum 4096); the number of bytes sent in each DataStream protocol chunk
"queue_block_size" : the number of blocks to queue for sending before the main data thread will block (default: 2)
"queue_timeout" : the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
"no_remote_timeout" : if True the "timeout" option will not be sent to the remote
|
- Note
- to calculate the total data queued for the socket I/O thread, multiply block by queue_block_size (default 16384 * 2 = 32768 bytes)
- the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object
◆ del()
nothing OMQ::FsRemoteSend::del |
( |
| ) |
|
executes an implicit commit() and deletes the file given in path in the constructor()
- Example:
FsRemoteSend fs(qrest, target_path, opts);
{
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
}
fs.del();
- Note
- do not call this method before all data have been sent (i.e., call it only after commit() has run, as in the example above)
◆ rename()
nothing OMQ::FsRemoteSend::rename |
( |
string |
target, |
|
|
softbool |
man = False |
|
) |
| |
executes an implicit commit() and renames/moves the file given in path in the constructor()
- Example:
FsRemoteSend fs(qrest, target_path, opts);
{
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
}
fs.rename(target);
- Parameters
-
target | a new path with filename. Relative to user connection if the user connection constructor is used. |
man | rename the file manually by copying the file's data and then deleting the source file; this is sometimes necessary on some OSes when renaming files across filesystem boundaries |
- Warning
- After the rename, the original file no longer exists if man is set to True. Any additional append() call will result in an exception.
- Note
- do not call this method before all data have been sent (i.e., call it only after commit() has run, as in the example above)
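One common use of rename() is to upload under a temporary name and move the file into place only after the transfer has succeeded. A sketch under assumptions: `upload_then_publish()`, its parameters, and both file names are hypothetical; the user-connection constructor is used, so both paths are relative to `conn`.

```qore
# sketch only: upload_then_publish() and the file names are illustrative
sub upload_then_publish(QorusSystemRestHelper qrest, string conn, data payload) {
    # write to a temporary name first
    FsRemoteSend fs(qrest, conn, "incoming/report.csv.part");
    {
        on_success fs.commit();
        on_error fs.cancel();
        fs.append(payload);
    }

    # reached only if the block above succeeded; man = True copies the data and
    # deletes the source, which can help when crossing filesystem boundaries
    fs.rename("incoming/report.csv", True);
}
```

Leaving `man` at its default of False requests a regular rename; after a rename with `man` set to True, no further append() calls should be made on the object.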
◆ sendDataImpl()
auto OMQ::FsRemoteSend::sendDataImpl |
( |
| ) |
|
|
private |
reimplemented from DataStreamClient::DataStreamSendMessage
- Returns
- the data to send
- Exceptions
-
STREAM-TERMINATED | if the stream was terminated in the main thread, this exception will be thrown here to terminate the I/O thread |
◆ submitImpl()
bool OMQ::FsRemoteSend::submitImpl |
( |
| ) |
|
|
private |
queues buffered data to be sent in the socket I/O thread
- Returns
- True if the I/O thread is running, False if not
- Exceptions
-
IO-ERROR | the I/O thread is not running so no data can be submitted |