mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4mobile wallpaper 5mobile wallpaper 6
1572 字
4 分钟
Python项目实战教程(第三阶段)

Python项目实战教程(第三阶段)#

欢迎来到Python项目实战教程的第三阶段!在前面的教程中,我们学习了Python的基础知识和高级特性。在本阶段中,我们将通过实际项目来应用这些知识,学习Python在各个领域的实际应用场景,帮助你成为一名真正的Python开发者。

1. Web应用开发实战#

1.1 Flask项目实战 - 个人博客系统#

项目概述:创建一个功能完整的个人博客系统,包括用户注册、登录、文章发布、评论等功能。

技术栈

  • Flask:Web框架
  • Flask-SQLAlchemy:ORM数据库工具
  • Flask-Login:用户认证
  • Flask-WTF:表单处理
  • Bootstrap:前端框架

项目结构

blog/
├── app/
│ ├── __init__.py
│ ├── routes.py
│ ├── models.py
│ ├── forms.py
│ ├── templates/
│ │ ├── base.html
│ │ ├── index.html
│ │ ├── login.html
│ │ ├── register.html
│ │ ├── post.html
│ │ └── create_post.html
│ └── static/
│ ├── css/
│ └── js/
├── config.py
├── requirements.txt
└── run.py

核心代码

app/__init__.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from config import Config
app = Flask(__name__)
app.config.from_object(Config)
db = SQLAlchemy(app)
login = LoginManager(app)
login.login_view = 'login'
from app import routes, models

app/models.py

from datetime import datetime
from app import db, login
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), index=True, unique=True)
email = db.Column(db.String(120), index=True, unique=True)
password_hash = db.Column(db.String(128))
posts = db.relationship('Post', backref='author', lazy='dynamic')
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(140))
content = db.Column(db.Text)
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
comments = db.relationship('Comment', backref='post', lazy='dynamic')
class Comment(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text)
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
post_id = db.Column(db.Integer, db.ForeignKey('post.id'))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
@login.user_loader
def load_user(id):
return User.query.get(int(id))

app/forms.py

from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField, TextAreaField
from wtforms.validators import DataRequired, Email, EqualTo, Length
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
password = PasswordField('Password', validators=[DataRequired()])
submit = SubmitField('Sign In')
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Length(min=2, max=64)])
email = StringField('Email', validators=[DataRequired(), Email()])
password = PasswordField('Password', validators=[DataRequired()])
password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')])
submit = SubmitField('Register')
class PostForm(FlaskForm):
title = StringField('Title', validators=[DataRequired(), Length(max=140)])
content = TextAreaField('Content', validators=[DataRequired()])
submit = SubmitField('Post')
class CommentForm(FlaskForm):
content = TextAreaField('Comment', validators=[DataRequired()])
submit = SubmitField('Submit')

app/routes.py

from flask import render_template, redirect, url_for, flash, request
from flask_login import current_user, login_user, logout_user, login_required
from app import app, db
from app.models import User, Post, Comment
from app.forms import LoginForm, RegistrationForm, PostForm, CommentForm
@app.route('/')
@app.route('/index')
def index():
posts = Post.query.order_by(Post.timestamp.desc()).all()
return render_template('index.html', title='Home', posts=posts)
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('index'))
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user is None or not user.check_password(form.password.data):
flash('Invalid username or password')
return redirect(url_for('login'))
login_user(user)
return redirect(url_for('index'))
return render_template('login.html', title='Sign In', form=form)
@app.route('/logout')
def logout():
logout_user()
return redirect(url_for('index'))
@app.route('/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('index'))
form = RegistrationForm()
if form.validate_on_submit():
user = User(username=form.username.data, email=form.email.data)
user.set_password(form.password.data)
db.session.add(user)
db.session.commit()
flash('Congratulations, you are now a registered user!')
return redirect(url_for('login'))
return render_template('register.html', title='Register', form=form)
@app.route('/post/<int:post_id>', methods=['GET', 'POST'])
def post(post_id):
post = Post.query.get_or_404(post_id)
form = CommentForm()
if form.validate_on_submit() and current_user.is_authenticated:
comment = Comment(content=form.content.data, post_id=post_id, user_id=current_user.id)
db.session.add(comment)
db.session.commit()
flash('Your comment has been added!')
return redirect(url_for('post', post_id=post_id))
comments = post.comments.order_by(Comment.timestamp.desc()).all()
return render_template('post.html', title=post.title, post=post, form=form, comments=comments)
@app.route('/create_post', methods=['GET', 'POST'])
@login_required
def create_post():
form = PostForm()
if form.validate_on_submit():
post = Post(title=form.title.data, content=form.content.data, author=current_user)
db.session.add(post)
db.session.commit()
flash('Your post has been created!')
return redirect(url_for('index'))
return render_template('create_post.html', title='Create Post', form=form)

config.py

import os
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess'
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///blog.db'
SQLALCHEMY_TRACK_MODIFICATIONS = False

run.py

from app import app, db
from app.models import User, Post, Comment
@app.shell_context_processor
def make_shell_context():
return {'db': db, 'User': User, 'Post': Post, 'Comment': Comment}
if __name__ == '__main__':
app.run(debug=True)

requirements.txt

Flask
Flask-SQLAlchemy
Flask-Login
Flask-WTF
Werkzeug

运行项目

# 安装依赖
pip install -r requirements.txt
# 初始化数据库
flask shell
>>> from app import db
>>> db.create_all()
>>> exit()
# 运行项目
flask run

1.2 Django项目实战 - 电商网站#

项目概述:创建一个简单的电商网站,包括商品展示、购物车、订单管理等功能。

技术栈

  • Django:Web框架
  • Django ORM:数据库工具
  • Django Authentication:用户认证
  • Bootstrap:前端框架

项目结构

ecommerce/
├── ecommerce/
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── shop/
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── models.py
│ ├── views.py
│ ├── urls.py
│ └── templates/
│ └── shop/
│ ├── base.html
│ ├── index.html
│ ├── product_detail.html
│ ├── cart.html
│ └── checkout.html
├── manage.py
└── requirements.txt

核心代码

shop/models.py

from django.db import models
from django.contrib.auth.models import User
class Category(models.Model):
name = models.CharField(max_length=200, db_index=True)
slug = models.SlugField(max_length=200, unique=True)
class Meta:
ordering = ('name',)
verbose_name = 'category'
verbose_name_plural = 'categories'
def __str__(self):
return self.name
class Product(models.Model):
category = models.ForeignKey(Category, related_name='products', on_delete=models.CASCADE)
name = models.CharField(max_length=200, db_index=True)
slug = models.SlugField(max_length=200, db_index=True)
image = models.ImageField(upload_to='products/%Y/%m/%d', blank=True)
description = models.TextField(blank=True)
price = models.DecimalField(max_digits=10, decimal_places=2)
available = models.BooleanField(default=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Meta:
ordering = ('name',)
index_together = (('id', 'slug'),)
def __str__(self):
return self.name
class CartItem(models.Model):
user = models.ForeignKey(User, related_name='cart_items', on_delete=models.CASCADE)
product = models.ForeignKey(Product, related_name='cart_items', on_delete=models.CASCADE)
quantity = models.PositiveIntegerField(default=1)
created = models.DateTimeField(auto_now_add=True)
def __str__(self):
return f'{self.quantity} x {self.product.name}'
def get_total_price(self):
return self.quantity * self.product.price
class Order(models.Model):
user = models.ForeignKey(User, related_name='orders', on_delete=models.CASCADE)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
paid = models.BooleanField(default=False)
class Meta:
ordering = ('-created',)
def __str__(self):
return f'Order {self.id}'
def get_total_cost(self):
return sum(item.get_cost() for item in self.items.all())
class OrderItem(models.Model):
order = models.ForeignKey(Order, related_name='items', on_delete=models.CASCADE)
product = models.ForeignKey(Product, related_name='order_items', on_delete=models.CASCADE)
price = models.DecimalField(max_digits=10, decimal_places=2)
quantity = models.PositiveIntegerField(default=1)
def __str__(self):
return f'{self.quantity} x {self.product.name}'
def get_cost(self):
return self.price * self.quantity

shop/views.py

from django.shortcuts import render, get_object_or_404, redirect
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_POST
from .models import Category, Product, CartItem, Order, OrderItem
@login_required
def index(request):
categories = Category.objects.all()
products = Product.objects.filter(available=True)
return render(request, 'shop/index.html', {'categories': categories, 'products': products})
@login_required
def product_detail(request, id, slug):
product = get_object_or_404(Product, id=id, slug=slug, available=True)
return render(request, 'shop/product_detail.html', {'product': product})
@login_required
def cart(request):
cart_items = CartItem.objects.filter(user=request.user)
total = sum(item.get_total_price() for item in cart_items)
return render(request, 'shop/cart.html', {'cart_items': cart_items, 'total': total})
@login_required
@require_POST
def add_to_cart(request, product_id):
product = get_object_or_404(Product, id=product_id)
cart_item, created = CartItem.objects.get_or_create(
user=request.user,
product=product,
defaults={'quantity': 1}
)
if not created:
cart_item.quantity += 1
cart_item.save()
return redirect('cart')
@login_required
@require_POST
def remove_from_cart(request, cart_item_id):
cart_item = get_object_or_404(CartItem, id=cart_item_id, user=request.user)
cart_item.delete()
return redirect('cart')
@login_required
def checkout(request):
cart_items = CartItem.objects.filter(user=request.user)
if not cart_items:
return redirect('cart')
if request.method == 'POST':
order = Order.objects.create(user=request.user)
for item in cart_items:
OrderItem.objects.create(
order=order,
product=item.product,
price=item.product.price,
quantity=item.quantity
)
cart_items.delete()
return redirect('order_success')
total = sum(item.get_total_price() for item in cart_items)
return render(request, 'shop/checkout.html', {'cart_items': cart_items, 'total': total})
@login_required
def order_success(request):
return render(request, 'shop/order_success.html')

shop/urls.py

from django.urls import path
from . import views
urlpatterns = [
path('', views.index, name='index'),
path('product/<int:id>/<slug:slug>/', views.product_detail, name='product_detail'),
path('cart/', views.cart, name='cart'),
path('add-to-cart/<int:product_id>/', views.add_to_cart, name='add_to_cart'),
path('remove-from-cart/<int:cart_item_id>/', views.remove_from_cart, name='remove_from_cart'),
path('checkout/', views.checkout, name='checkout'),
path('order-success/', views.order_success, name='order_success'),
]

ecommerce/urls.py

from django.contrib import admin
from django.urls import path, include
from django.contrib.auth import views as auth_views
urlpatterns = [
path('admin/', admin.site.urls),
path('shop/', include('shop.urls')),
path('login/', auth_views.LoginView.as_view(template_name='shop/login.html'), name='login'),
path('logout/', auth_views.LogoutView.as_view(), name='logout'),
]

运行项目

# 安装依赖
pip install -r requirements.txt
# 迁移数据库
python manage.py makemigrations
python manage.py migrate
# 创建超级用户
python manage.py createsuperuser
# 运行项目
python manage.py runserver

2. 数据分析项目实战#

2.1 数据分析项目 - 销售数据可视化#

项目概述:分析销售数据,生成可视化报表,帮助企业了解销售趋势和产品表现。

技术栈

  • Python
  • Pandas:数据处理
  • Matplotlib:数据可视化
  • Seaborn:高级数据可视化
  • Jupyter Notebook:交互式开发环境

项目结构

sales-analysis/
├── data/
│ └── sales_data.csv
├── notebooks/
│ └── sales_analysis.ipynb
└── requirements.txt

销售数据示例

OrderIDProductCategoryPriceQuantityDateRegion
1Product AElectronics10022026-01-01North
2Product BClothing5012026-01-02South
3Product CElectronics15012026-01-03East
4Product DClothing3032026-01-04West
5Product AElectronics10012026-01-05North

核心代码

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# 读取数据
df = pd.read_csv('data/sales_data.csv')
# 数据清洗
print("数据基本信息:")
print(df.info())
print("\n数据前5行:")
print(df.head())
print("\n数据统计摘要:")
print(df.describe())
# 计算销售额
df['Sales'] = df['Price'] * df['Quantity']
# 按日期分析销售趋势
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)
# 月度销售趋势
monthly_sales = df.resample('M').sum()['Sales']
plt.figure(figsize=(12, 6))
plt.plot(monthly_sales.index, monthly_sales.values)
plt.title('月度销售趋势')
plt.xlabel('月份')
plt.ylabel('销售额')
plt.grid(True)
plt.show()
# 按产品类别分析
category_sales = df.groupby('Category').sum()['Sales']
plt.figure(figsize=(10, 6))
sns.barplot(x=category_sales.index, y=category_sales.values)
plt.title('各产品类别销售额')
plt.xlabel('产品类别')
plt.ylabel('销售额')
plt.show()
# 按地区分析
region_sales = df.groupby('Region').sum()['Sales']
plt.figure(figsize=(10, 6))
plt.pie(region_sales.values, labels=region_sales.index, autopct='%1.1f%%')
plt.title('各地区销售占比')
plt.show()
# 产品销售排行
top_products = df.groupby('Product').sum()['Sales'].sort_values(ascending=False).head(10)
plt.figure(figsize=(12, 6))
sns.barplot(x=top_products.values, y=top_products.index)
plt.title('产品销售排行前10')
plt.xlabel('销售额')
plt.ylabel('产品')
plt.show()
# 销售数据相关性分析
corr = df[['Price', 'Quantity', 'Sales']].corr()
plt.figure(figsize=(8, 6))
sns.heatmap(corr, annot=True, cmap='coolwarm')
plt.title('销售数据相关性分析')
plt.show()
# 总结
print("\n销售数据分析总结:")
print(f"总销售额:{df['Sales'].sum():.2f}")
print(f"平均销售额:{df['Sales'].mean():.2f}")
print(f"最高销售额:{df['Sales'].max():.2f}")
print(f"最低销售额:{df['Sales'].min():.2f}")
print(f"销售最好的产品类别:{category_sales.idxmax()}")
print(f"销售最好的地区:{region_sales.idxmax()}")
print(f"销售最好的产品:{top_products.idxmax()}")

运行项目

# 安装依赖
pip install -r requirements.txt
# 运行Jupyter Notebook
jupyter notebook
# 在浏览器中打开notebooks/sales_analysis.ipynb并运行所有单元格

3. 爬虫项目实战#

3.1 爬虫项目 - 电影信息爬取#

项目概述:爬取电影网站的电影信息,包括电影名称、评分、导演、演员、上映日期等,存储到数据库中,并生成可视化报表。

技术栈

  • Python
  • requests:网络请求
  • BeautifulSoup:HTML解析
  • SQLite:数据库存储
  • Matplotlib:数据可视化

核心代码

import requests
from bs4 import BeautifulSoup
import sqlite3
import matplotlib.pyplot as plt
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 创建数据库连接
conn = sqlite3.connect('movies.db')
cursor = conn.cursor()
# 创建电影表
cursor.execute('''
CREATE TABLE IF NOT EXISTS movies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
rating REAL,
directors TEXT,
actors TEXT,
release_date TEXT,
genre TEXT,
duration TEXT,
description TEXT
)
''')
conn.commit()
# 爬取电影信息
def crawl_movies(url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, 'html.parser')
# 解析电影列表
movie_list = soup.find_all('div', class_='movie-item')
for movie in movie_list:
# 电影标题
title = movie.find('h2', class_='movie-title').text.strip()
# 电影评分
rating = movie.find('span', class_='rating').text.strip()
rating = float(rating) if rating else None
# 电影信息
info = movie.find('div', class_='movie-info').text.strip()
# 提取导演、演员、上映日期等信息
directors = None
actors = None
release_date = None
genre = None
duration = None
# 这里需要根据实际网站的HTML结构进行调整
# 示例代码,实际使用时需要根据目标网站的结构修改
if '导演:' in info:
directors = info.split('导演:')[1].split('主演:')[0].strip()
if '主演:' in info:
actors = info.split('主演:')[1].split('上映日期:')[0].strip()
if '上映日期:' in info:
release_date = info.split('上映日期:')[1].split('类型:')[0].strip()
if '类型:' in info:
genre = info.split('类型:')[1].split('片长:')[0].strip()
if '片长:' in info:
duration = info.split('片长:')[1].strip()
# 电影描述
description = movie.find('p', class_='movie-description').text.strip() if movie.find('p', class_='movie-description') else None
# 插入数据库
cursor.execute('''
INSERT INTO movies (title, rating, directors, actors, release_date, genre, duration, description)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (title, rating, directors, actors, release_date, genre, duration, description))
conn.commit()
print(f"成功爬取 {len(movie_list)} 部电影信息")
# 爬取多个页面
for page in range(1, 11): # 爬取前10页
url = f'https://example.com/movies?page={page}' # 替换为实际的电影网站URL
print(f"爬取第 {page} 页...")
crawl_movies(url)
# 分析电影数据
def analyze_movies():
# 读取数据库
cursor.execute('SELECT * FROM movies')
movies = cursor.fetchall()
print(f"\n数据库中共有 {len(movies)} 部电影")
# 按评分统计
cursor.execute('SELECT rating FROM movies WHERE rating IS NOT NULL')
ratings = [row[0] for row in cursor.fetchall()]
if ratings:
avg_rating = sum(ratings) / len(ratings)
print(f"平均评分:{avg_rating:.2f}")
# 评分分布
plt.figure(figsize=(10, 6))
plt.hist(ratings, bins=10, range=(0, 10))
plt.title('电影评分分布')
plt.xlabel('评分')
plt.ylabel('电影数量')
plt.show()
# 按类型统计
cursor.execute('SELECT genre FROM movies WHERE genre IS NOT NULL')
genres = [row[0] for row in cursor.fetchall()]
genre_counts = {}
for genre in genres:
if genre:
for g in genre.split('/'):
g = g.strip()
genre_counts[g] = genre_counts.get(g, 0) + 1
if genre_counts:
top_genres = sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)[:10]
plt.figure(figsize=(12, 6))
plt.bar([g[0] for g in top_genres], [g[1] for g in top_genres])
plt.title('电影类型分布前10')
plt.xlabel('类型')
plt.ylabel('电影数量')
plt.xticks(rotation=45)
plt.show()
# 评分最高的电影
cursor.execute('SELECT title, rating FROM movies WHERE rating IS NOT NULL ORDER BY rating DESC LIMIT 10')
top_rated = cursor.fetchall()
print("\n评分最高的10部电影:")
for movie in top_rated:
print(f"{movie[0]} - {movie[1]}")
# 运行分析
analyze_movies()
# 关闭数据库连接
conn.close()

运行项目

# 安装依赖
pip install requests beautifulsoup4 matplotlib
# 运行爬虫
python movie_crawler.py

4. 自动化脚本开发#

4.1 自动化脚本 - 文件整理工具#

项目概述:开发一个自动化脚本,根据文件类型自动整理文件夹中的文件,提高文件管理效率。

核心代码

import os
import shutil
import argparse
# 文件类型映射
extension_map = {
'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'],
'documents': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt'],
'spreadsheets': ['.xls', '.xlsx', '.csv', '.ods'],
'presentations': ['.ppt', '.pptx', '.odp'],
'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'],
'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'],
'video': ['.mp4', '.avi', '.mov', '.wmv', '.flv', '.mkv'],
'code': ['.py', '.java', '.cpp', '.js', '.html', '.css', '.php'],
'executables': ['.exe', '.msi', '.app', '.dmg']
}
# 反向映射,扩展名到文件夹名
extension_to_folder = {}
for folder, extensions in extension_map.items():
for ext in extensions:
extension_to_folder[ext.lower()] = folder
def organize_files(directory):
"""根据文件类型整理文件夹中的文件"""
if not os.path.exists(directory):
print(f"错误:目录 {directory} 不存在")
return
# 遍历目录中的所有文件
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
# 跳过文件夹
if os.path.isdir(filepath):
continue
# 获取文件扩展名
_, ext = os.path.splitext(filename)
ext = ext.lower()
# 确定目标文件夹
if ext in extension_to_folder:
target_folder = os.path.join(directory, extension_to_folder[ext])
else:
target_folder = os.path.join(directory, 'others')
# 创建目标文件夹(如果不存在)
if not os.path.exists(target_folder):
os.makedirs(target_folder)
# 移动文件
try:
target_filepath = os.path.join(target_folder, filename)
# 如果文件已存在,添加编号
counter = 1
while os.path.exists(target_filepath):
name, ext = os.path.splitext(filename)
target_filepath = os.path.join(target_folder, f"{name}_{counter}{ext}")
counter += 1
shutil.move(filepath, target_filepath)
print(f"移动:{filename} -> {extension_to_folder.get(ext, 'others')}/")
except Exception as e:
print(f"错误移动文件 {filename}{e}")
def main():
parser = argparse.ArgumentParser(description='根据文件类型自动整理文件夹中的文件')
parser.add_argument('directory', help='要整理的文件夹路径')
args = parser.parse_args()
print(f"开始整理文件夹:{args.directory}")
organize_files(args.directory)
print("整理完成!")
if __name__ == "__main__":
main()

运行脚本

# 运行脚本整理指定文件夹
python file_organizer.py /path/to/directory

4.2 自动化脚本 - 网络监控工具#

项目概述:开发一个网络监控工具,定期检查网站是否可访问,记录响应时间和状态码,并在网站不可用时发送通知。

核心代码

import requests
import time
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import argparse
# 配置文件
CONFIG_FILE = 'config.json'
LOG_FILE = 'monitor.log'
# 加载配置
def load_config():
try:
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
except Exception as e:
print(f"错误加载配置文件:{e}")
return {
'websites': [
{'url': 'https://www.google.com', 'name': 'Google'},
{'url': 'https://www.baidu.com', 'name': 'Baidu'}
],
'interval': 60, # 检查间隔(秒)
'timeout': 10, # 请求超时(秒)
'email': {
'enabled': False,
'smtp_server': 'smtp.example.com',
'smtp_port': 587,
'smtp_user': 'user@example.com',
'smtp_password': 'password',
'from_email': 'user@example.com',
'to_email': 'user@example.com'
}
}
# 保存配置
def save_config(config):
try:
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=4)
except Exception as e:
print(f"错误保存配置文件:{e}")
# 发送邮件通知
def send_email(config, subject, message):
if not config['email']['enabled']:
return
try:
msg = MIMEMultipart()
msg['From'] = config['email']['from_email']
msg['To'] = config['email']['to_email']
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain'))
server = smtplib.SMTP(config['email']['smtp_server'], config['email']['smtp_port'])
server.starttls()
server.login(config['email']['smtp_user'], config['email']['smtp_password'])
server.send_message(msg)
server.quit()
print("邮件通知已发送")
except Exception as e:
print(f"错误发送邮件:{e}")
# 记录日志
def log_message(message):
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] {message}"
print(log_entry)
try:
with open(LOG_FILE, 'a') as f:
f.write(log_entry + '\n')
except Exception as e:
print(f"错误写入日志:{e}")
# 监控网站
def monitor_website(url, name, timeout):
try:
start_time = time.time()
response = requests.get(url, timeout=timeout)
end_time = time.time()
response_time = (end_time - start_time) * 1000 # 转换为毫秒
if response.status_code == 200:
status = "正常"
log_message(f"{name} ({url}) - 状态:{status},响应时间:{response_time:.2f}ms,状态码:{response.status_code}")
else:
status = "异常"
message = f"{name} ({url}) - 状态:{status},响应时间:{response_time:.2f}ms,状态码:{response.status_code}"
log_message(message)
return False, message
return True, None
except Exception as e:
status = "错误"
message = f"{name} ({url}) - 状态:{status},错误信息:{str(e)}"
log_message(message)
return False, message
# 主函数
def main():
parser = argparse.ArgumentParser(description='网站监控工具')
parser.add_argument('--config', help='配置文件路径', default=CONFIG_FILE)
args = parser.parse_args()
global CONFIG_FILE
CONFIG_FILE = args.config
config = load_config()
log_message("开始监控网站...")
# 初始化状态
website_status = {site['url']: True for site in config['websites']}
try:
while True:
for site in config['websites']:
success, message = monitor_website(site['url'], site['name'], config['timeout'])
# 检查状态变化
if not success and website_status[site['url']]:
# 网站从正常变为异常
website_status[site['url']] = False
send_email(config, f"【警告】网站 {site['name']} 不可访问", message)
elif success and not website_status[site['url']]:
# 网站从异常变为正常
website_status[site['url']] = True
send_email(config, f"【恢复】网站 {site['name']} 已恢复访问", f"网站 {site['name']} ({site['url']}) 已恢复正常访问")
time.sleep(config['interval'])
except KeyboardInterrupt:
log_message("监控已停止")
if __name__ == "__main__":
main()

运行脚本

# 运行网络监控工具
python network_monitor.py
# 使用自定义配置文件
python network_monitor.py --config my_config.json

5. 机器学习入门项目#

5.1 机器学习项目 - 图像分类#

项目概述:使用Python和机器学习库构建一个图像分类模型,能够识别不同类别的图像。

技术栈

  • Python
  • TensorFlow/Keras:深度学习框架
  • NumPy:数值计算
  • Matplotlib:数据可视化
  • scikit-learn:机器学习工具

核心代码

import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
# 加载数据集
# 这里使用CIFAR-10数据集作为示例
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
# 数据预处理
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
# 标签编码
y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)
# 数据集标签
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
# 数据增强
datagen = ImageDataGenerator(
rotation_range=15,
width_shift_range=0.1,
height_shift_range=0.1,
horizontal_flip=True,
zoom_range=0.1
)
datagen.fit(x_train)
# 构建模型
model = models.Sequential([
layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(64, (3, 3), activation='relu'),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(64, (3, 3), activation='relu'),
layers.Flatten(),
layers.Dense(64, activation='relu'),
layers.Dense(10, activation='softmax')
])
# 编译模型
model.compile(optimizer='adam',
loss='categorical_crossentropy',
metrics=['accuracy'])
# 查看模型结构
model.summary()
# 训练模型
history = model.fit(datagen.flow(x_train, y_train, batch_size=64),
epochs=50,
validation_data=(x_test, y_test))
# 评估模型
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=2)
print(f"\n测试准确率:{test_acc:.4f}")
# 绘制训练过程
plt.figure(figsize=(12, 4))
# 准确率
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='训练准确率')
plt.plot(history.history['val_accuracy'], label='验证准确率')
plt.title('准确率')
plt.xlabel(' epoch')
plt.ylabel('准确率')
plt.legend()
# 损失
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='训练损失')
plt.plot(history.history['val_loss'], label='验证损失')
plt.title('损失')
plt.xlabel('epoch')
plt.ylabel('损失')
plt.legend()
plt.show()
# 预测示例
predictions = model.predict(x_test)
# 显示预测结果
plt.figure(figsize=(15, 15))
for i in range(25):
plt.subplot(5, 5, i+1)
plt.xticks([])
plt.yticks([])
plt.grid(False)
plt.imshow(x_test[i])
predicted_label = np.argmax(predictions[i])
true_label = np.argmax(y_test[i])
if predicted_label == true_label:
color = 'green'
else:
color = 'red'
plt.xlabel(f"{class_names[predicted_label]} ({class_names[true_label]})", color=color)
plt.show()
# 保存模型
model.save('image_classification_model.h5')
print("模型已保存为 image_classification_model.h5")

运行项目

# 安装依赖
pip install tensorflow numpy matplotlib scikit-learn
# 运行项目
python image_classification.py

6. 部署和运维#

6.1 Docker部署Python应用#

项目概述:使用Docker容器化部署Python应用,提高应用的可移植性和可维护性。

核心代码

Dockerfile

# 使用官方Python镜像作为基础镜像
FROM python:3.9-slim
# 设置工作目录
WORKDIR /app
# 复制requirements.txt文件
COPY requirements.txt .
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 5000
# 运行应用
CMD ["python", "app.py"]

docker-compose.yml

version: '3'
services:
web:
build: .
ports:
- "5000:5000"
environment:
- FLASK_ENV=production
restart: always

app.py

from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello():
return "Hello, Docker!"
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)

requirements.txt

Flask

部署步骤

# 构建Docker镜像
docker build -t python-app .
# 运行Docker容器
docker run -p 5000:5000 python-app
# 或使用docker-compose
docker-compose up -d
# 查看运行状态
docker ps
# 访问应用
# 打开浏览器访问 http://localhost:5000

6.2 CI/CD流水线配置#

项目概述:配置CI/CD流水线,实现代码提交后自动构建、测试和部署。

GitHub Actions配置

.github/workflows/ci-cd.yml

name: CI/CD Pipeline
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest
- name: Run tests
run: |
pytest
- name: Build Docker image
run: |
docker build -t python-app .
- name: Deploy to server
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
run: |
# 这里可以添加部署脚本,例如使用SSH连接服务器并部署
echo "Deploying to server..."

GitLab CI/CD配置

.gitlab-ci.yml

stages:
- test
- build
- deploy
test:
stage: test
image: python:3.9-slim
script:
- pip install -r requirements.txt
- pip install pytest
- pytest
build:
stage: build
image: docker:latest
services:
- docker:dind
script:
- docker build -t python-app .
deploy:
stage: deploy
image: alpine:latest
script:
- echo "Deploying to server..."
only:
- main

7. 学习资源推荐#

7.1 Python官方文档#

7.2 B站Python视频(播放量高于50W)#

7.3 推荐书籍#

  • 《Python编程:从入门到实践》
  • 《流畅的Python》
  • 《Python Cookbook》
  • 《Effective Python》
  • 《Python数据科学手册》
  • 《Django实战》
  • 《Flask Web开发》
  • 《Python网络爬虫开发实战》
  • 《Python自动化运维》
  • 《机器学习实战》
  • 《Docker实战》

8. 总结#

本教程介绍了Python的实际应用场景和项目实战,包括:

  • Web应用开发实战(Flask博客系统、Django电商网站)
  • 数据分析项目实战(销售数据可视化)
  • 爬虫项目实战(电影信息爬取)
  • 自动化脚本开发(文件整理工具、网络监控工具)
  • 机器学习入门项目(图像分类)
  • 部署和运维(Docker部署、CI/CD流水线)

通过本教程的学习,你应该已经掌握了Python的实际应用技能,可以开始开发自己的Python项目了。在实际开发中,你可能会遇到各种问题,但是通过查阅官方文档、参考书籍和在线资源,你一定能够解决这些问题。

Python是一种功能强大、简单易学的编程语言,它的应用领域非常广泛。希望本教程能够帮助你更好地理解和应用Python,让你在编程的道路上越走越远。

祝你学习愉快!


提示:本教程是Python学习的第三阶段,后续将推出Python高级主题教程,包括性能优化、安全编程、分布式系统等内容。

分享

如果这篇文章对你有帮助,欢迎分享给更多人!

Python项目实战教程(第三阶段)
https://sakumonet.top/posts/python-project-practice/
作者
SakuMonet
发布于
2026-02-25
许可协议
Unlicensed

部分信息可能已经过时