NoSQL Databases 4 Data Science
AWS Lake Formation. De los productos más chonchos de AWS.
⚠️Lamentablemente, este producto no está disponible en AWS Academy⚠️
Qué nos da AWS Lake Formation?
bronze
para raw data, osea, la que se ingiere desde nuestras fuentes transaccionalessilver
para data procesada y limpiadagold
para agregados y sumarizados para presentar directo a herramientas de BIbronze
silver
silver
, agregarlos, formar cubos de info, y guardarlos en la zona/directorio gold
Pasos para crear el Lake:
Como este producto no lo tenemos con nuestro usr de AWS Academy, vamos a tener que crear un nuevo usr con su cuenta de correo.
Este usuario va a ser el root, pero esto no es suficiente. El usr root no puede ser el administrador del data lake. Tenemos que crear otro usuario, al cual llamaremos datalakeadmin
y le asignaremos rol de administrador.**
⚠️Obviamente en producción no podemos ser tan laxos, pero para propósitos didácticos, vamos a ser permisivos.⚠️
Vamos a hacer lo mismo con los siguientes permisos:
Y vamos a agregar 2 in-line policies:
Y pegar los siguientes policies, por separado, sin olvidar de reemplazar <account-id>
por el número que sacamos en el paso 0.4 más abajo.
LakeFormationSLR
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:CreateServiceLinkedRole",
"Resource": "*",
"Condition": {
"StringEquals": {
"iam:AWSServiceName": "lakeformation.amazonaws.com"
}
}
},
{
"Effect": "Allow",
"Action": [
"iam:PutRolePolicy"
],
"Resource": "arn:aws:iam::<account-id>:role/aws-service-role/lakeformation.amazonaws.com/AWSServiceRoleForLakeFormationDataAccess"
}
]
}
UserPassRole
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "PassRolePermissions",
"Effect": "Allow",
"Action": [
"iam:PassRole"
],
"Resource": [
"arn:aws:iam::<account-id>:role/LakeFormationWorkflowRole"
]
}
]
}
Y asignar al usuario que acabamos de crear como administrador
Vamos a hacer logout y vamos a volver a entrar a la consola de AWS con este usuario recién creado.
Para volver a hacer login con este usuario no root y que está asociado a nuestra cuenta, debemos de fijarnos bien en nuestro account id. Lo podemos ver acá:
Una vez que lo tengamos guardado, entonces podemos hacer logout y entrar con la nueva cuenta:
Aquí es donde vamos a poner nuestro account id:
Posiblemente les pida cambiar el password. Cámbienlo.
Lake Formation va a preguntarles que asignen un administrador y si quieren ser uds (osea, su usuario). Obvio acepten los defaults.
Ya estamos adentro de la consola de Lake Formation. Ahora debemos configurar más perimsos para nuestro usuario nuevo:
Ahora vamos a crear un bucket de S3 que servirá como data lake.
S3 (Simple Storage Service) es el servicio de almacenamiento genérico de AWS. Podemos meter lo que sea ahí.
La unidad mínima de S3 es el bucket.
Tenemos que crear 1 bucket con 3 áreas (como directorios):
⚠️OJO! El nombre del bucket debe ser único A LO LARGO DE AWS⚠️
De *TODO AWS.
En cuanto a la región, puede ser donde uds quieran.
bronze
, silver
y gold
**Lo mismo para silver
y gold
.
Dentro de bronze
, crear directorio ingest
.
Dentro de silver
, crear directorio output
.
Con esto terminamos hasta este momento con S3. Vamos ahora a simular una BD transaccional.
Para simular estos movimientos transaccionales, vamos a crear una tabla y disparar un evento cada X tiempo para que se inserte 1 nuevo registro en ella.
Vamos a configurar los siguientes componentes en AWS:
pg_hba.conf
debe ser host all all trust
en lugar de host all all md5
.CREATE TABLE random_data (
id serial4 NOT NULL,
valor text NULL,
fecha timestamp NULL,
CONSTRAINT random_data_pkey PRIMARY KEY (id)
);
Pueden descargar el código fuente de la lambda de mi repo: https://github.com/xuxoramos/lambda-transactionaldb-insert
Al descargar este repo, deben:
/db.ini
y capturar sus credenciales de su instalación de PostgreSQLChequen su tabla, debe haber un registro cada 2 mins:
Antes de regresar a Lake Formation, debemos crear un endpoint de nuestra VPC de AWS.
Cada vez que nosotros creamos recursos en AWS, se crea una Virtual Private Cloud, que es una estructura de networking dentr de la cual cae todo lo que creamos.
Sin embargo, hay recursos de AWS, sobre todo los servicios administrados (i.e. los que son plataformas más que infraestructura, como RDS en lugar de EC2 + PostgreSQL o DocumentDB en lugar de EC2 + MongoDB) que no tienen VPC.
AWS Lake Formation es un servicio adminsitrado, y va a ingerir datos desde un PostgreSQL en una EC2. EC2 está en una VPC, por lo que para tener acceso a ella, debemos de crear un VPC Endpoint para que Lake Formation pueda a través de él llegar a nuestro PostgreSQL en la EC2.
Y listo!
En AWS, como en todas las nubes, todo, absolutamente todo se corre con un usuario o rol asignado.
En AWS hay Service Roles que son un conjunto de permisos con los que se ejeuta un servicio.
LakeFormationWorkflowRole
es el rol con el que se van a corren los procesos de Glue, que es la herramienta de ETLs de AWS y con la cual vamos a “ingerir” datos desde nuestro PostgreSQL en EC2 para copiarlos a nuestro Data Lake.
Necesitamos darle permiso a este rol nuevo de que acceda sin restricciones a S3, y que haga las veces de administrador.
⚠️OJO! Esto obviamente en un setting productivo no es recomendable, pero para propósitos educativos lo vamos a hacer.⚠️
Para hacer esto necesitamos crear un rol:
Finalmente, agregamos el siguiente inline policy:
Y pegar lo siguiente.
⚠️OJO! reemplazar la parte de <account_id>
por el account id que obtuvieron en el paso 0.4
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lakeformation:GetDataAccess",
"lakeformation:GrantPermissions"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": ["iam:PassRole"],
"Resource": [
"arn:aws:iam::<account-id>:role/LakeFormationWorkflowRole"
]
}
]
}
Listo, ahora si regresamos a LakeFormation!
⚠️OJO! Fíjense que siempre estén usando la región donde estamos agregando toda la infra. En este caso es us-east-2
, u “Ohio”.⚠️
Tenemos ahora una BD vacía.
Con qué la vamos a llenar?
Vamos a dejar que Lake Formation vaya a nuestra BD transaccional de juguete en PostgreSQL, la examine, saque sus datos y sus metadatos, y los meta al data lake:
Recordemos que AWS Glue es la herramienta de AWS para hacer ETLs, es decir, los procesos que pasan datos de un lugar a otro.
En la siguiente pantalla vamos a capturar la info de nuestra BD en PostgreSQL.
La VPC y la subnet las obtenemos de la máquina EC2 donde vive nuestro PostgreSQL.
⚠️OJO! El URL de nuestra BD debe contener la IP address interna de la EC2, no la Elastic IP que le asignamos!⚠️
Esto se debe a que la Elastic IP es visible hacia el mundo exterior, PERO el AWS Glue y demás servicios ESTÁN EN EL INTERIOR de la red de AWS! Por lo que Glue NUNCA va a alcanzar esa IP pública, por lo que tiene que utilizar la interna!
Y de dónde sacamos la IP interna? De la consola de EC2.
Después de dar click en el botón de Finish, podemos probar la conexión:
Va la explicación:
database/schema/table
, en nuestro caso transactionaldb/public/random_data
Continúa la explicación:
Finalmente, con estos datos terminamos de definir nuestro workflow
incremental
Ya que está creado el workflow, AWS nos preguntará si queremos arrancarlo. Digámos que si.
Cómo podemos ver si está corriendo o qué caramba está haciendo?
Debemos ir a AWS Glue y examinar el “grafo de ejecución” del ETL que acabamos de hacer:
Esperamos unos minutos a que se termine de ejecutar…en este caso, 10 mins es suficiente…
🥳
El error más común es el definir la frecuencia de ejecución de los workflows de tal forma que queden muy cerca una de la siguiente.
Por ejemplo, si definiéramos esta ejecución cada 30 mins tendríamos lo siguiente:
Dejamos corriendo el workflow y nos encontramos con esto:
Vamos a inspeccionar las ejecuciones del workflow:
Examinemos la última ejecución:
Vemos que la fase de crawling tuvo un error.
La fase de crawling es en donde Lake Formation inspecciona nuestro PostgreSQL, la tabla, los registros, y las estructuras para en automático generar los catálogos de metadatos y preparar la extracción de datos de SQL a archivos Parquet como lo especificamos en el paso 5.2.
Si hacemos click en esa fase del grafo de ejecución, podemos ver el error:
Parece que 2 ejecuciones se comenzaron a “pisar las agujetas”.
Esto sucede cuando el espacio entre ejecuciones del workflow no es suficiente para dejar terminar a uno cuando ya está comenzando otro.
Pero esto significa que la ejecución anterior debió haber terminado, no? Veamos.
Entonces seguramente de todas nuestras ejecuciones, tenemos una que si terminó, y otra que no, y así sucesivamente. Esto se debe a que no dejamos tiempo suficiente entre ejecuciones.
Cómo lo corregimos? Vamos a tener que eliminar completamente el blueprint y crear otro con la frecuencia adecuada. Esto es lo más certero que intentar modificar el parámetro en Glue.
Esto se logra repitiendo desde el paso 5.2.
Ya que el blueprint tuvo una ejecución exitosa, vamos a ver el resultado en la zona bronze
de nuestro data lake en S3:
Vemos aquí 2 tablas creadas por Lake Formation:
postgresql
, que no es mas que la descripción de nuestra tabla dentro de Lake Formation, como lo podemos ver cuando le damos click:s3://lakeformation-nosql4ds/bronze/ingest/incremental_transactionaldb_public_random_data/
, que es la materialización de la tabla de PostgreSQL en archivos Parquet.Demos click en la liga Location para ir a S3 :
Como estamos ejecutando un workflow de un blueprint incremental, esto significa que la 1a ejecución del workflow nos traería toda la BD hasta ese punto, y ejecuciones subsecuentes nos traerían al datalake solamente los incrementos o deltas, es decir, los registros creados o presentes desde la última ejecución hasta la siguiente.
Los archivos parquet son columnares binarios, por lo que no serviría de mucho descargar uno y explorar su interior. Más bien debemos de explorar esta data con otra herramienta, pero antes, vamos a arreglar la frecuencia de ejecución de nuestro workflow.
Vamos ahora a examinar la data en nuestro data lake:
Athena es un servicio de AWS que nos permite correrle queries tipo SQL a archivos que estén guardados en S3.
Vamos a ir al home de este servicio:
Vemos que el costo es de $5 USD por terabyte. Este es uno de los servicios más caros de AWS, y con justa razón, imagínense aventarle lo que sea al S3 y poderle tirar queries con SQL normalito! Esta funcionalidad es poderosa.
Este es el home de Athena. Como podemos ver, han pasado 3 cosas interesantes:
AwsDataCatalog
. Este es el catálogo creado automágicamente por Lake Formation cuando ejecutamos el workflow, particularmente en la fase de crawling.AwsDataCatalog
creado por Lake Formation, pues Athena nos la asigna por default. Esta BD transactionaldb-ingest
fue creada por nosotros en el paso 4.2 arribasilver
de nuestro data lake.bronze
silver
.Una vez que tenemos seleccionada la zona silver
, vamos a imaginar la siguiente pregunta.
Qué tan diferente es el promedio de la distancia de Levenshtein de los registros cuyo
value
comienza con ‘D’ contra los registros cuyovalue
comienza con ‘Z’?
Como se los enseñé el semestre pasado, vamos a partir el problema en cachos:
valor
comience con ‘D’lag
para poder comparar 2 campos value
en el mismo registrolevenshtein
a ambos campos value
y ponerlo en una columna leven_dist
silver
avg
a columna leven_dist
.valor
comience con ‘Z’⚠️OJO! Los pasos 5 y 6, estrictamente hablando, deben de realizarse con otra base de datos dentro del lake, otro blueprint, otro crawler y otro workflow enteramente distinto (tecnicamente desde el paso 4.2), porque estamos consumiendo datos procesados de silver
y estamos agregando, y los agregados, ortodoxamente, van en gold
, pero no lo vamos a hacer aquí porque si no se haría laaaaaargo el tutorial.⚠️
El query que nos va a ayudar a resolver todo el merequetengue de arriba es el siguiente. Para más info, ver la documentación de Athena respecto a funciones y mis apuntes del semestre pasado 😠 tanto de window functions como de common table expressions:
with lagged_values_z as
(
SELECT id, valor,
lag(valor,1) OVER (ORDER BY id) AS prev_valor
FROM "catalog__transactionaldb_public_random_data"
where valor like 'Z%'
ORDER BY id
),
lagged_values_d as
(
SELECT id, valor,
lag(valor,1) OVER (ORDER BY id) AS prev_valor
FROM "catalog__transactionaldb_public_random_data"
where valor like 'D%'
ORDER BY id
),
leven_dist_d as
(
select id, valor, prev_valor, 'd' as start_with, levenshtein_distance(valor, prev_valor) as leven_dist_valor
from lagged_values_d
),
leven_dist_z as
(
select id, valor, prev_valor, 'z' as start_with, levenshtein_distance(valor, prev_valor) as leven_dist_valor
from lagged_values_z
),
all_levens as
(
select * from leven_dist_d
union
select * from leven_dist_z
)
select start_with, avg(leven_dist_valor) as avg_leven_dist
from all_levens
group by start_with;
Vamos a ejecutarlo:
Luego vamos a guardarlo…
…como vista
Y vamos a configurar el guardado con las siguientes opciones:
⚠️OJO! el folder output
de la zona silver
en nuestro bucket de S3 no existe y debemos crearlo ANTES de crear la tabla!⚠️
Athena nos va a mostrar un preview de como va a crear la tabla desde nuestro query, y solo damos click en Create table
.
Y listo!
Lo que acabamos de hacer es súuuuuuper poderoso. Recordemos que la BD se compone de archivos parquet. Los archivos parquet tienen una estructura columnar, PERO NO SABEN NADA DEL TIPO DE DATO QUE GUARDAN.
Cómo es posible, entonces, que con select
podamos hacer operaciones numéricas, aritméticas, o de strings sobre estos datos si no tienen un esquema definido?
La respuesta está en una funcionalidad poderosa de los componentes de lectura SQL de los data lakes llamado schema-on-read, esto es, no necesitamos definir la estructura de los datos que vamos a consumir sino hasta que los consumimos, a diferencia de una BD SQL relacional, donde el esquema, es decir, la estructura, está definida desde que estamod diseñando la BD en una diagrama Entidad-Relación.
Ahora si, vamos a visualizar ahora esta tabla con AWS Quicksight!
Primero accedamos a Quicksight.
Y pidamos acceso al servicio.
Muy vivillos, los de AWS nos quieren enjaretar una suscripción Enterprise, y no conformes con eso, están subrepresentando el botón para las suscripciones estándar usando un viejo truco de UX. No vamos a caer en su trampa y vamos a darle click en Standard arriba a la derecha:
Vamos ahora a configurar Quicksight con las siguientes opciones.
Más abajo debemos configurar a qué tendrá acceso Quicksight. Lo más importante es que tenga acceso al bucket de S3 donde tenemos nuestro Data Lake.
Una vez que nuestra cuenta de Qucksight esté lista, y nos brinquemos el tutorial, vamos a tener unas visualizaciones y datasets pre-hechos como ejemplo.
Vamos a dar click en Datasets, luego en New Dataset.
Vemos que tenemos muchísimas opciones para conectarnos. Vamos a dar click en Athena:
Y nos va a pedir que nombremos el data source. De dónde sale ese [primary]?
De acá:
Vamos ahora a seleccionar la tabla que acabamos de crear con los resultados del query de la sección 6:
Vamos a setear estas opciones. Es importante mencionar que nos conviene dejar la conf de SPICE porque nos va a ayudar a refrescar la visualización que vamos a crear en caso de que la data cambie.
Y listo. Quicksight nos va a seleccionar la mejor visualización para nuestra gráfica:
😠 TANTO PARA UNA GRÁFICA DE 2 BARRAS?!?!?! 😠
Tengan en cuenta que esto es un ejemplo de juguete. En un setting empresarial van a tener cientos de tablas, decenas de gráficas, y veintenas de queries y analíticos, lo que justifica el uso del data lake. Lo más importante es que una vez que terminamos todo este flujo, ya se queda forever, y entonces habremos construido un pipeline que va desde datos crudos hasta datos refinados.
Los data lakes son caros. En AWS lo más caro es el servicio de Glue (ETL), sobre todo porque cataloga, importa y organiza la info automática y periódicamente. Esto reemplaza a un grupo pequeño de ingenieros de datos programando queries y ETL jobs sin ningún problema.
Solo por esta demo, AWS me mandó esta factura:
En qué gastamos $270 USD?
Como podemos ver, menos de $10 USD por la maquinita donde tenemos nuestro PostgreSQL.
⚠️Y $260 USD por los jobs de Glue para extraer de nuestra BD transaccional, catalogar, convertir a Parquet, grabar en S3, y formar una BD con esos archivos!⚠️