- Common operations: sort, shuffle, select, split, shard, rename, remove, cast, and flatten
- Processing data with map
- Loading and saving datasets in different formats
Common operations
Loading
wiki = load_dataset("wikipedia", "20220301.en", split="train")
wiki = wiki.remove_columns([col for col in wiki.column_names if col != "text"]) # only keep the 'text' column
Sorting
sorted_dataset = dataset.sort("label")
Shuffle
# shuffle the dataset
shuffled_dataset = sorted_dataset.shuffle(seed=42)
Select
dataset["train"].select([0, 1])
small_dataset = dataset.select([0, 10, 20, 30, 40, 50])
# slicing also works for quick inspection
small_dataset[:3]
Filter
datasets["train"].filter(lambda example: "中国" in example["title"])
start_with_ar = dataset.filter(lambda example: example["sentence1"].startswith("Ar"))
even_dataset = dataset.filter(lambda example, idx: idx % 2 == 0, with_indices=True)
dataset.filter(lambda x: x["state"] == "California")
Shard
dataset.shard(num_shards=4, index=0)
Rename
dataset = dataset.rename_column("sentence1", "sentenceA")
Remove
dataset = dataset.remove_columns("label")
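The topic list above also names split, cast, and flatten, which are not shown elsewhere in this section. A minimal sketch, assuming a dataset with illustrative "label", "score", and nested "answers" columns:
# split: carve a held-out test set out of a dataset
split_datasets = dataset.train_test_split(test_size=0.1, seed=42)  # -> DatasetDict with 'train' and 'test'
# cast: change a column's feature type
from datasets import ClassLabel, Value
dataset = dataset.cast_column("label", ClassLabel(names=["neg", "pos"]))
dataset = dataset.cast_column("score", Value("float32"))
# flatten: expand nested fields (e.g. answers.text, answers.answer_start) into top-level columns
dataset = dataset.flatten()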
Concatenate
from datasets import concatenate_datasets, load_dataset
bookcorpus = load_dataset("bookcorpus", split="train")
concatenated_dataset = concatenate_datasets([bookcorpus, wiki])
# concatenate two datasets horizontally (axis=1)
from datasets import Dataset
bookcorpus_ids = Dataset.from_dict({"ids": list(range(len(bookcorpus)))})
bookcorpus_with_ids = concatenate_datasets([bookcorpus, bookcorpus_ids], axis=1)
Map
Combining map with a tokenizer
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
dataset = dataset.map(lambda examples: tokenizer(examples["text"]), batched=True)
def preprocess_function(example):
    model_inputs = tokenizer(example["content"], max_length=512, truncation=True)
    labels = tokenizer(example["title"], max_length=32, truncation=True)
    # the labels are the encoded title
    model_inputs["labels"] = labels["input_ids"]
    return model_inputs
# optionally, remove_columns=datasets["train"].column_names drops the raw columns after mapping
processed_datasets = datasets.map(preprocess_function, batched=True)
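A sketch of the same call with the raw columns dropped, assuming datasets is a DatasetDict with a "train" split:
processed_datasets = datasets.map(
    preprocess_function,
    batched=True,
    remove_columns=datasets["train"].column_names,  # keep only the tokenizer outputs and labels
)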
Loading datasets in different formats
Supported formats:
['.csv', '.tsv', '.json', '.jsonl', '.ndjson', '.parquet', '.geoparquet', '.gpq', '.arrow', '.txt', '.tar', '.xml', '.blp', '.bmp', '.dib', '.bufr', '.cur', '.pcx', '.dcx', '.dds', '.ps', '.eps', '.fit', '.fits', '.fli', '.flc', '.ftc', '.ftu', '.gbr', '.gif', '.grib', '.png', '.apng', '.jp2', '.j2k', '.jpc', '.jpf', '.jpx', '.j2c', '.icns', '.ico', '.im', '.iim', '.tif', '.tiff', '.jfif', '.jpe', '.jpg', '.jpeg', '.mpg', '.mpeg', '.msp', '.pcd', '.pxr', '.pbm', '.pgm', '.ppm', '.pnm', '.psd', '.bw', '.rgb', '.rgba', '.sgi', '.ras', '.tga', '.icb', '.vda', '.vst', '.webp', '.wmf', '.emf', '.xbm', '.xpm', '.BLP', '.BMP', '.DIB', '.BUFR', '.CUR', '.PCX', '.DCX', '.DDS', '.PS', '.EPS', '.FIT', '.FITS', '.FLI', '.FLC', '.FTC', '.FTU', '.GBR', '.GIF', '.GRIB', '.PNG', '.APNG', '.JP2', '.J2K', '.JPC', '.JPF', '.JPX', '.J2C', '.ICNS', '.ICO', '.IM', '.IIM', '.TIF', '.TIFF', '.JFIF', '.JPE', '.JPG', '.JPEG', '.MPG', '.MPEG', '.MSP', '.PCD', '.PXR', '.PBM', '.PGM', '.PPM', '.PNM', '.PSD', '.BW', '.RGB', '.RGBA', '.SGI', '.RAS', '.TGA', '.ICB', '.VDA', '.VST', '.WEBP', '.WMF', '.EMF', '.XBM', '.XPM', '.aiff', '.au', '.avr', '.caf', '.flac', '.htk', '.svx', '.mat4', '.mat5', '.mpc2k', '.ogg', '.paf', '.pvf', '.raw', '.rf64', '.sd2', '.sds', '.ircam', '.voc', '.w64', '.wav', '.nist', '.wavex', '.wve', '.xi', '.mp3', '.opus', '.AIFF', '.AU', '.AVR', '.CAF', '.FLAC', '.HTK', '.SVX', '.MAT4', '.MAT5', '.MPC2K', '.OGG', '.PAF', '.PVF', '.RAW', '.RF64', '.SD2', '.SDS', '.IRCAM', '.VOC', '.W64', '.WAV', '.NIST', '.WAVEX', '.WVE', '.XI', '.MP3', '.OPUS', '.mkv', '.mp4', '.avi', '.mov', '.MKV', '.MP4', '.AVI', '.MOV', '.zip']
JSON
from datasets import load_dataset
load_dataset("json", data_files=["./my_file.json"], field="data")
Parquet
# Large datasets are often stored in Parquet files because they are more efficient and faster at returning query results.
dataset = load_dataset("parquet", data_files={'train': 'train.parquet', 'test': 'test.parquet'})
URL
base_url = "https://storage.googleapis.com/huggingface-nlp/cache/datasets/wikipedia/20200501.en/1.0.0/"
data_files = {"train": base_url + "wikipedia-train.parquet"}
wiki = load_dataset("parquet", data_files=data_files, split="train")
url = "https://domain.org/train_data.zip"
data_files = {"train": url}
dataset = load_dataset("csv", data_files=data_files)
Arrow
dataset = load_dataset("arrow", data_files={'train': 'train.arrow', 'test': 'test.arrow'})
# local Arrow file
dataset = Dataset.from_file("data.arrow")
SQL
dataset = Dataset.from_sql("data_table_name", con="sqlite:///sqlite_file.db")
dataset = Dataset.from_sql("SELECT text FROM table WHERE length(text) > 100 LIMIT 10", con="sqlite:///sqlite_file.db")
ds = Dataset.from_sql('SELECT * FROM states WHERE state="California";', uri)
Dictionary
my_dict = {"a": [1, 2, 3]}
dataset = Dataset.from_dict(my_dict)
# list of dictionaries
my_list = [{"a": 1}, {"a": 2}, {"a": 3}]
dataset = Dataset.from_list(my_list)
Generator
from datasets import Dataset
def my_gen():
    for i in range(1, 4):
        yield {"a": i}
dataset = Dataset.from_generator(my_gen)
Pandas DataFrame
# https://huggingface.co/docs/datasets/tabular_load#pandas-dataframes
from datasets import Dataset
import pandas as pd
df = pd.DataFrame({"a": [1, 2, 3]})
dataset = Dataset.from_pandas(df)
CSV
dataset = load_dataset('csv', data_files=file_dict, delimiter=';', column_names=['text', 'label'], features=emotion_features)
dataset['train'].features
dataset = load_dataset("csv", data_files="my_file.csv")
dataset = load_dataset("csv", data_files=["my_file_1.csv", "my_file_2.csv", "my_file_3.csv"])
dataset = load_dataset("csv", data_files={"train": ["my_train_file_1.csv", "my_train_file_2.csv"], "test": "my_test_file.csv"})
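In the first CSV call above, file_dict and emotion_features are placeholders; emotion_features would typically be built with Features/ClassLabel, roughly like this (the label names are only an example):
from datasets import Features, Value, ClassLabel
emotion_features = Features({
    "text": Value("string"),
    "label": ClassLabel(names=["sadness", "joy", "love", "anger", "fear", "surprise"]),
})
file_dict = {"train": "train.csv", "validation": "validation.csv"}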
SQLite
import sqlite3
import pandas as pd
conn = sqlite3.connect("us_covid_data.db")
df = pd.read_csv("https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-states.csv")
df.to_sql("states", conn, if_exists="replace")
from datasets import Dataset
uri = "sqlite:///us_covid_data.db"
ds = Dataset.from_sql("states", uri)
Sharded
# define a sharded dataset by passing lists to gen_kwargs
from datasets import IterableDataset
def gen(shards):
    for shard in shards:
        with open(shard) as f:
            for line in f:
                yield {"line": line}
shards = [f"data{i}.txt" for i in range(32)]
ds = IterableDataset.from_generator(gen, gen_kwargs={"shards": shards})
ds = ds.shuffle(seed=42, buffer_size=10_000)  # shuffles the shard order + uses a shuffle buffer
from torch.utils.data import DataLoader
dataloader = DataLoader(ds.with_format("torch"), num_workers=4) # give each worker a subset of 32/4=8 shards
Saving to local disk
from datasets import load_from_disk
processed_datasets.save_to_disk("./dataset")
disk_datasets = load_from_disk("./dataset")
encoded_dataset.to_csv("path/of/my/dataset.csv")
CSV: Dataset.to_csv()
JSON: Dataset.to_json()
Parquet: Dataset.to_parquet()
SQL: Dataset.to_sql()
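Example export calls matching the list above (paths and table name are illustrative):
encoded_dataset.to_json("path/of/my/dataset.jsonl")
encoded_dataset.to_parquet("path/of/my/dataset.parquet")
encoded_dataset.to_sql("my_table", con="sqlite:///my_database.db")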
Multiprocessing
imagenet = load_dataset("imagenet-1k", num_proc=8)
# multiprocess
from multiprocess import set_start_method
import torch
import os
set_start_method("spawn")
def gpu_computation(example, rank):
    os.environ["CUDA_VISIBLE_DEVICES"] = str(rank % torch.cuda.device_count())
    # Your big GPU call goes here
    return example
updated_dataset = dataset.map(gpu_computation, with_rank=True)
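To spread the work over several GPUs, map is typically also given num_proc (one worker process per GPU); a hedged sketch:
updated_dataset = dataset.map(
    gpu_computation,
    with_rank=True,
    num_proc=torch.cuda.device_count(),  # one process per available GPU
)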
Working with PyTorch
from datasets import Dataset
data = [[1, 2],[3, 4]]
ds = Dataset.from_dict({"data": data})
ds = ds.with_format("torch")
ds[:2]  # {'data': tensor([[1, 2], [3, 4]])}
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ds = ds.with_format("torch", device=device)
ds[0]  # {'data': tensor([1, 2], device='cuda:0')}
# num_workers
import numpy as np
from datasets import Dataset, load_from_disk
from torch.utils.data import DataLoader
data = np.random.rand(10_000)
Dataset.from_dict({"data": data}).save_to_disk("my_dataset")
ds = load_from_disk("my_dataset").with_format("torch")
dataloader = DataLoader(ds, batch_size=32, num_workers=4)
# BatchSampler
from torch.utils.data.sampler import BatchSampler, RandomSampler
batch_sampler = BatchSampler(RandomSampler(ds), batch_size=32, drop_last=False)
dataloader = DataLoader(ds, batch_sampler=batch_sampler)
# Stream
import numpy as np
from datasets import Dataset, load_dataset
from torch.utils.data import DataLoader
data = np.random.rand(10_000)
Dataset.from_dict({"data": data}).push_to_hub("<username>/my_dataset") # Upload to the Hugging Face Hub
my_iterable_dataset = load_dataset("<username>/my_dataset", streaming=True, split="train")
dataloader = DataLoader(my_iterable_dataset, batch_size=32)
# Stream n_shards
my_iterable_dataset = load_dataset("c4", "en", streaming=True, split="train")
my_iterable_dataset.n_shards  # number of underlying data files/shards
dataloader = DataLoader(my_iterable_dataset, batch_size=32, num_workers=4)  # each worker streams from its own subset of shards
# Distributed
import os
from datasets.distributed import split_dataset_by_node
ds = split_dataset_by_node(ds, rank=int(os.environ["RANK"]), world_size=int(os.environ["WORLD_SIZE"]))
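A minimal sketch of consuming the node-local shard, assuming RANK and WORLD_SIZE are set by the launcher (e.g. torchrun):
from torch.utils.data import DataLoader
dataloader = DataLoader(ds.with_format("torch"), batch_size=32)
for batch in dataloader:
    ...  # each node only iterates over its own 1/world_size share of the data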