diff --git a/src/main/java/DistributedLearning.java b/src/main/java/DistributedLearning.java index d054b13..c48264c 100644 --- a/src/main/java/DistributedLearning.java +++ b/src/main/java/DistributedLearning.java @@ -92,7 +92,7 @@ public static void main(String[] args) throws Exception { //KAFKA INPUT SOURCE DataStream stream = env - .addSource(new FlinkKafkaConsumer<>("sine_original_100k", new SimpleStringSchema(), properties).setStartFromEarliest()) + .addSource(new FlinkKafkaConsumer<>("sine_3M_2_drifts_500000_2000000", new SimpleStringSchema(), properties).setStartFromEarliest()) .name("Kafka Input Source").setParallelism(parallelism).setMaxParallelism(parallelism); @@ -151,7 +151,7 @@ public void flatMap(String input_stream, Collector("localhost:9092", "all_together2", + partial_result.addSink(new FlinkKafkaProducer<>("localhost:9092", "sine_3M_2_drifts_500000_2000000_without_prop", (SerializationSchema>) element -> (element.getField(5).toString() + "," + element.getField(4).toString() + "," + element.getField(0).toString()).getBytes())) .name("Visualizing Performance Metrics").setParallelism(parallelism); @@ -192,7 +192,6 @@ public boolean filter(Tuple6, Tuple6> { private transient ValueState hoeffdingTreeValueState; - private transient ValueState backup_hoeffdingTreeValueState; private transient ValueState background_hoeffdingTreeValueState; private transient ValueState ConceptDriftDetectorValueState; private transient ValueState empty_state; @@ -247,10 +246,24 @@ public void flatMap(Tuple3 input_stream, Collector 1000000 && instance_id < 1000010) { + + if (instance_id > 10000 && instance_id < 10010) { + System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy()); + } + if (instance_id > 500000 && instance_id < 500010) { + System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy()); + } + + if (instance_id > 600000 && instance_id < 600010) { System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy()); } - if (instance_id > 3000000 && instance_id < 3000010) { + if (instance_id > 1900000 && instance_id < 1900010) { + System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy()); + } + if (instance_id > 2100000 && instance_id < 2100010) { + System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy()); + } + if (instance_id > 2990000 && instance_id < 2990010) { System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy()); } @@ -262,13 +275,8 @@ public void flatMap(Tuple3 input_stream, Collector(instance_id, prediction1, -1, purpose_id, backup_ht.getErrorRate(), 2)); + // Concept Drift Handler if (drift_detection_method_id != 0) { @@ -286,65 +294,42 @@ public void flatMap(Tuple3 input_stream, Collector(instance_id, prediction, -1, purpose_id, background_hoeffdingTree.getErrorRate(), 0)); } else if (empty_background_state.value()) { // System.out.println("===================================Warning Phase==================================="); - System.out.println("Background Tree " + instance_id + " Just Created "); +// System.out.println("Background Tree " + instance_id + " Just Created "); empty_background_state.update(false); // Warning Signal. Create & Train the Background Tree HoeffdingTree background_hoeffdingTree = new HoeffdingTree(); - background_hoeffdingTree.NEW_CreateHoeffdingTree(2, 2, 200, 0.000001, 0.05, this.combination_function, hoeffding_tree_id, age_of_maturity_input); + background_hoeffdingTree.NEW_CreateHoeffdingTree(4, 4, 200, 0.0001, 0.05, this.combination_function, hoeffding_tree_id, 1); // background_hoeffdingTree.print_m_features(); background_hoeffdingTreeValueState.update(background_hoeffdingTree); - collector.collect(new Tuple6<>(instance_id, prediction, -1, purpose_id, background_hoeffdingTree.getErrorRate(), 0)); - } - } else if ((current_stream_status == 1 || current_stream_status == 2) && updated_stream_status == 2) { + } else if (current_stream_status == 1 && updated_stream_status == 0) { // System.out.println("DS Signal: instance id " + instance_id); if (current_signal == 2) { // System.out.println("=============================Stable Phase/ Drift==================================="); // System.out.println("Stable phase after a Drift Signal"); -// System.out.println("Do the Switch: " + instance_id + " Background Tree taking over "+ht.getAccuracy()); + System.out.println("Do the Switch: " + instance_id + "Background Tree taking over"); // Drift Signal. Do the Switch HoeffdingTree background_tree = background_hoeffdingTreeValueState.value(); - background_tree.TestHoeffdingTree(background_tree.root, features, 0); - background_tree.UpdateHoeffdingTree(background_tree.root, features, instance_weight); - background_hoeffdingTreeValueState.update(background_tree); - collector.collect(new Tuple6<>(instance_id, prediction, -1, purpose_id, background_tree.getErrorRate(), 0)); - - if (current_stream_status == 1) { - System.out.println("Pending for the Switch: " + hoeffding_tree_id + " => " + instance_id + " Background Tree taking over " + ht.getAccuracy() + " => " + background_tree.getAccuracy()); - } -// System.out.println("Size "+ ht.SizeHT(ht.root)); -// System.out.println("Counter "+ ht.counter); - if (background_tree.getAccuracy() > ht.getAccuracy()) { - System.out.println("Do the Switch: " + hoeffding_tree_id + " => " + instance_id + " Background Tree taking over " + ht.getAccuracy() + " => " + background_tree.getAccuracy()); - ht.RemoveHoeffdingTree(); - hoeffdingTreeValueState.update(background_tree); - ht = hoeffdingTreeValueState.value(); - System.out.println(ht.getAccuracy() + " " + ht.getErrorRate()); - empty_background_state.update(true); - // System.out.println("Making the switch and resetting the Drift Detector"); - //RESET EVERYTHING - conceptDriftDetector.ResetConceptDrift(); - } - collector.collect(new Tuple6<>(instance_id, prediction, -1, purpose_id, background_tree.getErrorRate(), 0)); - - } - } else if (current_stream_status == 1 && updated_stream_status == 0) { - if (current_signal == -1) { + ht.RemoveHoeffdingTree(); + hoeffdingTreeValueState.update(background_tree); + empty_background_state.update(true); + // System.out.println("Making the switch and resetting the Drift Detector"); + //RESET EVERYTHING + conceptDriftDetector.ResetConceptDrift(); + } else if (current_signal == -1) { // System.out.println("=========================Stable Phase/ False Alarm================================="); - System.out.println("Stable phase after a false alarm"); +// System.out.println("Stable phase after a false alarm"); // System.out.println("FAS False Alarm Signal: instance id " + instance_id); HoeffdingTree background_tree = background_hoeffdingTreeValueState.value(); background_tree.RemoveHoeffdingTree(); background_hoeffdingTreeValueState.clear(); empty_background_state.update(true); - } //System.out.println("Training HT with id " + hoeffding_tree_id + " which has error-rate " + ht.getErrorRate() + " predicts " + prediction + " for the instance with id " + instance_id + " while the true label is " + true_label); } @@ -359,16 +344,13 @@ public void flatMap(Tuple3 input_stream, Collector(instance_id, prediction, true_label, purpose_id, ht.getErrorRate(), 1)); - collector.collect(new Tuple6<>(instance_id, prediction, true_label, purpose_id, backup_ht.getErrorRate(), 2)); } else if (instance_id == -1 && true_label == -1 && purpose_id == -1) { HoeffdingTree ht = hoeffdingTreeValueState.value(); int size = ht.SizeHT(ht.root); @@ -393,7 +375,6 @@ public void flatMap(Tuple3 input_stream, Collector input_stream, Collector input_stream, Collector descriptor_backup_hoeffdingTreeValueState = new ValueStateDescriptor("backup_hoeffdingTreeValueState", HoeffdingTree.class); - backup_hoeffdingTreeValueState = getRuntimeContext().getState(descriptor_backup_hoeffdingTreeValueState); + /* Background Hoeffding Tree */ ValueStateDescriptor descriptor_background_hoeffding_tree = new ValueStateDescriptor("background_hoeffdingTreeValueState", HoeffdingTree.class); diff --git a/src/main/java/HoeffdingTree/HoeffdingTree.java b/src/main/java/HoeffdingTree/HoeffdingTree.java index e8eb9db..fdd63cf 100644 --- a/src/main/java/HoeffdingTree/HoeffdingTree.java +++ b/src/main/java/HoeffdingTree/HoeffdingTree.java @@ -73,15 +73,15 @@ public boolean NeedUpdate() { statistics_ht_history[3] = 0; statistics_ht_history[4] = 0; } - if (this.getAccuracy() < statistics_ht_history[0] + 2*statistics_ht_history[3]) { - if (this.getAccuracy() > statistics_ht_history[0] - 2*statistics_ht_history[3]) { + if (this.getAccuracy() < statistics_ht_history[0] + statistics_ht_history[3]) { + if (this.getAccuracy() > statistics_ht_history[0] - statistics_ht_history[3]) { this.counter1 = this.counter1 + 1; // System.out.println("Counter 1" + this.counter1 + " Counter 2" + this.counter); } } - if ((this.counter1 / this.counter) > 0.8) { - return false; - } +// if ((this.counter1 / this.counter) > 0.8) { +// return false; +// } } return true; @@ -131,11 +131,13 @@ public void UpdateHoeffdingTree(Node node, String[] input, int weight) { // System.out.println(statistics_entropy_history[2] + " => " + Math.sqrt(statistics_entropy_history[1] / (statistics_entropy_history[2] - 2))); // System.out.println(statistics_ig_history[2] + " => " + Math.sqrt(statistics_ig_history[1] / (statistics_ig_history[2] - 2))); } + } else { + Node updatedNode = node.TraverseTree(node, input); + node.InsertNewSample(updatedNode, input, weight); } } - public int SizeHT(Node node) { return node.countNode(node); } diff --git a/src/main/java/HoeffdingTree/Node.java b/src/main/java/HoeffdingTree/Node.java index e8218ac..8efc8c3 100644 --- a/src/main/java/HoeffdingTree/Node.java +++ b/src/main/java/HoeffdingTree/Node.java @@ -33,7 +33,6 @@ public class Node implements Serializable { public Integer nmin; // Keep tracking the number of samples seen public Integer nmin_last_check; // Number of instances from last check public ArrayList>> statistics = new ArrayList<>(); // number of values for each column - /* * || feature 1 || ... || feature N || <- ArrayList < * || ------------------------------------------------------------- || @@ -67,7 +66,6 @@ public Integer getNmin() { return this.nmin; } - public int countNode(Node root) { //base case @@ -310,7 +308,6 @@ public double[] AttemptSplit(Node node, String[] sample, int weight, double[] st // true; - for testing boolean svfdt_ii_constraints = (entropy > 0) || (ig > 0); if (svfdt_ii_constraints) { - double[] values = {G[0][0], G[1][0], G[2][0],G[3][0],G[4][0]}; SplitFunction(node, values); diff --git a/src/main/java/Utilities/DefaultValues.java b/src/main/java/Utilities/DefaultValues.java index 9c97e29..4cf725c 100644 --- a/src/main/java/Utilities/DefaultValues.java +++ b/src/main/java/Utilities/DefaultValues.java @@ -26,8 +26,8 @@ public class DefaultValues { int number_of_hoeffding_trees = 1; int combination_function = 3; double weighted_voting_parameter = 1; - int age_of_maturity = 10000; - int drift_detection_method_id = 2; + int age_of_maturity = 1000; + int drift_detection_method_id = 1;