Integração com o AWS - Obter todas as tabelas do Tulip e gravar no S3
  • 12 Sep 2024
  • 2 Minutos para Ler
  • Contribuintes

Integração com o AWS - Obter todas as tabelas do Tulip e gravar no S3


Resumo do artigo

Simplifique a obtenção de dados do Tulip para o AWS S3 para oportunidades mais amplas de análise e integração

Objetivo

Este guia explica passo a passo como buscar todos os dados das Tulip Tables por meio de uma função Lambda e gravar em um bucket S3.

Isso vai além da consulta básica de busca e itera por todas as tabelas em uma determinada instância; isso pode ser ótimo para um trabalho semanal de ETL (Extrair, Transformar, Carregar)

A função lambda pode ser acionada por meio de uma variedade de recursos, como temporizadores do Event Bridge ou um API Gateway

Um exemplo de arquitetura está listado abaixo:image.png

Configuração

Esse exemplo de integração requer o seguinte:

  • Uso da API da Tulip Tables (obtenha a chave e o segredo da API nas configurações da conta)
  • Tabela Tulip (obtenha o ID exclusivo da tabela)

Etapas de alto nível:1. crie uma função AWS Lambda com o acionador relevante (API Gateway, Event Bridge Timer etc.)2. certifique-se de que a função AWS Lambda esteja funcionando corretamente 3. busque os dados da tabela Tulip com o exemplo abaixo``pythonimport jsonimport awswrangler as wrimport boto3from datetime import datetimeimport pandas as pdimport requestsimport os

# Obter o registro de data e hora atual para nomes de arquivos exclusivos

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

bucket = os.getenv('bucket_name')

# Função para converter dicionários em strings

def dict_to_str(cell): if isinstance(cell, dict): return str(cell) return cell

def query_table(table_id, base_url, api_header): offset = 0 function = f'/tables/{table_id}/records?limit=100&offset={offset}&includeTotalCount=false&filterAggregator=all' r = requests.get(base_url+function, headers=api_header) df = pd.DataFrame(r.json()) length = len(r.json()) while length > 0: offset += 100 function = f'/tables/{table_id}/records?limit=100&offset={offset}&includeTotalCount=false&filterAggregator=all' r = requests.get(base_url+function, headers=api_header) length = len(r.json()) df_append = pd.DataFrame(r.json()) df = pd.concat([df, df_append], axis=0) df = df.apply(lambda row: row.apply(dict_to_str), axis=1) return df

# criar função

def write_to_s3(row, base_url, api_header, bucket): table = row['label'] id = row['id'] df = query_table(id, base_url, api_header)

path = f's3://{bucket}/{timestamp}/{table}.csv'# Write DataFrame to S3 as CSVwr.s3.to_csv( df=df, path=path, index=False)print(f "Wrote {table} to {path}")return f "Wrote {table} to {path}"

def lambda_handler(event, context): api_header = {'Authorization' : os.getenv('tulip_api_basic_auth')} instance = os.getenv('tulip_instance') base_url = f'https://{instance}.tulip.co/api/v3' get_tables_function = '/tables' r = requests.get(base_url+get_tables_function, headers=api_header) table_names = pd.DataFrame(r.json())

# tabela de consulta functiontable_names.apply(lambda row: write_to_s3(row, base_url, api_header, bucket), axis=1)return { 'statusCode': 200, 'body': json.dumps('wrote to s3!')}


## Casos de uso e próximas etapas


Depois de finalizar a integração com o lambda, você pode analisar facilmente os dados com um notebook sagemaker, QuickSight ou várias outras ferramentas.


**1. Previsão de defeitos -** Identifique os defeitos de produção antes que eles ocorram e aumente o número de acertos na primeira vez - Identifique os principais fatores de qualidade da produção para implementar melhorias


**2. Otimização do custo da qualidade -** Identificar oportunidades para otimizar o design do produto sem afetar a satisfação do cliente


**3. Otimização da energia de produção -** Identificar as alavancas de produção para otimizar o consumo de energia


**4. Previsão e otimização da entrega e do planejamento -** Otimizar a programação da produção com base na demanda do cliente e na programação de pedidos em tempo real


**5. Benchmarking global de máquinas/linhas -** Benchmarking de máquinas ou equipamentos semelhantes com normalização


**6. Gerenciamento de desempenho digital global/regional -** Dados consolidados para criar painéis de controle em tempo real


Este artigo foi útil?