commit 1ff10f0cc510a36885edeb249dec677b902e0a47 Author: unknown Date: Tue Nov 28 01:36:48 2017 +0100 Initial commit diff --git a/.idea/.name b/.idea/.name new file mode 100644 index 0000000..a3d8be7 --- /dev/null +++ b/.idea/.name @@ -0,0 +1 @@ +Broadcast \ No newline at end of file diff --git a/.idea/artifacts/Broadcast_jar.xml b/.idea/artifacts/Broadcast_jar.xml new file mode 100644 index 0000000..2e8f9c9 --- /dev/null +++ b/.idea/artifacts/Broadcast_jar.xml @@ -0,0 +1,9 @@ + + + $PROJECT_DIR$/out/artifacts/Broadcast_jar + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..d1af2ea --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,15 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..8e8843c --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..47cfe2f --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/Broadcast.iml b/Broadcast.iml new file mode 100644 index 0000000..c90834f --- /dev/null +++ b/Broadcast.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/src/net/tofvesson/broadcast/client/Accumulator.java b/src/net/tofvesson/broadcast/client/Accumulator.java new file mode 100644 index 0000000..6f95e1c --- /dev/null +++ b/src/net/tofvesson/broadcast/client/Accumulator.java @@ -0,0 +1,202 @@ +package net.tofvesson.broadcast.client; + +import net.tofvesson.broadcast.support.ImmutableArray; +import net.tofvesson.broadcast.support.ImmutableReferenceMap; +import java.io.IOException; +import java.net.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Accumulates a list of hosts that are broadcasting to a given port. + */ +public class Accumulator { + + /** + * Timeout (in milliseconds) until a host is considered to no longer be active. + */ + public static final long DEFAULT_HOST_TIMEOUT = 60_000; + + /** + * Minimum timeout until a host is considered inactive. + */ + public static final long MINIMUM_HOST_TIMEOUT = 50; + + private final Runnable accumulate; + private Thread accumulatorThread; + private final int port; + + protected final DatagramSocket socket; + protected final DatagramPacket packet; + + protected volatile long continueUntil = -1; + + protected final Map hosts = new HashMap<>(); + protected final ImmutableReferenceMap accessHosts = new ImmutableReferenceMap<>(hosts); + protected final long hostTimeout; + protected final ImmutableArray signature; + + public OnHostTimeoutListener timeoutListener; + public OnNewHostListener newHostListener; + + /** + * Create an accumulator + * @param port Port to listen on. + * @param hostTimeout Timeout (milliseconds) until a host is lost (unless another broadcast is detected) + * @param sig Signature to check if a broadcast is originating from a compatible source. + * @throws SocketException Thrown if port is already bound + * @throws SecurityException Thrown if program isn't allowed to accept broadcasts + */ + public Accumulator(int port, long hostTimeout, byte[] sig) throws SocketException, SecurityException { + if(System.getSecurityManager()!=null) System.getSecurityManager().checkAccept("255.255.255.255", port); // Do a permission check + + this.socket = new DatagramSocket(this.port = port); + socket.setSoTimeout(125); + this.packet = new DatagramPacket(new byte[sig.length + 1], sig.length + 1); // One extra byte implicitly serves as metadata during packet comparison + this.hostTimeout = hostTimeout; + this.signature = ImmutableArray.from(sig); + + accumulate = () -> { + while(continueUntil==-1 || System.currentTimeMillis() { + e.printStackTrace(); + // NOP + }); + accumulatorThread.start(); + } + + /** + * Pause the accumulator. Resume by calling {@link #start()} + */ + public void pause(){ + continueUntil = 0; + if(accumulatorThread!=null && accumulatorThread.isAlive()) + try { + accumulatorThread.join(); + } + catch (InterruptedException e) { e.printStackTrace(); } + } + + /** + * Stop accumulating hosts and disable host timeout checks. Cannot be resumed from + */ + public void stop(){ + pause(); + TimeoutManager.theManager.removeAccumulator(this); + socket.close(); + } + + /** + * Get an immutable map of the currently available hosts. + * @return Immutable host map. + */ + public ImmutableReferenceMap getHosts() { return accessHosts; } + + /** + * Callback for when a new host is found + */ + public interface OnNewHostListener{ + /** + * Called when a new host is found. + * @param host The IP address of the host + * @param port Listener port + */ + void onNewHost(InetAddress host, int port); + } + + /** + * Callback for timed out remote hosts + */ + public interface OnHostTimeoutListener{ + /** + * Called when a host is timed out. + * @param host The IP address of the host + * @param port Listener port + */ + void onTimeout(InetAddress host, int port); + } + + + + private static class TimeoutManager{ + static TimeoutManager theManager = new TimeoutManager(); + + private final List checks = new ArrayList<>(); + + public TimeoutManager() { + Thread t = new Thread(() -> { + //noinspection InfiniteLoopStatement + while(true){ + try{ Thread.sleep(Accumulator.MINIMUM_HOST_TIMEOUT); }catch(Exception e){} + Accumulator[] a1; + synchronized (checks){ a1 = checks.toArray(new Accumulator[checks.size()]); } + for(Accumulator a : a1) + synchronized (a.hosts){ + for(InetAddress addr : a.hosts.keySet()) + if(a.hosts.get(addr) <=System.currentTimeMillis()-a.hostTimeout) { + a.hosts.remove(addr); + if(a.timeoutListener!=null) a.timeoutListener.onTimeout(addr, a.port); + } + } + } + }); + t.setDaemon(true); + t.setPriority(Thread.MIN_PRIORITY); + t.setName("Accumulator-TimeoutManager"); + t.setUncaughtExceptionHandler((r, e)->{}); + t.start(); + } + + void addAccumulator(Accumulator a){ checks.removeIf(it -> it==a); synchronized (checks){ checks.add(a); } } + void removeAccumulator(Accumulator a){ checks.removeIf(it -> it==a); synchronized (checks){ checks.remove(a); } } + + } +} diff --git a/src/net/tofvesson/broadcast/server/Server.java b/src/net/tofvesson/broadcast/server/Server.java new file mode 100644 index 0000000..22c4f76 --- /dev/null +++ b/src/net/tofvesson/broadcast/server/Server.java @@ -0,0 +1,98 @@ +package net.tofvesson.broadcast.server; + +import java.net.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Broadcasts a signature on the current subnet + */ +public class Server { + + private final Runnable serve; + private Thread serverThread; + private final int port; + + protected final DatagramSocket serverSocket; + protected final DatagramPacket packet; + + protected final AtomicBoolean isAlive = new AtomicBoolean(false); + + /** + * Create broadcaster + * @param port Port to broadcast to + * @param delay Millisecond delay between broadcasts + * @param sig Signature to broadcast + * @param offset Offset in the signature to broadcast + * @param length Length of signature to broadcast + * @throws SocketException Thrown if broadcast socket could not be created + */ + public Server(int port, long delay, byte[] sig, int offset, int length) throws SocketException { + try { + this.serverSocket = new DatagramSocket(); + this.packet = new DatagramPacket(sig, offset, length, InetAddress.getByAddress(new byte[]{-1, -1, -1, -1}), this.port = port); + serve = () -> { + while(getIsAlive()){ + try { + serverSocket.send(packet); + } catch (Exception e) { + e.printStackTrace(); + break; + } + try { Thread.sleep(delay); } catch (InterruptedException e) { } + } + }; + } catch (UnknownHostException e) { + throw new RuntimeException(e); // This is an internal Java error and should not be a declared exception + } + } + + /** + * Create broadcaster + * @param port Port to broadcast to + * @param delay Millisecond delay between broadcasts + * @param sig Signature to broadcast + * @throws SocketException Thrown if broadcast socket could not be created + */ + public Server(int port, long delay, byte[] sig) throws SocketException { this(port, delay, sig, 0, sig.length); } + + protected boolean getIsAlive(){ + boolean b; + synchronized (isAlive){ b = isAlive.get(); } + return b; + } + + protected void setIsAlive(boolean b){ synchronized (isAlive){ isAlive.set(b); } } + + /** + * Start broadcasting signature + */ + public void start(){ + if(serverSocket.isClosed()) throw new IllegalStateException("Socket is closed"); + if(serverThread!=null && serverThread.isAlive()) throw new IllegalStateException("Server is still alive"); + setIsAlive(true); + serverThread = new Thread(serve); + serverThread.setDaemon(true); + serverThread.setPriority(Thread.MAX_PRIORITY); + serverThread.setName("Server-"+port); + serverThread.start(); + } + + /** + * Pause broadcasting of signature. Can be resumed by calling {@link #start()} + */ + public void pause(){ + if(serverSocket.isClosed() && (serverThread==null || !serverThread.isAlive())) throw new IllegalStateException("Socket is closed"); + setIsAlive(false); + serverThread.interrupt(); + try { serverThread.join(); } + catch (InterruptedException e) { e.printStackTrace(); } + } + + /** + * Stop broadcasting and close port. Cannot be resumed from + */ + public void stop(){ + pause(); + serverSocket.close(); + } +} diff --git a/src/net/tofvesson/broadcast/support/ImmutableArray.java b/src/net/tofvesson/broadcast/support/ImmutableArray.java new file mode 100644 index 0000000..2620572 --- /dev/null +++ b/src/net/tofvesson/broadcast/support/ImmutableArray.java @@ -0,0 +1,85 @@ +package net.tofvesson.broadcast.support; + +import java.util.Arrays; + +/** + * Wrapper for arrays to prevent modification + * @param Element type + */ +public class ImmutableArray { + + protected final T[] array; + protected ImmutableArray(T[] array){ this.array = array; } + + public int length(){ return array.length; } + public T at(int index){ return array[index]; } + + @Override + public boolean equals(Object obj) { + Object[] compareTo; + if(obj instanceof ImmutableArray) compareTo = ((ImmutableArray) obj).array; + else if(obj!=null && obj.getClass().isArray()){ + try{ + compareTo = (Object[]) obj; + }catch(ClassCastException e){ + return false; // Object was a primitive array + } + } + else return false; + return Arrays.equals(array, compareTo); + } + + public boolean compare(ImmutableArray to, int targetOffset, int offset, int length){ + if(offset+length>array.length || targetOffset+length>to.array.length) return false; + for(int i = 0; i ImmutableArray from(T[] t){ return new ImmutableArray<>(t); } + public static ImmutableArray from(boolean[] t){ + Boolean[] t1 = new Boolean[t.length]; + for(int i = 0; i(t1); + } + public static ImmutableArray from(char[] t){ + Character[] t1 = new Character[t.length]; + for(int i = 0; i(t1); + } + public static ImmutableArray from(byte[] t){ + Byte[] t1 = new Byte[t.length]; + for(int i = 0; i(t1); + } + public static ImmutableArray from(short[] t){ + Short[] t1 = new Short[t.length]; + for(int i = 0; i(t1); + } + public static ImmutableArray from(int[] t){ + Integer[] t1 = new Integer[t.length]; + for(int i = 0; i(t1); + } + public static ImmutableArray from(long[] t){ + Long[] t1 = new Long[t.length]; + for(int i = 0; i(t1); + } + public static ImmutableArray from(float[] t){ + Float[] t1 = new Float[t.length]; + for(int i = 0; i(t1); + } + public static ImmutableArray from(double[] t){ + Double[] t1 = new Double[t.length]; + for(int i = 0; i(t1); + } +} diff --git a/src/net/tofvesson/broadcast/support/ImmutableReferenceMap.java b/src/net/tofvesson/broadcast/support/ImmutableReferenceMap.java new file mode 100644 index 0000000..e6d575b --- /dev/null +++ b/src/net/tofvesson/broadcast/support/ImmutableReferenceMap.java @@ -0,0 +1,137 @@ +package net.tofvesson.broadcast.support; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Immutable map that references mutable map. Used to dissuade modification of a map + * @param Key type + * @param Value type + */ +public class ImmutableReferenceMap implements Map { + + protected final Map reference; + + public ImmutableReferenceMap(Map reference){ + this.reference = reference; + } + + @Override + public int size() { + return reference.size(); + } + + @Override + public boolean isEmpty() { + return reference.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return reference.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return reference.containsValue(value); + } + + @Override + public V get(Object key) { + return reference.get(key); + } + + @Override + public V put(K key, V value) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public V remove(Object key) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public void putAll(Map m) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public void clear() { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public Set keySet() { + return reference.keySet(); + } + + @Override + public Collection values() { + return reference.values(); + } + + @Override + public Set> entrySet() { + return reference.entrySet(); + } + + @Override + public V getOrDefault(Object key, V defaultValue) { + return reference.getOrDefault(key, defaultValue); + } + + @Override + public void forEach(BiConsumer action) { + reference.forEach(action); + } + + @Override + public void replaceAll(BiFunction function) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public V putIfAbsent(K key, V value) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public boolean remove(Object key, Object value) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public V replace(K key, V value) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public V computeIfAbsent(K key, Function mappingFunction) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public V computeIfPresent(K key, BiFunction remappingFunction) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public V compute(K key, BiFunction remappingFunction) { + throw new IllegalStateException("Unsupported action"); + } + + @Override + public V merge(K key, V value, BiFunction remappingFunction) { + throw new IllegalStateException("Unsupported action"); + } +}