FlumeNG - the evolution

Flume, the decentralized log collector, is making great progress. Since the project entered the Apache Incubator, development of the next generation (NG) has reached a significant level.

So what is the advantage of the new Flume? Simply the architecture. FlumeNG no longer needs ZooKeeper and has no master/client concept and no nodes/collectors. You simply run a bunch of agents, connect them to each other and create your own flow. Flume can now run with a 20 MB heap, uses an in-memory channel for flows, supports multiple flows and multiple sinks on one channel, and will support a few more sinks than Flume <= 1.0.0. However, FlumeNG no longer supports tail and tailDir; instead, a general exec source is available, which gives the user the freedom to run any command.

Requirements
On the build host we need JDK 1.6.x, Maven 3.x, and git or svn.

Installation
To check out and build the code we use git and Maven in a simple one-line command:
git clone git://git.apache.org/flume.git && cd flume && git checkout trunk && mvn clean && mvn package -DskipTests

After a few seconds the build should be done:

[INFO] Apache Flume ...................................... SUCCESS [7.276s]
[INFO] Flume NG Core ..................................... SUCCESS [3.043s]
[INFO] Flume NG Sinks .................................... SUCCESS [0.275s]
[INFO] Flume NG HDFS Sink ................................ SUCCESS [0.892s]
[INFO] Flume NG IRC Sink ................................. SUCCESS [0.515s]
[INFO] Flume NG Channels ................................. SUCCESS [0.214s]
[INFO] Flume NG JDBC channel ............................. SUCCESS [0.802s]
[INFO] Flume NG Agent .................................... SUCCESS [0.893s]
[INFO] Flume NG file-based channel ....................... SUCCESS [0.516s]
[INFO] Flume NG distribution ............................. SUCCESS [16.602s]
[INFO] Flume legacy Sources .............................. SUCCESS [0.143s]
[INFO] Flume legacy Thrift Source ........................ SUCCESS [0.599s]
[INFO] Flume legacy Avro source .......................... SUCCESS [0.458s]
[INFO] Flume NG Clients .................................. SUCCESS [0.133s]
[INFO] Flume NG Log4j Appender ........................... SUCCESS [0.385s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS

Now we find the binary and source distributions in flume-ng-dist/target. Copy the distribution you want to the host you want to play with, unpack it and start using Flume.

What is a flow?
A flow in FlumeNG describes the whole transport from a source to a sink. A sink can also feed a new source, so that different streams can be collected into one sink. The process Flume starts is called an agent. A setup could look like this example:

source -             -> source => channel => sink
        \           /       
source - => channel => sink 
        /           \
source -             -> channel => source => channel => sink                  
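
To make the source => channel => sink idea concrete, here is a minimal toy model in Python. It is not Flume code and uses none of Flume's APIs; the function name run_flow and the queue size are made up for illustration. It only shows how a channel decouples the producer (source) from the consumer (sink).

```python
import queue
import threading


def run_flow(events):
    """Toy model of one flow: a source puts events on a channel, a sink drains it."""
    channel = queue.Queue(maxsize=100)  # stands in for an in-memory channel
    delivered = []                      # stands in for the sink's destination
    done = object()                     # sentinel that tells the sink to stop

    def source():
        for event in events:
            channel.put(event)          # blocks while the channel is full
        channel.put(done)

    def sink():
        while True:
            event = channel.get()       # blocks while the channel is empty
            if event is done:
                break
            delivered.append(event)

    threads = [threading.Thread(target=source), threading.Thread(target=sink)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return delivered
```

Calling run_flow(["a", "b", "c"]) hands the three events from the source thread to the sink thread through the queue, in order. Source and sink never talk to each other directly, which is the property that lets the real channels buffer events between them.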

Configuration
Before we can start to play with FlumeNG, we need to configure it. The config is logical but difficult to understand at first. The pattern is always <identifier>.type.subtype.parameter.config, where <identifier> is the name of the agent we later pass at startup. As the picture above shows, a flow can be as complex as we want. The config has to reflect that complexity, so we have to define the source, channel and sink for every entry and end point.
Let me explain an example (syslog-agent.cnf):

syslog-agent.sources = Syslog
syslog-agent.channels = MemoryChannel-1
syslog-agent.sinks = Console

syslog-agent.sources.Syslog.type = syslogTcp
syslog-agent.sources.Syslog.port = 5140

syslog-agent.sources.Syslog.channels = MemoryChannel-1
syslog-agent.sinks.Console.channel = MemoryChannel-1

syslog-agent.sinks.Console.type = logger
syslog-agent.channels.MemoryChannel-1.type = memory

The configuration example above defines a simple syslog flow with the start point Syslog, the end point Console and the transport channel MemoryChannel-1. We can name each segment as we wish; these names are the main identifiers used to set up a valid flow.
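
Because the flow only works when every source and sink points at a declared channel, a small check script can catch typos before the agent is started. The following is a rough sketch, not part of Flume: the helper names parse_properties and check_wiring are hypothetical, and it assumes that component lists are separated by whitespace and that lines starting with # are comments.

```python
SAMPLE = """\
syslog-agent.sources = Syslog
syslog-agent.channels = MemoryChannel-1
syslog-agent.sinks = Console
syslog-agent.sources.Syslog.type = syslogTcp
syslog-agent.sources.Syslog.port = 5140
syslog-agent.sources.Syslog.channels = MemoryChannel-1
syslog-agent.sinks.Console.channel = MemoryChannel-1
syslog-agent.sinks.Console.type = logger
syslog-agent.channels.MemoryChannel-1.type = memory
"""


def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems in the source/channel/sink wiring of one agent."""
    declared = set(props.get(agent + ".channels", "").split())
    problems = []
    for name in props.get(agent + ".sources", "").split():
        used = props.get("%s.sources.%s.channels" % (agent, name), "").split()
        if not used:
            problems.append("source %s has no channel" % name)
        for channel in used:
            if channel not in declared:
                problems.append("source %s uses undeclared channel %s" % (name, channel))
    for name in props.get(agent + ".sinks", "").split():
        channel = props.get("%s.sinks.%s.channel" % (agent, name))
        if channel is None:
            problems.append("sink %s has no channel" % name)
        elif channel not in declared:
            problems.append("sink %s uses undeclared channel %s" % (name, channel))
    return problems
```

For the sample config, check_wiring(parse_properties(SAMPLE), "syslog-agent") returns an empty list. If a sink's channel name is misspelled, the list contains one entry naming that sink and the unknown channel.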

The example flow listens on the configured port and sends all events to the logger (logger is an internal sink for debugging that sends captured events to stdout). The same config with HDFS as a sink would look like this:

syslog-agent.sources = Syslog
syslog-agent.channels = MemoryChannel-1
syslog-agent.sinks = HDFS-LAB

syslog-agent.sources.Syslog.type = syslogTcp
syslog-agent.sources.Syslog.port = 5140

syslog-agent.sources.Syslog.channels = MemoryChannel-1
syslog-agent.sinks.HDFS-LAB.channel = MemoryChannel-1

syslog-agent.sinks.HDFS-LAB.type = hdfs

syslog-agent.sinks.HDFS-LAB.hdfs.path = hdfs://NN.URI:PORT/flumetest/%{host}
syslog-agent.sinks.HDFS-LAB.hdfs.filePrefix = syslogfiles
syslog-agent.sinks.HDFS-LAB.hdfs.rollInterval = 60
syslog-agent.sinks.HDFS-LAB.hdfs.fileType = SequenceFile
syslog-agent.channels.MemoryChannel-1.type = memory
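
The %{host} placeholder in hdfs.path is filled from the headers of each event. The following toy function only illustrates that substitution; it is not Flume's implementation, and the name expand_headers is made up. It assumes that a header which is not set on the event expands to an empty string, which matches what a reader reports in the comments for an exec source.

```python
import re


def expand_headers(template, headers):
    """Replace each %{name} in template with headers[name]; missing headers become ''."""
    return re.sub(r"%\{(\w+)\}", lambda m: headers.get(m.group(1), ""), template)
```

With headers {"host": "web01"}, the template /flumetest/%{host} becomes /flumetest/web01. Without a host header it becomes /flumetest/, and any quotes written around the placeholder stay in the path as literal characters.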

At the moment Flume supports avro, syslog and exec sources, and hdfs and logger sinks.

Start the flow
flume-ng starts a single flow per process. This is done with:
bin/flume-ng agent -n YOUR_IDENTIFIER -f YOUR_CONFIGFILE
e.g.:
bin/flume-ng agent -n syslog-agent -f conf/syslog-agent.cnf
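
Once the agent is running, the flow can be tested by sending a syslog line to the configured port. The following is a minimal sketch under a few assumptions: the agent listens on localhost:5140 as in the example config, the syslogTcp source accepts newline-terminated BSD-syslog style lines, and the hostname and tag are arbitrary test values. The function names are made up for this post.

```python
import socket
import time


def format_syslog_line(message, hostname="testhost", tag="flume-test", pri=13):
    """Build one BSD-syslog style line; pri 13 = facility user (1) * 8 + severity notice (5)."""
    now = time.localtime()
    # The day of month is space-padded in the classic syslog timestamp.
    timestamp = "%s %2d %s" % (
        time.strftime("%b", now),
        now.tm_mday,
        time.strftime("%H:%M:%S", now),
    )
    return "<%d>%s %s %s: %s\n" % (pri, timestamp, hostname, tag, message)


def send_syslog(message, host="localhost", port=5140):
    """Send one newline-terminated syslog line over TCP and return what was sent."""
    line = format_syslog_line(message)
    with socket.create_connection((host, port), timeout=5) as conn:
        conn.sendall(line.encode("utf-8"))
    return line
```

Calling send_syslog("hello flume") while the syslog-agent from the first example is running should make the event show up in the output of the logger sink.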

Links: flumeNG Wiki


Comments

  1. Thank you so much Alo for your wonderful writeup..It was of great help..After reading it I tried Hdfs sink but I am getting some error like -

    12/06/10 06:36:16 ERROR hdfs.HDFSEventSink: process failed
    java.lang.NoSuchMethodError: org.apache.hadoop.io.SequenceFile$Writer.syncFs()V
    at org.apache.flume.sink.hdfs.HDFSSequenceFile.sync(HDFSSequenceFile.java:77)
    at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:276)
    at org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:46)
    at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:265)

    Could you please help me out with this..Many thanks.

    1. Anonymous, 13 June 2012

      Hi,

      It looks like you have mixed different versions of Hadoop, could that be? Please check for an existing hadoop.jar in Flume's lib directory and delete it.

      cheers,
      Alex

    2. Thank you for the valuable response alo..Sorry, I was a bit out of touch for sometime because of some other commitments..I got working..But I am not able to use any of the escape sequences..I have a small config file that looks like this -
      agent1.sources = tail
      agent1.channels = MemoryChannel-2
      agent1.sinks = HDFS

      agent1.sources.tail.type = exec
      agent1.sources.tail.command = tail -F /var/log/apache2/access.log.1
      agent1.sources.tail.channels = MemoryChannel-2

      agent1.sinks.HDFS.channel = MemoryChannel-2
      agent1.sinks.HDFS.type = hdfs
      agent1.sinks.HDFS.hdfs.path = hdfs://localhost:9000/flume/'%{host}'
      agent1.sinks.HDFS.hdfs.file.Type = DataStream

      agent1.channels.MemoryChannel-2.type = memory

      But instead of creating a directory with the hostname, it is creating a directory with name - ''

      Am I missing something here??Many thanks

    3. Anonymous, 21 June 2012

      Hi Mohammad,

      please use:
      agent.sinks.hdfsSink.hdfs.filePrefix = %{host}

      instead of adding the escape sequence to the HDFS path (I know, in Flume 0.9.x that was the correct way, but now we've moved it out).

      cheers,
      Alex

  2. Please let me know how to add custom serialization for supporting custom appender (like in log4j).

    Please suggest me .
