AWS Integration - Fetch All Tulip Tables and Write to S3

Streamline fetching data from Tulip into AWS S3 to open up broader analytics and integration opportunities.

Purpose

This guide walks through, step by step, how to fetch all Tulip Tables data via a Lambda function and write it to an S3 bucket.

This goes beyond the basic fetch query and iterates through all tables in a given instance, which makes it well suited to a weekly ETL (Extract, Transform, Load) job.

The Lambda function can be triggered by a variety of resources, such as EventBridge timers or an API Gateway.

An example architecture is shown below:
image.png

Setup

This example integration requires the following:

  • Usage of the Tulip Tables API (get an API key and secret in Account Settings); the snippet below shows how to build the Authorization header value
  • Tulip Table (get the Table Unique ID)
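As a minimal sketch of the first requirement (the key and secret values here are placeholders), the Tulip Tables API uses HTTP Basic authentication, so the Authorization header value stored later as the tulip_api_basic_auth environment variable can be built like this:

import base64

# Placeholder credentials from Account Settings -- replace with your own
api_key = "apikey.2_XXXXXXXX"
api_secret = "XXXXXXXX"

# Basic auth token: base64("key:secret"), prefixed with "Basic "
token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
tulip_api_basic_auth = f"Basic {token}"
print(tulip_api_basic_auth)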

High-level steps:

  1. Create an AWS Lambda function with the relevant trigger (API Gateway, Event Bridge Timer, etc.)
  2. Ensure that the Lambda function's environment variables (bucket_name, tulip_instance, tulip_api_basic_auth) are configured and that its execution role can write to the target S3 bucket (a configuration sketch follows the code below)
  3. Fetch the Tulip table data with the example below
import json
import awswrangler as wr
import boto3
from datetime import datetime
import pandas as pd
import requests
import os

# Get current timestamp for unique file names
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

bucket = os.getenv('bucket_name')

# Function to convert dictionaries to strings
def dict_to_str(cell):
    if isinstance(cell, dict):
        return str(cell)
    return cell

# Fetch every record of a single table, paginating 100 records at a time
def query_table(table_id, base_url, api_header):
    offset = 0
    function = f'/tables/{table_id}/records?limit=100&offset={offset}&includeTotalCount=false&filterAggregator=all'
    r = requests.get(base_url+function, headers=api_header)
    df = pd.DataFrame(r.json())
    length = len(r.json())
    while length > 0:
        offset += 100
        function = f'/tables/{table_id}/records?limit=100&offset={offset}&includeTotalCount=false&filterAggregator=all'
        r = requests.get(base_url+function, headers=api_header)
        length = len(r.json())
        df_append = pd.DataFrame(r.json())
        df = pd.concat([df, df_append], axis=0)
    df = df.apply(lambda row: row.apply(dict_to_str), axis=1)
    return df

# Query a single table and write it to S3 as a timestamped CSV
def write_to_s3(row, base_url, api_header, bucket):
    table = row['label']
    table_id = row['id']
    df = query_table(table_id, base_url, api_header)
    
    path = f's3://{bucket}/{timestamp}/{table}.csv'
    
    # Write DataFrame to S3 as CSV
    wr.s3.to_csv(
        df=df,
        path=path,
        index=False
    )
    print(f"Wrote {table} to {path}")
    return f"Wrote {table} to {path}"
    
    
# Lambda entry point: list all tables in the instance and export each one to S3
def lambda_handler(event, context):
    api_header = {'Authorization' : os.getenv('tulip_api_basic_auth')}
    instance = os.getenv('tulip_instance')
    base_url = f'https://{instance}.tulip.co/api/v3'
    get_tables_function = '/tables'
    r = requests.get(base_url+get_tables_function, headers=api_header)
    table_names = pd.DataFrame(r.json())

    # query table function
    table_names.apply(lambda row: write_to_s3(row, base_url, api_header, bucket), axis=1)

    return { 'statusCode': 200, 'body': json.dumps('wrote to s3!')}
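
The handler above reads three environment variables. As a minimal sketch (the function name and values below are placeholders, not part of the original example), they can be set with boto3:

import boto3

lambda_client = boto3.client('lambda')

# Placeholder function name and values -- replace with your own
lambda_client.update_function_configuration(
    FunctionName='tulip-tables-to-s3',
    Environment={
        'Variables': {
            'bucket_name': 'my-tulip-export-bucket',        # target S3 bucket
            'tulip_instance': 'my-instance',                 # <instance>.tulip.co subdomain
            'tulip_api_basic_auth': 'Basic <base64 token>',  # value built in the Setup section
        }
    }
)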
  4. The trigger can run on a timer or be invoked via a URL; a minimal scheduling sketch with boto3 follows this list
  5. Note the Pandas layer (AWS SDK for pandas) required by the function, shown in the image below
    image.png
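
For the timer option, here is a minimal sketch of wiring an EventBridge schedule to the function with boto3 (the rule and function names are placeholders):

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# Placeholder names -- replace with your own
function_name = 'tulip-tables-to-s3'
rule_name = 'tulip-tables-weekly-export'

# Create (or update) a rule that fires once a week
rule = events.put_rule(
    Name=rule_name,
    ScheduleExpression='rate(7 days)',
    State='ENABLED'
)

# Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId='allow-eventbridge-weekly-export',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)

# Point the rule at the Lambda function
function_arn = lambda_client.get_function(FunctionName=function_name)['Configuration']['FunctionArn']
events.put_targets(
    Rule=rule_name,
    Targets=[{'Id': 'tulip-export-lambda', 'Arn': function_arn}]
)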

Use Cases and Next Steps

Once you have finalized the Lambda integration, you can easily analyze the data with a SageMaker notebook, QuickSight, or a variety of other tools.
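For example, a notebook can read a run's exported CSVs back into DataFrames with awswrangler; a minimal sketch, assuming a placeholder bucket name and run timestamp:

import awswrangler as wr

# Placeholder bucket and run-timestamp prefix -- replace with your own
path = 's3://my-tulip-export-bucket/20240829_120000/'

# Load every table exported in that run into a dict of DataFrames keyed by table label
frames = {
    key.split('/')[-1].removesuffix('.csv'): wr.s3.read_csv(key)
    for key in wr.s3.list_objects(path)
    if key.endswith('.csv')
}
print({name: df.shape for name, df in frames.items()})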

1. Defect prediction

  • Identify production defects before they happen and increase right-first-time rate.
  • Identify core production drivers of quality in order to implement improvements

2. Cost of quality optimization

  • Identify opportunities to optimize product design without impacting customer satisfaction

3. Production energy optimization

  • Identify the production levers that drive optimal energy consumption

4. Delivery and planning prediction and optimization

  • Optimize the production schedule based on customer demand and real-time order schedules

5. Global Machine / Line Benchmarking

  • Benchmark similar machines or equipment with normalization

6. Global / regional digital performance management

  • Consolidate data to create real-time dashboards
