Streamline fetching data from Tulip to AWS S3 for broader analytics and integration opportunities
Purpose
This guide walks through, step by step, how to fetch all Tulip Tables data with a Lambda function and write it to an S3 bucket.
This goes beyond a basic fetch query by iterating through all tables in a given instance, which makes it a good fit for a weekly ETL (Extract, Transform, Load) job.
The Lambda function can be triggered by a variety of resources, such as EventBridge timers or an API Gateway.
An example architecture is shown below:
Setup
This example integration requires the following:
- Usage of the Tulip Tables API (get an API key and secret in Account Settings); see the snippet after this list for turning the key and secret into the Basic auth header value
- A Tulip Table (get the table's unique ID)
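The Lambda code below reads the full Authorization header value from an environment variable, so the API key and secret need to be combined into a standard HTTP Basic credential. A minimal sketch, with placeholder key and secret values:

import base64

# Placeholder credentials from Account Settings -- substitute your own
api_key = 'YOUR_API_KEY'
api_secret = 'YOUR_API_SECRET'

# The function expects the full header value, e.g. 'Basic <base64(key:secret)>',
# in the tulip_api_basic_auth environment variable
token = base64.b64encode(f'{api_key}:{api_secret}'.encode()).decode()
print(f'Basic {token}')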
High-level steps:
- Create an AWS Lambda function with the relevant trigger (API Gateway, EventBridge timer, etc.)
- Ensure that the function's environment variables include the Tulip API credentials (tulip_api_basic_auth), the instance name (tulip_instance), and the destination bucket (bucket_name), and that its execution role can write to that S3 bucket
- Fetch the Tulip table data with the example below
import json
import awswrangler as wr
import boto3
from datetime import datetime
import pandas as pd
import requests
import os

# Get current timestamp for unique file names
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
bucket = os.getenv('bucket_name')

# Function to convert dictionaries to strings
def dict_to_str(cell):
    if isinstance(cell, dict):
        return str(cell)
    return cell

# Page through a table's records 100 at a time and return them as a DataFrame
def query_table(table_id, base_url, api_header):
    offset = 0
    function = f'/tables/{table_id}/records?limit=100&offset={offset}&includeTotalCount=false&filterAggregator=all'
    r = requests.get(base_url+function, headers=api_header)
    df = pd.DataFrame(r.json())
    length = len(r.json())
    while length > 0:
        offset += 100
        function = f'/tables/{table_id}/records?limit=100&offset={offset}&includeTotalCount=false&filterAggregator=all'
        r = requests.get(base_url+function, headers=api_header)
        length = len(r.json())
        df_append = pd.DataFrame(r.json())
        df = pd.concat([df, df_append], axis=0)
    df = df.apply(lambda row: row.apply(dict_to_str), axis=1)
    return df

# Query a single table and write it to S3 as a CSV
def write_to_s3(row, base_url, api_header, bucket):
    table = row['label']
    id = row['id']
    df = query_table(id, base_url, api_header)
    path = f's3://{bucket}/{timestamp}/{table}.csv'
    # Write DataFrame to S3 as CSV
    wr.s3.to_csv(
        df=df,
        path=path,
        index=False
    )
    print(f"Wrote {table} to {path}")
    return f"Wrote {table} to {path}"

def lambda_handler(event, context):
    api_header = {'Authorization' : os.getenv('tulip_api_basic_auth')}
    instance = os.getenv('tulip_instance')
    base_url = f'https://{instance}.tulip.co/api/v3'
    get_tables_function = '/tables'
    r = requests.get(base_url+get_tables_function, headers=api_header)
    table_names = pd.DataFrame(r.json())
    # Write every table in the instance to S3
    table_names.apply(lambda row: write_to_s3(row, base_url, api_header, bucket), axis=1)
    return { 'statusCode': 200, 'body': json.dumps('wrote to s3!')}
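Before wiring up a trigger, you can sanity-check the handler outside of AWS by setting the same environment variables and calling it directly. This is a rough local-test sketch: it assumes the code above is saved as lambda_function.py, that awswrangler, pandas, and requests are installed locally, that your local AWS credentials can write to the bucket, and all values are placeholders:

import os

# Placeholder values -- substitute your own instance, credentials, and bucket
os.environ['tulip_instance'] = 'your-instance'
os.environ['tulip_api_basic_auth'] = 'Basic <base64 of key:secret>'
os.environ['bucket_name'] = 'your-bucket'

# Import after setting the variables -- bucket_name is read at module import time
from lambda_function import lambda_handler

print(lambda_handler({}, None))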
- The trigger can run on a timer or be invoked via a URL (see the scheduling sketch below)
- Note the pandas layer required for the function, shown in the image below
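One way to set up the timer-based trigger is with an EventBridge rule. The sketch below uses boto3 to create a weekly rule and point it at the function; the rule name, statement ID, and function name are placeholders, and the same configuration can also be done through the AWS console:

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# Placeholder names -- substitute your own function and schedule
function_name = 'tulip-tables-to-s3'
rule_name = 'weekly-tulip-etl'

# Create (or update) a rule that fires once a week
rule = events.put_rule(Name=rule_name, ScheduleExpression='rate(7 days)')

# Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId='weekly-tulip-etl-invoke',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn'],
)

# Point the rule at the function
function_arn = lambda_client.get_function(FunctionName=function_name)['Configuration']['FunctionArn']
events.put_targets(Rule=rule_name, Targets=[{'Id': '1', 'Arn': function_arn}])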
Use Cases and Next Steps
Once you have finalized the Lambda integration, you can easily analyze the data with a SageMaker notebook, QuickSight, or a variety of other tools; a short read-back sketch follows the list of use cases below.
1. Defect prediction
- Identify production defects before they happen and increase the right-first-time rate.
- Identify core production drivers of quality in order to implement improvements
2. Cost of quality optimization
- Identify opportunities to optimize product design without impacting customer satisfaction
3. Production energy optimization
- Identify production levers that drive optimal energy consumption
4. Delivery and planning prediction and optimization
- Optimize the production schedule based on customer demand and the real-time order schedule
5. Global Machine / Line Benchmarking
- Benchmark similar machines or equipment with normalization
6. Global / regional digital performance management
- Consolidate data to create real-time dashboards
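As a starting point for any of the use cases above, the exported CSVs can be read straight back out of S3, for example from a SageMaker notebook. A short sketch, assuming the bucket and timestamp prefix used by the Lambda above and a hypothetical table label:

import awswrangler as wr

# Placeholder bucket, run timestamp, and table label from the export above
bucket = 'your-bucket'
run_prefix = '20240101_000000'

# Load one exported table back into a DataFrame for analysis
df = wr.s3.read_csv(f's3://{bucket}/{run_prefix}/Stations.csv')
print(df.head())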