SubscriptionFilter.java
package de.dlr.shepard.filters;
import de.dlr.shepard.neo4Core.entities.Subscription;
import de.dlr.shepard.neo4Core.io.EventIO;
import de.dlr.shepard.neo4Core.io.HasIdIO;
import de.dlr.shepard.neo4Core.io.SubscriptionIO;
import de.dlr.shepard.neo4Core.services.SubscriptionService;
import de.dlr.shepard.security.PermissionsUtil;
import de.dlr.shepard.util.AccessType;
import de.dlr.shepard.util.HasId;
import de.dlr.shepard.util.RequestMethod;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.ProcessingException;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerResponseContext;
import jakarta.ws.rs.container.ContainerResponseFilter;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.ext.Provider;
import java.util.List;
import java.util.regex.Pattern;
@Subscribable
@Provider
@RequestScoped
public class SubscriptionFilter implements ContainerResponseFilter {
private ExecutorFactory executorFactory;
private PermissionsUtil permissionsUtil;
private SubscriptionService subscriptionService;
SubscriptionFilter() {}
@Inject
public SubscriptionFilter(
PermissionsUtil permissionsUtil,
SubscriptionService subscriptionService,
ExecutorFactory executorFactory
) {
this.permissionsUtil = permissionsUtil;
this.subscriptionService = subscriptionService;
this.executorFactory = executorFactory;
}
@Override
public void filter(ContainerRequestContext requestContext, ContainerResponseContext responseContext) {
// request not successful
var status = responseContext.getStatus();
if (!(status >= 200 && status < 300)) {
Log.debug("Skip subscriptions since the http status code is not between 200 and 299");
return;
}
// request successful
EventIO event = new EventIO(
requestContext.getUriInfo().getAbsolutePath().toString(),
RequestMethod.valueOf(requestContext.getMethod())
);
Object entity = responseContext.getEntity();
if (entity instanceof HasId hasId) {
event.setSubscribedObject(new HasIdIO(hasId));
}
List<Subscription> subs = subscriptionService.getMatchingSubscriptions(event.getRequestMethod());
for (Subscription sub : subs) {
// TODO: This could develop into a bottleneck
Pattern pattern = Pattern.compile(sub.getSubscribedURL());
if (
pattern.matcher(event.getUrl()).matches() &&
permissionsUtil.isAllowed(requestContext, AccessType.Read, sub.getCreatedBy().getUsername())
) {
EventIO e = new EventIO(event);
e.setSubscription(new SubscriptionIO(sub));
Log.debugf("%s was triggered with %s", sub, e);
executorFactory.getInstance().execute(() -> sendCallback(sub, e));
}
}
}
private void sendCallback(Subscription sub, EventIO event) {
var client = ClientBuilder.newClient();
var webTarget = client.target(sub.getCallbackURL());
try {
var entity = Entity.entity(event, MediaType.APPLICATION_JSON);
var response = webTarget.request().buildPost(entity).invoke();
Log.infof("Notification has been send to %s with response code: %s", sub.getCallbackURL(), response.getStatus());
} catch (ProcessingException e) {
Log.error("Could not execute notification request");
} finally {
client.close();
}
}
}