Ingest queries

An ingest query is a SQL statement that may be applied to data prior to loading it to a domain table. An ingest query is defined using Spark SQL syntax.

Spark SQL

Spark SQL is a high performance SQL query engine that is used by Amperity to ingest data, create domain tables, and extend the outcome of the Stitch process in your customer 360 database.

Use Spark SQL to define all SQL queries related to the following areas of Amperity:

  • Ingesting data, including ingest queries

  • Processing data into domain tables

  • Building custom domain tables

  • Loading data into Stitch

  • Running Stitch

  • Loading the results of Stitch into the customer 360 database

  • Defining tables in the customer 360 database

Note

Spark SQL is used to define all SQL queries related to the Stitch process up to (and including) building the tables in the customer 360 database. Presto SQL is used to define SQL queries for segments. Why both?

  • Spark SQL performs better in more traditional processes like machine learning and ETL-like processes that are resource intensive.

  • Presto SQL performs better when running real-time queries against cloud datasets.

Courier load operations

An ingest query can be used to transform data prior to loading it to Amperity. For example, to remove errant commas, removing extra line breaks, flattening hierarchical data structures, joining data, or resolving other formatting issues. An ingest query is written in Spark SQL and should be tested prior to running it in Amperity. An ingest query must be added to a courier as a load operation.

The structure for an ingest query load operation is similar to:

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
      {
        "file": "FILE_NAME",
        "options": {
          "delimiter": "\,",
          "escape": "\\",
          "multiline": "true",
          "quote": "\""
        }
      }
    ],
    "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

where:

  • file is set to the name of the file (FILE_NAME) against which the ingest query will run.

  • FEED_ID is set to the ID for the feed for which the ingest query will run.

  • delimiter specifies the delimiter to use with CSV, TSV, and PSV files. (These default to “,”, “t”, and “|”. respectively if not otherwise specified.)

  • escape defines the character used for escaping quotes inside an already quoted value in the source file. The default escape character in Spark is the backslash ( ).

  • multiline is an optional parameter that indicates the presence of records that cross multiple lines. Use only for CSV, TSV, or PSV file types and set to “true” only if that file has fields with newline characters in them.

    Caution

    multiline only supports files that contain Unix line endings (n). When multiline is enabled for files that contain DOS line endings (rn) the trailing r character remains in the file, which may cause an ingest failure.

    Use an ingest query to remove multiline elements, as necessary. For example:

    REGEXP_REPLACE(message_text,"\n","") AS message_text
    
    REGEXP_REPLACE(message_text,"\r\n","") AS message_text
    
    REGEXP_REPLACE(message_text,"\r","") AS message_text
    
  • quote defines the character used for escaping quoted values where the separator can be part of the value in the source file. The default character is the double quote ( ” ).

  • INGEST_QUERY_NAME is set to the name of the ingest query.

Examples

The following file formats can be loaded to Amperity using ingest queries:

Apache Avro

Apache Avro is a row-oriented remote procedure call and data serialization framework developed within the Apache Hadoop ecosystem. Avro uses JSON to define data types and protocols, and serializes data in a compact binary format.

Use Spark SQL to define an ingest query for the Apache Avro file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

Apache Parquet

Apache Parquet is a free and open-source column-oriented data storage format developed within the Apache Hadoop ecosystem. It is similar to RCFile and ORC, but provides more efficient data compression and encoding schemes with enhanced performance and can better handle large amounts of complex bulk data.

Use Spark SQL to define an ingest query for the Apache Parquet file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

CBOR

CBOR is a binary data serialization format loosely based on JSON. Like JSON it allows the transmission of data objects that contain name–value pairs, but in a more concise manner. This increases processing and transfer speeds at the cost of human-readability.

Use Spark SQL to define an ingest query for the CBOR file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME",
          "options": {
            "rowTag": "row"
          },
          "schema": {
            "fields": [
              {
                "metadata": {},
                "name": "field-1",
                "type": "string",
                "nullable": true
              },
              ...
              {
                "metadata": {},
                "name": "nested-group-1",
                "type": {
                  "fields": [
                    {
                      "metadata": {},
                      "name": "field-a",
                      "type": "string",
                      "nullable": true
                    },
                    {
                      "metadata": {},
                      "name": "nested-group-a",
                      "type": {
                        "fields": [
                          ...
                        ],
                        "type": "struct"
                      },
                      "nullable": true
                    },
                    {
                      "metadata": {},
                      "name": "field-xyz",
                      "type": "string",
                      "nullable": true
                    },
                  ],
                  "type": "struct"
                }
                "type": "struct"
              }
              ...
            }
            ...
          ]
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

CSV

A comma-separated values (CSV) file, defined by RFC 4180 , is a delimited text file that uses a comma to separate values. A CSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by commas. The use of the comma as a field separator is the source of the name for this file format.

Use Spark SQL to define an ingest query for the CSV file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME",
          "options": {
            "delimiter": ",",
            "escape": "\\",
            "multiline": "true",
            "quote": "\""
          }
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

Caution

Spark does not correctly implement RFC 4180 for escape characters in CSV files. The most common implementations of CSV files expect a double quote " as an escape character while Spark uses a backslash \. For more information about this issue view the SPARK-22236 issue within the Spark project.

You can override this behavior when working with RFC-compliant CSV files by specifying an escape character in the courier load operations using ' or " as the escape character.

For example:

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
      {
        "file": "FILE_NAME",
        "options": {
          "escape": "'"
        }
      }
    ],
    "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

If a CSV file uses \ as the delimiter, configure the load operation to specify an empty delimiter value, after which Spark will automatically apply the \ character as the delimiter.

For example:

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
      {
        "file": "FILE_NAME",
        "options": {
          "delimiter": ""
        }
      }
    ],
    "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

JSON

JavaScript Object Notation (JSON) is language-independent data format that is derived from (and structured similar to) JavaScript.

Use Spark SQL to define an ingest query for the JSON file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

NDJSON

Newline-delimited JSON (NDJSON) is a data format for structured data that defines the structure of JSON data using lines as separators. Each line in a NDJSON file is a valid JSON value.

Use Spark SQL to define an ingest query for the NDJSON file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

PSV

A pipe-separated values (PSV) file is a delimited text file that uses a pipe to separate values. A PSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by pipes. The use of the pipe as a field separator is the source of the name for this file format.

Use Spark SQL to define an ingest query for the PSV file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
          "options": {
            "delimiter": "\|",
            "escape": "\\",
            "multiline": "true",
            "quote": "\""
          }
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

Streaming JSON

Streaming JSON is a way to send increments of data using NDJSON formatting within each increment. Each line in a NDJSON file is a valid JSON value.

Use Spark SQL to define an ingest query for the Streaming JSON file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME"
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

TSV

A tab-separated values (TSV) file is a delimited text file that uses a tab to separate values. A TSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by tabs. The use of the tab as a field separator is the source of the name for this file format.

Use Spark SQL to define an ingest query for the TSV file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME",
          "options": {
            "delimiter": "\t",
            "escape": "\\",
            "multiline": "true",
            "quote": "\""
          }
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

XML

eXtensible Markup Language (XML) is supported data format for customer data sources.

Use Spark SQL to define an ingest query for the XML file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.

{
  "FEED_ID": [
    {
      "type": "spark-sql",
      "spark-sql-files": [
        {
          "file": "FILE_NAME",
          "options": {
            "rowTag": "row"
          }
        }
      ],
      "spark-sql-query": "INGEST_QUERY_NAME"
    }
  ]
}

How-tos

This section describes tasks related to managing ingest queries in Amperity:

Add ingest query

An ingest query allows transformations to be done against customer data prior to adding it to a domain table, such as field-level cleaning, row-level filtering, and joins between files. An ingest query is authored directly from the Sources page using an editor that supports Spark SQL syntax.

To add an ingest query

  1. From the Sources page, click Add SQL Query. This opens the Add SQL Query dialog box.

  2. Enter the name of the SQL query. This name must be unique.

  3. Enter a description for the SQL query.

  4. Under SQL Query define a SQL query using Spark SQL syntax. For example:

    end-before

  5. Click Save.

Add to courier as load operation

An ingest query can be used to transform data prior to loading it to Amperity. For example, to remove errant commas, removing extra line breaks, flattening hierarchical data structures, joining data, or resolving other formatting issues. An ingest query is written in Spark SQL and should be tested prior to running it in Amperity. An ingest query must be added to a courier as a load operation.

To add an ingest query to courier load operations

  1. From the Sources page, click Add Courier. The Add Courier dialog box opens.

  2. Complete the steps for naming the courier, choosing a plugin, and applying all necessary credentials and configuration settings.

  3. Under Settings, under Load Operations, add a load operation:

    The structure for an ingest query load operation is similar to:

    {
      "FEED_ID": [
        {
          "type": "spark-sql",
          "spark-sql-files": [
          {
            "file": "FILE_NAME",
            "options": {
              "delimiter": "\,",
              "escape": "\\",
              "multiline": "true",
              "quote": "\""
            }
          }
        ],
        "spark-sql-query": "INGEST_QUERY_NAME"
        }
      ]
    }
    

    where:

    • file is set to the name of the file (FILE_NAME) against which the ingest query will run.

    • FEED_ID is set to the ID for the feed for which the ingest query will run.

    • delimiter specifies the delimiter to use with CSV, TSV, and PSV files. (These default to ,, \t, and |. respectively if not otherwise specified.)

    • escape defines the character used for escaping quotes inside an already quoted value in the source file. The default escape character in Spark is the backslash (\).

    • multiline indicates the presence of records that cross multiple lines. Use only for CSV, TSV, or PSV file types and set to true only if that file has fields with newline characters in them.

      Note

      Use an ingest query to remove the multiline elements. For example:

      REGEXP_REPLACE(message_text,"\n","") AS message_text
      
    • quote defines the character used for escaping quoted values where the separator can be part of the value in the source file. The default character is the double quote (").

    • INGEST_QUERY_NAME is set to the name of the ingest query.

  4. Click Save.

Delete ingest query

You can delete an ingest query. Verify that both upstream and downstream processes no longer depend on this ingest query prior to deleting it.

To delete an ingest query

  1. From the Sources page, open the menu for an ingest query, and then select Delete. The Delete Query dialog box opens.

  2. Click Delete.

Edit ingest query

You can edit an ingest query.

To edit an ingest query

  1. From the Sources page, open the menu for an ingest query, and then select Edit. The Edit SQL Query dialog box opens.

  2. Make your changes.

  3. Click Save.

Example ingest queries

The following sections show examples of ingest queries:

Build birthdate

If incoming data contains birthdate data split by day, month, and year, you can build a complete birthdate using an ingest query. For example, incoming data has the following fields:

----- ------- ------
 day   month  year
----- ------- ------
 08    12     1969
 11    25     1978
 09    15     1981
----- ------- ------

The following example uses the IF() function to concatenate three fields together using a forward slash ( / ) as a separator:

SELECT
  *
  ,IF(birth_month != '0' AND birth_day != '0' AND  birth_year != '0',
      birth_month||'/'||birth_day||'/'||birth_year, NULL) AS birthdate
FROM Source_Table

Concatenate columns

The following example concatenates columns into a single field, where “ACME” represents a hardcoded value present in the filename.

SELECT
  'ACME' AS Brand_Name
  ,CONCAT_WS('-',
             ,'ACME'
             ,card_number) AS Loyalty_ID
  ,CONCAT_WS(' '
             ,residential_unit_number
             ,residential_street_number
             ,residential_street) AS residential_address
  ,*
FROM Loyalty_ACME
CLUSTER BY card_number

Explode transactions

Note

This example uses an example XML file as the data source for sales transactions.

Use the EXPLODE() function to process sales transaction data into a table using an ingest query similar to:

WITH explodedData AS (
  SELECT
    salesTransactionId
    ,EXPLODE(salesOrder.tenders.tender) AS tender FROM PosXml
)

SELECT
  salesTransactionId
  ,tender.type AS type
  ,tender.amount AS amount
FROM
  explodedData

Find records

The following example finds records in an incoming table that have been marked as “DELETE”, and then creates a table to stores a list of records to be deleted.

SELECT
  event.uniquesourceid AS uniqueSourceID
  ,current_date AS deleted_date
FROM icon
WHERE event.type='DELETE'

Flatten with identifiers

Note

This example uses an example XML file as the data source for sales transactions.

Use identifiers and aliases to flatten nested XML data with an ingest query, similar to:

SELECT
  salesTransactionId AS id
  ,type
  ,dateTime AS salesDateTime
  ,salesOrder.salesOrderId AS salesOrderId
  ,salesOrder.channelType AS channelType
  ,salesOrder.orderSummary.totalAmount AS totalAmount
FROM PosXml

returns a table similar to:

----- ------ ---------------------- -------------- ------------- -------------
 id    type   salesDateTime          salesOrderId   channelType   totalAmount
----- ------ ---------------------- -------------- ------------- -------------
 ABC   Add    2020-11-15T04:54:34Z   A1zyBCxwvDu    Cafe          120
 DEF   Add    2020-11-15T04:55:25Z   B1yxCDwvuEt    Cafe          14
 GHI   Add    2020-11-15T04:57:12Z   C1xwDEvutFs    Cafe          27
----- ------ ---------------------- -------------- ------------- -------------

Import billing as address

The following query imports billing addresses into the address table.

SELECT *
FROM Customer_AcmeAddress
WHERE isBilling = 'true'
CLUSTER BY address_id

JOIN datasets

Note

This example uses an example XML file as the data source for sales transactions.

Use a JOIN operation to join two datasets into a rectangular dataset, similar to:

SELECT
  M.*
  ,L.status
FROM main M
JOIN lookup L ON M.statusId=L.statusId

Load XML data

Note

This example uses an example XML file as the data source for sales transactions.

Use the EXPLODE() function to process sales transaction data into a table using an ingest query similar to:

WITH explodedData AS (
  SELECT
    salesTransactionId
    ,EXPLODE(salesOrder.tenders.tender) AS tender FROM PosXml
)

SELECT
  salesTransactionId
  ,tender.type AS type
  ,tender.amount AS amount
FROM
  explodedData

Parse fields from files

The following example shows an ingest query that parses fields from a DAT file. Each field (fields 1-6) has a starting point within the DAT file (1, 21, 52, 63, 69, 70) and a length (20, 30, 10, 15, 1, 140). Use an ordinal ( _c0 ) to define each source field within the DAT file.

SELECT
  ,NULLIF(TRIM(SUBSTR(`_c0`,1,20)),'') AS Field1
  ,NULLIF(TRIM(SUBSTR(`_c0`,21,30)),'') AS Field2
  ,NULLIF(TRIM(SUBSTR(`_c0`,52,10)),'') AS Field3
  ,NULLIF(TRIM(SUBSTR(`_c0`,63,15)),'') AS Field4
  ,NULLIF(TRIM(SUBSTR(`_c0`,69,1)),'') AS Field5
  ,NULLIF(TRIM(SUBSTR(`_c0`,70,140)),'') AS Field6
FROM DAT_FILE_NAME

Parse nested records

Occasionally, a CSV file will contain nested records. For example, a field within the CSV file contains JSON data similar to:

[{"street": "123 Main Street", "apt": "101", "city": "Seattle", "state": "WA", "zip": "98101"}]

which is JSON data in a nested format:

[
  {
    "street": "123 Main Street",
    "apt": "101",
    "city": "Seattle",
    "state": "WA",
    "zip": "98101"
  }
]

There are two general approaches to take when a CSV file contains nested records:

  1. Recommended. The customer should update the process for how the data is provided to Amperity to ensure the file does not contain nested records.

  2. If the file cannot be provided without nested records, use an ingest query to flatten the data in these fields prior to loading it to Amperity.

    Note

    This will increase the preprocessing effort required by Amperity. Large datasets will take more time and this approach should be avoided with very large datasets and should be used carefully with datasets that will be processed on a daily basis.

For example:

WITH explodedData AS (
  SELECT
    table_id
    ,EXPLODE(combined_address) AS address FROM Source
)

SELECT
  table_id
  ,address.street AS address
  ,address.apt AS address2
  ,address.city AS city
  ,address.state AS state
  ,address.zip AS postal
FROM Source

Remove field

Some data sources contain fields that should not be loaded to Amperity. The following table contains a field named _update that you want to remove before loading the table to Amperity:

----- ------ ---------------------- ---------
 id    type   datetime               _update
----- ------ ---------------------- ---------
 ABC   Add    2020-11-15T04:54:34Z   1
 DEF   Add    2020-11-15T04:55:25Z   0
 GHI   Add    2020-11-15T04:57:12Z   0
----- ------ ---------------------- ---------

To remove the _update field, use an ingest query similar to:

SELECT
  id
  ,type
  ,datetime
FROM
  table

Select all fields

In rare cases a file cannot be loaded using a feed and also requires no transforms. Use an ingest query to SELECT all of the fields:

SELECT
  *
FROM
  table