使用 Python 和 Boto3 操作 Amazon DynamoDB 資料庫

建立專案

建立專案。

1
2
mkdir dynamodb-python-example
cd dynamodb-python-example

建立虛擬環境。

1
2
pyenv virtualenv 3.11.4 dynamodb-python-example
pyenv local dynamodb-python-example

新增 requirements.txt 檔。

1
boto3

安裝依賴套件。

1
pip install -r requirements.txt

新增 .gitignore 檔。

1
__pycache__/

查詢

新增 query.py 檔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import boto3

session = boto3.Session()

dynamodb_client = session.client('dynamodb')

table_name = 'MyTable'
term = 'Apple1'

response = dynamodb_client.query(
TableName=table_name,
KeyConditionExpression='term = :value',
ExpressionAttributeValues={
':value': {
'S': term,
},
},
)

print(response)

執行。

1
AWS_PROFILE=your-profile python3 query.py

寫入物件

新增 put_item.py 檔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import boto3

session = boto3.Session()

dynamodb_client = session.client('dynamodb')

table_name = 'MyTable'
term = 'Apple0'

response = dynamodb_client.put_item(
TableName=table_name,
Item={
'term': {
'S': term,
},
}
)

print(response)

執行。

1
AWS_PROFILE=your-profile python3 put_item.py

批量寫入物件

新增 batch_write_item.py 檔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import boto3
import time

def batch_write_item(dynamodb_client, table_name, batch_items: list):
request_items = {
table_name: batch_items
}

response = dynamodb_client.batch_write_item(
RequestItems=request_items
)

max_retries = 5
retry_count = 0
backoff = 2
unprocessed_items = response.get('UnprocessedItems', {})
while unprocessed_items and retry_count < max_retries:
response = dynamodb_client.batch_write_item(
RequestItems=unprocessed_items
)
unprocessed_items = response.get('UnprocessedItems', {})

time.sleep(backoff**retry_count) # exponential backoff
retry_count += 1

failed_items = unprocessed_items.get(table_name, [])

return dict(
succeed_count=len(batch_items) - len(failed_items),
failed_items=failed_items
)

session = boto3.Session()

dynamodb_client = session.client('dynamodb')

table_name = 'MyTable'

items = [{'term': {'S': 'Apple' + str(i)}} for i in range(1, 100)]

batch_size = 25
batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

succeeded_items = []
failed_items = []

for batch in batches:
try:
request_items = [{'PutRequest': {'Item': item}} for item in batch]
res = batch_write_item(dynamodb_client, table_name, request_items)
# If there are "failed_items" in the result, consider it a batch failure.
if res['failed_items']:
failed_items.extend(batch)
else:
succeeded_items.extend(batch)
except Exception as e:
print('e', e)
failed_items.extend(batch)

print('succeeded_items:', succeeded_items)
print('failed_items:', failed_items)

執行。

1
AWS_PROFILE=your-profile python3 batch_write_item.py

程式碼

參考資料