Poll An SFTP Server For CSV Data And Import To DB: Part 2 of 2

This is part two of the two-part series on polling an SFTP server for CSV data and importing it into a database. Follow part one to learn how to implement a local file system connection, write a Qscript to create interface directories in the Qorus user file system, implement an SFTP connection, and implement a schema module for the interface tables.

In this blog post, you’ll learn how to create a mapper to map the data from the CSV file to the database, create a workflow to process the CSV file into the DB, and create a polling job that polls the SFTP server every five minutes, transfers all CSV files to a local directory, and handles duplicate files and file archiving.

5. Creating a mapper to map the data from the CSV file to the database

ℹ️ Note: Make sure you follow step four from part one of this blog post series and deploy a schema before creating a Mapper. This is necessary as the Mapper will require a DB table.

  1. Click on the Create new (new file) icon next to the Mapper interface from the interface hierarchy view.
  2. In the Qorus Webview window, fill in the New mapper form with the following:
  • Target Directory : Browse through the Show Directories field and select a source directory. This will be the directory where the code generated by the interface is stored.
  • Name : example-import-csv-file This will be the name of our mapper.
  • Description : Import a CSV file into our target table The description field supports markdown so, feel free to format your text in markdown.
  • Version : 1.0
  • You can set an optional Target file field to name the code file that will be saved in your Target Directory. From the Optional Fields drop-down list in the upper-right corner of the form, add the target_file field and populate it with a filename – Target file : example-import-csv-file-1.0.qmapper.yaml
3. Hit Next to create the mapper.
4. Click on the Please select button next to the Select input field and select factory from the list.
5. Click on the Please select button next to the Select input field again, and select csvread, the CSV reader data provider for the factory.
6. Press the Add options button below the Select input field and add the following factory options:
  • Add the path option, then go to the Template tab and click on the dropdown labeled Please select. Select static from the dropdown and enter filename for the key.
    path : $static:filename. This will be the full path to the CSV.
  • Click on Add options and add the header_names option and set its value to true.
    header_names : true
  • Add the fields option and enter the following in the Custom tab. These values describe the input fields in the CSV:
fields :

StoreCode: string
ProductCode: string
ProductDescription: string
Available: int
Ordered: int
InTransit: int
ReportDate: date
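Under the hood, the csvread data provider applies these type declarations to each parsed row. As a rough illustration (plain Python, not the actual Qorus implementation; the sample values and the ISO date format for ReportDate are assumptions):

```python
import csv
import io
from datetime import datetime

# One converter per declared field; date parsing assumes ISO dates
FIELD_TYPES = {
    "StoreCode": str,
    "ProductCode": str,
    "ProductDescription": str,
    "Available": int,
    "Ordered": int,
    "InTransit": int,
    "ReportDate": lambda s: datetime.strptime(s, "%Y-%m-%d"),
}

# Illustrative sample matching the declared fields
SAMPLE = """StoreCode,ProductCode,ProductDescription,Available,Ordered,InTransit,ReportDate
S001,P100,Widget,25,10,5,2023-01-31
"""

def read_csv(text):
    # header_names: true -> the first row supplies the field names
    return [
        {k: FIELD_TYPES[k](v) for k, v in row.items()}
        for row in csv.DictReader(io.StringIO(text))
    ]
```

With this in place, every row comes back with properly typed values instead of raw strings, which is what the mapper downstream relies on.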

7. Click on the Please select button next to the Select output field and select datasource from the list.
8. Add omquser to the Select output field by clicking on the Please select button.
9. Finally, add inventory_example to the Select output field.
10. Create the following mappings:
  • StoreCode -> store_code
  • ProductCode -> product_code
  • ProductDescription -> product_desc
  • Available -> available
  • Ordered -> ordered
  • InTransit -> in_transit
  • ReportDate -> report_date
11. Now click on the programmatic output (`</>`) symbol next to the following output fields to edit them:
  • id: ► Manage field mappings ► select sequence ► seq_inventory_example (the name of the sequence to be used to populate the field)
  • filename: ► Manage field mappings ► select context ► $keys:filename (the workflow order key “filename” will be used to populate the field)
  • uuid: ► Manage field mappings ► select context ► $static:uuid (the uuid key from workflow static data will be used to populate the field)
  • total: ► Manage field mappings ► select context ► $qore-expr:{$local:input.Ordered + $local:input.InTransit}
  • qorus_wfiid: ► Manage field mappings ► select context ► $local:workflow_instanceid
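Taken together, the mappings and programmatic output fields above amount to a transformation like this sketch (plain Python, not the Qorus mapper engine; the function signature is illustrative, and the id sequence value is left out since it comes from the database):

```python
import uuid

def map_row(row, filename, workflow_instanceid):
    """Mirror the mapper definition: rename fields, compute total,
    and fill the programmatic output fields."""
    return {
        "store_code": row["StoreCode"],
        "product_code": row["ProductCode"],
        "product_desc": row["ProductDescription"],
        "available": row["Available"],
        "ordered": row["Ordered"],
        "in_transit": row["InTransit"],
        "report_date": row["ReportDate"],
        # mirrors $qore-expr:{$local:input.Ordered + $local:input.InTransit}
        "total": row["Ordered"] + row["InTransit"],
        "filename": filename,             # from the "filename" order key
        "uuid": str(uuid.uuid4()),        # from workflow static data
        "qorus_wfiid": workflow_instanceid,
        # "id" would be populated from the seq_inventory_example sequence
    }
```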
12. Deploy the mapper to the Qorus server by clicking on the cloud icon next to the mapper name in the hierarchy view.

ℹ️ Note: You can achieve the same by deploying the example-import-csv-file mapper to the server or with oload using the Python client.

6. Creating a workflow to process the CSV file into the DB

Create a workflow in the IDE which is responsible for processing the CSV file into the DB.

ℹ️ Note: A workflow is a stateful IT orchestration object. Even though the current solution involves no orchestration in this interface, we use the file name as the unique key for workflow orders to ensure that each CSV file is processed only once.
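The duplicate-protection idea from the note can be sketched as follows (illustrative only; in Qorus the check is performed by the order-creation API against the filename order key, and the error name used here is hypothetical):

```python
# Stand-in for the set of order keys the server already knows about
_seen_keys = set()

def create_order(filename):
    """Create a workflow order keyed by filename; refuse duplicates."""
    if filename in _seen_keys:
        # hypothetical error name; Qorus raises its own duplicate-key error
        raise ValueError(f"DUPLICATE-ORDER-KEY: {filename}")
    _seen_keys.add(filename)
    return len(_seen_keys)  # stand-in for the workflow order instance ID
```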

  1. Click on the Create new (new file) icon next to the Workflow interface from the interface hierarchy view.
  2. In the Qorus Webview window, fill the new workflow’s Fill Data form with the following:
  • Target Directory : Browse through the Show Directories field and select a source directory. This will be the directory where the code generated by the interface is stored.
  • Name : EXAMPLE-IMPORT-CSV-FILE This will be the name of our workflow.
  • Description : Example CSV import workflow The description field supports markdown so, feel free to format your text in markdown.
  • Version : 1.0
  • From the Optional fields drop-down list in the upper-right corner of the form, add the Key list field and populate it with
    Key list : filename
  • Now add the SLA threshold field from the Optional Fields list and set it to 10. This means the workflow needs to complete within 10 seconds of being created to meet the SLA
    SLA threshold : 10
3. Click Next to create a single workflow step for this workflow that will actually do the job.
4. To create the step that will import the data into the DB table, click on Add new step at the beginning / Add new step at the end, then click on the plus button next to the step field.
5. In the Qorus Webview window, fill the New step form with the following:
 
  • Target Directory : Browse through the Show Directories field and select a source directory. This will be the directory where the code generated by the interface is stored.
  • Name : ExampleImportCsvFile This will be the name of your object.
  • Description : Import a CSV file into our target table The description field supports markdown so, feel free to format your text in markdown.
  • Click on the Please select button next to the Base class name field and select QorusNormalStep, the abstract base class for normal steps.
    Base class name : QorusNormalStep
  • Class name : ExampleImportCsvFile
  • Version : 1.0
6. From the Optional Fields list to the upper-right corner of the form, add the Finite State Machine field. This FSM is responsible for parsing the CSV file and saving it in the DB.
7. Click on the plus button in the Finite State Machine field and populate it with the following:
  • Target Directory : Browse through the Show Directories field and select a source directory. This will be the directory where the code generated by the interface is stored.
  • Name : example-import-csv-file This will be the name of our FSM
  • Description : Import a CSV file into our target table The description field supports markdown so, feel free to format your text in markdown.
  • Go to the flow builder by clicking on the Go to flow builder button.
  • Drag and drop the Connector object into the diagram window and fill the resulting form with the following:
  • Name : Import CSV to DB
  • Initial : true
  • Click on the plus button in the Action value field and select BBM_AutoMapper
  • Select runMapper from the Please select dropdown
  • Press the Config items button at the bottom, then press the Add new value button. Click on the Please select drop-down, select automapper-mapper-name and set its value to example-import-csv-file
  • Close the config item manager and click Submit to save the state.
  • Click on Submit to save and close the FSM

ℹ️ Note : All class objects with names starting with “BBM” are part of Qorus’s building blocks. Each object/block has its own functionality, and blocks can be arranged to accomplish a particular task, providing a complete no-code approach to data integration. You can also code a custom object/block for a custom task, or write your own version of an existing building block.
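Conceptually, the BBM_AutoMapper runMapper connector configured above applies the named mapper to each input row and bulk-inserts the output into the target table. A minimal sketch, with sqlite3 standing in for the omquser datasource and a cut-down two-column table (both assumptions for illustration only):

```python
import sqlite3

def run_mapper(conn, mapper, rows):
    """Apply a mapper function to each row and bulk-insert the results."""
    mapped = [mapper(r) for r in rows]
    conn.executemany(
        "INSERT INTO inventory_example (product_code, total) "
        "VALUES (:product_code, :total)",
        mapped,
    )
    conn.commit()
    return len(mapped)

# Demo with an in-memory DB and a trivial mapper
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inventory_example (product_code TEXT, total INTEGER)")
n = run_mapper(
    conn,
    lambda r: {"product_code": r["ProductCode"],
               "total": r["Ordered"] + r["InTransit"]},
    [{"ProductCode": "P100", "Ordered": 10, "InTransit": 5}],
)
```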

8. Click on the plus button in the Finite State Machine field of the Fill Data form. In the Trigger Manager, select Trigger, then click on the Please select dropdown and select primary.
9. From the Select Fields list to the left of the Fill Data form, click on the plus button next to Mappers to add a mapper. In the search bar, search for and select example-import-csv-file:1.0.
10. Click Submit to save the step.
11. Click Submit to save the workflow
12. Deploy the workflow to the Qorus server by clicking on the cloud icon next to the workflow name in the hierarchy view. When prompted, click on Yes – with all dependencies
ℹ️ Note: You can achieve the same by deploying the EXAMPLE-IMPORT-CSV-FILE workflow to the server with all dependencies or deploy the following files with oload using the Python client:
  • EXAMPLE-IMPORT-CSV-FILE-1-1.0.qwf.yaml
  • example-import-csv-file-1.0.qmapper.yaml
  • ExampleImportCsvFile-1.0.qstep.yaml
  • example-import-csv-file.qfsm.yaml

7. Create an SFTP Polling Job

Create an SFTP polling job in the IDE that polls the SFTP server every five minutes and transfers all CSV files to the local in-process directory.
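The poll cycle the job performs can be sketched as follows (illustrative only; the real transfer goes over SFTP via the building block, sketched here with local directories standing in for the remote server):

```python
import fnmatch
import os
import shutil

def poll(remote_dir, local_dir, mask="*csv"):
    """One poll cycle: copy each matching file to a temporary name,
    atomically rename it into in-process/, then delete the remote file."""
    moved = []
    os.makedirs(os.path.join(local_dir, "in-process"), exist_ok=True)
    for name in sorted(os.listdir(remote_dir)):
        if not fnmatch.fnmatch(name, mask):
            continue
        tmp = os.path.join(local_dir, f"tmp.{name}.part")
        target = os.path.join(local_dir, "in-process", name)
        shutil.copy(os.path.join(remote_dir, name), tmp)
        os.replace(tmp, target)  # rename only after the transfer completes
        os.remove(os.path.join(remote_dir, name))
        moved.append(target)
    return moved
```

The temporary `.part` name followed by an atomic rename ensures downstream consumers never see a half-transferred file, which is exactly what the sftp-polling-file-temp-template config item below is for.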

1. Create a new duplicate directory $OMQ_DIR/user/csv-import/duplicate in the Qorus container.

ℹ️ Note: When the Qorus Docker container is run for the first time, the environment setup script /opt/qorus/bin/env.sh in the container is executed, which exports the environment variable OMQ_DIR and sets it to /opt/qorus. Therefore, $OMQ_DIR/user/csv-import/duplicate is /opt/qorus/user/csv-import/duplicate

2. Click on the Create new (new file) icon next to the Job interface from the interface hierarchy view.
3. In the Qorus Webview window, fill the new job’s Fill Data form with the following:
  • Target Directory : Browse through the Show Directories field and select a source directory. This will be the directory where the code generated by the interface is stored.
  • Name : example-import-csv-file This will be the name of our job
  • Description : SFTP Polling Example The description field supports markdown so, feel free to format your text in markdown.
  • Click on the Please select button in the Base class name field and select BBM_SftpPollerCreateOrderJob
  • Class name : ExampleSftpPoller
  • Populate the Schedule field with the following:
  • Minute : */5
  • Hours : *
  • Day : *
  • Month : *
  • Weekday : *
  • Version : 1.0
  • From the Optional Fields drop-down list in the upper-right corner of the form, add the Classes field and click on the Please select button to select BBM_SftpPollerCreateOrderJob
4. Configure the job by clicking on Config items at the bottom and add values for the following config items:
| Config group | Name | Description | Type | Value |
|---|---|---|---|---|
| Create Workflow Order | create-workflow-name | The name of the workflow | Workflow | EXAMPLE-IMPORT-CSV-FILE |
| SFTP Polling | sftp-polling-connection-name | The name of the SFTP connection | Connection | sftp-example-csv |
| SFTP Polling | sftp-polling-mask | The filename mask for files to match when polling | string | *csv |
| SFTP File Options | sftp-polling-file-connection | The name of a local file connection where to save files | Connection | fs-example-csv |
| SFTP File Options | sftp-polling-file-name | A template string giving the target local file name, relative to the connection identified by sftp-polling-file-connection | string | in-process/$local:{name} |
| SFTP Polling Fine Tuning | sftp-polling-file-temp-template | The template for the temporary filename used while transferring the file | string | tmp.$local:{name}.part |
| Create Workflow Order Data | create-workflow-staticdata | A hash providing the initial workflow order static data, for resolving template values | hash | filename: $local:input.local_path<br>uuid: $qore-expr:{UUID::get()} |
| Create Workflow Order Keys | create-workflow-unique-key | An optional hash of order keys that will ensure uniqueness for a particular workflow name (across all versions) | hash | filename: $local:input.name |
| Duplicate File Handling | sftp-polling-duplicate-name-template | The filename template for the target file when moving duplicate files | string | $local:name-$timestamp:{YYYY-MM-DD HH:mm:SS.xx} |
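For a given polled file, the three filename templates above resolve roughly like this (illustrative Python; the real $local:/$timestamp: template resolution is done by Qorus, and the timestamp format is approximated with strftime):

```python
from datetime import datetime

def resolve_templates(name):
    """Mimic the three filename templates from the config items above."""
    # approximates $timestamp:{YYYY-MM-DD HH:mm:SS.xx}
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-4]
    return {
        # sftp-polling-file-name: in-process/$local:{name}
        "target": f"in-process/{name}",
        # sftp-polling-file-temp-template: tmp.$local:{name}.part
        "temp": f"tmp.{name}.part",
        # sftp-polling-duplicate-name-template:
        # $local:name-$timestamp:{YYYY-MM-DD HH:mm:SS.xx}
        "duplicate": f"{name}-{ts}",
    }
```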
5. Close the Config Item Manager and click on the Class connections at the bottom to create a class connection to run the job logic.
  • Click on the Add connection button in the Class Connections Manager to add a connection.
  • Press the edit button on the resulting connection and rename it to run
  • Click on the Add initial connector and populate the fields as follows:
  • Class : BBM_SftpPollerCreateOrderJob
  • Connector : runJob
  • Trigger : run
  • Click the Submit button to save
6. Click Submit to save the connection
7. Deploy the new job to the Qorus server by clicking on the cloud icon next to the job name in the hierarchy view. When prompted click on the Yes – with all dependencies
ℹ️ Note : You can achieve the same by deploying the example-import-csv-file job to the server or with oload using the Python client.

Verify Your Installation and Deployment

  1. Go to the Qorus operational web interface at https://localhost:8011 and verify that the workflow EXAMPLE-IMPORT-CSV-FILE and the job example-import-csv-file are both running.
  2. Check the SFTP connection sftp-example-csv and the filesystem connection fs-example-csv to make sure they are up and working.

Testing the solution

1. Copy the example file StockReport-b88a43eb-4cdd-46c2-8d64-4b26bd3aed64.csv from the examples/csv-sftp-to-db-import/example-files directory to the SFTP directory with:
scp -i [path-to-your-private-key] -o IdentitiesOnly=yes StockReport-b88a43eb-4cdd-46c2-8d64-4b26bd3aed64.csv [user]@[your-sftp-server]:[your-interface-directory]
2. Check the log file of the example-import-csv-file job to ensure that the file is polled and retrieved and that a workflow order is created for it.
3. You can also check the job results to see that the job’s status is COMPLETE.
4. Then check the workflow EXAMPLE-IMPORT-CSV-FILE to ensure that the order was processed; the order should have a COMPLETE status.
5. You can also verify the data in the target DB with the following command (using the Python client): qdp omquser/inventory_example search – this should output the 10 rows written to the inventory_example table in the omquser datasource. (omquser is the datasource for generic user use – in the Docker image it’s set up pointing to the same datasource as the Qorus system datasource, so please do not manipulate any other tables in that datasource.)

6. Test duplicate file handling by resubmitting a CSV file that’s already been processed and make sure it arrives in the duplicate directory.

Retrying

To retry and run a test scenario again, you need to delete the workflow orders for the EXAMPLE-IMPORT-CSV-FILE workflow, since the CSV file name is used as a unique order key; the rows also need to be deleted from the target table. The following commands (using the Python client) will do this:
  • oload -Xwfinstances:EXAMPLE-IMPORT-CSV-FILE:1.0 (deletes all orders for the workflow)
  • qdp omquser/inventory_example del (deletes all rows in the interface table)
ℹ️ Note: You can create a new file with a new file name to avoid deleting rows in the interface table.
