About Couriers

A courier brings data from an external system to Amperity. A courier relies on a feed to know which fileset to bring to Amperity for processing.

What a courier does:

  1. Checks if data is available at the source location.

  2. Collects data from the source location, and then pulls that data to Amperity.

What a courier needs:

  1. Access to the source location. Most data sources (Amazon S3, Azure Blob Storage, Google Cloud Storage, or any SFTP site) allow the use of many file formats, while others may use Snowflake or REST APIs.

  2. A location from which to copy data.

  3. An associated feed.

  4. A file format (CSV, TSV, Apache Parquet, etc.), along with additional details for compression, archive, and encryption.

  5. A combination of load settings and load operations. The exact combination of settings and operations depends on the data source and the types of files to be pulled to Amperity.

File couriers

A file data source can provide files to Amperity in just about any file format, such as CSV, JSON, Apache Parquet, Apache Avro, PSV, and TSV. Locations from which file data sources can be pulled include Amazon S3, Azure Blob Storage, Google Cloud Storage, and any SFTP site.

Load settings

File data sources define load settings in two parts:

  1. A list of files that should be pulled to Amperity.

  2. A list of load operations that associate each file with a feed.

The exact combination of files and load operations depends on the data source from which data is made available to Amperity.

Load settings define the location of a data source, its type, and how it should be processed by Amperity. The syntax for file load settings is similar to:

[
  {
    "object/file-pattern": "'CUSTOMER/ENV/FILENAME_'MM-dd-yyyy'.csv'",
    "object/type": "file",
    "object/land-as": {
      "file/tag": "FILE_TAG",
      "file/content-type": "text/csv",
      "file/header-rows": 1
    }
  },
  {
    "object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.zip'",
    "object/type": "archive",
    "archive/contents": {
      "FILENAME": {
        "subobject/land-as": {
          "file/tag": "FILENAME_TAG",
          "file/content-type": "text/csv"
        }
      }
    }
  },
  {
    "object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.zip'",
    "object/type": "archive",
    "object/land-as": {
      "file/tag": "FILENAME_TAG",
      "file/content-type": "text/csv",
      "file/header-rows": 1
    }
  }
 ]

Each filedrop load setting must specify the file pattern, which is the path to the file, its filename, a date stamp, and a file extension. The rest of the load settings block must match the file; for example, the content type for "some-file.csv" must be "text/csv". An archive must specify both the archive and the file contained within it. If the file is archived as "some-file.zip", then the "object/type" would be "archive" and the content type of the file within it would be "text/csv".

If an archive contains only a single file or if all the files within the archive have the same file tag, content type, and other settings, then "archive/contents" can be omitted and "object/land-as" can be specified instead. The files within the archive will all use the specified "object/land-as" settings.

File patterns

A courier looks for objects in a filedrop location using a combination of the path to a directory, the name of a file, and a date. These are defined by the "object/file-pattern" setting for each object. A courier runs based on a date or a date range, and then looks for files in the filedrop location for that date or date range.

A file pattern may use a combination of literal strings (wrapped in single quotes), wildcard characters (*) within literal strings, and date components, with path segments separated by forward slashes.

Wildcards

A wildcard matches zero or more characters, up to the next forward-slash character.

Note

When a file pattern with a wildcard matches more than one file for a given date or date range, the matched files are loaded in a way that guarantees per-day ordering. However, if your courier uses an ingest query, ascending lexicographical ordering by file is not guaranteed or preserved within a single day's files.

Caution

Wildcards may not be used for files contained within a zip archive.

Examples

The following example shows using a wildcard at the end of a file pattern:

'files/'yyyy'/'MM'/'dd'/customers-*.csv'

will match any of these files:

  • /customers-.csv

  • /customers-1.csv

  • /customers-hello-world.csv

and will not match any of these:

  • /customers-.csv.0

  • /customers-0.json

  • /customers-0/1/file.csv

  • /customers.csv
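The wildcard rule above can be sketched in Python by translating each * into a regex that matches any run of non-slash characters. The helper below is purely illustrative (it is not part of Amperity):

```python
import re

def wildcard_to_regex(pattern: str) -> "re.Pattern":
    """Convert a courier file pattern (after date substitution) to a regex.

    Each * matches zero or more characters but never crosses a
    forward slash, mirroring the wildcard rule described above.
    """
    parts = (re.escape(p) for p in pattern.split("*"))
    return re.compile("[^/]*".join(parts) + r"$")

rx = wildcard_to_regex("files/2020/04/10/customers-*.csv")

# Matches:
assert rx.match("files/2020/04/10/customers-1.csv")
assert rx.match("files/2020/04/10/customers-.csv")
assert rx.match("files/2020/04/10/customers-hello-world.csv")
# Does not match (wrong extension, extra path level, missing hyphen):
assert not rx.match("files/2020/04/10/customers-0.json")
assert not rx.match("files/2020/04/10/customers-0/1/file.csv")
assert not rx.match("files/2020/04/10/customers.csv")
```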

The following example shows using multiple wildcards:

{
  "object/type": "file",
  "object/file-pattern": "'responsys/outbound_files/*_STATE_'yyyyMMdd'_*.txt.gpg'",
  "object/land-as": {
    "file/header-rows": 1,
    "file/tag": "launch",
    "file/content-type": "text/csv"
  }
},

Literal strings

A literal string must exactly match characters in the file path, except where wildcard characters appear within the literal string. Wrap literal strings in single quotes so that they are not interpreted as Joda-Time patterns. For example:

  • 'files/'

  • '/'

  • 'MM-dd-YYYY'

Date components

Date components act as placeholders for months, days, and years. Real values are applied when the courier runs on a given date or date range. Date components must match Joda-Time pattern-based formatting, but should generally be limited to the following patterns:

  Pattern   Meaning          Examples

  yyyy      4-digit year     2020, 2021, …
  MM        2-digit month    01, 02, … 12
  dd        2-digit day      01, 02, … 31

A courier that runs using this pattern:

'files/'yyyy'/'MM'/'dd'/customers-*.csv'

when run on April 10, 2020 will look for files at 'files/2020/04/10/customers-*.csv' and will return any files that match.
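As an illustration, that substitution can be sketched in Python by mapping the Joda-Time patterns above onto strftime codes. The render_pattern helper is hypothetical, not an Amperity API:

```python
from datetime import date

# Map the Joda-Time patterns used above onto Python strftime codes.
JODA_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d"}

def render_pattern(pattern: str, run_date: date) -> str:
    """Substitute date components in a courier-style file pattern.

    Segments wrapped in single quotes are literals; everything outside
    the quotes is treated as a date component.
    """
    out = []
    for i, segment in enumerate(pattern.split("'")):
        if i % 2 == 1:          # odd segments are quoted literals
            out.append(segment)
        elif segment:           # even segments are date components
            out.append(run_date.strftime(JODA_TO_STRFTIME[segment]))
    return "".join(out)

assert render_pattern("'files/'yyyy'/'MM'/'dd'/customers-*.csv'",
                      date(2020, 4, 10)) == "files/2020/04/10/customers-*.csv"
```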

File compression / archive

Amperity supports the following compression and archive types:

GZIP

{
  "object/type": "file",
  "object/optional": false,
  "object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.csv.gz'",
  "object/land-as": {
    "file/header-rows": 1,
    "file/tag": "FILENAME_TAG",
    "file/content-type": "text/csv"
  }
}

TAR

{
  "object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.tar'",
  "object/type": "archive",
  "archive/contents": {
    "FILENAME": {
      "subobject/land-as": {
        "file/tag": "FILENAME_TAG",
        "file/content-type": "text/csv"
      }
    }
  }
}

ZIP

{
  "object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.zip'",
  "object/type": "archive",
  "archive/contents": {
    "FILENAME": {
      "subobject/land-as": {
        "file/tag": "FILENAME_TAG",
        "file/content-type": "text/csv"
      }
    }
  }
}

Pretty Good Privacy (PGP)

Pretty Good Privacy (PGP) is an encryption program that provides cryptographic privacy and authentication for data communication by signing, encrypting, and decrypting data files and formats. Amperity supports PGP encryption.

PGP encryption may be applied to files sent to Amperity to improve data security and to help ensure file integrity and completeness. Amperity recommends:

  • 4096-bit keys

  • Keys protected by a strong passphrase

  • One PGP key per tenant (minimum); one PGP key per system (recommended)

Amperity Support will generate PGP keys (both public and private key-pairs) to use when generating PGP encrypted files to be sent to Amperity. Key pairs are created in the same cloud (Amazon AWS or Microsoft Azure) in which the customer's tenant is located.

Amperity will provide to the customer the public key using SnapPass. The customer must use that key to encrypt files prior to adding them to the filedrop location. Files that are encrypted using PGP should be compressed prior to encryption. (Compression applied after encryption does not reduce the size of the file.) Amperity will use the private key to decrypt files prior to loading them.

Important

There are two types of PGP public keys: a primary key and a subkey. Amperity does not allow the use of a primary key for public-private key encryption. If you attempt to use a primary key you will see an error similar to “Destination failed validation: PGP public key is a primary key. Please provide a subkey or a keyring with exactly one subkey.”

Input examples

The following examples show how files input to Amperity are unpacked, depending on various combinations of encryption, compression type, and file format. All examples use yyyy_MM_dd for the date format.

For single files

PGP, TGZ, CSV

  1. Input to Amperity: table_name_yyyy_MM_dd.tgz.pgp

  2. After decryption: table_name_yyyy_MM_dd.tgz

  3. After decompression: table_name_yyyy_MM_dd.csv

PGP, GZip, TAR, CSV

  1. Input to Amperity: table_name_yyyy_MM_dd.csv.tar.gz.pgp

  2. After decryption: table_name_yyyy_MM_dd.csv.tar.gz

  3. After decompression: table_name_yyyy_MM_dd.csv.tar

  4. After the archive is opened: table_name_yyyy_MM_dd.csv

PGP, TAR, Apache Parquet

  1. Input to Amperity: table_name_yyyy_MM_dd.tar.pgp

  2. After decryption: table_name_yyyy_MM_dd.tar

  3. After decompression: table_name_yyyy_MM_dd.parquet, with 1 to n Apache Parquet part files within the directory.

For multiple files

PGP, TAR, Apache Parquet

  1. Input to Amperity: input_name_yyyy_MM_dd.parquet.tar.pgp

  2. After decryption: input_name_yyyy_MM_dd.parquet.tar

  3. After decompression: table_name_yyyy_MM_dd.parquet, where, for each table, 1 to n Apache Parquet files will be located within a single directory.

PGP, TGZ, CSV

  1. Input to Amperity: input_name_yyyy_MM_dd.csv.tgz.pgp

  2. After decryption: input_name_yyyy_MM_dd.csv.tgz

  3. After decompression: table_name.csv, where all tables that were input are located within a single directory.

API couriers

The configuration for an API data source varies, depending on the file format and other configuration details. API data sources include Campaign Monitor, Google Analytics, Salesforce Sales Cloud, and Zendesk.

Snowflake couriers

A Snowflake data source provides a list of tables that are consolidated into a fileset. Snowflake data sources include Snowflake itself, along with any Fivetran data source, such as Klaviyo, Shopify, Kustomer, and HubSpot.

Table lists

A table list defines the list of tables to be pulled to Amperity from Snowflake.

[
  "AMPERITY_A1BO987C.ACME.CAMPAIGN",
  "AMPERITY_A1BO987C.ACME.LIST",
  "AMPERITY_A1BO987C.ACME.CONTACT",
]

Stage names

A stage defines the location of objects that are available within Snowflake.

AMPERITY_A1BO987C.ACME.ACME_STAGE

Load operations

Load operations associate each table in the list of tables with a feed. (The initial setup for this courier will use an incorrect feed ID, such as df-xxxxxx.)

{
  "df-xxxxx": [
    {
      "type": "load",
      "file": "AMPERITY_A1BO987C.ACME.CAMPAIGN"
    }
  ],
  "df-xxxxx": [
    {
      "type": "load",
      "file": "AMPERITY_A1BO987C.ACME.LIST"
    }
  ],
  "df-xxxxx": [
    {
      "type": "load",
      "file": "AMPERITY_A1BO987C.ACME.CONTACT"
    }
  ]
}

Load operation types

A fileset is a group of files that are processed as a unit by a single courier. A fileset defines each file individually by name, date stamp, file format, and load operation. A courier expects all files in a fileset to be available for processing, unless a file is specified as optional.
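The expectation that every non-optional file is present before processing can be pictured with a small check. This helper is illustrative only, not part of Amperity:

```python
def missing_required(fileset, available):
    """Return the tags of required files that are not yet available.

    A courier expects every non-optional file in its fileset to be
    present before processing begins.
    """
    return [f["tag"] for f in fileset
            if not f.get("optional", False) and f["tag"] not in available]

# A fileset with one required and one optional file:
fileset = [{"tag": "customers"}, {"tag": "orders", "optional": True}]
assert missing_required(fileset, {"orders"}) == ["customers"]
assert missing_required(fileset, {"customers"}) == []
```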

Each file in a fileset must be associated with one of the following load operation types:

Empty

An empty load operation will bring files to Amperity, but not try to load those files into a feed. An empty load operation is ideal for bringing sample files to Amperity prior to configuring the feed that defines the schema within Amperity for a data source. Use the sample file while configuring the feed, and then update the load operation to match the configuration requirements for the associated file type.

{}

Tip

You cannot use an empty load operation for files that require the use of an ingest query to transform the data prior to it being made available to the feed.

For example, a JSON file with nested data must use an ingest query to flatten the file. A feed cannot use a JSON file with nested data as a sample file. And a courier cannot run to successful completion unless the courier is configured with a feed ID.

In this type of situation, create a file outside of this workflow to use as the sample file for the feed. For example, use Databricks to generate a zero-row sample file, and then upload that file during feed creation.

Another option is to define the schema without using a sample file. Select the Don’t use sample file option when adding the feed, and then use the Add field button to define each field in the schema.

Incorrect feed ID

Instead of using an empty load operation you can use an obviously incorrect feed ID to pull files to the Amperity landing area. This approach uses the default load configuration, but sets the feed ID to a string that will not be available to the courier after feeds have been updated. For example, replace the digits with six "x" characters:

{
  "df-xxxxxx": [
    {
      "type": "truncate"
    },
    {
      "type": "load",
      "file": "campaign-members"
    }
  ]
}

This will return an error message similar to:

Error running load-operations task
Cannot find required feeds: "df-xxxxxx"

The load operation will pull the files to the landing area and make them available for use with defining a feed schema.

Load files

You can load the contents of a data file to a domain table as an UPSERT load operation based on the primary key in the table.

"OTHER_FEED_ID": [
  {
    "type": "load",
    "file": "OTHER_FILE_TAG",
  }
]

Load ingest query

Spark SQL is a high-performance SQL query engine that is used by Amperity to ingest data, create domain tables, and extend the outcome of the Stitch process in your customer 360 database.

Use Spark SQL to define all SQL queries related to the following areas of Amperity:

  • Ingesting data, including ingest queries

  • Processing data into domain tables

  • Building custom domain tables

  • Loading data into Stitch

  • Running Stitch

  • Loading the results of Stitch into the customer 360 database

  • Defining tables in the customer 360 database

Note

Spark SQL is used to define all SQL queries related to the Stitch process up to (and including) building the tables in the customer 360 database. Presto SQL is used to define SQL queries for segments. Why both?

  • Spark SQL performs better in more traditional processes like machine learning and ETL-like processes that are resource intensive.

  • Presto SQL performs better when running real-time queries against cloud datasets.

The configuration for an ingest query load operation depends on the data source against which the ingest query will run.

Truncate, then load

You can empty the contents of a table prior to loading a data file to a domain table as a load operation.

Note

A truncate operation is always run first, regardless of where it’s specified within the load operation.

"FEED_ID": [
  {
    "type": "truncate"
  },
  {
    "type": "load",
    "file": "FILE_NAME"
  }
],
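The truncate-first rule in the note above can be sketched as a simple reordering of the operation list. This is an illustration of the rule, not Amperity's implementation:

```python
def order_operations(ops):
    """Truncate operations run before any loads, regardless of their
    position in the load-operation list (a sketch of the rule above)."""
    return sorted(ops, key=lambda op: op["type"] != "truncate")

# Even with the load listed first, the truncate runs first:
ops = [{"type": "load", "file": "FILE_NAME"}, {"type": "truncate"}]
assert order_operations(ops)[0] == {"type": "truncate"}
```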

Examples

The following sections provide examples for load settings and load operations by data source and/or by file type:

Apache Avro

Apache Avro is a row-oriented remote procedure call and data serialization framework developed within the Apache Hadoop ecosystem. Avro uses JSON to define data types and protocols, and serializes data in a compact binary format.

Load settings

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-YYYY-MM-dd'.avro'",
  "object/land-as": {
    "file/tag": "FILE_NAME",
    "file/content-type": "application/avro"
  }
}

Load operations

{
  "FEED_ID": [
    {
      "type": "OPERATION",
      "file": "FILE_NAME",
    }
  ]
}

Apache Parquet

Apache Parquet is a free and open-source column-oriented data storage format developed within the Apache Hadoop ecosystem. It is similar to RCFile and ORC, but provides more efficient data compression and encoding schemes with enhanced performance and can better handle large amounts of complex bulk data.

Note

Apache Parquet files are almost always partitioned: a single logical Parquet file comprises multiple physical files in a directory structure, each of them representing a partition.

Parquet partitioning optionally allows data to be nested in a directory structure determined by the value of partitioning columns. Amperity only detects Parquet partition files one directory level below the configured file pattern. For example:

"path/to/file-yyyy-MM-dd.parquet/part-0000.parquet"

Load settings

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-YYYY-MM-dd'.parquet/'",
  "object/land-as": {
    "file/tag": "FILE_NAME",
    "file/content-type": "application/x-parquet"
  }
}

Load operations

{
  "FEED_ID": [
    {
      "type": "OPERATION",
      "file": "FILE_NAME",
    }
  ]
}

Campaign Monitor

Campaign Monitor is an email marketing platform that tracks details related to email campaigns (opens, clicks, bounces, unsubscribes, spam complaints, and recipients) and email subscriber lists (active, unconfirmed, bounced, and deleted subscribers), and other details.

Load settings

The Campaign Monitor REST API has a clearly defined set of files that can be made available to Amperity. The load settings are built into Amperity by default.

{
   "open": "opens-file",
   "unsubscribe": "unsubscribes-file",
   "spam": "spam-file",
   "unsubscribed-subscriber": "unsubscribed-subscribers-file",
   "unconfirmed-subscriber": "unconfirmed-subscribers-file",
   "recipient": "recipients-file",
   "bounced-subscriber": "bounced-subscribers-file",
   "bounce": "bounces-file",
   "click": "clicks-file",
   "campaign": "campaigns-file",
   "deleted-subscriber": "deleted-subscribers-file",
   "suppression": "suppression-list-file",
   "subscriber-list": "list-stats-file",
   "active-subscriber": "active-subscribers-file"
 }

Load operations

"OPENS_FEED_ID": [
  {
    "type": "load",
    "file": "opens-file"
  }
],
"BOUNCED-SUBSCRIBERS_FEED_ID": [
  {
    "type": "truncate"
  },
  {
    "type": "load",
    "file": "bounced-subscribers-file"
  }
],
"ACTIVE-SUBSCRIBERS_FEED_ID": [
  {
    "type": "truncate"
  },
  {
    "type": "load",
    "file": "active-subscribers-file"
  }
],
"CLICKS_FEED_ID": [
  {
    "type": "load",
    "file": "clicks-file"
  }
],
"DELETED-SUBSCRIBERS_FEED_ID": [
  {
    "type": "truncate"
  },
  {
    "type": "load",
    "file": "deleted-subscribers-file"
  }
],
"SUPPRESSION-LIST_FEED_ID": [
  {
    "type": "truncate"
  },
  {
    "type": "load",
    "file": "suppression-list-file"
  }
],
"CAMPAIGNS_FEED_ID": [
  {
    "type": "truncate"
  },
  {
    "type": "load",
     "file": "campaigns-file"
  }
],
"RECIPIENTS_FEED_ID": [
  {
    "type": "load",
    "file": "recipients-file"
  }
],
"BOUNCES_FEED_ID": [
  {
    "type": "load",
    "file": "bounces-file"
  }
],
"LIST-STATS_FEED_ID": [
  {
    "type": "load",
    "file": "list-stats-file"
  }
],
"SPAM_FEED_ID": [
  {
    "type": "load",
    "file": "spam-file"
  }
],
"UNSUBSCRIBED-SUBSCRIBERS_FEED_ID": [
  {
    "type": "truncate"
  },
  {
    "type": "load",
    "file": "unsubscribed-subscribers-file"
  }
],
"UNSUBSCRIBES_FEED_ID": [
  {
    "type": "load",
    "file": "unsubscribes-file"
  }
]

CBOR

CBOR (Concise Binary Object Representation) is a binary data serialization format loosely based on JSON. Like JSON it allows the transmission of data objects that contain name–value pairs, but in a more concise manner. This increases processing and transfer speeds at the cost of human-readability.

Load settings for Amazon AWS

{
  "object/type": "file",
  "object/file-pattern": "'ingest/stream/TENANT/STREAM_ID/'yyyy-MM-dd'/'*'.cbor'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/content-type": "application/ingest-pack+cbor"
  }
},

Load settings for Microsoft Azure

{
  "object/type": "file",
  "object/file-pattern": "'STREAM_ID/'yyyy-MM-dd'/'*'.cbor'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/content-type": "application/ingest-pack+cbor"
  }
},

Load operations

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME",
          "options": {
            "rowTag": "row"
          },
          "schema": {
            "fields": [
              {
                "metadata": {},
                "name": "field-1",
                "type": "string",
                "nullable": true
              },
              ...
              {
                "metadata": {},
                "name": "nested-group-1",
                "type": {
                  "fields": [
                    {
                      "metadata": {},
                      "name": "field-a",
                      "type": "string",
                      "nullable": true
                    },
                    {
                      "metadata": {},
                      "name": "nested-group-a",
                      "type": {
                        "fields": [
                          ...
                        ],
                        "type": "struct"
                      },
                      "nullable": true
                    },
                    {
                      "metadata": {},
                      "name": "field-xyz",
                      "type": "string",
                      "nullable": true
                    }
                  ],
                  "type": "struct"
                },
                "nullable": true
              },
              ...
            ],
            "type": "struct"
          }
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

Important

The "schema" must match the structure of the incoming file, including all nested groupings and data types. Set "nullable" to true to allow a field to contain NULL values. A CBOR file can have hundreds of fields. The ellipses (...) in this example represent locations within this example structure where additional fields may be present.

Tip

Set rowTag to the element in the CBOR file that should be treated as a row in a table. The default value is row.

CSV

A comma-separated values (CSV) file, defined by RFC 4180 , is a delimited text file that uses a comma to separate values. A CSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by commas. The use of the comma as a field separator is the source of the name for this file format.

Load settings with header rows

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-yyyy-MM-dd'.csv'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/content-type": "text/csv"
  }
},

Load settings with non-standard quotes and separators

Some CSV files may use non-standard characters for quotes and separators, such as ' for quotes and \ for separators. If a CSV file contains non-standard characters, you must specify these characters in the courier load settings.

Load settings without header rows

A headerless CSV file does not contain a row of data that defines headers for the data set. When working with a headerless CSV file you can configure the load settings for the courier to accept headerless files, or you can use an ingest query when the data must be changed in some way prior to loading it to Amperity.

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-yyyy-MM-dd'.csv'",
  "object/land-as": {
     "file/header-rows": 0,
     "file/tag": "FILE_NAME",
     "file/content-type": "text/csv"
  }
},
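To see how a non-standard quote character changes parsing, here is a minimal Python sketch using the standard csv module. It is analogous to overriding the quote character in courier load settings, not Amperity code:

```python
import csv
import io

# A CSV that quotes fields with ' instead of the standard " character.
data = "id,name\n1,'Smith, Jane'\n"

# Telling the parser about the non-standard quote keeps the embedded
# comma inside a single field.
rows = list(csv.reader(io.StringIO(data), quotechar="'"))
assert rows == [["id", "name"], ["1", "Smith, Jane"]]
```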

Load operations for feed

{
  "FEED_ID": [
    {
      "type": "OPERATION",
      "file": "FILE_NAME",
    }
  ]
}

Load operations for ingest query

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME",
          "options": {
            "delimiter": ",",
            "escape": "\\",
            "multiline": "true",
            "quote": "\""
          }
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

Caution

Spark does not correctly implement RFC 4180 for escape characters in CSV files. The most common implementations of CSV files expect a double quote " as an escape character while Spark uses a backslash \. For more information about this issue view the SPARK-22236 issue within the Spark project.

You can override this behavior when working with RFC-compliant CSV files by specifying an escape character in the courier load operations using ' or " as the escape character.

For example:

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
      {
        "file": "FILE_NAME",
        "options": {
          "escape": "'"
        }
      }
    ],
    "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

If a CSV file uses \ as the delimiter, configure the load operation to specify an empty delimiter value, after which Spark will automatically apply the \ character as the delimiter.

For example:

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
      {
        "file": "FILE_NAME",
        "options": {
          "delimiter": ""
        }
      }
    ],
    "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

JSON

JavaScript Object Notation (JSON) is a language-independent data format that is derived from (and structured similarly to) JavaScript.

Load settings

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-YYYY-MM-dd'.json'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/content-type": "application/json"
  }
},

Load operations

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

NDJSON

Newline-delimited JSON (NDJSON) is a data format for structured data that uses newlines to separate JSON values. Each line in an NDJSON file is a valid JSON value.
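Because every line is a complete JSON value, an NDJSON file can be parsed one line at a time with the standard json module, as this short sketch shows:

```python
import json

# Two records, one complete JSON value per line.
ndjson = '{"id": 1, "email": "a@example.com"}\n{"id": 2, "email": "b@example.com"}\n'

# Parse each non-empty line independently.
records = [json.loads(line) for line in ndjson.splitlines() if line.strip()]
assert [r["id"] for r in records] == [1, 2]
```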

Load settings with header rows

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-YYYY-MM-dd'.ndjson'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/content-type": "application/x-ndjson"
  }
},

Load settings without header rows

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-YYYY-MM-dd'.ndjson'",
  "object/land-as": {
     "file/header-rows": 0,
     "file/tag": "FILE_NAME",
     "file/content-type": "application/x-ndjson"
  }
},

Load operations

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

PSV

A pipe-separated values (PSV) file is a delimited text file that uses a pipe to separate values. A PSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by pipes. The use of the pipe as a field separator is the source of the name for this file format.

Load settings with header rows

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-YYYY-MM-dd'.psv'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/content-type": "text/pipe-separated-values"
  }
},

Load settings with non-standard quotes and separators

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-YYYY-MM-dd'.psv'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/quote": "*",
     "file/separator": ";"
     "file/content-type": "text/pipe-separated-values"
  }
},

Load settings without header rows

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-YYYY-MM-dd'.psv'",
  "object/land-as": {
     "file/header-rows": p,
     "file/tag": "FILE_NAME",
     "file/content-type": "text/pipe-separated-values"
  }
},

Load operations for feed

{
  "FEED_ID": [
    {
      "type": "OPERATION",
      "file": "FILE_NAME",
    }
  ]
}

Load operations for ingest query

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
          "options": {
            "delimiter": "\|",
            "escape": "\\",
            "multiline": "true",
            "quote": "\""
          }
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

Salesforce Commerce Cloud

Salesforce Commerce Cloud is a multi-tenant, cloud-based commerce platform that empowers brands to create intelligent, unified buying experiences across all channels.

Load settings

The Salesforce Commerce Cloud REST API has a clearly defined set of files that can be made available to Amperity. The load settings are built into Amperity by default. (Salesforce Commerce Cloud was previously known as Demandware.)

[
  {
    "entity-type": "returns",
     "file-tag": "returns-file"
  },
  {
    "entity-type": "customers",
    "file-tag": "customers-file"
  },
  {
    "entity-type": "sites",
    "file-tag": "sites-file"
  },
  {
    "entity-type": "shipping_orders",
    "file-tag": "shipping_orders-file"
  },
  {
    "entity-type": "purchase_orders",
    "file-tag": "purchase_orders-file"
  },
  {
    "entity-type": "vendors",
    "file-tag": "vendors-file"
  },
  {
    "entity-type": "return_orders",
    "file-tag": "return_orders-file"
  },
  {
    "entity-type": "items",
    "file-tag": "items-file"
  },
  {
    "entity-type": "shipments",
    "file-tag": "shipments-file"
  },
  {
    "entity-type": "payments",
    "file-tag": "payments-file"
  },
  {
    "entity-type": "invoices",
    "file-tag": "invoices-file"
  },
  {
    "entity-type": "orders",
    "file-tag": "orders-file",
    "expand": [
      "invoices",
      "order_items"
    ],
    "json-path": "..."
  }
]

Load operations

{
  "SHIPMENTS_FEED_ID": [
    {
      "type": "load",
      "file": "shipments-file"
    }
  ],
  "PURCHASE_ORDERS_FEED_ID": [
    {
      "type": "load",
      "file": "purchase_orders-file"
    }
  ],
  "INVOICES_FEED_ID": [
    {
      "type": "load",
      "file": "invoices-file"
    }
  ],
  "SITES_FEED_ID": [
    {
      "type": "load",
      "file": "sites-file"
    }
  ],
  "RETURN_ORDERS_FEED_ID": [
    {
      "type": "load",
      "file": "return_orders-file"
    }
  ],
  "ORDERS_FEED_ID": [
    {
      "type": "load",
      "file": "orders-file"
    }
  ],
  "PAYMENTS_FEED_ID": [
    {
      "type": "load",
      "file": "payments-file"
    }
  ],
  "ITEMS_FEED_ID": [
    {
      "type": "load",
      "file": "items-file"
    }
  ],
  "VENDORS_FEED_ID": [
    {
      "type": "load",
      "file": "vendors-file"
    }
  ],
  "RETURNS_FEED_ID": [
    {
      "type": "load",
      "file": "returns-file"
    }
  ],
  "SHIPPING_ORDERS_FEED_ID": [
    {
      "type": "load",
      "file": "shipping_orders-file"
    }
  ],
  "CUSTOMERS_FEED_ID": [
    {
      "type": "load",
      "file": "customers-file"
    }
  ]
}

Salesforce Sales Cloud

Salesforce Sales Cloud brings customer information together into an integrated platform, and then provides access to thousands of applications through the AppExchange.

Load settings

The Sales Cloud integration allows you to use SQL patterns to specify which fields in an Object should be brought back to Amperity. Use the fields grouping to define which fields to bring back. Use * for all fields; otherwise specify a list of fields. Use where to specify values in the fields. The following sections show examples of Objects and the equivalent SQL queries used to define load settings.

[
  {
    "from": "ObjectName",
    "file/tag": "objectname-file",
    "fields": [
      "*"
    ]
  },
  {
    "from": "CustomObject",
    "file/tag": "custom-object-file",
    "fields": [
      "*"
    ]
  },
  {
    "from": "AnotherObject",
    "file/tag": "another-object-file",
    "fields": [
      "field-a.name",
      "field-b.name"
    ]
  },
  {
    "from": "Object2",
    "file/tag": "object2-file",
    "fields": [
      "field-one.name",
      "field-two.name"
    ],
    "where": "field-two = 'true'"
  }
]

Select all fields in an Object

The following SQL query:

SELECT * FROM Account

Is equivalent to the following load operation:

{
  "from": "Account",
  "file/tag": "account-file",
  "fields": [
    "*"
  ]
},

Select fields in an Object with specified values

The following SQL query:

SELECT Id, Name FROM Opportunity
WHERE Name = 'John'

Is equivalent to the following load operation:

{
  "from": "Opportunity",
  "file/tag": "opportunity-file",
  "fields": [
    "Id",
    "Name"
  ],
  "where": "Name = 'John'"
},

Select only direct reports

The following SQL query:

SELECT *, ReportsTo.Name FROM Contact

Is equivalent to the following load operation:

{
  "from": "Contact",
  "file/tag": "contact-file",
  "fields": [
    "*",
    "ReportsTo.Name"
  ]
},

Select only rows with a certain value in a custom column

The following SQL query:

SELECT * FROM CustomTable__c
WHERE CustomField__c = 34

Is equivalent to the following load operation:

{
  "from": "CustomObject",
  "file/tag": "custom-object-file",
  "fields": [
    "*"
  ],
  "where": "CustomField__c = 34"
},
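
The examples above follow a mechanical mapping from load settings to SQL. A rough sketch of that mapping in Python (the function name and structure are illustrative, not part of Amperity):

```python
def to_soql(entry):
    """Render a load-settings entry as the equivalent SELECT statement."""
    query = "SELECT {} FROM {}".format(", ".join(entry["fields"]), entry["from"])
    if "where" in entry:
        query += " WHERE " + entry["where"]
    return query

# A load-settings entry matching one of the examples above.
entry = {
    "from": "Opportunity",
    "file/tag": "opportunity-file",
    "fields": ["Id", "Name"],
    "where": "Name = 'John'"
}
print(to_soql(entry))  # SELECT Id, Name FROM Opportunity WHERE Name = 'John'
```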

Load operations

{
  "ACCOUNTS_FEED": [
    {
      "type": "truncate"
    },
    {
      "type": "load",
      "file": "accounts-file"
    }
  ],
  "CUSTOM_OBJECTS_FEED": [
    {
      "type": "truncate"
    },
    {
      "type": "load",
      "file": "custom-objects-file"
    }
 ]
}

Snowflake

Snowflake is an analytic data warehouse that is fast, easy to use, and flexible. Snowflake uses a SQL database engine that is designed for the cloud. Snowflake can provide tables as a data source to Amperity.

Load settings

For tables in a data warehouse, such as Snowflake, a list of table names must be specified.

[
   "table.name",
   "table.name"
]
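
The placeholder entries above stand for fully qualified table names. Assuming Snowflake's database.schema.table naming, the list might look like the following (the names are illustrative):

```json
[
   "marketing.public.customer",
   "marketing.public.orders"
]
```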

Load operations

{
  "FEED_ID": [
    {
      "type": "load",
      "file": "marketing.public.customer"
    }
  ]
}

Streaming JSON

Streaming JSON is a way to send increments of data using NDJSON (newline-delimited JSON) formatting within each increment. Each line in an NDJSON file is a valid JSON value.
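
As a quick illustration of the format (the records here are invented), each line of an NDJSON increment parses independently as JSON:

```python
import json

# Two records in NDJSON form: one complete JSON value per line.
ndjson = '{"id": 1, "email": "a@example.com"}\n{"id": 2, "email": "b@example.com"}'

# Parse line by line; there is no surrounding array and no commas between records.
records = [json.loads(line) for line in ndjson.splitlines()]
print(records[1]["email"])  # b@example.com
```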

Load settings

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-yyyy-MM-dd'.ndjson'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/content-type": "application/x-json-stream"
  }
},

Load operations

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

TSV

A tab-separated values (TSV) file is a delimited text file that uses a tab to separate values. A TSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by tabs. The use of the tab as a field separator is the source of the name for this file format.
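
A minimal sketch of reading such a file with Python's standard csv module (the rows shown are invented):

```python
import csv
import io

# A two-record TSV payload with a single header row.
data = "id\temail\n1\ta@example.com\n2\tb@example.com\n"

# The csv module handles tab-separated data via the delimiter option.
reader = csv.DictReader(io.StringIO(data), delimiter="\t")
rows = list(reader)
print(rows[0]["email"])  # a@example.com
```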

Load settings with header rows

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-yyyy-MM-dd'.tsv'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/content-type": "text/tab-separated-values"
  }
},

Load settings with non-standard quotes and separators

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-yyyy-MM-dd'.tsv'",
  "object/land-as": {
     "file/header-rows": 1,
     "file/tag": "FILE_NAME",
     "file/quote": "*",
     "file/separator": ";",
     "file/content-type": "text/tab-separated-values"
  }
},

Load settings without header rows

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-yyyy-MM-dd'.tsv'",
  "object/land-as": {
     "file/header-rows": 0,
     "file/tag": "FILE_NAME",
     "file/content-type": "text/tab-separated-values"
  }
},

Load operations for feed

{
  "FEED_ID": [
    {
      "type": "OPERATION",
      "file": "FILE_NAME"
    }
  ]
}

Load operations for ingest query

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME",
          "options": {
            "delimiter": "\t",
            "escape": "\\",
            "multiline": "true",
            "quote": "\""
          }
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

XML

eXtensible Markup Language (XML) is a supported data format for customer data sources.

Load settings

{
  "object/type": "file",
  "object/file-pattern": "'path/to/file'-yyyy-MM-dd'.xml'",
  "object/land-as": {
    "file/tag": "FILE_NAME",
    "file/content-type": "application/xml"
  }
}

Load operations

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME",
          "options": {
            "rowTag": "row"
          }
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

Tip

Set rowTag to the element in the XML schema that should be treated as a row in a table. For example, if the XML schema contained:

<salesTransactions>
  <salesTransaction> ... </salesTransaction>
</salesTransactions>

then use salesTransaction as the value for rowTag. The default value is row.

{
  "df-5Jagkabc": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "PosData",
          "options": {
            "rowTag": "salesTransaction"
          }
        }
      ],
      "spark-sql-query": "API_Test_Headers"
    }
  ]
}
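
To see how rowTag selects rows, here is a small sketch with Python's standard library (the XML content is invented for illustration):

```python
import xml.etree.ElementTree as ET

xml_doc = """<salesTransactions>
  <salesTransaction><id>1</id></salesTransaction>
  <salesTransaction><id>2</id></salesTransaction>
</salesTransactions>"""

# With rowTag set to salesTransaction, each matching element becomes one row.
root = ET.fromstring(xml_doc)
rows = root.findall("salesTransaction")
print(len(rows))  # 2
```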

How-tos

This section describes tasks related to managing couriers in Amperity:

Add courier

Use the Add Courier button to add a courier to Amperity. A courier should be created for each feed that exists in Amperity.

For smaller data sources, a courier may be associated with more than one feed. For larger data sources, a courier should be associated with a single feed. This is because couriers run in parallel, whereas multiple feeds associated with a single courier run sequentially.

For example: if Snowflake is configured to send six tables to Amperity via six feeds, but all running as part of the same courier, table one must finish before table two, which must finish before table three, and so on. Whereas if each table is configured with its own courier, all six tables could start processing at the same time.

A courier configured from the Amperity UI must be configured to use one of the existing plugins in Amperity, such as for Amazon S3, Azure Blob Storage, Azure Data Lake Storage, SFTP, or Snowflake.

Some of these plugins have more than one option for credentials.

Use SnapPass to securely share configuration data with your Amperity representative.

To add a courier

  1. From the Sources tab, click Add Courier. The Add Courier page opens.

  2. Enter the name of the courier.

  3. From the Plugin drop-down, select a plugin.

    Note

    The settings for a courier will vary, depending on the courier selected from the Plugin drop-down.

  4. Enter the credentials for the courier type.

  5. Enter any courier-specific settings.

  6. Under <COURIER NAME> Settings configure the file load settings. This is done in two parts: a list of files that should be available to Amperity (including how they are made available), and then a series of load operations that associates each file in the list to a feed.

  7. Click Save.

Add courier as copy

You may add a courier by copying an existing courier. This is useful when couriers share plugin, credential, and other common settings. A copied courier retains all of the configured settings of the original, but is assigned a unique name based on the name of the copied courier.

To add a courier as a copy

  1. From the Sources tab, open the menu for a courier, and then select Make a copy. The Add Courier page opens.

  2. Update the name of the courier.

  3. Verify all other configuration settings. Edit them as necessary.

  4. Under <COURIER NAME> Settings configure the file load settings. This is done in two parts: a list of files that should be available to Amperity (including how they are made available), and then a series of load operations that associates each file in the list to a feed.

  5. Click Save.

Add to courier group

A courier group is a list of one (or more) couriers that are run as a group, either ad hoc or as part of an automated schedule. A courier group can be configured to act as a constraint on downstream workflows.

To add a courier to a courier group

  1. From the Sources tab, click Add Courier Group. This opens the Create Courier Group dialog box.

  2. Enter the name of the courier group.

  3. Add a cron string to the Schedule field to define a schedule for the courier group.

    A schedule defines the frequency at which a courier group runs. All couriers in the same courier group run as a unit and all tasks must complete before a downstream process can be started. The schedule is defined using cron.

    Cron syntax specifies the fixed time, date, or interval at which cron will run. Each line represents a job, and is defined like this:

    ┌───────── minute (0 - 59)
    │ ┌─────────── hour (0 - 23)
    │ │ ┌───────────── day of the month (1 - 31)
    │ │ │ ┌────────────── month (1 - 12)
    │ │ │ │ ┌─────────────── day of the week (0 - 6) (Sunday to Saturday)
    │ │ │ │ │
    │ │ │ │ │
    │ │ │ │ │
    * * * * * command to execute
    

    For example, 30 8 * * * represents “run at 8:30 AM every day” and 30 8 * * 0 represents “run at 8:30 AM every Sunday”. Amperity validates your cron syntax and shows you the results. You may also use crontab guru to validate cron syntax.

  4. Set Status to Enabled

  5. Specify a time zone.

    A courier group schedule is associated with a time zone. The time zone determines the point at which a courier group’s scheduled start time begins. A time zone should be aligned with the time zone of the system from which the data is being pulled.

    Note

    The time zone that is chosen for a courier group schedule should consider every downstream business process that requires the data, as well as the time zone(s) in which the consumers of that data will operate.

  6. Set SLA? to False. (You can change this later after you have verified the end-to-end workflows.)

  7. Add at least one courier to the courier group. Select the name of a courier from the Courier drop-down. Click + Add Courier to add additional couriers to the courier group.

  8. Click Add a courier group constraint, and then select a courier group from the drop-down list.

    A wait time is a constraint placed on a courier group that defines an extended time window for data to be made available at the source location. A courier group typically runs on an automated schedule that expects customer data to be available at the source location within a defined time window. However, in some cases, the customer data may be delayed and isn’t made available within that time window.

  9. For each courier group constraint, apply any offsets.

    An offset is a constraint placed on a courier group that defines a range of time that is older than the scheduled time, within which a courier group will accept customer data as valid for the current job.

    A courier group offset is typically set to be 24 hours. For example, it’s possible for customer data to be generated with a correct file name and datestamp appended to it, but for that datestamp to represent the previous day because of the customer’s own workflow. An offset ensures that the data at the source location is recognized by the courier as the correct data source.

    Warning

    An offset affects couriers in a courier group whether or not they run on a schedule.

  10. Click Save.
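
The offset described in step 9 can be sketched as a widened acceptance window for file datestamps (the times and the 24-hour value are illustrative):

```python
from datetime import datetime, timedelta

def file_is_valid(file_date, scheduled, offset_hours=24):
    """Accept a file datestamp that falls within offset_hours before the scheduled run."""
    return scheduled - timedelta(hours=offset_hours) <= file_date <= scheduled

scheduled = datetime(2023, 6, 2, 8, 30)
# A file stamped with the previous day's date is still accepted.
print(file_is_valid(datetime(2023, 6, 1, 8, 30), scheduled))  # True
print(file_is_valid(datetime(2023, 5, 30, 8, 30), scheduled))  # False
```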

Delete courier

Use the Delete option to remove a courier from Amperity. This should be done carefully. Verify that both upstream and downstream processes no longer depend on this courier prior to deleting it. This action will not delete the feeds associated with the courier.

To delete a courier

  1. From the Sources tab, open the menu for a courier, and then select Delete. The Delete Courier dialog box opens.

  2. Click Delete.

Edit courier

Use the Edit option in the row for a specific courier to make configuration changes. For example, a new file is added to an Amazon S3 filedrop location already configured to send data to Amperity. After the feed is created, it can be added to the existing courier objects and load operations.

In other cases, a courier may need editing because the credentials to the data source have changed.

To edit a courier

  1. From the Sources tab, open the menu for a courier, and then select Edit. The Edit Courier page opens.

  2. Make your changes.

  3. Click Save.

Load data only

A courier can be run to load data to a domain table while preventing downstream processes, such as Stitch, customer 360 database runs, SLA queries, and orchestrations, from running.

To load data (without downstream processing)

  1. From the Sources tab, open the menu for a courier, and then select Run. The Run Courier page opens.

  2. Select Load all data.

  3. To prevent downstream processing, select Load Only.

  4. Click Run.

Run couriers

Use the Run option to run the courier manually.

A courier can be run in the following ways:

for date range

A courier can be configured to load all data for a specific date range.

To run a courier for a time period

  1. From the Sources tab, open the menu for a courier, and then select Run. The Run Courier page opens.

  2. Select Load data from a specific time period.

  3. Select a start date and an end date.

    Important

    The start of the selected date range is inclusive, whereas the end of the selected date range is exclusive. For example, a range from January 1 to January 8 loads files dated January 1 through January 7.

  4. To prevent downstream processing, select Load Only.

    Warning

    When a data source is changed, and then loaded using the Load Only option, downstream processes are not started automatically. Data that contains PII must be stitched. Databases that contain interaction records must be regenerated so that attributes and predictions are recalculated.

  5. Click Run.

for all data

A courier can be configured to load all data that is available. This can be a large amount of data if the courier is running for the first time.

To run a courier to collect all available data

  1. From the Sources tab, open the menu for a courier, and then select Run. The Run Courier page opens.

  2. Select Load all data.

  3. To prevent downstream processing, select Load Only.

    Warning

    Stitch must be run for data to be available in databases. Jobs that are run as load only do not automatically run Stitch.

  4. Click Run.

without downstream processing

A courier can be configured to load data, but not start any downstream processing, including Stitch, database generation, or queries.

Warning

Stitch must be run for data to be available in databases. Jobs that are run as load only do not automatically run Stitch.

To run a courier without downstream processing

  1. From the Sources tab, open the menu for a courier, and then select Run. The Run Courier page opens.

  2. Select Load data from a specific day or Load data from a specific time period.

  3. Under Load options, select Load Only.

  4. Click Run.

without load operations

You can run a courier without load operations. Use this approach to get files to upload during feed creation, as a feed requires knowing the schema of a file before you can apply semantic tagging and other feed configuration settings.

To run a courier without load operations

  1. From the Sources tab, open the menu for a courier with an empty load operation, and then select Run. The Run Courier page opens.

  2. The settings you choose on the Run Courier page do not matter. You must choose either Load all data or Load data from a specified date, but because the load operation is set to {} no data will be loaded.

  3. Set the load operation to {} (empty).

  4. Click Run.

View error log

If a courier runs and returns an error, you may view the errors from that feed.

To view errors

  1. From the Notifications pane, for the stage error, open the View Load Details link.

  2. From the View Load Details pane, select View Error Log for the feed with errors.

  3. Investigate the errors reported.

Restart job

If a courier runs and returns an error, you may view the error, resolve that error by updating the feed configuration or Spark SQL query, and then restart it without having to reload the data associated with the job.

Note

Only non-SLA couriers may be rerun.

To restart a job

  1. From the Notifications pane, for the stage error, open the View Load Details link and investigate why the job failed.

  2. Edit the feed configuration or Spark SQL query to address the reasons for the error.

  3. From the Notifications pane, click Restart Ingest Job.

View courier

The Sources tab shows the status of every courier, including when it last ran or updated, and its current status.

To view a courier

From the Sources tab, open the menu for a courier, and then select View. The View Courier page opens.