Pull from Amazon Kinesis Data Firehose

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to Amazon S3.

Amperity can pull data from Amazon Kinesis Data Firehose via Amazon S3. A common scenario: a Firehose delivery stream is configured to send streaming data to an Amazon S3 bucket, generating numerous files with a consistent datestamp pattern appended to a generated file name. Kinesis Data Firehose can be a source for any number of file types and formats. Couriers can be configured to pull NDJSON files.

This topic describes the steps that are required to pull streamed data to Amperity from Kinesis Data Firehose:

  1. Get details

  2. Configure Amazon Kinesis Data Firehose

  3. Add courier

  4. Get sample files

  5. Add feeds

  6. Add load operations

  7. Run courier

  8. Add to courier group

Get details

The Kinesis Data Firehose source requires the following configuration details:

Detail one.

The name of the S3 bucket from which data will be pulled to Amperity, which is also the bucket to which the Firehose delivery stream will write data.

Detail two.

For cross-account role assumption you will need the value for the Target Role ARN, which enables Amperity to access the customer-managed Amazon S3 bucket.

Note

The values for the Amperity Role ARN and the External ID fields are provided automatically.

Review the following sample policy, and then add a similar policy to the customer-managed Amazon S3 bucket that allows Amperity access to the bucket. Add this policy as a trust policy to the IAM role that is used to manage access to the customer-managed Amazon S3 bucket.

The policy for the customer-managed Amazon S3 bucket is unique, but will be similar to:

{
  "Statement": [
    {
      "Sid": "AllowAmperityAccess",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::account:role/resource"
       },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
           "sts:ExternalId": "01234567890123456789"
        }
      }
    }
  ]
}

The value for the role ARN is similar to:

arn:aws:iam::123456789012:role/prod/amperity-plugin

An external ID is an alphanumeric string from 2 to 1,224 characters long (without spaces) and may include the following symbols: plus (+), equal (=), comma (,), period (.), at (@), colon (:), forward slash (/), and hyphen (-).
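
As a rough illustration of the trust policy and external ID rules above, the following Python sketch checks a candidate external ID and assembles a policy document with the same shape as the sample. The function names are hypothetical, the character rule is taken from the description in this topic rather than from an AWS API, and the ARN and external ID are the placeholder values already shown above.

```python
import json
import re

# Rule as described in this topic: alphanumeric plus + = , . @ : / -
# with a length from 2 to 1,224 characters. Local sanity check only.
_EXTERNAL_ID = re.compile(r"[A-Za-z0-9+=,.@:/-]{2,1224}")


def is_valid_external_id(value: str) -> bool:
    """Return True when the value satisfies the rule described above."""
    return _EXTERNAL_ID.fullmatch(value) is not None


def build_trust_policy(amperity_role_arn: str, external_id: str) -> dict:
    """Assemble a policy shaped like the sample (hypothetical helper)."""
    if not is_valid_external_id(external_id):
        raise ValueError("external ID does not match the documented rule")
    return {
        "Statement": [
            {
                "Sid": "AllowAmperityAccess",
                "Effect": "Allow",
                "Principal": {"AWS": amperity_role_arn},
                "Action": "sts:AssumeRole",
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ]
    }


# Placeholder values copied from the examples in this topic.
policy = build_trust_policy(
    "arn:aws:iam::123456789012:role/prod/amperity-plugin",
    "01234567890123456789",
)
policy_json = json.dumps(policy, indent=2)
```

The resulting `policy_json` string can be compared against the sample policy before it is attached to the IAM role.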

Detail three.

A list of objects (by filename and file type) in the S3 bucket to be sent to Amperity and a sample for each file to simplify feed creation.

Review how to configure Kinesis Data Firehose.

Configure Kinesis Data Firehose

You may configure any supported data producer to use Kinesis Data Firehose services to automatically send real-time streaming data to Kinesis Data Firehose, and then make that data available to Amperity. Amperity can be configured to pull the real-time data (in batches) from any Amazon S3 location. It is recommended to send this data to a customer-managed Amazon S3 bucket, and then configure Amperity to pull data from that bucket.

To configure Kinesis Data Firehose to send data to the Amperity S3 bucket

  1. Create a cross-account IAM role in the customer’s Amazon AWS account.

    This role is required to grant Kinesis Data Firehose access to the Amazon S3 bucket that is part of the Amperity tenant. This role must have s3:PutObjectAcl configured as part of the list of allowed Kinesis Data Firehose actions.

  2. Configure the bucket policy in the Amperity S3 bucket to allow the IAM role access to the Amperity S3 bucket.

  3. Create a Kinesis Data Firehose delivery stream in the customer’s cloud infrastructure that uses this IAM role.

  4. Configure the delivery stream to send data to the Amperity S3 bucket.

  5. Configure applications to send data to the delivery stream.

  6. Amperity requires data to be encrypted. This may be done with a policy on the Amperity S3 bucket that is configured by Amperity.
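
The permissions described in step 1 can be sketched as an IAM policy document. This is a minimal sketch, not a policy supplied by Amperity: the helper name and the bucket name `example-amperity-bucket` are hypothetical, and the action list is the usual set of Amazon S3 actions for a Firehose delivery role with s3:PutObjectAcl added as this topic requires.

```python
import json

# Usual Amazon S3 actions for a Firehose delivery role, plus s3:PutObjectAcl,
# which this topic requires for delivery to the Amperity S3 bucket.
FIREHOSE_S3_ACTIONS = [
    "s3:AbortMultipartUpload",
    "s3:GetBucketLocation",
    "s3:GetObject",
    "s3:ListBucket",
    "s3:ListBucketMultipartUploads",
    "s3:PutObject",
    "s3:PutObjectAcl",
]


def build_delivery_role_policy(bucket_name: str) -> dict:
    """Build a permissions policy for the delivery role (hypothetical helper)."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": FIREHOSE_S3_ACTIONS,
                # Bucket-level and object-level actions need both ARNs.
                "Resource": [bucket_arn, f"{bucket_arn}/*"],
            }
        ],
    }


# "example-amperity-bucket" is a placeholder, not a real bucket name.
role_policy = build_delivery_role_policy("example-amperity-bucket")
role_policy_json = json.dumps(role_policy, indent=2)
```

Replace the placeholder bucket name with the actual bucket name before using the output.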

Record separators

Kinesis Data Firehose delivers data records to Amazon S3 as Amazon S3 objects. To ensure that individual records are available to Amperity, configure the delivery stream to add a record separator at the end of each data record.

When using the Kinesis Data Firehose connector, which only accepts NDJSON files, you will need to ensure that each data record is followed by a newline character.
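
The following Python sketch shows why the trailing newline matters. It assumes a producer that serializes each event itself before sending it to the delivery stream. The function names and the sample events are hypothetical.

```python
import json


def to_firehose_record(event: dict) -> bytes:
    """Serialize one event as a single JSON line ending with a newline."""
    return (json.dumps(event, separators=(",", ":")) + "\n").encode("utf-8")


def parse_ndjson(blob: bytes) -> list:
    """Parse a delivered object, one JSON document per non-empty line."""
    return [json.loads(line) for line in blob.split(b"\n") if line.strip()]


events = [
    {"id": 1, "email": "a@example.com"},
    {"id": 2, "email": "b@example.com"},
]

# Buffered records are concatenated, so a delivered object is the records
# joined end to end.
with_separator = b"".join(to_firehose_record(e) for e in events)
without_separator = b"".join(json.dumps(e).encode("utf-8") for e in events)

parsed = parse_ndjson(with_separator)

# Without a separator the object is one long line holding several JSON
# documents, which a line-oriented parser rejects.
try:
    parse_ndjson(without_separator)
    parseable_without_separator = True
except json.JSONDecodeError:
    parseable_without_separator = False
```

Here `parsed` contains both events, while the object built without separators cannot be read as NDJSON.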

Filename patterns

Recommended filename patterns include:

  • Using the YYYY/MM/DD/HH format as a prefix when Kinesis Data Firehose writes objects to Amazon S3. This prefix creates a logical hierarchy in the bucket by year, then month, then day, and finally hour.

  • Using the default Kinesis Data Firehose object naming pattern that increments (by an increase of 1) a random string at the end of the object’s filename.
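
To make the YYYY/MM/DD/HH hierarchy concrete, the following Python sketch computes the prefix under which an object would land for a given moment. The helper name is hypothetical, the base path is borrowed from the sample file patterns in this topic, and the sketch assumes the prefix is based on UTC time.

```python
from datetime import datetime, timezone


def hourly_prefix(moment: datetime, base: str = "production/customer-records") -> str:
    """Return the year/month/day/hour prefix for a timezone-aware moment."""
    utc = moment.astimezone(timezone.utc)  # assumption: prefix uses UTC
    return f"{base}/{utc.strftime('%Y/%m/%d/%H')}/"


prefix = hourly_prefix(datetime(2024, 3, 5, 14, 7, tzinfo=timezone.utc))
```

A courier file pattern that selects by day would then match every hour folder under the same year/month/day path.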

Delivery frequency

The Kinesis Data Firehose buffer size and interval will determine the frequency of delivery. Incoming records will be concatenated based on the frequency of the delivery stream.
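
The interaction between buffer size and buffer interval can be sketched with a small simulation. This is a simplified model, not the service's actual behavior: it assumes a delivery happens when either threshold is reached, whichever comes first, and it only checks the interval when a new record arrives. The function name and the sample numbers are hypothetical.

```python
def simulate_deliveries(records, buffer_bytes, interval_seconds):
    """Group (arrival_second, size_bytes) records into simulated deliveries.

    A delivery happens when the buffered bytes reach buffer_bytes or when
    interval_seconds have passed since the first buffered record, whichever
    comes first. Returns a list of (record_count, total_bytes) tuples.
    """
    deliveries = []
    count, total, started = 0, 0, None

    def flush():
        nonlocal count, total, started
        if count:
            deliveries.append((count, total))
        count, total, started = 0, 0, None

    for arrival, size in records:
        # The interval elapsed before this record arrived: deliver the buffer.
        if count and arrival - started >= interval_seconds:
            flush()
        if count == 0:
            started = arrival
        count += 1
        total += size
        # The size threshold was reached: deliver immediately.
        if total >= buffer_bytes:
            flush()
    flush()  # deliver whatever is left at the end of the simulation
    return deliveries


size_driven = simulate_deliveries(
    [(0, 400), (10, 400), (20, 400), (100, 100)], buffer_bytes=1000, interval_seconds=60
)
time_driven = simulate_deliveries(
    [(0, 100), (30, 100), (70, 100)], buffer_bytes=1000, interval_seconds=60
)
```

A busy stream tends to produce objects near the buffer size, while a quiet stream produces smaller objects at roughly the buffer interval.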

Warning

If Kinesis Data Firehose cannot deliver data to Amazon S3, it will retry for up to 24 hours. If delivery does not succeed within 24 hours, the data will be lost unless it was successfully delivered to a backup location. (You can re-send data if it’s backed up.)

Delivery failures

Kinesis Data Firehose will retry for up to 24 hours. The maximum data storage time for Kinesis Data Firehose is 24 hours. Data will be lost if delivery does not succeed within 24 hours. Consider using a secondary Amazon S3 bucket as a backup for data that cannot be lost.

Note

Delivery retries may introduce duplicates.
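
One way to handle duplicates is to remove them by a unique record key before or after the data is loaded. The following Python sketch assumes every record carries a unique identifier. The field name `event_id` and the function name are hypothetical, so substitute whatever key the producer actually writes.

```python
import json


def dedupe_ndjson(lines, key="event_id"):
    """Keep the first record seen for each key value, in arrival order."""
    seen = set()
    unique = []
    for line in lines:
        if not line.strip():
            continue  # skip blank lines between records
        record = json.loads(line)
        record_key = record[key]
        if record_key in seen:
            continue  # a retry delivered this record again
        seen.add(record_key)
        unique.append(record)
    return unique


delivered = [
    '{"event_id": "a1", "amount": 10}',
    '{"event_id": "b2", "amount": 25}',
    '{"event_id": "a1", "amount": 10}',
]
deduplicated = dedupe_ndjson(delivered)
```

Choosing a primary key for the feed that is stable across retries serves the same purpose inside Amperity.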

Add courier

A courier brings data from an external system to Amperity.

Tip

You can run a courier with an empty load operation using {} as the value for the load operation. Use this approach to get files to upload during feed creation, as a feed requires knowing the schema of a file before you can apply semantic tagging and other feed configuration settings.

To add a courier

  1. From the Sources page, click Add Courier. The Add Source page opens.

  2. Find, and then click the icon for Amazon S3. The Add Courier page opens.

    This automatically selects iam-credential as the Credential Type.

  3. From the Credential drop-down, select Create a new credential. This opens the Create New Credential dialog box.

  4. Enter a name for the credential, the IAM access key, and the IAM secret key. Click Save.

  5. Under Settings, add the name of the S3 bucket to which the Firehose delivery stream will write.

  6. Configure the list of files to pull to Amperity. Configure the Entities List for each file to be loaded to Amperity. For example, the files from two different streams: “customer-records” and “transaction-records”.

    [
      {
        "object/type": "file",
        "object/file-pattern": "'/production/customer-records/'yyyy/MM/dd'/*.ndjson'",
        "object/land-as": {
          "file/tag": "customer-records",
          "file/content-type": "application/x-ndjson"
        }
      },
      {
        "object/type": "file",
        "object/file-pattern": "'/production/transaction-records/'yyyy/MM/dd'/*.ndjson'",
        "object/land-as": {
          "file/tag": "transaction-records",
          "file/content-type": "application/x-ndjson"
        }
      }
    ]
    
  7. Under Settings set the load operations to a string that is obviously incorrect, such as df-xxxxxx. (You may also set the load operation to empty: “{}”.)

    Tip

    If you use an obviously incorrect string, the load operation settings will be saved in the courier configuration. After the schema for the feed is defined and the feed is activated, you can edit the courier and replace the feed ID with the correct identifier.

    Caution

    If load operations are not set to “{}” the validation test for the courier configuration settings will fail.

  8. Click Save.
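
To preview which objects a file pattern from step 6 might select for a given day, the following Python sketch expands the pattern and matches it against a list of object keys. It assumes that text inside single quotes is literal and that yyyy, MM, and dd outside the quotes stand for year, month, and day. That reading is inferred from the sample patterns above and is not a description of how Amperity evaluates them. The function names and object keys are hypothetical.

```python
from datetime import date
from fnmatch import fnmatchcase


def render_file_pattern(pattern: str, day: date) -> str:
    """Expand date tokens outside single quotes; keep quoted text as-is."""
    rendered = []
    for index, part in enumerate(pattern.split("'")):
        if index % 2 == 1:
            rendered.append(part)  # quoted segment: literal text
        else:
            rendered.append(
                part.replace("yyyy", f"{day.year:04d}")
                .replace("MM", f"{day.month:02d}")
                .replace("dd", f"{day.day:02d}")
            )
    return "".join(rendered)


def matching_keys(pattern: str, day: date, keys):
    """Return the object keys that match the rendered pattern."""
    rendered = render_file_pattern(pattern, day).lstrip("/")
    # Note: fnmatch lets "*" match "/" as well, unlike some glob dialects.
    return [key for key in keys if fnmatchcase(key.lstrip("/"), rendered)]


pattern = "'/production/customer-records/'yyyy/MM/dd'/*.ndjson'"
keys = [
    "production/customer-records/2024/03/05/stream-1.ndjson",
    "production/customer-records/2024/03/04/stream-1.ndjson",
    "production/transaction-records/2024/03/05/stream-1.ndjson",
]
rendered = render_file_pattern(pattern, date(2024, 3, 5))
selected = matching_keys(pattern, date(2024, 3, 5), keys)
```

A check like this can catch a mismatch between the Firehose prefix and the courier file pattern before the first courier run.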

Get sample files

Every Kinesis Data Firehose file that is pulled to Amperity must be configured as a feed. Before you can configure each feed you need to know the schema of that file. Run the courier without load operations to bring sample files from Kinesis Data Firehose to Amperity, and then use each of those files to configure a feed.

To get sample files

  1. From the Sources tab, open the menu for a courier configured for Kinesis Data Firehose with empty load operations, and then select Run. The Run Courier dialog box opens.

  2. Select Load data from a specific day, and then select today’s date.

  3. Click Run.

    Important

    The courier run will fail, but this process will successfully return a list of files from Kinesis Data Firehose.

    These files will be available for selection as an existing source from the Add Feed dialog box.

  4. Wait for the notification for this courier run to return an error similar to:

    Error running load-operations task
    Cannot find required feeds: "df-xxxxxx"
    

Add feeds

A feed defines how data should be loaded into a domain table, including specifying which columns are required and which columns should be associated with a semantic tag that indicates that column contains customer profile (PII) and transactions data.

Note

A feed must be added for each file that is pulled from Kinesis Data Firehose, including all files that contain customer records and interaction records, along with any other files that will be used to support downstream workflows.

To add a feed

  1. From the Sources tab, click Add Feed. This opens the Add Feed dialog box.

  2. Under Data Source, select Create new source, and then enter “Kinesis Data Firehose”.

  3. Enter the name of the feed in Feed Name. For example: “CustomerRecords”.

    Tip

    The name of the domain table will be “<data-source-name>:<feed-name>”. For example: “Kinesis Data Firehose:CustomerRecords”.

  4. Under Sample File, select Select existing file, and then choose from the list of files. For example: “filename_YYYY-MM-DD.ndjson”.

    Tip

    The list of files that is available from this drop-down menu is sorted from newest to oldest.

  5. Select Load sample file on feed activation.

  6. Click Continue. This opens the Feed Editor page.

  7. Select the primary key.

  8. Apply semantic tags to customer records and interaction records, as appropriate.

  9. Under Last updated field, specify which field best describes when records in the table were last updated.

    Tip

    Choose Generate an “updated” field to have Amperity generate this field. This is the recommended option unless there is a field already in the table that reliably provides this data.

  10. For feeds with customer records (PII data), select Make available to Stitch.

  11. Click Activate. Wait for the feed to finish loading data to the domain table, and then review the sample data for that domain table from the Data Explorer.

Add load operations

After the feeds are activated and domain tables are available, add the load operations to the courier used for Kinesis Data Firehose.

Example load operations

Load operations must specify each file that will be pulled to Amperity from Kinesis Data Firehose.

For example:

{
  "CUSTOMER-RECORDS-FEED-ID": [
    {
      "type": "truncate"
    },
    {
      "type": "load",
      "file": "customer-records"
    }
  ],
  "TRANSACTION-RECORDS-FEED-ID": [
    {
      "type": "load",
      "file": "transaction-records"
    }
  ]
}
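
Before saving, it can help to confirm that every file named in the load operations is also defined in the courier's entities list. The following Python sketch assumes that the `file` value in a load operation refers to the `file/tag` value in the entities list, which is how the samples in this topic line up. The function name is hypothetical and the feed IDs are the placeholders from the example above.

```python
def unknown_load_files(entities, load_operations):
    """Return (feed_id, file) pairs whose file has no matching file/tag."""
    known_tags = {entity["object/land-as"]["file/tag"] for entity in entities}
    missing = []
    for feed_id, operations in load_operations.items():
        for operation in operations:
            if operation.get("type") != "load":
                continue  # operations such as truncate do not name a file
            if operation.get("file") not in known_tags:
                missing.append((feed_id, operation.get("file")))
    return missing


entities = [
    {"object/type": "file", "object/land-as": {"file/tag": "customer-records"}},
    {"object/type": "file", "object/land-as": {"file/tag": "transaction-records"}},
]
load_operations = {
    "CUSTOMER-RECORDS-FEED-ID": [
        {"type": "truncate"},
        {"type": "load", "file": "customer-records"},
    ],
    "TRANSACTION-RECORDS-FEED-ID": [
        {"type": "load", "file": "transaction-records"},
    ],
}
problems = unknown_load_files(entities, load_operations)
```

An empty `problems` list means each load operation points at a file tag the courier is configured to pull.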

To add load operations

  1. From the Sources tab, open the menu for the courier that was configured for Kinesis Data Firehose, and then select Edit. The Edit Courier dialog box opens.

  2. Edit the load operations for each of the feeds that were configured for Kinesis Data Firehose so they have the correct feed ID.

  3. Click Save.

Run courier manually

Run the courier again. This time, because the load operations are present and the feeds are configured, the courier will pull data from Kinesis Data Firehose.

To run the courier manually

  1. From the Sources tab, open the menu for the courier with updated load operations that is configured for Kinesis Data Firehose, and then select Run. The Run Courier dialog box opens.

  2. Select the load option, either for a specific time period or all available data. Actual data will be loaded to a domain table because the feed is configured.

  3. Click Run.

    This time the notification will return a message similar to:

    Completed in 5 minutes 12 seconds
    

Add to courier group

A courier group is a list of one (or more) couriers that are run as a group, either ad hoc or as part of an automated schedule. A courier group can be configured to act as a constraint on downstream workflows.

To add the courier to a courier group

  1. From the Sources tab, click Add Courier Group. This opens the Create Courier Group dialog box.

  2. Enter the name of the courier group. For example: “Kinesis Data Firehose”.

  3. Add a cron string to the Schedule field to define a schedule for the courier group.

    A schedule defines the frequency at which a courier group runs. All couriers in the same courier group run as a unit and all tasks must complete before a downstream process can be started. The schedule is defined using cron.

    Cron syntax specifies the fixed time, date, or interval at which cron will run. Each line represents a job, and is defined like this:

    ┌───────── minute (0 - 59)
    │ ┌─────────── hour (0 - 23)
    │ │ ┌───────────── day of the month (1 - 31)
    │ │ │ ┌────────────── month (1 - 12)
    │ │ │ │ ┌─────────────── day of the week (0 - 6) (Sunday to Saturday)
    │ │ │ │ │
    │ │ │ │ │
    │ │ │ │ │
    * * * * * command to execute
    

    For example, 30 8 * * * represents “run at 8:30 AM every day” and 30 8 * * 0 represents “run at 8:30 AM every Sunday”. Amperity validates your cron syntax and shows you the results. You may also use crontab guru to validate cron syntax.

  4. Set Status to Enabled.

  5. Specify a time zone.

    A courier group schedule is associated with a time zone. The time zone determines the point at which a courier group’s scheduled start time begins. A time zone should be aligned with the time zone of the system from which the data is being pulled.

    Note

    The time zone that is chosen for a courier group schedule should consider every downstream business process that requires the data and also the time zone(s) in which the consumers of that data will operate.

  6. Add at least one courier to the courier group. Select the name of the courier from the Courier drop-down. Click + Add Courier to add more couriers.

  7. Click Add a courier group constraint, and then select a courier group from the drop-down list.

    A wait time is a constraint placed on a courier group that defines an extended time window for data to be made available at the source location.

    A courier group typically runs on an automated schedule that expects customer data to be available at the source location within a defined time window. However, in some cases, the customer data may be delayed and isn’t made available within that time window.

  8. For each courier group constraint, apply any offsets.

    An offset is a constraint placed on a courier group that defines a range of time that is older than the scheduled time, within which a courier group will accept customer data as valid for the current job. Offset times are in Coordinated Universal Time (UTC).

    A courier group offset is typically set to be 24 hours. For example, it’s possible for customer data to be generated with a correct file name and datestamp appended to it, but for that datestamp to represent the previous day because of the customer’s own workflow. An offset ensures that the data at the source location is recognized by the courier as the correct data source.

    Warning

    An offset affects couriers in a courier group whether or not they run on a schedule. Manually run courier groups will not take their schedule into consideration when determining the date range; only the provided input day(s) to load data from are used as inputs.

  9. Click Save.
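
The cron examples in step 3 and the offset in step 8 can be checked with a short Python sketch. It is deliberately minimal: it supports only `*` and single integers in each of the five fields, and it requires every field to match, which differs from full cron when both day fields are restricted. The `offset_window` helper reflects one reading of the offset description above, namely a window that ends at the scheduled time and starts one offset earlier. Both function names are hypothetical.

```python
from datetime import datetime, timedelta


def _field_matches(field: str, value: int) -> bool:
    """Match a cron field that is either '*' or a single integer."""
    return field == "*" or int(field) == value


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True when the moment satisfies a simple five-field expression."""
    minute, hour, day_of_month, month, day_of_week = expression.split()
    # Python counts Monday as 0; cron counts Sunday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return all(
        [
            _field_matches(minute, moment.minute),
            _field_matches(hour, moment.hour),
            _field_matches(day_of_month, moment.day),
            _field_matches(month, moment.month),
            _field_matches(day_of_week, cron_weekday),
        ]
    )


def offset_window(scheduled: datetime, offset_hours: int = 24):
    """Return (earliest, latest) times accepted for a scheduled run."""
    return scheduled - timedelta(hours=offset_hours), scheduled


sunday = datetime(2024, 3, 3, 8, 30)  # a Sunday
monday = datetime(2024, 3, 4, 8, 30)  # the following Monday

daily_on_monday = cron_matches("30 8 * * *", monday)
weekly_on_sunday = cron_matches("30 8 * * 0", sunday)
weekly_on_monday = cron_matches("30 8 * * 0", monday)
window = offset_window(monday, offset_hours=24)
```

With a 24-hour offset, a run scheduled for Monday at 8:30 AM would accept data stamped as early as Sunday at 8:30 AM under this reading.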