python程序调用Hbase

最近在写一个专有的爬虫,目的是抓取电子商务网站的图片和商品信息。为了快速开发, 开发语言使用python,使用了BeautifulSoap和mechnize库去爬取网页和做网页信息提取。网页信息和parse完的库存储在了hbase系统里。

搭建一个单机版的hbase非常简单,先部署yahoo的hadoop release(不要用apache的,问题比较多,yahoo发布的版本较为稳定),再部署hbase,这个最好是使用cloudera的hbase发布版本,都是经过大量的测试的。

具体的流程是这样的:
0。 选择源url(root url),从一个购物网站(淘宝,京东都有较好的类目)选择自己需要的几个类目
1。 先抓取商品的list页面,这一部需要分析目的网站的url规则,各个网站都不同。
2。 然后抓起商品的detail页面,我们把商品的detail信息存储在hbase中

hbase的table结构如下:
row (pk) : item id item_property (column family): --column : title --column : price --column : category --column : location item_page_data(column family): --column : item_page_data (商品整个页面)

我们把item id作为row key,当然还需要加上前缀区分不同的购物网站;建立一个column family作为商品的属性,比如title,价格,类目,地点等等;再建立一个column family存储网页的所有信息,以备未来需要parse网页里面的其他数据。
hbase中,不同的column family列数据存放在不同的数据文件中,不考虑cache的情况下,做行选择时,如果所需的数据在同一个column family只需要一次磁盘读取,而数据位于多个column family,比如表里有多个column family又做了select * from的这种选择,则需要多次磁盘读取。

我们的item_property是轻量级的数据,放在同一个column family中,而item_page_data是一个原始网页,往往大小超过50KB,放在另外一个column family中。hbase设计的目的是支持宽表的应用,因此创建表时,只需要制定column family,不需要制定列的具体名字。

接下来使用python调用hbase,hbase自带了thrift支持,我们直接从apache的网站下载thrift包,编译安装好之后,到hbase的系统目录:

hbase/src/java/org/apache/hadoop/hbase/thrift/
执行: thrift --gen py Hbase.thrift 把生存的gen-py目录中的hbase目录拷贝到自己的代码中就可以直接使用了。
在hbase的sample目录hbase/src/examples/thrift中,有个非常好的例子,DemoClient.py,基本照着做就可以了。

下面贴的是我的建表脚本,记住只要指定column family就可以了,具体的column在插入数据调用mutateRow()时再指定。



#!/usr/bin/python import sys import time from thrift import Thrift from thrift.transport import TSocket, TTransport from thrift.protocol import TBinaryProtocol from hbase import ttypes from hbase.Hbase import * def create_all_tables(client): '''store table''' table_name = 'store_table' col_name = ['store:'] create_table(client, table_name, col_name) table_name2 = 'item_table' col_name2 = ['item:', 'item_page:'] create_table(client, table_name2, col_name2) def drop_all_tables(client): tables = ['store_table', 'item_table'] for table in tables: if client.isTableEnabled(table): print " disabling table: %s" %(table) client.disableTable(table) print " deleting table: %s" %(table) client.deleteTable(table) def create_table(client, table_name, table_desc): columns = [] for name in table_desc: col = ColumnDescriptor(name) columns.append(col) try: print "creating table: %s" %(table_name) client.createTable(table_name, columns) except AlreadyExists, ae: print "WARN: " + ae.message except Exception, e: print "error happend: " + str(e) def list_all_tables(client): # Scan all tables print "scanning tables..." for t in client.getTableNames(): print " -- %s" %(t) cols = client.getColumnDescriptors(t) print " -- -- column families in %s" %(t) for col_name in cols.keys(): col = cols[col_name] print " -- -- -- column: %s, maxVer: %d" % (col.name, col.maxVersions) def connect_hbase(): # Make socket transport = TSocket.TSocket('localhost', 9090) # Buffering is critical. Raw sockets are very slow transport = TTransport.TBufferedTransport(transport) # Wrap in a protocol protocol = TBinaryProtocol.TBinaryProtocol(transport) # Create a client to use the protocol encoder client = Client(protocol) # Connect! transport.open() return client def printRow(entry): print "row: " + entry.row + ", cols:", for k in sorted(entry.columns): print k + " => " + entry.columns[k].value, print "/n" def scann_table(client): t = 'store_table' columnNames = [] for (col, desc) in client.getColumnDescriptors(t).items(): print "column with name: "+desc.name print desc columnNames.append(desc.name+":") print columnNames print "Starting scanner..." scanner = client.scannerOpenWithStop(t, "", "", columnNames) r = client.scannerGet(scanner) i = 0 while r: if i % 100 == 0: printRow(r[0]) r = client.scannerGet(scanner) i += 1 client.scannerClose(scanner) print "Scanner finished, total %d row" % i if __name__ == '__main__': client = connect_hbase() create_all_tables(client) list_all_tables(client) scann_table(client)

【python程序调用Hbase】

    推荐阅读