Filesystem stream for outbound/writing data to the remote instance.
More...
|
| constructor (string remote, string path, *hash< auto > options) |
| constructor taking a string giving the name of the remote connection for the remote server More...
|
|
| constructor (string remote, string conn, string path, *hash< auto > options) |
| constructor taking a string giving the name of the remote connection for the remote server More...
|
|
| constructor (QorusSystemRestHelper remote, string path, *hash< auto > options) |
| constructor taking an OMQ::QorusSystemRestHelper object for the remote connection More...
|
|
| constructor (QorusSystemRestHelper remote, string conn, string path, *hash< auto > options) |
| constructor taking an OMQ::QorusSystemRestHelper object for the remote connection More...
|
|
| commit () |
| this method flushes all buffered I/O to the remote server and returns when all data have been sent to the server and the socket I/O thread has terminated More...
|
|
| cancel () |
| this method purges any queued I/O and stops the socket thread and returns when the socket I/O thread has terminated More...
|
|
| append (data data) |
| Append data to the ready queue: data. More...
|
|
| append (Qore::FileLineIterator it) |
| Append data to the ready queue using a FileLineIterator object for input. More...
|
|
nothing | rename (string target, softbool man=False) |
| executes an implicit commit() and renames/moves the file given in path in the constructor() More...
|
|
nothing | del () |
| executes an implicit commit() and deletes the file given in path in the constructor() More...
|
|
OMQ::StreamConfig | config () |
| returns the configuration object
|
|
| destructor () |
| waits for any I/O to complete and then returns
|
|
nothing | cleanup () |
| called when the thread resource is still allocated and the thread exits or one of Qore::throw_thread_resource_exceptions() or Qore::throw_thread_resource_exceptions_to_mark() is called
|
|
| disconnect () |
| Disconnects the connection.
|
|
| flush () |
| flushes all remote I/O, signals the I/O thread to terminate and waits for confirmation, then checks for background I/O errors, in which case an exception is thrown in this thread More...
|
|
Filesystem stream for outbound/writing data to the remote instance.
- FsRemoteSend Background Socket I/O
- This class creates a background thread that handles the socket I/O so that socket I/O can be executed in parallel to the main data acquisition thread (the thread that creates the object).
There is a Queue object that passes aggregated row data to the send thread; the default Queue size is 2 (representing a number of blocks), which allows 2 blocks of data to be queued before the internal private FsRemoteSend::submitImpl() (called implicitly by DbRemoteSend::append()) will block.
If the Queue size is 2 and the block size is 16384 then at most 32768 bytes will be queued for sending before the main thread will block in the internal private method FsRemoteSend::submitImpl().
This allows the main data acquisition thread to stay loosely in sync with the I/O thread so that memory usage is optimized and furthermore serves to avoid stalling the I/O thread due to lack of data.
OMQ::FsRemote class can be used for synchronous calls of the remote system.fs service
- FsRemoteSend Usage
- Call FsRemoteSend::append() to queue data for sending, then call FsRemoteSend::commit() to ensure that all data have been sent to the remote server before continuing.
In case of an error in the sending thread, FsRemoteSend::cancel() should be called to cancel the socket I/O send operation and shut down the socket thread. Note that in this case the file is not automatically deleted; to delete the file, call omq.system.service.fs.del() manually.
- FsRemoteSend Example
FsRemoteSend fs(qrest, remote_fs_connection, relative_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Note
- This class is not thread-safe; it's designed to be used from a single thread; calling this class's methods from multiple threads simultaneously will result in unpredictable behavior
◆ append() [1/2]
OMQ::FsRemoteSend::append |
( |
data |
data | ) |
|
Append data to the ready queue: data.
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
Data are queued for sending when the buffered byte count reaches the block
size.
- Note
- make sure to call either commit() (to flush all data to the server) or cancel() (to cancel the I/O operation) before continuing (see the example above)
- exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
◆ append() [2/2]
Append data to the ready queue using a FileLineIterator object for input.
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
fs.append(i);
- Parameters
-
This method processes the whole iterator in one call.
- Note
- make sure to call either commit() (to flush all data to the server) or cancel() (to cancel the I/O operation) before continuing (see the example above)
- exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
◆ cancel()
OMQ::FsRemoteSend::cancel |
( |
| ) |
|
this method purges any queued I/O and stops the socket thread and returns when the socket I/O thread has terminated
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Note
- either this method or commit() should be called to ensure that all data has been sent
◆ commit()
OMQ::FsRemoteSend::commit |
( |
| ) |
|
this method flushes all buffered I/O to the remote server and returns when all data have been sent to the server and the socket I/O thread has terminated
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
Log messages are created with performance information (bytes send, time of transfer, bytes / second, etc).
- Note
- either this method, flush() or cancel() should be called to ensure that all data has been sent
- exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
- equivalent to flush()
- See also
- flush()
◆ constructor() [1/4]
constructor taking an OMQ::QorusSystemRestHelper object for the remote connection
- Example:
FsRemoteSend fs(qrest, remote_connection, relative_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
remote | an OMQ::QorusSystemRestHelper object |
conn | an user connection |
path | a string with relative path of the target file (inluding file name) under conn |
options | optional Streaming API Constructor Options as follows:
"timeout" : an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
"loglevel" : a default log level option for logging; see Log Levels for valid value
"block" : data block size (minimum 4096); the number of bytes sent in each DataStream protocol chunk
"queue_block_size" : the number of blocks to queue for sending before the main data thread will block (default: 2)
"queue_timeout" : the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
"no_remote_timeout" : if True the "timeout" option will not be sent to the remote
|
- Note
- the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object
◆ constructor() [2/4]
constructor taking an OMQ::QorusSystemRestHelper object for the remote connection
- Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
remote | an OMQ::QorusSystemRestHelper object |
path | a string with full or relative path of the target file (inluding file name) |
options | optional Streaming API Constructor Options as follows:
"timeout" : an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
"loglevel" : a default log level option for logging; see Log Levels for valid value
"block" : data block size (minimum 4096); the number of bytes sent in each DataStream protocol chunk
"queue_block_size" : the number of blocks to queue for sending before the main data thread will block (default: 2)
"queue_timeout" : the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
"no_remote_timeout" : if True the "timeout" option will not be sent to the remote
|
- Note
- the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object
◆ constructor() [3/4]
constructor taking a string giving the name of the remote connection for the remote server
- Example:
FsRemoteSend fs(remote_name, target_connection, relative_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
remote | a string giving the name of the remote connection for the remote server |
conn | an user connection |
path | a string with relative path of the target file (inluding file name) under conn |
options | optional Streaming API Constructor Options as follows:
"timeout" : an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
"loglevel" : a default log level option for logging; see Log Levels for valid value
"block" : data block size (minimum 4096); the number of bytes sent in each DataStream protocol chunk
"queue_block_size" : the number of blocks to queue for sending before the main data thread will block (default: 2)
"queue_timeout" : the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
"no_remote_timeout" : if True the "timeout" option will not be sent to the remote
|
- Note
- the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object
◆ constructor() [4/4]
OMQ::FsRemoteSend::constructor |
( |
string |
remote, |
|
|
string |
path, |
|
|
*hash< auto > |
options |
|
) |
| |
constructor taking a string giving the name of the remote connection for the remote server
- Example:
FsRemoteSend fs(remote_name, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
- Parameters
-
remote | a string giving the name of the remote connection for the remote server |
path | a string with full or relative path of the target file (inluding file name) |
options | optional Streaming API Constructor Options as follows:
"timeout" : an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
"loglevel" : a default log level option for logging; see Log Levels for valid value
"block" : data block size (default 16384, minimum 4096); the number of bytes sent in each DataStream protocol chunk
"queue_block_size" : the number of blocks to queue for sending before the main data thread will block (default: 2)
"queue_timeout" : the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
"no_remote_timeout" : if True the "timeout" option will not be sent to the remote
|
- Note
- to calculate the total data queued for the socket I/O thread, multiply block by queue_block_size (default 16384 * 2 = 32768 bytes)
- the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object
◆ del()
nothing OMQ::FsRemoteSend::del |
( |
| ) |
|
executes an implicit commit() and deletes the file given in path
in the constructor()
- Example:
FsRemoteSend fs(qrest, target_path, opts);
{
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
}
fs.del(target);
- Note
- do not call this method before all data have been sent (ie call only after calling commit() first as in the example above)
◆ rename()
nothing OMQ::FsRemoteSend::rename |
( |
string |
target, |
|
|
softbool |
man = False |
|
) |
| |
executes an implicit commit() and renames/moves the file given in path
in the constructor()
- Example:
FsRemoteSend fs(qrest, target_path, opts);
{
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
}
fs.rename(target);
- Parameters
-
target | a new path with filename. Relative to user connection if the user connection constructor is used. |
man | rename the file manually by copying the file's data and then deleting the source file, this is sometimes necessary on some OSes when renaming files across filesystem boundaries |
- Warning
- The original file after the rename does not exist anymore if
man
is set to True. Any additional append() call will result in an exception.
- Note
- do not call this method before all data have been sent (ie call only after calling commit() first as in the example above)
◆ sendDataImpl()
auto OMQ::FsRemoteSend::sendDataImpl |
( |
| ) |
|
|
private |
reimplemented from DataStreamClient::DataStreamSendMessage
- Returns
- the data to send
- Exceptions
-
STREAM-TERMINATED | if the stream was terminated in the main thread, this exception will be thrown here to terminate the I/O thread |
◆ submitImpl()
bool OMQ::FsRemoteSend::submitImpl |
( |
| ) |
|
|
privatevirtual |
queues buffered data to be sent in the socket I/O thread
- Returns
- True if the I/O thread is running, False if not
- Exceptions
-
IO-ERROR | the I/O thread is not running so no data can be submitted |
Implements OMQ::AbstractParallelWriteStream.
The documentation for this class was generated from the following file: