Python项目实战教程(第三阶段)
欢迎来到Python项目实战教程的第三阶段!在前面的教程中,我们学习了Python的基础知识和高级特性。在本阶段中,我们将通过实际项目来应用这些知识,学习Python在各个领域的实际应用场景,帮助你成为一名真正的Python开发者。
1. Web应用开发实战
1.1 Flask项目实战 - 个人博客系统
项目概述:创建一个功能完整的个人博客系统,包括用户注册、登录、文章发布、评论等功能。
技术栈:
- Flask:Web框架
- Flask-SQLAlchemy:ORM数据库工具
- Flask-Login:用户认证
- Flask-WTF:表单处理
- Bootstrap:前端框架
项目结构:
blog/├── app/│ ├── __init__.py│ ├── routes.py│ ├── models.py│ ├── forms.py│ ├── templates/│ │ ├── base.html│ │ ├── index.html│ │ ├── login.html│ │ ├── register.html│ │ ├── post.html│ │ └── create_post.html│ └── static/│ ├── css/│ └── js/├── config.py├── requirements.txt└── run.py核心代码:
app/__init__.py:
from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyfrom flask_login import LoginManagerfrom config import Config
app = Flask(__name__)app.config.from_object(Config)db = SQLAlchemy(app)login = LoginManager(app)login.login_view = 'login'
from app import routes, modelsapp/models.py:
from datetime import datetimefrom app import db, loginfrom flask_login import UserMixinfrom werkzeug.security import generate_password_hash, check_password_hash
class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), index=True, unique=True) email = db.Column(db.String(120), index=True, unique=True) password_hash = db.Column(db.String(128)) posts = db.relationship('Post', backref='author', lazy='dynamic')
def set_password(self, password): self.password_hash = generate_password_hash(password)
def check_password(self, password): return check_password_hash(self.password_hash, password)
class Post(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(140)) content = db.Column(db.Text) timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) comments = db.relationship('Comment', backref='post', lazy='dynamic')
class Comment(db.Model): id = db.Column(db.Integer, primary_key=True) content = db.Column(db.Text) timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) post_id = db.Column(db.Integer, db.ForeignKey('post.id')) user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
@login.user_loaderdef load_user(id): return User.query.get(int(id))app/forms.py:
from flask_wtf import FlaskFormfrom wtforms import StringField, PasswordField, SubmitField, TextAreaFieldfrom wtforms.validators import DataRequired, Email, EqualTo, Length
class LoginForm(FlaskForm): username = StringField('Username', validators=[DataRequired()]) password = PasswordField('Password', validators=[DataRequired()]) submit = SubmitField('Sign In')
class RegistrationForm(FlaskForm): username = StringField('Username', validators=[DataRequired(), Length(min=2, max=64)]) email = StringField('Email', validators=[DataRequired(), Email()]) password = PasswordField('Password', validators=[DataRequired()]) password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')]) submit = SubmitField('Register')
class PostForm(FlaskForm): title = StringField('Title', validators=[DataRequired(), Length(max=140)]) content = TextAreaField('Content', validators=[DataRequired()]) submit = SubmitField('Post')
class CommentForm(FlaskForm): content = TextAreaField('Comment', validators=[DataRequired()]) submit = SubmitField('Submit')app/routes.py:
from flask import render_template, redirect, url_for, flash, requestfrom flask_login import current_user, login_user, logout_user, login_requiredfrom app import app, dbfrom app.models import User, Post, Commentfrom app.forms import LoginForm, RegistrationForm, PostForm, CommentForm
@app.route('/')@app.route('/index')def index(): posts = Post.query.order_by(Post.timestamp.desc()).all() return render_template('index.html', title='Home', posts=posts)
@app.route('/login', methods=['GET', 'POST'])def login(): if current_user.is_authenticated: return redirect(url_for('index')) form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data).first() if user is None or not user.check_password(form.password.data): flash('Invalid username or password') return redirect(url_for('login')) login_user(user) return redirect(url_for('index')) return render_template('login.html', title='Sign In', form=form)
@app.route('/logout')def logout(): logout_user() return redirect(url_for('index'))
@app.route('/register', methods=['GET', 'POST'])def register(): if current_user.is_authenticated: return redirect(url_for('index')) form = RegistrationForm() if form.validate_on_submit(): user = User(username=form.username.data, email=form.email.data) user.set_password(form.password.data) db.session.add(user) db.session.commit() flash('Congratulations, you are now a registered user!') return redirect(url_for('login')) return render_template('register.html', title='Register', form=form)
@app.route('/post/<int:post_id>', methods=['GET', 'POST'])def post(post_id): post = Post.query.get_or_404(post_id) form = CommentForm() if form.validate_on_submit() and current_user.is_authenticated: comment = Comment(content=form.content.data, post_id=post_id, user_id=current_user.id) db.session.add(comment) db.session.commit() flash('Your comment has been added!') return redirect(url_for('post', post_id=post_id)) comments = post.comments.order_by(Comment.timestamp.desc()).all() return render_template('post.html', title=post.title, post=post, form=form, comments=comments)
@app.route('/create_post', methods=['GET', 'POST'])@login_requireddef create_post(): form = PostForm() if form.validate_on_submit(): post = Post(title=form.title.data, content=form.content.data, author=current_user) db.session.add(post) db.session.commit() flash('Your post has been created!') return redirect(url_for('index')) return render_template('create_post.html', title='Create Post', form=form)config.py:
import os
class Config: SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess' SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///blog.db' SQLALCHEMY_TRACK_MODIFICATIONS = Falserun.py:
from app import app, dbfrom app.models import User, Post, Comment
@app.shell_context_processordef make_shell_context(): return {'db': db, 'User': User, 'Post': Post, 'Comment': Comment}
if __name__ == '__main__': app.run(debug=True)requirements.txt:
FlaskFlask-SQLAlchemyFlask-LoginFlask-WTFWerkzeug运行项目:
# 安装依赖pip install -r requirements.txt
# 初始化数据库flask shell>>> from app import db>>> db.create_all()>>> exit()
# 运行项目flask run1.2 Django项目实战 - 电商网站
项目概述:创建一个简单的电商网站,包括商品展示、购物车、订单管理等功能。
技术栈:
- Django:Web框架
- Django ORM:数据库工具
- Django Authentication:用户认证
- Bootstrap:前端框架
项目结构:
ecommerce/├── ecommerce/│ ├── __init__.py│ ├── settings.py│ ├── urls.py│ └── wsgi.py├── shop/│ ├── __init__.py│ ├── admin.py│ ├── apps.py│ ├── models.py│ ├── views.py│ ├── urls.py│ └── templates/│ └── shop/│ ├── base.html│ ├── index.html│ ├── product_detail.html│ ├── cart.html│ └── checkout.html├── manage.py└── requirements.txt核心代码:
shop/models.py:
from django.db import modelsfrom django.contrib.auth.models import User
class Category(models.Model): name = models.CharField(max_length=200, db_index=True) slug = models.SlugField(max_length=200, unique=True)
class Meta: ordering = ('name',) verbose_name = 'category' verbose_name_plural = 'categories'
def __str__(self): return self.name
class Product(models.Model): category = models.ForeignKey(Category, related_name='products', on_delete=models.CASCADE) name = models.CharField(max_length=200, db_index=True) slug = models.SlugField(max_length=200, db_index=True) image = models.ImageField(upload_to='products/%Y/%m/%d', blank=True) description = models.TextField(blank=True) price = models.DecimalField(max_digits=10, decimal_places=2) available = models.BooleanField(default=True) created = models.DateTimeField(auto_now_add=True) updated = models.DateTimeField(auto_now=True)
class Meta: ordering = ('name',) index_together = (('id', 'slug'),)
def __str__(self): return self.name
class CartItem(models.Model): user = models.ForeignKey(User, related_name='cart_items', on_delete=models.CASCADE) product = models.ForeignKey(Product, related_name='cart_items', on_delete=models.CASCADE) quantity = models.PositiveIntegerField(default=1) created = models.DateTimeField(auto_now_add=True)
def __str__(self): return f'{self.quantity} x {self.product.name}'
def get_total_price(self): return self.quantity * self.product.price
class Order(models.Model): user = models.ForeignKey(User, related_name='orders', on_delete=models.CASCADE) created = models.DateTimeField(auto_now_add=True) updated = models.DateTimeField(auto_now=True) paid = models.BooleanField(default=False)
class Meta: ordering = ('-created',)
def __str__(self): return f'Order {self.id}'
def get_total_cost(self): return sum(item.get_cost() for item in self.items.all())
class OrderItem(models.Model): order = models.ForeignKey(Order, related_name='items', on_delete=models.CASCADE) product = models.ForeignKey(Product, related_name='order_items', on_delete=models.CASCADE) price = models.DecimalField(max_digits=10, decimal_places=2) quantity = models.PositiveIntegerField(default=1)
def __str__(self): return f'{self.quantity} x {self.product.name}'
def get_cost(self): return self.price * self.quantityshop/views.py:
from django.shortcuts import render, get_object_or_404, redirectfrom django.contrib.auth.decorators import login_requiredfrom django.views.decorators.http import require_POSTfrom .models import Category, Product, CartItem, Order, OrderItem
@login_requireddef index(request): categories = Category.objects.all() products = Product.objects.filter(available=True) return render(request, 'shop/index.html', {'categories': categories, 'products': products})
@login_requireddef product_detail(request, id, slug): product = get_object_or_404(Product, id=id, slug=slug, available=True) return render(request, 'shop/product_detail.html', {'product': product})
@login_requireddef cart(request): cart_items = CartItem.objects.filter(user=request.user) total = sum(item.get_total_price() for item in cart_items) return render(request, 'shop/cart.html', {'cart_items': cart_items, 'total': total})
@login_required@require_POSTdef add_to_cart(request, product_id): product = get_object_or_404(Product, id=product_id) cart_item, created = CartItem.objects.get_or_create( user=request.user, product=product, defaults={'quantity': 1} ) if not created: cart_item.quantity += 1 cart_item.save() return redirect('cart')
@login_required@require_POSTdef remove_from_cart(request, cart_item_id): cart_item = get_object_or_404(CartItem, id=cart_item_id, user=request.user) cart_item.delete() return redirect('cart')
@login_requireddef checkout(request): cart_items = CartItem.objects.filter(user=request.user) if not cart_items: return redirect('cart')
if request.method == 'POST': order = Order.objects.create(user=request.user) for item in cart_items: OrderItem.objects.create( order=order, product=item.product, price=item.product.price, quantity=item.quantity ) cart_items.delete() return redirect('order_success')
total = sum(item.get_total_price() for item in cart_items) return render(request, 'shop/checkout.html', {'cart_items': cart_items, 'total': total})
@login_requireddef order_success(request): return render(request, 'shop/order_success.html')shop/urls.py:
from django.urls import pathfrom . import views
urlpatterns = [ path('', views.index, name='index'), path('product/<int:id>/<slug:slug>/', views.product_detail, name='product_detail'), path('cart/', views.cart, name='cart'), path('add-to-cart/<int:product_id>/', views.add_to_cart, name='add_to_cart'), path('remove-from-cart/<int:cart_item_id>/', views.remove_from_cart, name='remove_from_cart'), path('checkout/', views.checkout, name='checkout'), path('order-success/', views.order_success, name='order_success'),]ecommerce/urls.py:
from django.contrib import adminfrom django.urls import path, includefrom django.contrib.auth import views as auth_views
urlpatterns = [ path('admin/', admin.site.urls), path('shop/', include('shop.urls')), path('login/', auth_views.LoginView.as_view(template_name='shop/login.html'), name='login'), path('logout/', auth_views.LogoutView.as_view(), name='logout'),]运行项目:
# 安装依赖pip install -r requirements.txt
# 迁移数据库python manage.py makemigrationspython manage.py migrate
# 创建超级用户python manage.py createsuperuser
# 运行项目python manage.py runserver2. 数据分析项目实战
2.1 数据分析项目 - 销售数据可视化
项目概述:分析销售数据,生成可视化报表,帮助企业了解销售趋势和产品表现。
技术栈:
- Python
- Pandas:数据处理
- Matplotlib:数据可视化
- Seaborn:高级数据可视化
- Jupyter Notebook:交互式开发环境
项目结构:
sales-analysis/├── data/│ └── sales_data.csv├── notebooks/│ └── sales_analysis.ipynb└── requirements.txt销售数据示例:
| OrderID | Product | Category | Price | Quantity | Date | Region |
|---|---|---|---|---|---|---|
| 1 | Product A | Electronics | 100 | 2 | 2026-01-01 | North |
| 2 | Product B | Clothing | 50 | 1 | 2026-01-02 | South |
| 3 | Product C | Electronics | 150 | 1 | 2026-01-03 | East |
| 4 | Product D | Clothing | 30 | 3 | 2026-01-04 | West |
| 5 | Product A | Electronics | 100 | 1 | 2026-01-05 | North |
核心代码:
import pandas as pdimport matplotlib.pyplot as pltimport seaborn as sns
# 设置中文字体plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# 读取数据df = pd.read_csv('data/sales_data.csv')
# 数据清洗print("数据基本信息:")print(df.info())
print("\n数据前5行:")print(df.head())
print("\n数据统计摘要:")print(df.describe())
# 计算销售额df['Sales'] = df['Price'] * df['Quantity']
# 按日期分析销售趋势df['Date'] = pd.to_datetime(df['Date'])df.set_index('Date', inplace=True)
# 月度销售趋势monthly_sales = df.resample('M').sum()['Sales']
plt.figure(figsize=(12, 6))plt.plot(monthly_sales.index, monthly_sales.values)plt.title('月度销售趋势')plt.xlabel('月份')plt.ylabel('销售额')plt.grid(True)plt.show()
# 按产品类别分析category_sales = df.groupby('Category').sum()['Sales']
plt.figure(figsize=(10, 6))sns.barplot(x=category_sales.index, y=category_sales.values)plt.title('各产品类别销售额')plt.xlabel('产品类别')plt.ylabel('销售额')plt.show()
# 按地区分析region_sales = df.groupby('Region').sum()['Sales']
plt.figure(figsize=(10, 6))plt.pie(region_sales.values, labels=region_sales.index, autopct='%1.1f%%')plt.title('各地区销售占比')plt.show()
# 产品销售排行top_products = df.groupby('Product').sum()['Sales'].sort_values(ascending=False).head(10)
plt.figure(figsize=(12, 6))sns.barplot(x=top_products.values, y=top_products.index)plt.title('产品销售排行前10')plt.xlabel('销售额')plt.ylabel('产品')plt.show()
# 销售数据相关性分析corr = df[['Price', 'Quantity', 'Sales']].corr()
plt.figure(figsize=(8, 6))sns.heatmap(corr, annot=True, cmap='coolwarm')plt.title('销售数据相关性分析')plt.show()
# 总结print("\n销售数据分析总结:")print(f"总销售额:{df['Sales'].sum():.2f}")print(f"平均销售额:{df['Sales'].mean():.2f}")print(f"最高销售额:{df['Sales'].max():.2f}")print(f"最低销售额:{df['Sales'].min():.2f}")print(f"销售最好的产品类别:{category_sales.idxmax()}")print(f"销售最好的地区:{region_sales.idxmax()}")print(f"销售最好的产品:{top_products.idxmax()}")运行项目:
# 安装依赖pip install -r requirements.txt
# 运行Jupyter Notebookjupyter notebook
# 在浏览器中打开notebooks/sales_analysis.ipynb并运行所有单元格3. 爬虫项目实战
3.1 爬虫项目 - 电影信息爬取
项目概述:爬取电影网站的电影信息,包括电影名称、评分、导演、演员、上映日期等,存储到数据库中,并生成可视化报表。
技术栈:
- Python
- requests:网络请求
- BeautifulSoup:HTML解析
- SQLite:数据库存储
- Matplotlib:数据可视化
核心代码:
import requestsfrom bs4 import BeautifulSoupimport sqlite3import matplotlib.pyplot as plt
# 设置中文字体plt.rcParams['font.sans-serif'] = ['SimHei']plt.rcParams['axes.unicode_minus'] = False
# 创建数据库连接conn = sqlite3.connect('movies.db')cursor = conn.cursor()
# 创建电影表cursor.execute('''CREATE TABLE IF NOT EXISTS movies ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, rating REAL, directors TEXT, actors TEXT, release_date TEXT, genre TEXT, duration TEXT, description TEXT)''')conn.commit()
# 爬取电影信息def crawl_movies(url): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = requests.get(url, headers=headers) soup = BeautifulSoup(response.text, 'html.parser')
# 解析电影列表 movie_list = soup.find_all('div', class_='movie-item')
for movie in movie_list: # 电影标题 title = movie.find('h2', class_='movie-title').text.strip()
# 电影评分 rating = movie.find('span', class_='rating').text.strip() rating = float(rating) if rating else None
# 电影信息 info = movie.find('div', class_='movie-info').text.strip()
# 提取导演、演员、上映日期等信息 directors = None actors = None release_date = None genre = None duration = None
# 这里需要根据实际网站的HTML结构进行调整 # 示例代码,实际使用时需要根据目标网站的结构修改 if '导演:' in info: directors = info.split('导演:')[1].split('主演:')[0].strip() if '主演:' in info: actors = info.split('主演:')[1].split('上映日期:')[0].strip() if '上映日期:' in info: release_date = info.split('上映日期:')[1].split('类型:')[0].strip() if '类型:' in info: genre = info.split('类型:')[1].split('片长:')[0].strip() if '片长:' in info: duration = info.split('片长:')[1].strip()
# 电影描述 description = movie.find('p', class_='movie-description').text.strip() if movie.find('p', class_='movie-description') else None
# 插入数据库 cursor.execute(''' INSERT INTO movies (title, rating, directors, actors, release_date, genre, duration, description) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', (title, rating, directors, actors, release_date, genre, duration, description))
conn.commit() print(f"成功爬取 {len(movie_list)} 部电影信息")
# 爬取多个页面for page in range(1, 11): # 爬取前10页 url = f'https://example.com/movies?page={page}' # 替换为实际的电影网站URL print(f"爬取第 {page} 页...") crawl_movies(url)
# 分析电影数据def analyze_movies(): # 读取数据库 cursor.execute('SELECT * FROM movies') movies = cursor.fetchall()
print(f"\n数据库中共有 {len(movies)} 部电影")
# 按评分统计 cursor.execute('SELECT rating FROM movies WHERE rating IS NOT NULL') ratings = [row[0] for row in cursor.fetchall()]
if ratings: avg_rating = sum(ratings) / len(ratings) print(f"平均评分:{avg_rating:.2f}")
# 评分分布 plt.figure(figsize=(10, 6)) plt.hist(ratings, bins=10, range=(0, 10)) plt.title('电影评分分布') plt.xlabel('评分') plt.ylabel('电影数量') plt.show()
# 按类型统计 cursor.execute('SELECT genre FROM movies WHERE genre IS NOT NULL') genres = [row[0] for row in cursor.fetchall()]
genre_counts = {} for genre in genres: if genre: for g in genre.split('/'): g = g.strip() genre_counts[g] = genre_counts.get(g, 0) + 1
if genre_counts: top_genres = sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)[:10]
plt.figure(figsize=(12, 6)) plt.bar([g[0] for g in top_genres], [g[1] for g in top_genres]) plt.title('电影类型分布前10') plt.xlabel('类型') plt.ylabel('电影数量') plt.xticks(rotation=45) plt.show()
# 评分最高的电影 cursor.execute('SELECT title, rating FROM movies WHERE rating IS NOT NULL ORDER BY rating DESC LIMIT 10') top_rated = cursor.fetchall()
print("\n评分最高的10部电影:") for movie in top_rated: print(f"{movie[0]} - {movie[1]}")
# 运行分析analyze_movies()
# 关闭数据库连接conn.close()运行项目:
# 安装依赖pip install requests beautifulsoup4 matplotlib
# 运行爬虫python movie_crawler.py4. 自动化脚本开发
4.1 自动化脚本 - 文件整理工具
项目概述:开发一个自动化脚本,根据文件类型自动整理文件夹中的文件,提高文件管理效率。
核心代码:
import osimport shutilimport argparse
# 文件类型映射extension_map = { 'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'], 'documents': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt'], 'spreadsheets': ['.xls', '.xlsx', '.csv', '.ods'], 'presentations': ['.ppt', '.pptx', '.odp'], 'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'], 'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'], 'video': ['.mp4', '.avi', '.mov', '.wmv', '.flv', '.mkv'], 'code': ['.py', '.java', '.cpp', '.js', '.html', '.css', '.php'], 'executables': ['.exe', '.msi', '.app', '.dmg']}
# 反向映射,扩展名到文件夹名extension_to_folder = {}for folder, extensions in extension_map.items(): for ext in extensions: extension_to_folder[ext.lower()] = folder
def organize_files(directory): """根据文件类型整理文件夹中的文件""" if not os.path.exists(directory): print(f"错误:目录 {directory} 不存在") return
# 遍历目录中的所有文件 for filename in os.listdir(directory): filepath = os.path.join(directory, filename)
# 跳过文件夹 if os.path.isdir(filepath): continue
# 获取文件扩展名 _, ext = os.path.splitext(filename) ext = ext.lower()
# 确定目标文件夹 if ext in extension_to_folder: target_folder = os.path.join(directory, extension_to_folder[ext]) else: target_folder = os.path.join(directory, 'others')
# 创建目标文件夹(如果不存在) if not os.path.exists(target_folder): os.makedirs(target_folder)
# 移动文件 try: target_filepath = os.path.join(target_folder, filename)
# 如果文件已存在,添加编号 counter = 1 while os.path.exists(target_filepath): name, ext = os.path.splitext(filename) target_filepath = os.path.join(target_folder, f"{name}_{counter}{ext}") counter += 1
shutil.move(filepath, target_filepath) print(f"移动:{filename} -> {extension_to_folder.get(ext, 'others')}/") except Exception as e: print(f"错误移动文件 {filename}:{e}")
def main(): parser = argparse.ArgumentParser(description='根据文件类型自动整理文件夹中的文件') parser.add_argument('directory', help='要整理的文件夹路径') args = parser.parse_args()
print(f"开始整理文件夹:{args.directory}") organize_files(args.directory) print("整理完成!")
if __name__ == "__main__": main()运行脚本:
# 运行脚本整理指定文件夹python file_organizer.py /path/to/directory4.2 自动化脚本 - 网络监控工具
项目概述:开发一个网络监控工具,定期检查网站是否可访问,记录响应时间和状态码,并在网站不可用时发送通知。
核心代码:
import requestsimport timeimport jsonimport smtplibfrom email.mime.text import MIMETextfrom email.mime.multipart import MIMEMultipartimport argparse
# 配置文件CONFIG_FILE = 'config.json'LOG_FILE = 'monitor.log'
# 加载配置def load_config(): try: with open(CONFIG_FILE, 'r') as f: return json.load(f) except Exception as e: print(f"错误加载配置文件:{e}") return { 'websites': [ {'url': 'https://www.google.com', 'name': 'Google'}, {'url': 'https://www.baidu.com', 'name': 'Baidu'} ], 'interval': 60, # 检查间隔(秒) 'timeout': 10, # 请求超时(秒) 'email': { 'enabled': False, 'smtp_server': 'smtp.example.com', 'smtp_port': 587, 'smtp_user': 'user@example.com', 'smtp_password': 'password', 'from_email': 'user@example.com', 'to_email': 'user@example.com' } }
# 保存配置def save_config(config): try: with open(CONFIG_FILE, 'w') as f: json.dump(config, f, indent=4) except Exception as e: print(f"错误保存配置文件:{e}")
# 发送邮件通知def send_email(config, subject, message): if not config['email']['enabled']: return
try: msg = MIMEMultipart() msg['From'] = config['email']['from_email'] msg['To'] = config['email']['to_email'] msg['Subject'] = subject msg.attach(MIMEText(message, 'plain'))
server = smtplib.SMTP(config['email']['smtp_server'], config['email']['smtp_port']) server.starttls() server.login(config['email']['smtp_user'], config['email']['smtp_password']) server.send_message(msg) server.quit() print("邮件通知已发送") except Exception as e: print(f"错误发送邮件:{e}")
# 记录日志def log_message(message): timestamp = time.strftime('%Y-%m-%d %H:%M:%S') log_entry = f"[{timestamp}] {message}" print(log_entry) try: with open(LOG_FILE, 'a') as f: f.write(log_entry + '\n') except Exception as e: print(f"错误写入日志:{e}")
# 监控网站def monitor_website(url, name, timeout): try: start_time = time.time() response = requests.get(url, timeout=timeout) end_time = time.time() response_time = (end_time - start_time) * 1000 # 转换为毫秒
if response.status_code == 200: status = "正常" log_message(f"{name} ({url}) - 状态:{status},响应时间:{response_time:.2f}ms,状态码:{response.status_code}") else: status = "异常" message = f"{name} ({url}) - 状态:{status},响应时间:{response_time:.2f}ms,状态码:{response.status_code}" log_message(message) return False, message
return True, None except Exception as e: status = "错误" message = f"{name} ({url}) - 状态:{status},错误信息:{str(e)}" log_message(message) return False, message
# 主函数def main(): parser = argparse.ArgumentParser(description='网站监控工具') parser.add_argument('--config', help='配置文件路径', default=CONFIG_FILE) args = parser.parse_args()
global CONFIG_FILE CONFIG_FILE = args.config
config = load_config() log_message("开始监控网站...")
# 初始化状态 website_status = {site['url']: True for site in config['websites']}
try: while True: for site in config['websites']: success, message = monitor_website(site['url'], site['name'], config['timeout'])
# 检查状态变化 if not success and website_status[site['url']]: # 网站从正常变为异常 website_status[site['url']] = False send_email(config, f"【警告】网站 {site['name']} 不可访问", message) elif success and not website_status[site['url']]: # 网站从异常变为正常 website_status[site['url']] = True send_email(config, f"【恢复】网站 {site['name']} 已恢复访问", f"网站 {site['name']} ({site['url']}) 已恢复正常访问")
time.sleep(config['interval']) except KeyboardInterrupt: log_message("监控已停止")
if __name__ == "__main__": main()运行脚本:
# 运行网络监控工具python network_monitor.py
# 使用自定义配置文件python network_monitor.py --config my_config.json5. 机器学习入门项目
5.1 机器学习项目 - 图像分类
项目概述:使用Python和机器学习库构建一个图像分类模型,能够识别不同类别的图像。
技术栈:
- Python
- TensorFlow/Keras:深度学习框架
- NumPy:数值计算
- Matplotlib:数据可视化
- scikit-learn:机器学习工具
核心代码:
import tensorflow as tffrom tensorflow.keras import layers, modelsimport numpy as npimport matplotlib.pyplot as pltfrom sklearn.model_selection import train_test_splitfrom tensorflow.keras.preprocessing.image import ImageDataGenerator
# 加载数据集# 这里使用CIFAR-10数据集作为示例(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
# 数据预处理x_train = x_train.astype('float32') / 255.0x_test = x_test.astype('float32') / 255.0
# 标签编码y_train = tf.keras.utils.to_categorical(y_train, 10)y_test = tf.keras.utils.to_categorical(y_test, 10)
# 数据集标签class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
# 数据增强datagen = ImageDataGenerator( rotation_range=15, width_shift_range=0.1, height_shift_range=0.1, horizontal_flip=True, zoom_range=0.1)
datagen.fit(x_train)
# 构建模型model = models.Sequential([ layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)), layers.MaxPooling2D((2, 2)), layers.Conv2D(64, (3, 3), activation='relu'), layers.MaxPooling2D((2, 2)), layers.Conv2D(64, (3, 3), activation='relu'), layers.Flatten(), layers.Dense(64, activation='relu'), layers.Dense(10, activation='softmax')])
# 编译模型model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
# 查看模型结构model.summary()
# 训练模型history = model.fit(datagen.flow(x_train, y_train, batch_size=64), epochs=50, validation_data=(x_test, y_test))
# 评估模型test_loss, test_acc = model.evaluate(x_test, y_test, verbose=2)print(f"\n测试准确率:{test_acc:.4f}")
# 绘制训练过程plt.figure(figsize=(12, 4))
# 准确率plt.subplot(1, 2, 1)plt.plot(history.history['accuracy'], label='训练准确率')plt.plot(history.history['val_accuracy'], label='验证准确率')plt.title('准确率')plt.xlabel(' epoch')plt.ylabel('准确率')plt.legend()
# 损失plt.subplot(1, 2, 2)plt.plot(history.history['loss'], label='训练损失')plt.plot(history.history['val_loss'], label='验证损失')plt.title('损失')plt.xlabel('epoch')plt.ylabel('损失')plt.legend()
plt.show()
# 预测示例predictions = model.predict(x_test)
# 显示预测结果plt.figure(figsize=(15, 15))for i in range(25): plt.subplot(5, 5, i+1) plt.xticks([]) plt.yticks([]) plt.grid(False) plt.imshow(x_test[i]) predicted_label = np.argmax(predictions[i]) true_label = np.argmax(y_test[i]) if predicted_label == true_label: color = 'green' else: color = 'red' plt.xlabel(f"{class_names[predicted_label]} ({class_names[true_label]})", color=color)plt.show()
# 保存模型model.save('image_classification_model.h5')print("模型已保存为 image_classification_model.h5")运行项目:
# 安装依赖pip install tensorflow numpy matplotlib scikit-learn
# 运行项目python image_classification.py6. 部署和运维
6.1 Docker部署Python应用
项目概述:使用Docker容器化部署Python应用,提高应用的可移植性和可维护性。
核心代码:
Dockerfile:
# 使用官方Python镜像作为基础镜像FROM python:3.9-slim
# 设置工作目录WORKDIR /app
# 复制requirements.txt文件COPY requirements.txt .
# 安装依赖RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码COPY . .
# 暴露端口EXPOSE 5000
# 运行应用CMD ["python", "app.py"]docker-compose.yml:
version: '3'services: web: build: . ports: - "5000:5000" environment: - FLASK_ENV=production restart: alwaysapp.py:
from flask import Flask
app = Flask(__name__)
@app.route('/')def hello(): return "Hello, Docker!"
if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)requirements.txt:
Flask部署步骤:
# 构建Docker镜像docker build -t python-app .
# 运行Docker容器docker run -p 5000:5000 python-app
# 或使用docker-composedocker-compose up -d
# 查看运行状态docker ps
# 访问应用# 打开浏览器访问 http://localhost:50006.2 CI/CD流水线配置
项目概述:配置CI/CD流水线,实现代码提交后自动构建、测试和部署。
GitHub Actions配置:
.github/workflows/ci-cd.yml:
name: CI/CD Pipeline
on: push: branches: [ main ] pull_request: branches: [ main ]
jobs: build: runs-on: ubuntu-latest
steps: - uses: actions/checkout@v2
- name: Set up Python uses: actions/setup-python@v2 with: python-version: '3.9'
- name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt pip install pytest
- name: Run tests run: | pytest
- name: Build Docker image run: | docker build -t python-app .
- name: Deploy to server if: github.event_name == 'push' && github.ref == 'refs/heads/main' run: | # 这里可以添加部署脚本,例如使用SSH连接服务器并部署 echo "Deploying to server..."GitLab CI/CD配置:
.gitlab-ci.yml:
stages: - test - build - deploy
test: stage: test image: python:3.9-slim script: - pip install -r requirements.txt - pip install pytest - pytest
build: stage: build image: docker:latest services: - docker:dind script: - docker build -t python-app .
deploy: stage: deploy image: alpine:latest script: - echo "Deploying to server..." only: - main7. 学习资源推荐
7.1 Python官方文档
7.2 B站Python视频(播放量高于50W)
- Python全栈开发教程 - 黑马程序员 - 播放量超300W
- Python数据分析与可视化 - 尚硅谷 - 播放量超100W
- Python Web开发从入门到精通 - 尚硅谷 - 播放量超100W
- Python爬虫教程 - 黑马程序员 - 播放量超50W
- Python自动化办公教程 - 黑马程序员 - 播放量超50W
- Python机器学习入门 - 尚硅谷 - 播放量超50W
7.3 推荐书籍
- 《Python编程:从入门到实践》
- 《流畅的Python》
- 《Python Cookbook》
- 《Effective Python》
- 《Python数据科学手册》
- 《Django实战》
- 《Flask Web开发》
- 《Python网络爬虫开发实战》
- 《Python自动化运维》
- 《机器学习实战》
- 《Docker实战》
8. 总结
本教程介绍了Python的实际应用场景和项目实战,包括:
- Web应用开发实战(Flask博客系统、Django电商网站)
- 数据分析项目实战(销售数据可视化)
- 爬虫项目实战(电影信息爬取)
- 自动化脚本开发(文件整理工具、网络监控工具)
- 机器学习入门项目(图像分类)
- 部署和运维(Docker部署、CI/CD流水线)
通过本教程的学习,你应该已经掌握了Python的实际应用技能,可以开始开发自己的Python项目了。在实际开发中,你可能会遇到各种问题,但是通过查阅官方文档、参考书籍和在线资源,你一定能够解决这些问题。
Python是一种功能强大、简单易学的编程语言,它的应用领域非常广泛。希望本教程能够帮助你更好地理解和应用Python,让你在编程的道路上越走越远。
祝你学习愉快!
提示:本教程是Python学习的第三阶段,后续将推出Python高级主题教程,包括性能优化、安全编程、分布式系统等内容。
部分信息可能已经过时









