About couriers¶
A courier brings data from an external system to Amperity.
What a courier does:
Checks if data is available at the source location.
Collects data from the source location, and then pulls that data to Amperity.
What a courier needs:
Access to the source location. Most data sources (Amazon S3, Azure Blob Storage, Google Cloud Storage, or any SFTP site) allow the use of many file formats, while others are accessed through Snowflake or REST APIs.
A location from which to copy data.
An associated feed.
A file format (CSV, TSV, Apache Parquet, and so on), along with additional details for compression, archiving, and encryption.
A combination of load settings and load operations. The exact combination of settings and operations depends on the data source and the types of files to be pulled to Amperity.
File couriers¶
A file data source can provide files to Amperity in just about any file format, such as CSV, JSON, Apache Parquet, Apache Avro, PSV, and TSV. Locations from which file data sources can be pulled include Amazon S3, Azure Blob Storage, Google Cloud Storage, and any SFTP site.
Load settings¶
File data sources define load settings in two parts:
A list of files that should be pulled to Amperity.
A list of load operations that associate each file with a feed.
The exact combination of files and load operations depends on the data source from which data is made available to Amperity.
Load settings define the location of a data source, its type, and how it should be processed by Amperity. The syntax for file load settings is similar to:
[
{
"object/file-pattern": "'CUSTOMER/ENV/FILENAME_'MM-dd-yyyy'.csv'",
"object/type": "file",
"object/land-as": {
"file/tag": "FILE_TAG",
"file/content-type": "text/csv",
"file/header-rows": 1
}
},
{
"object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.zip'",
"object/type": "archive",
"archive/contents": {
"FILENAME": {
"subobject/land-as": {
"file/tag": "FILENAME_TAG",
"file/content-type": "text/csv"
}
},
"archive/skip-missing-contents": true
}
},
{
"object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.zip'",
"object/type": "archive",
"object/land-as": {
"file/tag": "FILENAME_TAG",
"file/content-type": "text/csv",
"file/header-rows": 1
}
}
]
Note
Refer to individual file formats – Apache Avro, Apache Parquet, CBOR, CSV, JSON, NDJSON, PSV, streaming JSON, TSV, and XML – for the list of available load settings.
Each filedrop load setting must specify the file pattern, which is the path to the file, its filename, a date stamp, and a file extension. The rest of the load settings block must match; for example, the content type for "some-file.csv" must be "text/csv". An archive must specify both the archive and the file contained within it. If the file is archived as "some-file.zip", then the "object/type" would be "archive" and the content type of the file within it would be "text/csv".
If an archive contains only a single file, or if all of the files within the archive share the same file tag, content type, and other settings, then "archive/contents" can be omitted and "object/land-as" can be specified instead. All of the files within the archive will use the specified "object/land-as" settings.
To ignore or skip missing contents within an archive, add "archive/skip-missing-contents" to your settings.
File patterns¶
A courier looks for objects in a filedrop location using a combination of the path to a directory, the name of a file, and a date. These are defined by the "object/file-pattern" setting for each object. A courier runs based on a date or a date range, and then looks for files in the filedrop location for that date or date range.
A file pattern may use a combination of literal strings, wildcard characters (*) within literal strings, wildcard characters for filenames within archives, and date components, separated by single quotes and forward slashes.
Wildcards¶
A wildcard can match zero (or more) characters up until a forward-slash character.
Note
When a file pattern with a wildcard matches more than one file for a given date or date range, the matched files are loaded in such a way that guarantees per-day ordering. If your courier uses an ingest query, ascending lexicographical ordering by file is not guaranteed or preserved within a single day’s files.
Examples
The following example shows using a wildcard at the end of a file pattern:
'files/'yyyy'/'MM'/'dd'/customers-*.csv'
will match any of these files:
/customers-.csv
/customers-1.csv
/customers-hello-world.csv
and will not match any of these:
/customers-.csv.0
/customers-0.json
/customers-0/1/file.csv
/customers.csv
The following example shows using multiple wildcards:
{
"object/type": "file",
"object/file-pattern": "'responsys/outbound_files/*_STATE_'yyyyMMdd'_*.txt.gpg'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "launch",
"file/content-type": "text/csv"
}
},
Wildcards within archives¶
A wildcard can be used to match one (or more) files in an archive.
The following example shows how to use a wildcard to match a set of CSV files contained within the same archive:
[
{
"archive/contents": {
"ArchiveName/files_*.csv:" {
"subobject/land-as": {
"file/header-rows": 1,
"file/separator": ",",
"file/tag": "launch",
"file/content-type": "text/csv"
}
}
},
"object/type": "archive",
"object/file-pattern": "/path/to/archive/ArchiveName.zip"
}
]
Literal strings¶
A literal string must exactly match the characters in the file path, except where wildcard characters are used within the literal string. Wrap literal strings that match Joda-Time format in single quotes. For example:
'files/'
'/'
'MM-dd-yyyy'
Date components¶
Date components act as placeholders for months, days, and years. Real values are applied when the courier runs on a given date or date range. Date components must match Joda-Time pattern-based formatting, but should generally be limited to the following patterns:
Pattern | Meaning | Examples
---|---|---
yyyy | 4-digit year | 2020, 2021, …
MM | 2-digit month | 01, 02, … 12
dd | 2-digit date | 01, 02, … 31
A courier that runs using this pattern:
'files/'yyyy'/'MM'/'dd'/customers-*.csv'
when run on April 10, 2020 will look for files at 'files/2020/04/10/customers-*.csv'
and will return any files that match.
File compression / archive¶
Amperity supports the following compression and archive types:
GZIP¶
{
"object/type": "file",
"object/optional": false,
"object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.csv.gz'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILENAME_TAG",
"file/content-type": "text/csv"
}
}
TAR¶
{
"object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.tar'",
"object/type": "archive",
"archive/contents": {
"FILENAME": {
"subobject/land-as": {
"file/tag": "FILENAME_TAG",
"file/content-type": "text/csv"
}
}
}
}
ZIP¶
{
"object/file-pattern": "'ARCHIVED/FILENAME_'MM-dd-yyyy'.zip'",
"object/type": "archive",
"archive/contents": {
"FILENAME": {
"subobject/land-as": {
"file/tag": "FILENAME_TAG",
"file/content-type": "text/csv"
}
}
}
}
Input examples¶
The following examples show how files input to Amperity are unpacked, depending on various combinations of encryption, compression type, and file format. All examples use yyyy_MM_dd for the date format.
For single files¶
PGP, TGZ, CSV
Input to Amperity: table_name_yyyy_MM_dd.tgz.pgp
After decryption: table_name_yyyy_MM_dd.tgz
After decompression: table_name_yyyy_MM_dd.csv
PGP, GZip, TAR, CSV
Input to Amperity: table_name_yyyy_MM_dd.csv.tar.gz.pgp
After decryption: table_name_yyyy_MM_dd.csv.tar.gz
After decompression: table_name_yyyy_MM_dd.csv.tar
After the archive is opened: table_name_yyyy_MM_dd.csv
PGP, TAR, Apache Parquet
Input to Amperity: table_name_yyyy_MM_dd.tar.pgp
After decryption: table_name_yyyy_MM_dd.tar
After decompression: table_name_yyyy_MM_dd.parquet, with 1 to n Apache Parquet part files within the directory.
For multiple files¶
PGP, TAR, Apache Parquet
Input to Amperity: input_name_yyyy_MM_dd.parquet.tar.pgp
After decryption: input_name_yyyy_MM_dd.parquet.tar
After decompression: table_name_yyyy_MM_dd.parquet, where, for each table, 1 to n Apache Parquet files will be located within a single directory.
PGP, TGZ, CSV
Input to Amperity: input_name_yyyy_MM_dd.csv.tgz.pgp
After decryption: input_name_yyyy_MM_dd.csv.tgz
After decompression: table_name.csv, where all tables that were input are located within a single directory.
API couriers¶
An API data source will vary, depending on the file format and other configuration details. API data sources include Campaign Monitor, Google Analytics, Salesforce Sales Cloud, and Zendesk.
Snowflake couriers¶
A Snowflake data source provides a list of tables that are consolidated into a fileset. Snowflake data sources include Snowflake itself, along with any Fivetran data source, such as Klaviyo, Shopify, Kustomer, and HubSpot.
Table lists¶
A table list defines the list of tables to be pulled to Amperity from Snowflake.
[
"AMPERITY_A1BO987C.ACME.CAMPAIGN",
"AMPERITY_A1BO987C.ACME.LIST",
"AMPERITY_A1BO987C.ACME.CONTACT"
]
Stage names¶
A stage defines the location of objects that are available within Snowflake.
AMPERITY_A1BO987C.ACME.ACME_STAGE
Load operations¶
Load operations associate each table in the list of tables with a feed. (The initial setup for this courier will use an incorrect feed ID, such as df-xxxxxx.)
{
"df-xxxxx": [
{
"type": "load",
"file": "AMPERITY_A1BO987C.ACME.CAMPAIGN"
}
],
"df-xxxxx": [
{
"type": "load",
"file": "AMPERITY_A1BO987C.ACME.LIST"
}
],
"df-xxxxx": [
{
"type": "load",
"file": "AMPERITY_A1BO987C.ACME.CONTACT"
}
]
}
Load operation types¶
A fileset is a group of files that are processed as a unit by a single courier. A fileset defines each file individually by name, datestamp, file format, and load operation. A courier expects all files in a fileset to be available for processing, unless a file is specified as optional.
Each file in a fileset must be associated with one of the following load operation types:
Empty¶
An empty load operation will bring files to Amperity, but not try to load those files into a feed. An empty load operation is ideal for bringing sample files to Amperity prior to configuring the feed that defines the schema within Amperity for a data source. Use the sample file while configuring the feed, and then update the load operation to match the configuration requirements for the associated file type.
{}
Tip
You cannot use an empty load operation for files that require the use of an ingest query to transform the data prior to it being made available to the feed.
For example, a JSON file with nested data must use an ingest query to flatten the file. A feed cannot use a JSON file with nested data as a sample file. And a courier cannot run to successful completion unless the courier is configured with a feed ID.
In this type of situation, create a file outside of this workflow to use as the sample file for the feed. For example, use Databricks to generate a zero-row sample file, and then upload that file during feed creation.
Another option is to define the schema without using a sample file. Select the Don’t use sample file option when adding the feed, and then use the Add field button to define each field in the schema.
Incorrect feed ID¶
Instead of using an empty load operation, you can use an obviously incorrect feed ID to pull files to Amperity. This approach uses the default load configuration, but sets the feed ID to a string that will not be available to the courier after feeds have been updated. For example, replace the digits with six “x” characters:
{
"df-xxxxxx": [
{
"type": "truncate"
},
{
"type": "load",
"file": "campaign-members"
}
]
}
This will return an error message similar to:
Error running load-operations task
Cannot find required feeds: "df-xxxxxx"
The load operation will pull the files and make them available for use with defining a feed schema.
Load files¶
You can load the contents of a data file to a domain table as a load operation. The load runs as an UPSERT operation based on the primary key in the table.
"OTHER_FEED_ID": [
{
"type": "load",
"file": "OTHER_FILE_TAG"
}
]
Load ingest query¶
Spark SQL is a high performance SQL query engine that is used by Amperity to ingest data, create domain tables, and extend the outcome of the Stitch process in your customer 360 database.
Use Spark SQL to define all SQL queries related to the following areas of Amperity:
Ingesting data, including ingest queries
Processing data into domain tables
Building custom domain tables
Loading data into Stitch
Running Stitch
Loading the results of Stitch into the customer 360 database
Defining tables in the customer 360 database
Note
Spark SQL is used to define all SQL queries related to the Stitch process up to (and including) building the tables in the customer 360 database. Presto SQL is used to define SQL queries for segments. Why both?
Spark SQL performs better in more traditional processes like machine learning and ETL-like processes that are resource intensive.
Presto SQL performs better when running real-time queries against cloud datasets.
The configuration for an ingest query load operation depends on the data source against which the ingest query will run.
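For example, a minimal sketch of an ingest query load operation for a file data source, where FEED_ID, FILE_NAME, and INGEST_QUERY_NAME are placeholders, follows the same shape as the examples later in this topic:
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}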
Truncate, then load¶
You can empty the contents of a table prior to loading a data file to a domain table as a load operation.
Note
A truncate operation is always run first, regardless of where it’s specified within the load operation.
"FEED_ID": [
{
"type": "truncate"
},
{
"type": "load",
"file": "FILE_NAME"
}
],
Examples¶
The following sections provide examples for load settings and load operations by data source and/or by file type:
Apache Avro¶
Apache Avro is a row-oriented remote procedure call and data serialization framework developed within the Apache Hadoop ecosystem. Avro uses JSON to define data types and protocols, and serializes data in a compact binary format.
Load settings
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.avro'",
"object/land-as": {
"file/tag": "FILE_NAME",
"file/content-type": "application/avro"
}
}
Load operations
{
"FEED_ID": [
{
"type": "OPERATION",
"file": "FILE_NAME"
}
]
}
Apache Parquet¶
Apache Parquet is a free and open-source column-oriented data storage format developed within the Apache Hadoop ecosystem. It is similar to RCFile and ORC, but provides more efficient data compression and encoding schemes with enhanced performance and can better handle large amounts of complex bulk data.
Note
Apache Parquet files are almost always partitioned, where a single logical Parquet file is comprised of multiple physical files in a directory structure, each of them representing a partition.
Parquet partitioning optionally allows data to be nested in a directory structure determined by the values of the partitioning columns. Amperity only detects Parquet partition files one directory level below the configured file pattern. For example:
"path/to/file-YYYY-MM-dd.parquet/part-0000.parquet"
Load settings
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.parquet/'",
"object/land-as": {
"file/tag": "FILE_NAME",
"file/content-type": "application/x-parquet"
}
}
Load operations
{
"FEED_ID": [
{
"type": "OPERATION",
"file": "FILE_NAME"
}
]
}
Campaign Monitor¶
Campaign Monitor is an email marketing platform that tracks details related to email campaigns (opens, clicks, bounces, unsubscribes, spam complaints, and recipients) and email subscriber lists (active, unconfirmed, bounced, and deleted subscribers), and other details.
Load settings
The Campaign Monitor REST API has a clearly defined set of files that can be made available to Amperity. The load settings are built into Amperity by default.
{
"open": "opens-file",
"unsubscribe": "unsubscribes-file",
"spam": "spam-file",
"unsubscribed-subscriber": "unsubscribed-subscribers-file",
"unconfirmed-subscriber": "unconfirmed-subscribers-file",
"recipient": "recipients-file",
"bounced-subscriber": "bounced-subscribers-file",
"bounce": "bounces-file",
"click": "clicks-file",
"campaign": "campaigns-file",
"deleted-subscriber": "deleted-subscribers-file",
"suppression": "suppression-list-file",
"subscriber-list": "list-stats-file",
"active-subscriber": "active-subscribers-file"
}
Load operations
"OPENS_FEED_ID": [
{
"type": "load",
"file": "opens-file"
}
],
"BOUNCED-SUBSCRIBERS_FEED_ID": [
{
"type": "truncate"
},
{
"type": "load",
"file": "bounced-subscribers-file"
}
],
"ACTIVE-SUBSCRIBERS_FEED_ID": [
{
"type": "truncate"
},
{
"type": "load",
"file": "active-subscribers-file"
}
],
"CLICKS_FEED_ID": [
{
"type": "load",
"file": "clicks-file"
}
],
"DELETED-SUBSCRIBERS_FEED_ID": [
{
"type": "truncate"
},
{
"type": "load",
"file": "deleted-subscribers-file"
}
],
"SUPPRESSION-LIST_FEED_ID": [
{
"type": "truncate"
},
{
"type": "load",
"file": "suppression-list-file"
}
],
"CAMPAIGNS_FEED_ID": [
{
"type": "truncate"
},
{
"type": "load",
"file": "campaigns-file"
}
],
"RECIPIENTS_FEED_ID": [
{
"type": "load",
"file": "recipients-file"
}
],
"BOUNCES_FEED_ID": [
{
"type": "load",
"file": "bounces-file"
}
],
"LIST-STATS_FEED_ID": [
{
"type": "load",
"file": "list-stats-file"
}
],
"SPAM_FEED_ID": [
{
"type": "load",
"file": "spam-file"
}
],
"UNSUBSCRIBED-SUBSCRIBERS_FEED_ID": [
{
"type": "truncate"
},
{
"type": "load",
"file": "unsubscribed-subscribers-file"
}
],
"UNSUBSCRIBES_FEED_ID": [
{
"type": "load",
"file": "unsubscribes-file"
}
]
CBOR¶
CBOR is a binary data serialization format loosely based on JSON. Like JSON it allows the transmission of data objects that contain name–value pairs, but in a more concise manner. This increases processing and transfer speeds at the cost of human-readability.
Load settings for Amazon AWS
{
"object/type": "file",
"object/file-pattern": "'ingest/stream/TENANT/STREAM_ID/'yyyy-MM-dd'/'*'.cbor'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_NAME",
"file/content-type": "application/ingest-pack+cbor"
}
},
Load settings for Microsoft Azure
{
"object/type": "file",
"object/file-pattern": "'STREAM_ID/'yyyy-MM-dd'/'*'.cbor'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_NAME",
"file/content-type": "application/ingest-pack+cbor"
}
},
Load operations
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"rowTag": "row"
},
"schema": {
"fields": [
{
"metadata": {},
"name": "field-1",
"type": "string",
"nullable": true
},
...
{
"metadata": {},
"name": "nested-group-1",
"type": {
"fields": [
{
"metadata": {},
"name": "field-a",
"type": "string",
"nullable": true
},
{
"metadata": {},
"name": "nested-group-a",
"type": {
"fields": [
...
],
"type": "struct"
},
"nullable": true
},
{
"metadata": {},
"name": "field-xyz",
"type": "string",
"nullable": true
}
],
"type": "struct"
},
"nullable": true
},
...
],
"type": "struct"
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
Important
The "schema"
must match the structure of the incoming file, including all nested groupings and data types. Set "nullable"
to True
to allow fields to contain NULL
values. A CBOR file can have hundreds of fields. The ellipses (...
) in this example represents locations within this example structure where additional fields may be present.
Tip
Set rowTag to the element in the CBOR file that should be treated as a row in a table. The default value is row.
CSV¶
A comma-separated values (CSV) file, defined by RFC 4180, is a delimited text file that uses a comma to separate values. A CSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by commas. The use of the comma as a field separator is the source of the name for this file format.
Load settings with header rows
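For example, a sketch of load settings for a CSV file that includes a header row (the file pattern and tag are placeholders; this mirrors the headerless example below, with "file/header-rows" set to 1):
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.csv'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_NAME",
"file/content-type": "text/csv"
}
},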
Load settings with non-standard quotes and separators
Some CSV files may use non-standard characters for quotes and separators, such as ' for quotes and \ for separators. If a CSV file contains non-standard characters, you must specify these characters in the courier load settings.
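A sketch of load settings for a CSV file with non-standard quote and separator characters (the pattern, tag, and character values are placeholders that mirror the PSV and TSV examples later in this topic):
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.csv'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_NAME",
"file/quote": "*",
"file/separator": ";",
"file/content-type": "text/csv"
}
},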
Load settings without header rows
A headerless CSV file does not contain a row of data that defines headers for the data set. When working with a headerless CSV file, you can configure the courier load settings to accept headerless files, or you can use an ingest query when the data must be changed in some way prior to loading it to Amperity.
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.csv'",
"object/land-as": {
"file/header-rows": 0,
"file/tag": "FILE_NAME",
"file/content-type": "text/csv"
}
},
Load operations for feed
{
"FEED_ID": [
{
"type": "OPERATION",
"file": "FILE_NAME"
}
]
}
Load operations for ingest query
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"delimiter": ",",
"escape": "\\",
"multiline": "true",
"quote": "\""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
Caution
Spark does not correctly implement RFC 4180 for escape characters in CSV files. The most common implementations of CSV files expect a double quote (") as the escape character, while Spark uses a backslash (\). For more information about this issue view the SPARK-22236 issue within the Spark project.
You can override this behavior when working with RFC-compliant CSV files by specifying an escape character in the courier load operations, using ' or " as the escape character.
For example:
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"escape": "'"
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
If a CSV file uses \ as the delimiter, configure the load operation to specify an empty delimiter value, after which Spark will automatically apply the \ character as the delimiter.
For example:
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"delimiter": ""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
JSON¶
JavaScript Object Notation (JSON) is a language-independent data format that is derived from (and structured similarly to) JavaScript.
Load settings
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.json'",
"object/land-as": {
"file/tag": "FILE_NAME",
"file/content-type": "application/json"
}
},
Load operations
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
NDJSON¶
Newline-delimited JSON (NDJSON) is a data format for structured data that defines the structure of JSON data using lines as separators. Each line in an NDJSON file is a valid JSON value.
Load settings
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.ndjson'",
"object/land-as": {
"file/tag": "FILE_NAME",
"file/content-type": "application/x-ndjson"
}
},
Load operations
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
PSV¶
A pipe-separated values (PSV) file is a delimited text file that uses a pipe to separate values. A PSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by pipes. The use of the pipe as a field separator is the source of the name for this file format.
Load settings with header rows
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.psv'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_NAME",
"file/content-type": "text/pipe-separated-values"
}
},
Load settings with non-standard quotes and separators
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.psv'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_NAME",
"file/quote": "*",
"file/separator": ";",
"file/content-type": "text/pipe-separated-values"
}
},
Load settings without header rows
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.psv'",
"object/land-as": {
"file/header-rows": p,
"file/tag": "FILE_NAME",
"file/content-type": "text/pipe-separated-values"
}
},
Load operations for feed
{
"FEED_ID": [
{
"type": "OPERATION",
"file": "FILE_NAME"
}
]
}
Load operations for ingest query
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
"options": {
"delimiter": "\|",
"escape": "\\",
"multiline": "true",
"quote": "\""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
Salesforce Commerce Cloud¶
Salesforce Commerce Cloud is a multi-tenant, cloud-based commerce platform that enables brands to create intelligent, unified buying experiences across all channels.
Load settings
The Salesforce Commerce Cloud REST API has a clearly defined set of files that can be made available to Amperity. The load settings are built into Amperity by default. (Salesforce Commerce Cloud was previously known as Demandware.)
[
{
"entity-type": "returns",
"file-tag": "returns-file"
},
{
"entity-type": "customers",
"file-tag": "customers-file"
},
{
"entity-type": "sites",
"file-tag": "sites-file"
},
{
"entity-type": "shipping_orders",
"file-tag": "shipping_orders-file"
},
{
"entity-type": "purchase_orders",
"file-tag": "purchase_orders-file"
},
{
"entity-type": "vendors",
"file-tag": "vendors-file"
},
{
"entity-type": "return_orders",
"file-tag": "return_orders-file"
},
{
"entity-type": "items",
"file-tag": "items-file"
},
{
"entity-type": "shipments",
"file-tag": "shipments-file"
},
{
"entity-type": "payments",
"file-tag": "payments-file"
},
{
"entity-type": "invoices",
"file-tag": "invoices-file"
},
{
"entity-type": "orders",
"file-tag": "orders-file",
"expand": [
"invoices",
"order_items"
],
"json-path": "..."
}
]
Load operations
{
"SHIPMENTS_FEED_ID": [
{
"type": "load",
"file": "shipments-file"
}
],
"PURCHASE_ORDERS_FEED_ID": [
{
"type": "load",
"file": "purchase_orders-file"
}
],
"INVOICES_FEED_ID": [
{
"type": "load",
"file": "invoices-file"
}
],
"SITES_FEED_ID": [
{
"type": "load",
"file": "sites-file"
}
],
"RETURN_ORDERS_FEED_ID": [
{
"type": "load",
"file": "return_orders-file"
}
],
"ORDERS_FEED_ID": [
{
"type": "load",
"file": "orders-file"
}
],
"PAYMENTS_FEED_ID": [
{
"type": "load",
"file": "payments-file"
}
],
"ITEMS_FEED_ID": [
{
"type": "load",
"file": "items-file"
}
],
"VENDORS_FEED_ID": [
{
"type": "load",
"file": "vendors-file"
}
],
"RETURNS_FEED_ID": [
{
"type": "load",
"file": "returns-file"
}
],
"SHIPPING_ORDERS_FEED_ID": [
{
"type": "load",
"file": "shipping_orders-file"
}
],
"CUSTOMERS_FEED_ID": [
{
"type": "load",
"file": "customers-file"
}
]
}
Salesforce Sales Cloud¶
Salesforce Sales Cloud brings customer information together into an integrated platform, and then provides access to thousands of applications through the AppExchange.
Load settings
The Sales Cloud integration allows you to use SQL patterns to specify which fields in an Object should be brought back to Amperity. Use the fields grouping to define which fields to bring back: use * for all fields, or specify a list of fields. Use where to specify values that fields must match. The following examples show Objects and the equivalent SQL queries used to define load settings.
[
{
"from": "ObjectName",
"file/tag": "objectname-file",
"fields": [
"*"
]
},
{
"from": "CustomObject",
"file/tag": "custom-object-file",
"fields": [
"*"
]
},
{
"from": "AnotherObject",
"file/tag": "another-object-file",
"fields": [
"field-a.name",
"field-b.name"
]
},
{
"from": "Object2",
"file/tag": "object2-file",
"fields": [
"field-one.name",
"field-two.name"
],
"where": "field-two = 'true'"
}
]
Select all fields in an Object
The following SQL query:
SELECT * FROM Account
Is equivalent to the following load operation:
{
"from": "Account",
"file/tag": "account-file",
"fields": [
"*"
]
},
Select fields in an Object with specified values
The following SQL query:
SELECT Id, Name FROM Opportunity
WHERE Name = 'John'
Is equivalent to the following load operation:
{
"from": "Opportunity",
"file/tag": "opportunity-file",
"fields": [
"Id",
"Name"
],
"where": "Name = 'John'"
},
Select only direct reports
The following SQL query:
SELECT *, ReportsTo.Name FROM Contact
Is equivalent to the following load operation:
{
"from": "Contact",
"file/tag": "contact-file",
"fields": [
"*",
"ReportsTo.Name"
]
},
Select only rows with a certain value in a custom column
The following SQL query:
SELECT * FROM CustomTable__c
WHERE CustomField__c = 34
Is equivalent to the following load operation:
{
"from": "CustomObject",
"file/tag": "custom-object-file",
"fields": [
"*"
],
"where": "CustomField__c = 34"
},
Load operations
{
"ACCOUNTS_FEED": [
{
"type": "truncate"
},
{
"type": "load",
"file": "accounts-file"
}
],
"CUSTOM_OBJECTS_FEED": [
{
"type": "truncate"
},
{
"type": "load",
"file": "custom-objects-file"
}
]
}
Snowflake¶
Snowflake is an analytic data warehouse that is fast, easy to use, and flexible. Snowflake uses a SQL database engine that is designed for the cloud.
Load settings
For tables in a data warehouse, such as Snowflake, a list of table names must be specified.
[
"table.name",
"table.name"
]
Load operations
{
"FEED_ID": [
{
"type": "load",
"file": "marketing.public.customer"
}
]
}
Streaming JSON¶
Streaming JSON is a way to send increments of data using NDJSON formatting within each increment. Each line in an NDJSON file is a valid JSON value.
Load settings
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.ndjson'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_NAME",
"file/content-type": "application/x-json-stream"
}
},
Load operations
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
TSV¶
A tab-separated values (TSV) file is a delimited text file that uses a tab to separate values and stores tabular data (numbers and text) in plain text. Each line in the file is a data record. Each record consists of one or more fields, separated by tabs.
Load settings with header rows
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.tsv'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_NAME",
"file/content-type": "text/tab-separated-values"
}
},
Load settings with non-standard quotes and separators
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.tsv'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_NAME",
"file/quote": "*",
"file/separator": ";",
"file/content-type": "text/tab-separated-values"
}
},
Load settings without header rows
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.tsv'",
"object/land-as": {
"file/header-rows": 0,
"file/tag": "FILE_NAME",
"file/content-type": "text/tab-separated-values"
}
},
Load operations for feed
{
"FEED_ID": [
{
"type": "OPERATION",
"file": "FILE_NAME"
}
]
}
Load operations for ingest query
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"delimiter": "\t",
"escape": "\\",
"multiline": "true",
"quote": "\""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
XML¶
Extensible Markup Language (XML) is a supported data format for customer data sources.
Load settings
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.xml'",
"object/land-as": {
"file/tag": "FILE_NAME",
"file/content-type": "application/xml"
}
}
Load operations
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"rowTag": "row"
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
Tip
Set rowTag to the element in the XML schema that should be treated as a row in a table. For example, if the XML schema contained:
<salesTransactions>
<salesTransaction> ... </salesTransaction>
</salesTransactions>
then use salesTransaction as the value for rowTag. The default value is row.
{
"df-5Jagkabc": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "PosXml",
"options": {
"rowTag": "salesTransaction"
}
}
],
"spark-sql-query": "API_Test_Headers"
}
]
}
How-tos¶
This section describes tasks related to managing couriers in Amperity:
Add courier¶
Use the Add Courier button to add a courier to Amperity. A courier should be created for each feed that exists in Amperity.
For smaller data sources, a courier may be associated with more than one feed. For larger data sources, a courier should be associated with a single feed. This is, in part, because couriers are run in parallel, but multiple feeds associated with a single courier are run sequentially.
For example, if Snowflake is configured to send six tables to Amperity via six feeds that all run as part of the same courier, table one must finish before table two, which must finish before table three, and so on. If each table is instead configured with its own courier, all six tables can start processing at the same time.
A courier configured from the Amperity UI must be configured to use one of the existing plugins in Amperity, such as for Amazon S3, Azure Blob Storage, Azure Data Lake Storage, SFTP, or Snowflake.
Some of these plugins have more than one option for credentials.
Use SnapPass to securely share configuration data with your Amperity representative.
To add a courier
From the Sources page, click Add Courier. The Add Courier page opens.
Enter the name of the courier.
From the Plugin drop-down, select a plugin.
Note
The settings for a courier will vary, depending on the courier selected from the Plugin drop-down.
Enter the credentials for the courier type.
Enter any courier-specific settings.
Under <COURIER NAME> Settings configure the file load settings. This is done in two parts: a list of files that should be available to Amperity (including how they are made available), and then a series of load operations that associates each file in the list with a feed. A sketch of these two parts is shown after these steps.
Click Save.
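As a rough illustration only (the exact fields depend on the plugin and file type), the list of files might look similar to:
[
{
"object/type": "file",
"object/file-pattern": "'path/to/file'-YYYY-MM-dd'.csv'",
"object/land-as": {
"file/header-rows": 1,
"file/tag": "FILE_TAG",
"file/content-type": "text/csv"
}
}
]
and the load operations that associate that file tag with a feed might look similar to:
{
"FEED_ID": [
{
"type": "load",
"file": "FILE_TAG"
}
]
}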
Add courier as copy¶
You may add a courier by copying an existing courier. This is useful when couriers share plugin, credential, and other common settings. A copied courier retains all of the configured settings of the original, but is assigned a unique name based on the name of the copied courier.
To add a courier as a copy
From the Sources page, open the menu for a courier, and then select Make a copy. The Add Courier page opens.
Update the name of the courier.
Verify all other configuration settings. Edit them as necessary.
Under <COURIER NAME> Settings configure the file load settings. This is done in two parts: a list of files that should be available to Amperity (including how they are made available), and then a series of load operations that associates each file in the list to a feed.
Click Save.
Add to courier group¶
A courier group is a list of one (or more) couriers that are run as a group, either ad hoc or as part of an automated schedule. A courier group can be configured to act as a constraint on downstream workflows.
To add a courier to a courier group
From the Sources page, open the menu for a courier group, and then select Edit.
On the Couriers tab, click the Add courier link.
Select the name of a courier from the drop-down list, set the wait time and the range for which data is loaded, and then enable alerts for when files are missing.
Click Save.
Delete courier¶
Use the Delete option to remove a courier from Amperity. This should be done carefully. Verify that both upstream and downstream processes no longer depend on this courier before you delete it. This action will not delete the feeds associated with the courier.
To delete a courier
From the Sources page, open the menu for a courier, and then select Delete. The Delete Courier dialog box opens.
Click Delete.
Edit courier¶
Use the Edit option in the row for a specific courier to make configuration changes. For example, a new file is added to an Amazon S3 filedrop location that is already configured to send data to Amperity. After a feed for the new file is created, the file can be added to the existing courier's objects and load operations.
In other cases, a courier may need editing because the credentials to the data source have changed.
To edit a courier
From the Sources page, open the menu for a courier, and then select Edit. The Edit Courier page opens.
Make your changes.
Click Save.
Load data only¶
A courier can be run to load data to a domain table without starting downstream processes, such as Stitch, customer 360 database runs, queries, and orchestrations.
To load data (without downstream processing)
From the Sources page, open the menu for a courier, and then select Run. The Run Courier page opens.
Select Load data from a specific day or Load data from a specific time period.
To prevent downstream processing, select Ingest only.
Click Run.
Run couriers¶
Use the Run option to run the courier manually.
A courier can be run in the following ways:
Only load files¶
A courier can be configured to load data, but not start any downstream processing, including Stitch, database generation, or queries.
Warning
Stitch must be run for data to be available in databases. Jobs that are run as load only do not automatically run Stitch.
To run a courier without downstream processing
From the Sources page, open the menu for a courier, and then select Run. The Run Courier page opens.
Select Load data from a specific day or Load data from a specific time period.
Under Load options, select Ingest only.
Click Run.
Run for a specific day¶
A courier can be configured to load data from a specific day.
To run a courier and load data from a specific day
From the Sources page, open the menu for a courier, and then select Run. The Run Courier page opens.
Select Load data from a specific day.
Select a calendar date.
To prevent downstream processing, select Ingest only.
Warning
When a data source is changed, and then loaded using the Ingest only option, downstream processes are not started automatically. Data that contains PII must be stitched. Databases that contain interaction records must be regenerated so that attributes and predictions are recalculated.
Click Run.
Run for a time period¶
A courier can be configured to load all data from a specified time period.
To run a courier to load all data from a specific time period
From the Sources page, open the menu for a courier, and then select Run. The Run Courier page opens.
Select Load data from a specific time period.
Select a start date and an end date.
Important
The start of the selected date range is inclusive, whereas the end of the selected date range is exclusive.
To prevent downstream processing, select Ingest only.
Warning
When a data source is changed, and then loaded using the Ingest only option, downstream processes are not started automatically. Data that contains PII must be stitched. Databases that contain interaction records must be regenerated so that attributes and predictions are recalculated.
Click Run.
Skip missing files¶
A courier can be configured to skip sources files that are missing and continue running.
To skip missing files
From the Sources page, open the menu for a courier, and then select Run. The Run Courier page opens.
Select Load data from a specific day or Load data from a specific time period.
Under Load options, select Skip missing files.
Click Run.
With empty load operation¶
You can run a courier with an empty load operation by using {} as the value for the load operation. Use this approach to get files to upload during feed creation, as a feed requires knowing the schema of a file before you can apply semantic tagging and other feed configuration settings.
View error log¶
If a courier runs and returns an error, you may view the errors from that feed.
To view errors
From the Notifications pane, for the stage error, open the View Load Details link.
From the View Load Details pane, select View Error Log for the feed with errors.
Investigate the errors reported.
Restart job¶
If a courier runs and returns an error, you may view the error, resolve that error by updating the feed configuration or Spark SQL query, and then restart it without having to reload the data associated with the job.
To restart a job
From the Notifications pane, for the stage error, open the View Load Details link and investigate why the job failed.
Edit the feed configuration or Spark SQL query to address the reasons for the error.
From the Notifications pane, click Restart Ingest Job.
View courier¶
The Sources page shows every courier, including its current status and when it last ran or was updated.
To view a courier
From the Sources page, open the menu for a courier, and then select View. The View Courier page opens.