问答

pyspark下foreachPartition()向hbase中写数据,数据没有完全写入

作者:admin 2021-04-21 我要评论

1.问题描述 在使用pyspark过程中,遇到了一个向hbase中写数据的问题,在foreachPartition()方法中使用happybase对每个partition中的数据进行写入hbase的时候会出...

在说正事之前,我要推荐一个福利:你还在原价购买阿里云、腾讯云、华为云服务器吗?那太亏啦!来这里,新购、升级、续费都打折,能够为您省60%的钱呢!2核4G企业级云服务器低至69元/年,点击进去看看吧>>>)
1.问题描述

在使用pyspark过程中,遇到了一个向hbase中写数据的问题,在foreachPartition()方法中使用happybase对每个partition中的数据进行写入hbase的时候会出现数据丢失的问题,在hbase中并未完全的写入所有的数据,只写入了一小部分。

2.具体的业务代码如下:

articleVector是文章的向量,similar是文章之间的相似度

article_vector表结构如下:

create temporary table article.article_vector
(
    id            string comment 'id',
    major_id      int comment 'major_id',
    vector array<string> comment 'keyword vector'
);

计算相似的代码:

from pyspark.ml.feature import Word2Vec
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Word2VecModel
from pyspark.ml.feature import BucketedRandomProjectionLSH

articleVector = spark.sql("select * from article_vector")
def toVector(row):
    return row.id, Vectors.dense(row.vector)
    
train = articleVector.rdd.map(toVector).toDF(["id", "vector"])
brp = BucketedRandomProjectionLSH(inputCol='vector', outputCol='hashes', seed=12345, bucketLength=1.0)
model = brp.fit(train)
similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')

存储进hbase中

import happybase

def save_hbase(partitions):
    pool = happybase.ConnectionPool(size=10, host='hbase-url')
    
    with pool.connection() as conn:
        article_similar = conn.table('article_similar')
        for row in partitions:
            article_similar.put(str(row.datasetA.id).encode(),
                                {'similar:{}'.format(row.datasetB.id).encode(): b'%0.4f' % (row.EuclideanDistance)})
        conn.close()
        
similar.foreachPartition(save_hbase)
3.具体问题

article_vector中的数据量为120w条数据,取出来计算完相似度之后得到similar。但是到了save_hbase()这一步就出现问题了,程序跑的过程中并无报错,spark日志中也没有发现异常,但是最终的hbase中article_similar表中却只有6万条记录数。按理说hbase中存储的记录数应该和article_vector中的数据量一致,可以在hbase中根据每一个id查到这个id的对应的相似数据。实际上只存了6万条左右的id,只能查到六万个id对应的相似信息,为什么会这样,是happybase的存储过程中出了什么问题吗?

###

与happybase无关,LSH的桶长度设置过小,增大BucketedRandomProjectionLSH中的bucketLength,再增大approxSimilarityJoin中的欧氏距离的阈值。详细的可以查看pyspark.ml.feature中的BucketedRandomProjectionLSH类源码。

版权声明:本文转载自网络,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本站转载出于传播更多优秀技术知识之目的,如有侵权请联系QQ/微信:153890879删除

相关文章
  • pyspark下foreachPartition()向hbase中

    pyspark下foreachPartition()向hbase中

  • 如何获取抖音和快手的直播或者播放量等

    如何获取抖音和快手的直播或者播放量等

  • php使用静态方法问题

    php使用静态方法问题

  • elasticsearch7中like的原理是什么?

    elasticsearch7中like的原理是什么?

腾讯云代理商
海外云服务器