SubscriptionFilter.java

package de.dlr.shepard.filters;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;

import de.dlr.shepard.neo4Core.entities.Subscription;
import de.dlr.shepard.neo4Core.io.EventIO;
import de.dlr.shepard.neo4Core.io.HasIdIO;
import de.dlr.shepard.neo4Core.io.SubscriptionIO;
import de.dlr.shepard.neo4Core.services.SubscriptionService;
import de.dlr.shepard.security.PermissionsUtil;
import de.dlr.shepard.util.AccessType;
import de.dlr.shepard.util.HasId;
import de.dlr.shepard.util.RequestMethod;
import jakarta.ws.rs.ProcessingException;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerResponseContext;
import jakarta.ws.rs.container.ContainerResponseFilter;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.ext.Provider;
import lombok.extern.slf4j.Slf4j;

@Subscribable
@Provider
@Slf4j
public class SubscriptionFilter implements ContainerResponseFilter {

	private Executor executor;

	/**
	 * Default constructor
	 */
	public SubscriptionFilter() {
		this.executor = Executors.newCachedThreadPool();
	}

	/**
	 * Constructor to inject your own executor service
	 *
	 * @param executor Your own Executor Service
	 */
	public SubscriptionFilter(Executor executor) {
		this.executor = executor;
	}

	@Override
	public void filter(ContainerRequestContext requestContext, ContainerResponseContext responseContext) {
		SubscriptionService subscriptionService = getService();

		// request not successful
		var status = responseContext.getStatus();
		if (!(status >= 200 && status < 300)) {
			log.debug("Skip subscriptions since the http statuscode is not between 200 and 299");
			return;
		}

		// request successful
		EventIO event = new EventIO(requestContext.getUriInfo().getAbsolutePath().toString(),
				RequestMethod.valueOf(requestContext.getMethod()));

		Object entity = responseContext.getEntity();
		if (entity instanceof HasId hasId) {
			event.setSubscribedObject(new HasIdIO(hasId));
		}

		var permissionsUtil = getPermissionsUtil();
		List<Subscription> subs = subscriptionService.getMatchingSubscriptions(event.getRequestMethod());
		for (Subscription sub : subs) {
			// TODO: This could develop into a bottleneck
			Pattern pattern = Pattern.compile(sub.getSubscribedURL());
			if (pattern.matcher(event.getUrl()).matches() && permissionsUtil.isAllowed(
					requestContext.getUriInfo().getPathSegments(), AccessType.Read, sub.getCreatedBy().getUsername())) {
				EventIO e = new EventIO(event);
				e.setSubscription(new SubscriptionIO(sub));
				log.debug("{} was triggered with {}", sub, e);
				executor.execute(() -> sendCallback(sub, e));
			}
		}
	}

	private void sendCallback(Subscription sub, EventIO event) {
		var client = ClientBuilder.newClient();
		var webTarget = client.target(sub.getCallbackURL());

		try {
			var entity = Entity.entity(event, MediaType.APPLICATION_JSON);
			var response = webTarget.request().buildPost(entity).invoke();
			log.info("Notification has been send to {} with response code: {}", sub.getCallbackURL(),
					response.getStatus());
		} catch (ProcessingException e) {
			log.error("Could not execute notification request");
		} finally {
			client.close();
		}
	}

	protected SubscriptionService getService() {
		return new SubscriptionService();
	}

	protected PermissionsUtil getPermissionsUtil() {
		return new PermissionsUtil();
	}

}