Issue
When using the Snowflake Kafka connector with Redpanda, an offset mismatch can occur if a topic is dropped and recreated without also dropping the corresponding Snowflake table.
This results in warning messages such as:
Ignoring invalid task provided offset integrations__support_to_sf_v0-0/OffsetAndMetadata{offset=2881, leaderEpoch=null, metadata=;} -- not yet consumed, taskOffset=2881 currentOffset=1592
and/or
Channel TEST.PUBLIC.SUPPORT_RP_TO_SF.SUPPORT_RP_TO_SF_7 - skipping current record - expected offset 23435 but received 9803. The current offset stored in Snowflake: 23435"}
Cause
The Snowflake connector (SNOWPIPE_STREAMING ingestion method) maintains offset information in Snowflake channels.
There is one Snowflake channel per topic partition. For example, a topic called "support_rp_to_sf" with 10 partitions has 10 channels.
If a Redpanda topic is recreated, the topic and consumer offsets are reset, but the Snowflake channel still contains the old offset information. This causes a mismatch warning when the connector tries to load new records.
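The mismatch can be illustrated with a small Python sketch. This is a simplified model of the behaviour described above, not the connector's actual code: it assumes that any record whose offset is not past the offset stored in the channel is treated as already ingested. The function name is hypothetical, and the stored offset is taken from the second warning message above.

```python
def is_skipped(stored_offset: int, record_offset: int) -> bool:
    """Simplified assumption: offsets at or below the stored one look already ingested."""
    return record_offset <= stored_offset


if __name__ == "__main__":
    stored = 23435  # old offset still held by the Snowflake channel
    # After the topic is recreated, offsets restart from 0.
    for offset in (0, 9803, 23435, 23436):
        state = "skipped" if is_skipped(stored, offset) else "ingested"
        print(f"record offset {offset}: {state}")
```

Under this model, records from the recreated topic are skipped until the new topic's offsets pass the old stored value, which is why the channel offset has to be reset or the channel dropped.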
Solution
There are several ways to work around this. The main thing to consider is whether you want to keep the data that was already loaded into the Snowflake table.
Option 1: Reset the Snowflake channel offset to -1 (data is kept in the Snowflake table, but duplicates are possible)
Snowflake covers this in the following FAQ:
https://community.snowflake.com/s/article/FAQ-Snowflake-Kafka-connector-Snowpipe-Streaming
It states:
>>>I need to reset ingestion and use Kafka as the source of truth for the starting point, can I do this as a one-off?
Yes, stop the connector and set the offset token to '-1' using the SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN function. When starting the connector, Kafka's consumer offset will be used as the source of truth to start ingestion from. Please note that after doing this, if any previously ingested offsets are sent to Snowflake, they will be processed, introducing the risk of duplicates.
This is somewhat involved because the function has to be run once for each channel. For example, a topic with 100 partitions has 100 channels (a function or script with a loop can help here).
For example, to reset a topic with 10 partitions:
SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('TEST.PUBLIC.SUPPORT_RP_TO_SF', 'SUPPORT_RP_TO_SF_0', '-1');
SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('TEST.PUBLIC.SUPPORT_RP_TO_SF', 'SUPPORT_RP_TO_SF_1', '-1');
SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('TEST.PUBLIC.SUPPORT_RP_TO_SF', 'SUPPORT_RP_TO_SF_2', '-1');
...
SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('TEST.PUBLIC.SUPPORT_RP_TO_SF', 'SUPPORT_RP_TO_SF_9', '-1');
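Because one statement is needed per channel, the statements can be generated instead of typed by hand. The following is a minimal Python sketch, assuming channels are named <TOPIC>_<partition> with partitions numbered from 0, as in the example above. The function name is hypothetical, and each call is prefixed with SELECT, which is how Snowflake system functions are normally invoked.

```python
def reset_statements(table_fqn: str, channel_prefix: str, partitions: int) -> list[str]:
    """Build one offset-reset statement per channel (partitions 0..N-1).

    Assumption: channel names follow the <prefix>_<partition> pattern.
    """
    template = (
        "SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN("
        "'{table}', '{channel}', '-1');"
    )
    return [
        template.format(table=table_fqn, channel=f"{channel_prefix}_{p}")
        for p in range(partitions)
    ]


if __name__ == "__main__":
    # Values taken from the 10-partition example in this article.
    for stmt in reset_statements("TEST.PUBLIC.SUPPORT_RP_TO_SF", "SUPPORT_RP_TO_SF", 10):
        print(stmt)
```

The script only prints the statements; review the output and compare the channel names against "SHOW CHANNELS" before running them in Snowflake.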
You also need to fully stop the Redpanda-to-Snowflake connector before running the channel resets. Fully stopping a connector means deleting it; pausing is not enough.
Steps:
- Copy the contents of the existing Redpanda-Snowflake connector configuration from the JSON form page.
- Delete the existing Redpanda-Snowflake connector.
- Apply the channel reset (SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN) to every channel, so that the offsets show as -1 in the output of the Snowflake "SHOW CHANNELS" command.
- Recreate and start the Redpanda-Snowflake connector using the configuration saved earlier.
- Run the Snowflake "SHOW CHANNELS" command periodically after the restart. The offsets change from -1 to the relevant values as the connector progresses.
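Before recreating the connector, it can help to confirm that every channel was actually reset. The following is a minimal Python sketch, assuming the "SHOW CHANNELS" result has already been fetched into a list of dicts with "name" and "offset_token" keys. The function name and the sample rows are hypothetical.

```python
def channels_not_reset(rows: list[dict]) -> list[str]:
    """Return the names of channels whose offset token is not '-1'."""
    return [row["name"] for row in rows if str(row["offset_token"]) != "-1"]


if __name__ == "__main__":
    # Hypothetical rows standing in for "SHOW CHANNELS" output.
    sample = [
        {"name": "SUPPORT_RP_TO_SF_0", "offset_token": "-1"},
        {"name": "SUPPORT_RP_TO_SF_1", "offset_token": "23435"},
        {"name": "SUPPORT_RP_TO_SF_2", "offset_token": "-1"},
    ]
    print(channels_not_reset(sample))  # prints ['SUPPORT_RP_TO_SF_1']
```

An empty list means all channels show -1 and the connector can be recreated.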
Option 2: Drop the table in Snowflake and start again
- Drop the corresponding Snowflake table, which also drops the channels containing the old offset metadata.
- Restart the connector to recreate the table and channels and start with fresh offset tracking.
Additional Notes
- For more information on Snowpipe streaming and offset tokens, refer to the Snowflake documentation.