1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| import boto3 import time
def batch_write_item(
    dynamodb_client,
    table_name,
    batch_items: list,
    *,
    max_retries: int = 5,
    backoff: float = 2,
    sleep=time.sleep,
):
    """Write one batch of requests to a table, retrying unprocessed items.

    The batch is submitted once; anything the service reports back under
    ``UnprocessedItems`` is resubmitted with exponential backoff until it is
    accepted or ``max_retries`` retries have been made.

    Args:
        dynamodb_client: Object exposing ``batch_write_item(RequestItems=...)``
            and returning a mapping that may contain ``'UnprocessedItems'``.
        table_name: Key under which the requests are sent and under which
            unprocessed requests are looked up in the response.
        batch_items: Write requests for ``table_name``.
        max_retries: Maximum number of resubmissions after the first attempt.
        backoff: Base of the exponential delay; retry ``k`` (0-based) waits
            ``backoff ** k`` seconds beforehand.
        sleep: Callable used to wait between attempts; defaults to
            ``time.sleep``.

    Returns:
        dict with ``succeed_count`` (requests not left unprocessed) and
        ``failed_items`` (requests still unprocessed for ``table_name``).

    Raises:
        Whatever ``dynamodb_client.batch_write_item`` raises; exceptions are
        not caught here.
    """
    if not batch_items:
        # BatchWriteItem requires at least one request per table, so an empty
        # batch would only produce a validation error from the service.
        return dict(succeed_count=0, failed_items=[])

    response = dynamodb_client.batch_write_item(
        RequestItems={table_name: batch_items}
    )
    unprocessed_items = response.get('UnprocessedItems', {})

    retry_count = 0
    while unprocessed_items and retry_count < max_retries:
        # Back off BEFORE resubmitting: unprocessed items usually mean the
        # table is being throttled, so an immediate retry is likely to fail
        # again, and sleeping after a successful retry is wasted time.
        sleep(backoff ** retry_count)
        response = dynamodb_client.batch_write_item(
            RequestItems=unprocessed_items
        )
        unprocessed_items = response.get('UnprocessedItems', {})
        retry_count += 1

    failed_items = unprocessed_items.get(table_name, [])
    return dict(
        succeed_count=len(batch_items) - len(failed_items),
        failed_items=failed_items,
    )
# Driver script: write sample items to a DynamoDB table in batches and report
# which items were written and which were not.
session = boto3.Session()
dynamodb_client = session.client('dynamodb')
table_name = 'MyTable'

# Sample items 'Apple1' .. 'Apple99' in DynamoDB attribute-value format.
items = [{'term': {'S': 'Apple' + str(i)}} for i in range(1, 100)]

# Split into chunks of at most batch_size items, one chunk per request.
batch_size = 25
batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

succeeded_items = []
failed_items = []

for batch in batches:
    request_items = [{'PutRequest': {'Item': item}} for item in batch]
    try:
        res = batch_write_item(dynamodb_client, table_name, request_items)
    except Exception as e:
        # Top-level boundary: the request itself failed, so nothing in this
        # batch is known to have been written. Report it and move on.
        print('e', e)
        failed_items.extend(batch)
        continue

    # Only the requests still unprocessed after retries actually failed; the
    # rest of the batch was written and must not be counted as failed.
    unwritten = [req['PutRequest']['Item'] for req in res['failed_items']]
    failed_items.extend(unwritten)
    succeeded_items.extend(item for item in batch if item not in unwritten)

print('succeeded_items:', succeeded_items)
print('failed_items:', failed_items)
|