python - 圧縮された大きなcsvファイルから何百万ものレコードをmongoDBに効率的に挿入するにはどうすればよいですか?

原文 python mongodb csv zip

Mongoに約800万件のレコードを挿入しようとしていますが、1秒あたり1000件のレコードという非常に遅い速度でレコードを挿入しているようです。

コードはpythonで書かれているので、pythonの問題かもしれませんが、疑問です。これがコードです:

def str2datetime(str):
  return None if (not str or str == r'\N') else datetime.strptime(str, '%Y-%m-%d %H:%M:%S')
def str2bool(str):
  return None if (not str or str == r'\N') else (False if str == '0' else True)
def str2int(str):
  return None if (not str or str == r'\N') else int(str)
def str2float(str):
  return None if (not str or str == r'\N') else float(str)
def str2float2int(str):
  return None if (not str or str == r'\N') else int(float(str) + 0.5)
def str2latin1(str):
  return unicode(str, 'latin-1')

_ = lambda x: x

converters_map = {
  'test_id': str2int,
  'android_device_id': str2int,
  'android_fingerprint': _,
  'test_date': str2datetime,
  'client_ip_address': _,
  'download_kbps': str2int,
  'upload_kbps': str2int,
  'latency': str2int,
  'server_name': _,
  'server_country': _,
  'server_country_code': _,
  'server_latitude': str2float,
  'server_longitude': str2float,
  'client_country': _,
  'client_country_code': _,
  'client_region_name': str2latin1,
  'client_region_code': _,
  'client_city': str2latin1,
  'client_latitude': str2float,
  'client_longitude': str2float,
  'miles_between': str2float2int,
  'connection_type': str2int,
  'isp_name': _,
  'is_isp': str2bool,
  'network_operator_name': _,
  'network_operator': _,
  'brand': _,
  'device': _,
  'hardware': _,
  'build_id': _,
  'manufacturer': _,
  'model': str2latin1,
  'product': _,
  'cdma_cell_id': str2int,
  'gsm_cell_id': str2int,
  'client_ip_id': str2int,
  'user_agent': _,
  'client_net_speed': str2int,
  'iphone_device_id': str2int,
  'carrier_name': _,
  'iso_country_code': _,
  'mobile_country_code': str2int,
  'mobile_network_code': str2int,
  'model': str2latin1,
  'version': _,
  'server_sponsor_name': _,
}

def read_csv_zip(path):
  with ZipFile(path) as z:
    with z.open(z.namelist()[0]) as input:
      r = csv.reader(input)
      header = r.next()
      converters = tuple((title if title != 'test_id' else '_id', converters_map[title]) for title in header)
      for row in r:
        row = {converter[0]:converter[1](value) for converter, value in zip(converters, row)}
        yield row

argv = [x for x in argv if not x == '']
if len(argv) == 1:
  print("Usage: " + argv[0] + " zip-file")
  exit(1)

zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]

print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
  db = connection.db
  collection = db.__getattr__(collection_name)
  i = 0;
  try:
    start = time()
    for item in read_csv_zip(zip_file):
      i += 1
      if (i % 1000) == 0:
        stdout.write("\r%d " % i)
        stdout.flush()
      try:
        collection.insert(item)
      except Exception as exc:
        print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
        print exc
    print("Elapsed time = {0} seconds, {1} records.".format(time() - start, i))
    raw_input("Press ENTER to exit")
  except Exception as exc:
    print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
    print exc
    exit(1)


262796レコード(1つのcsvファイル)を挿入するのに350秒かかります。

mongoサーバーは同じマシンで実行されており、誰も使用していません。したがって、方法があれば、データベースファイルに直接書き込むことができます。

800万件のレコードはシャーディングを必要としないため、シャーディングには興味がありませんね。

私の質問は私が何を間違っているのですか?多分私のDBの選択は間違っていますか?典型的なフローは、月に一度レコードが更新され、データベースに対してクエリのみが行われることです。

ありがとう。

編集

ボトルネックはmongoではなく、zipファイルを読み取ることが判明しました。 1000行のチャンクでzipファイルを読み取り、Collection.insertへの1回の呼び出しでそれらをmongoにフィードするようにコードを変更しました。これは、zipファイルです。変更されたコードは次のとおりです。

def insert_documents(collection, source, i, batch_size):
  count = 0;
  while True:
    items = list(itertools.islice(source, batch_size))
    if len(items) == 0:
      break;
    old_i = i
    count += len(items)
    i += len(items)
    if (old_i / 1000) != (i / 1000):
      sys.stdout.write("\r%d " % i)
      sys.stdout.flush()
    try:
      collection.insert(items)
    except Exception as exc:
      print("Failed at some record between #{0} (id = {1}) and #{2} (id = {3})".format(old_i,items[0]['_id'],i,items[-1]['_id']))
      print exc
  return count

def main():
  argv = [x for x in sys.argv if not x == '']
  if len(argv) == 1:
    print("Usage: " + argv[0] + " zip-file")
    exit(1)

  zip_file = argv[1]
  collection_name = zip_file[:zip_file.index('_')]

  print("Populating " + collection_name + " with the data from " + zip_file)
  with Connection() as connection:
    ookla = connection.ookla
    collection = ookla.__getattr__(collection_name)
    i = 0;
    start = time()
    count = insert_documents(collection, read_csv_zip(zip_file), i, 1000)
    i += count
    print("Elapsed time = {0} seconds, {1} records.".format(time() - start, count))
    raw_input("Press ENTER to exit")

if __name__ == "__main__":
  main()


ほとんどの場合、items = list(itertools.islice(source, batch_size))に入ることがわかります。

それを改善する方法に関するアイデアはありますか?
答え
コメントで指摘したように、mongoimportは使用できません。日付はstr2latin変換と同様に完全にインポートできます。単純にcsvを前処理してmongoimportと互換性があり、最適です。

日付を{myDate:{$date: msSinceEpoch}}に変換すると、mongoimportはそれを理解します。したがって、1つの前処理ステップでmongoimportを使用でき、ユースケースを考えると、それが問題になる理由はわかりません。

とは言っても、mongoimportはバッチ挿入よりも桁違いに高速であってはならず、1000 /秒は遅いわけではありませんが、単純な開発マシンでさえ得ているパフォーマンスのタイプとは一致しません。特にsafe = false書き込みの場合、モノ挿入ではなくバッチ挿入を使用した場合、30k /秒に達する可能性が高く、おそらくそれ以上になります(インポート後の第2ステップとして確認できるため、この場合は問題ありません)。あなたのボトルネックは何ですか? (mongostatおよびtopで確認してください)
関連記事

python - Python Tkinterを使用してボタンに特定の.csvファイルを開く方法は?

python - Django:多対多をフォームに保存する

if-statement - チェックとポーリングのメカニズムのPython的な方法

python - このWhileステートメントの最後のインクリメンターが起動しないのはなぜですか

python - Numpy配列の割り当て

python - WAFを使用したQTプロジェクトの構築

c++ - qextserialportはWindowsでデータをドロップします—それに対して何ができますか?

python - PyQt4:QListWidget間でアイテムを移動する

.net - XMLファイルの重複する値を削除して最後の値を保持する方法

python - Django:不要なSQLステートメントを回避するにはどうすればよいですか?