Qorus Integration Engine®  4.0.3.p2_git
OMQ::FsRemoteSend Class Reference

Filesystem stream for outbound/writing data to the remote instance. More...

Inheritance diagram for OMQ::FsRemoteSend:
OMQ::AbstractParallelWriteStream OMQ::AbstractParallelStream

Public Member Methods

 constructor (string remote, string path, *hash options)
 constructor taking a string giving the name of the remote connection for the remote server More...
 
 constructor (string remote, string conn, string path, *hash options)
 constructor taking a string giving the name of the remote connection for the remote server More...
 
 constructor (QorusSystemRestHelper remote, string path, *hash options)
 constructor taking an OMQ::QorusSystemRestHelper object for the remote connection More...
 
 constructor (QorusSystemRestHelper remote, string conn, string path, *hash options)
 constructor taking an OMQ::QorusSystemRestHelper object for the remote connection More...
 
private stopIoThreadImpl ()
 signal the I/O thread to stop sending data
 
private terminateIoWait (string action)
 flushes all remote I/O, terminates the I/O thread, and checks for background I/O errors, in which case an exception is thrown
 
private startStreamImpl ()
 no implementation in this class
 
private socketThreadImpl ()
 opens the remote socket stream in the socket I/O thread and starts streaming the data
 
 commit ()
 this method flushes all buffered I/O to the remote server and returns when all data have been sent to the server and the socket I/O thread has terminated More...
 
 cancel ()
 this method purges any queued I/O and stops the socket thread and returns when the socket I/O thread has terminated More...
 
 append (data data)
 Append data to the ready queue: data. More...
 
 append (Qore::FileLineIterator it)
 Append data to the ready queue using a FileLineIterator object for input. More...
 
private bool submitImpl ()
 queues buffered data to be sent in the socket I/O thread More...
 
nothing rename (string target, softbool man=False)
 executes an implicit commit() and renames/moves the file given in path in the constructor() More...
 
nothing del ()
 executes an implicit commit() and deletes the file given in path in the constructor() More...
 
private any sendDataImpl ()
 reimplemented from DataStreamClient::DataStreamSendMessage More...
 
OMQ::StreamConfig config ()
 returns the configuration object
 
- Public Member Methods inherited from OMQ::AbstractParallelWriteStream
 destructor ()
 waits for any I/O to complete and then returns
 
nothing cleanup ()
 called when the thread resource is still allocated and the thread exits or one of Qore::throw_thread_resource_exceptions() or Qore::throw_thread_resource_exceptions_to_mark() is called
 
 disconnect ()
 Disconnects the connection.
 
 flush ()
 flushes all remote I/O, signals the I/O thread to terminate and waits for confirmation, then checks for background I/O errors, in which case an exception is thrown in this thread More...
 
private bool terminateIoWaitIntern (string action)
 flushes all remote I/O, signals the I/O thread the terminate and waits for confirmation, then checks for background I/O errors, in which case an exception is thrown in this thread More...
 

Private Attributes

*string m_connection
 optional name of the user connection. It can be NOTHING if the non-connection methods should be called
 
data m_data
 waiting data buffer
 
FsSendStreamConfig m_config
 stream configuration
 

Additional Inherited Members

Detailed Description

Filesystem stream for outbound/writing data to the remote instance.

FsRemoteSend Background Socket I/O
This class creates a background thread that handles the socket I/O so that socket I/O can be executed in parallel to the main data acquisition thread (the thread that creates the object).

There is a Queue object that passes aggregated row data to the send thread; the default Queue size is 2 (representing a number of blocks), which allows 2 blocks of data to be queued before the internal private FsRemoteSend::submitImpl() (called implicitly by DbRemoteSend::append()) will block.

If the Queue size is 2 and the block size is 16384 then at most 32768 bytes will be queued for sending before the main thread will block in the internal private method FsRemoteSend::submitImpl().

This allows the main data acquisition thread to stay loosely in sync with the I/O thread so that memory usage is optimized and furthermore serves to avoid stalling the I/O thread due to lack of data.

OMQ::FsRemote class can be used for synchronous calls of the remote system.fs service

FsRemoteSend Usage
Call FsRemoteSend::append() to queue data for sending, then call FsRemoteSend::commit() to ensure that all data have been sent to the remote server before continuing.

In case of an error in the sending thread, FsRemoteSend::cancel() should be called to cancel the socket I/O send operation and shut down the socket thread. Note that in this case the file is not automatically deleted; to delete the file, call omq.system.service.fs.del() manually.
FsRemoteSend Example
FsRemoteSend fs(qrest, remote_fs_connection, relative_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
Note
This class is not thread-safe; it's designed to be used from a single thread; calling this class's methods from multiple threads simultaneously will result in unpredictable behavior

Member Function Documentation

◆ append() [1/2]

OMQ::FsRemoteSend::append ( data  data)

Append data to the ready queue: data.

Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
Parameters
datadata to be sent

Data are queued for sending when the buffered byte count reaches the block size.

Note
  • make sure to call either commit() (to flush all data to the server) or cancel() (to cancel the I/O operation) before continuing (see the example above)
  • exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread

◆ append() [2/2]

OMQ::FsRemoteSend::append ( Qore::FileLineIterator  it)

Append data to the ready queue using a FileLineIterator object for input.

Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
fs.append(i);
Parameters
ita FileLineIterator object with data to be sent to the remote server

This method processes the whole iterator in one call.

Note
  • make sure to call either commit() (to flush all data to the server) or cancel() (to cancel the I/O operation) before continuing (see the example above)
  • exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread

◆ cancel()

OMQ::FsRemoteSend::cancel ( )

this method purges any queued I/O and stops the socket thread and returns when the socket I/O thread has terminated

Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
Note
either this method or commit() should be called to ensure that all data has been sent

◆ commit()

OMQ::FsRemoteSend::commit ( )

this method flushes all buffered I/O to the remote server and returns when all data have been sent to the server and the socket I/O thread has terminated

Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;

Log messages are created with performance information (bytes send, time of transfer, bytes / second, etc).

Note
  • either this method, flush() or cancel() should be called to ensure that all data has been sent
  • exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
  • equivalent to flush()
See also
flush()

◆ constructor() [1/4]

OMQ::FsRemoteSend::constructor ( string  remote,
string  path,
*hash  options 
)

constructor taking a string giving the name of the remote connection for the remote server

Example:
FsRemoteSend fs(remote_name, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
Parameters
remotea string giving the name of the remote connection for the remote server
patha string with full or relative path of the target file (inluding file name)
optionsoptional Streaming API Constructor Options as follows:
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid value
  • "block": data block size (default 16384, minimum 4096); the number of bytes sent in each DataStream protocol chunk
  • "queue_block_size": the number of blocks to queue for sending before the main data thread will block (default: 2)
  • "queue_timeout": the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
Note
  • to calculate the total data queued for the socket I/O thread, multiply block by queue_block_size (default 16384 * 2 = 32768 bytes)
  • the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ constructor() [2/4]

OMQ::FsRemoteSend::constructor ( string  remote,
string  conn,
string  path,
*hash  options 
)

constructor taking a string giving the name of the remote connection for the remote server

Example:
FsRemoteSend fs(remote_name, target_connection, relative_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
Parameters
remotea string giving the name of the remote connection for the remote server
connan user connection
patha string with relative path of the target file (inluding file name) under conn
optionsoptional Streaming API Constructor Options as follows:
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid value
  • "block": data block size (minimum 4096); the number of bytes sent in each DataStream protocol chunk
  • "queue_block_size": the number of blocks to queue for sending before the main data thread will block (default: 2)
  • "queue_timeout": the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
Note
the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ constructor() [3/4]

OMQ::FsRemoteSend::constructor ( QorusSystemRestHelper  remote,
string  path,
*hash  options 
)

constructor taking an OMQ::QorusSystemRestHelper object for the remote connection

Example:
FsRemoteSend fs(qrest, target_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
Parameters
remotean OMQ::QorusSystemRestHelper object
patha string with full or relative path of the target file (inluding file name)
optionsoptional Streaming API Constructor Options as follows:
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid value
  • "block": data block size (minimum 4096); the number of bytes sent in each DataStream protocol chunk
  • "queue_block_size": the number of blocks to queue for sending before the main data thread will block (default: 2)
  • "queue_timeout": the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
Note
the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ constructor() [4/4]

OMQ::FsRemoteSend::constructor ( QorusSystemRestHelper  remote,
string  conn,
string  path,
*hash  options 
)

constructor taking an OMQ::QorusSystemRestHelper object for the remote connection

Example:
FsRemoteSend fs(qrest, remote_connection, relative_path, opts);
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
Parameters
remotean OMQ::QorusSystemRestHelper object
connan user connection
patha string with relative path of the target file (inluding file name) under conn
optionsoptional Streaming API Constructor Options as follows:
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid value
  • "block": data block size (minimum 4096); the number of bytes sent in each DataStream protocol chunk
  • "queue_block_size": the number of blocks to queue for sending before the main data thread will block (default: 2)
  • "queue_timeout": the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
Note
the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ del()

nothing OMQ::FsRemoteSend::del ( )

executes an implicit commit() and deletes the file given in path in the constructor()

Example:
FsRemoteSend fs(qrest, target_path, opts);
{
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
}
fs.del(target);
Note
do not call this method before all data have been sent (ie call only after calling commit() first as in the example above)

◆ rename()

nothing OMQ::FsRemoteSend::rename ( string  target,
softbool  man = False 
)

executes an implicit commit() and renames/moves the file given in path in the constructor()

Example:
FsRemoteSend fs(qrest, target_path, opts);
{
on_success fs.commit();
on_error fs.cancel();
map fs.append($1), dataIterator;
}
fs.rename(target);
Parameters
targeta new path with filename. Relative to user connection if the user connection constructor is used.
manrename the file manually by copying the file's data and then deleting the source file, this is sometimes necessary on some OSes when renaming files across filesystem boundaries
Warning
The original file after the rename does not exist anymore if man is set to True. Any additional append() call will result in an exception.
Note
do not call this method before all data have been sent (ie call only after calling commit() first as in the example above)

◆ sendDataImpl()

private any OMQ::FsRemoteSend::sendDataImpl ( )

reimplemented from DataStreamClient::DataStreamSendMessage

Returns
the data to send
Exceptions
STREAM-TERMINATEDif the stream was terminated in the main thread, this exception will be thrown here to terminate the I/O thread

◆ submitImpl()

private bool OMQ::FsRemoteSend::submitImpl ( )
virtual

queues buffered data to be sent in the socket I/O thread

Returns
True if the I/O thread is running, False if not
Exceptions
IO-ERRORthe I/O thread is not running so no data can be submitted

Implements OMQ::AbstractParallelWriteStream.


The documentation for this class was generated from the following file: