Python UDFs raise a warning: CachedKafkaConsumer is not running in UninterruptibleThread (KAFKA-1894)

In the usual structured_kafka_wordcount.py example, when I split the lines into words with a UDF, as shown below,

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType

my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))

words = lines.select(
    explode(
        my_split(lines.value)
    )
)

the following warning keeps appearing:

WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894

On the other hand, when I split the lines into words with pyspark.sql.functions.split, everything works fine.

words = lines.select(
    explode(
        split(lines.value, ' ') 
    ) 
)

Why does this happen, and how can I get rid of the warning?

This is the code I'm trying to execute in practice:

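import re

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType

# Matches syslog-style lines such as "... message repeated 3 times: [ ...]"
pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)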


def _unfold(x):
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        for _ in range(int(times)):
            ret.append(log)
    else:
        ret.append(x)

    return ret

_udf = udf(_unfold, ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))
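
To make clear what the UDF is meant to produce, here is a minimal plain-Python sketch of the same logic, run without Spark. The names PATTERN and unfold and the sample inputs are illustrative only.

```python
import re

# Same pattern as above; a raw string avoids the doubled backslashes.
PATTERN = re.compile(r"(.+) message repeated (\d) times: \[ (.+)\]")


def unfold(line):
    """Expand a 'message repeated N times' line into N copies of the message."""
    match = PATTERN.match(line)
    if not match:
        # Not a "repeated" line: pass it through unchanged.
        return [line]
    log = " ".join((match.group(1), match.group(3)))
    return [log] * int(match.group(2))


# Illustrative inputs, not real log data.
samples = ["asd message repeated 3 times: [ 12]", "some other message"]
results = [unfold(s) for s in samples]
print(results)
```

Note that (\d) matches exactly one digit, so a line reporting 10 or more repeats does not match the pattern and is passed through unchanged.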

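Other than avoiding Python UDFs*, there is nothing you can do about this in your own code. As the warning itself says, UninterruptibleThread is a workaround for a Kafka bug (KAFKA-1894): it is meant to keep the job from hanging when a KafkaConsumer call is interrupted.

It is not used with PythonUDFRunner, which is why the warning shows up only once a Python UDF is involved.

Personally, I would not worry about it unless you run into related problems. Your Python code never interacts with KafkaConsumer directly. If you do hit problems, they should be fixed upstream; in that case I suggest opening a JIRA ticket.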


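* Your unfold function can be rewritten with SQL functions, although it is something of a hack. First, add the message count as an integer column: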

from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when

p = "(.+) message repeated (\\d) times: \\[ (.+)\\]"

lines = spark.createDataFrame(
    ["asd message repeated 3 times: [ 12]", "some other message"], "string"
)

lines_with_count = lines.withColumn(
    "message_count",
    coalesce(regexp_extract("value", p, 2).cast("int"), lit(1))
)

Use it to explode the rows:

exploded = lines_with_count.withColumn(
    "i",
    expr("explode(split(repeat('1', message_count - 1), ''))")
).drop("message_count", "i")

Then extract the message:

exploded.withColumn(
    "value",
    when(
        col("value").rlike(p),
        concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
    ).otherwise(col("value"))
).show(4, False)


# +------------------+
# |value             |
# +------------------+
# |asd 12            |
# |asd 12            |
# |asd 12            |
# |some other message|
# +------------------+
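
As a rough sanity check, the three steps (count, explode, extract) can be modelled in plain Python without a Spark session. This is only a sketch: it assumes that re.search approximates the find-style matching of regexp_extract and rlike, and the helper names message_count and extract are hypothetical, not part of any Spark API.

```python
import re

P = r"(.+) message repeated (\d) times: \[ (.+)\]"

lines = ["asd message repeated 3 times: [ 12]", "some other message"]


def message_count(value):
    # Step 1: the repeat count if the pattern is found, otherwise 1
    # (models coalesce(regexp_extract(...).cast("int"), lit(1))).
    found = re.search(P, value)
    return int(found.group(2)) if found else 1


def extract(value):
    # Step 3: join groups 1 and 3 for matching lines, keep others as-is
    # (models when(rlike, concat_ws(...)).otherwise(value)).
    found = re.search(P, value)
    return " ".join((found.group(1), found.group(3))) if found else value


# Step 2: emit each row message_count times (models the explode).
exploded = [v for v in lines for _ in range(message_count(v))]
result = [extract(v) for v in exploded]
print(result)
```

The resulting list has the same four values as the table above.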