.png)
Tablas de Amazon S3: Ejemplo de configuración rápida
En esta publicación, proporcionamos un ejemplo de código sencillo para que empiece a explorar las capacidades de las tablas S3 en su propia cuenta de AWS. En este ejemplo, haremos lo siguiente:
- Cargar datos en una tabla Iceberg de S3 aprovechando el Amazon S3 Tables Iceberg REST endpoint, a través de la biblioteca Python PyIceberg.
- Y luego consultarlos utilizando Amazon Athena, aprovechando la AWS Glue Data Catalog integration.
Si desea conocer los conceptos básicos de las tablas S3, profundizar en sus características y casos de uso, lea nuestra publicación anterior de esta serie: Amazon S3 Tables: El futuro de los Lakehouses de AWS.
Aprovisionar recursos de infraestructura
El primer paso es aprovisionar y configurar los recursos necesarios en nuestra cuenta de AWS. Nos conectaremos a la cuenta localmente, utilizando la CLI de AWS, y gestionaremos nuestros recursos como infraestructura como código, utilizando Terraform.
Acceder a su cuenta de AWS localmente
Una forma recomendada de usar credenciales temporales para conectarse a su cuenta de AWS desde su equipo local es usar la CLI de AWS y actualizar los archivos de configuración y credenciales en la carpeta .aws de sus usuarios. Usando su perfil como predeterminado, sus archivos deberían verse así:
~/.aws/config:
[default]
region = <YOUR_AWS_REGION>
output = json~/.aws/credentials:
[default]
aws_access_key_id=<YOUR_AWS_ACCESS_KEY_ID>
aws_secret_access_key=<YOUR_AWS_SECRET_ACCESS_KEY>
aws_session_token=<YOUR_AWS_SESSION_TOKEN>Configurar sus recursos
Los recursos que crearemos son:
- Dos Buckets S3 de propósito general, uno para guardar el archivo CSV de origen y otro para usar como ubicación de los resultados de las consultas de Athena.
- Un bucket de tabla S3, para guardar nuestra tabla Iceberg, y un espacio de nombres.
- Un grupo de trabajo de Athena, para ejecutar las consultas.
En este ejemplo, aprovisionaremos nuestros recursos usando Terraform, pero puedes usar la Consola de AWS o la CLI en su lugar si lo prefieres. Nuestros archivos de configuración son los siguientes:
terraform.tf:
# Terraform Configuration -----------------------
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.92"
}
random = {
source = "hashicorp/random"
version = "~> 3.0"
}
}
required_version = ">= 1.2"
}
# Cloud Providers Info --------------------------
provider "aws" {
region = "us-east-1"
}
provider "random" {
}resources.tf:
data "aws_caller_identity" "current" {
}
resource "random_id" "suffix" {
byte_length = 5
}
# S3 Standard Buckets ----------------------------
# --- Source data files Bucket
resource "aws_s3_bucket" "raw_files_bucket" {
bucket = "raw-files-bucket-${random_id.suffix.hex}"
}
# --- Athena Query Results Bucket
resource "aws_s3_bucket" "athena_results_bucket" {
bucket = "athena-results-bucket-${random_id.suffix.hex}"
}
# S3 Tables --------------------------------------
# --- Table Bucket
resource "aws_s3tables_table_bucket" "blog_test_table_bucket" {
name = "blog-test-table-bucket-${random_id.suffix.hex}"
# Explicitly define the default encryption to satisfy the provider
encryption_configuration = {
sse_algorithm = "AES256"
kms_key_arn = null
}
maintenance_configuration = {
iceberg_snapshot_management = {
status = "enabled"
}
iceberg_unreferenced_file_removal = {
status = "enabled"
settings = {
non_current_days = 7
unreferenced_days = 7
}
}
}
}
# --- Namespace
resource "aws_s3tables_namespace" "blog_test_namespace" {
namespace = "blog_test_namespace"
table_bucket_arn = aws_s3tables_table_bucket.blog_test_table_bucket.arn
}
# Athena ------------------------------------------
# --- Workgroup
resource "aws_athena_workgroup" "blog_test_athena_workgroup" {
name = "blog-test-athena-workgroup"
configuration {
result_configuration {
output_location =
"s3://${aws_s3_bucket.athena_results_bucket.bucket}/"
}
engine_version {
selected_engine_version = "Athena engine version 3"
}
}
}
En la Consola, verás tus recursos en:
- Amazon S3 - Buckets - Buckets de propósito general
- Amazon S3 - Buckets - Buckets de tabla
- Amazon Athena - Administración - Grupos de trabajo
Cargar datos en tablas Iceberg
Fuente de datos
A continuación, generaremos un archivo CSV con datos de pedidos de ejemplo y lo subiremos al bucket S3 de propósito general. Para ello, utilizaremos los siguientes scripts de Python:
generate_raw_data.py
import csv
import random
from datetime import datetime, timedelta
statuses = ["pending_payment", "paid", "partially_paid"]
start_date = datetime(2026, 3, 1)
with open("<your_file_path>", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["order_id", "order_datetime",
"order_client_id", "order_status"])
for i in range(1, 1001):
# Generate random values
order_time = start_date +
timedelta(hours=i, minutes=random.randint(0, 59))
client_id = random.randint(1000, 1100)
status = random.choice(statuses)
writer.writerow(
[i, order_time.strftime("%Y-%m-%d %H:%M:%S"), client_id, status]
)
print("order_data.csv created with 1000 rows.")upload_raw_data.py
import boto3
import os
from botocore.exceptions import ClientError
def upload_csv_to_s3(local_path, bucket_name, s3_key):
"""
Uploads a local CSV file to a general purpose S3 bucket.
Parameters:
- local_path: Path to the file on your computer
(for example: 'data/orders.csv')
- bucket_name: The name of your S3 bucket
- s3_key: The destination path in S3
(for example: 'raw-data/orders.csv')
"""
# Initialize the S3 client
s3_client = boto3.client("s3")
try:
print(f"Uploading {local_path} to s3://{bucket_name}/{s3_key}...")
# Perform the upload
s3_client.upload_file(local_path, bucket_name, s3_key)
print("Upload Successful!")
return True
except FileNotFoundError:
print(f"The file {local_path} was not found.")
except ClientError as e:
print(f"Failed to upload to S3: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
return False
if __name__ == "__main__":
LOCAL_FILE = "<your_file_path>"
BUCKET = "<your_raw_data_bucket_name>"
DESTINATION_KEY = "<your_destination_file_path>"
upload_csv_to_s3(LOCAL_FILE, BUCKET, DESTINATION_KEY)Para cargar el archivo en el bucket de S3, utilizamos la boto3 librería, que aprovecha las credenciales de AWS configuradas en los archivos aws del usuario.
En la Consola, verá sus recursos en:
- Amazon S3 - Buckets - Buckets de propósito general - <your_raw_data_bucket_name>
Ingesta de datos en tablas S3
Para leer los datos CSV del bucket de propósito general, utilizaremos pandas y pyarrow librerías de Python. Y, como mencionamos, para cargar los datos en una tabla Iceberg, utilizaremos la PyIceberg librería de Python, y nos conectaremos directamente a las tablas S3 utilizando el endpoint REST de Iceberg de tablas de Amazon S3.
En nuestro script de Python, incluimos el código para crear opcionalmente la tabla Iceberg, utilizando el mismo endpoint de Iceberg.
upload_iceberg_data.py
import boto3
import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, IntegerType, StringType, TimestampType
# --- CONFIGURATION ---
REGION = "<YOUR_AWS_REGION>"
TABLE_BUCKET_NAME = "<your_table_bucket_name>"
NAMESPACE = "blog_test_namespace"
TABLE_NAME = "orders"
SOURCE_CSV_S3_PATH =
"s3://<your_raw_data_bucket_name>/<your_destination_file_path>"
ORDERS_ICEBERG_SCHEMA = Schema(
NestedField(1, "order_id", IntegerType()),
NestedField(2, "order_datetime", TimestampType()),
NestedField(3, "order_client_id", IntegerType()),
NestedField(4, "order_status", StringType()),
)
ORDERS_CSV_SCHEMA = {
"order_id": "Int32",
"order_client_id": "Int32",
"order_status": "string",
}
ORDERS_CSV_TIMESTAMPS = ["order_datetime"]
# --- ------------- ---
def get_s3_tables_catalog():
"""Initializes the Iceberg REST catalog for S3 Tables"""
print("Getting Catalog ...")
# Dynamically grab credentials from your local AWS session
session = boto3.Session()
creds = session.get_credentials().get_frozen_credentials()
try:
catalog = load_catalog(
"s3tablescatalog",
**{
"type": "rest",
"uri": f"https://s3tables.{REGION}.amazonaws.com/iceberg",
"warehouse": "<your_table_bucket_arn>",
"rest.sigv4-enabled": "true",
"rest.signing-name": "s3tables",
"rest.signing-region": REGION,
# Explicitly pass the credentials, including the TOKEN
"s3.access-key-id": creds.access_key,
"s3.secret-access-key": creds.secret_key,
"s3.session-token": creds.token,
},
)
except Exception as e:
raise Exception(f"Failed to load catalog: {e}")
print("Catalog retrieved")
return catalog
def load_data_to_iceberg(create_table=False, operation="append"):
# 1. Initialize Catalog
catalog = get_s3_tables_catalog()
table_identifier = f"{NAMESPACE}.{TABLE_NAME}"
table = None
# 2. Obtain or Create table
if create_table:
print(f"Creating table {table_identifier}...")
schema = ORDERS_ICEBERG_SCHEMA
table = catalog.create_table(table_identifier, schema=schema)
print(f"Table {table_identifier} created.")
else:
try:
table = catalog.load_table(table_identifier)
print(f"Table {table_identifier} found.")
except Exception as e:
print(f"Exception while obtaining the table: {e}")
# 3. Read CSV from S3 into a Pandas/Arrow Table
print(f"Reading source data from {SOURCE_CSV_S3_PATH}...")
df = pd.read_csv(
SOURCE_CSV_S3_PATH,
dtype=ORDERS_CSV_SCHEMA,
parse_dates=ORDERS_CSV_TIMESTAMPS
)
arrow_table = pa.Table.from_pandas(df)
# 4. Append or Overwrite Data
if operation == "append":
print(f"Appending {len(df)} rows to {table_identifier}...")
table.append(arrow_table)
elif operation == "overwrite":
print(f"Overwriting {len(df)} rows to {table_identifier}...")
table.overwrite(arrow_table)
print("Done! Data is now live in the Lakehouse.")
if __name__ == "__main__":
load_data_to_iceberg(create_table=False, operation="overwrite")
En la Consola, verá sus recursos en:
- Amazon S3 - Buckets - Buckets de tablas - <your_table_bucket_name>
Consumir datos de tablas Iceberg
Integración con servicios de análisis de AWS
Para leer los datos de Iceberg usando Athena, necesitamos Habilitar la integración con servicios de análisis de AWS para los buckets de tablas en su cuenta. Puede hacerlo desde la Consola en:
- Amazon S3 - Buckets - Buckets de tablas
Después de esto, también podrá ver la tabla Iceberg en el Catálogo de datos de Glue. Puede verificarlo desde la Consola en:
- AWS Glue - Catálogos de datos - Catálogo
Allí, verá un Catálogo Federado llamado s3tablescatalog, con Origen = Tablas S3.
Leer los datos en Tablas S3
Después de integrar con Glue, puede usar la Consola para consultar su tabla Iceberg:
- Vaya a Amazon Athena - Editor de consultas.
- En el panel Datos, seleccione:
- Origen de datos: AwsDataCatalog
- Catálogo: s3tablescatalog/<your_table_bucket_name>
- Base de datos: "blog_test_namespace" o <your_s3tables_namespace_name>
- En el menú desplegable de Grupo de trabajo, arriba a la derecha, selecciona tu grupo de trabajo, aprovisionado al principio del ejemplo.
%2010.11.26%E2%80%AFa.%C2%A0m..png)
¡Ahora ya puedes explorar tus datos usando Athena!





.png)