How KuFlow supports Temporal as a workflows engine for our processes?
Published 06-10-2022 by KuFlow Team
Credits: KuFlow
In such a diverse world, it would be boring to have a single way of doing things. That's why at KuFlow we support different ways to implement the logic of our processes and tasks. And in this post, we will talk about one of them, the orchestration through Temporal, which gives us a powerful way to manage our workflows.
We will not go into detail on the concepts that are handled in our platform, for that you can consult the documentation or other blog entries, but as a general idea, we can stay with the following. KuFlow is a platform that allows you to model business processes that involve a series of tasks to carry out their achievement. These tasks can be performed by humans or by systems and are basically activities to perform actions and/or collect information. We also need to establish a way to control the performance of these tasks, their order, their asynchronous or non-asynchronous nature, or their ability to recover from errors. In other words, we need a workflow to orchestrate our process. At KuFlow we call WorkFlow Engine for the different types of implementations we support. In this case, Temporal is the Workflow Engine.
Why use Temporal for your KuFlow processes?
The use of Temporal gives us several benefits, the main one being the ability to implement the resilience of our operations in a comfortable and reliable way. For example, let's imagine that a task of one of our processes makes a call to a third-party API. If this API is not available at the time of its call, the task fails and our process could be blocked. In this case, we would like the workflow of our process to be retried later and have to forget about all the scaffolding required to do so. Also, if we have already issued modification requests to other external APIs, we would like that when recovering the process, the consumption of these APIs would be deterministic and would not fail because we have consumed them twice. We achieve all this by implementing our workflows with Temporal.
How does Temporal integrate with KuFlow?
In order to simplify as much as possible the infrastructure of our customers, in KuFlow we have our own Temporal cloud avoiding the need to deploy this infrastructure by customers. Having a Temporal cloud is not our core business and in the future, we may also offer the possibility to use other clouds such as Temporal Cloud or any other.
To implement a Workflow in a process using Temporal, we need to develop what in KuFlow we call Worker. A worker is nothing more than an application that runs on our customers' servers and interacts with the KuFlow API and the Temporal API available in our cloud. This approach allows for a hybrid cloud architecture that provides flexibility when accessing the different services that you want to consume in the Worker. Thus, for example, a Worker deployed on the customer's servers would have access to the customer's private services in a secure and reliable way in order to extract data and make decisions based on their needs.
To facilitate the development of this Worker, a library that facilitates the consumption of the KuFlow API through Temporal Activities is provided. Together with the different SDKs provided by Temporal, the construction of a Worker is, in most cases, child's play. The various languages for which these libraries are provided are updated frequently.
Security
One of the most important aspects for us is everything related to security. And for this reason, apart from a powerful RBAC system that is deployed on the customer's resources, we use a series of authentication mechanisms to access the different APIs that we offer. To facilitate the management of access and credentials to these APIs, KuFlow defines the concept of 'Application'. By definition, an 'Application' is composed of credentials and access certificates. In turn, these applications are used as candidates or "principals" to grant roles and permissions to the resources you want.
How is authentication and authorization achieved with the Temporal API?
Schematically, to authenticate we will use Mutual TLS and to get authorization we will use a JWT bearer token. Obtaining the certificates and credentials is done from the KuFlow App in a simple way, without the need to create and upload the certificates manually. Armed with this data we can now connect our workers to the KuFlow cloud. The authentication mechanism follows the following flow: On the one hand, it is necessary to obtain a JWT bearer token that will allow us authorization in Temporal, for this a request is issued to the KuFlow API Rest (authenticated by BasicAuth) that returns the token. With this token, the communications to Temporal's API GRPC are made, besides using the certificates to encrypt the communication and to achieve the authentication with this API.
For more information, please refer to the documentation.
To deal with problems such as carrier token renewal and to make the developer's experience as comfortable as possible, this mechanism is implemented in the libraries we provide as support for Workers. An example of token renegotiation can be seen in this code extracted from the kuflow-engine-client library in Java.
public class KuFlowAuthorizationTokenSupplier implements AuthorizationTokenSupplier {
private static final Logger LOGGER = LoggerFactory.getLogger(KuFlowAuthorizationTokenSupplier.class);
private static final double EXPIRE_PERCENTAGE = 0.1;
private static final Duration EXPIRE_MAX_DURATION = Duration.ofMinutes(10);
private final AuthenticationApi authenticationApi;
private volatile String token;
private volatile Instant tokenExpireAt;
public KuFlowAuthorizationTokenSupplier(AuthenticationApi authenticationApi) {
this.authenticationApi = authenticationApi;
}
@Override
public String supply() {
String token = this.requestToken();
return "Bearer " + token;
}
private String requestToken() {
String token = this.token;
Instant tokenExpireAt = this.tokenExpireAt;
if (isTokenNonExpired(token, tokenExpireAt)) {
return token;
}
synchronized (this) {
token = this.token;
tokenExpireAt = this.tokenExpireAt;
if (isTokenNonExpired(token, tokenExpireAt)) {
return token;
}
AuthenticationResource authentication = new AuthenticationResource();
authentication.setType(AuthenticationTypeResource.ENGINE);
authentication = this.authenticationApi.createAuthentication(authentication);
Duration expireDuration = Duration.between(Instant.now(), authentication.getExpiredAt());
expireDuration = Duration.ofSeconds((long) (expireDuration.getSeconds() * EXPIRE_PERCENTAGE));
if (expireDuration.compareTo(EXPIRE_MAX_DURATION) > 0) {
expireDuration = EXPIRE_MAX_DURATION;
}
this.token = token = authentication.getToken();
this.tokenExpireAt = tokenExpireAt = Instant.now().plus(expireDuration);
LOGGER.debug("Regenerated JWT Temporal authorization token. Expired at: {}", tokenExpireAt);
return token;
}
}
private static boolean isTokenNonExpired(String token, Instant tokenExpireAt) {
return token != null && tokenExpireAt != null && Instant.now().isBefore(tokenExpireAt);
}
}
What we have done here is to implement the AuthorizationTokenSupplier interface provided by the Temporal SDK to use our KuFlow API client library to negotiate a token when needed. In the same way, we implemented the GrpcMetadataProvider interface with the purpose of adding as metadata the token provided by the previous implementation:
public class AuthorizationGrpcMetadataProvider implements GrpcMetadataProvider {
public static final Metadata.Key<String> AUTHORIZATION_HEADER_KEY =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
private final AuthorizationTokenSupplier authorizationTokenSupplier;
public AuthorizationGrpcMetadataProvider(AuthorizationTokenSupplier authorizationTokenSupplier) {
this.authorizationTokenSupplier = authorizationTokenSupplier;
}
@Override
public Metadata getMetadata() {
Metadata metadata = new Metadata();
metadata.put(AUTHORIZATION_HEADER_KEY, authorizationTokenSupplier.supply());
return metadata;
}
}
This metadata provider class for the GRPC connection to Temporal, will be the one we will use later when we configure our WorkflowServiceStub in the Worker.
WorkflowServiceStubsOptions.newBuilder().addGrpcMetadataProvider(
new AuthorizationGrpcMetadataProvider(new KuFlowAuthorizationTokenSupplier(this.authenticationApi))
);
As the last step to successfully connect to Temporal we need to create an SSL context that will be used in the negotiation with MutualTLS. For this, we need to extend the SslContext class of the SDK. The implementation depends on your needs but an example of it can be found in the following code snippet available in our example repositories on GitHub.
private SslContext createSslContext() {
MutualTlsProperties mutualTls = this.applicationProperties.getTemporal().getMutualTls();
if (StringUtils.isBlank(mutualTls.getCert()) && StringUtils.isBlank(mutualTls.getCertData())) {
return null;
}
if (
StringUtils.isNotBlank(mutualTls.getCert()) &&
(StringUtils.isBlank(mutualTls.getKey()) || StringUtils.isBlank(mutualTls.getCa()))
) {
throw new KuFlowEngineClientException("key and ca are required");
}
if (
StringUtils.isNotBlank(mutualTls.getCertData()) &&
(StringUtils.isBlank(mutualTls.getKeyData()) || StringUtils.isBlank(mutualTls.getCaData()))
) {
throw new KuFlowEngineClientException("keyData or caData are required");
}
try (
InputStream certInputStream = this.openInputStream(mutualTls.getCert(), mutualTls.getCertData());
InputStream keyInputStream = this.openInputStream(mutualTls.getKey(), mutualTls.getKeyData());
InputStream caInputStream = this.openInputStream(mutualTls.getCa(), mutualTls.getCaData())
) {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(null, null);
X509Certificate certificate = (X509Certificate) CertificateFactory.getInstance("X509").generateCertificate(caInputStream);
trustStore.setCertificateEntry("temporal-ca", certificate);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
TrustManager trustManager = trustManagerFactory.getTrustManagers()[0];
return SimpleSslContextBuilder.forPKCS8(certInputStream, keyInputStream).setTrustManager(trustManager).build();
} catch (KeyStoreException | NoSuchAlgorithmException | CertificateException | IOException e) {
throw new KuFlowEngineClientException("Unable to configure mTLS", e);
}
}
Finally, like the GRPC metadata provider, we add this SSL context to our WorkflowServiceStubs options.
public WorkflowServiceStubs workflowServiceStubs() {
Builder builder = WorkflowServiceStubsOptions.newBuilder();
builder.setTarget(this.applicationProperties.getTemporal().getTarget());
builder.setSslContext(this.createSslContext());
builder.addGrpcMetadataProvider(
new AuthorizationGrpcMetadataProvider(new KuFlowAuthorizationTokenSupplier(this.authenticationApi))
);
WorkflowServiceStubsOptions options = builder.validateAndBuildWithDefaults();
return WorkflowServiceStubs.newServiceStubs(options);
}
At this point it is useful to remember that when configuring a Temporal-backed process in the KuFlow APP, it is possible to download a ready-to-use Worker template that uses these connection configuration mechanisms, so you have a working example just a few clicks away. Similarly, much more information and integration examples can be found in our GitHub repository and in our documentation.
Notes on the Temporal cloud deployed in KuFlow
As discussed earlier in the article, the purpose of offering a Temporal deployment is to facilitate integration for our customers. At this point, it is important to highlight some issues: Our Temporal deployment is a slightly modified version of the one available in the official Temporal repositories. Basically, security and authorization modifications have been added. As a result, some API methods are not available. For example, you will not be able to use our Temporal deployment to execute Workflows that are not launched from KuFlow processes, which is necessary to avoid abuse. Or in the same way, it is also not possible to cancel Workflows if not from the KuFlow App. Other APIs may not be available, unlike a normal Temporal installation. However, all the APIs that guarantee the correct functioning of your workers are enabled. If you need any of the currently disabled APIs, you can request them by contacting us.
Some other modifications made allow you, for example, to know from the App if there is any worker currently online for your process.
In the same way, we try to keep our Temporal deployments as up-to-date as possible, if you have any questions about this, please contact us.
Conclusion
In this article, we have superficially explored the use of Temporal as a workflow engine for our KuFlow processes. This solution provides us with a number of advantages over the other workflow engines we support, such as access to private services, allowing greater flexibility when implementing Worker. In the same way, the resilience and scalability offered by Temporal makes it the most recommendable option when the processes are highly complex and need to orchestrate different external APIs.The following table shows a summary of the main features of each of the supported Workflows Engine: