工作中我们常用pandas作为数据处理的工具,读取Excel/csv/mysql等数据源后处理成dataframe,本文介绍一些常用的高级操作技巧、组合使用技巧,尤其是文本相关的处理。
另一篇:https://www.icnma.com/pandasre_re/
结合文本清洗函数,并根据长度筛选
def cleanquestion(x: str) -> str:
if isinstance(x, str):
str_text = re.sub(u"([^\u4e00-\u9fa5\u0030-\u0039\u0041-\u005a\u0061-\u007a])", "", x)
return str_text
else:
return None
def map_label(data, map_dict):
return map_dict.get(data) if data in map_dict.keys() else None
new_df= df.pipe(
lambda x: x.assign(**{
'text':x['text'].apply(lambda j: cleanquestion(j))
})
).drop_duplicates(['text']).pipe(
lambda x: x.loc[x['text'].apply(lambda j: len(j) >2)]
).pipe(
lambda x: x.assign(
标签=x['标签'].astype(str).apply(lambda j: map_label(j, data_dict)),
)
).dropna(subset=['标签', 'text'])
多文件遍历读取
alldata_df = pd.concat([pd.read_csv(i) for i in glob(pathname="data/*.csv")])
# 或
alldata_df = pd.concat([pd.read_csv(i) for i in Path(r"文件路径").rglob('*.csv')])
分组后新增列
通过 groupby().agg().reset_index(drop=False)
在agg中,通过 新列名 = ( 执行列,操作方法函数)方式新增一列。
df.pipe(
lambda x: x.groupby(['label1', 'label2']).agg(
num= ('text', 'count'), # 新生成一列 num,通过 count函数统计
new_text = ('text', lambda t: '|'.join(t)) # 新生成一列 new_text
).sort_values(by=['num'], ascending=False).reset_index(drop=False)
).pipe(
lambda x: x.loc[x['label1'] != x['label2']] # 筛选两个label不一样的数据
).pipe(
lambda x:x.loc[x['num'] > 30] # 筛选个数>30
).pipe(
lambda x: x.loc[~pd.isna(x['label1'])] # 非空值筛选
)
df.pipe(
lambda x: x.explode(['某列的列表'])
)
分组统计:不同概率下的各个分类的准确率
new_df = df.pipe(
lambda x: x.assign(**{
'scores':pd.cut(x['probability'],
bins=pd. IntervalIndex.from_breaks(np.arange(11)/10))
})
).pipe(
lambda x: x.groupby(['scores']).agg(
total = ('text', 'count'),
number=('text', lambda j: len(set(j)))
).reset_index(drop=False)
).pipe(
lambda x: x.assign(**{
'score':np.around(x['number'] / x['total'] * 100 , 3)
})
)
合并两个dataframe
new_df= df1.pipe(
lambda x: x.merge(
right=df2,
how='left',
left_on=['列名'],
right_on=['列名']
)
).sort_values(by=['列'], ascending=False)
各种过滤筛选文本技巧
df.pipe(
lambda x: x.query("label in @data_list")
).pipe( # 或者
lambda x: x.loc[x['label '].isin(data_list)]
).pipe(
lambda x: x.query('label1 == "标签名"')
).pipe(
lambda x: x.loc[x['label1'] != '标签名']
).pipe(
lambda x: x.loc[~x['label1'].isin(df2['列名'])]
).pipe(
lambda x: x.loc[x['label1'].apply(lambda j: j != ['NULL'])]
).pipe(
lambda x: x.loc[~pd.isna(x['question'])] # 清洗question列空值
).pipe(
lambda x: x.assign(**{
'id':np.random.choice(a=3, size=x.shape[0]) # 随机分组
})
)
f转dict
df_dict= df.dropna(subset='label1').set_index("label2").to_dict()["label1"]
处理时间
from datetime import datetime, timedelta
rawdata = df.pipe(
lambda x: x.assign(**{
'datetime': pd.to_datetime(x['datetime'])
})
).pipe(
lambda x: x.loc[x['datetime'] > datetime.now() - timedelta(days=10)]
).pipe(
lambda x: x.assign(**{
'datetime': x['datetime'].dt.strftime('%Y-%m-%d')
})
).pipe( # 根据人名和时间统计
lambda x: x.groupby(['人名', 'datetime']).agg(
count=('text', 'count')
).reset_index(drop=False)
)
pandas结合jieba和re
def load_jieba_words():
maintain_words = [line.strip('\n').strip()
for line in open('./data/maintain_words.txt', 'r', encoding='utf-8').readlines()]
for word in maintain_words:
jieba.add_word(word)
def jieba_cut_words(data_df):
# 所有的分词汇总列表
cut_sentences = [jieba.lcut(sentence) for sentence in data_df['相似问'].tolist()]
all_words = [word for word in cut_sentences if len(word) > 1 and word not in self.stopwords]
return all_words
def get_stopwords(path_stopwords):
# 获取停用词表,对分词结果停用
stopwords = [line.strip('\n').strip() for line in open(path_stopwords, 'r', encoding='utf-8').readlines()]
return stopwords
def clean_by_stopwords(data_df):
data_df['相似问'] = data_df['相似问'].apply(
lambda x: ''.join([word for word in jieba.lcut(x) if word not in self.stopwords]))
return data_df
def clean_by_regex(data_df):
# regex 结合 pandas 清洗文本
# 非中文 [^\u4e00-\u9fa5]+ 或 !“#¥%&、‘,-。/:;《=》?@【、】……——·「」~+
data_df['相似问'] = data_df['相似问'].astype('str').apply(
lambda x: re.sub(r'[^\u4e00-\u9fa5]+', '', x))
return data_df
合并多列数据
def concat_columns(row):
row_info = "ROW DETAIL:\n"
# SURGICAL DIAGNOSIS
if not pd.isnull(row["Text"]):
row_info += "ROW Text:\n"
row_info += row["Text"]
return row_info
tmp_df = df.pipe(
lambda x: x.assign(
**{
"processed": x["Text"]
.astype(str)
.apply(lambda x: x.replace(" ", ""))
}
)
).pipe(
lambda x: x.assign(
新列名=lambda x: x.apply(
lambda row: concat_columns(row),
axis=1,
)
)
)
tmp_df
处理加速
使用at和iat替换loc和iloc:
import timestart = time.time()
# Iterating through DataFrame
for index, row in df.iterrows():
df.at[index,'c'] = row.a + row.b
end = time.time()
print(end - start)
### Time taken: 40 seconds
df结合numpy快速处理数据:
import numpy as np
import pandas as pd
df = pd.DataFrame(np.random.randint(0, 50, size=(5000000, 4)), columns=('a','b','c','d'))
df.shape
# (5000000, 5)
df.head()
#loops
import time
start = time.time()
# Iterating through DataFrame using iterrows
for idx, row in df.iterrows():
# creating a new column
df.at[idx,'ratio'] = 100 * (row["d"] / row["c"])
end = time.time()
print(end - start)
### 109 Seconds
# Vectorization
start = time.time()
df["ratio"] = 100 * (df["d"] / df["c"])
end = time.time()
print(end - start)
### 0.12 seconds
原创文章。转载请注明:
作者:JiangYuan
网址: https://www.icnma.com