Técnico

Tablas de Amazon S3: Ejemplo de configuración rápida

Share

En esta publicación, proporcionamos un ejemplo de código sencillo para que empiece a explorar las capacidades de las tablas S3 en su propia cuenta de AWS. En este ejemplo, haremos lo siguiente:

  • Cargar datos en una tabla Iceberg de S3 aprovechando el Amazon S3 Tables Iceberg REST endpoint, a través de la biblioteca Python PyIceberg.
  • Y luego consultarlos utilizando Amazon Athena, aprovechando la AWS Glue Data Catalog integration.

Si desea conocer los conceptos básicos de las tablas S3, profundizar en sus características y casos de uso, lea nuestra publicación anterior de esta serie: Amazon S3 Tables: El futuro de los Lakehouses de AWS.

Aprovisionar recursos de infraestructura

El primer paso es aprovisionar y configurar los recursos necesarios en nuestra cuenta de AWS. Nos conectaremos a la cuenta localmente, utilizando la CLI de AWS, y gestionaremos nuestros recursos como infraestructura como código, utilizando Terraform.

Acceder a su cuenta de AWS localmente

Una forma recomendada de usar credenciales temporales para conectarse a su cuenta de AWS desde su equipo local es usar la CLI de AWS y actualizar los archivos de configuración y credenciales en la carpeta .aws de sus usuarios. Usando su perfil como predeterminado, sus archivos deberían verse así:

~/.aws/config:
[default]
region = <YOUR_AWS_REGION>
output = json
~/.aws/credentials:
[default]
aws_access_key_id=<YOUR_AWS_ACCESS_KEY_ID>
aws_secret_access_key=<YOUR_AWS_SECRET_ACCESS_KEY>
aws_session_token=<YOUR_AWS_SESSION_TOKEN>

Configurar sus recursos

Los recursos que crearemos son:

  • Dos Buckets S3 de propósito general, uno para guardar el archivo CSV de origen y otro para usar como ubicación de los resultados de las consultas de Athena.
  • Un bucket de tabla S3, para guardar nuestra tabla Iceberg, y un espacio de nombres
  • Un grupo de trabajo de Athena, para ejecutar las consultas.

En este ejemplo, aprovisionaremos nuestros recursos usando Terraform, pero puedes usar la Consola de AWS o la CLI en su lugar si lo prefieres. Nuestros archivos de configuración son los siguientes:

terraform.tf:
# Terraform Configuration ----------------------- 

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.92"
    }

    random = {
      source  = "hashicorp/random"
      version = "~> 3.0"
    }
  }

  required_version = ">= 1.2"
}

# Cloud Providers Info --------------------------

provider "aws" {
  region = "us-east-1"
}

provider "random" {

}
resources.tf:
data "aws_caller_identity" "current" {
  
}

resource "random_id" "suffix" {
  byte_length = 5
}

# S3 Standard Buckets ----------------------------

# --- Source data files Bucket

resource "aws_s3_bucket" "raw_files_bucket" {
  bucket = "raw-files-bucket-${random_id.suffix.hex}"
}

# --- Athena Query Results Bucket

resource "aws_s3_bucket" "athena_results_bucket" {
  bucket = "athena-results-bucket-${random_id.suffix.hex}"
}

# S3 Tables --------------------------------------

# --- Table Bucket

resource "aws_s3tables_table_bucket" "blog_test_table_bucket" {
  name = "blog-test-table-bucket-${random_id.suffix.hex}"

  # Explicitly define the default encryption to satisfy the provider
  encryption_configuration = {
    sse_algorithm = "AES256"
    kms_key_arn   = null
  }

  maintenance_configuration = {
    iceberg_snapshot_management = {
      status = "enabled"
    }
    iceberg_unreferenced_file_removal = {
      status = "enabled"
      settings = {
        non_current_days  = 7
        unreferenced_days = 7
      }
    }
  }
}

# --- Namespace

resource "aws_s3tables_namespace" "blog_test_namespace" {
  namespace        = "blog_test_namespace"
  table_bucket_arn = aws_s3tables_table_bucket.blog_test_table_bucket.arn
}


# Athena ------------------------------------------

# --- Workgroup

resource "aws_athena_workgroup" "blog_test_athena_workgroup" {
  name = "blog-test-athena-workgroup"

  configuration {
    result_configuration {
      output_location = 
	"s3://${aws_s3_bucket.athena_results_bucket.bucket}/"
    }

    engine_version {
      selected_engine_version = "Athena engine version 3"
    }
  }
}

En la Consola, verás tus recursos en:

  • Amazon S3 - Buckets - Buckets de propósito general
  • Amazon S3 - Buckets - Buckets de tabla
  • Amazon Athena - Administración - Grupos de trabajo

Cargar datos en tablas Iceberg

Fuente de datos

A continuación, generaremos un archivo CSV con datos de pedidos de ejemplo y lo subiremos al bucket S3 de propósito general. Para ello, utilizaremos los siguientes scripts de Python:

generate_raw_data.py
import csv
import random
from datetime import datetime, timedelta

statuses = ["pending_payment", "paid", "partially_paid"]
start_date = datetime(2026, 3, 1)

with open("<your_file_path>", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["order_id", "order_datetime", 
	              "order_client_id", "order_status"])

    for i in range(1, 1001):
        # Generate random values
        order_time = start_date + 
			 timedelta(hours=i, minutes=random.randint(0, 59))
        client_id = random.randint(1000, 1100)
        status = random.choice(statuses)

        writer.writerow(
            [i, order_time.strftime("%Y-%m-%d %H:%M:%S"), client_id, status]
        )

print("order_data.csv created with 1000 rows.")
upload_raw_data.py
import boto3
import os
from botocore.exceptions import ClientError

def upload_csv_to_s3(local_path, bucket_name, s3_key):
    """
    Uploads a local CSV file to a general purpose S3 bucket. 
    Parameters:
     - local_path: Path to the file on your computer 
(for example: 'data/orders.csv')
     - bucket_name: The name of your S3 bucket
     - s3_key: The destination path in S3
      (for example:  'raw-data/orders.csv')
    """
    # Initialize the S3 client
    s3_client = boto3.client("s3")

    try:
        print(f"Uploading {local_path} to s3://{bucket_name}/{s3_key}...")
        # Perform the upload
        s3_client.upload_file(local_path, bucket_name, s3_key)
        print("Upload Successful!")
        return True

    except FileNotFoundError:
        print(f"The file {local_path} was not found.")
    except ClientError as e:
        print(f"Failed to upload to S3: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

    return False


if __name__ == "__main__":
    LOCAL_FILE = "<your_file_path>"
    BUCKET = "<your_raw_data_bucket_name>"
    DESTINATION_KEY = "<your_destination_file_path>"

    upload_csv_to_s3(LOCAL_FILE, BUCKET, DESTINATION_KEY)

Para cargar el archivo en el bucket de S3, utilizamos la boto3 librería, que aprovecha las credenciales de AWS configuradas en los archivos aws del usuario.

En la Consola, verá sus recursos en:

  • Amazon S3 - Buckets - Buckets de propósito general - <your_raw_data_bucket_name>

Ingesta de datos en tablas S3

Para leer los datos CSV del bucket de propósito general, utilizaremos pandas y pyarrow librerías de Python. Y, como mencionamos, para cargar los datos en una tabla Iceberg, utilizaremos la PyIceberg librería de Python, y nos conectaremos directamente a las tablas S3 utilizando el endpoint REST de Iceberg de tablas de Amazon S3.

En nuestro script de Python, incluimos el código para crear opcionalmente la tabla Iceberg, utilizando el mismo endpoint de Iceberg.

upload_iceberg_data.py
import boto3
import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, IntegerType, StringType, TimestampType

# --- CONFIGURATION ---
REGION = "<YOUR_AWS_REGION>"
TABLE_BUCKET_NAME = "<your_table_bucket_name>" 
NAMESPACE = "blog_test_namespace"
TABLE_NAME = "orders"
SOURCE_CSV_S3_PATH = 
"s3://<your_raw_data_bucket_name>/<your_destination_file_path>"

ORDERS_ICEBERG_SCHEMA = Schema(
    NestedField(1, "order_id", IntegerType()),
    NestedField(2, "order_datetime", TimestampType()),
    NestedField(3, "order_client_id", IntegerType()),
    NestedField(4, "order_status", StringType()),
)

ORDERS_CSV_SCHEMA = {
    "order_id": "Int32",
    "order_client_id": "Int32",
    "order_status": "string",
}
ORDERS_CSV_TIMESTAMPS = ["order_datetime"]
# --- ------------- ---

def get_s3_tables_catalog():
    """Initializes the Iceberg REST catalog for S3 Tables"""
    print("Getting Catalog ...")

    # Dynamically grab credentials from your local AWS session
    session = boto3.Session()
    creds = session.get_credentials().get_frozen_credentials()

    try:
        catalog = load_catalog(
            "s3tablescatalog",
            **{
                "type": "rest",
                "uri": f"https://s3tables.{REGION}.amazonaws.com/iceberg",
                "warehouse": "<your_table_bucket_arn>",
                "rest.sigv4-enabled": "true",
                "rest.signing-name": "s3tables",
                "rest.signing-region": REGION,
                # Explicitly pass the credentials, including the TOKEN
                "s3.access-key-id": creds.access_key,
                "s3.secret-access-key": creds.secret_key,
                "s3.session-token": creds.token,
            },
        )
    except Exception as e:
        raise Exception(f"Failed to load catalog: {e}")

    print("Catalog retrieved")
    return catalog


def load_data_to_iceberg(create_table=False, operation="append"):

    # 1. Initialize Catalog
    catalog = get_s3_tables_catalog()
    table_identifier = f"{NAMESPACE}.{TABLE_NAME}"
    table = None

    # 2. Obtain or Create table
    if create_table:
        print(f"Creating table {table_identifier}...")
        schema = ORDERS_ICEBERG_SCHEMA
        table = catalog.create_table(table_identifier, schema=schema)
        print(f"Table {table_identifier} created.")
    else:
        try:
            table = catalog.load_table(table_identifier)
            print(f"Table {table_identifier} found.")
        except Exception as e:
            print(f"Exception while obtaining the table: {e}")

    # 3. Read CSV from S3 into a Pandas/Arrow Table
    print(f"Reading source data from {SOURCE_CSV_S3_PATH}...")

    df = pd.read_csv(
        	SOURCE_CSV_S3_PATH, 
		dtype=ORDERS_CSV_SCHEMA, 
		parse_dates=ORDERS_CSV_TIMESTAMPS
    )
    arrow_table = pa.Table.from_pandas(df)

    # 4. Append or Overwrite Data
    if operation == "append":
        print(f"Appending {len(df)} rows to {table_identifier}...")
        table.append(arrow_table)
    elif operation == "overwrite":
        print(f"Overwriting {len(df)} rows to {table_identifier}...")
        table.overwrite(arrow_table)
    print("Done! Data is now live in the Lakehouse.")


if __name__ == "__main__":
    load_data_to_iceberg(create_table=False, operation="overwrite")

En la Consola, verá sus recursos en:

  • Amazon S3 - Buckets - Buckets de tablas - <your_table_bucket_name>

Consumir datos de tablas Iceberg

Integración con servicios de análisis de AWS

Para leer los datos de Iceberg usando Athena, necesitamos Habilitar la integración con servicios de análisis de AWS para los buckets de tablas en su cuenta. Puede hacerlo desde la Consola en:

  • Amazon S3 - Buckets - Buckets de tablas

Después de esto, también podrá ver la tabla Iceberg en el Catálogo de datos de Glue. Puede verificarlo desde la Consola en:

  • AWS Glue - Catálogos de datos - Catálogo

Allí, verá un Catálogo Federado llamado s3tablescatalog, con Origen = Tablas S3.

Leer los datos en Tablas S3

Después de integrar con Glue, puede usar la Consola para consultar su tabla Iceberg:

  • Vaya a Amazon Athena - Editor de consultas.
  • En el panel Datos, seleccione: 
    • Origen de datos: AwsDataCatalog
    • Catálogo: s3tablescatalog/<your_table_bucket_name> 
    • Base de datos: "blog_test_namespace" o <your_s3tables_namespace_name>
  • En el menú desplegable de Grupo de trabajo, arriba a la derecha, selecciona tu grupo de trabajo, aprovisionado al principio del ejemplo.

¡Ahora ya puedes explorar tus datos usando Athena!

keep exploring

News, Insights & Impact

View all
View all

Cada viaje de IA comienza con una conversación