¿Cómo soportar Temporal como motor de workflow para nuestros procesos?
Publicado el 06-10-2022 por KuFlow Team
Credits: KuFlow
En un mundo tan diverso, sería aburrido tener una única forma de hacer las cosas. Por eso en KuFlow soportamos diferentes formas de implementar la lógica de nuestros procesos y tareas. Y en este post hablaremos sobre una de ellas, la orquestación mediante Temporal, que nos otorga una forma poderosa de manejar nuestros flujos de trabajo.
No vamos a entrar en detalle en los conceptos que se manejan en nuestra plataforma, para ello puede consultar la documentación u otras entradas del blog, pero a modo de idea general podemos quedarnos con lo siguientes: KuFlow es una plataforma que permite modelar procesos de negocio que involucran una serie de tareas para llevar a cabo su consecución. Estas tareas pueden ser realizadas por humanos o por sistemas y básicamente son actividades de ejecución de acciones y/o recolección de información. Además necesitamos establecer una forma de cómo se controla la ejecución de estas tareas, su orden, la naturaleza asíncrona o no de las mismas, o la capacidad de recuperación ante errores. En otras palabras, necesitamos un workflow que orqueste nuestro proceso. En KuFlow llamamos WorkFlow Engine a los distintos tipos de implementaciones que soportamos. En el caso que nos ocupa, Temporal como Workflow Engine.
¿Por qué utilizar Temporal para tus procesos en KuFlow?
El uso de Temporal nos otorga varios beneficios, el principal la capacidad de implementar de una forma cómoda y confiable la resiliencia de nuestras operaciones. Por ejemplo, imaginemos que una tarea de uno de nuestros procesos, realiza una llamada a un API de terceros. Si este API no está disponible en el momento de su llamada, la tarea falla y nuestro proceso podría quedar bloqueado. En este caso nos gustaría que el workflow de nuestro proceso se volviera a intentar posteriormente y tener que olvidarnos de todo el andamiaje necesario para ello. Además si ya hemos emitido peticiones de modificación a otras APIs externas, nos gustaría que al recuperar el proceso, el consumo de estas APIs fuera determinista y que no fallara porque las hemos consumido dos veces. Todo esto lo conseguimos implementando nuestros workflows con Temporal.
¿Cómo se integra Temporal en KuFlow?
Con el fin de simplificar lo más posible la infraestructura de nuestros clientes, en KuFlow disponemos de nuestra propia nube de Temporal evitando la necesidad de desplegar esta infraestructura por parte de los clientes. El disponer de una nube de Temporal no es nuestro negocio principal y puede que en un futuro ofrezcamos también la posibilidad de usar otras nubes como Temporal Cloud o cualquier otra.
Para implementar un Workflow en un proceso utilizando Temporal, necesitamos desarrollar lo que en KuFlow denominamos Worker. Un worker no es más que una aplicación que se ejecuta en los servidores de nuestros clientes y que interactúa con el API de KuFlow y el API de Temporal disponible en nuestra nube. Este enfoque permite una arquitectura de nube híbrida que proporciona flexibilidad a la hora de acceder a los distintos servicios que se deseen consumir en el Worker. Así por ejemplo, un Worker desplegado en los servidores del cliente tendría acceso a servicios privados del cliente de forma segura y confiable para poder extraer datos y efectuar decisiones en base a sus necesidades.
Para facilitar el desarrollo de este Worker, se ofrece una librería que facilita el consumo del API de KuFlow a través de Actividades de Temporal. Junto con las distintos SDK que provee Temporal, la construcción de un Worker es en la mayoría de los casos, un juego de niños. Los distintos lenguajes para los que se ofrecen estas librerías se actualizan frecuentemente.
Seguridad
Uno de los aspectos más importantes para nosotros es todo lo referente a la seguridad. Y por ello, aparte de un potente sistema RBAC que se despliega sobre los recursos del cliente, utilizamos una serie de mecanismos de autenticación para acceder a las distintas APIs que ofrecemos. Para facilitar la gestión de acceso y credenciales a estas APIs, en KuFlow se define el concepto ‘Aplicación’. Por definición, una ‘Aplicación’ se compone de credenciales y certificados de acceso. A su vez, estas aplicaciones se utilizan como candidatos o “principals” para otorgar roles y permisos a los recursos que desee.
¿Cómo se consigue la autenticación y autorización con el API de Temporal?
De forma esquemática, para autenticarnos utilizaremos Mutual TLS y para conseguir autorización utilizaremos un token portador JWT. La obtención de los certificados y las credenciales se realiza desde la App de KuFlow de forma sencilla, sin necesidad de crear y subir los certificados de forma manual. Armados con estos datos ya podemos conectar nuestro worker a la nube de KuFlow. El mecanismo de autenticación sigue el siguiente flujo: Por un lado se necesita obtener un token portador JWT que nos permitirá autorización en Temporal, para ello se emite una petición al API Rest de KuFlow (autenticada por BasicAuth) que nos devuelve el token. Con este token se realizan las comunicaciones hacia el API GRPC de Temporal además de utilizar los certificados para encriptar la comunicación y lograr la autenticación con este API.
Para más información consulte la documentación.
Para lidiar con problemas como la renovación del token portador y que la experiencia del desarrollador sea lo más cómoda posible, este mecanismo está implementado en las librerías que proveemos como soporte para los Worker. Un ejemplo de la renegociación del token puede verse en este código extraído de la librería kuflow-engine-client en Java.
public class KuFlowAuthorizationTokenSupplier implements AuthorizationTokenSupplier {
private static final Logger LOGGER = LoggerFactory.getLogger(KuFlowAuthorizationTokenSupplier.class);
private static final double EXPIRE_PERCENTAGE = 0.1;
private static final Duration EXPIRE_MAX_DURATION = Duration.ofMinutes(10);
private final AuthenticationApi authenticationApi;
private volatile String token;
private volatile Instant tokenExpireAt;
public KuFlowAuthorizationTokenSupplier(AuthenticationApi authenticationApi) {
this.authenticationApi = authenticationApi;
}
@Override
public String supply() {
String token = this.requestToken();
return "Bearer " + token;
}
private String requestToken() {
String token = this.token;
Instant tokenExpireAt = this.tokenExpireAt;
if (isTokenNonExpired(token, tokenExpireAt)) {
return token;
}
synchronized (this) {
token = this.token;
tokenExpireAt = this.tokenExpireAt;
if (isTokenNonExpired(token, tokenExpireAt)) {
return token;
}
AuthenticationResource authentication = new AuthenticationResource();
authentication.setType(AuthenticationTypeResource.ENGINE);
authentication = this.authenticationApi.createAuthentication(authentication);
Duration expireDuration = Duration.between(Instant.now(), authentication.getExpiredAt());
expireDuration = Duration.ofSeconds((long) (expireDuration.getSeconds() * EXPIRE_PERCENTAGE));
if (expireDuration.compareTo(EXPIRE_MAX_DURATION) > 0) {
expireDuration = EXPIRE_MAX_DURATION;
}
this.token = token = authentication.getToken();
this.tokenExpireAt = tokenExpireAt = Instant.now().plus(expireDuration);
LOGGER.debug("Regenerated JWT Temporal authorization token. Expired at: {}", tokenExpireAt);
return token;
}
}
private static boolean isTokenNonExpired(String token, Instant tokenExpireAt) {
return token != null && tokenExpireAt != null && Instant.now().isBefore(tokenExpireAt);
}
}
Lo que hemos hecho aquí es implementar la interfaz AuthorizationTokenSupplier que provee el SDK de Temporal para que utilizando nuestra librería cliente del API de KuFlow negocie un token cuando sea necesario. De igual forma implementamos la interfaz GrpcMetadataProvider con el propósito de añadir como metadato el token que nos provee la implementación anterior:
public class AuthorizationGrpcMetadataProvider implements GrpcMetadataProvider {
public static final Metadata.Key<String> AUTHORIZATION_HEADER_KEY =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
private final AuthorizationTokenSupplier authorizationTokenSupplier;
public AuthorizationGrpcMetadataProvider(AuthorizationTokenSupplier authorizationTokenSupplier) {
this.authorizationTokenSupplier = authorizationTokenSupplier;
}
@Override
public Metadata getMetadata() {
Metadata metadata = new Metadata();
metadata.put(AUTHORIZATION_HEADER_KEY, authorizationTokenSupplier.supply());
return metadata;
}
}
Esta clase proveedora de metadatos para la conexión GRPC hacia Temporal, será la que utilizaremos posteriormente cuando configuremos nuestro WorkflowServiceStub en el Worker.
WorkflowServiceStubsOptions.newBuilder().addGrpcMetadataProvider(
new AuthorizationGrpcMetadataProvider(new KuFlowAuthorizationTokenSupplier(this.authenticationApi))
);
Cómo último paso para conectarnos satisfactoriamente con Temporal necesitamos crear un contexto SSL que nos servirá en la negociación con MutualTLS. Para ello debemos extender la clase SslContext del SDK. La implementación depende de sus necesidades pero puede encontrarse un ejemplo de ello en el siguiente fragmento de código disponible en nuestros repositorios de ejemplo en GitHub.
private SslContext createSslContext() {
MutualTlsProperties mutualTls = this.applicationProperties.getTemporal().getMutualTls();
if (StringUtils.isBlank(mutualTls.getCert()) && StringUtils.isBlank(mutualTls.getCertData())) {
return null;
}
if (
StringUtils.isNotBlank(mutualTls.getCert()) &&
(StringUtils.isBlank(mutualTls.getKey()) || StringUtils.isBlank(mutualTls.getCa()))
) {
throw new KuFlowEngineClientException("key and ca are required");
}
if (
StringUtils.isNotBlank(mutualTls.getCertData()) &&
(StringUtils.isBlank(mutualTls.getKeyData()) || StringUtils.isBlank(mutualTls.getCaData()))
) {
throw new KuFlowEngineClientException("keyData or caData are required");
}
try (
InputStream certInputStream = this.openInputStream(mutualTls.getCert(), mutualTls.getCertData());
InputStream keyInputStream = this.openInputStream(mutualTls.getKey(), mutualTls.getKeyData());
InputStream caInputStream = this.openInputStream(mutualTls.getCa(), mutualTls.getCaData())
) {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(null, null);
X509Certificate certificate = (X509Certificate) CertificateFactory.getInstance("X509").generateCertificate(caInputStream);
trustStore.setCertificateEntry("temporal-ca", certificate);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
TrustManager trustManager = trustManagerFactory.getTrustManagers()[0];
return SimpleSslContextBuilder.forPKCS8(certInputStream, keyInputStream).setTrustManager(trustManager).build();
} catch (KeyStoreException | NoSuchAlgorithmException | CertificateException | IOException e) {
throw new KuFlowEngineClientException("Unable to configure mTLS", e);
}
}
Finalmente, al igual que el proveedor de metadatos GRPC, añadimos este contexto SSL a las opciones de nuestro WorkflowServiceStubs.
public WorkflowServiceStubs workflowServiceStubs() {
Builder builder = WorkflowServiceStubsOptions.newBuilder();
builder.setTarget(this.applicationProperties.getTemporal().getTarget());
builder.setSslContext(this.createSslContext());
builder.addGrpcMetadataProvider(
new AuthorizationGrpcMetadataProvider(new KuFlowAuthorizationTokenSupplier(this.authenticationApi))
);
WorkflowServiceStubsOptions options = builder.validateAndBuildWithDefaults();
return WorkflowServiceStubs.newServiceStubs(options);
}
En este punto es útil recordar que cuando se configura un proceso respaldado por Temporal en la APP de KuFlow, es posible descargar una plantilla de Worker lista para usar que utiliza estos mecanismos de configuración de la conexión, por lo que se dispone de un ejemplo funcional a poco click de alcance. De igual forma, en nuestro repositorio GitHub y en nuestra documentación puede encontrar mucha más información y ejemplos de integración.
Notas sobre la nube Temporal desplegada en KuFlow
Tal y como se comentó anteriormente en el artículo, el propósito de ofrecer un despliegue de Temporal es el de facilitar la integración a nuestros clientes. En este punto es importante reseñar algunas cuestiones: Nuestro despliegue Temporal es una versión ligeramente modificada de la disponible en los repositorios oficiales de Temporal. Básicamente se han añadido modificaciones de seguridad y autorización. A consecuencia de esto, algunos métodos del API no están disponibles. Por ejemplo, no podrá utilizar nuestro despliegue de Temporal para ejecutar Workflows que no sean lanzados desde los procesos de KuFlow, lo cuál es necesario para evitar abusos de uso. O de igual forma, tampoco es posible cancelar Workflows si no es desde el App de KuFlow. Otras APIs pueden no estar disponibles a diferencia de una instalación normal de Temporal. Sin embargo, todas las APIs que garantizan un funcionamiento correcto de sus workers están habilitadas. Si necesita alguna API de las actualmente deshabilitadas, puede solicitarla poniéndose en contacto con nosotros.
Algunas otras modificaciones realizadas le permiten por ejemplo saber desde la App si existe algún worker actualmente online para su proceso.
De igual forma, intentamos mantener nuestros despliegues Temporal lo más actualizados posible, si tiene dudas al respecto póngase en contacto con nosotros.
Conclusión
En este artículo hemos explorado de forma superficial la utilización de Temporal como motor de workflows para nuestros procesos de KuFlow. Esta solución nos aporta una serie de ventajas respecto a los otros motores de workflow que soportamos, como por ejemplo el acceso a servicios privados, permitiéndole una mayor flexibilidad a la hora de implementar Worker. De la misma manera la capacidad de resiliencia y escalabilidad que nos ofrece Temporal hace que sea la opción más recomendable cuando los procesos son de gran complejidad y necesitan orquestar distintas APIs externas.La siguiente tabla muestra un resumen de las principales características de cada uno de los Workflows Engine soportados: