Técnico

Tabelas do Amazon S3: Exemplo de Configuração Rápida

Compartilhar

Nesta publicação, fornecemos um exemplo de código simples para você começar a explorar os recursos das Tabelas S3 na sua própria conta AWS. Neste exemplo, iremos:

  • Carregar dados em uma tabela Iceberg S3 utilizando o endpoint REST Iceberg das Tabelas Amazon S3, por meio da biblioteca Python PyIceberg.
  • E então consultá-los usando o Amazon Athena, aproveitando a integração do AWS Glue Data Catalog.

Se você quiser saber o básico sobre as tabelas S3, aprofundar-se em seus recursos e casos de uso, leia nossa publicação anterior desta série: Tabelas Amazon S3: O Futuro dos Lakehouses AWS.

Provisionar Recursos de Infraestrutura

O primeiro passo é provisionar e configurar os recursos necessários em nossa conta AWS. Conectaremos à conta localmente, usando a AWS CLI, e gerenciaremos nossos recursos como infraestrutura como código, usando Terraform.

Acesse sua conta AWS localmente

Uma forma recomendada de usar credenciais de curto prazo para se conectar à sua conta AWS a partir do seu computador local é usando a AWS CLI e atualizando os arquivos de configuração e credenciais na pasta .aws dos seus usuários. Usando seu perfil como o padrão, seus arquivos devem ser assim:

~/.aws/config:
[default]
region = <YOUR_AWS_REGION>
output = json
~/.aws/credentials:
[default]
aws_access_key_id=<YOUR_AWS_ACCESS_KEY_ID>
aws_secret_access_key=<YOUR_AWS_SECRET_ACCESS_KEY>
aws_session_token=<YOUR_AWS_SESSION_TOKEN>

Configure seus recursos

Os recursos que criaremos são:

  • Dois Buckets S3 de propósito geral, um para salvar o arquivo CSV de origem e outro para usar como local dos resultados das consultas do Athena.
  • Um bucket de tabela S3, para salvar nossa tabela Iceberg, e um namespace
  • Um workgroup do Athena, para executar as consultas.

Neste exemplo, provisionaremos nossos recursos usando Terraform, mas você pode usar o Console AWS ou a CLI se preferir. Nossos arquivos de configuração são os seguintes:

terraform.tf:
# Terraform Configuration ----------------------- 

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.92"
    }

    random = {
      source  = "hashicorp/random"
      version = "~> 3.0"
    }
  }

  required_version = ">= 1.2"
}

# Cloud Providers Info --------------------------

provider "aws" {
  region = "us-east-1"
}

provider "random" {

}
resources.tf:
data "aws_caller_identity" "current" {
  
}

resource "random_id" "suffix" {
  byte_length = 5
}

# S3 Standard Buckets ----------------------------

# --- Source data files Bucket

resource "aws_s3_bucket" "raw_files_bucket" {
  bucket = "raw-files-bucket-${random_id.suffix.hex}"
}

# --- Athena Query Results Bucket

resource "aws_s3_bucket" "athena_results_bucket" {
  bucket = "athena-results-bucket-${random_id.suffix.hex}"
}

# S3 Tables --------------------------------------

# --- Table Bucket

resource "aws_s3tables_table_bucket" "blog_test_table_bucket" {
  name = "blog-test-table-bucket-${random_id.suffix.hex}"

  # Explicitly define the default encryption to satisfy the provider
  encryption_configuration = {
    sse_algorithm = "AES256"
    kms_key_arn   = null
  }

  maintenance_configuration = {
    iceberg_snapshot_management = {
      status = "enabled"
    }
    iceberg_unreferenced_file_removal = {
      status = "enabled"
      settings = {
        non_current_days  = 7
        unreferenced_days = 7
      }
    }
  }
}

# --- Namespace

resource "aws_s3tables_namespace" "blog_test_namespace" {
  namespace        = "blog_test_namespace"
  table_bucket_arn = aws_s3tables_table_bucket.blog_test_table_bucket.arn
}


# Athena ------------------------------------------

# --- Workgroup

resource "aws_athena_workgroup" "blog_test_athena_workgroup" {
  name = "blog-test-athena-workgroup"

  configuration {
    result_configuration {
      output_location = 
	"s3://${aws_s3_bucket.athena_results_bucket.bucket}/"
    }

    engine_version {
      selected_engine_version = "Athena engine version 3"
    }
  }
}

No Console, você verá seus recursos em:

  • Amazon S3 - Buckets - Buckets de propósito geral
  • Amazon S3 - Buckets - Buckets de tabela
  • Amazon Athena - Administração - Workgroups

Carregar Dados em Tabelas Iceberg

Fonte de Dados

Em seguida, geraremos um arquivo CSV com dados de Pedidos fictícios e o carregaremos no bucket S3 de propósito geral. Para isso, usaremos os seguintes scripts Python:

generate_raw_data.py
import csv
import random
from datetime import datetime, timedelta

statuses = ["pending_payment", "paid", "partially_paid"]
start_date = datetime(2026, 3, 1)

with open("<your_file_path>", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["order_id", "order_datetime", 
	              "order_client_id", "order_status"])

    for i in range(1, 1001):
        # Generate random values
        order_time = start_date + 
			 timedelta(hours=i, minutes=random.randint(0, 59))
        client_id = random.randint(1000, 1100)
        status = random.choice(statuses)

        writer.writerow(
            [i, order_time.strftime("%Y-%m-%d %H:%M:%S"), client_id, status]
        )

print("order_data.csv created with 1000 rows.")
upload_raw_data.py
import boto3
import os
from botocore.exceptions import ClientError

def upload_csv_to_s3(local_path, bucket_name, s3_key):
    """
    Uploads a local CSV file to a general purpose S3 bucket. 
    Parameters:
     - local_path: Path to the file on your computer 
(for example: 'data/orders.csv')
     - bucket_name: The name of your S3 bucket
     - s3_key: The destination path in S3
      (for example:  'raw-data/orders.csv')
    """
    # Initialize the S3 client
    s3_client = boto3.client("s3")

    try:
        print(f"Uploading {local_path} to s3://{bucket_name}/{s3_key}...")
        # Perform the upload
        s3_client.upload_file(local_path, bucket_name, s3_key)
        print("Upload Successful!")
        return True

    except FileNotFoundError:
        print(f"The file {local_path} was not found.")
    except ClientError as e:
        print(f"Failed to upload to S3: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

    return False


if __name__ == "__main__":
    LOCAL_FILE = "<your_file_path>"
    BUCKET = "<your_raw_data_bucket_name>"
    DESTINATION_KEY = "<your_destination_file_path>"

    upload_csv_to_s3(LOCAL_FILE, BUCKET, DESTINATION_KEY)

Para fazer o upload do arquivo para o bucket S3, usamos a boto3 biblioteca, que utiliza as credenciais AWS configuradas nos arquivos aws do usuário.

No Console, você verá seus recursos em:

  • Amazon S3 - Buckets - Buckets de uso geral - <your_raw_data_bucket_name>

Ingerir Dados em Tabelas S3

Para ler os dados CSV do bucket de uso geral, usaremos pandas e pyarrow bibliotecas Python. E, como mencionamos, para carregar os dados em uma tabela Iceberg, usaremos a PyIceberg biblioteca Python, e nos conectaremos diretamente às tabelas S3 usando o endpoint REST Iceberg de Tabelas Amazon S3.

Em nosso script Python, incluímos o código para criar opcionalmente a tabela Iceberg, usando o mesmo endpoint Iceberg.

upload_iceberg_data.py
import boto3
import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, IntegerType, StringType, TimestampType

# --- CONFIGURATION ---
REGION = "<YOUR_AWS_REGION>"
TABLE_BUCKET_NAME = "<your_table_bucket_name>" 
NAMESPACE = "blog_test_namespace"
TABLE_NAME = "orders"
SOURCE_CSV_S3_PATH = 
"s3://<your_raw_data_bucket_name>/<your_destination_file_path>"

ORDERS_ICEBERG_SCHEMA = Schema(
    NestedField(1, "order_id", IntegerType()),
    NestedField(2, "order_datetime", TimestampType()),
    NestedField(3, "order_client_id", IntegerType()),
    NestedField(4, "order_status", StringType()),
)

ORDERS_CSV_SCHEMA = {
    "order_id": "Int32",
    "order_client_id": "Int32",
    "order_status": "string",
}
ORDERS_CSV_TIMESTAMPS = ["order_datetime"]
# --- ------------- ---

def get_s3_tables_catalog():
    """Initializes the Iceberg REST catalog for S3 Tables"""
    print("Getting Catalog ...")

    # Dynamically grab credentials from your local AWS session
    session = boto3.Session()
    creds = session.get_credentials().get_frozen_credentials()

    try:
        catalog = load_catalog(
            "s3tablescatalog",
            **{
                "type": "rest",
                "uri": f"https://s3tables.{REGION}.amazonaws.com/iceberg",
                "warehouse": "<your_table_bucket_arn>",
                "rest.sigv4-enabled": "true",
                "rest.signing-name": "s3tables",
                "rest.signing-region": REGION,
                # Explicitly pass the credentials, including the TOKEN
                "s3.access-key-id": creds.access_key,
                "s3.secret-access-key": creds.secret_key,
                "s3.session-token": creds.token,
            },
        )
    except Exception as e:
        raise Exception(f"Failed to load catalog: {e}")

    print("Catalog retrieved")
    return catalog


def load_data_to_iceberg(create_table=False, operation="append"):

    # 1. Initialize Catalog
    catalog = get_s3_tables_catalog()
    table_identifier = f"{NAMESPACE}.{TABLE_NAME}"
    table = None

    # 2. Obtain or Create table
    if create_table:
        print(f"Creating table {table_identifier}...")
        schema = ORDERS_ICEBERG_SCHEMA
        table = catalog.create_table(table_identifier, schema=schema)
        print(f"Table {table_identifier} created.")
    else:
        try:
            table = catalog.load_table(table_identifier)
            print(f"Table {table_identifier} found.")
        except Exception as e:
            print(f"Exception while obtaining the table: {e}")

    # 3. Read CSV from S3 into a Pandas/Arrow Table
    print(f"Reading source data from {SOURCE_CSV_S3_PATH}...")

    df = pd.read_csv(
        	SOURCE_CSV_S3_PATH, 
		dtype=ORDERS_CSV_SCHEMA, 
		parse_dates=ORDERS_CSV_TIMESTAMPS
    )
    arrow_table = pa.Table.from_pandas(df)

    # 4. Append or Overwrite Data
    if operation == "append":
        print(f"Appending {len(df)} rows to {table_identifier}...")
        table.append(arrow_table)
    elif operation == "overwrite":
        print(f"Overwriting {len(df)} rows to {table_identifier}...")
        table.overwrite(arrow_table)
    print("Done! Data is now live in the Lakehouse.")


if __name__ == "__main__":
    load_data_to_iceberg(create_table=False, operation="overwrite")

No Console, você verá seus recursos em:

  • Amazon S3 - Buckets - Buckets de tabela - <your_table_bucket_name>

Consumir Dados de Tabelas Iceberg

Integração com serviços de análise da AWS

Para ler os dados Iceberg usando o Athena, precisamos Habilitar a Integração com serviços de análise da AWS para os buckets de Tabela na sua conta. Você pode fazer isso pelo Console em:

  • Amazon S3 - Buckets - Buckets de Tabela

Depois disso, você também pode ver a tabela Iceberg no Catálogo de Dados do Glue. Você pode verificar pelo Console em:

  • AWS Glue - Catálogos de Dados - Catálogo

Lá, você verá um Catálogo Federado chamado s3tablescatalog, com Origem = Tabelas S3.

Ler os Dados em Tabelas S3

Após a integração com o Glue, você pode usar o Console para consultar sua tabela Iceberg:

  • Acesse Amazon Athena - Editor de Consultas.
  • No painel Dados, selecione: 
    • Fonte de dados: AwsDataCatalog
    • Catálogo: s3tablescatalog/<your_table_bucket_name> 
    • Banco de dados: "blog_test_namespace" ou <your_s3tables_namespace_name>
  • No menu suspenso do Grupo de Trabalho, no canto superior direito, selecione seu grupo de trabalho, provisionado no início do exemplo.

Agora você pode explorar seus dados usando o Athena!

keep exploring

News, Insights & Impact

View all
View all

Toda jornada de IA começa com uma conversa