Ingest queries¶
An ingest query is a SQL statement that may be applied to data prior to loading it to a domain table. An ingest query is defined using Spark SQL syntax.
Spark SQL¶
Spark SQL is a high-performance SQL query engine that is used by Amperity to ingest data, create domain tables, and extend the outcome of the Stitch process in your customer 360 database.
Use Spark SQL to define all SQL queries related to the following areas of Amperity:
Ingesting data, including ingest queries
Processing data into domain tables
Building custom domain tables
Loading data into Stitch
Running Stitch
Loading the results of Stitch into the customer 360 database
Defining tables in the customer 360 database
Note
Spark SQL is used to define all SQL queries related to the Stitch process up to (and including) building the tables in the customer 360 database. Presto SQL is used to define SQL queries for segments. Why both?
Spark SQL performs better in more traditional processes like machine learning and ETL-like processes that are resource intensive.
Presto SQL performs better when running real-time queries against cloud datasets.
Courier load operations¶
An ingest query can be used to transform data prior to loading it to Amperity, for example to remove errant commas, remove extra line breaks, flatten hierarchical data structures, join data, or resolve other formatting issues. An ingest query is written in Spark SQL and should be tested prior to running it in Amperity. An ingest query must be added to a courier as a load operation.
The structure for an ingest query load operation is similar to:
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"delimiter": ",",
"escape": "\\",
"multiline": "true",
"quote": "\""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
where:
file is set to the name of the file (FILE_NAME) against which the ingest query will run.
FEED_ID is set to the ID for the feed for which the ingest query will run.
delimiter specifies the delimiter to use with CSV, TSV, and PSV files. (These default to “,”, “\t”, and “|”, respectively, if not otherwise specified.)
escape defines the character used for escaping quotes inside an already quoted value in the source file. The default escape character in Spark is the backslash ( \ ).
multiline is an optional parameter that indicates the presence of records that cross multiple lines. Use only for CSV, TSV, or PSV file types and set to “true” only if that file has fields with newline characters in them.
Caution
multiline only supports files that contain Unix line endings (\n). When multiline is enabled for files that contain DOS line endings (\r\n) the trailing \r character remains in the file, which may cause an ingest failure.
Use an ingest query to remove multiline elements, as necessary. For example:
REGEXP_REPLACE(message_text,"\n","") AS message_text
REGEXP_REPLACE(message_text,"\r\n","") AS message_text
REGEXP_REPLACE(message_text,"\r","") AS message_text
quote defines the character used for escaping quoted values where the separator can be part of the value in the source file. The default character is the double quote ( " ).
INGEST_QUERY_NAME is set to the name of the ingest query.
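The backslash and double-quote values in the load operation are easy to mis-escape when the JSON is typed by hand. The following Python sketch shows one way to generate the structure described above and let a JSON serializer handle the escaping. The build_load_operation helper and its parameter names are hypothetical and are not part of Amperity.

```python
import json


def build_load_operation(feed_id, file_name, query_name,
                         delimiter=",", escape="\\", quote='"',
                         multiline=True):
    """Return a mapping shaped like the ingest query load operation above."""
    options = {"delimiter": delimiter, "escape": escape}
    if multiline:
        # The documented structure stores this flag as the string "true".
        options["multiline"] = "true"
    options["quote"] = quote
    return {
        feed_id: [
            {
                "type": "spark-sql",
                "spark-sql-files": [{"file": file_name, "options": options}],
                "spark-sql-query": query_name,
            }
        ]
    }


# FEED_ID, FILE_NAME, and INGEST_QUERY_NAME are the placeholders used above.
load_operation = build_load_operation("FEED_ID", "FILE_NAME", "INGEST_QUERY_NAME")
text = json.dumps(load_operation, indent=2)
print(text)
```

Serializing with json.dumps writes the single backslash as "\\" and the double quote as "\"", which matches the escaped values shown in the structure.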
Examples¶
The following file formats can be loaded to Amperity using ingest queries:
Apache Avro¶
Apache Avro is a row-oriented remote procedure call and data serialization framework developed within the Apache Hadoop ecosystem. Avro uses JSON to define data types and protocols, and serializes data in a compact binary format.
Use Spark SQL to define an ingest query for the Apache Avro file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
Apache Parquet¶
Apache Parquet is a free and open-source column-oriented data storage format developed within the Apache Hadoop ecosystem. It is similar to RCFile and ORC, but provides more efficient data compression and encoding schemes with enhanced performance and can better handle large amounts of complex bulk data.
Use Spark SQL to define an ingest query for the Apache Parquet file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
CBOR¶
CBOR is a binary data serialization format loosely based on JSON. Like JSON it allows the transmission of data objects that contain name–value pairs, but in a more concise manner. This increases processing and transfer speeds at the cost of human-readability.
Use Spark SQL to define an ingest query for the CBOR file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"rowTag": "row"
},
"schema": {
"fields": [
{
"metadata": {},
"name": "field-1",
"type": "string",
"nullable": true
},
...
{
"metadata": {},
"name": "nested-group-1",
"type": {
"fields": [
{
"metadata": {},
"name": "field-a",
"type": "string",
"nullable": true
},
{
"metadata": {},
"name": "nested-group-a",
"type": {
"fields": [
...
],
"type": "struct"
},
"nullable": true
},
{
"metadata": {},
"name": "field-xyz",
"type": "string",
"nullable": true
}
],
"type": "struct"
},
"nullable": true
},
...
],
"type": "struct"
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
CSV¶
A comma-separated values (CSV) file, defined by RFC 4180, is a delimited text file that uses a comma to separate values. A CSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by commas. The use of the comma as a field separator is the source of the name for this file format.
Use Spark SQL to define an ingest query for the CSV file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"delimiter": ",",
"escape": "\\",
"multiline": "true",
"quote": "\""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
Caution
Spark does not correctly implement RFC 4180 for escape characters in CSV files. The most common implementations of CSV files expect a double quote ( " ) as an escape character while Spark uses a backslash ( \ ). For more information about this issue view the SPARK-22236 issue within the Spark project.
You can override this behavior when working with RFC-compliant CSV files by specifying an escape character in the courier load operations using ' or " as the escape character.
For example:
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"escape": "'"
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
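To make the difference between the two escaping styles concrete, the following Python sketch parses the same record written both ways. It uses Python's csv module as a stand-in and does not reproduce Spark's reader. The sample lines and the helper names are hypothetical.

```python
import csv
import io

# The same record written two ways: RFC 4180 doubles the quote character,
# while backslash-style escaping prefixes it with a backslash.
rfc_line = '1,"She said ""hello"", then left"\n'
backslash_line = '1,"She said \\"hello\\", then left"\n'


def parse_rfc4180(text):
    # doublequote=True treats "" inside a quoted field as one literal quote.
    return next(csv.reader(io.StringIO(text), doublequote=True))


def parse_backslash(text):
    # escapechar="\\" with doublequote=False reads \" as one literal quote.
    return next(csv.reader(io.StringIO(text), doublequote=False, escapechar="\\"))


print(parse_rfc4180(rfc_line))
print(parse_backslash(backslash_line))
```

Both calls recover the same two fields only when the parser's escape setting matches how the file was written, which is why the escape option should reflect the convention used by the source file.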
If a CSV file uses \ as the delimiter, configure the load operation to specify an empty delimiter value, after which Spark will automatically apply the \ character as the delimiter.
For example:
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"delimiter": ""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
JSON¶
JavaScript Object Notation (JSON) is a language-independent data format that is derived from (and structured similarly to) JavaScript.
Use Spark SQL to define an ingest query for the JSON file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
NDJSON¶
Newline-delimited JSON (NDJSON) is a data format for structured data that defines the structure of JSON data using lines as separators. Each line in an NDJSON file is a valid JSON value.
Use Spark SQL to define an ingest query for the NDJSON file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
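Because each line of an NDJSON file is a standalone JSON value, a file can be sanity-checked line by line before a courier picks it up. The following Python sketch illustrates that idea. The read_ndjson helper and the sample records are hypothetical, and skipping blank lines is an assumption of this sketch.

```python
import json


def read_ndjson(text):
    """Yield one parsed JSON value per non-empty line of NDJSON text."""
    for line_number, line in enumerate(text.split("\n"), start=1):
        line = line.strip()
        if not line:
            # Assumption: blank lines (such as a trailing newline) are skipped.
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError as error:
            raise ValueError(f"line {line_number} is not valid JSON: {error}") from error


sample = '{"id": "ABC", "type": "Add"}\n{"id": "DEF", "type": "Add"}\n'
records = list(read_ndjson(sample))
print(records)
```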
PSV¶
A pipe-separated values (PSV) file is a delimited text file that uses a pipe to separate values. A PSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by pipes. The use of the pipe as a field separator is the source of the name for this file format.
Use Spark SQL to define an ingest query for the PSV file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"delimiter": "|",
"escape": "\\",
"multiline": "true",
"quote": "\""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
Streaming JSON¶
Streaming JSON is a way to send increments of data using NDJSON formatting within each increment. Each line in an NDJSON file is a valid JSON value.
Use Spark SQL to define an ingest query for the Streaming JSON file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME"
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
TSV¶
A tab-separated values (TSV) file is a delimited text file that uses a tab to separate values. A TSV file stores tabular data (numbers and text) in plain text. Each line of the file is a data record. Each record consists of one or more fields, separated by tabs. The use of the tab as a field separator is the source of the name for this file format.
Use Spark SQL to define an ingest query for the TSV file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"delimiter": "\t",
"escape": "\\",
"multiline": "true",
"quote": "\""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
XML¶
eXtensible Markup Language (XML) is a supported data format for customer data sources.
Use Spark SQL to define an ingest query for the XML file. Use a SELECT statement to specify which fields should be pulled to Amperity. Apply transforms to those fields as necessary.
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"rowTag": "row"
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
How-tos¶
This section describes tasks related to managing ingest queries in Amperity:
Add ingest query¶
An ingest query allows transformations to be done against customer data prior to adding it to a domain table, such as field-level cleaning, row-level filtering, and joins between files. An ingest query is authored directly from the Sources page using an editor that supports Spark SQL syntax.
To add an ingest query
From the Sources page, click Add SQL Query. This opens the Add SQL Query dialog box.
Enter the name of the SQL query. This name must be unique.
Enter a description for the SQL query.
Under SQL Query, define a SQL query using Spark SQL syntax.
Click Save.
Add to courier as load operation¶
An ingest query can be used to transform data prior to loading it to Amperity, for example to remove errant commas, remove extra line breaks, flatten hierarchical data structures, join data, or resolve other formatting issues. An ingest query is written in Spark SQL and should be tested prior to running it in Amperity. An ingest query must be added to a courier as a load operation.
To add an ingest query to courier load operations
From the Sources page, click Add Courier. The Add Courier dialog box opens.
Complete the steps for naming the courier, choosing a plugin, and applying all necessary credentials and configuration settings.
Under Settings, under Load Operations, add a load operation:
The structure for an ingest query load operation is similar to:
{
"FEED_ID": [
{
"type": "spark-sql",
"spark-sql-files": [
{
"file": "FILE_NAME",
"options": {
"delimiter": ",",
"escape": "\\",
"multiline": "true",
"quote": "\""
}
}
],
"spark-sql-query": "INGEST_QUERY_NAME"
}
]
}
where:
file is set to the name of the file (FILE_NAME) against which the ingest query will run.
FEED_ID is set to the ID for the feed for which the ingest query will run.
delimiter specifies the delimiter to use with CSV, TSV, and PSV files. (These default to , and \t and |, respectively, if not otherwise specified.)
escape defines the character used for escaping quotes inside an already quoted value in the source file. The default escape character in Spark is the backslash ( \ ).
multiline indicates the presence of records that cross multiple lines. Use only for CSV, TSV, or PSV file types and set to true only if that file has fields with newline characters in them.
Note
Use an ingest query to remove the multiline elements. For example:
REGEXP_REPLACE(message_text,"\n","") AS message_text
quote defines the character used for escaping quoted values where the separator can be part of the value in the source file. The default character is the double quote ( " ).
INGEST_QUERY_NAME is set to the name of the ingest query.
Click Save.
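As an offline check of the line-break cleanup mentioned in the note above, the following Python sketch applies a comparable regular-expression replacement to a sample string. It is an illustration only: remove_line_breaks is a hypothetical helper, and Python's re module stands in for Spark's REGEXP_REPLACE, whose escaping rules may differ.

```python
import re


def remove_line_breaks(value):
    """Strip DOS (\\r\\n), bare carriage-return (\\r), and Unix (\\n) line breaks."""
    # Listing \r\n first removes a DOS ending as a single unit.
    return re.sub(r"\r\n|\r|\n", "", value)


message_text = "line one\r\nline two\nline three\r"
print(remove_line_breaks(message_text))
```

A single alternation covers all three line-ending styles in one pass, so a trailing \r does not survive in the cleaned value.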
Delete ingest query¶
You can delete an ingest query. Verify that both upstream and downstream processes no longer depend on this ingest query prior to deleting it.
To delete an ingest query
From the Sources page, open the menu for an ingest query, and then select Delete. The Delete Query dialog box opens.
Click Delete.
Edit ingest query¶
You can edit an ingest query.
To edit an ingest query
From the Sources page, open the menu for an ingest query, and then select Edit. The Edit SQL Query dialog box opens.
Make your changes.
Click Save.
Example ingest queries¶
The following sections show examples of ingest queries:
Build birthdate¶
If incoming data contains birthdate data split by day, month, and year, you can build a complete birthdate using an ingest query. For example, incoming data has the following fields:
------- ----- ------
month   day   year
------- ----- ------
08      12    1969
11      25    1978
09      15    1981
------- ----- ------
The following example uses the IF() function to verify that none of the three fields is '0', and then concatenates them using a forward slash ( / ) as a separator. When any of the fields is '0', birthdate is set to NULL:
SELECT
*
,IF(birth_month != '0' AND birth_day != '0' AND birth_year != '0',
birth_month||'/'||birth_day||'/'||birth_year, NULL) AS birthdate
FROM Source_Table
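The conditional logic in this query can be checked outside of Amperity with a small Python sketch that mirrors the IF() expression. The build_birthdate function and the sample values are hypothetical. None stands in for SQL NULL.

```python
def build_birthdate(birth_month, birth_day, birth_year):
    """Join the parts with '/' unless any part is '0' or missing."""
    parts = (birth_month, birth_day, birth_year)
    # In SQL, a NULL part prevents the IF() condition from being true,
    # so None is treated here as "no birthdate" as well.
    if any(part is None or part == "0" for part in parts):
        return None
    return "/".join(parts)


print(build_birthdate("07", "04", "1990"))
print(build_birthdate("0", "04", "1990"))
```

The values are compared as strings, as in the query, so a part such as '00' is not treated as '0' and would still be concatenated.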
Concatenate columns¶
The following example concatenates columns into a single field, where “ACME” represents a hardcoded value present in the filename.
SELECT
'ACME' AS Brand_Name
,CONCAT_WS('-'
,'ACME'
,card_number) AS Loyalty_ID
,CONCAT_WS(' '
,residential_unit_number
,residential_street_number
,residential_street) AS residential_address
,*
FROM Loyalty_ACME
CLUSTER BY card_number
Explode transactions¶
Note
This example uses an example XML file as the data source for sales transactions.
Use the EXPLODE() function to process sales transaction data into a table using an ingest query similar to:
WITH explodedData AS (
SELECT
salesTransactionId
,EXPLODE(salesOrder.tenders.tender) AS tender
FROM PosXml
)
SELECT
salesTransactionId
,tender.type AS type
,tender.amount AS amount
FROM
explodedData
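For readers less familiar with EXPLODE(), the following Python sketch imitates its effect on a small in-memory dataset: each element of an array becomes its own output row, and the other columns are repeated. The explode helper, the flattened tenders key, and the sample transactions are hypothetical simplifications of the nested salesOrder.tenders.tender path.

```python
def explode(rows, array_key, alias):
    """Yield one row per element of row[array_key], similar to EXPLODE()."""
    for row in rows:
        # Rows with an empty or missing array produce no output rows.
        for element in row.get(array_key) or []:
            flat = {key: value for key, value in row.items() if key != array_key}
            flat[alias] = element
            yield flat


transactions = [
    {"salesTransactionId": "ABC", "tenders": [
        {"type": "cash", "amount": 20},
        {"type": "gift-card", "amount": 100},
    ]},
    {"salesTransactionId": "DEF", "tenders": [{"type": "credit", "amount": 14}]},
    {"salesTransactionId": "GHI", "tenders": []},
]

result = [
    {
        "salesTransactionId": row["salesTransactionId"],
        "type": row["tender"]["type"],
        "amount": row["tender"]["amount"],
    }
    for row in explode(transactions, "tenders", "tender")
]
print(result)
```

A transaction with two tenders yields two rows, and a transaction with no tenders yields none, which is worth keeping in mind when row counts are compared before and after the ingest query.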
Find records¶
The following example finds records in an incoming table that have been marked as “DELETE”, and then creates a table that stores a list of records to be deleted.
SELECT
event.uniquesourceid AS uniqueSourceID
,current_date AS deleted_date
FROM icon
WHERE event.type='DELETE'
Flatten with identifiers¶
Note
This example uses an example XML file as the data source for sales transactions.
Use identifiers and aliases to flatten nested XML data with an ingest query, similar to:
SELECT
salesTransactionId AS id
,type
,dateTime AS salesDateTime
,salesOrder.salesOrderId AS salesOrderId
,salesOrder.channelType AS channelType
,salesOrder.orderSummary.totalAmount AS totalAmount
FROM PosXml
returns a table similar to:
----- ------ ---------------------- -------------- ------------- -------------
id type salesDateTime salesOrderId channelType totalAmount
----- ------ ---------------------- -------------- ------------- -------------
ABC Add 2020-11-15T04:54:34Z A1zyBCxwvDu Cafe 120
DEF Add 2020-11-15T04:55:25Z B1yxCDwvuEt Cafe 14
GHI Add 2020-11-15T04:57:12Z C1xwDEvutFs Cafe 27
----- ------ ---------------------- -------------- ------------- -------------
Import billing as address¶
The following query imports billing addresses into the address table.
SELECT *
FROM Customer_AcmeAddress
WHERE isBilling = 'true'
CLUSTER BY address_id
JOIN datasets¶
Note
This example uses an example XML file as the data source for sales transactions.
Use a JOIN operation to join two datasets into a rectangular dataset, similar to:
SELECT
M.*
,L.status
FROM main M
JOIN lookup L ON M.statusId=L.statusId
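The behavior of this inner join can be sketched in Python to show which rows survive. The inner_join helper and the sample rows are hypothetical. Only the table names, the statusId join key, and the status column come from the query above.

```python
def inner_join(main_rows, lookup_rows, key="statusId"):
    """Return main rows extended with status for each matching lookup row."""
    lookup_by_key = {}
    for row in lookup_rows:
        lookup_by_key.setdefault(row[key], []).append(row)
    joined = []
    for main_row in main_rows:
        # Rows in main without a matching statusId are dropped (inner join).
        for lookup_row in lookup_by_key.get(main_row[key], []):
            joined.append({**main_row, "status": lookup_row["status"]})
    return joined


main = [
    {"id": "ABC", "statusId": 1},
    {"id": "DEF", "statusId": 2},
    {"id": "GHI", "statusId": 3},
]
lookup = [
    {"statusId": 1, "status": "active"},
    {"statusId": 2, "status": "closed"},
]
joined = inner_join(main, lookup)
print(joined)
```

In this sample the GHI row has no match in lookup and is absent from the result. If such rows must be kept, a LEFT JOIN would be the usual alternative.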
Load XML data¶
Note
This example uses an example XML file as the data source for sales transactions.
Use the EXPLODE() function to process sales transaction data into a table using an ingest query similar to:
WITH explodedData AS (
SELECT
salesTransactionId
,EXPLODE(salesOrder.tenders.tender) AS tender
FROM PosXml
)
SELECT
salesTransactionId
,tender.type AS type
,tender.amount AS amount
FROM
explodedData
Parse fields from files¶
The following example shows an ingest query that parses fields from a DAT file. Each field (fields 1-6) has a starting point within the DAT file (1, 21, 52, 63, 69, 70) and a length (20, 30, 10, 15, 1, 140). Use an ordinal ( _c0 ) to define each source field within the DAT file.
SELECT
NULLIF(TRIM(SUBSTR(`_c0`,1,20)),'') AS Field1
,NULLIF(TRIM(SUBSTR(`_c0`,21,30)),'') AS Field2
,NULLIF(TRIM(SUBSTR(`_c0`,52,10)),'') AS Field3
,NULLIF(TRIM(SUBSTR(`_c0`,63,15)),'') AS Field4
,NULLIF(TRIM(SUBSTR(`_c0`,69,1)),'') AS Field5
,NULLIF(TRIM(SUBSTR(`_c0`,70,140)),'') AS Field6
FROM DAT_FILE_NAME
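The start-and-length arithmetic can be tried on a sample line with the following Python sketch, which mirrors the NULLIF(TRIM(SUBSTR(...)),'') pattern for the first three fields only. The LAYOUT list, the parse_fixed_width helper, and the sample line are hypothetical.

```python
# (name, start, length) using 1-based start positions, as SUBSTR() does.
LAYOUT = [("Field1", 1, 20), ("Field2", 21, 30), ("Field3", 52, 10)]


def parse_fixed_width(line, layout=LAYOUT):
    """Slice a fixed-width line into named fields; blank fields become None."""
    record = {}
    for name, start, length in layout:
        # Convert the 1-based SUBSTR position to a 0-based Python slice.
        value = line[start - 1:start - 1 + length].strip(" ")
        # NULLIF(..., '') turns an empty string into NULL (None here).
        record[name] = value or None
    return record


# Position 51 is an unused filler character in this sample line.
line = "Jane".ljust(20) + "Doe".ljust(30) + " " + "Seattle".ljust(10)
record = parse_fixed_width(line)
print(record)
```

The main point of the sketch is the off-by-one conversion: a field that starts at position 21 in SUBSTR() starts at index 20 in a zero-based language.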
Parse nested records¶
Occasionally, a CSV file will contain nested records. For example, a field within the CSV file contains JSON data similar to:
[{"street": "123 Main Street", "apt": "101", "city": "Seattle", "state": "WA", "zip": "98101"}]
which is JSON data in a nested format:
[
{
"street": "123 Main Street",
"apt": "101",
"city": "Seattle",
"state": "WA",
"zip": "98101"
}
]
There are two general approaches to take when a CSV file contains nested records:
Recommended. The customer should update the process for how the data is provided to Amperity to ensure the file does not contain nested records.
If the file cannot be provided without nested records, use an ingest query to flatten the data in these fields prior to loading it to Amperity.
Note
This will increase the preprocessing effort required by Amperity, and large datasets will take more time to process. Avoid this approach with very large datasets and use it carefully with datasets that are processed on a daily basis.
For example:
WITH explodedData AS (
SELECT
table_id
,EXPLODE(combined_address) AS address
FROM Source
)
SELECT
table_id
,address.street AS address
,address.apt AS address2
,address.city AS city
,address.state AS state
,address.zip AS postal
FROM explodedData
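The flattening step can be prototyped on a small sample before the ingest query is written. The following Python sketch reads a CSV file whose combined_address field holds a JSON array and emits one flat row per address, using the same output column names as the query above. The flatten_addresses helper and the sample data are hypothetical, and the sketch does not reproduce how Spark parses the field.

```python
import csv
import io
import json


def flatten_addresses(csv_text):
    """Return one flat row per address found in the combined_address field."""
    rows = []
    for record in csv.DictReader(io.StringIO(csv_text)):
        # The field holds a JSON array, so each element becomes its own row.
        for address in json.loads(record["combined_address"]):
            rows.append({
                "table_id": record["table_id"],
                "address": address.get("street"),
                "address2": address.get("apt"),
                "city": address.get("city"),
                "state": address.get("state"),
                "postal": address.get("zip"),
            })
    return rows


# Build a sample CSV with csv.writer so the embedded quotes are escaped correctly.
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["table_id", "combined_address"])
writer.writerow(["1", json.dumps([{
    "street": "123 Main Street", "apt": "101",
    "city": "Seattle", "state": "WA", "zip": "98101",
}])])

rows = flatten_addresses(buffer.getvalue())
print(rows)
```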
Remove field¶
Some data sources contain fields that should not be loaded to Amperity. The following table contains a field named _update that you want to remove before loading the table to Amperity:
----- ------ ---------------------- ---------
id type datetime _update
----- ------ ---------------------- ---------
ABC Add 2020-11-15T04:54:34Z 1
DEF Add 2020-11-15T04:55:25Z 0
GHI Add 2020-11-15T04:57:12Z 0
----- ------ ---------------------- ---------
To remove the _update field, use an ingest query similar to:
SELECT
id
,type
,datetime
FROM
table
Select all fields¶
In rare cases a file cannot be loaded using a feed and also requires no transforms. Use an ingest query to SELECT all of the fields:
SELECT
*
FROM
table