PHPFixing

Tuesday, September 20, 2022

[FIXED] How to select starting offset in Pykafka simpleconsumer?

Tags: apache-kafka, consumer, pykafka, python

Issue

In my Kafka cluster I have a single-partition topic with a simple consumer that processes all incoming messages. If an error is found in the processed data, I want to reprocess every message from a certain offset (not from the beginning), in the same order, to fix the inconsistency while keeping Kafka's original message sequence.

Is there a way to do this with Pykafka? I can't figure it out.


Solution

You need to call reset_offsets(). For example:

consumer = topic.get_simple_consumer(consumer_group="example")
# .values() works on both Python 2 and 3 (.itervalues() is Python 2 only)
partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in consumer.partitions.values()]
# because we passed in a consumer_group the new offsets will be saved in Kafka
consumer.reset_offsets(partition_offsets=partition_offset_pairs)

(where get_offset_for_partition() is a function you define). Or for a single-partition topic:

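# reset the consumer's position to offset 123456
# (pykafka documents this value as the "most recently consumed" offset,
# so reading should resume with the message just after it)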
consumer = topic.get_simple_consumer()
partition = topic.partitions[0]
consumer.reset_offsets([(partition, 123456)])
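As a rough sketch of what the user-defined part could look like, the helper below builds the (partition, offset) pairs from a table of "first offset to reprocess" values. The name build_reset_pairs() and its arguments are hypothetical, not part of pykafka. It assumes, based on the reset_offsets() documentation, that the value passed is the last consumed offset, so it subtracts one; verify that against your pykafka version before relying on it.

```python
def build_reset_pairs(partitions, first_offsets):
    """Build (partition, offset) pairs for consumer.reset_offsets().

    Hypothetical helper, not part of pykafka.

    partitions    -- mapping of partition id -> partition object,
                     e.g. consumer.partitions
    first_offsets -- mapping of partition id -> first offset to reprocess
    """
    pairs = []
    for partition_id, partition in partitions.items():
        if partition_id not in first_offsets:
            continue  # leave this partition's position untouched
        first = first_offsets[partition_id]
        if first < 1:
            # Assumption: negative values are reserved by pykafka for its
            # OffsetType sentinels, so this sketch refuses to produce them.
            raise ValueError("first offset to reprocess must be >= 1")
        # Assumption: reset_offsets() expects the last *consumed* offset,
        # so subtract one to make `first` the next message delivered.
        pairs.append((partition, first - 1))
    return pairs
```

With the consumer from the examples above, the call might then look like consumer.reset_offsets(build_reset_pairs(consumer.partitions, {0: 123456})).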

The same reset_offsets() method is also available on the BalancedConsumer and ManagedBalancedConsumer classes.

Note that, by design, Kafka only guarantees message order within each topic partition independently, not across partitions.



Answered By - rcoup
Answer Checked By - Clifford M. (PHPFixing Volunteer)