Poll An SFTP Server For CSV Data And Import To DB: Part 1 of 2

In this blog post, you’ll learn to poll an SFTP server for CSV files to read its contents into a DB table.

Qorus is a unique low-code/no-code IT integration and automation platform, as it allows developers full freedom to code while supporting powerful, reusable, configuration-based integration objects as well

As such, this blog post takes a complete no-code approach to achieve today’s objective.

Requirements

  • A local Qorus environment
  • Qorus Developer Tools extension for Visual Studio Code
  • Qorus python client
  • A running SFTP server and a user with read/write access to the server
  • A Database table to write the data to

Getting Started

You’ll get started by cloning our building-blocks repository, which contains all the required components/blocks to set you up on a no-code approach to data integration.

Clone the repository by
git clone https://git.qoretechnologies.com/qorus/building-blocks.git
The source to achieving this blog post’s goal is already made available in the examples/csv-sftp-to-db-import of the building-blocks repository. You’ll walk through the process involved in creating this solution.
Now that you’ve met all the requirements, let’s poll an SFTP server for CSV files and populate its contents to a DB table using the Qorus interface. You achieve that by the following:
  1. Implementing a local file system connection
  2. Writing a qscript to create interface directories in the Qorus user filesystem
  3. Implementing an SFTP connection
  4. Implementing a schema module for interface tables
  5. Creating a mapper to map the data from the CSV file to the database
  6. Creating a workflow to process the CSV file into the DB
  7. Building an SFTP polling job that polls the SFTP server every 5 minutes and transfers all CSV files to the local directory

1. Implementing a Local File System Connection

  1. Click on the purple “Q” to get the extension main menu and log in to your Qorus server by clicking on the power button.
  2. Click on the Create new (new file) icon next to the Connection interface from the interface hierarchy view.
  3. In the Qorus Webview window, fill in the new connection form to configure the connection.
  • Target Directory : Browse through the Show Directories field and select a source directory. This will be the directory where the code generated by the interface is stored.
  • Name : fs-example-csv This will be the name of your connection.
  • Description : CSV import file system connection The description field supports markdown so, feel free to format your text in markdown.
  • URL : file://$OMQ_DIR/user/csv-import The URL should point to the top of the CSV filesystem hierarchy.
When the Qorus’ docker container is run for the first time, the environment setup script /opt/qorus/bin/env.sh in the container is executed, which exports the environment variable OMQ_DIR and sets it to /opt/qorus. Therefore, $OMQ_DIR/user/csv-import is /opt/qorus/user/csv-imort
4. Deploy the new connection to the Qorus server by clicking on the cloud icon next to the connection name in the hierarchy view.
ℹ️ Note: You can achieve the same by deploying the fs-example-csv connection to the server or with oload using the Python client

2. Writing a qscript to create interface directories in the Qorus user filesystem

In this section, you’ll create at a Qscript to create the local CSV polling directories. Qscripts can be either Qore scripts, shell scripts, or Python scripts. In this example, you’ll write a Qscript in Qore.

ℹ️ Note: UNIX operating systems allow an executable script to specify their interpreter. This is done by setting the first line of the script to a special string indicating the location of the interpreter (Qore binary):

#!/usr/bin/env qore

Parse Directives Required

DirectiveDescription
%new-style Sets both %allow-bare-refs and %assume-local. These two options together make programming in Qore superficially more like programming in C++ or Java programs; use this if you dislike programming with the "$" sign, for example.
%require-types Requires type declarations for all function and method parameters, return types, variables, and object members. Additionally causes CALL-WITH-TYPE-ERRORS exceptions to be thrown if a function, method, or code call is made with excess arguments not handled by the target. In case calls are made to user code over a Program barrier, the parse options of the target code determine if this exception is raised;
%strict-args Prohibits access to builtin functions and methods flagged with RUNTIME_NOOP and also causes errors to be raised if excess arguments are given to functions that do not access excess arguments and if a non-list lvalue is passed to the Push Operator (push), Pop Operator (pop), or Shift Operator (shift). Additionally, If a function, method, or code call is made with excess arguments not handled by the target, a CALL-WITH-TYPE-ERRORS exception is thrown. In case calls are made to user code over a Program barrier, the parse options of the target code determine if this exception is raised;
%enable-all-warnings Enables all warnings while parsing. See Warnings for more information.
1. In your source directory, create a file with “.qscript” extension. Open the file and add specify the interpreter. Also, add the required parse directives.
2. Create the interface directories using the mkdir() function in the Qore namespace.
mkdir(ENV.OMQ_DIR + "/user/csv-import/in-process", 0755, True);
mkdir(ENV.OMQ_DIR + "/user/csv-import/archive", 0755, True);
mkdir(ENV.OMQ_DIR + "/user/csv-import/duplicate", 0755, True);
3. Create SSH/SFTP config directory for staring private keys
4. Deploy the qscript to the server with the IDE or with oload using the Python client
mkdir(ENV.OMQ_DIR + "/user/ssh-config", 0755, True);
ℹ️ Note: The above script is available in the building-blocks repository. You can find it at /examples/csv-sftp-to-db-import/setup-env.qscript

3. Implementing an SFTP Connection

Before trying to implement an SFTP connection, you must have access to an SFTP server and to a user with read/write privileges on it. Create an interface directory on your SFTP server, which you’ll point to (sftp://[your-server:port]/[path-to-your-interface-directory]) in the URL field later in this section.
 
You can now create an SFTP connection to the SFTP server and the interface directory on it by:
  1. Click on the Create new (new file) icon next to the Connection interface from the interface hierarchy view.
  2. In the Qorus Webview window, fill in the new connection form to configure the connection.
  • Target Directory : Browse through the Show Directories field and select a source directory. This will be the directory where the code generated by the interface is stored.
  • Name : sftp-example-csv This will be the name of our connection.
  • Description : CSV import SFTP server connection The description field supports markdown so, feel free to format your text in markdown.
  • URL : sftp://[your-server:port]/[path-to-your-interface-directory] points to the SFTP server and interface directory on it.
  • Go to the Connection options section and click on the Add option button to add a key file. In the Custom tab of the keyfile, enter the path to a private key file in PEM format to use for key-based authentication. Options – keyfile : $OMQ_DIR/user/ssh-config/id_ed25519
ℹ️ Note : Qorus provides the flexibility of either using a key file or a username/password to access the server
3. Deploy the new connection to the Qorus server by clicking on the cloud icon next to the connection name in the hierarchy view.
ℹ️ Note : You can achieve the same by deploying the sftp-example-csv connection to the server or with oload using the Python client

4. Implementing a schema module for interface tables

Qorus makes schema maintenance and deployments easy by letting its users describe a DB schema in a DB-independent way. When oload runs on a Qorus schema module file, it will align the DB with the schema. If the DB is empty, then the objects are created. If existing objects are found, then they are aligned with the description.

You can easily change schema definitions by editing the configuration in the schema module and then deploying it on the server. Qorus schema module can contain: tables, indexes, primary keys, foreign constraints, triggers, sequences, also reference data can be managed and more.

When using a Qorus schema module, you don’t have to worry about DDL for each specific DB, Qorus will take care of such details for you.

Parse Directives Required

DirectiveDescription
%requires Loads a Qore module immediately. The parse directive can be used to load a module during parsing, and the command line version can be used to load a module before parsing. If the optional (reexport) option is used in a user module, then any modules loaded with this form of the %requires parse directive will also be imported into the importing Program.
%append-module-path Appends the given directories to qore's module path; also performs environment variable substitution. Only directories that exist and are readable are added to the module path.
%new-style Sets both %allow-bare-refs and %assume-local. These two options together make programming in Qore superficially more like programming in C++ or Java programs; use this if you dislike programming with the "$" sign, for example.
%require-types Requires type declarations for all function and method parameters, return types, variables, and object members. Additionally causes CALL-WITH-TYPE-ERRORS exceptions to be thrown if a function, method, or code call is made with excess arguments not handled by the target. In case calls are made to user code over a Program barrier, the parse options of the target code determine if this exception is raised;
%strict-args Prohibits access to builtin functions and methods flagged with RUNTIME_NOOP and also causes errors to be raised if excess arguments are given to functions that do not access excess arguments and if a non-list lvalue is passed to the Push Operator (push), Pop Operator (pop), or Shift Operator (shift). Additionally, If a function, method, or code call is made with excess arguments not handled by the target, a CALL-WITH-TYPE-ERRORS exception is thrown. In case calls are made to user code over a Program barrier, the parse options of the target code determine if this exception is raised;
%enable-all-warnings Enables all warnings while parsing. See Warnings for more information.

Modules Required

ModuleDescription
Schema Provides automatic schema management functionality as a meta-layer for SqlUtil's medium and low-level schema management functionality
SqlUtil Provides a high-level DB-independent API for working with databases; for automated schema management, programmatic DB access, schema and data synchronization, and more
1. Create a file with “.qsm” extension and add the required parse directives and modules
%requires qore >= 1.0

%append-module-path /var/opt/qorus/qlib:$OMQ_DIR/qlib:/opt/qorus/qlib

%requires Schema
%requires SqlUtil

%new-style
%strict-args
%require-types
%strict-args
%enable-all-warnings
2. Create a module and mention your schema version, description, author and url like so:
module InventoryExampleSchema {
version = "1.0";
desc = "Qorus inventory example user schema module";
author = "Qore Technologies <info@qoretechnologies.com>";
url = "https://qoretechnologies.com";
}
3. Create the following namespaces for schema declarations:
namespace Private {
   const GenericOptions = {
      "replace":True,
   };

   const IndexOptions = {
      "driver": {
         "oracle": {
            "compute_statistics":True,
         },
      },
   };

   const ColumnOptions = {
      "driver": {
         "oracle": {"character_semantics":True,},
      },
   };

   const T_InventoryExample = {
      "columns": {
         "id":c_int(True, "PK ID field"),
         "filename":c_varchar(200, True, "input filename"),
         "uuid":c_varchar(40, True, "system-supplied UUID for the file"),
         "store_code":c_varchar(200, True, "input store code"),
         "product_code":c_varchar(50, True, "input product code / EAN"),
         "product_desc":c_varchar(200, True, "input product description"),
         "ordered":c_int(True),
         "available":c_int(True),
         "in_transit":c_int(True),
         "total":c_int(True),
         "report_date":c_timestamp(True),
         "qorus_wfiid":c_int(True),
      },
      "primary_key": {"name":"pk_inventory_example", "columns": ("id")},
      "indexes": {
         "sk_inventory_example_filename": {"columns": ("filename")},
         "sk_inventory_example_uuid": {"columns": ("uuid")},
         "sk_inventory_example_q_wfiid": {"columns": ("qorus_wfiid")},
       },
   };

   const Tables = {
      "inventory_example": T_InventoryExample,
   };

   const Sequences = {
      "seq_inventory_example": {},
   };
}

public namespace InventoryExampleSchema {
   public string sub get_datasource_name() {
      return"omquser";
   }
   public InventoryExampleSchema sub get_user_schema(AbstractDatasource ds, *string dts, *string its) {
      return new InventoryExampleSchema(ds, dts, its);
   }
   public class InventoryExampleSchema inherits AbstractSchema {
         public {
            const SchemaName = "InventoryExampleSchema";
            const SchemaVersion = "1.0";
      }
      constructor(AbstractDatasource ds, *string dts, *string its) :AbstractSchema(ds, dts, its) {
      }
      private string getNameImpl() {
         return SchemaName;
      }
      private string getVersionImpl() {
         return SchemaVersion;
      }
      private *hash getTablesImpl() {
         return Tables;
      }
      private *hash getSequencesImpl() {
         return Sequences;
      }
      private *hash getIndexOptionsImpl() {
         return IndexOptions;
      }
      private *hash getGenericOptionsImpl() {
         return GenericOptions;
      }
      private *hash getColumnOptionsImpl() {
         return ColumnOptions;
      }
   }
}
4. Deploy the schema module file you just created to the server with the IDE or with oload using the Python client
ℹ️ Note: The schema module file described above is available in the building-blocks repository. You can find it at /examples/csv-sftp-to-db-import/InventoryExampleSchema.qsm

Conclusion

In this part one of two in Polling SFTP for CSV data and importing it to database blog post series, you’ve looked at the requirements for constructing the solution, got started by cloning the Qorus building-blocks repository and accomplished the following:

  1. Implemented the connection interface for both File system and SFTP
  2. Writing a qscript to create interface directories in the Qorus user filesystem
  3. Implementing an SFTP connection
  4. Implementing a schema module for interface tables

In the part two of this series, you’ll look at how to create a mapper to map the data from the CSV file to the database, create a workflow to process the CSV file into the DB and create an SFTP polling job that also handles the duplicate file handling and file archiving.

Grab your FREE application integrations eBook.
You'll quickly learn how to connect your apps and data into robust processes!