UDF Python *, . UninterruptibleThread - Kafka (KAFKA-1894) , KafkaConsumer.
PythonUDFRunner (, , , ).
, . Python KafkaConsumer. - , - JIRA ticket.
* The unfold can be expressed with built-in SQL functions. For example:
from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when
# Matches "<prefix> message repeated <N> times: [ <payload>]".
# Group 1 = prefix, group 2 = repeat count, group 3 = payload.
# `\d+` rather than a single `\d`, so counts of 10 or more are matched too.
p = r"(.+) message repeated (\d+) times: \[ (.+)\]"

# Two sample rows: one folded message and one plain message. The rows are
# read back below through the column named "value".
lines = spark.createDataFrame(
    ["asd message repeated 3 times: [ 12]", "some other message"], "string"
)

# Repeat count per row. For rows that do not match the pattern the extracted
# count is not a number; the int cast is expected to produce NULL for it
# (non-ANSI mode -- confirm for your Spark configuration), and coalesce then
# falls back to 1.
lines_with_count = lines.withColumn(
    "message_count",
    coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)),
)
Then explode each row into `message_count` copies:
# Emit one output row per repetition. sequence(1, n) always has exactly n
# elements for n >= 1, whereas the number of elements produced by
# split(repeat('1', n - 1), '') depends on whether the Spark version keeps
# the trailing empty string of an empty-regex split.
# greatest(..., 1) keeps exactly one row when the count is below 1.
# NOTE(review): `sequence` needs Spark 2.4 or newer -- confirm the target version.
exploded = lines_with_count.withColumn(
    "i",
    expr("explode(sequence(1, greatest(message_count, 1)))"),
).drop("message_count", "i")  # both helper columns are dropped after the explode
Finally, strip the "message repeated N times" wrapper from the matched rows:
# Rows that match the pattern are rewritten as "<prefix> <payload>";
# every other row keeps its original text.
is_folded = col("value").rlike(p)
prefix_part = regexp_extract("value", p, 1)
payload_part = regexp_extract("value", p, 3)
unfolded_value = when(
    is_folded, concat_ws(" ", prefix_part, payload_part)
).otherwise(col("value"))

# Print up to 4 rows without truncating long cell values.
exploded.withColumn("value", unfolded_value).show(4, False)