Real-time tables

Real-time tables contain data that is streamed to Amperity and are available alongside customer profiles in your brand’s customer 360 database. They complement the daily batch process, which builds and maintains complete, durable customer profiles over longer timeframes, by enabling use cases that depend on time-sensitive data that is updated more frequently.

How real-time tables work

Amperity uses a batch layer to build and maintain customer profiles that are durable and complete over longer timeframes. The batch layer is typically run (and refreshed) on a daily basis.

Amperity uses a streaming layer to support use cases that require data that is updated or refreshed more than once per day.

These layers are shown in the following diagram:

Batch and streaming layers.

The batch layer builds and maintains customer profiles that are durable and complete over long timeframes. The batch layer uses semantic tagging to standardize data into a collection of standard output tables for customer profiles, transactions, loyalty programs, product catalogs, and other types of data that your brand has made available to Amperity.

The streaming layer makes data available alongside those profiles to support use cases that cannot wait for the batch layer to finish maintaining customer profiles. To support real-time use cases, configure a real-time table for each stream, and then use a query to build a Profile API endpoint that downstream workflows can access programmatically.

Note

The batch layer may also contain data that has arrived through the streaming layer. To add streamed data to your brand’s customer profiles, configure a courier to pull the streamed data into the daily batch workflow.

Architectural details

Amperity uses the following components to enable real-time tables: Apache Kafka, Apache Spark Structured Streaming, and Delta Lake tables.

Architecture for batch and streaming layers.
  1. Apache Kafka reliably handles real-time events and acts as the queuing and persistence layer for real-time tables.

  2. Apache Spark Structured Streaming moves data from Apache Kafka topics to the Amperity platform in real time using fault-tolerant, low-latency stream processing.

  3. Real-time tables are stored as Delta Lake tables and expose real-time data to the Amperity Activation layer.

Real-time table limitations

Real-time tables have the following limitations:

  1. Streamed data takes a few minutes to appear in a real-time table.

  2. Data that is available from a Profile API endpoint is current as of the most recent index refresh.

  3. The Streaming Ingest API is the only supported data source for real-time tables.

  4. Data is loaded to real-time tables as an upsert; data cannot be deleted from a real-time table.

  5. Real-time table schemas cannot be updated. To change a schema, create a new real-time table, and then connect that table to the stream that sends data in the updated schema. (This may be the same Streaming Ingest API endpoint.)
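Item 4 describes upsert-only loading. The following hypothetical Python model illustrates that behavior; it is not Amperity's implementation. It assumes each record is identified by a single key field (`order_id` here, chosen only for illustration), so a record with an existing key replaces the stored row, and the model intentionally offers no way to delete a row.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


class UpsertOnlyTable:
    """Hypothetical model of an upsert-only table keyed on one field."""

    def __init__(self, key_field: str) -> None:
        self.key_field = key_field  # assumed key; not defined by Amperity
        self._rows: Dict[Any, Record] = {}

    def upsert(self, record: Record) -> None:
        # Insert a new row, or replace the existing row that has the same key.
        key = record[self.key_field]
        self._rows[key] = dict(record)

    # Deliberately no delete method: rows can be added or replaced, never removed.

    def rows(self) -> List[Record]:
        return list(self._rows.values())


table = UpsertOnlyTable(key_field="order_id")
table.upsert({"order_id": "A1", "order_revenue": "79.99"})
table.upsert({"order_id": "A1", "order_revenue": "59.99"})  # replaces row A1
table.upsert({"order_id": "B2", "order_revenue": "12.00"})
print(len(table.rows()))  # 2
```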

Enable real-time workflows

Real-time workflows make streamed data available to real-time use cases. Data that is streamed to Amperity is collected in real-time tables, which are made available in one or more databases. Queries against those real-time tables then make specific sets of data available to downstream real-time use cases.

To enable a real-time workflow:

  1. Add streaming endpoint

  2. Configure real-time table

  3. Create API key and access token

  4. Configure the POST request

  5. Run each database in which the real-time table will be available

  6. Start streaming data to the streaming endpoint

  7. Build a query against the real-time table, and then verify data is returned

  8. Make data available to real-time use cases

Add streaming endpoint

Streaming endpoints are managed from the Sources page.

Note

Each streaming endpoint requires an API key and access token. Use the API key and access token within the POST request that is made by the upstream programmatic workflow.

To configure a streaming endpoint

Step 1.

Open the Sources page.

Step 2.

Under Streaming Ingest, click Add stream.

Add a Streaming Ingest API endpoint.

Give the stream a name and description, and then click Save. This will return you to the Sources page.

Important

Be sure the name clearly indicates how the stream is to be used within Amperity by your brand.

Add a name and description for the Streaming Ingest API endpoint.
Step 3.

Open the menu for the stream that was just created, and then click Copy URL.

The URL for the stream is similar to:

https://app.amperity.com/stream/v0/data/is-2hzqsgX1E

Click Copy ID.

The ID for the stream is located at the end of the URL and is similar to:

is-2hzqsgX1E

Save these two values. You will need them to configure the POST request to the streaming endpoint and (optionally) to configure streamed data to be part of the daily batched workflow.

The ID for the Streaming Ingest API endpoint is also available from the Stream ID column:

Get the ID for the Streaming Ingest API endpoint.
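Because the stream ID is the last segment of the stream URL, an upstream script can derive the ID from the copied URL. A minimal Python sketch; the function name `stream_id_from_url` is hypothetical.

```python
from urllib.parse import urlparse


def stream_id_from_url(url: str) -> str:
    """Return the last path segment of a streaming endpoint URL (the stream ID)."""
    path = urlparse(url).path.rstrip("/")  # tolerate a trailing slash
    stream_id = path.rsplit("/", 1)[-1]
    if not stream_id:
        raise ValueError(f"no stream ID found in {url!r}")
    return stream_id


print(stream_id_from_url("https://app.amperity.com/stream/v0/data/is-2hzqsgX1E"))
```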
Step 4.

To add streamed data to the daily batch processing workflow, use a courier, and then use the stream ID to identify which stream’s data will be added to the daily workflow.

Configure real-time table

A real-time table collects data that is streamed to Amperity, and then makes that streamed data available to databases and queries.

To configure a real-time table

Step 1.

Open the Customer 360 page, and then select the Real-time tables tab.

Click Add table.

Step 2.

In the Create real-time table dialog, do the following:

Give the real-time table a name. Use a naming convention that associates the real-time table with its related streaming endpoint, and then identifies the type of data in the real-time table and/or the use case.

Choose the data format for streaming data to the real-time table: “JSON” or “XML”. If “XML” is selected, specify a Row tag that identifies a single row of XML data.

Define the schema for the real-time table. Each field in the schema must exist in the data that the streaming source sends for this real-time table, and the field names in the real-time table must match the field names that are defined for the streaming endpoint. If you have an existing feed configured for this data, you may refer to that feed for schema details.

Note

If your data has complex types, such as nested JSON, choose string as the type. This will allow the real-time table to process the complex object and make it available for querying purposes.
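Before streaming, an upstream workflow could check each payload against the real-time table's schema. The following Python sketch is hypothetical: it represents the schema as a mapping from field name to type name, reports schema fields missing from the payload and payload fields absent from the schema, and flags nested JSON (objects or arrays) in any field that is not typed as string. The `line_items` field is an invented example.

```python
from typing import Any, Dict, List


def check_against_schema(payload: Dict[str, Any], schema: Dict[str, str]) -> List[str]:
    """Return a list of mismatches; an empty list means the payload fits the schema."""
    problems = []
    for name in schema:
        if name not in payload:
            problems.append(f"missing field: {name}")
    for name, value in payload.items():
        if name not in schema:
            problems.append(f"field not in schema: {name}")
        elif isinstance(value, (dict, list)) and schema[name] != "string":
            # Complex types such as nested JSON should be typed as string.
            problems.append(f"nested value in {name}: use type string")
    return problems


schema = {
    "order_id": "string",
    "email": "string",
    "order_revenue": "string",
    "line_items": "string",  # nested JSON, so typed as string
}
payload = {
    "order_id": "1234567890--2024-11-24web",
    "email": "justinc@email.com",
    "order_revenue": "79.99",
    "line_items": [{"sku": "X1", "qty": 1}],
}
print(check_against_schema(payload, schema))  # []
```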


Note

The schema for every real-time table will contain two additional fields at the query layer: received_at (the time at which data arrived at the streaming endpoint) and written_at (the time at which data was written to the real-time table). Use these fields to support filtering for recent data to join with data in batch layer database tables.
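As a sketch of that filtering logic, the following Python keeps only rows received at or after a cutoff, such as the start of the most recent batch run. In a query, the equivalent is a filter on `received_at`. Timestamps are assumed to be ISO 8601 strings, and the cutoff and sample rows are hypothetical.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, List


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def received_since(rows: Iterable[Dict[str, str]], cutoff: datetime) -> List[Dict[str, str]]:
    """Keep only rows whose received_at is at or after the cutoff."""
    return [row for row in rows if parse_timestamp(row["received_at"]) >= cutoff]


# Hypothetical cutoff: when the most recent batch run started.
last_batch_run = datetime(2024, 11, 24, 2, 0, tzinfo=timezone.utc)
rows = [
    {"order_id": "A1", "received_at": "2024-11-24T01:30:00Z"},
    {"order_id": "B2", "received_at": "2024-11-24T04:41:00Z"},
]
print([row["order_id"] for row in received_since(rows, last_batch_run)])  # ['B2']
```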

Click Next.
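The Row tag chosen in step 2 identifies which XML element represents one row of data. The following Python sketch, using the standard `xml.etree.ElementTree` module, illustrates that idea only; it is not how Amperity parses XML, and the sample document, the `order` row tag, and the rule that each child element becomes one field are assumptions.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List


def rows_from_xml(xml_text: str, row_tag: str) -> List[Dict[str, str]]:
    """Split an XML document into rows, one per element named row_tag."""
    root = ET.fromstring(xml_text)
    rows = []
    for element in root.iter(row_tag):
        # Each child element of a row becomes one field in that row.
        rows.append({child.tag: (child.text or "") for child in element})
    return rows


sample = (
    "<orders>"
    "<order><order_id>A1</order_id><email>justinc@email.com</email></order>"
    "<order><order_id>B2</order_id><email>other@email.com</email></order>"
    "</orders>"
)
print(rows_from_xml(sample, "order"))
```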

Step 3.

In the Create real-time table dialog, continue:

Select the streaming endpoint that will stream data to the real-time table. The streaming endpoint must already exist and be available from the drop-down list.

Select the databases in which this real-time table will be available from the Queries page.

Click Active, and then click Save.

Important

Real-time tables must be set to Active before they can be accessed from the Queries page.

Step 4.

Run all of the databases for which this real-time table will be available. When this step is complete, open the Queries page and verify that the real-time table is available for queries and that data in the real-time table can be returned in the results.

Create API key and access token

Each Streaming Ingest API endpoint requires an API key and access token, which authorize the upstream workflow to stream data to that endpoint. Each endpoint also has a unique stream ID that associates the stream with a real-time table.

Configure the POST request

Before you can stream data to Amperity using the Streaming Ingest API, you must configure an API key and access token for that endpoint.

The stream ID, API key, and access token are required to ensure that your upstream workflow sends data to the correct streaming endpoint (stream ID) and is authorized to stream data to Amperity (access token).

For example, a cURL request is similar to:

curl --location --request POST 'https://<tenant>.amperity.com/stream/v0/data/<stream>' \
  --header 'x-amperity-tenant: <tenant>' \
  --header 'Content-Type: application/json' \
  --header 'Authorization: Bearer <token>' \
  --data-raw '{
      "order_id": "1234567890--2024-11-24web",
      "email": "justinc@email.com",
      "order_revenue": "79.99",
      "order_datetime": "2024-11-24T04:40:00Z"
    }'

where <stream> represents the unique stream ID, <token> represents the access token, and <tenant> represents the unique ID for your Amperity tenant.

The --data-raw section contains the list of fields and field values that are sent by the upstream system to the streaming endpoint. The schema that is sent to the streaming endpoint must match the schema that is defined for the real-time table.

Note

You may use any of the following cURL command line options to define the set of fields that are sent to the streaming endpoint: -d, --data-binary, and --data-raw.
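The same request can be built in Python with the standard library. This sketch mirrors the headers in the cURL example and builds the request without sending it. It assumes the access token is sent as `Bearer <token>` in the Authorization header and that the URL follows the `https://<tenant>.amperity.com/stream/v0/data/<stream>` pattern shown in the cURL example; the function name, the tenant `acme`, and the token value are placeholders.

```python
import json
import urllib.request


def build_stream_request(tenant: str, stream_id: str, token: str, record: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request that mirrors the cURL example."""
    url = f"https://{tenant}.amperity.com/stream/v0/data/{stream_id}"
    body = json.dumps(record).encode("utf-8")
    headers = {
        "x-amperity-tenant": tenant,
        "Content-Type": "application/json",
        # Assumption: the access token is sent as "Bearer <token>".
        "Authorization": f"Bearer {token}",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


req = build_stream_request(
    tenant="acme",           # placeholder tenant ID
    stream_id="is-2hzqsgX1E",
    token="example-token",   # placeholder access token
    record={
        "order_id": "1234567890--2024-11-24web",
        "email": "justinc@email.com",
        "order_revenue": "79.99",
        "order_datetime": "2024-11-24T04:40:00Z",
    },
)
print(req.get_method(), req.full_url)
```

To send it, pass `req` to `urllib.request.urlopen`, which raises `urllib.error.HTTPError` if the endpoint returns an error status.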

Run database

Run each of the databases from which the real-time table will be available to queries. Use the Normal run option, which will refresh the database, add the real-time table, and make the real-time table available to accept data from the streaming endpoint.

Stream data to streaming endpoint

Configure the upstream workflow to use the POST request, and then stream data to Amperity from the upstream data source. If the streaming endpoint is receiving data correctly, data appears in the real-time table about 2 minutes after the streaming endpoint accepts it.
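An upstream workflow may need to handle transient failures when streaming. The following Python sketch retries a send with exponential backoff. It is a hypothetical helper, not part of the Streaming Ingest API: `send` stands in for whatever makes the POST request and returns its HTTP status, and the set of retryable status codes is an assumption.

```python
import time
from typing import Callable

# Assumption: which statuses are worth retrying. 429 and 5xx are common choices;
# adjust to the responses your endpoint actually returns.
RETRYABLE_STATUSES = {429, 500, 502, 503, 504}


def send_with_retry(
    send: Callable[[], int],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call send() until it returns a 2xx status, backing off exponentially."""
    status = 0
    for attempt in range(1, max_attempts + 1):
        status = send()
        if 200 <= status < 300:
            return status
        if status not in RETRYABLE_STATUSES:
            # Authorization failures or bad payloads will not succeed on retry.
            raise RuntimeError(f"non-retryable status {status}")
        if attempt < max_attempts:
            sleep(base_delay * 2 ** (attempt - 1))  # 0.5s, 1s, 2s, ...
    raise RuntimeError(f"gave up after {max_attempts} attempts (last status {status})")
```

With `urllib.request.urlopen`, error statuses raise `urllib.error.HTTPError`, so a `send` wrapper would catch that exception and return its `code` attribute.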

Build query

After data is streaming to the real-time table, build a query against it and validate that the query returns data that was streamed to the real-time table.

Open the Queries page. Real-time tables are shown in the list of tables in the lower-right side of the Query Editor.

You may build queries that reference real-time tables and other database tables in the same query.

To make a query that references real-time tables available to segments and campaigns, ensure that the query results include an Amperity ID. If the real-time table does not contain an Amperity ID, join its results to a table that does.

For example, a query that returns data from a real-time table named winback_suppressions can be joined to the Customer 360 table:

SELECT
  c360.amperity_id
  ,wb.*
FROM
  winback_suppressions wb
  JOIN Customer360 c360 ON wb.email = c360.email
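To check the shape of this join before running it in Amperity, it can be reproduced locally. The sketch below uses Python's built-in `sqlite3` module as a stand-in for the Amperity query engine, with made-up rows and a hypothetical column list; SQL dialects differ, so treat it only as an illustration of how the join attaches an Amperity ID to each real-time row.

```python
import sqlite3

# In-memory stand-ins for the two tables; columns and rows are made up.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE Customer360 (amperity_id TEXT, email TEXT);
    CREATE TABLE winback_suppressions (email TEXT, order_id TEXT, received_at TEXT);
    INSERT INTO Customer360 VALUES
      ('amp-1', 'justinc@email.com'),
      ('amp-2', 'other@email.com');
    INSERT INTO winback_suppressions VALUES
      ('justinc@email.com', '1234567890--2024-11-24web', '2024-11-24T04:41:00Z');
    """
)

# Same join as the example query.
rows = conn.execute(
    """
    SELECT c360.amperity_id, wb.*
    FROM winback_suppressions wb
    JOIN Customer360 c360 ON wb.email = c360.email
    """
).fetchall()
print(rows)
conn.close()
```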

Make data available to real-time use cases

Data from real-time tables can be made available to any type of workflow in Amperity, depending on the needs of your use cases. Use queries to send results from Amperity to downstream workflows, and use the Profile API to build narrow indexes that support real-time use cases.

Example use cases

Real-time tables support many different types of use cases, including:

Redemption reminders

A retailer offers gifts to customers who belong to its loyalty program. Customers are sent the offer two weeks before their birthday, followed by periodic reminders until one week after their birthday.

Use real-time tables to capture birthday redemptions, and then use the real-time table to exclude customers who have redeemed their birthday gift from being sent additional reminders.
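The suppression logic can be sketched as a simple eligibility check. This is a hypothetical Python illustration: the function names, the Amperity ID values, and the assumption that redemptions are available as a set of IDs read from the real-time table are not part of Amperity. The window runs from 14 days before to 7 days after the birthday, as described above.

```python
from datetime import date
from typing import Set


def birthday_in_year(birthday: date, year: int) -> date:
    """Return the birthday's date in the given year (Feb 29 becomes Feb 28 in non-leap years)."""
    try:
        return birthday.replace(year=year)
    except ValueError:
        return date(year, 2, 28)


def should_send_reminder(customer_id: str, birthday: date, today: date, redeemed: Set[str]) -> bool:
    """True if today is 14 days before to 7 days after the birthday and the gift is unredeemed."""
    if customer_id in redeemed:
        # Redemption already captured in the real-time table: suppress reminders.
        return False
    # Check last, this, and next year so windows that cross New Year are handled.
    for year in (today.year - 1, today.year, today.year + 1):
        days_from_birthday = (today - birthday_in_year(birthday, year)).days
        if -14 <= days_from_birthday <= 7:
            return True
    return False


# Hypothetical IDs read from a real-time table of birthday redemptions.
redeemed_ids = {"amp-2"}
today = date(2025, 3, 1)
print(should_send_reminder("amp-1", date(1990, 3, 10), today, redeemed_ids))  # True
print(should_send_reminder("amp-2", date(1990, 3, 10), today, redeemed_ids))  # False
```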

Suppress recent transactions

A winback campaign is an important part of a retailer’s marketing strategy. Customers who have not purchased within a specific date range are sent a series of offers in an attempt to get those customers back into an active state.

Use real-time tables to capture recent transactions, and then use the real-time table to exclude customers who have purchased recently from the winback campaign.

Add to batch workflow

To include data that is streamed to Amperity in your batch workflow, such as customer profile updates or certain transaction details, use a courier to pull the data from the streaming layer to the batch layer. The steps are similar to those for any other data source in the batch layer:

  1. Add courier

  2. Get sample file

  3. Add feed

  4. Add load operation

  5. Run courier and validate data is loaded to the domain table

  6. Add to courier group