.png)
Tabelas do Amazon S3: Exemplo de Configuração Rápida
Nesta publicação, fornecemos um exemplo de código simples para você começar a explorar os recursos das Tabelas S3 na sua própria conta AWS. Neste exemplo, iremos:
- Carregar dados em uma tabela Iceberg S3 utilizando o endpoint REST Iceberg das Tabelas Amazon S3, por meio da biblioteca Python PyIceberg.
- E então consultá-los usando o Amazon Athena, aproveitando a integração do AWS Glue Data Catalog.
Se você quiser saber o básico sobre as tabelas S3, aprofundar-se em seus recursos e casos de uso, leia nossa publicação anterior desta série: Tabelas Amazon S3: O Futuro dos Lakehouses AWS.
Provisionar Recursos de Infraestrutura
O primeiro passo é provisionar e configurar os recursos necessários em nossa conta AWS. Conectaremos à conta localmente, usando a AWS CLI, e gerenciaremos nossos recursos como infraestrutura como código, usando Terraform.
Acesse sua conta AWS localmente
Uma forma recomendada de usar credenciais de curto prazo para se conectar à sua conta AWS a partir do seu computador local é usando a AWS CLI e atualizando os arquivos de configuração e credenciais na pasta .aws dos seus usuários. Usando seu perfil como o padrão, seus arquivos devem ser assim:
~/.aws/config:
[default]
region = <YOUR_AWS_REGION>
output = json~/.aws/credentials:
[default]
aws_access_key_id=<YOUR_AWS_ACCESS_KEY_ID>
aws_secret_access_key=<YOUR_AWS_SECRET_ACCESS_KEY>
aws_session_token=<YOUR_AWS_SESSION_TOKEN>Configure seus recursos
Os recursos que criaremos são:
- Dois Buckets S3 de propósito geral, um para salvar o arquivo CSV de origem e outro para usar como local dos resultados das consultas do Athena.
- Um bucket de tabela S3, para salvar nossa tabela Iceberg, e um namespace.
- Um workgroup do Athena, para executar as consultas.
Neste exemplo, provisionaremos nossos recursos usando Terraform, mas você pode usar o Console AWS ou a CLI se preferir. Nossos arquivos de configuração são os seguintes:
terraform.tf:
# Terraform Configuration -----------------------
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.92"
}
random = {
source = "hashicorp/random"
version = "~> 3.0"
}
}
required_version = ">= 1.2"
}
# Cloud Providers Info --------------------------
provider "aws" {
region = "us-east-1"
}
provider "random" {
}resources.tf:
data "aws_caller_identity" "current" {
}
resource "random_id" "suffix" {
byte_length = 5
}
# S3 Standard Buckets ----------------------------
# --- Source data files Bucket
resource "aws_s3_bucket" "raw_files_bucket" {
bucket = "raw-files-bucket-${random_id.suffix.hex}"
}
# --- Athena Query Results Bucket
resource "aws_s3_bucket" "athena_results_bucket" {
bucket = "athena-results-bucket-${random_id.suffix.hex}"
}
# S3 Tables --------------------------------------
# --- Table Bucket
resource "aws_s3tables_table_bucket" "blog_test_table_bucket" {
name = "blog-test-table-bucket-${random_id.suffix.hex}"
# Explicitly define the default encryption to satisfy the provider
encryption_configuration = {
sse_algorithm = "AES256"
kms_key_arn = null
}
maintenance_configuration = {
iceberg_snapshot_management = {
status = "enabled"
}
iceberg_unreferenced_file_removal = {
status = "enabled"
settings = {
non_current_days = 7
unreferenced_days = 7
}
}
}
}
# --- Namespace
resource "aws_s3tables_namespace" "blog_test_namespace" {
namespace = "blog_test_namespace"
table_bucket_arn = aws_s3tables_table_bucket.blog_test_table_bucket.arn
}
# Athena ------------------------------------------
# --- Workgroup
resource "aws_athena_workgroup" "blog_test_athena_workgroup" {
name = "blog-test-athena-workgroup"
configuration {
result_configuration {
output_location =
"s3://${aws_s3_bucket.athena_results_bucket.bucket}/"
}
engine_version {
selected_engine_version = "Athena engine version 3"
}
}
}
No Console, você verá seus recursos em:
- Amazon S3 - Buckets - Buckets de propósito geral
- Amazon S3 - Buckets - Buckets de tabela
- Amazon Athena - Administração - Workgroups
Carregar Dados em Tabelas Iceberg
Fonte de Dados
Em seguida, geraremos um arquivo CSV com dados de Pedidos fictícios e o carregaremos no bucket S3 de propósito geral. Para isso, usaremos os seguintes scripts Python:
generate_raw_data.py
import csv
import random
from datetime import datetime, timedelta
statuses = ["pending_payment", "paid", "partially_paid"]
start_date = datetime(2026, 3, 1)
with open("<your_file_path>", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["order_id", "order_datetime",
"order_client_id", "order_status"])
for i in range(1, 1001):
# Generate random values
order_time = start_date +
timedelta(hours=i, minutes=random.randint(0, 59))
client_id = random.randint(1000, 1100)
status = random.choice(statuses)
writer.writerow(
[i, order_time.strftime("%Y-%m-%d %H:%M:%S"), client_id, status]
)
print("order_data.csv created with 1000 rows.")upload_raw_data.py
import boto3
import os
from botocore.exceptions import ClientError
def upload_csv_to_s3(local_path, bucket_name, s3_key):
"""
Uploads a local CSV file to a general purpose S3 bucket.
Parameters:
- local_path: Path to the file on your computer
(for example: 'data/orders.csv')
- bucket_name: The name of your S3 bucket
- s3_key: The destination path in S3
(for example: 'raw-data/orders.csv')
"""
# Initialize the S3 client
s3_client = boto3.client("s3")
try:
print(f"Uploading {local_path} to s3://{bucket_name}/{s3_key}...")
# Perform the upload
s3_client.upload_file(local_path, bucket_name, s3_key)
print("Upload Successful!")
return True
except FileNotFoundError:
print(f"The file {local_path} was not found.")
except ClientError as e:
print(f"Failed to upload to S3: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
return False
if __name__ == "__main__":
LOCAL_FILE = "<your_file_path>"
BUCKET = "<your_raw_data_bucket_name>"
DESTINATION_KEY = "<your_destination_file_path>"
upload_csv_to_s3(LOCAL_FILE, BUCKET, DESTINATION_KEY)Para fazer o upload do arquivo para o bucket S3, usamos a boto3 biblioteca, que utiliza as credenciais AWS configuradas nos arquivos aws do usuário.
No Console, você verá seus recursos em:
- Amazon S3 - Buckets - Buckets de uso geral - <your_raw_data_bucket_name>
Ingerir Dados em Tabelas S3
Para ler os dados CSV do bucket de uso geral, usaremos pandas e pyarrow bibliotecas Python. E, como mencionamos, para carregar os dados em uma tabela Iceberg, usaremos a PyIceberg biblioteca Python, e nos conectaremos diretamente às tabelas S3 usando o endpoint REST Iceberg de Tabelas Amazon S3.
Em nosso script Python, incluímos o código para criar opcionalmente a tabela Iceberg, usando o mesmo endpoint Iceberg.
upload_iceberg_data.py
import boto3
import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, IntegerType, StringType, TimestampType
# --- CONFIGURATION ---
REGION = "<YOUR_AWS_REGION>"
TABLE_BUCKET_NAME = "<your_table_bucket_name>"
NAMESPACE = "blog_test_namespace"
TABLE_NAME = "orders"
SOURCE_CSV_S3_PATH =
"s3://<your_raw_data_bucket_name>/<your_destination_file_path>"
ORDERS_ICEBERG_SCHEMA = Schema(
NestedField(1, "order_id", IntegerType()),
NestedField(2, "order_datetime", TimestampType()),
NestedField(3, "order_client_id", IntegerType()),
NestedField(4, "order_status", StringType()),
)
ORDERS_CSV_SCHEMA = {
"order_id": "Int32",
"order_client_id": "Int32",
"order_status": "string",
}
ORDERS_CSV_TIMESTAMPS = ["order_datetime"]
# --- ------------- ---
def get_s3_tables_catalog():
"""Initializes the Iceberg REST catalog for S3 Tables"""
print("Getting Catalog ...")
# Dynamically grab credentials from your local AWS session
session = boto3.Session()
creds = session.get_credentials().get_frozen_credentials()
try:
catalog = load_catalog(
"s3tablescatalog",
**{
"type": "rest",
"uri": f"https://s3tables.{REGION}.amazonaws.com/iceberg",
"warehouse": "<your_table_bucket_arn>",
"rest.sigv4-enabled": "true",
"rest.signing-name": "s3tables",
"rest.signing-region": REGION,
# Explicitly pass the credentials, including the TOKEN
"s3.access-key-id": creds.access_key,
"s3.secret-access-key": creds.secret_key,
"s3.session-token": creds.token,
},
)
except Exception as e:
raise Exception(f"Failed to load catalog: {e}")
print("Catalog retrieved")
return catalog
def load_data_to_iceberg(create_table=False, operation="append"):
# 1. Initialize Catalog
catalog = get_s3_tables_catalog()
table_identifier = f"{NAMESPACE}.{TABLE_NAME}"
table = None
# 2. Obtain or Create table
if create_table:
print(f"Creating table {table_identifier}...")
schema = ORDERS_ICEBERG_SCHEMA
table = catalog.create_table(table_identifier, schema=schema)
print(f"Table {table_identifier} created.")
else:
try:
table = catalog.load_table(table_identifier)
print(f"Table {table_identifier} found.")
except Exception as e:
print(f"Exception while obtaining the table: {e}")
# 3. Read CSV from S3 into a Pandas/Arrow Table
print(f"Reading source data from {SOURCE_CSV_S3_PATH}...")
df = pd.read_csv(
SOURCE_CSV_S3_PATH,
dtype=ORDERS_CSV_SCHEMA,
parse_dates=ORDERS_CSV_TIMESTAMPS
)
arrow_table = pa.Table.from_pandas(df)
# 4. Append or Overwrite Data
if operation == "append":
print(f"Appending {len(df)} rows to {table_identifier}...")
table.append(arrow_table)
elif operation == "overwrite":
print(f"Overwriting {len(df)} rows to {table_identifier}...")
table.overwrite(arrow_table)
print("Done! Data is now live in the Lakehouse.")
if __name__ == "__main__":
load_data_to_iceberg(create_table=False, operation="overwrite")
No Console, você verá seus recursos em:
- Amazon S3 - Buckets - Buckets de tabela - <your_table_bucket_name>
Consumir Dados de Tabelas Iceberg
Integração com serviços de análise da AWS
Para ler os dados Iceberg usando o Athena, precisamos Habilitar a Integração com serviços de análise da AWS para os buckets de Tabela na sua conta. Você pode fazer isso pelo Console em:
- Amazon S3 - Buckets - Buckets de Tabela
Depois disso, você também pode ver a tabela Iceberg no Catálogo de Dados do Glue. Você pode verificar pelo Console em:
- AWS Glue - Catálogos de Dados - Catálogo
Lá, você verá um Catálogo Federado chamado s3tablescatalog, com Origem = Tabelas S3.
Ler os Dados em Tabelas S3
Após a integração com o Glue, você pode usar o Console para consultar sua tabela Iceberg:
- Acesse Amazon Athena - Editor de Consultas.
- No painel Dados, selecione:
- Fonte de dados: AwsDataCatalog
- Catálogo: s3tablescatalog/<your_table_bucket_name>
- Banco de dados: "blog_test_namespace" ou <your_s3tables_namespace_name>
- No menu suspenso do Grupo de Trabalho, no canto superior direito, selecione seu grupo de trabalho, provisionado no início do exemplo.
%2010.11.26%E2%80%AFa.%C2%A0m..png)
Agora você pode explorar seus dados usando o Athena!





.png)