Some fun with Apache Wayang and Spark / TensorFlow

Apache Wayang is an open-source cross-platform data processing framework from the Apache Software Foundation that can serve as a basis for Federated Learning (FL). It provides a platform for distributed machine learning, with a focus on ease of use and flexibility. It supports multiple FL scenarios and offers tools and components for building FL systems, along with support for various communication protocols and data formats and integration with other Apache projects such as Apache Kafka and Apache Pulsar for data streaming. The project aims to make it easier to develop and deploy machine learning models in decentralized environments.

Note that these are only examples and may not reflect how your project should interact with Apache Wayang; check the Apache Wayang documentation (https://wayang.apache.org) for the actual API. My point is simply how easy it is to use different languages to connect Wayang and Spark.

Also make sure that you have the correct permissions and credentials to interact with the Wayang API and to make changes to the Spark cluster.

Wayang - Scala - Spark

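// Illustrative only: WayangClient and its methods are placeholders;
// check the Wayang documentation for the actual API.
import org.apache.wayang.WayangClient

class SparkScaler(wayangUrl: String) {
    private val wayang = new WayangClient(wayangUrl)

    // Add numWorkers workers to the Spark cluster.
    def scaleUp(numWorkers: Int): Unit = {
        wayang.addWorkers(numWorkers)
    }

    // Remove numWorkers workers from the Spark cluster.
    def scaleDown(numWorkers: Int): Unit = {
        wayang.removeWorkers(numWorkers)
    }
}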

The SparkScaler class takes a single constructor parameter, the URL of the Wayang API endpoint. Call scaleUp() to add a given number of workers to the Spark cluster, and scaleDown() to remove a given number of workers.

Wayang - Python - Spark

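# Illustrative only: the apache_wayang module and the Wayang class are
# placeholders; check the Wayang documentation for the actual API.
from apache_wayang import Wayang

class SparkScaler:
    def __init__(self, wayang_url):
        self.wayang = Wayang(wayang_url)

    def scale_up(self, num_workers):
        """Add num_workers workers to the Spark cluster."""
        self.wayang.add_workers(num_workers)

    def scale_down(self, num_workers):
        """Remove num_workers workers from the Spark cluster."""
        self.wayang.remove_workers(num_workers)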

The SparkScaler class takes a single constructor parameter, the URL of the Wayang API endpoint. Call scale_up() to add a given number of workers to the Spark cluster, and scale_down() to remove a given number of workers.
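To try the scaling logic without a running cluster, here is a minimal sketch that swaps the client for an in-memory stand-in. FakeWayang, its worker counter, and the localhost URL are assumptions made up for this illustration and are not part of Apache Wayang. Unlike the class described above, this SparkScaler takes a ready-made client object instead of a URL, so the stand-in can be injected.

```python
class FakeWayang:
    """Hypothetical in-memory stand-in for the client; not a real Wayang API."""

    def __init__(self, url, workers=2):
        self.url = url
        self.workers = workers  # pretend the cluster starts with two workers

    def add_workers(self, n):
        if n < 0:
            raise ValueError("n must be non-negative")
        self.workers += n

    def remove_workers(self, n):
        if n < 0:
            raise ValueError("n must be non-negative")
        # Never drop below zero workers.
        self.workers = max(0, self.workers - n)


class SparkScaler:
    """Same shape as the class above, but the client is passed in."""

    def __init__(self, client):
        self.wayang = client

    def scale_up(self, num_workers):
        self.wayang.add_workers(num_workers)

    def scale_down(self, num_workers):
        self.wayang.remove_workers(num_workers)


scaler = SparkScaler(FakeWayang("http://localhost:8080"))
scaler.scale_up(3)    # 2 + 3 = 5 workers
scaler.scale_down(1)  # 5 - 1 = 4 workers
print(scaler.wayang.workers)  # 4
```

Passing the client in keeps the scaling calls testable without network access; a real client would replace FakeWayang once the actual API is known.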

Wayang - Java Streams - Spark

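// Illustrative only: WayangClient and its methods are placeholders;
// check the Wayang documentation for the actual API.
import org.apache.wayang.WayangClient;
import java.util.stream.IntStream;

public class SparkScaler {
    private final WayangClient wayang;

    public SparkScaler(String wayangUrl) {
        wayang = new WayangClient(wayangUrl);
    }

    // Add workers one at a time, numWorkers times.
    public void scaleUp(int numWorkers) {
        IntStream.range(0, numWorkers).forEach(i -> wayang.addWorker());
    }

    // Remove workers one at a time, numWorkers times.
    public void scaleDown(int numWorkers) {
        IntStream.range(0, numWorkers).forEach(i -> wayang.removeWorker());
    }
}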

The SparkScaler class takes a single constructor parameter, the URL of the Wayang API endpoint. Unlike the Scala and Python versions, scaleUp() and scaleDown() use an IntStream to call addWorker() or removeWorker() once per worker, so the requested number of workers is added to or removed from the Spark cluster one at a time.

Iterate the K-Means clustering algorithm from Apache Wayang to TensorFlow

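// Illustrative only: WayangClient and runKMeans are placeholders;
// check the Wayang documentation for the actual API.
// The TensorFlow calls assume the TensorFlow 1.x Java API.
import org.apache.wayang.WayangClient;
import org.tensorflow.Graph;
import org.tensorflow.Session;
import org.tensorflow.Tensor;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class KMeansIteration {
    private final WayangClient wayang;
    private final Graph graph;
    private final Session session;

    public KMeansIteration(String wayangUrl, String modelPath) throws IOException {
        wayang = new WayangClient(wayangUrl);
        graph = new Graph();
        // importGraphDef expects the serialized GraphDef bytes, not a file path.
        graph.importGraphDef(Files.readAllBytes(Paths.get(modelPath)));
        session = new Session(graph);
    }

    public void iterate(Tensor<?> input) {
        // Run K-Means in Wayang, then feed its result into the TensorFlow graph.
        Tensor<?> wayangOutput = wayang.runKMeans(input);
        Tensor<?> tfOutput = session.runner().feed("input", wayangOutput).fetch("output").run().get(0);
        // Perform further processing on tfOutput
    }
}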

These are only examples to show how easy it can be to get started with FL and to get involved in Wayang as a developer. Please also consider contributing to the project; see wayang.apache.org.

The KMeansIteration class takes two constructor parameters: the URL of the Wayang API endpoint and the path of the TensorFlow model. The iterate() method takes an input Tensor, passes it to Wayang's K-Means clustering algorithm, and then feeds the result to the TensorFlow model as input.
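For readers who want to see what a single K-Means iteration computes, here is a minimal plain-Python sketch of the standard Lloyd step (assign each point to its nearest centroid, then recompute the centroids). It is not Wayang's or TensorFlow's implementation; the function names kmeans_step and kmeans and the sample points are assumptions chosen for this illustration.

```python
import math


def kmeans_step(points, centroids):
    """One Lloyd iteration: assign points to the nearest centroid, then re-average."""
    clusters = [[] for _ in centroids]
    for p in points:
        nearest = min(range(len(centroids)),
                      key=lambda i: math.dist(p, centroids[i]))
        clusters[nearest].append(p)

    new_centroids = []
    for old, members in zip(centroids, clusters):
        if members:
            # Coordinate-wise mean of all points in the cluster.
            new_centroids.append(
                tuple(sum(coord) / len(members) for coord in zip(*members)))
        else:
            # An empty cluster keeps its previous centroid.
            new_centroids.append(old)
    return new_centroids


def kmeans(points, centroids, max_iter=100):
    """Repeat the step until the centroids stop changing or max_iter is reached."""
    for _ in range(max_iter):
        updated = kmeans_step(points, centroids)
        if updated == centroids:
            break
        centroids = updated
    return centroids


points = [(1.0, 1.0), (1.0, 2.0), (9.0, 9.0), (10.0, 9.0)]
result = kmeans(points, [(0.0, 0.0), (10.0, 10.0)])
print(result)  # [(1.0, 1.5), (9.5, 9.0)]
```

In the setup described above, a loop like this would run inside Wayang, and only the resulting centroids or assignments would be handed to the TensorFlow model.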

Comments

  1. I would also like to point to https://wayang.apache.org as an open-source alternative. Another good option, on the commercial side, is Blossom Sky (https://www.databloom.ai/blossom-sky).
